In [1]:
# Create the Spark Session: local mode, with the Kafka source connector and the
# ClickHouse JDBC driver pulled onto the classpath via spark.jars.packages.
from pyspark.sql import SparkSession

_spark_conf = {
    "spark.streaming.stopGracefullyOnShutdown": True,
    'spark.jars.packages': "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,com.clickhouse:clickhouse-jdbc:0.6.0",
    "spark.sql.shuffle.partitions": 8,
}

_builder = SparkSession.builder.appName("Ecommerce CDC Processing")
for _conf_key, _conf_value in _spark_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)

spark = _builder.master("local[*]").getOrCreate()

spark

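## Connect to Kafka

Read the Debezium CDC topic with `spark.read`. This is a **batch** DataFrame over the messages currently in the topic, not a streaming query.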

In [2]:
# --- Connect to Kafka (batch read of the CDC topic) ---
KAFKA_SERVERS = "kafka1:9092"
CDC_TOPIC = "pg.public.customers"

print(f"🔗 Kafka servers: {KAFKA_SERVERS}")
print(f"📋 CDC topic: {CDC_TOPIC}")

# `spark.read` builds a *batch* query (not a streaming one): it reads the offset
# range that exists in the topic when an action runs. Batch queries do not accept
# startingOffsets="latest", so the range is spelled out explicitly here
# (these are also the batch defaults). Nothing is fetched from Kafka until an
# action such as show()/write runs.
kafka_stream = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_SERVERS)
    .option("subscribe", CDC_TOPIC)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

print("✅ Kafka stream is connected")
kafka_stream.printSchema()

🔗 Kafka servers: kafka1:9092
📋 CDC topic: pg.public.customers
✅ Kafka stream is connected
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [3]:
# Preview the first 20 raw Kafka records (long cells truncated).
kafka_stream.show(n=20, truncate=True)

+--------------------+--------------------+-------------------+---------+------+--------------------+-------------+
|                 key|               value|              topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+-------------------+---------+------+--------------------+-------------+
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     0|2025-08-10 11:08:...|            0|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     1|2025-08-10 11:08:...|            0|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     2|2025-08-10 11:08:...|            0|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     3|2025-08-10 11:08:...|            0|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     4|2025-08-10 11:08:...|            0|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0

## Transform

### Parse raw message

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Plain-Python decoder for Kafka binary payloads; wrapped as a Spark UDF below.
def decode_bytes(bytes_array):
    """Decode a byte sequence into a UTF-8 string.

    Args:
        bytes_array: anything ``bytes()`` accepts (bytes, bytearray, list of ints), or None.

    Returns:
        The decoded string, or None when the input is None or cannot be
        converted/decoded (errors are swallowed so one bad record yields a null
        instead of failing the Spark task).
    """
    if bytes_array is None:
        return None
    try:
        raw = bytes(bytes_array)
        return raw.decode('utf-8')
    except Exception:
        return None
# Register decode_bytes as a Spark UDF producing string values.
decode_udf = udf(decode_bytes, returnType=StringType())

In [5]:
from pyspark.sql.functions import expr, col

# --- Parse binary raw message ---
# Kafka delivers `key` and `value` as binary. Both are cast to strings natively in
# the JVM. Previously the key went through the Python `decode_udf` while the value
# used a cast; that was inconsistent and pushed every row through a Python worker
# just to decode a small JSON key.
kafka_json_df = (
    kafka_stream
    .withColumn('key_str', col("key").cast("string"))
    .withColumn('value_str', col("value").cast("string"))
)

kafka_json_df.show()

+--------------------+--------------------+-------------------+---------+------+--------------------+-------------+---------+--------------------+
|                 key|               value|              topic|partition|offset|           timestamp|timestampType|  key_str|           value_str|
+--------------------+--------------------+-------------------+---------+------+--------------------+-------------+---------+--------------------+
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     0|2025-08-10 11:08:...|            0| {"id":2}|{"before":null,"a...|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     1|2025-08-10 11:08:...|            0| {"id":4}|{"before":null,"a...|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     2|2025-08-10 11:08:...|            0| {"id":5}|{"before":null,"a...|
|[7B 22 69 64 22 3...|[7B 22 62 65 66 6...|pg.public.customers|        0|     3|2025-08-10 11:08:...|            0| {"

### Extract JSON data

In [6]:
from pyspark.sql.types import StructType, IntegerType, StringType, LongType, StructField

# Schema for CDC value JSON (Debezium format)
key_schema = StructType([
    StructField("id", IntegerType(), True)
])

value_schema = StructType([
    StructField("before", StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("created_at", LongType(), True)
    ]), True),
    StructField("after", StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("created_at", LongType(), True)
    ]), True),
    StructField("source", StructType([
        StructField("ts_ms", LongType(), True),
        StructField("schema", StringType(), True),
        StructField("table", StringType(), True)
    ]), True),
    StructField("op", StringType(), True),
    StructField("ts_ms", LongType(), True)  # Transaction timestamp
])


In [7]:
from pyspark.sql.functions import col, from_json

# Parse the decoded key/value strings into structs, then drop the raw binary
# columns (key_str / value_str remain for inspection).
transformed_df = kafka_json_df.select(
    "*",
    from_json(col("key_str"), key_schema).alias("key_json"),
    from_json(col("value_str"), value_schema).alias("value_json"),
).drop('value', 'key')

In [8]:
# Preview the first 20 parsed rows (long cells truncated).
transformed_df.show(n=20, truncate=True)

+-------------------+---------+------+--------------------+-------------+---------+--------------------+--------+--------------------+
|              topic|partition|offset|           timestamp|timestampType|  key_str|           value_str|key_json|          value_json|
+-------------------+---------+------+--------------------+-------------+---------+--------------------+--------+--------------------+
|pg.public.customers|        0|     0|2025-08-10 11:08:...|            0| {"id":2}|{"before":null,"a...|     {2}|{null, {2, Bob, b...|
|pg.public.customers|        0|     1|2025-08-10 11:08:...|            0| {"id":4}|{"before":null,"a...|     {4}|{null, {4, BatchC...|
|pg.public.customers|        0|     2|2025-08-10 11:08:...|            0| {"id":5}|{"before":null,"a...|     {5}|{null, {5, BatchC...|
|pg.public.customers|        0|     3|2025-08-10 11:08:...|            0| {"id":6}|{"before":null,"a...|     {6}|{null, {6, BatchC...|
|pg.public.customers|        0|     4|2025-08-10 11:08:

### Handle CDC ops

In [9]:
from pyspark.sql.functions import coalesce, col, lit, when

# --- Extract fields & handle CDC ops ---
# Debezium op codes: c = create, u = update, r = snapshot read, d = delete.
# - c/u/r: take the row image from `after`, _deleted = 0.
# - d:     keep only the id (from `before`) with null payload columns, _deleted = 1.
#          It carries its own _version, so ReplacingMergeTree(_version) can let this
#          delete marker supersede the earlier row for the same id.
# Every other record has no row image: tombstones emitted after a delete (null
# value, so op is null), and truncate ('t') / message ('m') events. Without a
# filter they would be emitted as _deleted = 0 rows with a null _version and null
# columns, so they are dropped here.
UPSERT_OPS = ("c", "u", "r")
DELETE_OP = "d"

cdc_op = col("value_json.op")
is_upsert = cdc_op.isin(*UPSERT_OPS)


def _after_or_null(field):
    """Return `value_json.after.<field>` for upserts and NULL for deletes."""
    return when(is_upsert, col(f"value_json.after.{field}")).otherwise(lit(None))


cdc_df = (
    transformed_df
    .where(cdc_op.isin(*UPSERT_OPS, DELETE_OP))
    .select(
        # id: `after` for upserts, `before` for deletes, message key as a last resort.
        coalesce(
            col("value_json.after.id"),
            col("value_json.before.id"),
            col("key_json.id"),
        ).alias("id"),
        _after_or_null("name").alias("name"),
        _after_or_null("email").alias("email"),
        _after_or_null("created_at").alias("created_at"),
        # _version: envelope ts_ms. NOTE(review): two changes to the same id within
        # one millisecond would tie; a stricter ordering key (e.g. source LSN) may be needed.
        col("value_json.ts_ms").alias("_version"),
        # _deleted: 1 for delete events, 0 otherwise.
        when(cdc_op == DELETE_OP, lit(1)).otherwise(lit(0)).alias("_deleted"),
    )
)

cdc_df.show()

+---+--------------------+--------------------+----------------+-------------+--------+
| id|                name|               email|      created_at|     _version|_deleted|
+---+--------------------+--------------------+----------------+-------------+--------+
|  2|                 Bob|     bob@example.com|1754742098873040|1754824114822|       0|
|  4|BatchCustomer_175...|batchcustomer_175...|1754804426378610|1754824114828|       0|
|  5|BatchCustomer_175...|batchcustomer_175...|1754804426381457|1754824114829|       0|
|  6|BatchCustomer_175...|batchcustomer_175...|1754804426383958|1754824114829|       0|
|  7|BatchCustomer_175...|batchcustomer_175...|1754804426386369|1754824114830|       0|
|  8|BatchCustomer_175...|batchcustomer_175...|1754804426389480|1754824114830|       0|
|  9|BatchCustomer_175...|batchcustomer_175...|1754804426393612|1754824114831|       0|
| 10|BatchCustomer_175...|batchcustomer_175...|1754804426396510|1754824114831|       0|
| 11|BatchCustomer_175...|batchc

## Write events to ClickHouse

### ClickHouse Table Schema

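Create the target table in ClickHouse before running the write cell below. `ReplacingMergeTree(_version)` keeps only the row with the highest `_version` for each `id`. However, duplicates are collapsed only during background merges, so query with `FINAL` (and filter on `_deleted = 0`) to read the current state: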

```sql
-- Connect to ClickHouse and create database
CREATE DATABASE IF NOT EXISTS ecommerce_analytics;

-- Create ReplacingMergeTree table for CDC
CREATE TABLE IF NOT EXISTS ecommerce_analytics.customers_cdc (
    id Int32,
    name Nullable(String),
    email Nullable(String),
    created_at Nullable(Int64),
    _version Int64,  -- For ReplacingMergeTree versioning
    _deleted UInt8   -- 0 = active, 1 = deleted
) ENGINE = ReplacingMergeTree(_version)
ORDER BY id;
```

In [12]:
import getpass
import os

# --- Write to ClickHouse ---
CLICKHOUSE_URL = "jdbc:clickhouse://clickhouse:8123/ecommerce_analytics"
CLICKHOUSE_TABLE = "customers_cdc"  # created by the DDL in the markdown cell above
CLICKHOUSE_USER = os.environ.get("CLICKHOUSE_USER", "default")
# Never keep the password in the notebook: read it from the environment, or
# prompt for it interactively when the variable is not set.
CLICKHOUSE_PASSWORD = os.environ.get("CLICKHOUSE_PASSWORD") or getpass.getpass("ClickHouse password: ")

# ClickHouse connection properties, the single source of truth for the JDBC write below.
clickhouse_properties = {
    "user": CLICKHOUSE_USER,
    "password": CLICKHOUSE_PASSWORD,
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

print(f"🗄️ ClickHouse URL: {CLICKHOUSE_URL}")
print(f"👤 User: {CLICKHOUSE_USER}")

# --- Write to Console (for debugging) ---
print("\n📊 Writing to console for debugging:")
cdc_df.show(20, truncate=False)

# --- Write to ClickHouse ---
try:
    print("\n💾 Writing to ClickHouse...")

    # Always append for CDC: every change event becomes a new row, and
    # ReplacingMergeTree(_version) collapses versions of the same id on merge.
    cdc_df.write.jdbc(
        url=CLICKHOUSE_URL,
        table=CLICKHOUSE_TABLE,
        mode="append",
        properties=clickhouse_properties,
    )

    print("✅ Data written to ClickHouse successfully!")

except Exception as e:
    # Deliberate best-effort: report the failure and fall back to console output.
    # NOTE(review): this lets "Run All" continue even though nothing was written.
    print(f"❌ Error writing to ClickHouse: {e}")
    print("📝 Writing to console only...")
    cdc_df.show()

🗄️ ClickHouse URL: jdbc:clickhouse://clickhouse:8123/ecommerce_analytics
👤 User: default

📊 Writing to console for debugging:
+---+--------------------------+--------------------------------------+----------------+-------------+--------+
|id |name                      |email                                 |created_at      |_version     |_deleted|
+---+--------------------------+--------------------------------------+----------------+-------------+--------+
|2  |Bob                       |bob@example.com                       |1754742098873040|1754824114822|0       |
|4  |BatchCustomer_1754804426_1|batchcustomer_1754804426_1@example.com|1754804426378610|1754824114828|0       |
|5  |BatchCustomer_1754804426_2|batchcustomer_1754804426_2@example.com|1754804426381457|1754824114829|0       |
|6  |BatchCustomer_1754804426_3|batchcustomer_1754804426_3@example.com|1754804426383958|1754824114829|0       |
|7  |BatchCustomer_1754804426_4|batchcustomer_1754804426_4@example.com|1754804426386369|17

NameError: name 'cdc_df' is not defined