In [43]:
# Print the metadata of the currently attached Glue interactive session
# (session ID, status, role, worker type/count, idle timeout, region).
%status

Session ID: a5f5013f-7cae-4cc8-bfb9-18ee904dfa5b
Status: READY
Role: arn:aws:iam::767828724616:role/glue-role
CreatedOn: 2024-08-11 19:28:58.329000+05:30
GlueVersion: 4.0
Session Type: glueetl
Idle Timeout: 2880
Tags: {'owner': '767828724616'}
Worker Type: G.1X
Number of Workers: 5
Region: us-east-1
Applying the following default arguments:
--glue_kernel_version 1.0.6
--enable-glue-datacatalog true
Arguments Passed: ['--glue_kernel_version: 1.0.6', '--enable-glue-datacatalog: true']


In [424]:
# Session configuration magics. They only apply to a session created AFTER they
# run: when a session is already attached, Glue reports "No change will be made
# to the current session" and keeps the existing settings. Restart the session
# (or run this cell first on a fresh kernel) for these values to take effect.
%idle_timeout 15
%glue_version 4.0
%worker_type G.1X
%number_of_workers 1

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Initialize the Glue context and take the Spark session from it.
# The session is bound once: a second `SparkSession.builder...getOrCreate()`
# would only hand back an already-running session (its appName cannot rename a
# started application) and would discard the handle obtained from GlueContext.
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session


You are already connected to a glueetl session a5f5013f-7cae-4cc8-bfb9-18ee904dfa5b.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 15 minutes.
idle_timeout has been set to 15 minutes.


You are already connected to a glueetl session a5f5013f-7cae-4cc8-bfb9-18ee904dfa5b.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session a5f5013f-7cae-4cc8-bfb9-18ee904dfa5b.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session a5f5013f-7cae-4cc8-bfb9-18ee904dfa5b.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 1
Setting new number of workers to: 1



In [425]:
# Full S3 object path of the initial-load CSV. Despite the variable name this is
# a single file, not a bucket.
s3_bucket = "s3://cdc-pipeline-stage/sales/orders/LOAD00000001.csv"




In [426]:
# Load the initial-load CSV from S3. No header or schema is supplied, so Spark
# assigns positional column names (_c0, _c1, ...) and reads every value as a string.
full_df = spark.read.csv(s3_bucket)




In [427]:
# Preview the first two rows to check the load and the auto-generated column names.
full_df.show(2)

+---+----------+----------+-------+
|_c0|       _c1|       _c2|    _c3|
+---+----------+----------+-------+
|  1|  john doe|    laptop|1200.00|
|  2|jane smith|smartphone| 800.00|
+---+----------+----------+-------+
only showing top 2 rows


In [428]:
# Replace the positional CSV names (_c0.._c3) with the business column names.
# withColumnRenamed is a no-op when the source column is absent, so re-running
# this cell is harmless.
order_column_names = ["orderid", "customername", "productname", "orderamount"]
for position, column_name in enumerate(order_column_names):
    full_df = full_df.withColumnRenamed(f"_c{position}", column_name)







In [429]:
# Preview again to verify the renamed columns.
full_df.show(2)

+-------+------------+-----------+-----------+
|orderid|customername|productname|orderamount|
+-------+------------+-----------+-----------+
|      1|    john doe|     laptop|    1200.00|
|      2|  jane smith| smartphone|     800.00|
+-------+------------+-----------+-----------+
only showing top 2 rows


In [430]:
# Seed the final dataset with the full-load snapshot (replacing whatever is there).
# "header" is a CSV option and has no effect on Parquet, where column names are
# part of the file schema, so it is not passed to the writer.
full_df.write.mode("OVERWRITE").parquet("s3://cdc-dms-final/sales/orders/final_output/final_file.parquet")




In [431]:
# Re-read the stored snapshot so the CDC changes are applied to what is actually
# in the final location. Parquet files carry their own schema, so the CSV-only
# "header" / "inferSchema" options are not passed to the reader.
s3_bucket_final_file = "s3://cdc-dms-final/sales/orders/final_output/final_file.parquet"
final_file_df = spark.read.parquet(s3_bucket_final_file)





In [432]:
# S3 object path of the CDC change file to apply on top of the snapshot.
s3_bucket_updated_file = "s3://cdc-pipeline-stage/sales/orders/20240811-191438099.csv"

# Headerless CSV: columns come back as _c0.._c4, all typed as strings.
updated_df = spark.read.csv(s3_bucket_updated_file)
updated_df.show()

+---+---+------------+-----+-------+
|_c0|_c1|         _c2|  _c3|    _c4|
+---+---+------------+-----+-------+
|  U|  2|       bachu|shoes| 800.00|
|  I|  5|   appu raja|watch|1000.00|
|  I|  6|babu bisleri|cycle| 150.00|
|  U|  5|        dadu|watch|1000.00|
|  D|  6|babu bisleri|cycle| 150.00|
+---+---+------------+-----+-------+


In [433]:
# Name the change-file columns: the first column is the CDC action flag
# (U / I / D in the sample data), followed by the same four order columns as
# the snapshot.
change_column_names = ["action", "orderid", "customername", "productname", "orderamount"]
for position, column_name in enumerate(change_column_names):
    updated_df = updated_df.withColumnRenamed(f"_c{position}", column_name)





