![Delta as a CDC source](files/tables/cdcimg1.jpg)

In [0]:
# Configuration for the Delta-as-a-CDC-source pipeline. Follow the section
# "Preparing the Delta as a CDC Source Notebook Environment" from the article.
#
# Replace the placeholder credentials below (awsAccessKeyId, awsSecretKey,
# rdsConnectString, rdsUsername, rdsPassword). Preferably, supply them through
# the environment variables read below so that secrets are not stored in the notebook.

import os

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import Row

# Each secret falls back to the original in-notebook placeholder when its
# environment variable is not set.
awsAccessKeyId = os.environ.get("AWS_ACCESS_KEY_ID", "**********************")
awsSecretKey = os.environ.get("AWS_SECRET_ACCESS_KEY", "*************************")
kinesisStreamName = "hotelStream"
awsRegion = "us-east-1"
hotelBucket = "hotelcdcbucket"
rdsConnectString = os.environ.get("RDS_CONNECT_STRING", "jdbc:mysql://****************.rds.amazonaws.com/cdc")
rdsUsername = os.environ.get("RDS_USERNAME", "admin")
rdsPassword = os.environ.get("RDS_PASSWORD", "admin123")

# Give the s3a filesystem the AWS credentials and a region-specific S3 endpoint.
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", awsAccessKeyId)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", awsSecretKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + awsRegion + ".amazonaws.com")

# S3 locations of the streaming checkpoint and the bronze/silver Delta tables.
bronzeHotelCheckpointLocation = "s3a://%s/deltaCheckpoint/bronze/hotels" % hotelBucket
bronzeHotelLocation = "s3a://%s/deltaTable/bronze/hotels" % hotelBucket
changeLogLocation = "s3a://%s/deltaTable/silver/changelog" % hotelBucket
silverHotelLocation = "s3a://%s/deltaTable/silver/hotels" % hotelBucket

In [0]:
# Read events from Kinesis with Structured Streaming, parse each record's JSON payload
# with an explicit schema, and append the parsed rows to the bronze Delta table.

# NOTE(review): "inferSchema" and "format" do not appear among the documented Kinesis
# source options; the payload is decoded explicitly with from_json below. Confirm they
# are no-ops before removing them.
kinesisDF = (spark.readStream
             .format("kinesis")
             .option("streamName", kinesisStreamName)
             .option("region", awsRegion)
             .option("initialPosition", "LATEST")
             .option("awsAccessKey", awsAccessKeyId)
             .option("awsSecretKey", awsSecretKey)
             .option("inferSchema", "true")
             .option("format", "json")
             .load())

# Shape of the JSON payload. "timestamp" carries Unix epoch seconds as text; it is
# converted to a real timestamp in the curation step.
hotelBronzeSchema = StructType() \
                    .add("timestamp", StringType()) \
                    .add("city", StringType()) \
                    .add("hotel", StringType()) \
                    .add("price", FloatType())

# The bronze table is not partitioned by "timestamp". That column is the raw
# epoch-second string, so partitioning by it would create one partition directory
# per distinct second.
# Note: bronzehotelDF holds the StreamingQuery handle returned by start(), not a DataFrame.
bronzehotelDF = (kinesisDF
                 .selectExpr("cast (data as STRING) jsonData")
                 .select(from_json("jsonData", hotelBronzeSchema).alias("hotels_bronze"))
                 .select("hotels_bronze.*")
                 .writeStream
                 .format("delta")
                 .option("checkpointLocation", bronzeHotelCheckpointLocation)
                 .start(bronzeHotelLocation))

In [0]:
# Curate the bronze stream:
# - convert the Unix epoch "timestamp" into a TimestampType column;
# - split "city" (a "<city>,<country>" value) into separate city and country columns.

bronzehotelData = spark.readStream.format("delta").load(bronzeHotelLocation)

