In [None]:
#Importing all the basic Glue, Spark libraries 

import os, sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Importing further required libraries
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from datetime import datetime

# Bring up the Spark context and wrap it in a Glue context; the Spark session
# and the Glue job handle are both derived from the Glue context.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Column layout applied to the CSV input: (column name, Spark type), listed in
# the order the columns are declared. Every column is declared nullable.
_schema_columns = [
    ("user_id", StringType),
    ("refresh_token", StringType),
    ("token_tmp", StringType),
    ("login_flg", IntegerType),
    ("logout_flg", IntegerType),
    ("init_tutorial", IntegerType),
    ("introduction", IntegerType),
    ("email_slide_tutorial", IntegerType),
    ("first_time_set_nickname", IntegerType),
    ("auto_send", IntegerType),
    ("message_flg", IntegerType),
    ("created_at", StringType),
    ("updated_at", StringType),
]
schema = StructType(
    [
        StructField(column_name, column_type(), True)
        for column_name, column_type in _schema_columns
    ]
)

# AWS configuration
# NOTE: despite its name, s3_bucket_name holds a full S3 URI prefix
# (bucket plus folder, with a trailing slash); file names are appended to it.
s3_bucket_name = "s3://dynamodb-csv-importing/regular_report/"
ddb_table_name = "BPDiary-regular_report_performance"

# CSV files to load, relative to the S3 prefix in s3_bucket_name.
file_list = [
    "regular_report.csv",
    "regular_report-dummy.csv",
]

# Read each file into its own DataFrame using the explicit schema.
# Schema inference is intentionally not requested: an explicit schema is
# supplied, so asking Spark to infer one as well was contradictory and had
# no effect on the resulting column types.
df_list = [
    spark.read.load(
        s3_bucket_name + file_name,
        format="csv",
        sep=",",
        schema=schema,
        header="true",
    )
    for file_name in file_list
]

# Stack all per-file DataFrames into a single DataFrame. union() matches
# columns by position, which is safe here because every DataFrame was read
# with the same schema.
df = df_list[0]
for temp_df in df_list[1:]:
    df = df.union(temp_df)

# transform DataFrame into DynamicFrame (the Glue writer takes a DynamicFrame)
df_dyf = DynamicFrame.fromDF(df, glueContext, "df_dyf")

# write data to DynamoDB
print("Start writing to DynamoDB: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
glueContext.write_dynamic_frame_from_options(
    frame=df_dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": ddb_table_name,
        "dynamodb.throughput.write.percent": "1.0"
    }
)
# Log completion immediately after the write call returns, so the timestamp
# is not inflated by the diagnostic output below.
print("Finished writing to DynamoDB: {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

# printSchema() and show() print to stdout themselves and return None, so
# they are called as statements; embedding them in an f-string would print
# a trailing "None" after the label.
print("Schema of DataFrame:")
df.printSchema()
print("Preview of DataFrame:")
df.show(5)

# count data
# NOTE(review): count() runs a fresh evaluation of df, so this is the number
# of rows read from the source files, not a confirmation from DynamoDB.
print(f"Number of records written: {df.count()}")
