# User ID product recommendations

Using the Medallion Architecture (Bronze -> Silver -> Gold)
1. Batch Processing (SQL) -> Build a user dashboard with recommendations based on views & cart (full refresh)
2. Stream Processing (PySpark) -> Build a user dashboard with recommendations based on views, cart & popularity (incremental)

In [0]:
%sh
# Stage the source CSV in the Batch input folder read by the batch bronze load.
# The folder is created first so cp does not fail on a fresh volume.
DATA_ROOT=/Volumes/workspace/ecommerce/ecommerce_data
mkdir -p "$DATA_ROOT/Batch" && cp "$DATA_ROOT/2019-Oct.csv" "$DATA_ROOT/Batch/2019-Oct.csv"

In [0]:
%sh
# Stage the source CSV in the Streaming folder that the Auto Loader bronze stream reads.
# The folder is created first so cp does not fail on a fresh volume.
DATA_ROOT=/Volumes/workspace/ecommerce/ecommerce_data
mkdir -p "$DATA_ROOT/Streaming" && cp "$DATA_ROOT/2019-Oct.csv" "$DATA_ROOT/Streaming/2019-Oct.csv"

Batch Processing (full refresh)

Loading data to Bronze

In [0]:
%sql
-- Full-refresh bronze load: read every CSV in the Batch folder and cast the raw
-- columns to typed, renamed columns (user_session -> session_id, category_code -> category).
CREATE OR REPLACE TABLE workspace.ecommerce.ecom_bronze_events AS
SELECT
    CAST(data.user_id        AS STRING)    AS user_id,
    CAST(data.user_session   AS STRING)    AS session_id,
    CAST(data.event_time     AS TIMESTAMP) AS event_time,
    CAST(data.event_type     AS STRING)    AS event_type,
    CAST(data.product_id     AS BIGINT)    AS product_id,
    CAST(data.brand          AS STRING)    AS brand,
    CAST(data.category_code  AS STRING)    AS category,
    CAST(data.price          AS DOUBLE)    AS price,
    -- Derive the date from the same cast timestamp as event_time so the two columns
    -- always agree (the streaming bronze derives event_date the same way).
    TO_DATE(CAST(data.event_time AS TIMESTAMP)) AS event_date
FROM read_files(
    '/Volumes/workspace/ecommerce/ecommerce_data/Batch/',
    format => 'csv',
    header => 'true'
) AS data;

Bronze -> Silver

In [0]:
%sql
-- Bronze -> Silver: keep the event types the gold layer uses, drop rows missing a
-- key, and keep exactly one row per duplicated event. QUALIFY filters on the
-- window result without leaking a helper row-number column into the table.
CREATE OR REPLACE TABLE workspace.ecommerce.ecom_silver_events AS
SELECT *
FROM workspace.ecommerce.ecom_bronze_events
-- NOTE(review): purchases in the raw data appear to be labelled 'purchase';
-- 'purchased' is kept so either spelling passes. Confirm against the source.
WHERE event_type IN ('view', 'cart', 'purchase', 'purchased')
  AND user_id IS NOT NULL
  AND session_id IS NOT NULL
  AND product_id IS NOT NULL
-- product_id is part of the key so that different products hit in the same
-- session at the same timestamp are not collapsed into a single event.
QUALIFY row_number() OVER (
    PARTITION BY user_id, session_id, event_time, event_type, product_id
    ORDER BY price
) = 1;

Each user's 20 most recently viewed products

In [0]:
%sql
-- Each user's 20 most recent product views. A product viewed several times can
-- appear more than once. product_id breaks ties between views with the same
-- timestamp so the result is deterministic.
CREATE OR REPLACE TABLE workspace.ecommerce.ecom_gold_user_viewed AS
SELECT user_id, product_id, brand, category, price, event_time
FROM workspace.ecommerce.ecom_silver_events
WHERE event_type = 'view'
QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC, product_id) <= 20;

Each user's carted products