curatedhotelDF = (bronzehotelData
                  # Cast epoch seconds straight to a timestamp. The previous version formatted
                  # them as a local-time string ('MM/dd/yyy:HH:m:ss') and parsed it back. That
                  # round trip can shift values falling in an ambiguous daylight-saving hour
                  # when the session time zone is not UTC.
                  # NOTE(review): if ANSI casting is enabled on this cluster, confirm that
                  # numeric->timestamp casts are permitted; timestamp_seconds() (Spark 3.1+)
                  # is an alternative.
                  .withColumn('timestamp', col('timestamp').cast('long').cast('timestamp'))
                  # Derive country before "city" is overwritten. trim() removes the space that
                  # follows the comma in values such as "Paris, France".
                  .withColumn('country', trim(split('city', ',').getItem(1)))
                  .withColumn('city', trim(split('city', ',').getItem(0)))
                  # Keep the same output column order as before.
                  .select('hotel', 'price', 'timestamp', 'city', 'country'))

display(curatedhotelDF)

In [0]:
# Remove hotels_silver and change_log from the Hive metastore (if present),
# so that the following cells can recreate them from scratch.

for dropStatement in ("DROP TABLE IF EXISTS hotels_silver",
                      "DROP TABLE IF EXISTS change_log"):
    spark.sql(dropStatement)

In [0]:
# Create the silver-layer table hotels_silver: an empty Delta table partitioned by city
# and stored at silverHotelLocation. Records from each micro-batch are later either
# inserted as new records or merged into existing ones.

silverhotelSchema = StructType([
    StructField("price", FloatType()),
    StructField("timestamp", TimestampType()),
    StructField("city", StringType()),
    StructField("country", StringType()),
    StructField("hotel", StringType()),
])

silverhotelemptyRDD = spark.sparkContext.emptyRDD()
silverhotelWriter = (spark.createDataFrame(silverhotelemptyRDD, silverhotelSchema)
                     .write
                     .format("delta")
                     .partitionBy("city")
                     .mode("overwrite")
                     .option("path", silverHotelLocation))
# saveAsTable() returns None; silverhotelDF is kept only so that the name still exists.
silverhotelDF = silverhotelWriter.saveAsTable("hotels_silver")

In [0]:
# Create the change_log table: an empty Delta table at changeLogLocation that keeps an
# immutable log of which (city, hotel) keys changed in which micro-batch.

changehotelSchema = StructType([
    StructField("city", StringType()),
    StructField("hotel", StringType()),
    StructField("batchId", IntegerType()),
])

changehotelemptyRDD = spark.sparkContext.emptyRDD()
changehotelWriter = (spark.createDataFrame(changehotelemptyRDD, changehotelSchema)
                     .write
                     .format("delta")
                     .mode("overwrite")
                     .option("path", changeLogLocation))
# saveAsTable() returns None; changehotelDF is kept only so that the name still exists.
changehotelDF = changehotelWriter.saveAsTable("change_log")

In [0]:
# Function that performs idempotent writes to the change_log and hotels_silver tables.

