In [1]:
import sys
from typing import Callable, Dict, Iterator, Tuple

# Star import kept first in this group so the explicit imports below take precedence.
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import boto3
from pyspark.context import SparkContext
from pyspark.sql import DataFrame
from pyspark.sql.functions import concat_ws, col, coalesce, to_date, split
from pyspark.sql.types import Row


In [2]:
class DDBDelete:
    """Delete the rows of a Glue DynamicFrame from a DynamoDB table.

    Args:
        table: Name of the DynamoDB table to delete from.
        keyGen: Maps one Spark ``Row`` to the ``Key`` dict passed to
            ``delete_item`` for that row.
    """

    def __init__(self, table: str, keyGen: Callable[[Row], Dict[str, str]]) -> None:
        self.table = table
        self.keyGen = keyGen

    def process(self, df: DynamicFrame) -> None:
        """Run :meth:`delete` once per Spark partition of ``df``."""
        spark_df = df.toDF()
        spark_df.foreachPartition(self.delete)

    def delete(self, rows: Iterator[Row]) -> None:
        """Batch-delete the key of every row in one partition.

        The boto3 resource is created here, per partition, so the instance that
        ``foreachPartition`` ships with the bound method only carries the table
        name and the key function (presumably because boto3 resources do not
        pickle -- confirm).
        """
        target = boto3.resource("dynamodb").Table(self.table)
        make_key = self.keyGen
        with target.batch_writer() as writer:
            for key in map(make_key, rows):
                writer.delete_item(Key=key)

In [3]:
def init() -> Tuple[GlueContext, Job]:
    """Create the Glue context and initialise the Glue job.

    ``JOB_NAME`` is only requested from ``getResolvedOptions`` when ``--JOB_NAME``
    is present on the command line. This lets the notebook also run interactively,
    where the job is initialised under the name ``"test"``.

    Returns:
        The ``(GlueContext, Job)`` pair.
    """
    wanted = ['JOB_NAME'] if '--JOB_NAME' in sys.argv else []
    args = getResolvedOptions(sys.argv, wanted)

    glue_context = GlueContext(SparkContext.getOrCreate())
    glue_job = Job(glue_context)
    glue_job.init(args.get('JOB_NAME', "test"), args)
    return glue_context, glue_job

context, job = init()

In [4]:
# Load the `macro_indicators` table registered in the `macro_indicators` Glue Data
# Catalog database. The variable name (and the DDBDelete call later) suggest the
# catalog table is backed by DynamoDB -- confirm.
macro_indicators_dynamodb_node: DynamicFrame = context.create_dynamic_frame.from_catalog(
    database="macro_indicators",
    table_name="macro_indicators",
    transformation_ctx="macro_indicators_dynamodb_node",
)

In [5]:
# Convert to a Spark DataFrame and collapse it into a single partition. The annotation
# is `DataFrame` because `.toDF().repartition(1)` yields a Spark DataFrame, not a
# DynamicFrame. A single partition serialises all downstream work onto one task;
# presumably this is meant to limit how many files the partitioned S3 write emits -- confirm.
partitioned_dataframe: DataFrame = macro_indicators_dynamodb_node.toDF().repartition(1)

In [6]:
# Wrap the single-partition Spark DataFrame back into a Glue DynamicFrame.
# NOTE(review): the next cell immediately calls `.toDF()` on this again, so this
# round-trip looks unnecessary -- confirm before removing.
partitioned_dynamicframe: DynamicFrame = DynamicFrame.fromDF(partitioned_dataframe, context, "partitioned_df")

In [7]:
# Flatten the frame for CSV output:
#  * join the `tags` / `symbols` values into comma-separated strings,
#  * lift each sentiment score out of the `sentiment` struct, taking whichever of
#    its `long` / `double` variants is non-null,
#  * keep the raw `date` as `publish_timestamp`, then replace `date` with a DATE,
#  * derive `symbol` from the text of `symbol:link` before the first '#'.
concat_node = (
    partitioned_dynamicframe.toDF()
    .withColumn('tags', concat_ws(',', col('tags')))
    .withColumn('symbols', concat_ws(',', col('symbols')))
    .withColumn('neg_sentiment', coalesce(col('sentiment.neg.long'), col('sentiment.neg.double')))
    .withColumn('pos_sentiment', coalesce(col('sentiment.pos.long'), col('sentiment.pos.double')))
    # NOTE(review): reads `sentiment.neu` but is named `new_sentiment`; likely meant
    # `neu_sentiment`. Left as-is because the name is part of the CSV schema in S3.
    .withColumn('new_sentiment', coalesce(col('sentiment.neu.long'), col('sentiment.neu.double')))
    # Copy must happen before `date` is overwritten on the next line.
    .withColumn('publish_timestamp', col('date'))
    .withColumn('date', to_date(col('date')))
    .withColumn('symbol', split(col('symbol:link'), '#').getItem(0))
)

In [8]:
# Drop the raw `sentiment` struct (its scores now live in the flat *_sentiment columns)
# and convert back to a DynamicFrame for the Glue S3 sink in the next cell.
drop_node = DynamicFrame.fromDF(concat_node.drop('sentiment'), context, "drop_node")


In [9]:
# Archive the transformed rows to S3 as CSV, partitioned into symbol=.../date=...
# directories. `transformation_ctx` must stay stable across runs because Glue keys
# job-bookmark state on it.
# NOTE(review): S3 bucket names may only contain underscores if they are legacy
# us-east-1 buckets; confirm `tonberry-macro_indicators-staging` is the real bucket name.
context.write_dynamic_frame.from_options(
    frame=drop_node,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://tonberry-macro_indicators-staging",
        "partitionKeys": ["symbol", "date"],
    },
    transformation_ctx="S3bucket_node3",
);  # trailing ';' hides the returned DynamicFrame's repr in the notebook output

<awsglue.dynamicframe.DynamicFrame at 0xffff65afb610>

In [15]:

# Delete every row of the source frame from the `macro_indicators` DynamoDB table, keyed
# on (`date`, `symbol:link`) -- presumably the table's primary key; confirm. The untransformed
# catalog frame is used, so `date` is still its original value (not the to_date() DATE).
# NOTE(review): this is not the frame that was written to S3. Nothing upstream is cached
# and Spark evaluates lazily, so this presumably re-scans the table. Items added after the
# S3 write would then be deleted without being archived. Consider persisting one snapshot
# and both writing and deleting from it.
DDBDelete("macro_indicators", lambda x: {"date": x['date'], "symbol:link": x['symbol:link'] }).process(macro_indicators_dynamodb_node)

In [16]:
# Commit the Glue job so this run's job-bookmark progress is recorded.
job.commit()
