In [1]:
import logging
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.avro.functions import from_avro

In [2]:
# ==========================================
# 1. CONFIGURATION & SESSION SETUP
# ==========================================
# We need the Kafka and Avro packages compatible with Spark 3.5

In [3]:
# Download the Postgres Driver JAR manually
!curl -o postgresql-42.7.3.jar https://jdbc.postgresql.org/download/postgresql-42.7.3.jar

# Verify it is there (should show the file size)
!ls -lh postgresql-42.7.3.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1063k  100 1063k    0     0   452k      0  0:00:02  0:00:02 --:--:--  452k
-rw-r--r-- 1 jovyan users 1.1M Dec  2 21:01 postgresql-42.7.3.jar


In [4]:
# Absolute location of the Postgres driver JAR downloaded into the working
# directory. It is handed to spark.jars and to the driver/executor extraClassPath.
_driver_jar_filename = "postgresql-42.7.3.jar"
jar_path = os.path.abspath(_driver_jar_filename)

In [5]:
# Maven coordinates for the Kafka source/sink and the Avro connector, resolved
# through spark.jars.packages. The Postgres driver is not listed here; it is
# supplied as the locally downloaded JAR instead.
#
# Spark's connector artifacts are released in lockstep with Spark, so their
# version is taken from the installed PySpark instead of being hard-coded.
# Previously it was pinned to 3.5.1 while this kernel runs Spark 3.5.3.
# The Scala suffix must match the Spark build; 2.12 is assumed, as before.
import pyspark

SPARK_VERSION = pyspark.__version__
SCALA_BINARY_VERSION = "2.12"
kafka_avro_packages = ",".join(
    f"org.apache.spark:{artifact}_{SCALA_BINARY_VERSION}:{SPARK_VERSION}"
    for artifact in ("spark-sql-kafka-0-10", "spark-avro")
)

