- **Name:** 20.5_streaming_aggregations
- **Author:** Shamas Imran
- **Description:** Performing aggregations in structured streaming
- **Date:** 19-Aug-2025
<!--
REVISION HISTORY
Version          Date        Author           Description
01           19-Aug-2025   Shamas Imran       Applied groupBy aggregations on streams  
                                              Used tumbling and sliding windows  
                                              Demonstrated append vs update modes  
-->

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql.functions import col, window, session_window

In [0]:
# ------------------------------------------------------------
# 1) Spark Session
# ------------------------------------------------------------
# Obtain the session for this notebook via getOrCreate(), registered
# under the application name used for the aggregation examples.
spark = SparkSession.builder.appName("Aggregations_and_WindowedAggregations").getOrCreate()

In [0]:
# ------------------------------------------------------------
# 2) Folder Paths
# ------------------------------------------------------------
# Directory the streaming CSV reader picks its input files from.
inputPath = "/Volumes/datapurcatalog/default/datapurvolume/spark-streaming/csv_input"

# Checkpoint root; every query below appends its own sub-folder to it.
checkpointPath = "/Volumes/datapurcatalog/default/datapurvolume/spark-streaming/checkpoints/aggregations"

# Output directory (not referenced by the console-sink queries shown in this notebook).
outputPath = "/Volumes/datapurcatalog/default/datapurvolume/spark-streaming/csv_output_aggregations"

In [0]:
# ------------------------------------------------------------
# 3) Define Schema
# ------------------------------------------------------------
# Four nullable columns; event_time is the timestamp column used by
# the watermark and window examples in this notebook.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("score", IntegerType(), True)
    .add("event_time", TimestampType(), True)
)

# ------------------------------------------------------------
# 4) Create Streaming DataFrame
# ------------------------------------------------------------
# Streaming CSV source over inputPath: explicit schema, and the first
# line of each file is treated as a header.
df_stream = spark.readStream.schema(schema).option("header", "true").csv(inputPath)

In [0]:
# ------------------------------------------------------------
# 5) Aggregation WITHOUT Watermark
# ------------------------------------------------------------
# Running row count per name. With no watermark defined, the state kept
# for this aggregation can grow indefinitely.
agg_no_watermark = df_stream.groupBy("name").count()

# Print the counts to the console in update mode, with a dedicated
# checkpoint sub-folder for this query.
agg_no_watermark_query = (
    agg_no_watermark.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", checkpointPath + "/no_watermark")
    .start()
)

# ------------------------------------------------------------
# 6) Aggregation WITH Watermark
# ------------------------------------------------------------
# Declare a 10-minute watermark on event_time. df_watermarked is also the
# input of the tumbling/sliding/session window examples in this notebook.
df_watermarked = df_stream.withWatermark("event_time", "10 minutes")

# NOTE(review): this aggregation groups by "name" only -- neither the
# event_time column nor a window over it is part of the grouping key.
# Per the Structured Streaming guide, a watermark can only clean up
# aggregation state when the grouping includes the event-time column or a
# window on it, so this query presumably still keeps unbounded per-name
# state and does not drop late rows -- confirm against the guide.
agg_with_watermark = df_watermarked.groupBy("name").count()

# Console sink in update mode, with its own checkpoint sub-folder.
agg_with_watermark_query = (
    agg_with_watermark.writeStream
                      .format("console")
                      .option("checkpointLocation", checkpointPath + "/with_watermark")
                      .outputMode("update")
                      .start()
)

In [0]:
# ------------------------------------------------------------
# 7) Tumbling Window Aggregation
# ------------------------------------------------------------
# Fixed-size, non-overlapping 5-minute windows on event_time, counted
# per name.
#
# There is deliberately no orderBy here: Spark only permits sorting a
# streaming aggregation in Complete output mode, so sorting while writing
# in update mode makes start() fail with an AnalysisException. Sort the
# results downstream (or switch the query to complete mode) if ordered
# output is required.
tumbling_window_agg = (
    df_watermarked
    .groupBy(window(col("event_time"), "5 minutes"), col("name"))
    .count()
)

# Update mode: each trigger prints only the (window, name) rows whose
# count changed. The query uses its own checkpoint sub-folder.
tumbling_window_query = (
    tumbling_window_agg.writeStream
    .format("console")
    .option("checkpointLocation", checkpointPath + "/tumbling_window")
    .outputMode("update")
    .start()
)

In [0]:
# ------------------------------------------------------------
# 8) Sliding Window Aggregation
# ------------------------------------------------------------
# 5-minute windows on event_time that start every 1 minute, counted per
# name; each event therefore falls into several overlapping windows.
#
# There is deliberately no orderBy here: Spark only permits sorting a
# streaming aggregation in Complete output mode, so sorting while writing
# in update mode makes start() fail with an AnalysisException. Sort the
# results downstream (or switch the query to complete mode) if ordered
# output is required.
sliding_window_agg = (
    df_watermarked
    .groupBy(window(col("event_time"), "5 minutes", "1 minute"), col("name"))
    .count()
)

# Update mode: each trigger prints only the (window, name) rows whose
# count changed. The query uses its own checkpoint sub-folder.
sliding_window_query = (
    sliding_window_agg.writeStream
    .format("console")
    .option("checkpointLocation", checkpointPath + "/sliding_window")
    .outputMode("update")
    .start()
)


In [0]:
# ------------------------------------------------------------
# 9) Session Window Aggregation
# ------------------------------------------------------------
# Dynamic windows per name: a session keeps extending while events arrive
# less than 5 minutes apart and closes after a 5-minute gap of inactivity.
#
# Two streaming restrictions shape this query:
#   * Sorting a streaming aggregation is only allowed in Complete output
#     mode, so no orderBy is applied here.
#   * session_window aggregations do not support update output mode in a
#     streaming query, so the result is written in append mode.
session_window_agg = (
    df_watermarked
    .groupBy(session_window(col("event_time"), "5 minutes"), col("name"))
    .count()
)

# Append mode: a session row is emitted once, after the watermark has
# moved past the end of that session, so output lags the input by the
# gap plus the watermark delay.
session_window_query = (
    session_window_agg.writeStream
    .format("console")
    .option("checkpointLocation", checkpointPath + "/session_window")
    .outputMode("append")
    .start()
)

# ------------------------------------------------------------
# 10) Wait for All Streams
# ------------------------------------------------------------
# Blocks this cell while the streaming queries started above are running.
# NOTE(review): awaitAnyTermination() returns (or raises) as soon as any
# one active query terminates, not after all of them have finished --
# confirm that this matches the intent of the section title.
spark.streams.awaitAnyTermination()

In [0]:
id,name,score,event_time
1,Ahsan,85,2025-08-18 10:00:00
2,Sana,92,2025-08-18 10:01:00
3,Ali,78,2025-08-18 10:02:00
4,Ahsan,88,2025-08-18 10:03:00
5,Sana,95,2025-08-18 10:04:00
6,Ali,82,2025-08-18 10:06:00
7,Ahsan,90,2025-08-18 10:07:00
8,Sana,87,2025-08-18 10:09:00
9,Ali,91,2025-08-18 10:10:00
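10,Ahsan,80,2025-08-18 09:50:00   # Late: 20 min behind the max event time so far (10:10), so already beyond the 10-min watermark
11,Sana,85,2025-08-18 09:40:00    # Late: 30 min behind the max event time (10:10); rows this late are dropped by the windowed aggregations when they arrive in a later micro-batch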
12,Ali,88,2025-08-18 10:12:00
13,Ahsan,92,2025-08-18 10:14:00
14,Sana,90,2025-08-18 10:15:00
15,Ali,83,2025-08-18 10:18:00