# 💡 Lab Introduction: Analyzing Retail Data with Structured Streaming in PySpark

Welcome to this hands-on lab where we'll explore **Structured Streaming**, Spark's high-level API for real-time data processing. This lab bridges the gap between **batch processing** and **stream processing** by reusing the same DataFrame operations you've already seen, now applied to **streaming data**.

## 🎯 Objective

We will simulate a real-time retail scenario where sales data is generated daily by multiple retail stores. By the end of this lab, you will:

- Understand how to **read static data** using PySpark and define a schema.
- Use the **`window` function** to group transactions into daily time windows.
- Transition seamlessly from a **batch job to a streaming job** with minimal code changes.
- Learn how Spark incrementally reads new data using `readStream` with the `maxFilesPerTrigger` option.
- Output live streaming aggregations to:
  - An **in-memory table** (for SQL queries)
  - The **console** (for inspection during development)


Each CSV file represents one day of transactions, making it ideal for **time-series analysis** and **stream simulation**.

## 🔁 Workflow Overview

1. **Static Analysis**: Read all CSV files as a batch, register them as a temp view, and explore them with DataFrame operations and time windows.
2. **Schema Extraction**: Capture the inferred schema for later use in streaming mode.
3. **Streaming Setup**: Use `readStream` to incrementally process one file at a time.
4. **Aggregation**: Compute total customer spending per day using time windows.
5. **Action & Output**: Write the result to an in-memory table or console for live monitoring.

---

> ✅ This lab demonstrates how you can reuse familiar batch-based DataFrame logic in a streaming context, making real-time analytics more accessible and production-ready.



In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("StructuredStreamingLab") \
    .getOrCreate()

In [None]:
spark

In [None]:
staticDataFrame = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Users/satkarkarki/spark_the_definitive_guide/data/retail-data/by-day/*.csv")



In [None]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

## Computing the total cost per customer per day with the DataFrame API

In [None]:
from pyspark.sql.functions import window, column, desc, col

staticDataFrame\
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")\
    .show(5)
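
To make the grouping concrete, here is a plain-Python sketch of what a one-day tumbling window does: each timestamp is mapped to the day-long interval that contains it, and `total_cost` is summed per (customer, window). The sample rows and the helper `daily_window` are made up for illustration. The sketch aligns windows to midnight for simplicity, whereas Spark aligns them relative to the Unix epoch and does the work in a distributed fashion.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Hypothetical sample rows: (CustomerId, UnitPrice, Quantity, InvoiceDate)
rows = [
    (17850, 2.55, 6, datetime(2010, 12, 1, 8, 26)),
    (17850, 3.39, 6, datetime(2010, 12, 1, 9, 2)),
    (13047, 1.85, 10, datetime(2010, 12, 1, 8, 34)),
    (17850, 4.25, 2, datetime(2010, 12, 2, 10, 15)),
]

def daily_window(ts):
    """Return the (start, end) of the one-day window that contains ts."""
    start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, start + timedelta(days=1)

# Sum total_cost per (customer, window), like groupBy(...).sum("total_cost")
totals = defaultdict(float)
for customer_id, unit_price, quantity, invoice_date in rows:
    key = (customer_id, daily_window(invoice_date))
    totals[key] += unit_price * quantity

for (customer_id, (start, end)), total in sorted(totals.items()):
    print(customer_id, start.date(), round(total, 2))
```

Two purchases by the same customer on the same day land in one group, while a purchase on the next day starts a new one.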

**The `spark.sql.shuffle.partitions` setting controls the number of partitions that Spark creates after a shuffle.**

- The default value is 200.
- On a local machine with only a few cores, a small value such as 5 avoids the overhead of scheduling many tiny tasks.


In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

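The next cell starts a **streaming read**.

`readStream` is used in place of `read` so that Spark **processes the data incrementally**, like a live feed. Setting `maxFilesPerTrigger` to 1 makes each trigger pick up a single file, which simulates data arriving one day at a time. The schema is passed explicitly because file-based streaming sources do not infer one by default.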

In [None]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("/Users/satkarkarki/spark_the_definitive_guide/data/retail-data/by-day/*.csv")
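
As a rough mental model of `maxFilesPerTrigger`, the plain-Python sketch below splits a list of files into micro-batches of at most N files each. The function `micro_batches` and the file names are hypothetical. Spark's file source also tracks which files it has already processed and picks up files that arrive later, which this sketch ignores.

```python
def micro_batches(file_names, max_files_per_trigger=1):
    """Yield lists of at most max_files_per_trigger files, in order."""
    for i in range(0, len(file_names), max_files_per_trigger):
        yield file_names[i:i + max_files_per_trigger]

# Hypothetical file names standing in for the by-day CSV files
files = ["2010-12-01.csv", "2010-12-02.csv", "2010-12-03.csv"]

batches = list(micro_batches(files, max_files_per_trigger=1))
for trigger_number, batch in enumerate(batches):
    print(f"trigger {trigger_number}: {batch}")
```

With a limit of 1, every trigger sees exactly one day of data; a larger limit would let each trigger consume several days at once.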

In [None]:
# performing a check to see whether our DataFrame is streaming

streamingDataFrame.isStreaming

## Setting up the business logic: this is still a lazy operation, so we need to call a streaming action later

In [None]:
# Note: despite the "PerHour" in the name, the window below is one day wide
purchaseByCustomerPerHour = streamingDataFrame\
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) AS total_cost",
        "InvoiceDate")\
    .groupBy(
        col("CustomerID"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")

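The next cell is the **streaming action**. So far we have defined `streamingDataFrame` and applied transformations to it.

Because transformations are lazy, nothing runs until we start the stream. Here the results go to an in-memory table named `customer_purchases`, and the `complete` output mode rewrites the full aggregated result on every trigger.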


In [None]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()
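
The `complete` output mode can be pictured with a small plain-Python simulation: a running aggregate is kept between triggers, and after each trigger the whole result table is written out again. The batches and variable names below are invented for illustration; this is a conceptual sketch, not how Spark manages its state internally.

```python
from collections import defaultdict

# Hypothetical micro-batches: each row is (CustomerId, day, total_cost)
incoming_batches = [
    [(17850, "2010-12-01", 15.30), (13047, "2010-12-01", 18.50)],
    [(17850, "2010-12-01", 20.34), (17850, "2010-12-02", 8.50)],
]

state = defaultdict(float)  # running aggregate kept between triggers
result_tables = []          # snapshot of the sink after each trigger

for batch in incoming_batches:
    for customer_id, day, total_cost in batch:
        state[(customer_id, day)] += total_cost
    # Complete mode: the entire result table is rewritten, not just changed rows
    result_tables.append(dict(state))

for trigger_number, table in enumerate(result_tables):
    print(f"after trigger {trigger_number}: {len(table)} rows")
```

This is why querying the in-memory table at different moments can return different totals: each trigger replaces the table with the latest full aggregate.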

## Once the stream has started, we can query the in-memory table to preview what the results would look like if they were written to a production sink:

In [None]:
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
""")\
    .show(5)

In [None]:
# This is still lazy — Spark plans, but doesn't act yet

purchaseByCustomerPerHour = streamingDataFrame\
    .selectExpr("CustomerId", "(UnitPrice * Quantity) AS total_cost", "InvoiceDate")\
    .groupBy(col("CustomerID"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")

In [None]:
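# This is the action: only now does Spark start executing.
# Write to the console this time. The query name differs from the in-memory
# query above because two active queries cannot share a name.

purchaseByCustomerPerHour.writeStream\
    .format("console")\
    .queryName("customer_purchases_2")\
    .outputMode("complete")\
    .start()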

## Machine Learning and Advanced Analytics

- Spark ships with a built-in machine learning library called MLlib.
- MLlib supports preprocessing, data munging, model training, and making predictions at scale.

For the purpose of this lab, we will perform some basic clustering on our data using k-means.

In [None]:
# view schema for the DF:

staticDataFrame.printSchema()

In [None]:
# MLlib algorithms require numerical input, so we use a few DataFrame
# transformations to turn the invoice date into a usable feature:


from pyspark.sql.functions import date_format, col

preppedDataFrame = staticDataFrame\
    .na.fill(0)\
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
    .coalesce(5)
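
For reference, the `"EEEE"` pattern produces the full day name. The plain-Python sketch below shows the kind of value that ends up in `day_of_week`; the helper `day_of_week` and the fixed list of English names are assumptions made to keep the example independent of locale settings.

```python
from datetime import datetime

# Monday-first list, matching datetime.weekday() (0 = Monday)
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]

def day_of_week(ts):
    """Return the full English day name for a timestamp."""
    return DAY_NAMES[ts.weekday()]

print(day_of_week(datetime(2011, 7, 1, 9, 30)))  # prints "Friday"
```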

In [None]:
# manually creating train-test split:

trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

In [None]:
testDataFrame.count()


In [None]:
trainDataFrame.count()

In [None]:
# The transformations below are covered in depth later, so for now just run them.
# StringIndexer converts each day of the week into a corresponding numerical index.

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
    .setInputCol("day_of_week")\
    .setOutputCol("day_of_week_index")
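
As a rough picture of what fitting a `StringIndexer` does, the plain-Python sketch below assigns index 0 to the most frequent label, 1 to the next, and so on, breaking ties alphabetically. This mirrors my understanding of the default `frequencyDesc` ordering; the function `fit_string_indexer` and the sample column are hypothetical.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map labels to indices: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

# Hypothetical training column
days = ["Monday", "Friday", "Monday", "Sunday", "Friday", "Monday"]
label_to_index = fit_string_indexer(days)
print(label_to_index)
```

Note that the indices depend on the data the indexer was fitted on, so they carry no inherent calendar order.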

In [None]:
# One-hot encode the index so that each day of the week gets its own Boolean flag:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
    .setInputCol("day_of_week_index")\
    .setOutputCol("day_of_week_encoded")
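
The sketch below illustrates one-hot encoding in plain Python, including the "drop last" behavior that, as far as I know, `OneHotEncoder` applies by default: with 7 categories the vector has 6 slots, and the last category is encoded as all zeros. The function `one_hot` is hypothetical and returns a dense list, whereas Spark stores the result as a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense 0/1 list for a category index."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

print(one_hot(0, 7))  # first category: a 1.0 in the first slot
print(one_hot(6, 7))  # last category: all zeros when drop_last is True
```

Dropping the last slot avoids a redundant column, since the final category is implied when all other flags are zero.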

In [None]:
# The steps above produce a set of columns that we now need to assemble into a single vector:

from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
    .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
    .setOutputCol("features")
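
Conceptually, assembling a vector just concatenates the input columns in order, flattening any vector-valued column. The plain-Python sketch below shows this for one hypothetical row; `assemble` is an illustrative helper, not part of any Spark API.

```python
def assemble(*columns):
    """Flatten scalars and lists into one feature list, preserving order."""
    features = []
    for value in columns:
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Hypothetical row: UnitPrice, Quantity, and a 6-slot one-hot day-of-week vector
row_features = assemble(2.55, 6, [0.0, 1.0, 0.0, 0.0, 0.0, 0.0])
print(row_features)
```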
    

In [None]:
# Now we have three key features: the price, the quantity, and the day of week.
# Next, we'll set this up as a pipeline so that any future data we need to transform can go through the same process:

from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
    .setStages([indexer, encoder, vectorAssembler])

## Preparing for training is a two-step process:

- We first need to fit our transformers to the dataset.
- After that, we are ready to take the fitted pipeline and use it to transform all of our data in a consistent and repeatable way.
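
The fit-then-transform pattern can be sketched with two toy classes in plain Python: `fit()` learns something from the training data (here, a label-to-index mapping) and returns a fitted object, and `transform()` applies what was learned to any dataset. The class names are hypothetical, and the mapping is alphabetical for simplicity rather than frequency-based.

```python
class LabelIndexer:
    """Toy estimator: fit() learns a label-to-index mapping from data."""

    def fit(self, labels):
        mapping = {label: i for i, label in enumerate(sorted(set(labels)))}
        return LabelIndexerModel(mapping)


class LabelIndexerModel:
    """Toy fitted transformer: transform() applies the learned mapping."""

    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, labels):
        return [self.mapping[label] for label in labels]


train_days = ["Monday", "Friday", "Monday"]
test_days = ["Friday", "Friday"]

model = LabelIndexer().fit(train_days)  # step 1: learn from the training data only
train_indexed = model.transform(train_days)  # step 2: apply the same mapping...
test_indexed = model.transform(test_days)    # ...to every dataset
print(train_indexed, test_indexed)
```

Because the mapping is learned once and reused, training and test data are encoded identically.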

In [None]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [None]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [None]:
# Call .cache() to keep the transformed data in memory and save time on later use:

transformedTraining.cache()

## Now it's time to train the model. In Spark, training an ML model is a two-phase process:

- Initialize an untrained model: **create the algorithm object**
- Fit the model to data: **train it using `.fit(data)`**



In [None]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans()\
    .setK(20)\
    .setSeed(1)

In [None]:
kmModel = kmeans.fit(transformedTraining)

In [None]:
# Training cost: sum of squared distances from each training point to its nearest centroid
kmModel.summary.trainingCost
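
To show what the training cost measures, the plain-Python sketch below computes the k-means cost for a handful of made-up 2-D points: for each point, take the squared distance to its nearest centroid, then add these up. The points, centroids, and function names are illustrative assumptions.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def kmeans_cost(points, centroids):
    """Sum over points of the squared distance to the nearest centroid."""
    return sum(min(squared_distance(p, c) for c in centroids) for p in points)

points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0)]
centroids = [(0.0, 1.0), (10.0, 10.0)]
print(kmeans_cost(points, centroids))  # prints 2.0
```

A lower cost means points sit closer to their assigned centroids. Because the features here are unscaled, columns with large values such as `Quantity` can dominate the distance.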


## 🔧 What are Lower-Level APIs?

### In Spark, you have two major ways to work with data:

| Level | API | Description |
|---|---|---|
| High-Level | DataFrame & Dataset APIs | Modern, optimized, SQL-like; easy to use |
| Low-Level | RDDs (Resilient Distributed Datasets) | Manual control, lower-level operations |

The DataFrame API is built on top of RDDs, which means that everything you do with DataFrames is eventually executed as RDD operations under the hood.

As a user, though, you rarely need to touch RDDs directly.



In [None]:
# parallelize() distributes the in-memory list as an RDD; toDF() then converts that RDD into a Spark DataFrame

from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()