In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
import time
from datetime import datetime, timedelta
import random
from pyspark.sql.functions import col, count, sum, avg, max, min, desc, year, month, dayofweek, hour, datediff, lit, when, countDistinct, ntile


In [2]:
# Create (or reuse) the Spark session used by every later cell.
# Shuffle partitions are set to 4 and driver memory to 4g via builder config.
spark = (
    SparkSession.builder
    .appName("RealTimeRetailAnalytics")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Confirm the session is up and report which Spark version is running.
print("✓ Spark Session initialized", f"Spark Version: {spark.version}\n", sep="\n")

✓ Spark Session initialized
Spark Version: 4.0.1



In [3]:
# Download the UCI "Online Retail" Excel workbook (-q keeps wget quiet); the next cell reads it as 'Online Retail.xlsx'.
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx


In [4]:
# Load the raw workbook and make sure InvoiceDate is a pandas datetime column.
df_pandas = pd.read_excel('Online Retail.xlsx').assign(
    InvoiceDate=lambda frame: pd.to_datetime(frame['InvoiceDate'])
)

In [5]:
# Keep only rows that have a customer id and strictly positive quantity and
# unit price; .copy() detaches the result from df_pandas so it can be extended.
df_cleaned = df_pandas.loc[
    df_pandas['CustomerID'].notna()
    & df_pandas['Quantity'].gt(0)
    & df_pandas['UnitPrice'].gt(0)
].copy()

In [6]:
# Line-item revenue: quantity multiplied by unit price, row by row.
df_cleaned['TotalPrice'] = df_cleaned['Quantity'].mul(df_cleaned['UnitPrice'])


In [7]:
# Report how many rows survived cleaning and the invoice date span that remains.
print(
    f"Original records: {len(df_pandas)}",
    f"Cleaned records: {len(df_cleaned)}",
    f"Date range: {df_cleaned['InvoiceDate'].min()} to {df_cleaned['InvoiceDate'].max()}\n",
    sep="\n",
)

Original records: 541909
Cleaned records: 397884
Date range: 2010-12-01 08:26:00 to 2011-12-09 12:50:00



## SIMULATED STREAMING DATA GENERATOR

In [8]:
def create_streaming_data(batch_id, num_transactions=50, source_df=None, seed=None):
    """
    Simulate one micro-batch of real-time transactions.

    Rows are sampled with replacement from historical data and each sampled
    row is stamped with an artificial ``StreamTimestamp`` that lies between
    the current wall-clock time and 10 seconds after it.

    Parameters
    ----------
    batch_id : int
        Identifier of the batch. It is accepted for the caller's bookkeeping
        and does not influence the generated rows.
    num_transactions : int, default 50
        Number of rows to draw.
    source_df : pandas.DataFrame, optional
        Historical data to sample from. When omitted, the notebook-level
        ``df_cleaned`` frame is used (the original behaviour).
    seed : int, optional
        When given, both the row sample and the timestamp offsets are
        reproducible. ``None`` keeps the original unseeded behaviour.

    Returns
    -------
    pandas.DataFrame
        A new frame with the sampled rows plus a ``StreamTimestamp`` column.
        The source frame is not modified.
    """
    history = df_cleaned if source_df is None else source_df
    sample = history.sample(n=num_transactions, replace=True, random_state=seed)

    # A private generator keeps seeded runs from disturbing the global
    # `random` state; without a seed the module-level generator is used.
    rng = random if seed is None else random.Random(seed)

    # Add artificial timestamps to simulate real-time arrival
    base_time = datetime.now()
    time_offsets = [timedelta(seconds=rng.randint(0, 10)) for _ in range(num_transactions)]
    sample['StreamTimestamp'] = [base_time + offset for offset in time_offsets]

    return sample

In [9]:
# Draw one 100-row demo batch to inspect the shape of the simulated stream.
sample_batch = create_streaming_data(batch_id=0, num_transactions=100)

In [10]:
# Display the demo batch; the last column is the StreamTimestamp added by create_streaming_data.
sample_batch

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,TotalPrice,StreamTimestamp
198850,554071,23298,SPOTTY BUNTING,4,2011-05-22 11:17:00,4.95,13113.0,United Kingdom,19.80,2025-12-27 17:31:38.298265
516059,579854,23503,PLAYING CARDS KEEP CALM & CARRY ON,6,2011-11-30 16:26:00,1.25,14713.0,United Kingdom,7.50,2025-12-27 17:31:34.298265
468199,576399,23367,SET 12 COLOUR PENCILS SPACEBOY,16,2011-11-15 10:50:00,0.65,12684.0,France,10.40,2025-12-27 17:31:31.298265
290415,562377,21745,GAOLERS KEYS DECORATIVE GARDEN,3,2011-08-04 15:04:00,3.75,14535.0,United Kingdom,11.25,2025-12-27 17:31:32.298265
228988,557007,22221,CAKE STAND LOVEBIRD 2 TIER PINK,4,2011-06-16 11:30:00,9.95,12484.0,Spain,39.80,2025-12-27 17:31:38.298265
...,...,...,...,...,...,...,...,...,...,...
453666,575491,22600,CHRISTMAS RETROSPOT STAR WOOD,5,2011-11-09 17:47:00,0.85,15531.0,United Kingdom,4.25,2025-12-27 17:31:36.298265
223501,556484,22566,FELTCRAFT HAIRBAND PINK AND PURPLE,2,2011-06-12 13:17:00,0.85,16938.0,United Kingdom,1.70,2025-12-27 17:31:31.298265
527928,580727,23014,GLASS APOTHECARY BOTTLE ELIXIR,1,2011-12-05 17:17:00,8.29,14096.0,United Kingdom,8.29,2025-12-27 17:31:33.298265
342850,566914,23169,CLASSIC GLASS COOKIE JAR,1,2011-09-15 14:08:00,4.15,15466.0,United Kingdom,4.15,2025-12-27 17:31:31.298265


In [11]:
# Spark schema applied when a pandas micro-batch is converted to a Spark
# DataFrame. Every field is nullable; the order matches the batch's columns.
stream_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("InvoiceNo", StringType()),
        ("StockCode", StringType()),
        ("Description", StringType()),
        ("Quantity", IntegerType()),
        ("InvoiceDate", TimestampType()),
        ("UnitPrice", DoubleType()),
        ("CustomerID", DoubleType()),
        ("Country", StringType()),
        ("TotalPrice", DoubleType()),
        ("StreamTimestamp", TimestampType()),
    )
])

