# Notebook 04 · Streaming Hourly Funnel Metrics by Brand

## Purpose

This notebook implements the **streaming analytics pipeline** for FunnelPulse. It consumes a simulated real-time event stream (built from historical bronze data) and continuously computes **hourly funnel metrics by brand**, writing the results as a streaming gold table.

Conceptually, this is the **online counterpart** to the batch `gold_funnel_hourly_brand` table created in Notebook 01. The same business metrics are computed, but in a streaming fashion with windowing, watermarks, and checkpointing.

---

## Inputs and Outputs

**Streaming source**

- File-based stream from `stream_input/`:
  - Prepared in Notebook 03 by repartitioning a slice of `bronze_events`
  - Contains many small Parquet files that simulate events arriving over time

**Reference table**

- `tables/bronze_events`
  - Used to derive the schema for the streaming source

**Streaming sink**

- `tables/gold_stream_funnel_hourly_brand`
  - Streaming gold table with hourly funnel metrics by brand
  - Written incrementally as new events are processed
  - Backed by a checkpoint directory for recovery

---

## High-Level Workflow

1. Initialize Spark and project paths
2. Define a **streaming source** over `stream_input/` using the bronze schema
3. Apply **streaming cleaning and normalization** (streaming “silver” stage)
4. Apply **streaming aggregation** to compute hourly funnel metrics by brand
5. Configure **watermarking** on `event_time` (declared before the aggregation) so completed windows can be emitted in append mode
6. Start a **streaming query** that writes hourly brand metrics to a Parquet-based streaming gold table
7. Optionally monitor the query and then stop it after enough data has been processed

---

## Streaming Source Design

Instead of connecting to Kafka directly, this notebook uses the log replay setup created in Notebook 03:

- The streaming source is defined with `readStream` over the `stream_input/` directory
- The schema is taken from `bronze_events` to ensure alignment with the batch pipeline
- A `maxFilesPerTrigger` option controls how many new files are processed in each microbatch

With `maxFilesPerTrigger` set to 1, Spark Structured Streaming treats each new Parquet file in `stream_input/` as the next micro-batch of events in the stream.
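
As a rough mental model of how `maxFilesPerTrigger` paces the replay, the sketch below groups a file listing into micro-batches. It is plain Python rather than Spark's own file source logic; the helper `plan_micro_batches` and the file names are hypothetical, and it assumes files are consumed in listing order.

```python
from typing import Iterator, List, Sequence


def plan_micro_batches(
    files: Sequence[str], max_files_per_trigger: int
) -> Iterator[List[str]]:
    """Yield successive groups of at most `max_files_per_trigger` files."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    for start in range(0, len(files), max_files_per_trigger):
        yield list(files[start:start + max_files_per_trigger])


# Hypothetical file names standing in for the Parquet parts in stream_input/
example_files = [f"part-{i:05d}.parquet" for i in range(5)]
batches = list(plan_micro_batches(example_files, max_files_per_trigger=2))
for batch_id, batch in enumerate(batches):
    print(batch_id, batch)
```

A smaller limit gives more, smaller micro-batches, which makes the replay look more like a steady trickle of events at the cost of more scheduling overhead per file.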

---

## Streaming “Silver” · On-the-fly Cleaning and Normalization

The first step on the streaming data is to apply the same kinds of transformations that the **silver** layer uses in batch:

- Filter out events with:
  - Missing or non-positive prices
  - Missing `event_time` or `event_type`
- Normalize text fields:
  - Lowercase brand into `brand_norm`
  - Sanitize `category_code` into `category_code_norm`
- Derive additional fields:
  - `event_date` from `event_time`
  - Data quality flags for missing session, brand, category

These operations are applied to the streaming DataFrame, so every microbatch passes through the same quality and normalization logic as in the offline pipeline.

The output of this step is a **streaming “cleaned events” view** that mirrors the batch silver table but is updated continuously.
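
The cleaning rules listed above can be sketched for a single event as a plain Python function. This is only an illustration of the row-level logic, not the Spark code used in the notebook; `clean_event` and the sample event values are hypothetical.

```python
import re
from datetime import datetime
from typing import Any, Dict, Optional

# Anything that is not a lowercase letter, digit, or dot becomes "_"
_CATEGORY_BAD_CHARS = re.compile(r"[^a-z0-9.]")


def clean_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a cleaned copy of `event`, or None if the row should be dropped."""
    price = event.get("price")
    if price is None or price <= 0:
        return None
    if event.get("event_time") is None or event.get("event_type") is None:
        return None

    brand = event.get("brand")
    category = event.get("category_code")
    cleaned = dict(event)
    cleaned["brand_norm"] = brand.lower() if brand is not None else None
    cleaned["category_code_norm"] = (
        _CATEGORY_BAD_CHARS.sub("_", category.lower()) if category is not None else None
    )
    cleaned["event_date"] = event["event_time"].date()
    # Data quality flags: rows are kept, but missing fields are marked
    cleaned["dq_missing_session"] = event.get("user_session") is None
    cleaned["dq_missing_brand"] = brand is None
    cleaned["dq_missing_category"] = category is None
    return cleaned


# Hypothetical event used only to exercise the function
sample = {
    "event_time": datetime(2019, 10, 15, 0, 12, 30),
    "event_type": "view",
    "brand": "AirNails",
    "category_code": "Appliances.Personal Care",
    "price": 3.0,
    "user_session": None,
}
cleaned_sample = clean_event(sample)
dropped_sample = clean_event({**sample, "price": 0.0})
print(cleaned_sample["brand_norm"], cleaned_sample["category_code_norm"])
print(dropped_sample)
```

Note that a missing brand, category, or session only sets a flag, while a missing price, `event_time`, or `event_type` removes the row entirely.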

---

## Streaming “Gold” · Hourly Funnel Metrics by Brand

The core streaming computation reproduces the hourly funnel logic but on live data:

**Grain**

- One row per 1-hour window per brand:
  - `window_start`, `window_end`
  - `brand`

**Metrics**

- For each window × brand, the job maintains:
  - `views`: count of view events
  - `carts`: count of cart events
  - `purchases`: count of purchase events
  - `revenue`: sum of purchase prices
- Derives funnel rates:
  - `view_to_cart_rate`
  - `cart_to_purchase_rate`
  - `conversion_rate`
- Adds `window_date` for partitioning

This aggregation is defined using a time-based 1-hour window on `event_time` and a grouping key on `brand_norm`, just like in the batch pipeline.
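
To make the grain and the derived rates concrete, here is a small stdlib-only sketch of how an event maps to its hourly window and how the three rates follow from the counts. The helper names (`hour_window`, `safe_rate`, `funnel_rates`) are hypothetical, and the sketch assumes naive timestamps whose windows align to the top of the hour.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple


def hour_window(event_time: datetime) -> Tuple[datetime, datetime]:
    """Return the [start, end) bounds of the 1-hour window containing event_time."""
    start = event_time.replace(minute=0, second=0, microsecond=0)
    return start, start + timedelta(hours=1)


def safe_rate(numerator: int, denominator: int) -> Optional[float]:
    """Divide, returning None (NULL in the table) when the denominator is not positive."""
    return numerator / denominator if denominator > 0 else None


def funnel_rates(views: int, carts: int, purchases: int) -> Dict[str, Optional[float]]:
    return {
        "view_to_cart_rate": safe_rate(carts, views),
        "cart_to_purchase_rate": safe_rate(purchases, carts),
        "conversion_rate": safe_rate(purchases, views),
    }


window_start, window_end = hour_window(datetime(2019, 10, 15, 0, 42, 7))
rates = funnel_rates(views=38, carts=15, purchases=1)
no_cart_rates = funnel_rates(views=1, carts=0, purchases=0)
print(window_start, window_end)
print(rates)
print(no_cart_rates)
```

Returning `None` instead of dividing by zero mirrors the `NULL` rates that appear in the gold tables when a brand has no views or no carts in a window.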

---

## Watermarks and Output Mode

Because the job is aggregating over time windows on a streaming source, it must decide when a window is **complete** and safe to emit in **append** mode.

To achieve this, the notebook configures a **watermark** on `event_time`:

- The watermark expresses the maximum allowed lateness (2 hours in this notebook)
- Spark keeps state for each window until the watermark passes the end of that window
- Once a window is considered complete, its metrics are written to the sink and its state can be dropped

With a watermark in place, the streaming query is allowed to use `outputMode("append")`, which is appropriate for a production-like anomaly and alerting pipeline where completed windows are appended as new rows.
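
The interaction between the watermark and append mode can be illustrated with a simplified simulation. This is a toy model in plain Python, not Spark's state store: it counts events per 1-hour window, advances the watermark after each micro-batch, emits windows the watermark has passed, and drops events whose window is already finalized. The names `run_append_mode`, `window_end_for`, and `at` are hypothetical, and real Spark may emit a finalized window one trigger later than this model does.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple

DELAY = timedelta(hours=2)    # mirrors the "2 hours" watermark delay
WINDOW = timedelta(hours=1)   # mirrors the 1-hour window


def window_end_for(event_time: datetime) -> datetime:
    return event_time.replace(minute=0, second=0, microsecond=0) + WINDOW


def run_append_mode(
    batches: Iterable[List[datetime]],
) -> Tuple[List[Tuple[datetime, int]], int]:
    """Return (emitted (window_end, count) rows, number of dropped late events)."""
    state: Counter = Counter()   # open windows keyed by window end
    emitted: List[Tuple[datetime, int]] = []
    dropped = 0
    max_seen: Optional[datetime] = None
    watermark: Optional[datetime] = None

    for batch in batches:
        for event_time in batch:
            end = window_end_for(event_time)
            if watermark is not None and end <= watermark:
                dropped += 1     # too late: this window was already finalized
                continue
            state[end] += 1
            max_seen = event_time if max_seen is None else max(max_seen, event_time)
        # The watermark only moves forward, after the batch has been processed
        if max_seen is not None:
            candidate = max_seen - DELAY
            watermark = candidate if watermark is None else max(watermark, candidate)
        # Finalize every window that ends at or before the watermark
        if watermark is not None:
            for end in sorted(e for e in state if e <= watermark):
                emitted.append((end, state.pop(end)))
    return emitted, dropped


def at(hour: int, minute: int = 0) -> datetime:
    return datetime(2019, 10, 15, hour, minute)


emitted, dropped = run_append_mode([
    [at(0, 10), at(0, 50)],   # window ending 01:00 opens with two events
    [at(3, 5)],               # watermark moves to 01:05, so that window is emitted
    [at(0, 30), at(3, 40)],   # 00:30 belongs to a finalized window and is dropped
])
print(emitted, dropped)
```

One consequence worth keeping in mind for a file replay: if the input files are not ordered by event time, a file containing recent events can advance the watermark far enough that older events in later files are discarded as late.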

---

## Streaming Sink and Checkpointing

The notebook configures a streaming sink that:

- Writes results to `tables/gold_stream_funnel_hourly_brand` in Parquet format
- Uses a dedicated `checkpointLocation` under `checkpoints/` for:
  - Tracking offsets
  - Maintaining aggregation state
  - Enabling fault-tolerant recovery and exactly-once style semantics

The query is started and runs until:

- It has processed enough microbatches to demonstrate the pipeline, or
- It is explicitly stopped in the notebook

Even if the query is stopped early in this environment, the design and configuration mirror how a production streaming job would be deployed.

---

## Role of This Notebook in the Overall System

This notebook is the core of the **online path** in FunnelPulse:

- Notebooks 01 and 02:
  - Build the **batch** lakehouse and gold tables for historical analysis
- Notebook 03:
  - Prepares a file-based stream to emulate real-time traffic
- Notebook 04 (this notebook):
  - Implements Spark Structured Streaming logic that:
    - Ingests the event stream
    - Applies silver-like cleaning
    - Computes hourly funnel metrics by brand in real time
    - Writes a streaming gold table with checkpointing and watermarks
- Notebook 05:
  - Builds anomaly detection logic over the hourly funnel metrics

Together, the batch and streaming pipelines show that FunnelPulse can support both **historical analysis** and **near real-time monitoring** of the e-commerce funnel at scale.

In [1]:
# CELL 1: Spark init + paths for streaming job

!free -h | grep "Mem:"
!lscpu | grep "CPU(s):"

import os
import pyspark

conf = pyspark.SparkConf()
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4040')
conf.set('spark.sql.repl.eagerEval.enabled', False)
conf.set('spark.driver.memory','6g')

sc = pyspark.SparkContext(conf=conf, master='local[*]')
spark = pyspark.sql.SparkSession.builder.appName("FunnelPulse Streaming Hourly Brand").getOrCreate()

print(spark)
print(f"Spark UI: https://csgy-6513-fall.rcnyu.org{conf.get('spark.ui.proxyBase')}")

home = os.path.expanduser("~")
project_root = os.path.join(home, "funnelpulse")
tables_dir = os.path.join(project_root, "tables")

bronze_path          = os.path.join(tables_dir, "bronze_events")
silver_path          = os.path.join(tables_dir, "silver_events")  # printed for reference; not read in this notebook
stream_input_path    = os.path.join(project_root, "stream_input")
stream_checkpoint    = os.path.join(project_root, "checkpoints", "stream_hourly_brand")
stream_gold_path     = os.path.join(tables_dir, "gold_stream_funnel_hourly_brand")

print("Bronze path          :", bronze_path)
print("Silver path          :", silver_path)
print("Stream input path    :", stream_input_path)
print("Stream checkpoint dir:", stream_checkpoint)
print("Stream gold path     :", stream_gold_path)

Mem:            15Gi       6.4Gi       6.6Gi        40Mi       3.0Gi       9.2Gi
CPU(s):                                  2
NUMA node0 CPU(s):                       0,1


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/26 12:07:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/26 12:07:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/11/26 12:07:11 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/11/26 12:07:11 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


<pyspark.sql.session.SparkSession object at 0x788428fb8380>
Spark UI: https://csgy-6513-fall.rcnyu.org/user/ss18851/proxy/4040
Bronze path          : /home/jovyan/funnelpulse/tables/bronze_events
Silver path          : /home/jovyan/funnelpulse/tables/silver_events
Stream input path    : /home/jovyan/funnelpulse/stream_input
Stream checkpoint dir: /home/jovyan/funnelpulse/checkpoints/stream_hourly_brand
Stream gold path     : /home/jovyan/funnelpulse/tables/gold_stream_funnel_hourly_brand


In [2]:
# CELL 2: Define streaming source from stream_input (Parquet-based stream)

from pyspark.sql.functions import col

# Get schema from bronze (static read)
bronze_sample = spark.read.parquet(bronze_path)
bronze_schema = bronze_sample.schema

print("Bronze schema:")
bronze_sample.printSchema()

# Build file-based streaming source
stream_raw = (
    spark.readStream
    .schema(bronze_schema)              # required for file-based streams
    .option("maxFilesPerTrigger", 1)    # process 1 new file per micro-batch
    .parquet(stream_input_path)
)

print("Is stream_raw streaming?", stream_raw.isStreaming)
stream_raw.printSchema()

                                                                                

Bronze schema:
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- event_date: date (nullable = true)

Is stream_raw streaming? True
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- event_date: date (nullable = true)



In [3]:
# CELL 3: Streaming cleaning (Silver-like) on stream_raw

from pyspark.sql.functions import lower, regexp_replace, when, to_date

stream_clean = stream_raw

# Drop bad rows
stream_clean = stream_clean.filter(stream_clean.price.isNotNull() & (stream_clean.price > 0))
stream_clean = stream_clean.filter(stream_clean.event_time.isNotNull() & stream_clean.event_type.isNotNull())

# Normalize brand/category
stream_clean = (
    stream_clean
    .withColumn("brand_norm", lower(col("brand")))
    .withColumn("category_code_norm", lower(col("category_code")))
    .withColumn(
        "category_code_norm",
        regexp_replace(col("category_code_norm"), "[^a-z0-9\\.]", "_")
    )
    .withColumn("event_date", to_date(col("event_time")))
    .withColumn("dq_missing_session", stream_clean.user_session.isNull())
    .withColumn("dq_missing_brand", stream_clean.brand.isNull())
    .withColumn("dq_missing_category", stream_clean.category_code.isNull())
)

stream_clean.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- event_date: date (nullable = true)
 |-- brand_norm: string (nullable = true)
 |-- category_code_norm: string (nullable = true)
 |-- dq_missing_session: boolean (nullable = false)
 |-- dq_missing_brand: boolean (nullable = false)
 |-- dq_missing_category: boolean (nullable = false)



In [6]:
# CELL 4 (updated): Streaming aggregation - hourly funnel metrics by brand with watermark

from pyspark.sql.functions import window, sum as _sum, to_date, col, when

stream_windowed = (
    stream_clean
    # watermark says: we expect late events up to 2 hours behind the latest seen event_time
    .withWatermark("event_time", "2 hours")
    .groupBy(
        window(col("event_time"), "1 hour").alias("w"),
        col("brand_norm").alias("brand")
    )
    .agg(
        _sum((col("event_type") == "view").cast("int")).alias("views"),
        _sum((col("event_type") == "cart").cast("int")).alias("carts"),
        _sum((col("event_type") == "purchase").cast("int")).alias("purchases"),
        _sum(
            when(col("event_type") == "purchase", col("price")).otherwise(0.0)
        ).alias("revenue")
    )
)

stream_gold = (
    stream_windowed
    .withColumn("window_start", col("w.start"))
    .withColumn("window_end", col("w.end"))
    .drop("w")
)

stream_gold = (
    stream_gold
    .withColumn(
        "view_to_cart_rate",
        when(col("views") > 0, col("carts") / col("views"))
    )
    .withColumn(
        "cart_to_purchase_rate",
        when(col("carts") > 0, col("purchases") / col("carts"))
    )
    .withColumn(
        "conversion_rate",
        when(col("views") > 0, col("purchases") / col("views"))
    )
    .withColumn("window_date", to_date(col("window_start")))
)

stream_gold.printSchema()

root
 |-- brand: string (nullable = true)
 |-- views: long (nullable = true)
 |-- carts: long (nullable = true)
 |-- purchases: long (nullable = true)
 |-- revenue: double (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- view_to_cart_rate: double (nullable = true)
 |-- cart_to_purchase_rate: double (nullable = true)
 |-- conversion_rate: double (nullable = true)
 |-- window_date: date (nullable = true)



In [7]:
# CELL 5: Start streaming query (write to Parquet sink)

# Parquet sink
stream_query = (
    stream_gold
    .writeStream
    .outputMode("append")                # new windows only
    .format("parquet")
    .option("checkpointLocation", stream_checkpoint)
    .option("path", stream_gold_path)
    .trigger(processingTime="10 seconds")  # micro-batch every 10 seconds
    .start()
)

print("Streaming query started. ID:", stream_query.id)
print("Use 'stream_query.status' to check status.")

25/11/26 12:16:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Streaming query started. ID: 9e3f0bd1-1f22-40f1-bdd4-ee889e46eab0
Use 'stream_query.status' to check status.



In [8]:
# CELL 6: Monitor status a few times

import time

for i in range(5):
    print("Status check", i, ":", stream_query.status)
    time.sleep(5)

Status check 0 : {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}




Status check 1 : {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}




Status check 2 : {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}




Status check 3 : {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}




Status check 4 : {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}


25/11/26 12:17:11 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 52059 milliseconds
25/11/26 12:17:50 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 38364 milliseconds
25/11/26 12:18:12 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 22445 milliseconds
25/11/26 12:18:32 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 19545 milliseconds
25/11/26 12:18:51 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 18917 milliseconds
25/11/26 12:19:10 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 19340 milliseconds
25/11/26 12:19:27 WARN ProcessingTimeExecutor: Current batch is falling behi

In [9]:
# CELL 7: Stop streaming query

stream_query.stop()
print("Stopped:", stream_query.id)

Stopped: 9e3f0bd1-1f22-40f1-bdd4-ee889e46eab0


25/11/26 12:24:21 ERROR FileFormatWriter: Aborting job 17fb3c0d-c0fa-43f6-9d5e-ce14bac443e3.
java.lang.InterruptedException
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1048)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:243)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:255)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:104)
	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:374)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:998)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$1(FileFormatWriter.scala:240)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:270)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(

In [10]:
# CELL 8: Read back the streaming gold table and compare it with the batch gold table
# (can be run in this notebook or in a new one)

stream_gold_df = spark.read.parquet(stream_gold_path)

print("Rows in streaming gold funnel table:", stream_gold_df.count())
stream_gold_df.orderBy("window_start", "brand").show(20, truncate=False)

# Compare with your batch gold table if you like:
batch_gold_df = spark.read.parquet(os.path.join(tables_dir, "gold_funnel_hourly_brand"))

print("Batch gold rows:", batch_gold_df.count())
batch_gold_df.orderBy("window_start", "brand").show(20, truncate=False)

                                                                                

Rows in streaming gold funnel table: 17027


                                                                                

+----------+-----+-----+---------+-------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|brand     |views|carts|purchases|revenue|window_start       |window_end         |view_to_cart_rate  |cart_to_purchase_rate|conversion_rate    |window_date|
+----------+-----+-----+---------+-------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|NULL      |38   |15   |1        |3.0    |2019-10-15 00:00:00|2019-10-15 01:00:00|0.39473684210526316|0.06666666666666667  |0.02631578947368421|2019-10-15 |
|airnails  |1    |1    |0        |0.0    |2019-10-15 00:00:00|2019-10-15 01:00:00|1.0                |0.0                  |0.0                |2019-10-15 |
|art-visage|1    |0    |0        |0.0    |2019-10-15 00:00:00|2019-10-15 01:00:00|0.0                |NULL                 |0.0                |2019-10-15 |
|artex     |1    |0    |0        |0.0    |2019-10-15 00:00

                                                                                

Batch gold rows: 183145




+-----------+-----+-----+---------+-----------------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|brand      |views|carts|purchases|revenue          |window_start       |window_end         |view_to_cart_rate  |cart_to_purchase_rate|conversion_rate    |window_date|
+-----------+-----+-----+---------+-----------------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|NULL       |202  |185  |59       |284.0199999999999|2019-09-30 20:00:00|2019-09-30 21:00:00|0.9158415841584159 |0.31891891891891894  |0.29207920792079206|2019-09-30 |
|airnails   |0    |2    |2        |2.38             |2019-09-30 20:00:00|2019-09-30 21:00:00|NULL               |1.0                  |NULL               |2019-09-30 |
|ardell     |1    |0    |0        |0.0              |2019-09-30 20:00:00|2019-09-30 21:00:00|0.0                |NULL                 |0.0                |2019-

                                                                                

In [11]:
from pyspark.sql.functions import col, sum as _sum

# Read both tables
batch_gold = spark.read.parquet(os.path.join(tables_dir, "gold_funnel_hourly_brand"))
stream_gold_df = spark.read.parquet(stream_gold_path)

# Filter batch to the streaming date range (2019-10-15 to 2019-10-31)
batch_subset = batch_gold.filter(
    (col("window_date") >= "2019-10-15") & (col("window_date") <= "2019-10-31")
)

print("Batch subset rows:", batch_subset.count())
print("Stream rows      :", stream_gold_df.count())

# Compare aggregate metrics over that range (totals match only if the stream has processed
# all input files and the watermark dropped no late events)
batch_summary = batch_subset.agg(
    _sum("views").alias("views_batch"),
    _sum("carts").alias("carts_batch"),
    _sum("purchases").alias("purchases_batch"),
    _sum("revenue").alias("revenue_batch")
)
stream_summary = stream_gold_df.agg(
    _sum("views").alias("views_stream"),
    _sum("carts").alias("carts_stream"),
    _sum("purchases").alias("purchases_stream"),
    _sum("revenue").alias("revenue_stream")
)

batch_summary.show()
stream_summary.show()

Batch subset rows: 49736


                                                                                

Stream rows      : 17027
+-----------+-----------+---------------+----------------+
|views_batch|carts_batch|purchases_batch|   revenue_batch|
+-----------+-----------+---------------+----------------+
|     994940|     586150|         135298|659827.479999999|
+-----------+-----------+---------------+----------------+





+------------+------------+----------------+------------------+
|views_stream|carts_stream|purchases_stream|    revenue_stream|
+------------+------------+----------------+------------------+
|       40104|       23636|            5246|25270.739999999976|
+------------+------------+----------------+------------------+
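
In this run the two summaries are far apart (17027 stream rows against 49736 batch rows). A quick way to quantify the gap is to divide each streaming total by its batch counterpart; the snippet below does that with the totals copied from the two tables above. The dictionary names are hypothetical, and the ratios by themselves do not say why the gap exists.

```python
# Totals copied from the batch and stream summary tables above
batch_totals = {
    "views": 994940,
    "carts": 586150,
    "purchases": 135298,
    "revenue": 659827.479999999,
}
stream_totals = {
    "views": 40104,
    "carts": 23636,
    "purchases": 5246,
    "revenue": 25270.739999999976,
}

# Share of each batch total that made it into the streaming gold table
coverage = {metric: stream_totals[metric] / batch_totals[metric] for metric in batch_totals}
for metric, ratio in coverage.items():
    print(f"{metric:<10} {ratio:.2%}")
```

Possible contributors include the query being stopped before all input files were processed, `stream_input/` holding only a slice of the bronze data, and late events being discarded once the watermark has passed their window. Which of these applies here would need to be checked against the query progress and the contents of `stream_input/`.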



                                                                                

In [12]:
test_hour = "2019-10-15 00:00:00"

batch_hour_brand = batch_subset.filter(col("window_start") == test_hour)
stream_hour_brand = stream_gold_df.filter(col("window_start") == test_hour)

print("BATCH:")
batch_hour_brand.orderBy("brand").show(20, truncate=False)

print("STREAM:")
stream_hour_brand.orderBy("brand").show(20, truncate=False)

BATCH:
+-----------+-----+-----+---------+------------------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|brand      |views|carts|purchases|revenue           |window_start       |window_end         |view_to_cart_rate  |cart_to_purchase_rate|conversion_rate    |window_date|
+-----------+-----+-----+---------+------------------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|NULL       |562  |414  |22       |119.00999999999999|2019-10-15 00:00:00|2019-10-15 01:00:00|0.7366548042704626 |0.05314009661835749  |0.03914590747330961|2019-10-15 |
|airnails   |4    |15   |0        |0.0               |2019-10-15 00:00:00|2019-10-15 01:00:00|3.75               |0.0                  |0.0                |2019-10-15 |
|ardell     |2    |0    |0        |0.0               |2019-10-15 00:00:00|2019-10-15 01:00:00|0.0                |NULL                 |0.0         



+----------+-----+-----+---------+-------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|brand     |views|carts|purchases|revenue|window_start       |window_end         |view_to_cart_rate  |cart_to_purchase_rate|conversion_rate    |window_date|
+----------+-----+-----+---------+-------+-------------------+-------------------+-------------------+---------------------+-------------------+-----------+
|NULL      |38   |15   |1        |3.0    |2019-10-15 00:00:00|2019-10-15 01:00:00|0.39473684210526316|0.06666666666666667  |0.02631578947368421|2019-10-15 |
|airnails  |1    |1    |0        |0.0    |2019-10-15 00:00:00|2019-10-15 01:00:00|1.0                |0.0                  |0.0                |2019-10-15 |
|art-visage|1    |0    |0        |0.0    |2019-10-15 00:00:00|2019-10-15 01:00:00|0.0                |NULL                 |0.0                |2019-10-15 |
|artex     |1    |0    |0        |0.0    |2019-10-15 00:00

                                                                                