In [2]:
!pip install -q pyspark pyarrow pandas


In [3]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("RetailPulse_Colab")          # appears in the Spark UI/job name
         .master("local[*]")                    # use all local CPU cores on the VM
         .config("spark.sql.shuffle.partitions", "4")   # fewer shuffle tasks for tiny data
         .config("spark.sql.adaptive.enabled", "true")  # AQE: smarter plans at runtime
         .getOrCreate())

print("Spark version:", spark.version)


Spark version: 3.5.1


In [6]:
#creating sample raw csv files
import pathlib, textwrap

RAW = pathlib.Path("data/raw"); RAW.mkdir(parents=True, exist_ok=True)

(RAW / "transactions.csv").write_text(textwrap.dedent("""
order_id,customer_id,order_ts,sku,qty,price,country,channel
1001,C001,2025-07-01 10:05:10,SKU_01,1,19.99,NO,web
1001,C001,2025-07-01 10:05:10,SKU_02,2,5.00,NO,web
1002,C002,2025-07-02 13:22:45,SKU_03,1,49.00,US,app
1003,C003,2025-07-03 08:01:00,SKU_01,3,19.99,CA,web
1004,C001,2025-07-10 19:30:12,SKU_04,1,129.00,NO,app
1005,C004,2025-07-12 15:11:59,SKU_02,4,5.00,NO,web
""").strip()+"\n")

(RAW / "customers.csv").write_text(textwrap.dedent("""
customer_id,signup_date,loyalty_tier,region
C001,2024-11-20,Gold,Nordics
C002,2025-01-05,Silver,North America
C003,2023-09-12,Bronze,North America
C004,2025-03-18,Silver,Nordics
""").strip()+"\n")

print("Wrote:", RAW / "transactions.csv")
print("Wrote:", RAW / "customers.csv")


Wrote: data/raw/transactions.csv
Wrote: data/raw/customers.csv


In [7]:
#explicit schema
from pyspark.sql import types as T, functions as F

schema = T.StructType([
    T.StructField("order_id",    T.IntegerType()),
    T.StructField("customer_id", T.StringType()),
    T.StructField("order_ts",    T.TimestampType()),
    T.StructField("sku",         T.StringType()),
    T.StructField("qty",         T.IntegerType()),
    T.StructField("price",       T.DoubleType()),
    T.StructField("country",     T.StringType()),
    T.StructField("channel",     T.StringType()),
])

tx = (spark.read.format("csv")
      .option("header", True)
      .schema(schema)                               # <- important
      .load("data/raw/transactions.csv")
      .withColumn("order_date", F.to_date("order_ts")))  # for partitioning/pruning later

tx.show(truncate=False)
tx.printSchema()


+--------+-----------+-------------------+------+---+-----+-------+-------+----------+
|order_id|customer_id|order_ts           |sku   |qty|price|country|channel|order_date|
+--------+-----------+-------------------+------+---+-----+-------+-------+----------+
|1001    |C001       |2025-07-01 10:05:10|SKU_01|1  |19.99|NO     |web    |2025-07-01|
|1001    |C001       |2025-07-01 10:05:10|SKU_02|2  |5.0  |NO     |web    |2025-07-01|
|1002    |C002       |2025-07-02 13:22:45|SKU_03|1  |49.0 |US     |app    |2025-07-02|
|1003    |C003       |2025-07-03 08:01:00|SKU_01|3  |19.99|CA     |web    |2025-07-03|
|1004    |C001       |2025-07-10 19:30:12|SKU_04|1  |129.0|NO     |app    |2025-07-10|
|1005    |C004       |2025-07-12 15:11:59|SKU_02|4  |5.0  |NO     |web    |2025-07-12|
+--------+-----------+-------------------+------+---+-----+-------+-------+----------+

