In [2]:
# Enable AWS Glue job bookmarks; run this cell BEFORE the session starts, because the change only applies to newly created sessions.
%%configure
{
    "--job-bookmark-option":"job-bookmark-enable"
}

You are already connected to a glueetl session 0921f56a-0d13-4659-a557-9e176e78819d.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


The following configurations have been updated: {'--job-bookmark-option': 'job-bookmark-enable'}


In [2]:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import current_timestamp, current_date, lit, concat_ws, col, sha2, split
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions

# Build the Spark / Glue entry points for this run.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# Keep a handle on the SparkSession: the original statement evaluated the
# property and threw the result away, leaving no `spark` name bound by this cell.
spark = glueContext.spark_session

# Initialise the Glue job; the script calls job.commit() once at the very end.
job = Job(glueContext)
# NOTE(review): getResolvedOptions is asked for JOB_NAME unconditionally, so this
# presumably fails when --JOB_NAME is not present in sys.argv (e.g. an ad-hoc
# interactive session) — confirm how this notebook is launched.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job.init(args['JOB_NAME'], args)

# S3 bucket plus the source / processed folder names.
bucket_name = "data-engineering-project-920372994009"
source_folder_name, processed_folder_name = "bronze_data", "silver_data"

# Database and table identifiers (used as path segments of the S3 output location).
db_name, table_name = "dev", "Customer"

# Glue Data Catalog database and table that the job reads from.
glue_database, glue_table_name = "data-engineering-project-glue-db", "raw_data_customer"

# Load the source table from the Glue Data Catalog as a Spark DataFrame.
# The transformation_ctx value names this read step.
customer_df_from_catalog = glueContext.create_data_frame_from_catalog(
    glue_database,
    glue_table_name,
    additional_options={
        "useCatalogSchema": True,
        "useSparkDataSource": True,
        "header": True,
    },
    transformation_ctx="customer_df_from_catalog",
)

# Transform and write only when the catalog read returned at least one row.
# take(1) answers "is there any row?" without counting the whole dataset.
if customer_df_from_catalog.take(1):
    # Rename the source columns to the target column names.
    renamed_customer_df = (
        customer_df_from_catalog
        .withColumnRenamed("op", "cdc_operations")
        .withColumnRenamed("customerid", "customer_id")
        .withColumnRenamed("name", "customer_name")
        .withColumnRenamed("email", "customer_email")
        .withColumnRenamed("phone", "customer_phone")
        .withColumnRenamed("address", "customer_address")
        .withColumnRenamed("country", "customer_country")
        .withColumnRenamed("city", "customer_city")
    )

    # SCD column expressions.
    # These intentionally do NOT reuse the names `current_date` /
    # `current_timestamp`: rebinding those names to Column objects shadows the
    # imported functions, so running the cell a second time in the same kernel
    # fails with "'Column' object is not callable".
    ingestion_date_col = current_date()
    record_start_ts_col = current_timestamp()
    record_end_ts = lit('9999-12-31').cast(TimestampType())  # open-ended end timestamp
    active_flag = lit(1)

    # Concatenated customer attributes that feed the SHA-256 `hash_value` column.
    # NOTE(review): with an empty separator, ("ab", "c") and ("a", "bc") produce
    # the same input string. Introducing a delimiter would change every hash
    # already written, so the expression is left unchanged — confirm whether
    # that is acceptable for downstream comparisons.
    hash_value = concat_ws(
        '',
        col("customer_name"),
        col("customer_email"),
        col("customer_phone"),
        col("customer_address"),
        col("customer_country"),
        col("customer_city"),
    )

    # Split the full name on single spaces: token 0 becomes the first name and
    # token 1 the last name; any further tokens are not kept.
    name_parts = split(col("customer_name"), " ")

    customer_final_df = (
        renamed_customer_df
        .withColumn("hash_value", sha2(hash_value, 256))
        .withColumn("ingestion_date", ingestion_date_col)
        .withColumn("record_start_ts", record_start_ts_col)
        .withColumn("record_end_ts", record_end_ts)
        .withColumn("cust_first_name", name_parts[0])
        .withColumn("cust_last_name", name_parts[1])
        .withColumn("active_flag", active_flag)
        .drop("customer_name")
    )

    # Convert back to a DynamicFrame so the Glue writer can be used.
    final_dyf_customer = DynamicFrame.fromDF(customer_final_df, glueContext, "final_dyf_customer")

    # Write Parquet to the processed S3 location, partitioned by ingestion_date.
    glueContext.write_dynamic_frame.from_options(
        frame=final_dyf_customer,
        connection_type='s3',
        connection_options={
            "path": f"s3://{bucket_name}/{processed_folder_name}/{db_name}/{table_name}/",
            "partitionKeys": ["ingestion_date"],
        },
        format='parquet',
        transformation_ctx='customer_dyf_to_s3',
    )
else:
    print("No data found in the glue_catalog")

# Commit the job on both branches, i.e. whether or not any rows were written.
job.commit()