In [0]:
%sql
-- Every product each user added to their cart. There is no ORDER BY: a table has no
-- guaranteed row order, so sorting here only costs a shuffle. Sort when querying instead.
CREATE OR REPLACE TABLE workspace.ecommerce.ecom_gold_user_cart AS
SELECT user_id, product_id, brand, category, price, event_time
FROM workspace.ecommerce.ecom_silver_events
WHERE event_type = 'cart'



In [0]:
%sql
-- Preview the batch "recently viewed" gold table.
SELECT *
FROM workspace.ecommerce.ecom_gold_user_viewed


In [0]:
%sql
-- Preview the batch cart gold table.
SELECT *
FROM workspace.ecommerce.ecom_gold_user_cart

Stream Processing (Incremental)

Parameterizing the catalog, schema, table names, checkpoint base & source directory

In [0]:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, LongType
from pyspark.sql.window import Window

# Unity Catalog catalog/schema that every streaming table below lives in.
catalog = "workspace"        # change to your UC catalog
schema_name = "ecommerce"    # change to your UC schema

# Streaming medallion tables; the "ecom_df_" prefix keeps them apart from the
# batch "ecom_" tables built by the SQL cells.
bronze_tbl = f"{catalog}.{schema_name}.ecom_df_bronze_events"
silver_tbl = f"{catalog}.{schema_name}.ecom_df_silver_events"

gold_recent_tbl = f"{catalog}.{schema_name}.ecom_df_gold_user_recently_viewed"
gold_cart_tbl   = f"{catalog}.{schema_name}.ecom_df_gold_user_cart"
gold_pop_tbl    = f"{catalog}.{schema_name}.ecom_df_gold_product_popularity"
gold_pair_tbl   = f"{catalog}.{schema_name}.ecom_df_gold_coview_pairs"
# NOTE(review): no cell visible here writes this recommendations table yet.
gold_reco_tbl   = f"{catalog}.{schema_name}.ecom_df_gold_user_recommendations"

# Volume paths are derived from catalog/schema so that changing the two values
# above also relocates the input folder and the checkpoints.
volume_root = f"/Volumes/{catalog}/{schema_name}/ecommerce_data"
source_dir = f"{volume_root}/Streaming/"  # <- a folder (not a single file); Auto Loader reads the files in it
chk_base   = f"{volume_root}/_chk"        # root for Auto Loader schema tracking and stream checkpoints

# Explicit schema for the raw CSV columns, passed to the bronze reader via .schema().
# user_session is renamed to session_id in bronze.
schema = StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", LongType(), True),
    StructField("category_id", LongType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", LongType(), True),
    StructField("user_session", StringType(), True)
])



Loading data into Bronze (stream)

In [0]:
# Incrementally ingest the CSV files in source_dir with Auto Loader into the bronze
# Delta table, adding an event_date partition column and renaming user_session.
bronze_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", f"{chk_base}/schema/bronze")
    .option("cloudFiles.maxFilesPerTrigger", "1")  # at most one file per micro-batch
    .option("header", "true")
    .schema(schema)
    .load(source_dir)
    .withColumn("event_date", F.to_date(F.col("event_time")))
    .withColumnRenamed("user_session", "session_id")
)

# The checkpoint shares its path with the Auto Loader schema location above.
# It is left unchanged so an existing stream resumes where it stopped.
bronze_query = (
    bronze_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{chk_base}/schema/bronze")
    .partitionBy("event_date")
    .trigger(availableNow=True)
    .toTable(bronze_tbl)
)
# toTable() returns as soon as the query starts. Block until the availableNow run
# has drained the backlog so the silver cell reads a fully loaded bronze table.
bronze_query.awaitTermination()

Loading data into Silver (stream): deduplicating events and removing nulls

