In [None]:
# Enable IPython's autoreload extension so edits to imported local modules are
# picked up live instead of requiring a kernel restart.
%load_ext autoreload
%autoreload 2

In [None]:
import boto3

# AWS session bound to a named local profile and a fixed region.
# NOTE(review): the cells below only issue queries; confirm whether an
# 'AdministratorAccess' profile is really needed for that.
session = boto3.Session(
  profile_name='AdministratorAccess',
  region_name='us-east-2',
)

# Resource vs. client: https://www.learnaws.org/2021/02/24/boto3-resource-client/
#   * resource - higher-level abstraction and the recommended choice: fewer
#     methods, but creating a table returns a table object that you can run
#     operations on, and an existing table can be grabbed with Table('name').
#   * client   - low-level, with more explicit methods; creating a table
#     returns a dictionary.
dynamodb_resource = session.resource('dynamodb')
# dynamodb_client = session.client('dynamodb')

In [None]:
# Table object for the 'rising' DynamoDB table; the query cells below read from it.
risingTable = dynamodb_resource.Table('rising')

In [None]:
from datetime import datetime


def daysUntilNow():
  """Return today's calendar date in UTC as a ``datetime.date``.

  NOTE(review): despite the name, no day count is computed; the function only
  returns the current UTC date. The name is kept so existing callers still work.
  """
  # Local import: this cell only imports `datetime` itself.
  from datetime import timezone

  # datetime.utcnow() is deprecated (Python 3.12+); an aware "now" in UTC
  # yields the same calendar date.
  return datetime.now(timezone.utc).date()
daysUntilNow()

In [None]:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/query.html
def queryByDate(date: str, projectionExpression: str = 'postId'):
  """Return every item from the 'byLoadDate' index whose loadDateUTC equals `date`.

  Args:
    date: value matched against the index key 'loadDateUTC'
      (the caller in this notebook passes strings such as '2023-04-09').
    projectionExpression: attributes to return for each item; defaults to 'postId'.

  Returns:
    A list of item dicts, accumulated across all result pages.
  """
  # Imported here because no visible cell brings `Key` into scope, which made
  # this function fail with NameError on a fresh kernel.
  from boto3.dynamodb.conditions import Key

  queryKwargs = {
    'IndexName': 'byLoadDate',
    'KeyConditionExpression': Key('loadDateUTC').eq(date),
    'ProjectionExpression': projectionExpression,
  }
  items = []
  while True:
    page = risingTable.query(**queryKwargs)
    items.extend(page['Items'])
    # A query response is paginated; LastEvaluatedKey is present whenever more
    # results remain, so keep going until it disappears.
    if 'LastEvaluatedKey' not in page:
      return items
    queryKwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']

In [None]:
def flattenItems(listOfListOfItems):
  """Concatenate the inner iterables into a single flat list, preserving order."""
  flattened = []
  for group in listOfListOfItems:
    flattened.extend(group)
  return flattened

def queryByRangeOfDates(dates: list, projectionExpression: str = 'postId'):
  """Run queryByDate for each entry of `dates` and return all items as one flat list.

  Items appear in the order of `dates`, and within a date in the order
  queryByDate returned them.
  """
  return [
    item
    for singleDate in dates
    for item in queryByDate(singleDate, projectionExpression)
  ]

In [None]:
# Fetch the post ids loaded on the dates of interest, then de-duplicate them.
# The query result is a list of dicts: [{'postId': XXXXXX}, {'postId': YYYYYY}...]
postIdQueryResult = queryByRangeOfDates(
  ['2023-04-09', '2023-04-10'],
)
postsOfInterest = set(row['postId'] for row in postIdQueryResult)

In [None]:
# Number of distinct post ids collected above.
print(len(postsOfInterest))

In [None]:
from pyspark.sql import SparkSession, DataFrame
from schema import fromDynamoConversion, toSparkSchema
from functools import reduce


# Spark session (app name 'redditData'); getPostIdSparkDataFrame uses it to build DataFrames.
spark = SparkSession.builder.appName('redditData').getOrCreate()

def applyDynamoConversions(dynamoRes, conversionFunctions=fromDynamoConversion):
  """Convert each value of a DynamoDB item with the converter registered for its key.

  Args:
    dynamoRes: mapping of attribute name -> raw value, as returned in a query's 'Items'.
    conversionFunctions: mapping of attribute name -> one-argument callable.
      Defaults to `fromDynamoConversion` from the `schema` module.

  Returns:
    A new dict with the same keys and converted values.

  Raises:
    KeyError: if an attribute in `dynamoRes` has no entry in `conversionFunctions`.
  """
  # Use the mapping that was passed in; previously the argument was ignored
  # and the module-level default was always applied.
  return {k: conversionFunctions[k](v) for k, v in dynamoRes.items()}