In [6]:
# Build (or reuse) the SparkSession for the pipeline.
# NOTE: if a session already exists in this kernel, getOrCreate() returns it,
# and static settings such as spark.jars.packages are not re-applied.
spark = (
    SparkSession.builder
    .appName("SalesEnrichmentPipeline")
    # Kafka + Avro connectors, resolved from Maven coordinates.
    .config("spark.jars.packages", kafka_avro_packages)
    # The locally downloaded Postgres JDBC driver: shipped via spark.jars and
    # also placed on the driver and executor classpaths.
    .config("spark.jars", jar_path)
    .config("spark.driver.extraClassPath", jar_path)
    .config("spark.executor.extraClassPath", jar_path)
    # Shuffle partitions lowered from Spark's default of 200.
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

In [7]:
# Show only WARN-and-above messages from Spark so cell output stays readable.
_spark_context = spark.sparkContext
_spark_context.setLogLevel("WARN")

In [8]:
# Confirm which Spark version is running and which driver JAR was configured.
_session_banner = (
    f"Spark Session Created Successfully. Version: {spark.version}, "
    f"Postgres Driver loaded from: {jar_path}"
)
print(_session_banner)

Spark Session Created Successfully. Version: 3.5.3, Postgres Driver loaded from: /home/jovyan/work/spark/postgresql-42.7.3.jar


In [9]:
# ==========================================
# 2. Load Static Reference Data (Enrichment)
# ==========================================

In [10]:
# 1. Postgres connection configuration for the static reference tables.
# Connection details are read from the environment so credentials are not
# committed with the notebook. The defaults are the previously hard-coded values.
# TODO: drop the credential defaults once the environment variables are
# provisioned everywhere this notebook runs.
jdbc_url = os.environ.get(
    "DATA_SOURCE_JDBC_URL", "jdbc:postgresql://postgres:5432/data_source"
)
connection_props = {
    "user": os.environ.get("DATA_SOURCE_DB_USER", "airflow"),
    "password": os.environ.get("DATA_SOURCE_DB_PASSWORD", "airflow"),
    "driver": "org.postgresql.Driver",
}

In [11]:
# 2. Load the `stores` reference table over JDBC and cache it. It is joined
# against every micro-batch of the sales stream.
# NOTE(review): once materialised, the cached snapshot is not refreshed, so
# later changes to the Postgres table are not seen by the running stream.
print("Loading Stores Table...")
stores_ref_df = (
    spark.read
    .jdbc(jdbc_url, "stores", properties=connection_props)
    .cache()
)
stores_ref_df.printSchema()

Loading Stores Table...
root
 |-- storekey: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- Square Meters: decimal(18,2) (nullable = true)
 |-- Open Date: date (nullable = true)



In [12]:
# 3. Load the `products` reference table over JDBC and cache it. It is joined
# against every micro-batch of the sales stream.
# NOTE(review): as with stores, the cached snapshot is not refreshed while the
# stream runs.
print("Loading Products Table...")
products_ref_df = (
    spark.read
    .jdbc(jdbc_url, "products", properties=connection_props)
    .cache()
)
products_ref_df.printSchema()

Loading Products Table...
root
 |-- productkey: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- color: string (nullable = true)
 |-- Unit_Cost_USD: string (nullable = true)
 |-- Unit_Price_USD: string (nullable = true)
 |-- subcategorykey: integer (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- categorykey: integer (nullable = true)
 |-- category: string (nullable = true)



In [13]:
# ==========================================
# FINAL ENRICHED STREAMING PIPELINE
# ==========================================

In [14]:
# ==========================================
# 3. SCHEMA DEFINITION
# ==========================================
# The Avro schema in the next cell must match sales_event_schema.avsc exactly.
# It is passed to from_avro as a JSON string (not as a Spark StructType).

In [15]:
# ==========================================
# 3. SCHEMA DEFINITION
# ==========================================
# Avro reader schema (JSON string) passed to `from_avro` to decode the Kafka
# message values. It is intended to match sales_event_schema.avsc exactly.
# Avro binary data carries no field names or tags, so any drift from the
# producer's schema (field order, types, enum symbols) can mis-decode records
# or fail the query. Keep the two in sync.
# Notes:
#   - `from_avro` surfaces the enum fields (event_type, currency_code,
#     payment_method) as plain strings.
#   - `delivery_date` is the only field declared nullable (["null", "long"]).
#   - `order_date` and `event_timestamp` are strings here. event_timestamp is
#     parsed to a timestamp later in the pipeline.

avroSchema = """
{
  "type": "record",
  "name": "SalesEvent",
  "fields": [
    {"name": "event_type", "type": {"type": "enum", "name": "EventType", "symbols": ["ORDER", "CANCEL", "RETURN"]}},
    {"name": "order_number", "type": "long"},
    {"name": "line_item", "type": "int"},
    {"name": "order_serial", "type": "string"},
    {"name": "order_date", "type": "string"},
    {"name": "delivery_date", "type": ["null", "long"]},
    {"name": "customerkey", "type": "long"},
    {"name": "storekey", "type": "long"},
    {"name": "productkey", "type": "long"},
    {"name": "quantity", "type": "int"},
    {"name": "currency_code", "type": {"type": "enum", "name": "CurrencyCode", "symbols": ["USD", "CAD", "GBP", "AUD", "EUR"]}},
    {"name": "payment_method", "type": {"type": "enum", "name": "PaymentMethod", "symbols": ["CASH", "WALLET", "DEBIT_CARD", "CREDIT_CARD"]}},
    {"name": "status", "type": "string"},
    {"name": "order_group_id", "type": "string"},
    {"name": "unit_price_usd", "type": "double"},
    {"name": "line_total_amount", "type": "double"},
    {"name": "event_timestamp", "type": "string"}
  ]
}
"""

In [16]:
# ==========================================
# 3. READ STREAM (INGESTION)
# ==========================================

In [17]:
# Read the raw Kafka topic as a stream. Each row is one Kafka record, with the
# Avro payload in the binary `value` column.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "sales_events_avro")
    # Only used when the query starts without a checkpoint. A restarted query
    # resumes from the offsets stored in its checkpoint.
    .option("startingOffsets", "earliest")
    .load()
)

print("raw_stream Schema:")
raw_stream.printSchema()

raw_stream Schema:
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [18]:
# ==========================================
# 4. TRANSFORMATION LAYER
# ==========================================
# Logic:
# 1. Strip first 5 bytes (Confluent Header).
# 2. Deserialize Avro binary to Struct.
# 3. Convert String timestamps to TimestampType.
# 4. Calculate 'total_sales_usd'.

In [19]:
# A. Unwrap the Avro payload.
# Each Kafka value starts with a 5-byte Confluent header (1 magic byte plus a
# 4-byte schema id). The Avro body therefore begins at 1-based byte 6.
# NOTE(review): from_avro runs in its default mode here, so a malformed record
# fails the query rather than being skipped.
_avro_body = expr("substring(value, 6, length(value)-5)")

parsed_stream = (
    raw_stream
    .withColumn("avro_payload", _avro_body)
    .select(from_avro(col("avro_payload"), avroSchema).alias("data"))
    .select("data.*")  # flatten the decoded record into top-level columns
)

print("parsed_stream Schema:")
parsed_stream.printSchema()

parsed_stream Schema:
root
 |-- event_type: string (nullable = true)
 |-- order_number: long (nullable = true)
 |-- line_item: integer (nullable = true)
 |-- order_serial: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- delivery_date: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- storekey: long (nullable = true)
 |-- productkey: long (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- currency_code: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- status: string (nullable = true)
 |-- order_group_id: string (nullable = true)
 |-- unit_price_usd: double (nullable = true)
 |-- line_total_amount: double (nullable = true)
 |-- event_timestamp: string (nullable = true)



In [20]:
# B. Enrich each event with static reference data using stream-static LEFT OUTER
# joins. Events whose storekey/productkey has no match are kept, and the
# reference columns are null for them.
# Only the needed product columns are kept. The products table also has
# Unit_Price_USD, which would clash with the event's unit_price_usd under
# Spark's default case-insensitive column resolution.
clean_products_df = products_ref_df.select("productkey", "product_name", "category")

enriched_stream = (
    parsed_stream
    .join(stores_ref_df, on="storekey", how="left_outer")
    .join(clean_products_df, on="productkey", how="left_outer")
)

print("enriched_stream Schema:")
enriched_stream.printSchema()

enriched_stream Schema:
root
 |-- productkey: long (nullable = true)
 |-- storekey: long (nullable = true)
 |-- event_type: string (nullable = true)
 |-- order_number: long (nullable = true)
 |-- line_item: integer (nullable = true)
 |-- order_serial: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- delivery_date: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- currency_code: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- status: string (nullable = true)
 |-- order_group_id: string (nullable = true)
 |-- unit_price_usd: double (nullable = true)
 |-- line_total_amount: double (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- Square Meters: decimal(18,2) (nullable = true)
 |-- Open Date: date (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nulla

In [21]:
# ==========================================
# COMMON TRANSFORMATION LAYER (feeds both output streams)
# ==========================================
# `round` below is pyspark.sql.functions.round (the star import shadows the builtin).
common_stream = (
    enriched_stream
    # Event time, parsed from the producer's string timestamp.
    # NOTE(review): assumes producers emit "dd-MM-yyyy HH:mm:ss"; non-matching
    # strings yield a null event_ts.
    .withColumn("event_ts", to_timestamp(col("event_timestamp"), "dd-MM-yyyy HH:mm:ss"))
    # Line value in USD, rounded to 2 decimals.
    .withColumn("total_sales_usd", round(col("quantity") * col("unit_price_usd"), 2))
    # Lag in seconds between the event's own timestamp and processing time.
    .withColumn("processing_lag", unix_timestamp(current_timestamp()) - unix_timestamp(col("event_ts")))
    .withColumnRenamed("country", "store_country")
    # NOTE(review): the source column is the store's *state*, but it is
    # published as `store_city`. Renaming it would change the JSON key emitted
    # downstream, so it is left as-is.
    .withColumnRenamed("state", "store_city")
    # 0/1 indicator flags derived from the event type.
    .withColumn("is_return", when(col("event_type") == "RETURN", 1).otherwise(0))
    .withColumn("is_cancel", when(col("event_type") == "CANCEL", 1).otherwise(0))
    # storekey 0 is treated as the online channel; any other store is in-store.
    .withColumn("sales_channel", when(col("storekey") == 0, "Online").otherwise("In-Store"))
)

In [22]:
# ==========================================
# STREAM A: ITEM LEVEL (Detailed)
# ==========================================
# One Kafka record per line item: key = order_serial, value = JSON of the
# columns below. The list order is the key order of the JSON payload.
item_payload_columns = [
    "order_serial",
    "order_date",
    "event_type",
    "product_name",
    "category",
    "quantity",
    "unit_price_usd",
    "total_sales_usd",
    "line_total_amount",
    "storekey",
    "store_country",
    "store_city",
    "customerkey",
    "payment_method",
    "is_return",
    "is_cancel",
    "processing_lag",
    "sales_channel",  # added in V3
    "event_ts",
]

item_stream_output = common_stream.select(
    col("order_serial").alias("key"),
    to_json(struct(*map(col, item_payload_columns))).alias("value"),
)

In [23]:
# Publish the item-level records to Kafka. No outputMode is set, so Spark's
# default (append) applies. The checkpoint directory stores this query's
# progress; reusing it resumes the query where it stopped.
_items_sink_options = {
    "kafka.bootstrap.servers": "broker:29092",
    "topic": "processed_sales_items",
    "checkpointLocation": "/home/jovyan/work/checkpoints/dual_items_v1",
}

query_items = (
    item_stream_output.writeStream
    .format("kafka")
    .options(**_items_sink_options)
    .start()
)

In [24]:
# ==========================================
# STREAM B: ORDER LEVEL (Aggregated)
# ==========================================
# Running totals per order group. The group is also split by event type,
# store, currency, date, customer and payment method.
# `sum`, `max` and `round` are the pyspark.sql.functions versions (star import).
#
# NOTE(review): the grouping keys contain neither `event_ts` nor a window on it.
# Under Spark's watermarking rules, the 10-minute watermark therefore cannot
# evict aggregation state. State for every group is kept for the life of the
# query and grows without bound. Fixing this needs an event-time window (or
# similar) in the keys, which would change the emitted records.
order_group_keys = [
    "order_group_id",
    "event_type",
    "store_country",
    "store_city",
    "currency_code",
    "order_date",
    "customerkey",
    "payment_method",
]

order_agg_stream = (
    common_stream
    .withWatermark("event_ts", "10 minutes")
    .groupBy(*order_group_keys)
    .agg(
        sum("quantity").alias("total_items_count"),
        round(sum("line_total_amount"), 2).alias("total_order_local"),
        round(sum("total_sales_usd"), 2).alias("total_order_usd"),
        # event_type is a grouping key, so these flags are constant within a group.
        max("is_return").alias("is_return_flag"),
        max("is_cancel").alias("is_cancel_flag"),
        max("processing_lag").alias("max_lag_seconds"),
        max("event_ts").alias("last_update"),
    )
)

In [25]:
# Serialise each aggregated order group for Kafka: key = order_group_id,
# value = JSON of the columns below. The list order is the JSON key order.
order_payload_columns = [
    "order_group_id",
    "order_date",
    "event_type",
    "total_items_count",
    "total_order_local",
    "total_order_usd",
    "store_country",
    "store_city",
    "currency_code",
    "customerkey",
    "payment_method",
    "is_return_flag",
    "is_cancel_flag",
    "max_lag_seconds",
    "last_update",
]

order_stream_output = order_agg_stream.select(
    col("order_group_id").alias("key"),
    to_json(struct(*map(col, order_payload_columns))).alias("value"),
)

In [26]:
# Publish the order-level aggregates to Kafka in update mode. Each trigger
# re-emits every group whose aggregate changed, so the same key can appear
# many times on the topic. Consumers should treat records as upserts by key.
_orders_sink_options = {
    "kafka.bootstrap.servers": "broker:29092",
    "topic": "processed_orders_agg",
    "checkpointLocation": "/home/jovyan/work/checkpoints/dual_orders_v1",
}

query_orders = (
    order_stream_output.writeStream
    .format("kafka")
    .outputMode("update")
    .options(**_orders_sink_options)
    .start()
)

In [None]:
# Report how many streaming queries are running, then block this cell until
# any of them terminates. This surfaces an error from either stream here.
_running_queries = spark.streams.active
print(f"Active streams: {len(_running_queries)}")

spark.streams.awaitAnyTermination()

Active streams: 2


In [None]:
# How many streaming queries are still running on this session.
_active_count = len(spark.streams.active)
print(f"Active streams before stop: {_active_count}")


In [None]:
# Helper for shutting the pipeline down. Running this cell only defines it;
# call `stop_all_streams()` in a new cell when you want to stop every query.
def stop_all_streams(session=None):
    """Stop every active Structured Streaming query on a Spark session.

    Args:
        session: The SparkSession to inspect. Defaults to the notebook-level ``spark``.

    Returns:
        The number of queries that were stopped.
    """
    if session is None:
        session = spark
    # Snapshot the list first, since stopping queries changes the active set.
    active_queries = list(session.streams.active)
    print(f"Active streams before stop: {len(active_queries)}")
    for query in active_queries:
        print(f"Stopping stream: {query.id}...")
        query.stop()
    print("All streams stopped.")
    return len(active_queries)