In [0]:
# Stream bronze -> silver: keep the event types the gold layer uses, drop rows
# missing a key, rename category_code -> category, and deduplicate events.
silver_df = (
    spark.readStream.table(bronze_tbl)
    # NOTE(review): purchases in the raw data appear to be labelled "purchase";
    # "purchased" is kept so either spelling passes. Confirm against the source.
    .filter(F.col("event_type").isin(["view", "cart", "purchase", "purchased"]))
    .filter(F.col("user_id").isNotNull())
    .filter(F.col("product_id").isNotNull())
    .filter(F.col("session_id").isNotNull())
    .withColumnRenamed("category_code", "category")
)

# The watermark bounds the deduplication state by event_time. product_id is part
# of the key so that different products hit in the same session at the same
# instant are not collapsed into one event.
# NOTE(review): changing the dedup key changes the streaming state schema; an
# existing silver checkpoint (and table) must be reset before re-running.
silver_dedup = (
    silver_df
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["user_id", "session_id", "event_time", "event_type", "product_id"])
)

silver_query = (
    silver_dedup.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{chk_base}/schema/silver")
    .partitionBy("event_date")
    .trigger(availableNow=True)
    .toTable(silver_tbl)
)
# Block until the availableNow run finishes so the gold cell reads complete data.
silver_query.awaitTermination()

1. Gold table with recent views per user
2. Gold table with products in their cart
3. Gold table with popular products
4. Gold table of product pairs purchased together in the same session (for recommendations)


In [0]:
# Gold layer (full refresh): batch-read the streaming silver table and rebuild four
# tables. Silver renames category_code -> category, so every projection below
# selects "category". Selecting "category_code" would fail to resolve.
gold_cols = ["user_id", "product_id", "brand", "category", "price", "event_time"]

# NOTE(review): purchases in the raw data appear to be labelled "purchase";
# "purchased" is kept so either spelling matches. Confirm against the source.
purchase_event_types = ["purchase", "purchased"]


def _overwrite_table(df, table_name):
    """Replace ``table_name`` with ``df``.

    ``overwriteSchema`` lets the table's columns change between runs, so a full
    refresh is not blocked by a schema left over from an earlier version.
    """
    df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name)


silver_events = spark.read.table(silver_tbl)

# 1. Each user's 20 most recent views. product_id breaks timestamp ties so the
#    selection is deterministic.
window = Window.partitionBy("user_id").orderBy(F.col("event_time").desc(), F.col("product_id"))

Gold_df_recent = (silver_events
                  .filter(F.col("event_type") == "view")
                  .withColumn("rn", F.row_number().over(window))
                  .filter(F.col("rn") <= 20)
                  .select(*gold_cols))
_overwrite_table(Gold_df_recent, gold_recent_tbl)

# 2. Every product each user added to their cart.
Gold_df_cart = (silver_events
                .filter(F.col("event_type") == "cart")
                .select(*gold_cols))
_overwrite_table(Gold_df_cart, gold_cart_tbl)

# 3. Top 20 (category, brand) groups by number of distinct products seen in silver.
# NOTE(review): this measures assortment breadth, not user interaction volume;
# confirm it is the intended "popularity" signal.
Gold_df_pop = (silver_events
               .groupBy("category", "brand")
               .agg(F.countDistinct("product_id").alias("product_count"))
               .orderBy(F.col("product_count").desc())
               .limit(20))
_overwrite_table(Gold_df_pop, gold_pop_tbl)

# 4. Product pairs purchased in the same session. The p1 < p2 filter emits each
#    unordered pair once and excludes self-pairs. Because (session_id, product_id)
#    is distinct, pair_count is the number of sessions in which both were purchased.
Gold_df_pair = (silver_events
                .filter(F.col("event_type").isin(purchase_event_types))
                .select("session_id", "product_id")
                .distinct())

Gold_df_pairing = (Gold_df_pair.alias("p1")
                   .join(Gold_df_pair.alias("p2"), on="session_id")
                   .filter(F.col("p1.product_id") < F.col("p2.product_id"))
                   .groupBy(F.col("p1.product_id").alias("product_id_1"),
                            F.col("p2.product_id").alias("product_id_2"))
                   .agg(F.count(F.lit(1)).alias("pair_count")))
_overwrite_table(Gold_df_pairing, gold_pair_tbl)