# Spark Streaming from MSK (Managed Streaming for Apache Kafka)

## 1. Introduction
---
This tutorial will guide you through the process of connecting to Amazon MSK (Managed Streaming for Apache Kafka) using the Apache Kafka package and Spark.

**Explanation:**
- Apache Kafka is an open-source distributed event streaming platform used for building real-time data pipelines and streaming applications.
- Amazon MSK is a managed Kafka service that makes it easy to run Kafka without needing to manage the underlying infrastructure.

**What We Will Cover in This Tutorial**
---
1. Downloading Kafka and Testing the MSK Cluster Connection from the Terminal
2. Creating a Bronze, Silver, and Gold Streaming Pipeline
3. Creating a Mock Producer to Send Messages to MSK
4. Watching the Data Flow Through the Streaming Tables

## 2. Download and Extract Apache Kafka
---
Before connecting to MSK, download and extract Apache Kafka.

```cmd
rm -rf kafka
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -xzf kafka_2.12-2.6.2.tgz
mv kafka_2.12-2.6.2 kafka
rm kafka_2.12-2.6.2.tgz
ls kafka
```

**Explanation:**
- The `rm -rf kafka` command removes any `kafka` folder left over from a previous run.
- The `wget` command downloads Kafka version 2.6.2 (built for Scala 2.12) from the Apache archive.
- The `tar -xzf` command extracts the downloaded Kafka package.
- The `mv` command renames the extracted folder to a concise name for easier access.
- The `rm` command removes the `.tgz` archive, since it has already been extracted.
- The `ls kafka` command shows the contents of the Kafka package (`LICENSE`, `NOTICE`, `bin`, ...). We will be working with `bin/`.

In [None]:
!wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
!tar -xzf kafka_2.12-2.6.2.tgz
!rm -rf kafka
!mv kafka_2.12-2.6.2 kafka
!rm kafka_2.12-2.6.2.tgz
!ls kafka

## 3. Test Connection to MSK
---
To verify the connection, create a topic and then list the topics in your Kafka cluster.

**Create Topic:**
```cmd
bin/kafka-topics.sh --create --bootstrap-server <<bootstrap server address>> --replication-factor 1 --partitions 1 --topic <<topic name>>
```
> Example: `bin/kafka-topics.sh --create --bootstrap-server b-1.mskpublicuseast1clust.dd5y5a.c11.kafka.us-east-1.amazonaws.com:9092,b-2.mskpublicuseast1clust.dd5y5a.c11.kafka.us-east-1.amazonaws.com:9092 --replication-factor 1 --partitions 1 --topic silveraiwolf.product.sale`

**List Topics:**
```cmd
bin/kafka-topics.sh --bootstrap-server <<bootstrap server address>> --list
```
> Example: `bin/kafka-topics.sh --bootstrap-server b-1.mskpublicuseast1clust.dd5y5a.c11.kafka.us-east-1.amazonaws.com:9092,b-2.mskpublicuseast1clust.dd5y5a.c11.kafka.us-east-1.amazonaws.com:9092 --list`

**Explanation:**
- Replace `<<bootstrap server address>>` with the comma-separated broker addresses of your MSK cluster, and `<<topic name>>` with the topic to create.
- The `--list` command prints every topic in the cluster; if the new topic appears, the connection is working.

In [None]:
!cd kafka && bin/kafka-topics.sh --create --bootstrap-server b-1.msksilveraiwofluseast.eztjay.c11.kafka.us-east-1.amazonaws.com:9092,b-2.msksilveraiwofluseast.eztjay.c11.kafka.us-east-1.amazonaws.com:9092 --replication-factor 1 --partitions 1 --topic supplychain-orders

In [None]:
!cd kafka && bin/kafka-topics.sh --bootstrap-server b-1.msksilveraiwofluseast.eztjay.c11.kafka.us-east-1.amazonaws.com:9092,b-2.msksilveraiwofluseast.eztjay.c11.kafka.us-east-1.amazonaws.com:9092 --list

In [None]:
# # Uncomment to delete the topic if needed
# !cd kafka && bin/kafka-topics.sh --bootstrap-server b-2.msksilveraiwolfuseast.jvye40.c11.kafka.us-east-1.amazonaws.com:9092,b-1.msksilveraiwolfuseast.jvye40.c11.kafka.us-east-1.amazonaws.com:9092 --delete --topic supplychain-orders
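If the CLI commands above hang or time out, it can help to first check whether each broker port is reachable at all (security-group rules blocking port 9092 are a common cause). The following is a minimal sketch using only the Python standard library. `parse_bootstrap_servers` and `broker_reachable` are hypothetical helper names for illustration, and a successful TCP connection only shows that the port is open, not that Kafka authentication or topic access will work.

```python
import socket


def parse_bootstrap_servers(servers: str) -> list[tuple[str, int]]:
    """Split a comma-separated 'host:port' string into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas
        host, _, port = entry.rpartition(":")
        pairs.append((host, int(port)))
    return pairs


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failures, refused connections, and timeouts
        return False
```

To use it, loop over `parse_bootstrap_servers(<your bootstrap server string>)` and print `broker_reachable(host, port)` for each broker.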

## 4. Create Spark Streaming Application
---

Here we create a Spark Structured Streaming application that ingests data from Kafka and processes it through the bronze, silver, and gold layers of a Databricks Green-Blue Medallion architecture. The steps below cover Kafka stream ingestion, data parsing, transformation, and windowed aggregations.

> If a change to a streaming table requires dropping the table, you must also clear its checkpoints, e.g. with `dbutils.fs.rm("/Volumes/green/silver/files/checkpoints/", True)`. WARNING: This causes Spark to reprocess old data from the source.


### Bronze Streaming

This section establishes a **real-time streaming pipeline** to ingest and process data from **Kafka** into the **bronze layer** of a Databricks **Green-Blue Medallion architecture**.  

#### **Key Steps:**
1. **Kafka Stream Ingestion:**  
   - Reads data from Kafka using **Structured Streaming**.  
   - Connects to Kafka brokers (`kafka.bootstrap.servers`).  
   - Subscribes to a specified topic (`kafka_topic`).  
   - Reads messages from the earliest available offset for completeness.  

2. **Data Parsing & Transformation:**  
   - Converts the Kafka message **key and value** from binary to **string format**.  
   - Keeps the JSON payload as a raw string; parsing it into structured fields (e.g., `order_id`, `product_id`, `quantity`, `status`) happens in the **silver layer**.  
   - Retains the Kafka metadata columns (`topic`, `partition`, `offset`, `timestamp`, `timestampType`) for auditing.  

3. **Writing to the Bronze Delta Table:**  
   - Streams the parsed data into a **Delta table (`green.bronze.orders`)** in **append mode**.  
   - Uses a **checkpoint location** to track progress and ensure fault tolerance.  
   - Provides **real-time visibility** of incoming Kafka data.  

This ensures that raw **supply chain order data** is reliably captured in the **bronze layer**, ready for further cleansing and enrichment in the **silver layer**. 🚀

In [None]:
# In production, store the bootstrap servers in a secret. They are hard-coded here only
# for demo purposes, and the cluster is deleted after each demo.
kafka_bootstrap_servers = "b-2.msksilveraiwolfuseast.6im2sm.c11.kafka.us-east-1.amazonaws.com:9092,b-1.msksilveraiwolfuseast.6im2sm.c11.kafka.us-east-1.amazonaws.com:9092"
kafka_topic = "supplychain-orders"

In [None]:
# Create a stream to read data from Kafka
raw_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load())

# Cast the binary Kafka key and value to strings and keep the remaining metadata columns
other_columns = raw_stream.drop("key", "value").columns
parsed_stream = raw_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", *other_columns)

bronze_query = (parsed_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/green/bronze/files/checkpoints/orders")
    .table("green.bronze.orders")
)
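The Kafka source delivers `key` and `value` as binary columns, and `CAST(... AS STRING)` decodes them as UTF-8 text. The sketch below is plain Python, not Spark code. It mimics that bronze step on a hypothetical record shaped like the Kafka source's columns (`key`, `value`, `topic`, `partition`, `offset`, `timestamp`, `timestampType`); the sample values are made up. Note that the JSON payload is still an unparsed string at this stage.

```python
from datetime import datetime, timezone


def to_bronze_row(record: dict) -> dict:
    """Mimic the bronze selectExpr: decode key/value bytes to str, pass metadata through."""
    row = {
        "key": record["key"].decode("utf-8") if record["key"] is not None else None,
        "value": record["value"].decode("utf-8") if record["value"] is not None else None,
    }
    # Every other column (topic, partition, offset, timestamp, timestampType) is kept unchanged
    row.update({k: v for k, v in record.items() if k not in ("key", "value")})
    return row


# Hypothetical record with the same columns as the Kafka source
sample = {
    "key": b"7",
    "value": b'{"order_id": 7, "product_id": "P-1003", "quantity": 12}',
    "topic": "supplychain-orders",
    "partition": 0,
    "offset": 42,
    "timestamp": datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc),
    "timestampType": 0,
}
print(to_bronze_row(sample))
```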

### Silver Streaming

This section processes raw order data from the **bronze layer** and prepares it for structured analysis in the **silver layer** of the Databricks **Green-Blue Medallion architecture**.  

#### **Key Steps:**
1. **Schema Definition for JSON Parsing:**  
   - Defines a structured schema for the **order data**.  
   - Ensures correct data types (e.g., `order_id` as an `Integer`, `product_id` as a `String`).  
   - Prepares for structured querying and downstream transformations.  

2. **Reading from the Bronze Delta Table:**  
   - Streams raw **Kafka-ingested order data** from `green.bronze.orders`.  
   - Maintains real-time processing capabilities with **Structured Streaming**.  

3. **Data Transformation & Cleansing:**  
   - Parses the JSON payload in the `value` column using the schema.  
   - Flattens the **nested JSON structure** into individual columns.  
   - Converts `event_timestamp` from an ISO 8601 string to a `TimestampType` column for event-time processing.  
   - Keeps the Kafka `offset` and `timestamp` columns for auditing.  

4. **Writing to the Silver Delta Table:**  
   - Saves the **cleaned and structured data** into `green.silver.orders`.  
   - Uses **append mode** to add new rows continuously.  
   - Implements a **checkpoint location** to track progress and ensure fault tolerance.  

This process **standardizes and cleans** order data, making it **ready for analytical processing** in the **gold layer** for reporting and business intelligence. 🚀

In [None]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Define schema for JSON parsing
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("warehouse_location", StringType(), True),
    StructField("status", StringType(), True),
    StructField("event_timestamp", TimestampType(), True)
])

# Read Bronze Table and apply transformations
bronze_stream = spark.readStream.format("delta").table("green.bronze.orders")

# Parse the JSON payload with the schema and flatten its fields into top-level columns
silver_stream = (bronze_stream
    .withColumn("value", from_json(col("value"), schema))
    .select("value.*", "offset", "timestamp")
)

silver_query = (silver_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/green/silver/files/checkpoints/orders")
    .table("green.silver.orders"))
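To make the `from_json` plus `select("value.*")` step concrete, here is a plain-Python stand-in (not Spark code) that parses one JSON string against the same field list and flattens it into one column per field. `SILVER_SCHEMA` and `parse_value` are illustrative names. This sketch sets any missing or unconvertible field to `None`, which approximates, but does not exactly replicate, Spark's handling of malformed records in its default PERMISSIVE mode.

```python
import json
from datetime import datetime

# Plain-Python stand-in for the silver StructType: field name -> converter
SILVER_SCHEMA = {
    "order_id": int,
    "product_id": str,
    "quantity": int,
    "warehouse_location": str,
    "status": str,
    "event_timestamp": datetime.fromisoformat,
}


def parse_value(value, schema=SILVER_SCHEMA):
    """Parse a JSON string and flatten it into one entry per schema field.

    Missing or unconvertible fields become None; JSON keys not in the schema are ignored.
    """
    try:
        payload = json.loads(value)
    except (TypeError, json.JSONDecodeError):
        payload = {}
    if not isinstance(payload, dict):
        payload = {}
    row = {}
    for field, convert in schema.items():
        raw = payload.get(field)
        try:
            row[field] = convert(raw) if raw is not None else None
        except (TypeError, ValueError):
            row[field] = None
    return row


print(parse_value('{"order_id": 1, "quantity": 5, "event_timestamp": "2025-01-01T12:00:00+00:00"}'))
```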

### Gold Streaming State Handling

In Spark Structured Streaming, a sliding window performs aggregations over fixed-length windows that advance by a fixed slide interval, so consecutive windows overlap. When combined with watermarks, it helps manage late data and keeps state bounded. A tumbling window, by contrast, performs aggregations over fixed-size, non-overlapping windows of time, which is the simplest way to handle time-based data.

#### Key Concepts:

1. **Sliding Window:**
It allows overlapping aggregations, meaning an event may belong to multiple windows.
   
   For example, with a 10-minute window that slides every 5 minutes:
   - Each window covers 10 minutes of event time.
   - A new window starts every 5 minutes (the slide interval), so consecutive windows overlap by 5 minutes.
   - Each event therefore contributes to two windows.

| Event Timestamp | Arrival Time | Lag (min) | Order Count | Windows Covering This Event  | Late Arrival (✅ accepted, ❌ dropped) |
|-----------------|--------------|-----------|-------------|------------------------------|---------------------------------------|
| 12:01 PM        | 12:01 PM     | 0         | 5           | 11:55 - 12:05, 12:00 - 12:10 |                                       |
| 12:06 PM        | 12:06 PM     | 0         | 8           | 12:00 - 12:10, 12:05 - 12:15 |                                       |
| 12:07 PM        | 12:12 PM     | 5         | 2           | 12:00 - 12:10, 12:05 - 12:15 | ✅                                    |
| 12:07 PM        | 12:26 PM     | 19        | 1           | 12:00 - 12:10, 12:05 - 12:15 | ❌                                    |
| 12:12 PM        | 12:12 PM     | 0         | 10          | 12:05 - 12:15, 12:10 - 12:20 |                                       |
| 12:13 PM        | 12:18 PM     | 5         | 4           | 12:05 - 12:15, 12:10 - 12:20 | ✅                                    |
| 12:13 PM        | 12:40 PM     | 27        | 1           | 12:05 - 12:15, 12:10 - 12:20 | ❌                                    |
| 12:17 PM        | 12:17 PM     | 0         | 7           | 12:10 - 12:20, 12:15 - 12:25 |                                       |

    
| Window Start | Window End | On-Time Orders | Late Orders Accepted (lag ≤ 5 min) | Orders Dropped (lag > 5 min)   | Final Count     |
|--------------|------------|----------------|------------------------------------|--------------------------------|-----------------|
| 11:55 AM     | 12:05 PM   | 5              |                                    |                                | 5               |
| 12:00 PM     | 12:10 PM   | 5 + 8 = 13     | +2 at 12:12 PM                     | +1 at 12:26 PM                 | 13 + 2 = 15     |
| 12:05 PM     | 12:15 PM   | 8 + 10 = 18    | +2 at 12:12 PM, +4 at 12:18 PM     | +1 at 12:26 PM, +1 at 12:40 PM | 18 + 2 + 4 = 24 |
| 12:10 PM     | 12:20 PM   | 10 + 7 = 17    | +4 at 12:18 PM                     | +1 at 12:40 PM                 | 17 + 4 = 21     |
| 12:15 PM     | 12:25 PM   | 7              |                                    |                                | 7               |
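The window totals above can be reproduced mechanically. The plain-Python sketch below assigns each event from the first table to every 10-minute window that starts on a 5-minute boundary and contains it. It then applies the tables' simplified rule (an event whose lag exceeds 5 minutes is dropped; this is not Spark's exact watermark logic) and sums the order counts per window. Times are minutes since midnight, and the helper names are illustrative.

```python
from collections import defaultdict

WATERMARK_DELAY = 5  # minutes; simplified rule from the tables: lag > 5 min means dropped


def hm(h, m):
    """Minutes since midnight."""
    return h * 60 + m


def hhmm(minutes):
    """Format minutes since midnight as HH:MM."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


def sliding_windows(t, length, slide):
    """Every [start, end) window of `length` that starts on a multiple of `slide` and contains t."""
    windows = []
    start = t - (t % slide)  # latest window start at or before t
    while start > t - length:
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)


# (event time, arrival time, order count) from the first table
events = [
    (hm(12, 1), hm(12, 1), 5),
    (hm(12, 6), hm(12, 6), 8),
    (hm(12, 7), hm(12, 12), 2),
    (hm(12, 7), hm(12, 26), 1),
    (hm(12, 12), hm(12, 12), 10),
    (hm(12, 13), hm(12, 18), 4),
    (hm(12, 13), hm(12, 40), 1),
    (hm(12, 17), hm(12, 17), 7),
]

totals = defaultdict(int)
for event_time, arrival_time, count in events:
    if arrival_time - event_time > WATERMARK_DELAY:
        continue  # too late under the simplified rule
    for window in sliding_windows(event_time, length=10, slide=5):
        totals[window] += count

final_counts = {f"{hhmm(s)}-{hhmm(e)}": n for (s, e), n in sorted(totals.items())}
print(final_counts)
# {'11:55-12:05': 5, '12:00-12:10': 15, '12:05-12:15': 24, '12:10-12:20': 21, '12:15-12:25': 7}
```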

2. **Tumbling Window:**
It groups data into fixed-size intervals without overlap. Every event belongs to exactly one window.
   - A new window starts every 10 minutes.
   - Each window processes only the data within its interval.
   - Non-overlapping → Each event belongs to just one window.

| Event Timestamp | Arrival Time | Lag (min) | Order Count | Window Start | Window End | Late Arrival (✅ accepted, ❌ dropped) |
|-----------|--------------|-----------|-------------|--------------|------------|--------------|
| 12:01 PM  | 12:01 PM     | 0         | 5           | 12:00 PM     | 12:10 PM   |              |
| 12:05 PM  | 12:05 PM     | 0         | 8           | 12:00 PM     | 12:10 PM   |              |
| 12:08 PM  | 12:13 PM     | 5         | 2           | 12:00 PM     | 12:10 PM   | ✅            |
| 12:09 PM  | 12:15 PM     | 6         | 1           | 12:00 PM     | 12:10 PM   | ❌            |
| 12:12 PM  | 12:12 PM     | 0         | 10          | 12:10 PM     | 12:20 PM   |              |
| 12:17 PM  | 12:17 PM     | 0         | 7           | 12:10 PM     | 12:20 PM   |              |
| 12:18 PM  | 12:22 PM     | 4         | 3           | 12:10 PM     | 12:20 PM   | ✅            |
| 12:19 PM  | 12:26 PM     | 7         | 2           | 12:10 PM     | 12:20 PM   | ❌            |


| Window Start | Window End | On-Time Orders | Late Orders Accepted (lag ≤ 5 min) | Orders Dropped (lag > 5 min) | Final Count |
|--------------|------------|----------------|------------------------------------|------------------------------|-------------|
| 12:00 PM     | 12:10 PM   | 5 + 8 = 13     | +2 at 12:13 PM                     | +1 at 12:15 PM               | 13 + 2 = 15 |
| 12:10 PM     | 12:20 PM   | 10 + 7 = 17    | +3 at 12:22 PM                     | +2 at 12:26 PM               | 17 + 3 = 20 |
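The same bookkeeping for the tumbling case is simpler, because each event maps to exactly one window. The plain-Python sketch below recomputes the tumbling table under the same simplified 5-minute lag rule. Times are minutes since midnight, and the helper names are illustrative.

```python
from collections import defaultdict

WATERMARK_DELAY = 5  # minutes; simplified rule from the tables: lag > 5 min means dropped
WINDOW_LENGTH = 10   # minutes


def hm(h, m):
    """Minutes since midnight."""
    return h * 60 + m


def hhmm(minutes):
    """Format minutes since midnight as HH:MM."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


def tumbling_window(t, length=WINDOW_LENGTH):
    """The single [start, end) window of `length` that contains t."""
    start = t - (t % length)
    return (start, start + length)


# (event time, arrival time, order count) from the tumbling-window table
events = [
    (hm(12, 1), hm(12, 1), 5),
    (hm(12, 5), hm(12, 5), 8),
    (hm(12, 8), hm(12, 13), 2),
    (hm(12, 9), hm(12, 15), 1),
    (hm(12, 12), hm(12, 12), 10),
    (hm(12, 17), hm(12, 17), 7),
    (hm(12, 18), hm(12, 22), 3),
    (hm(12, 19), hm(12, 26), 2),
]

totals = defaultdict(int)
dropped = []
for event_time, arrival_time, count in events:
    if arrival_time - event_time > WATERMARK_DELAY:
        dropped.append((hhmm(event_time), count))
        continue
    totals[tumbling_window(event_time)] += count

final_counts = {f"{hhmm(s)}-{hhmm(e)}": n for (s, e), n in sorted(totals.items())}
print(final_counts)  # {'12:00-12:10': 15, '12:10-12:20': 20}
print(dropped)       # [('12:09', 1), ('12:19', 2)]
```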


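3. **Watermark:**
A watermark defines how late data can arrive before Spark stops including it in aggregations.
   - A mechanism to handle late data.
   - Specifies the maximum delay allowed for late data.
   - Helps clean up old state and reduce memory usage.
   - Spark computes the watermark as the maximum event time seen so far minus the allowed delay. Once the watermark passes the end of a window, Spark finalizes that window and drops its state, so later events for it are ignored. The tables above simplify this by treating any event whose lag exceeds 5 minutes as dropped.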
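The watermark rule can be approximated in a few lines of plain Python. The sketch below processes events in arrival order using 10-minute tumbling windows and a 5-minute delay. An event is ignored if its window ends at or before the current watermark, which is the maximum event time seen so far minus the delay. As a simplification, it updates the watermark after every event, whereas Spark updates it between micro-batches, so treat it as an illustration rather than Spark's exact behavior. The event times are made up, and `run_watermark` is a hypothetical helper.

```python
from collections import defaultdict


def hm(h, m):
    """Minutes since midnight."""
    return h * 60 + m


def run_watermark(event_times, length, delay):
    """Count events per tumbling window, dropping events whose window the watermark has passed.

    Simplification: the watermark is updated after every event, not per micro-batch.
    """
    counts = defaultdict(int)
    dropped = []
    max_event_time = None
    for event_time in event_times:
        watermark = None if max_event_time is None else max_event_time - delay
        start = event_time - (event_time % length)
        window = (start, start + length)
        if watermark is not None and window[1] <= watermark:
            dropped.append(event_time)  # window already finalized: too late
        else:
            counts[window] += 1
        max_event_time = event_time if max_event_time is None else max(max_event_time, event_time)
    return dict(counts), dropped


# Arrival order: the 12:16 event moves the watermark to 12:11, so the 12:09 event that
# arrives after it falls in an already-finalized window (12:00-12:10) and is dropped.
arrivals = [hm(12, 1), hm(12, 8), hm(12, 16), hm(12, 9), hm(12, 14)]
counts, dropped = run_watermark(arrivals, length=10, delay=5)
print(counts, dropped)
```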



#### Tumbling Window with Watermark
In this section, we implement a tumbling window aggregation in PySpark Structured Streaming that counts the orders and computes the average order quantity per warehouse over 10-second windows. The results are written to a Gold Table in Delta format for further analysis.

**Read Stream**
- `silver_stream` → Represents a streaming DataFrame containing incoming order data.
- `withWatermark("event_timestamp", "20 seconds")` → Handles late data, allowing events to arrive up to 20 seconds behind the latest event time seen before they are discarded.
- `groupBy(window(col("event_timestamp"), "10 seconds"), col("warehouse_location"))` → Groups data into non-overlapping 10-second tumbling windows, while also grouping by `warehouse_location`.
- `agg(count("order_id").alias("total_orders"), avg("quantity").alias("avg_quantity"))` → Counts the orders and averages the ordered quantity per warehouse in each window.

**Write Stream**
- `.writeStream.format("delta")` → Saves results in Delta format, which supports ACID transactions and efficient updates.
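- `.outputMode("append")` → Writes each window's final aggregate once, after the watermark passes the end of the window. Append mode for streaming aggregations requires a watermark, so rows reach the Gold Table with a delay of at least the watermark threshold.
- `.option("checkpointLocation", "/Volumes/green/gold/files/checkpoints/orders_tumbling")` → Uses a checkpoint directory to track progress and prevent data loss.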
- `.table("green.gold.orders_tumbling")` → Stores the results in the Gold Layer of the data lakehouse.

In [None]:
from pyspark.sql.functions import window, count, avg, col

tumbling_window_stream = (silver_stream
    .withWatermark("event_timestamp", "20 seconds") 
    .groupBy(
        window(col("event_timestamp"), "10 seconds"),
        col("warehouse_location")
    )
    .agg(count("order_id").alias("total_orders"), avg("quantity").alias("avg_quantity"))
)

# Writing the result to the Gold Table
tumbling_window_query = (tumbling_window_stream.writeStream
    .format("delta")
    .outputMode("append")  # emits each window once, after the watermark passes its end
    .option("checkpointLocation", "/Volumes/green/gold/files/checkpoints/orders_tumbling")
    .table("green.gold.orders_tumbling"))
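To see what a single row of `green.gold.orders_tumbling` represents, here is a plain-Python sketch (not Spark code) of the same grouping applied to a handful of made-up orders, with event times in seconds. It mimics `count("order_id")` and `avg("quantity")`, both of which ignore nulls, but it leaves out the watermark and streaming aspects. `tumbling_aggregate` is a hypothetical helper.

```python
from collections import defaultdict


def tumbling_aggregate(orders, window_seconds=10):
    """Group orders into [start, start + window_seconds) windows per warehouse.

    `orders` holds (event_time_seconds, warehouse_location, order_id, quantity) tuples.
    Returns {(window, warehouse): (total_orders, avg_quantity)}.
    """
    groups = defaultdict(list)
    for event_time, warehouse, order_id, quantity in orders:
        start = event_time - (event_time % window_seconds)
        groups[((start, start + window_seconds), warehouse)].append((order_id, quantity))
    result = {}
    for key, rows in groups.items():
        # count("order_id") skips null ids; avg("quantity") ignores null quantities
        order_ids = [oid for oid, _ in rows if oid is not None]
        quantities = [q for _, q in rows if q is not None]
        avg_quantity = sum(quantities) / len(quantities) if quantities else None
        result[key] = (len(order_ids), avg_quantity)
    return result


orders = [(3, "WH-1", 1, 10), (7, "WH-1", 2, 20), (8, "WH-2", 3, 5), (12, "WH-1", 4, None)]
for key, value in sorted(tumbling_aggregate(orders).items()):
    print(key, value)
```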


#### Sliding Window with Watermark
This section demonstrates a sliding window aggregation in PySpark Structured Streaming that counts orders and computes the average order quantity per warehouse over overlapping windows. The results are written to a Gold Table in Delta format for further analysis.

**Read Stream**
- `silver_stream` → Represents a streaming DataFrame containing incoming order data.
- `withWatermark("event_timestamp", "20 seconds")` → Allows Spark to wait up to 20 seconds for late data before dropping it.
- `groupBy(window(col("event_timestamp"), "10 seconds", "5 seconds"), col("warehouse_location"))` → Creates 10-second windows that start every 5 seconds, grouped by warehouse.
  - Because the window length is twice the slide interval, each event falls into exactly two overlapping windows.
  - This gives more frequent, smoother trend updates than a tumbling window of the same length.
- `agg(count("order_id").alias("total_orders"), avg("quantity").alias("avg_quantity"))` → Counts orders and computes the average quantity per warehouse within each window.

**Write Stream**
- `.writeStream.format("delta")` → Saves results in Delta format, ensuring efficient incremental updates and ACID compliance.
- `.outputMode("append")` → Writes each window's final aggregate once, after the watermark passes the end of the window; earlier partial results are not written.
- `.option("checkpointLocation", "/Volumes/green/gold/files/checkpoints/orders_sliding")` → Uses a checkpoint directory for tracking progress and maintaining fault tolerance.
- `.table("green.gold.orders_sliding")` → Stores the results in the Gold Layer of the data lakehouse.

In [None]:
from pyspark.sql.functions import window, count, avg, col

sliding_window_stream = (silver_stream
    .withWatermark("event_timestamp", "20 seconds")
    .groupBy(
        window(col("event_timestamp"), "10 seconds", "5 seconds"),
        col("warehouse_location")
    )
    .agg(count("order_id").alias("total_orders"), avg("quantity").alias("avg_quantity"))
)

# Writing the result to the Gold Table
sliding_window_query = (sliding_window_stream.writeStream
    .format("delta")
    .outputMode("append")  # emits each window once, after the watermark passes its end
    .option("checkpointLocation", "/Volumes/green/gold/files/checkpoints/orders_sliding")
    .table("green.gold.orders_sliding"))
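Append mode has a practical consequence for this notebook. A window row reaches `green.gold.orders_sliding` only after the watermark passes the window's end, so with a 20-second delay the gold table lags the incoming data by at least that much. The plain-Python sketch below traces when each 10-second window with a 5-second slide would be emitted. It uses hypothetical helper names and made-up event times in seconds, and it assumes events arrive in event-time order. It also updates the watermark per event rather than per micro-batch.

```python
def sliding_windows(t, length, slide):
    """Every [start, end) window of `length` that starts on a multiple of `slide` and contains t."""
    windows = []
    start = t - (t % slide)
    while start > t - length:
        windows.append((start, start + length))
        start -= slide
    return windows


def append_mode_emissions(event_times, length=10, slide=5, delay=20):
    """Return (window, event_time_that_triggered_emission) pairs and the windows still open.

    A window is emitted once the watermark (max event time - delay) reaches its end.
    Assumes events arrive in event-time order.
    """
    open_windows = set()
    emitted = []
    max_event_time = None
    for t in event_times:
        open_windows.update(sliding_windows(t, length, slide))
        max_event_time = t if max_event_time is None else max(max_event_time, t)
        watermark = max_event_time - delay
        for window in sorted(open_windows):
            if window[1] <= watermark:
                emitted.append((window, t))
                open_windows.remove(window)
    return emitted, sorted(open_windows)


emitted, still_open = append_mode_emissions([101, 106, 112, 127, 131, 140])
print(emitted)     # [((95, 105), 127), ((100, 110), 131), ((105, 115), 140), ((110, 120), 140)]
print(still_open)  # [(120, 130), (125, 135), (130, 140), (135, 145), (140, 150)]
```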


## 5. Create Kafka Order Producer Simulation
---

This section sets up a **Kafka Producer** to simulate and stream **real-time supply chain order data** into a Kafka topic. This enables the **ingestion of synthetic order events**, which will be processed downstream in the **Green-Blue Medallion architecture**.

#### **Key Steps:**

1. **Kafka Configuration:**  
   - Defines the Kafka **bootstrap servers** for message transmission.  
   - Specifies the **Kafka topic (`supplychain-orders`)** where messages will be published.  

2. **Kafka Producer Setup:**  
   - Initializes a **KafkaProducer** with:  
     - **UTF-8 key serialization** for message keys.  
     - **JSON serialization** for the message values.  

3. **Generating Supply Chain Order Messages:**  
   - Defines a function to **simulate order events**, including:  
     - `order_id`: Unique order identifier.  
     - `product_id`: Random product ID.  
     - `quantity`: Random order quantity.  
     - `warehouse_location`: Random warehouse identifier.  
     - `status`: Random order status (`shipped`, `pending`, `delivered`, `cancelled`).  
     - `event_timestamp`: Timestamp of the order in **ISO 8601 UTC format**.  

4. **Streaming Data into Kafka:**  
   - Sends **20 simulated order messages** to Kafka with a **random 1 to 15 second delay** between messages.  
   - Uses the `order_id` as the message key for partitioning.  
   - Prints confirmation logs for tracking messages.  

5. **Finalizing the Producer:**  
   - **Flushes** the producer buffer, blocking until all pending messages have been sent, so that no buffered messages are lost when the producer closes.
   - **Closes** the producer to free resources.  

This Kafka producer **emulates real-world order processing events**, providing a **continuous data stream** that feeds into **Databricks’ Medallion architecture**, where it will be further processed and stored in the **bronze layer**. 🚀

In [None]:
# Requires the kafka-python package (for example: %pip install kafka-python)
from kafka import KafkaProducer
import json
import time
import random
from datetime import datetime, timezone

# Setting up the Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_servers,
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    value_serializer=lambda value: json.dumps(value).encode("utf-8"),
)

# Function to generate supply chain messages
def generate_supply_chain_message(order_id):
    return {
        "order_id": order_id,
        "product_id": f"P-{random.randint(1000, 1005)}",
        "quantity": random.randint(1, 100),
        "warehouse_location": f"WH-{random.randint(1, 3)}",
        "status": random.choice(["shipped", "pending", "delivered", "cancelled"]),
        "event_timestamp": datetime.now(timezone.utc).isoformat()
    }

# Sending supply chain messages to Kafka
for i in range(20):
    key = str(i)
    message = generate_supply_chain_message(i)
    producer.send(kafka_topic, key=key, value=message)
    print(f"Sent: {message} with key: {key}")
    time.sleep(random.randint(1, 15))

producer.flush()
producer.close()
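Because silver parses the payload with a fixed schema, a field-name mismatch between the producer and that schema silently produces null columns instead of raising an error. The sketch below assumes the silver field names defined earlier and checks a generated message against them before it is sent. `check_against_silver_schema` and the `rng` parameter are hypothetical additions for illustration, and the check only approximates what `from_json` accepts.

```python
import random
from datetime import datetime, timezone

# Field names taken from the silver schema (StructType) defined earlier
SILVER_FIELDS = {"order_id", "product_id", "quantity", "warehouse_location", "status", "event_timestamp"}


def generate_supply_chain_message(order_id, rng=random):
    """Same fields as the producer's generator; `rng` is a hypothetical parameter for reproducibility."""
    return {
        "order_id": order_id,
        "product_id": f"P-{rng.randint(1000, 1005)}",
        "quantity": rng.randint(1, 100),
        "warehouse_location": f"WH-{rng.randint(1, 3)}",
        "status": rng.choice(["shipped", "pending", "delivered", "cancelled"]),
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
    }


def check_against_silver_schema(message):
    """Return a list of mismatches between a message and the silver schema (empty if none)."""
    problems = []
    keys = set(message)
    missing = SILVER_FIELDS - keys
    extra = keys - SILVER_FIELDS
    if missing:
        problems.append(f"missing fields: {sorted(missing)}")
    if extra:
        problems.append(f"fields ignored by the schema: {sorted(extra)}")
    if not isinstance(message.get("order_id"), int) or not isinstance(message.get("quantity"), int):
        problems.append("order_id and quantity must be integers")
    try:
        datetime.fromisoformat(message.get("event_timestamp", ""))
    except (TypeError, ValueError):
        problems.append("event_timestamp is not an ISO 8601 string")
    return problems


msg = generate_supply_chain_message(1, rng=random.Random(0))
print(check_against_silver_schema(msg))  # → []
```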

## 6. Streaming Table Display

In [None]:
# Display the streaming tables prior to sending messages
spark.readStream.table("green.bronze.orders").display()

In [None]:
spark.readStream.table("green.silver.orders").display()

In [None]:
spark.readStream.table("green.gold.orders_tumbling").display()

In [None]:
spark.readStream.table("green.gold.orders_sliding").display()

## 7. Clear Tables and Checkpoints

In [None]:
# Stop all active streaming queries before dropping the tables they write to
for s in spark.streams.active:
    print(f"Stopping query: {s.name} ({s.id})")
    s.stop()

In [None]:
# Drop the streaming tables and clear their checkpoints
tables = ["green.bronze.orders", "green.silver.orders", "green.gold.orders", "green.gold.orders_tumbling", "green.gold.orders_sliding"]
for table in tables:
    spark.sql(f"DROP TABLE IF EXISTS {table}")

# Checkpoints live under /Volumes/<catalog>/<schema>/files/checkpoints/, one folder per schema
for catalog, schema in sorted({tuple(t.split(".")[:2]) for t in tables}):
    dbutils.fs.rm(f"/Volumes/{catalog}/{schema}/files/checkpoints/", True)

# END OF NOTEBOOK