# Bronze Layer: Kafka to Delta Lake Ingestion
**Purpose:** Ingest real-time trade data from Confluent Kafka into Delta Lake

**Exam Topics:**
- Spark Structured Streaming
- Kafka Integration
- Delta Lake ACID writes
- Checkpointing for exactly-once semantics

## 1. Configurations

In [0]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
  col, from_json, current_timestamp, to_timestamp, from_unixtime
)
from pyspark.sql.types import (
  StructType, StructField, StringType, LongType, DoubleType, IntegerType
)

In [0]:
# Kafka Configurations, read from the "confluent" Databricks secret scope so no
# credentials live in the notebook source.
KAFKA_BOOTSTRAP_SERVERS = dbutils.secrets.get(scope="confluent", key="bootstrap_server")
KAFKA_USERNAME = dbutils.secrets.get(scope="confluent", key="username")
KAFKA_PASSWORD = dbutils.secrets.get(scope="confluent", key="password")
KAFKA_TOPIC = "market.trades"

# The Kafka reader's SASL/PLAIN JAAS config is built from KAFKA_API_KEY (as the
# username) and KAFKA_API_SECRET (as the password). Expose the secret-backed
# values under those names too, so the reader uses them.
KAFKA_API_KEY = KAFKA_USERNAME
KAFKA_API_SECRET = KAFKA_PASSWORD

In [0]:
# Confluent Cloud cluster connection settings.
# SECURITY: the API key/secret used to be hard-coded here in plain text. They
# are now read from the "confluent" secret scope. Any key that was ever committed
# in this notebook should be treated as leaked and rotated in Confluent Cloud.
# NOTE(review): assumes the "username"/"password" secrets hold the Confluent API
# key/secret respectively -- confirm against the secret scope contents.
KAFKA_BOOTSTRAP_SERVERS = "pkc-rgm37.us-west-2.aws.confluent.cloud:9092"
KAFKA_API_KEY = dbutils.secrets.get(scope="confluent", key="username")
KAFKA_API_SECRET = dbutils.secrets.get(scope="confluent", key="password")
KAFKA_TOPIC = "market.trades"

In [0]:
# Checkpoint directory dedicated to the Kafka -> bronze ingestion query.
# NOTE(review): a /tmp location is conventionally scratch space; confirm it is
# durable enough to hold streaming state across cluster restarts.
CHECKPOINT_PATH = "/tmp/checkpoints/bronze_kafka_ingestion"
# Three-part name of the bronze Delta table that receives parsed trades.
BRONZE_TABLE = "finance_lakehouse.bronze.market_trades_raw"

## 2. Define Schema

In [0]:
# Schema matching forwarder.py output: the JSON payload of each Kafka message.
# Every field is declared non-nullable.
# NOTE(review): from_json is believed to treat the schema as nullable regardless,
# so malformed payloads likely surface as NULLs rather than errors -- confirm.
trade_schema = StructType([
    StructField(field_name, field_type, nullable=False)
    for field_name, field_type in (
        ("exchange", StringType()),
        ("symbol", StringType()),
        ("id", StringType()),
        ("event_ts", LongType()),    # Unix timestamp, in seconds
        ("price", DoubleType()),
        ("qty", DoubleType()),
        ("side", StringType()),      # "buy" or "sell"
        ("ingest_ts", LongType()),
    )
])

## 3. Read from Kafka Stream

In [0]:
# Streaming reader over the trades topic, authenticating with SASL_SSL + PLAIN
# using the API key/secret as the JAAS username/password.
#   startingOffsets="latest": only consulted when the query starts without a
#     checkpoint; a restarted query resumes from its checkpointed offsets.
#   failOnDataLoss="false": do not fail the query when offsets may have been
#     lost (e.g. aged out of retention) -- such gaps are skipped silently.
#   maxOffsetsPerTrigger=10000: caps the number of records read per micro-batch.
kafka_stream = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{KAFKA_API_KEY}' password='{KAFKA_API_SECRET}';",
        "subscribe": KAFKA_TOPIC,
        "startingOffsets": "latest",
        "failOnDataLoss": "false",
        "maxOffsetsPerTrigger": 10000,
    })
    .load()
)