def upsertToDelta(microBatchOutputDF, batchId):
  """Apply one micro-batch of curated hotel records to hotels_silver and change_log.

  Intended as the foreachBatch callback of the curated stream.

  hotels_silver: a row whose (city, hotel) already exists gets its price updated;
  otherwise the row is inserted.
  change_log: one (city, hotel, batchId) row is inserted per key in the batch, unless
  that exact row already exists, so replaying the same batchId does not log it twice.

  Args:
    microBatchOutputDF: the micro-batch DataFrame (curated columns hotel, price,
      timestamp, city, country).
    batchId: the Structured Streaming micro-batch id.
  """
  from pyspark.sql.window import Window

  # MERGE fails when several source rows match the same target row, so keep one row
  # per (city, hotel). dropDuplicates kept an arbitrary row; prefer the most recent
  # record so that the silver table ends up with the latest price seen in the batch.
  latestFirst = Window.partitionBy("city", "hotel").orderBy(col("timestamp").desc_nulls_last())

  batchDF = (microBatchOutputDF
             # A row with a NULL key can never match in the MERGEs below (NULL = NULL is
             # not true), so such rows would be inserted again on every batch.
             .where(col("city").isNotNull() & col("hotel").isNotNull())
             .withColumn("_rowNum", row_number().over(latestFirst))
             .where(col("_rowNum") == 1)
             .drop("_rowNum")
             .withColumn("batchId", lit(batchId)))
  batchDF.createOrReplaceTempView("updates")

  session = microBatchOutputDF._jdf.sparkSession()

  # INSERT * fills only hotels_silver's own columns; the extra batchId column is not written.
  session.sql("""
    MERGE INTO hotels_silver t
    USING updates s
    ON s.city = t.city AND s.hotel = t.hotel
    WHEN MATCHED THEN UPDATE SET price = s.price
    WHEN NOT MATCHED THEN INSERT *
  """)

  session.sql("""
    MERGE INTO change_log t
    USING updates u
    ON u.city = t.city AND u.hotel = t.hotel AND u.batchId = t.batchId
    WHEN NOT MATCHED THEN INSERT (city, hotel, batchId) VALUES (u.city, u.hotel, u.batchId)
  """)
  

In [0]:
# Write each micro-batch of curated data as idempotent writes to the change_log and
# hotels_silver tables, via the upsertToDelta function passed to foreachBatch().
#
# foreachBatch() replaces the sink, so the former .format("delta") call had no effect
# and has been removed.
# NOTE(review): no checkpointLocation is set, so progress is not persisted across
# restarts. This matches the cells above, which recreate hotels_silver and change_log
# on every run. A long-running deployment should add a checkpoint and stop resetting
# those tables.
silverHotelQuery = curatedhotelDF.writeStream \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()

In [0]:
# Declare streams over the change_log and hotels_silver Delta tables, and join them
# on (city, hotel) to rebuild full CDC records: batchId plus the current silver values.
# ignoreChanges keeps each stream running when MERGE rewrites files in these tables.
# Rows from rewritten files can be emitted again; dropDuplicates() collapses exact repeats.
# NOTE(review): a stream-stream join plus dropDuplicates() with no watermark keeps its
# state indefinitely; consider watermarks for a long-running job.

def _deltaChangeTolerantStream(tablePath):
    # Streaming read of a Delta table that tolerates files rewritten by MERGE/UPDATE.
    return spark.readStream.format("delta").options(ignoreChanges="true").load(tablePath)

changeData = _deltaChangeTolerantStream(changeLogLocation)
changeData.createOrReplaceTempView("changelog")

silverData = _deltaChangeTolerantStream(silverHotelLocation)
silverData.createOrReplaceTempView("silver")

joinedChangeRecords = changeData.join(silverData, ["city", "hotel"], "inner")
changeStreamFullRecords = joinedChangeRecords.dropDuplicates()

In [0]:
# Preview the reconstructed CDC records (change_log joined with hotels_silver).
display(changeStreamFullRecords)

In [0]:
# The reconstructed CDC records can then be synced downstream. In the example below,
# the CDC records are appended to a relational data store over JDBC.

def sink_change_record(df, batchId):
    """Append one micro-batch of CDC records to the hotelcdcprices table via JDBC.

    Args:
      df: the micro-batch DataFrame handed over by foreachBatch.
      batchId: the micro-batch id. It is not used, so if a batch is replayed its
        rows are appended again.
    """
    df.write.jdbc(
        url=rdsConnectString,
        table="hotelcdcprices",
        mode="append",
        properties={"user": rdsUsername, "password": rdsPassword},
    )

cdcSinkWriter = changeStreamFullRecords.writeStream.foreachBatch(sink_change_record)
cdcSinkWriter.start()