In [6]:
import os
import sys
import ast
import json
import time
import boto3
import pprint
import datetime
import botocore.session
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr

In [7]:
# Secrets Manager client bound to the us-east-2 region.
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name='us-east-2')

# Static AWS key pair, read from a JSON file located relative to the
# current working directory.
with open("../creds.json") as f:
    creds = json.load(f)

aws_client = creds['aws-access-key']
aws_secret = creds['aws-secret-access-key']

In [8]:
# Build (or reuse) a Hive-enabled Spark session whose S3A filesystem
# authenticates with the static key pair held in aws_client / aws_secret.
spark = (
    SparkSession.builder
    .config("spark.hadoop.fs.s3a.access.key", aws_client)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        'spark.hadoop.fs.s3a.aws.credentials.provider',
        'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
    )
    .enableHiveSupport()
    .getOrCreate()
)

In [9]:
def get_secret(secret_name="twitter_creds.json", region_name="us-east-2"):
    """Fetch a JSON secret from AWS Secrets Manager and return it parsed.

    Args:
        secret_name: Identifier passed to ``get_secret_value`` as
            ``SecretId``. Defaults to the previously hard-coded name.
        region_name: AWS region used to build the Secrets Manager client.
            Defaults to the previously hard-coded region.

    Returns:
        The object produced by ``json.loads`` on the secret payload.

    Raises:
        botocore.exceptions.ClientError: Propagated unchanged from
            ``get_secret_value`` (see the GetSecretValue API reference for
            the possible error codes).
        KeyError: If the response carries neither ``SecretString`` nor
            ``SecretBinary``.
        json.JSONDecodeError: If the payload is not valid JSON.
    """
    client = boto3.session.Session().client(
        service_name='secretsmanager',
        region_name=region_name,
    )

    # No try/except here: the original handler only re-raised, so letting
    # ClientError propagate directly is equivalent for callers.
    response = client.get_secret_value(SecretId=secret_name)

    # Prefer the text payload; fall back to the binary payload so a secret
    # stored as bytes does not fail with a KeyError on 'SecretString'.
    payload = response.get('SecretString')
    if payload is None:
        payload = response['SecretBinary']

    return json.loads(payload)


In [15]:
def scan_dynamo(table, creds, region_name='us-east-2', page_delay=5):
    """Read every item of a DynamoDB table by following scan pagination.

    Args:
        table: Name of the DynamoDB table to scan.
        creds: Mapping holding the ``"aws-access-key"`` and
            ``"aws-secret-access-key"`` entries used to authenticate.
        region_name: AWS region of the table. Defaults to the previously
            hard-coded ``'us-east-2'``.
        page_delay: Seconds to pause between consecutive page requests.
            Defaults to the previously hard-coded 5; a falsy value
            disables the pause.

    Returns:
        A list with the ``Items`` of every page, in the order received.
    """
    dynamodb = boto3.resource(
        'dynamodb',
        aws_access_key_id=creds["aws-access-key"],
        aws_secret_access_key=creds["aws-secret-access-key"],
        region_name=region_name,
    )
    tweets = dynamodb.Table(table)

    table_list = []
    scan_kwargs = {}

    while True:
        response = tweets.scan(**scan_kwargs)
        table_list.extend(response.get('Items', []))

        start_key = response.get('LastEvaluatedKey')
        print("read data for key {}".format(start_key))

        # A missing (or empty) LastEvaluatedKey means the last page was read.
        if not start_key:
            break

        # Resume the next request right after the last key seen.
        scan_kwargs['ExclusiveStartKey'] = start_key

        # Pause only between pages; there is nothing to wait for after the
        # final one.
        if page_delay:
            time.sleep(page_delay)

    return table_list

In [16]:
# Pull the full contents of the "tweets" table into a Python list.
dynamo_list = scan_dynamo(table="tweets", creds=creds)

read data for key {'user': 'amandaviviann', 'created_at': '2018-07-13T02:52:52Z'}
read data for key {'user': 'PacSwim4', 'created_at': '2018-05-31T00:09:18Z'}
read data for key {'user': 'ViviannRutan', 'created_at': '2015-12-30T03:51:07Z'}
read data for key {'user': 'notrealneverwas', 'created_at': '2023-01-23T21:07:49Z'}
read data for key {'user': 'AwkwardturtleAM', 'created_at': '2014-07-14T20:21:25Z'}
read data for key {'user': 'veronicamullen_', 'created_at': '2016-04-19T01:52:09Z'}
read data for key {'user': 'GriffM725', 'created_at': '2016-06-11T14:33:57Z'}
read data for key {'user': 'VivienDrinkwat1', 'created_at': '2023-02-03T11:53:51Z'}
read data for key {'user': 'dane_b_', 'created_at': '2021-11-11T03:10:42Z'}
read data for key None


In [22]:
# Explicit schema for the items collected from the DynamoDB scan.
tweetSchema = StructType([
    StructField('favorites', DecimalType(), True),
    StructField('created_at', StringType(), True),
    # DecimalType() is decimal(10, 0). Widen `id` to the maximum precision
    # so that identifiers longer than 10 digits fit.
    # NOTE(review): assumes `id` holds tweet IDs (~19 digits) - confirm.
    StructField('id', DecimalType(38, 0), True),
    StructField('tweet', StringType(), True),
    StructField('retweets', DecimalType(), True),
    StructField('inserted_at', StringType(), True),
    StructField('user', StringType(), True),
])

df = spark.createDataFrame(data=dynamo_list, schema=tweetSchema)

Exception in thread "Thread-20" java.lang.IncompatibleClassChangeError: Implementing class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another 

Py4JError: SimplePythonFunction does not exist in the JVM

In [None]:
# Preview the first 20 rows, truncating long cell values (the defaults).
df.show(n=20, truncate=True)

In [None]:

# Destination prefix for the Parquet output.
filepath = "s3a://dynamo-tweets/clean/"

# "overwriteSchema" (a Delta Lake option) and "header" (a CSV option) were
# dropped: neither is a Parquet writer option, so they only misled readers.
# NOTE(review): partitionBy("user") creates one directory per distinct user;
# confirm that layout is intended if the number of users is large.
(
    df.write
    .format("parquet")
    .partitionBy("user")
    .mode("overwrite")
    .save(filepath)
)