In [0]:
dbutils.widgets.text('catalog',  'ecommerce',  'Unity Catalog Name')
dbutils.widgets.text('run_date', '2026-02-25', 'Processing Date')

CATALOG  = dbutils.widgets.get('catalog')
RUN_DATE = dbutils.widgets.get('run_date')
print(f'Silver cleaning | catalog: {CATALOG} | run_date: {RUN_DATE}')

Silver cleaning | catalog: ecommerce | run_date: 2026-02-25


In [0]:
# Core aggregations
from pyspark.sql.functions import (
    count, countDistinct, sum as spark_sum, avg, max as spark_max, when, col
)
from pyspark.sql import Window
import pyspark.sql.functions as F

df_s = spark.table(f'{CATALOG}.silver.events')

df_agg = df_s.groupBy('user_id').agg(
    count('*').alias('total_events'),
    countDistinct('user_session').alias('total_sessions'),
    count(when(col('event_type') == 'view',True)).alias('total_views'),
    count(when(col('event_type') == 'cart',True)).alias('total_cart_adds'),
    count(when(col('event_type') == 'purchase',True)).alias('total_purchases'),
    countDistinct(when(col('event_type') == 'purchase', col('product_id'))).alias('unique_products_purchased'),
    spark_sum(when(col('event_type') == 'purchase', col('price'))).alias('total_revenue'),
    avg(when(col('event_type') == 'purchase',col('price'))).alias('avg_purchase_price'),
    avg(when(col('event_type') == 'view', col('price'))).alias('avg_price_viewed'),
    spark_max('event_time').alias('last_seen_at'),
)




In [0]:
#  Favourite brand (your Day 2 Cell 25)
df_purchases = df_s.filter(col('event_type') == 'purchase')
df_fav_brand = df_purchases \
    .groupBy('user_id', 'brand') \
    .agg(count('*').alias('brand_purchase_count')) \
    .withColumn('rank', F.rank().over(
        Window.partitionBy('user_id').orderBy(col('brand_purchase_count').desc())
    )) \
    .filter(col('rank') == 1) \
    .select('user_id', col('brand').alias('favourite_brand'))


In [0]:
# Top viewed category 
df_views = df_s.filter((col('event_type') == 'view') & col('category_code').isNotNull())
df_top_cat = df_views \
    .groupBy('user_id', 'category_code') \
    .agg(count('*').alias('view_count')) \
    .withColumn('rank', F.rank().over(
        Window.partitionBy('user_id').orderBy(col('view_count').desc())
    )) \
    .filter(col('rank') == 1) \
    .select('user_id', col('category_code').alias('top_viewed_category'))

# Join all features + purchase_rate (your Day 2 Cells 27-28)
df_gold = df_agg \
    .join(df_fav_brand, on='user_id', how='left') \
    .join(df_top_cat,   on='user_id', how='left')

df_gold = df_gold.withColumn('purchase_rate',
    F.round(col('total_purchases') / col('total_events'), 4)
)

print(f'Gold feature table: {df_gold.count():,} unique users')
display(df_gold.limit(10))



In [0]:
#  Save Gold feature table (your Day 2 Cell 29)
df_gold.write \
    .format('delta') \
    .mode('overwrite') \
    .option('overwriteSchema', 'true') \
    .saveAsTable(f'{CATALOG}.gold.user_features_1')

print('Gold feature table saved!')
spark.sql(f'DESCRIBE DETAIL {CATALOG}.gold.user_features_1').select('numFiles', 'sizeInBytes').show()

gold_count = df_gold.count()
dbutils.notebook.exit(f'SUCCESS: {gold_count} users')