## SLIDING WINDOW AGGREGATIONS

In [17]:
def process_streaming_batch(batch_df, batch_id):
    """
    Run the per-batch analytics for one simulated micro-batch and print them.

    The pandas batch is converted to a Spark DataFrame using ``stream_schema``
    and its ``StreamTimestamp`` column is copied to ``event_time``, which is
    the time column for every windowed aggregation. Six reports are printed:

    1. revenue metrics per 30-second tumbling window,
    2. top 5 products by revenue per 1-minute window sliding every 30 seconds,
    3. sales per country per 2-minute window sliding every minute,
    4. per-customer activity within the batch,
    5. transactions whose value exceeds mean + 2.5 standard deviations,
    6. most frequently bought items in the last minute of the batch.

    Parameters
    ----------
    batch_df : pandas.DataFrame or None
        One micro-batch whose columns match ``stream_schema``. ``None`` or an
        empty frame is ignored.
    batch_id : int
        Label printed in the batch banner.

    Returns
    -------
    None
        All results are written to stdout.
    """
    if batch_df is None or batch_df.empty:
        return

    print(f"\n{'='*70}")
    print(f"BATCH {batch_id} | Time: {datetime.now().strftime('%H:%M:%S')}")
    print(f"{'='*70}")

    spark_batch = spark.createDataFrame(batch_df, schema=stream_schema)

    spark_batch = spark_batch.withColumn("event_time", col("StreamTimestamp"))

    # WINDOW 1: 30-Second Tumbling Window - Revenue Metrics
    print("\n[1] 30-Second Revenue Window:")
    revenue_window = spark_batch \
        .groupBy(window("event_time", "30 seconds")) \
        .agg(
            count("*").alias("transaction_count"),
            sum("TotalPrice").alias("total_revenue"),
            avg("TotalPrice").alias("avg_transaction_value"),
            countDistinct("CustomerID").alias("unique_customers")
        ) \
        .orderBy("window")

    revenue_window.show(truncate=False)

    # WINDOW 2: 1-Minute Sliding Window (30s slide) - Top Products
    print("\n[2] Top 5 Products (1-min window, 30s slide):")
    product_window = spark_batch \
        .groupBy(
            window("event_time", "1 minute", "30 seconds"),
            "StockCode",
            "Description"
        ) \
        .agg(
            sum("Quantity").alias("units_sold"),
            sum("TotalPrice").alias("revenue")
        )

    # StockCode is a tie-breaker: without it, products with equal revenue
    # would be ranked in an arbitrary order that can change between runs.
    window_spec = Window.partitionBy("window").orderBy(desc("revenue"), col("StockCode"))
    top_products = product_window \
        .withColumn("rank", row_number().over(window_spec)) \
        .filter(col("rank") <= 5) \
        .select("window", "rank", "Description", "units_sold", "revenue")

    top_products.show(truncate=False)

    # WINDOW 3: 2-Minute Sliding Window (1m slide) - Country Analysis
    # With a 1-minute slide every event belongs to two overlapping 2-minute
    # windows, so the same country can appear more than once in the output.
    print("\n[3] Country Sales (2-min window, 1m slide):")
    country_window = spark_batch \
        .groupBy(
            window("event_time", "2 minutes", "1 minute"),
            "Country"
        ) \
        .agg(
            count("*").alias("orders"),
            sum("TotalPrice").alias("revenue"),
            avg("Quantity").alias("avg_items_per_order")
        ) \
        .orderBy(desc("revenue"))

    country_window.show(5, truncate=False)

    # REPORT 4: Per-customer activity within this batch. This is a plain
    # group-by on CustomerID (not a Spark session window); the "session"
    # duration is the gap between a customer's first and last event.
    print("\n[4] Customer Activity Summary:")
    customer_stats = spark_batch \
        .groupBy("CustomerID") \
        .agg(
            count("*").alias("transactions"),
            sum("TotalPrice").alias("total_spent"),
            min("event_time").alias("first_purchase"),
            max("event_time").alias("last_purchase")
        )

    customer_stats = customer_stats.withColumn(
        "session_duration_sec",
        (unix_timestamp("last_purchase") - unix_timestamp("first_purchase"))
    )

    customer_stats.select(
        "CustomerID",
        "transactions",
        "total_spent",
        "session_duration_sec"
    ).orderBy(desc("total_spent")).show(10, truncate=False)

    # REPORT 5: Real-Time Anomaly Detection
    print("\n[5] Anomaly Detection - High Value Transactions:")

    # Mean and standard deviation are computed in a single Spark job.
    price_stats = spark_batch.agg(
        avg("TotalPrice").alias("avg_price"),
        stddev("TotalPrice").alias("stddev_price")
    ).collect()[0]
    avg_price = price_stats["avg_price"]
    stddev_price = price_stats["stddev_price"]

    if stddev_price and stddev_price > 0:
        # Flag anything more than 2.5 standard deviations above the batch mean
        threshold = avg_price + (2.5 * stddev_price)

        anomalies = spark_batch.filter(col("TotalPrice") > threshold) \
            .select(
                "InvoiceNo",
                "CustomerID",
                "Description",
                "Quantity",
                "TotalPrice",
                "event_time"
            )

        anomaly_count = anomalies.count()
        if anomaly_count > 0:
            print(f"⚠ Detected {anomaly_count} high-value transactions (> ${threshold:.2f})")
            anomalies.show(5, truncate=False)
        else:
            print("✓ No anomalies detected")
    else:
        # The standard deviation is missing or zero, so no meaningful
        # threshold exists; say so instead of leaving the section blank.
        print("ℹ Not enough price variation in this batch to compute an anomaly threshold")

    # REPORT 6: Frequent Items in Last Minute
    print("\n[6] Trending Items (Last Minute):")

    current_time = spark_batch.agg(max("event_time")).collect()[0][0]
    if current_time is not None:
        # "Last minute" is measured back from the newest event in the batch
        one_min_ago = current_time - timedelta(minutes=1)

        recent_items = spark_batch \
            .filter(col("event_time") >= lit(one_min_ago)) \
            .groupBy("Description") \
            .agg(
                count("*").alias("purchase_frequency"),
                sum("Quantity").alias("total_quantity")
            ) \
            .orderBy(desc("purchase_frequency"))

        recent_items.show(5, truncate=False)


