In [1]:
import os

from pyspark.sql import SparkSession

# Path to the GCP service-account key handed to the GCS connector.
# GOOGLE_APPLICATION_CREDENTIALS is honoured when set so the notebook is not
# tied to one user's home directory; the previous hard-coded location remains
# the fallback, so existing setups keep working unchanged.
GCS_KEYFILE = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/home/longnguyen/credentials/key.json",
)

# Build (or reuse) the Spark session for the CDC bronze pipeline:
#   - Kafka source, Delta Lake and the GCS connector are pulled in as packages
#   - Delta is registered as SQL extension and as the session catalog
#   - gs:// paths are served by the GCS connector using the key file above
spark = (
    SparkSession.builder
    .appName("CDC-Streaming-Bronze-Pipeline")
    .master("spark://node-2:7077")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0,"
        "io.delta:delta-spark_2.13:4.0.0,"
        "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.11",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", GCS_KEYFILE)
    .getOrCreate()
)

# Verify connection
print(f"Spark Version: {spark.version}")
print(f"Master: {spark.sparkContext.master}")
print(f"App Name: {spark.sparkContext.appName}")

:: loading settings :: url = jar:file:/home/longnguyen/miniconda3/envs/spark_env/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/longnguyen/.ivy2.5.2/cache
The jars for the packages stored in: /home/longnguyen/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
io.delta#delta-spark_2.13 added as a dependency
com.google.cloud.bigdataoss#gcs-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-adc7377b-981c-4efa-823e-e39a1dd6a6fb;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.0 in central
	found org.apache.kafka#kafka-clients;3.9.0 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.7 in central
	found org.slf4j#slf4j-api;2.0.16 in central
	found org.apache.hadoop#hadoop-client-runti

Spark Version: 4.0.0
Master: spark://node-2:7077
App Name: CDC-Streaming-Bronze-Pipeline


# TEST: View raw Kafka messages to understand the actual structure

In [None]:
# TEST: View raw Kafka messages to understand the actual structure.
# Key and value arrive as binary, so both are cast to STRING for display.
raw_messages = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "10.0.0.2:9092")
    .option("subscribe", "cdc-pipeline.public.products")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr(
        "CAST(key AS STRING) as key",
        "CAST(value AS STRING) as value",
        "topic",
        "partition",
        "offset",
        "timestamp",
    )
)

# Display raw messages on the console sink (untruncated, 10 rows per batch)
raw_query = (
    raw_messages.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .option("numRows", "10")
    .start()
)

# Run for 30 seconds. The stop() lives in `finally` so the query is always
# shut down, even when awaitTermination() raises because the query failed;
# otherwise a broken query would linger in the session.
try:
    raw_query.awaitTermination(30)
finally:
    raw_query.stop()
print("\n✓ Raw message inspection complete. Review the 'value' column above.")

25/12/22 16:20:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1006c3cd-2f5c-42e3-b435-c722033d9b03. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/12/22 16:20:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

25/12/22 16:21:02 WARN DAGScheduler: Failed to cancel job group 547ce619-8a9d-4f2f-83d6-5d4915014c46. Cannot find active jobs for it.
25/12/22 16:21:02 WARN DAGScheduler: Failed to cancel job group 547ce619-8a9d-4f2f-83d6-5d4915014c46. Cannot find active jobs for it.


# Consume messages from the Kafka topic and write them to a Delta Lake table on GCS

In [2]:
from pyspark.sql.functions import col, from_json, current_timestamp, lit, to_date, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Schema of the Debezium CDC message (standard envelope with "schema" and
# "payload"), assembled from smaller named pieces.

# Column layout shared by the "before" and "after" row images.
product_row_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("aisle_id", IntegerType(), True),
    StructField("department_id", IntegerType(), True),
])

# "source" block: connector and database metadata for the change event.
source_schema = StructType([
    StructField("version", StringType(), True),
    StructField("connector", StringType(), True),
    StructField("name", StringType(), True),
    StructField("ts_ms", LongType(), True),
    StructField("snapshot", StringType(), True),
    StructField("db", StringType(), True),
    StructField("sequence", StringType(), True),
    StructField("schema", StringType(), True),
    StructField("table", StringType(), True),
    StructField("txId", LongType(), True),
    StructField("lsn", LongType(), True),
    StructField("xmin", LongType(), True),
])

# "transaction" block of the payload.
transaction_schema = StructType([
    StructField("id", StringType(), True),
    StructField("total_order", LongType(), True),
    StructField("data_collection_order", LongType(), True),
])