In [434]:
# Display the change batch with its renamed columns.
updated_df.show()

+------+-------+------------+-----------+-----------+
|action|orderid|customername|productname|orderamount|
+------+-------+------------+-----------+-----------+
|     U|      2|       bachu|      shoes|     800.00|
|     I|      5|   appu raja|      watch|    1000.00|
|     I|      6|babu bisleri|      cycle|     150.00|
|     U|      5|        dadu|      watch|    1000.00|
|     D|      6|babu bisleri|      cycle|     150.00|
+------+-------+------------+-----------+-----------+


In [435]:
# Pull the whole change batch to the driver as a list of Row objects.
# Acceptable for this five-row sample; collect() does not scale to large batches.
updated_df.collect()

[Row(action='U', orderid='2', customername='bachu', productname='shoes', orderamount='800.00'), Row(action='I', orderid='5', customername='appu raja', productname='watch', orderamount='1000.00'), Row(action='I', orderid='6', customername='babu bisleri', productname='cycle', orderamount='150.00'), Row(action='U', orderid='5', customername='dadu', productname='watch', orderamount='1000.00'), Row(action='D', orderid='6', customername='babu bisleri', productname='cycle', orderamount='150.00')]


In [436]:
# NOTE(review): wildcard import. `when` and `col` are already imported
# explicitly in the setup cell, and `*` also rebinds Python builtins such as
# sum/min/max/round/abs to their Spark column versions. Prefer explicit names.
from pyspark.sql.functions import *




In [437]:

# Apply the CDC batch to the current snapshot as one set-based merge.
#
# The previous version collected every change row to the driver and stacked
# three withColumn calls (or a union / filter) onto final_file_df per event, so
# the query plan grew with the size of the batch and re-running the cell
# appended inserted rows again. Here the batch is reduced to the last event per
# orderid and merged with a single anti-join plus union.
#
# Inputs : final_file_df (orderid, customername, productname, orderamount)
#          updated_df    (action, orderid, customername, productname, orderamount)
# Output : final_file_df rebound to the merged snapshot (same columns, same order).
#
# Semantics:
#   * events apply in file order, so only the last I/U/D per orderid matters;
#   * last event D      -> the order is removed;
#   * last event I or U -> the order carries that event's values. Unlike the
#     row-by-row loop this is an upsert: a U for an unknown orderid inserts it,
#     and an I for an existing orderid replaces it instead of duplicating it;
#   * rows with any other action value are ignored, as before;
#   * output row order is not guaranteed.
from pyspark.sql import Window
from pyspark.sql import functions as F

key_column = "orderid"
snapshot_columns = final_file_df.columns

# Tag each change with its position in the batch before any shuffle happens.
# NOTE(review): assumes the change file is read in arrival order (holds for a
# single CSV file) - confirm before loading several change files at once.
ordered_changes = (
    updated_df
    .filter(F.col("action").isin("I", "U", "D"))
    .withColumn("_seq", F.monotonically_increasing_id())
)

# Keep only the most recent change for every order.
latest_first = Window.partitionBy(key_column).orderBy(F.col("_seq").desc())
latest_changes = (
    ordered_changes
    .withColumn("_rank", F.row_number().over(latest_first))
    .filter(F.col("_rank") == 1)
    .drop("_rank", "_seq")
)

# Orders not mentioned in the batch pass through unchanged.
untouched_orders = final_file_df.join(
    latest_changes.select(key_column), on=key_column, how="left_anti"
)

# Orders whose last change is an insert/update take the new values; orders
# whose last change is a delete are simply not added back.
upserted_orders = (
    latest_changes
    .filter(F.col("action").isin("I", "U"))
    .select(*snapshot_columns)
)

final_file_df = untouched_orders.select(*snapshot_columns).unionByName(upserted_orders)




In [438]:
# Inspect the snapshot after the CDC changes have been applied.
final_file_df.show()

+-------+-------------+-----------+-----------+
|orderid| customername|productname|orderamount|
+-------+-------------+-----------+-----------+
|      1|     john doe|     laptop|    1200.00|
|      2|        bachu|      shoes|     800.00|
|      3|alice johnson|     tablet|     300.00|
|      4|    bob brown| headphones|     150.00|
|      5|         dadu|      watch|    1000.00|
+-------+-------------+-----------+-----------+


In [None]:
# Persist the merged snapshot back to the final location.
#
# final_file_df is lazily read from this very path, so overwriting it directly
# asks Spark to replace the files it still has to read (the write fails or the
# data is lost). Materialise the result in a staging location first, then
# publish it from there.
final_output_path = "s3://cdc-dms-final/sales/orders/final_output/final_file.parquet"
staging_output_path = "s3://cdc-dms-final/sales/orders/_staging/final_file.parquet"

final_file_df.write.mode("OVERWRITE").parquet(staging_output_path)
spark.read.parquet(staging_output_path).write.mode("OVERWRITE").parquet(final_output_path)
# The staging copy is left in place. final_file_df still refers to the files
# that were just replaced, so re-read final_output_path before using it again.