## CONTINUOUS STREAMING SIMULATION

In [14]:
# Banner announcing the start of the simulated streaming run.
print(
    "\n" + "="*70,
    "STARTING REAL-TIME STREAM PROCESSING",
    "="*70,
    "Simulating 5 batches of streaming transactions...\n",
    sep="\n",
)


STARTING REAL-TIME STREAM PROCESSING
Simulating 5 batches of streaming transactions...



In [15]:
# Simulation sizing: number of micro-batches to run, and rows drawn per batch.
num_batches, transactions_per_batch = 5, 80

In [18]:
# Drive the simulation: generate a batch, analyse it, then pause before the next.
for batch_id in range(num_batches):
    # Banner uses a 1-based position for readability
    print(
        f"\n{'#'*70}",
        f"# Generating Batch {batch_id + 1}/{num_batches}",
        f"{'#'*70}",
        sep="\n",
    )

    # Generate the simulated transactions, then run the analytics on them
    batch_data = create_streaming_data(batch_id, transactions_per_batch)
    process_streaming_batch(batch_data, batch_id)

    # No pause is needed after the final batch
    if batch_id >= num_batches - 1:
        continue

    # Wait before next batch (simulate real-time delay)
    print("\n⏳ Waiting for next batch...")
    time.sleep(2)



######################################################################
# Generating Batch 1/5
######################################################################

BATCH 0 | Time: 17:38:48

[1] 30-Second Revenue Window:
+------------------------------------------+-----------------+------------------+---------------------+----------------+
|window                                    |transaction_count|total_revenue     |avg_transaction_value|unique_customers|
+------------------------------------------+-----------------+------------------+---------------------+----------------+
|{2025-12-27 17:38:30, 2025-12-27 17:39:00}|80               |1737.4200000000003|21.717750000000002   |76              |
+------------------------------------------+-----------------+------------------+---------------------+----------------+


[2] Top 5 Products (1-min window, 30s slide):
+------------------------------------------+----+----------------------------------+----------+-------+
|window             