In [5]:
# Labraries
from kafka import KafkaProducer
import json, time, random, uuid
from datetime import datetime, timedelta
from pyspark.sql.functions import expr, col, from_json
from pyspark.sql.types import StringType, StructField, StructType, LongType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct

In [6]:
# 1. Create Spark session
spark = (
    SparkSession
    .builder
    .appName("OrdersToKafka")
    .config('spark.streaming.stopGracefullyOnShutdown', True)
    .config("spark.jars.packages", 
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
    .config('spark.sql.shuffle.partitions', 4)
    .master('local[*]')
    .getOrCreate()
)
# Read streaming data from order_data kafka:9092
order_df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'kafka:9092')
    .option('subscribe', 'order_data')
    .option('startingOffsets', 'earliest')
    .load()
)
# Read streaming data from customer_event kafka:9092
customer_df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'kafka:9092')
    .option('subscribe', 'customer_event')
    .option('startingOffsets', 'earliest')
    .load()
)


In [7]:
# Tranform data order_data
order_trans = order_df.withColumn('value', expr('cast(value as string)'))
order_trans.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Tranform data order_data
order_trans = order_df.withColumn('value', expr('cast(value as string)'))
# Create a schema
order_schema = (
    StructType([
        StructField("order_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("product_id", StringType(), True),
        StructField("product_name", StringType(), True),
        StructField("quantity", StringType(), True),
        StructField("price", StringType(), True),
        StructField("total_value", StringType(), True),
        StructField("order_date", StringType(), True),
        StructField("timestamp", TimestampType(), True)
    ])
)

# order data to table format
order_stream_df = (
    order_trans
    .withColumn('value_json', from_json(col('value'), order_schema))
    .selectExpr('value_json.*'
                , "timestamp as kafka_timestamp"
               )
    .withWatermark("kafka_timestamp", "1 minute")  # watermark on Kafka ingestion time
)

def order_data_output(df, batch_id):
    print(f"Batch id: {batch_id}")
    # Write each batch to parquet in /data (mounted to host ./data)
    (df
        .write
        .format('parquet')
        .mode('append')
        .save('./data/output/orders/')
    )
    # Show a preview in notebook logs
    df.show(truncate=False)
         
# Streaming query
(order_stream_df
 .writeStream
 .foreachBatch(order_data_output)
 .trigger(processingTime="10 seconds")
 .option("checkpointLocation", "./data/checkpoints/orders")
 .start()
 .awaitTermination()
)

Batch id: 8
+------------------------------------+-----------+----------+------------+--------+-----+-----------+----------+---------+-----------------------+
|order_id                            |customer_id|product_id|product_name|quantity|price|total_value|order_date|timestamp|kafka_timestamp        |
+------------------------------------+-----------+----------+------------+--------+-----+-----------+----------+---------+-----------------------+
|192d5a67-99c1-4615-93b9-e93582095e7c|CUST0187   |P005      |Keyboard    |5       |60   |300        |2024-02-06|null     |2025-08-24 15:25:10.909|
|7cd7b5e3-c161-4968-9b79-1618fba6b06b|CUST0141   |P003      |Headphones  |4       |150  |600        |2024-03-14|null     |2025-08-24 15:25:11.916|
|ac40a551-5199-4d0c-a4c5-7b7741edb18e|CUST0186   |P003      |Headphones  |2       |150  |300        |2024-03-29|null     |2025-08-24 15:25:12.929|
|64ebd372-d1d3-4c2b-a776-b02ee457dd6a|CUST0106   |P003      |Headphones  |2       |150  |300        |2024-

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

Batch id: 10
+------------------------------------+-----------+----------+------------+--------+-----+-----------+----------+---------+-----------------------+
|order_id                            |customer_id|product_id|product_name|quantity|price|total_value|order_date|timestamp|kafka_timestamp        |
+------------------------------------+-----------+----------+------------+--------+-----+-----------+----------+---------+-----------------------+
|f715d80e-afcf-449a-8e24-2e4395a4aabc|CUST0167   |P001      |Laptop      |3       |1200 |3600       |2024-05-09|null     |2025-08-24 15:31:50.871|
|4cec16a7-46c9-4d8a-88a9-84a36b3d044c|CUST0179   |P005      |Keyboard    |1       |60   |60         |2024-06-17|null     |2025-08-24 15:31:51.881|
|928f73ad-91a1-45a3-a3ef-9b51cf0fa2d4|CUST0048   |P002      |Phone       |2       |800  |1600       |2024-05-31|null     |2025-08-24 15:31:52.894|
|11378c0b-dbb9-4de6-82fd-1ebe8e97613f|CUST0144   |P001      |Laptop      |4       |1200 |4800       |2024

In [None]:
# Tranform data customer_event
customer_trans = customer_df.withColumn('value', expr('cast(value as string)'))
# Create a schema
customer_schema = (
    StructType([
        StructField("event_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("product_id", StringType(), True),
        StructField("action", StringType(), True),
        StructField("timestamp", TimestampType(), True)
    ])
)

# order data to table format
customer_stream_df = customer_trans.withColumn('value_json', from_json(col('value'), customer_schema)).selectExpr('value_json.*')

def customer_event_output(df, batch_id):
    print(f"Batch id: {batch_id}")
    (
        df
        .write
        .format('parquet')
        .mode('append')
        .save('./data/output/customer_event/')
    )

# Write the output to console sink to check the output --to remove later
(customer_stream_df
 .writeStream
 .foreachBatch(customer_event_output)
 .trigger(processingTime='10 seconds')
 .option('checkpointLocation', './data/checkpoints/customer_event')
 .start()
 .awaitTermination()
)

Batch id: 11
Batch id: 0
+------------------------------------+-----------+----------+------------+--------+-----+-----------+----------+---------+-----------------------+
|order_id                            |customer_id|product_id|product_name|quantity|price|total_value|order_date|timestamp|kafka_timestamp        |
+------------------------------------+-----------+----------+------------+--------+-----+-----------+----------+---------+-----------------------+
|1e380808-c492-45fa-9b0c-4a9ab2557557|CUST0093   |P001      |Laptop      |2       |1200 |2400       |2024-04-15|null     |2025-08-24 15:32:00.966|
|2f282c6e-962b-4feb-8a07-bab5fa4cf395|CUST0054   |P002      |Phone       |5       |800  |4000       |2024-03-21|null     |2025-08-24 15:32:01.975|
|4b41421f-2224-42fd-89ea-eedd960eac32|CUST0053   |P005      |Keyboard    |1       |60   |60         |2024-03-07|null     |2025-08-24 15:32:02.985|
|c1980e40-c653-4b40-ab0d-31be80fef39f|CUST0177   |P001      |Laptop      |1       |1200 |1200