In [0]:
dbutils.widgets.text('catalog',  'ecommerce',  'Unity Catalog Name')
dbutils.widgets.text('run_date', '2026-02-25', 'Processing Date')

CATALOG  = dbutils.widgets.get('catalog')
RUN_DATE = dbutils.widgets.get('run_date')
print(f'Silver cleaning | catalog: {CATALOG} | run_date: {RUN_DATE}')

In [0]:

# Data profiling 
import pyspark.sql.functions as F
from pyspark.sql.functions import when, col

df = spark.table(f'{CATALOG}.bronze.events_br')

print('=== NULL COUNTS ===')
null_counts = df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in ['user_id', 'event_type', 'product_id', 'price', 'category_code', 'brand']
])
null_counts.show()

total  = df.count()
unique = df.dropDuplicates(['user_id', 'event_time', 'product_id', 'event_type']).count()
print(f'Total: {total:,} | Unique: {unique:,} | Duplicates: {total - unique:,}')

print('=== EVENT TYPE DISTRIBUTION ===')
df.groupBy('event_type').count().orderBy('count', ascending=False).show()


In [0]:
# Quality tagging 
VALID_EVENT_TYPES = ['view', 'cart', 'purchase']

df_tagged = df.withColumn('_quality_error',
    when(col('user_id').isNull(),                      'NULL_USER_ID')
    .when(col('event_type').isNull(),                  'NULL_EVENT_TYPE')
    .when(~col('event_type').isin(VALID_EVENT_TYPES),  'INVALID_EVENT_TYPE')
    .when(col('price').isNull() &
          (col('event_type') == 'purchase'),           'NULL_PRICE_ON_PURCHASE')
    .when(col('price') < 0,                            'NEGATIVE_PRICE')
    .otherwise(None)
)

df_good = df_tagged.filter(col('_quality_error').isNull())
df_bad  = df_tagged.filter(col('_quality_error').isNotNull())

good_count = df_good.count()
bad_count  = df_bad.count()
total      = df_tagged.count()
pass_rate  = round((good_count / total) * 100, 2)

print(f'Total:     {total:,}')
print(f'Good:      {good_count:,}')
print(f'Bad:       {bad_count:,}')
print(f'Pass rate: {pass_rate}%')

assert pass_rate >= 80, f'PIPELINE FAILED: Pass rate {pass_rate}% below 80% threshold!'
print('Quality gate PASSED â€” proceeding to Silver...')




In [0]:
#  Quarantine bad records (your Day 2 Cell 16)
if bad_count > 0:
    df_bad.withColumn('_quarantined_at', F.current_timestamp()) \
        .write.format('delta').mode('append') \
        .saveAsTable(f'{CATALOG}.bronze.events_quarantine')
    print(f'{bad_count:,} records quarantined')
    df_bad.groupBy('_quality_error').count().orderBy('count', ascending=False).show()
else:
    print('No bad records found!')

In [0]:
# Build and save Silver 
from pyspark.sql.types import TimestampType, DoubleType, LongType

df_silver = df_good \
    .dropDuplicates(['user_id', 'event_time', 'product_id', 'event_type']) \
    .withColumn('event_time', col('event_time').cast(TimestampType())) \
    .withColumn('price',      col('price').cast(DoubleType())) \
    .withColumn('product_id', col('product_id').cast(LongType())) \
    .withColumn('user_id',    col('user_id').cast(LongType())) \
    .withColumn('event_date', F.to_date(col('event_time'))) \
    .drop('_quality_error')

print(f'Silver record count: {df_silver.count():,}')
df_silver.printSchema()

df_silver.write \
    .format('delta') \
    .mode('overwrite') \
    .option('overwriteSchema', 'true') \
    .partitionBy('event_date') \
    .saveAsTable(f'{CATALOG}.silver.events_si')

print('Silver layer saved!')
spark.sql(f'DESCRIBE DETAIL {CATALOG}.silver.events_si').select('numFiles', 'sizeInBytes').show()


In [0]:
# Validate Silver (your Day 2 Cell 20)
df_s = spark.table(f'{CATALOG}.silver.events_si')
for c in ['user_id', 'event_type', 'event_time']:
    n = df_s.filter(col(c).isNull()).count()
    print(f'NULL check [{c}]: {"PASS" if n == 0 else f"FAIL ({n:,} nulls)"}')

silver_count = df_s.count()
print(f'Silver total records: {silver_count:,}')

dbutils.notebook.exit(f'SUCCESS: {silver_count} records')