In [None]:
import os
from awsglue.utils import getResolvedOptions
from awsglue import DynamicFrame
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql import functions as F
import sys

In [None]:
# Resolve the job arguments passed on the command line; JOB_NAME is required.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

# Reuse the active SparkContext when one already exists so this cell can be
# re-run without failing with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialise the Glue job wrapper with the resolved arguments.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

In [None]:
# S3 bucket used for both the raw input prefix and the transformed output prefix.
bucketName: str = 'reddit-koroomvn-bucket'
# Glue Data Catalog database that the output table is registered in.
dbName: str = 'redditdb'

In [None]:
# Read every CSV object under the raw/ prefix (recursing into sub-prefixes),
# treating the first row of each file as the header.
dyf = glueContext.create_dynamic_frame_from_options(
    connection_type='s3',
    connection_options={
        'paths': [f's3://{bucketName}/raw/'],
        'recurse': True,
    },
    format='csv',
    format_options={'withHeader': True},
)

# Every source column is mapped from 'string'; column names are kept as-is and
# each column is cast to the target type listed next to it.
targetTypes = [
    ('id', 'string'),
    ('title', 'string'),
    ('score', 'integer'),
    ('num_comments', 'integer'),
    ('author', 'string'),
    ('created_utc', 'timestamp'),
    ('url', 'string'),
    ('over_18', 'boolean'),
    ('edited', 'boolean'),
    ('spoiler', 'boolean'),
    ('stickied', 'boolean'),
]

# Expand each (column, type) pair into the
# (source name, source type, target name, target type) tuple apply_mapping takes.
dyfApplyMapping = dyf.apply_mapping(
    [(column, 'string', column, targetType) for column, targetType in targetTypes]
)

def addColumns(rec):
    """Add the combined ``ess_updated`` flag string and the date-part columns.

    Args:
        rec: Dict-like record. Reads ``edited``, ``spoiler``, ``stickied`` and
            ``created_utc``; the record is modified in place.

    Returns:
        The same record with ``ess_updated``, ``year``, ``month`` and ``day``
        set. The three date parts are ``None`` when ``created_utc`` is missing
        or null instead of raising.
    """
    # A missing flag renders as 'None', exactly like a present-but-null one,
    # rather than raising KeyError.
    rec['ess_updated'] = '-'.join(
        str(rec.get(flag)) for flag in ('edited', 'spoiler', 'stickied')
    )

    # A null timestamp (e.g. a value that could not be cast) has no .year etc.;
    # emit null date parts for such records instead of failing on them.
    # NOTE(review): confirm how the sink treats null partition values.
    created = rec.get('created_utc')
    for part in ('year', 'month', 'day'):
        rec[part] = None if created is None else getattr(created, part)

    return rec

# Run addColumns over every record, then drop the three flag columns that
# addColumns reads when building ess_updated.
dyfMap = dyfApplyMapping.map(addColumns)
dyfDropFields = dyfMap.drop_fields(paths=['edited', 'spoiler', 'stickied'])

# Print a sample of the transformed records for inspection.
dyfDropFields.show()
# S3 sink writing under transformed/, partitioned by year/month/day, with
# Data Catalog updates enabled.
sink = glueContext.getSink(
    connection_type='s3',
    path=f's3://{bucketName}/transformed/',
    enableUpdateCatalog=True,
    updateBehavior='UPDATE_IN_DATABASE',
    partitionKeys=['year', 'month', 'day'],
)

# Output format: comma-separated CSV without a header row.
sink.setFormat(format='csv', separator=',', writeHeader=False)

# Catalog target: table 'transformed' in the dbName database.
sink.setCatalogInfo(catalogDatabase=dbName, catalogTableName='transformed')

# Write the transformed frame through the configured sink.
sink.writeFrame(dynamic_frame=dyfDropFields)

In [None]:
# Commit the Glue job that was set up with job.init(...) earlier in the script.
job.commit()