In [1]:
import threading

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create the Spark session for this notebook (getOrCreate reuses one if it already exists).
spark = (
    SparkSession.builder
    .appName("CustomerSegmentation")
    .getOrCreate()
)
# Restrict Spark's own logging to ERROR so the batch tables stay readable.
spark.sparkContext.setLogLevel("ERROR")

# Explicit schema for the JSON event records: one (column name, Spark type) pair per field.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("user_id", StringType()),
        ("event_type", StringType()),
        ("timestamp", TimestampType()),
        ("product_id", StringType()),
        ("category", StringType()),
        ("price", DoubleType()),
    )
])

MAX_BATCHES = 10  # request shutdown of the stream after this many displayed micro-batches
batch_counter = {"count": 0}  # dict so process_batch can update the count without `global`


def _stop_active_queries():
    """Stop every active streaming query on `spark`; the session itself keeps running."""
    for active_query in spark.streams.active:
        active_query.stop()


def process_batch(df, batch_id):
    """Print one micro-batch and request shutdown on every MAX_BATCHES-th call.

    Args:
        df: The micro-batch handed over by ``foreachBatch``; only its
            ``show(truncate=...)`` method is used here.
        batch_id: Identifier of the micro-batch, echoed in the printed header.

    Side effects:
        Increments ``batch_counter["count"]``, prints the batch contents, and on
        every MAX_BATCHES-th call starts a daemon thread that stops the active
        streaming queries.
    """
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_counter["count"] % MAX_BATCHES == 0:
        # Stop the streaming queries, not the SparkSession: stopping the session
        # from inside a batch callback kills the query mid-batch and leaves
        # `spark` unusable for any later cell.
        # The stop runs on a helper thread so this callback can return first;
        # a synchronous stop() here would wait for the very batch that is still
        # executing this function.
        threading.Thread(
            target=_stop_active_queries,
            name="stop-streaming-queries",
            daemon=True,
        ).start()

# Streaming source: JSON files under data/stream, parsed with the explicit schema.
stream = spark.readStream.schema(schema).json("data/stream")

# For each user and each 5-minute window of the event timestamp, collect the
# distinct event types seen. A 1-minute watermark is declared on the timestamp.
windowed = (
    stream
    .withWatermark("timestamp", "1 minute")
    .groupBy(window("timestamp", "5 minutes"), "user_id")
    .agg(collect_set("event_type").alias("events"))
)

# Label every (window, user) row. The first matching WHEN wins, so a user with
# both 'purchase' and 'cart' events is labelled 'Buyer'.
segmented = windowed.withColumn(
    "segment",
    expr("""
    CASE
        WHEN array_contains(events, 'purchase') THEN 'Buyer'
        WHEN array_contains(events, 'cart') THEN 'Cart abandoner'
        ELSE 'Lurker'
    END
"""),
)

# Start the streaming query; foreachBatch is the sink, so every micro-batch is
# handed to process_batch for display.
# No .format(...) is set: foreachBatch replaces any sink format configured
# before it, so a "console" format here never took effect (the captured output
# shows only process_batch's own "Batch ID" header).
# NOTE(review): "complete" mode passes the full result table to process_batch on
# every trigger; per the Spark docs the watermark defined upstream does not drop
# aggregation state in this mode — confirm this is intended, or switch to "update".
query = (
    segmented.writeStream
    .outputMode("complete")
    .foreachBatch(process_batch)
    .start()
)

Batch ID: 0
+------------------------------------------+-------+----------------------+-------+
|window                                    |user_id|events                |segment|
+------------------------------------------+-------+----------------------+-------+
|{2025-05-14 14:25:00, 2025-05-14 14:30:00}|u2     |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:00:00, 2025-05-14 14:05:00}|u49    |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:10:00, 2025-05-14 14:15:00}|u5     |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:15:00, 2025-05-14 14:20:00}|u20    |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:20:00, 2025-05-14 14:25:00}|u4     |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:05:00, 2025-05-14 14:10:00}|u30    |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:10:00, 2025-05-14 14:15:00}|u45    |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:25:00, 2025-05-14 14:30:00}|u28    |[cart, view, purchase]|Buyer  |
|{2025-05-14 14:20:00, 2025-05-14 14:25:00}|u12    |[cart, view,