# Top-level "schema" block; every member except "version" is read as a string.
envelope_schema = StructType([
    StructField("type", StringType(), True),
    StructField("fields", StringType(), True),
    StructField("optional", StringType(), True),
    StructField("name", StringType(), True),
    StructField("version", IntegerType(), True),
])

payload_schema = StructType([
    StructField("before", product_row_schema, True),
    StructField("after", product_row_schema, True),
    StructField("source", source_schema, True),
    StructField("op", StringType(), True),  # Operation: c=create, u=update, d=delete, r=read
    StructField("ts_ms", LongType(), True),
    StructField("transaction", transaction_schema, True),
])

debezium_schema = StructType([
    StructField("schema", envelope_schema, True),
    StructField("payload", payload_schema, True),
])

# Streaming source: subscribe to the products CDC topic from the earliest
# offset, with failOnDataLoss disabled.
kafka_source_options = {
    "kafka.bootstrap.servers": "10.0.0.2:9092",
    "subscribe": "cdc-pipeline.public.products",
    "startingOffsets": "earliest",
    "failOnDataLoss": "false",
}
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

# Cast the binary message value to a JSON string and keep the Kafka metadata
# columns that the bronze layer records.
kafka_columns = kafka_df.selectExpr(
    "CAST(value AS STRING) as json_value",
    "timestamp as kafka_timestamp",
    "offset",
    "partition",
    "topic",
)

# Parse the JSON string into a struct column named "data" using debezium_schema.
parsed_df = kafka_columns.select(
    from_json(col("json_value"), debezium_schema).alias("data"),
    "kafka_timestamp",
    "offset",
    "partition",
    "topic",
)

# Build bronze layer with full CDC payload and metadata.
# One output row per Kafka message: the before/after row images and operation
# code from the Debezium payload, selected source metadata, the Kafka
# coordinates of the message, and ingestion bookkeeping columns.
bronze_df = parsed_df.select(
    # CDC Payload Fields
    col("data.payload.before").alias("before"),
    col("data.payload.after").alias("after"),
    col("data.payload.op").alias("op"),
    col("data.payload.ts_ms").alias("ts_ms"),

    # Source Metadata
    col("data.payload.source.db").alias("source_db"),
    col("data.payload.source.table").alias("source_table"),
    col("data.payload.source.ts_ms").alias("source_ts_ms"),
    col("data.payload.source.txId").cast("string").alias("source_txn_id"),
    col("data.payload.source.lsn").alias("source_lsn"),

    # Kafka Metadata
    col("topic").alias("kafka_topic"),
    col("partition").alias("kafka_partition"),
    col("offset").alias("kafka_offset"),
    col("kafka_timestamp").alias("kafka_timestamp"),

    # Ingestion Metadata
    current_timestamp().alias("ingestion_timestamp"),
    to_date(current_timestamp()).alias("ingestion_date"),
    # Debezium marks initial-snapshot records with one of several values in
    # source.snapshot. The per-table markers "first_in_data_collection" and
    # "last_in_data_collection" are included here; without them those
    # snapshot rows would be labelled "incremental".
    # NOTE(review): the marker "incremental" (Debezium incremental snapshots)
    # still falls through to load_type "incremental" — confirm this is intended.
    when(
        col("data.payload.source.snapshot").isin(
            "true",
            "first",
            "first_in_data_collection",
            "last_in_data_collection",
            "last",
        ),
        lit("snapshot"),
    ).otherwise(lit("incremental")).alias("load_type")
)

# Target locations on GCS (update with your bucket name): the Delta table
# for the bronze layer and the checkpoint directory of the streaming query.
bronze_path = "gs://cdc-pipeline-data/uat/bronze/products"
checkpoint_path = "gs://cdc-pipeline-data/uat/checkpoints/products"

# Configure the append-only Delta sink, then start the streaming query.
bronze_writer = (
    bronze_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("path", bronze_path)
)
query = bronze_writer.start()

# Monitor the streaming query
print(
    f"Streaming Query ID: {query.id}",
    f"Writing to: {bronze_path}",
    "Waiting for data...",
    sep="\n",
)

# Block for up to 30 seconds (adjust timeout as needed). The call returns
# False when the timeout elapses first; the query is not stopped here.
query.awaitTermination(30)

25/12/22 16:50:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Streaming Query ID: 981cfc88-8637-428a-8a81-bdf48afa12bb
Writing to: gs://cdc-pipeline-data/uat/bronze/products
Waiting for data...


                                                                                