## 4. Parse Kafka Messages

In [0]:
# Decode each Kafka record: parse the UTF-8 JSON value with trade_schema, keep
# the broker timestamp, and derive timestamp columns from the epoch-second fields.
# Output columns: the schema fields, kafka_timestamp, event_datetime,
# ingest_datetime, processing_time (in that order).
parsed_stream = (
    kafka_stream
    .select(
        from_json(col("value").cast("string"), trade_schema).alias("data"),
        col("timestamp").alias("kafka_timestamp"),
    )
    .select("data.*", "kafka_timestamp")
    # from_unixtime renders epoch seconds as a string; cast it back to a timestamp.
    .withColumn("event_datetime", from_unixtime(col("event_ts")).cast("timestamp"))
    .withColumn("ingest_datetime", from_unixtime(col("ingest_ts")).cast("timestamp"))
    # Wall-clock time at which Spark processed the micro-batch.
    .withColumn("processing_time", current_timestamp())
)

## 5. Write to Delta Lake (Bronze)

In [0]:
# Append parsed trades to the bronze Delta table in 10-second micro-batches.
# The checkpoint holds this query's source offsets and sink commit log, so it
# must be unique to this query. Use the stream-specific CHECKPOINT_PATH rather
# than the generic shared root '/mnt/delta/checkpoints/', which any other stream
# pointed at the same root would collide with.
# NOTE(review): a query started against a new checkpoint location begins from
# startingOffsets ("latest"), not from where the old checkpoint left off.
bronze_query = (
    parsed_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_PATH)
    # Allow new columns in the stream to be added to the table schema.
    .option("mergeSchema", "true")
    .trigger(processingTime="10 seconds")
    .toTable(BRONZE_TABLE)
)

## 6. Monitor Stream Health

In [0]:
# Print id, run id, status and latest progress for every streaming query
# currently running in this Spark session, separated by a divider line.
for stream in spark.streams.active:
    print("\n".join([
        f"Stream ID: {stream.id}",
        f"Run ID: {stream.runId}",
        f"Status: {stream.status}",
        f"Recent Progress:\n{stream.lastProgress}",
        "-" * 80,
    ]))

## 7. Verify Data Ingestion

In [0]:
%sql
-- Total number of rows landed in the bronze table so far
SELECT
  COUNT(*) AS total_records
FROM
  finance_lakehouse.bronze.market_trades_raw;

In [0]:
%sql
-- The ten most recent trades, ordered by event time (newest first)
SELECT
  *
FROM
  finance_lakehouse.bronze.market_trades_raw
ORDER BY
  event_datetime DESC
LIMIT
  10;

In [0]:
%sql
-- Per-side ("buy"/"sell") trade count, unweighted average price, and total quantity
SELECT
  side,
  COUNT(*)   AS num_trades,
  AVG(price) AS avg_price,
  SUM(qty)   AS total_volume
FROM
  finance_lakehouse.bronze.market_trades_raw
GROUP BY
  side;

## 8. Stop Stream (when needed)

In [0]:
# Stop the bronze ingestion streaming query started by the Delta write cell.
bronze_query.stop()

In [0]:
# Verify no active streams remain. (This was previously a SQL-style "--" comment,
# which is a SyntaxError in a Python cell.) Report explicitly when nothing is
# running, and print details of any query that is still active.
active_streams = spark.streams.active
if not active_streams:
    print("No active streams.")
for stream in active_streams:
    print(f"Stream ID: {stream.id}")
    print(f"Run ID: {stream.runId}")
    print(f"Status: {stream.status}")
    print(f"Recent Progress:\n{stream.lastProgress}")
    print("-" * 80)