root
 |-- order_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_ts: timestamp (nullable = tru

In [9]:
#land raw CSV, then curate to Parquet partitioned by date for pruning
from pathlib import Path

OUT = Path("data/processed/tx_parquet"); OUT.parent.mkdir(parents=True, exist_ok=True)

(tx.repartition("order_date")                 # line data shuffled by day before write
   .write.mode("overwrite")
   .partitionBy("order_date")                 # creates folders like order_date=2025-07-01/
   .parquet(str(OUT)))

print("Wrote:", OUT)


Wrote: data/processed/tx_parquet


In [10]:
#daily KPI: revenue by day
tx = spark.read.parquet(str(OUT)).withColumn("line_total", F.col("qty") * F.col("price"))

daily_rev = (tx.withColumn("order_day", F.to_date("order_ts"))
               .groupBy("order_day")
               .agg(F.round(F.sum("line_total"), 2).alias("revenue"))
               .orderBy("order_day"))

daily_rev.show()


+----------+-------+
| order_day|revenue|
+----------+-------+
|2025-07-01|  29.99|
|2025-07-02|   49.0|
|2025-07-03|  59.97|
|2025-07-10|  129.0|
|2025-07-12|   20.0|
+----------+-------+



In [11]:
#window function - top 3 SKUs by country
from pyspark.sql import Window as W

totals = tx.groupBy("country","sku").agg(F.sum("qty").alias("qty"))
win = W.partitionBy("country").orderBy(F.desc("qty"))
top_skus = (totals
            .withColumn("rk", F.row_number().over(win))   # rank SKUs inside each country
            .where(F.col("rk") <= 3)
            .drop("rk"))

top_skus.show()


+-------+------+---+
|country|   sku|qty|
+-------+------+---+
|     CA|SKU_01|  3|
|     NO|SKU_02|  6|
|     NO|SKU_01|  1|
|     NO|SKU_04|  1|
|     US|SKU_03|  1|
+-------+------+---+



In [12]:
#RFM features (Recency, Frequency, Monetary)
# De-duplicate to 1 row per (order_id, customer_id, order_ts)
tx_orders = (tx.select("order_id","customer_id","order_ts","line_total")
               .dropDuplicates(["order_id","customer_id","order_ts"]))

# Aggregate per customer
rfm = (tx_orders.groupBy("customer_id")
        .agg(F.max("order_ts").alias("last_order_ts"),
             F.countDistinct("order_id").alias("frequency"),
             F.sum("line_total").alias("monetary")))

# Recency in days = (now - last_order_ts) / 86400
rfm = (rfm
       .withColumn("today", F.to_timestamp(F.lit("2025-08-17 00:00:00")))
       .withColumn("recency_days",
                   (F.col("today").cast("long") - F.col("last_order_ts").cast("long"))/86400.0)
       .drop("today"))

rfm.orderBy("customer_id").show(truncate=False)


+-----------+-------------------+---------+--------+-----------------+
|customer_id|last_order_ts      |frequency|monetary|recency_days     |
+-----------+-------------------+---------+--------+-----------------+
|C001       |2025-07-10 19:30:12|2        |148.99  |37.18736111111111|
|C002       |2025-07-02 13:22:45|1        |49.0    |45.44253472222222|
|C003       |2025-07-03 08:01:00|1        |59.97   |44.66597222222222|
|C004       |2025-07-12 15:11:59|1        |20.0    |35.36667824074074|
+-----------+-------------------+---------+--------+-----------------+



In [13]:
#join customer attributes for BI and profiling
cust = (spark.read.format("csv").option("header", True)
        .load("data/raw/customers.csv")
        .withColumn("signup_date", F.to_date("signup_date")))

rfm_joined = (rfm.join(cust, "customer_id", "left")
                .select("customer_id","recency_days","frequency","monetary","loyalty_tier","region"))

rfm_joined.show(truncate=False)


+-----------+-----------------+---------+--------+------------+-------------+
|customer_id|recency_days     |frequency|monetary|loyalty_tier|region       |
+-----------+-----------------+---------+--------+------------+-------------+
|C001       |37.18736111111111|2        |148.99  |Gold        |Nordics      |
|C004       |35.36667824074074|1        |20.0    |Silver      |Nordics      |
|C002       |45.44253472222222|1        |49.0    |Silver      |North America|
|C003       |44.66597222222222|1        |59.97   |Bronze      |North America|
+-----------+-----------------+---------+--------+------------+-------------+



In [14]:
#customer segmentation with KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans

# Assemble & scale features (scaling avoids “big-number” bias)
feats = rfm_joined.fillna(0).select("customer_id","recency_days","frequency","monetary")
va = VectorAssembler(inputCols=["recency_days","frequency","monetary"], outputCol="features_raw")
scaled = (StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)
          .fit(va.transform(feats))
          .transform(va.transform(feats)))

# Train KMeans and assign clusters
kmeans = KMeans(k=3, seed=42, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(scaled)
clusters = model.transform(scaled).select("customer_id","cluster")
clusters.show()

# Cluster profile (avg R/F/M per cluster)
profile = (clusters.join(rfm_joined, "customer_id")
                 .groupBy("cluster")
                 .agg(F.round(F.avg("recency_days"),1).alias("avg_recency"),
                      F.round(F.avg("frequency"),2).alias("avg_freq"),
                      F.round(F.avg("monetary"),2).alias("avg_monetary"),
                      F.count("*").alias("customers")))
profile.orderBy("cluster").show()


+-----------+-------+
|customer_id|cluster|
+-----------+-------+
|       C001|      1|
|       C004|      2|
|       C002|      0|
|       C003|      0|
+-----------+-------+

+-------+-----------+--------+------------+---------+
|cluster|avg_recency|avg_freq|avg_monetary|customers|
+-------+-----------+--------+------------+---------+
|      0|       45.1|     1.0|       54.49|        2|
|      1|       37.2|     2.0|      148.99|        1|
|      2|       35.4|     1.0|        20.0|        1|
+-------+-----------+--------+------------+---------+



In [15]:
#7 day revenue forecast
from pyspark.sql.window import Window
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Make a supervised dataset: trend index + day-of-week
dr = daily_rev.withColumn("day_index", F.row_number().over(Window.orderBy("order_day")) - 1)
dr = dr.withColumn("dow", F.dayofweek("order_day"))  # 1..7

va = VectorAssembler(inputCols=["day_index", "dow"], outputCol="features")
train_ml = va.transform(dr.select("revenue","day_index","dow")).select("features","revenue")

lr = LinearRegression(featuresCol="features", labelCol="revenue")
lr_model = lr.fit(train_ml)

# Build 7 future rows (toy example)
max_idx = dr.agg(F.max("day_index")).first()[0]
max_dow = dr.agg(F.max("dow")).first()[0]

future = spark.range(1, 8).withColumn("future_index", F.col("id") + max_idx).drop("id")
future = (future.withColumn("dow", ((F.lit(max_dow) + F.col("future_index")) % 7) + 1)
                .withColumn("day_index", F.col("future_index")))

pred = (lr_model.transform(va.transform(future.select("day_index","dow")))
                 .withColumnRenamed("prediction","forecast_revenue")
                 .select("day_index","dow","forecast_revenue"))

pred.show()


+---------+---+------------------+
|day_index|dow|  forecast_revenue|
+---------+---+------------------+
|        5|  6|215.75371428571492|
|        6|  7|212.41200000000055|
|        7|  1| 863.1302857142884|
|        8|  2| 859.7885714285742|
|        9|  3| 856.4468571428599|
|       10|  4| 853.1051428571457|
|       11|  5| 849.7634285714312|
+---------+---+------------------+