False

In [3]:
# Load the bronze Delta table as a batch DataFrame and summarise it:
# row count, schema, a time-ordered sample, and a count per CDC operation.
print("Reading products from bronze layer...")
df_products = spark.read.format("delta").load(bronze_path)

record_count = df_products.count()
print(f"Total records: {record_count}")

print("\nSchema:")
df_products.printSchema()

print("\nSample data:")
products_by_time = df_products.orderBy("kafka_timestamp")
products_by_time.show(n=20, truncate=False)

print("\nOperation breakdown:")
op_counts = df_products.groupBy("op").count()
op_counts.show()

Reading products from bronze layer...


25/12/22 16:50:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Total records: 4

Schema:
root
 |-- before: struct (nullable = true)
 |    |-- product_id: integer (nullable = true)
 |    |-- product_name: string (nullable = true)
 |    |-- aisle_id: integer (nullable = true)
 |    |-- department_id: integer (nullable = true)
 |-- after: struct (nullable = true)
 |    |-- product_id: integer (nullable = true)
 |    |-- product_name: string (nullable = true)
 |    |-- aisle_id: integer (nullable = true)
 |    |-- department_id: integer (nullable = true)
 |-- op: string (nullable = true)
 |-- ts_ms: long (nullable = true)
 |-- source_db: string (nullable = true)
 |-- source_table: string (nullable = true)
 |-- source_ts_ms: long (nullable = true)
 |-- source_txn_id: string (nullable = true)
 |-- source_lsn: long (nullable = true)
 |-- kafka_topic: string (nullable = true)
 |-- kafka_partition: integer (nullable = true)
 |-- kafka_offset: long (nullable = true)
 |-- kafka_timestamp: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nulla

                                                                                

+------+----------------------------+---+-------------+---------+------------+-------------+-------------+----------+----------------------------+---------------+------------+-----------------------+-----------------------+--------------+---------+
|before|after                       |op |ts_ms        |source_db|source_table|source_ts_ms |source_txn_id|source_lsn|kafka_topic                 |kafka_partition|kafka_offset|kafka_timestamp        |ingestion_timestamp    |ingestion_date|load_type|
+------+----------------------------+---+-------------+---------+------------+-------------+-------------+----------+----------------------------+---------------+------------+-----------------------+-----------------------+--------------+---------+
|NULL  |{3, Boneless Chicken, 12, 3}|r  |1766422158260|postgres |products    |1766422158029|888          |201744864 |cdc-pipeline.public.products|1              |0           |2025-12-22 16:49:18.691|2025-12-22 16:50:16.205|2025-12-22    |snapshot |
|NUL

                                                                                

In [29]:
# Delete bronze data and checkpoint to completely reset and reconsume all messages
hadoop_conf = spark._jsc.hadoopConfiguration()


def _delete_gcs_dir(path, label):
    """Recursively delete ``path`` through the Hadoop FileSystem API.

    Args:
        path: URI of the directory to remove (e.g. a ``gs://`` location).
        label: Lower-case, human-readable name used in the status messages.

    Raises:
        RuntimeError: if the path exists but the file system reports that the
            delete did not succeed. Previously the return value was ignored
            and success was printed regardless.
    """
    path_obj = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = path_obj.getFileSystem(hadoop_conf)
    if not fs.exists(path_obj):
        print(f"{label.capitalize()} not found at {path}")
        return
    # Second argument True = recursive delete.
    if not fs.delete(path_obj, True):
        raise RuntimeError(f"Failed to delete {label} at {path}")
    print(f"✓ Deleted {label} at {path}")


# Delete bronze data
_delete_gcs_dir(bronze_path, "bronze data")

# Delete checkpoint
_delete_gcs_dir(checkpoint_path, "checkpoint")

print("\n✓ Ready to reconsume all messages from the beginning")

✓ Deleted bronze data at gs://cdc-pipeline-data/uat/bronze/products
✓ Deleted checkpoint at gs://cdc-pipeline-data/uat/checkpoints/products

✓ Ready to reconsume all messages from the beginning


25/12/22 16:48:14 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:64)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:101)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:112)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAd

In [4]:
# Stop Spark session.
# Streaming queries started earlier may still be active (awaitTermination
# with a timeout does not stop them), so shut each one down first instead of
# letting the session teardown kill them mid-batch.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()
print("✓ Spark session stopped")

✓ Spark session stopped