def getPostIdData(table, postId):
  """Return every item in `table` whose 'postId' key equals `postId`.

  Args:
    table: object exposing a DynamoDB-style ``query(**kwargs)`` method
      (the notebook passes the boto3 Table for 'rising').
    postId: key value to match.

  Returns:
    A list of item dicts, accumulated across all result pages.
  """
  # Imported here because no visible cell brings `Key` into scope, which made
  # this function fail with NameError on a fresh kernel.
  from boto3.dynamodb.conditions import Key

  queryKwargs = {'KeyConditionExpression': Key('postId').eq(postId)}
  items = []
  while True:
    page = table.query(**queryKwargs)
    items.extend(page['Items'])
    # A query response is paginated; LastEvaluatedKey is present whenever more
    # results remain, so keep going until it disappears.
    if 'LastEvaluatedKey' not in page:
      return items
    queryKwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']

def getPostIdSparkDataFrame(table, postIds, flatten=True):
  """Load the items of each post id and turn them into Spark DataFrames.

  For every id in `postIds` the items are fetched with getPostIdData, converted
  with applyDynamoConversions, and loaded into a DataFrame using `toSparkSchema`.

  Args:
    table: table object forwarded to getPostIdData.
    postIds: iterable of post ids to load.
    flatten: when True (default) return one DataFrame that is the union of all
      per-post DataFrames; when False return the list of per-post DataFrames.

  Returns:
    A single DataFrame (flatten=True) or a list of DataFrames (flatten=False).
    With flatten=True and no post ids, an empty DataFrame with `toSparkSchema`
    is returned.
  """
  dataFrames = []
  for postId in postIds:
    items = [applyDynamoConversions(item) for item in getPostIdData(table, postId)]
    dataFrames.append(spark.createDataFrame(items, toSparkSchema))  # convert to DF

  if not flatten:
    # Previously this branch returned an undefined name and raised NameError.
    return dataFrames

  if not dataFrames:
    # reduce() without an initial value raises TypeError on an empty list.
    return spark.createDataFrame([], toSparkSchema)

  # DataFrame.union replaces the deprecated unionAll alias.
  return reduce(DataFrame.union, dataFrames)


In [None]:
# This can take a while because of the read constraints placed on the DynamoDB
# table; consider increasing the table's RCU.
# It can also be slow because it converts each DynamoDB partition (one per post id)
# to its own Spark DataFrame. This was done so that it would scale better on a
# distributed system than keeping all the data in Python on one node and then
# trying to move it to Spark.
postIdData = getPostIdSparkDataFrame(risingTable, postsOfInterest)

In [None]:
# Pull at most 1000 rows into pandas for a quick look at the data.
# This is a little slow.
pandasTestDf = postIdData.limit(1000).toPandas()
pandasTestDf.head()

In [None]:
import pyspark.sql.functions as F

# One row per post: the maximum of each metric inside three elapsed-time windows
# (timeElapsedMin <= 20, 21 to 40, 41 to 60). Rows outside a window yield no
# value from `when`, so they do not contribute to that window's max.
# Output columns are named max<Metric><window>, e.g. maxScore20m, maxNumComments21_40m.
# NOTE(review): if timeElapsedMin can be fractional, values strictly between
# 20 and 21 (or 40 and 41) fall into no window - confirm the column is integral.
aggData = (
  postIdData
  .groupBy('postId', 'subreddit', 'title', 'createdTSUTC')
  .agg(*[
    F.max(F.when(inWindow, F.col(metricColumn))).alias(aliasPrefix + windowSuffix)
    for metricColumn, aliasPrefix in (
      ('score', 'maxScore'),
      ('numComments', 'maxNumComments'),
      ('upvoteRatio', 'maxUpvoteRatio'),
      ('numGildings', 'maxNumGildings'),
    )
    for windowSuffix, inWindow in (
      ('20m', F.col('timeElapsedMin') <= 20),
      ('21_40m', F.col('timeElapsedMin').between(21, 40)),
      ('41_60m', F.col('timeElapsedMin').between(41, 60)),
    )
  ])
)

In [None]:
# Convert the per-post aggregates to a pandas DataFrame.
aggDataPd = aggData.toPandas()

In [None]:
# Preview the first rows of the aggregated pandas frame.
aggDataPd.head()