# 02 — Build Silver Layer (Spark SQL)

> **Colab-ready Spark SQL notebooks** following the Medallion Architecture.
> Run notebooks in order: **01 Bronze → 02 Silver → 03 Gold → Analytics**.

### Conventions
- Databases (schemas): `bronze`, `silver`, `gold`
- Naming: `snake_case`
- Storage: managed tables under `/content/spark-warehouse` (created automatically)
- All code uses **Spark SQL** via `spark.sql(...)` and shows previews with `.show(10, truncate=False)`

In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) the Hive-enabled Spark session whose warehouse directory
# is /content/spark-warehouse.
spark = (
    SparkSession.builder
    .appName("Medallion-SparkSQL")
    .config("spark.sql.warehouse.dir", "/content/spark-warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

# One database per medallion layer; IF NOT EXISTS keeps the cell re-runnable.
for db in ["bronze", "silver", "gold"]:
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")

# Read the single SHOW DATABASES column by position: it is named
# `databaseName` on Spark 2.x but `namespace` on Spark 3+, so attribute access
# by the old name raises AttributeError on current releases.
print("Databases ready:", [r[0] for r in spark.sql("SHOW DATABASES").collect()])

In [None]:
def preview(table_fqn, limit=10):
    """Print a banner, the total row count, and the first rows of a table.

    Args:
        table_fqn: Table name, interpolated as-is into the SQL text
            (for example ``silver.crm_cust_info``).
        limit: Maximum number of rows to select and display.
    """
    print("\n=== Preview:", table_fqn, "===")
    count_query = f"SELECT COUNT(*) AS row_count FROM {table_fqn}"
    sample_query = f"SELECT * FROM {table_fqn} LIMIT {limit}"
    # The count uses show()'s default row cap; the sample passes `limit`
    # so show() does not cut off fewer rows than were selected.
    for query, show_args in ((count_query, ()), (sample_query, (limit,))):
        spark.sql(query).show(*show_args, truncate=False)

### silver.crm_cust_info

In [None]:
# Rebuild silver.crm_cust_info from bronze.crm_cust_info:
#   * keep only the most recent record per cst_id (by cst_create_date),
#   * trim names, expand marital-status and gender codes ('n/a' when unknown),
#   * cast id/key/date columns and stamp the load time.
# Rows with a NULL cst_id are excluded before ranking: the window would
# otherwise put every NULL-id record into one partition and keep a single
# arbitrary row with no usable key.
spark.sql("DROP TABLE IF EXISTS silver.crm_cust_info")
spark.sql('''CREATE TABLE silver.crm_cust_info AS
WITH ranked AS (
  SELECT b.*,
         ROW_NUMBER() OVER (PARTITION BY cst_id ORDER BY cst_create_date DESC) rn
  FROM bronze.crm_cust_info b
  WHERE cst_id IS NOT NULL)
SELECT CAST(cst_id AS INT) cst_id, CAST(cst_key AS STRING) cst_key,
       TRIM(cst_firstname) cst_firstname, TRIM(cst_lastname) cst_lastname,
       CASE UPPER(TRIM(cst_marital_status)) WHEN 'S' THEN 'Single' WHEN 'M' THEN 'Married' ELSE 'n/a' END cst_marital_status,
       CASE UPPER(TRIM(cst_gndr)) WHEN 'F' THEN 'Female' WHEN 'M' THEN 'Male' ELSE 'n/a' END cst_gndr,
       CAST(cst_create_date AS DATE) cst_create_date, current_timestamp() dwh_create_ts
FROM ranked WHERE rn=1''')
preview("silver.crm_cust_info")

### silver.crm_prd_info

In [None]:
# Rebuild silver.crm_prd_info from bronze.crm_prd_info.
# The CTAS splits prd_key into a category id (first 5 characters, '-' replaced
# by '_') and the remaining product key (from character 7), expands prd_line
# codes to names, and sets prd_end_dt to the day before the next start date
# of the same product key (NULL for the latest version).
prd_info_statements = (
    "DROP TABLE IF EXISTS silver.crm_prd_info",
    '''CREATE TABLE silver.crm_prd_info AS
WITH base AS (
  SELECT CAST(prd_id AS INT) prd_id, CAST(prd_key AS STRING) prd_key,
         SUBSTR(prd_key,1,5) cat_id_raw, SUBSTR(prd_key,7) prd_key_clean,
         CAST(prd_nm AS STRING) prd_nm, CAST(prd_cost AS INT) prd_cost,
         CAST(prd_line AS STRING) prd_line, CAST(prd_start_dt AS TIMESTAMP) prd_start_ts
  FROM bronze.crm_prd_info),
ordered AS (
  SELECT prd_id, REGEXP_REPLACE(cat_id_raw,'-','_') cat_id, prd_key_clean prd_key, prd_nm, prd_cost,
         CASE UPPER(TRIM(prd_line)) WHEN 'M' THEN 'Mountain' WHEN 'R' THEN 'Road' WHEN 'S' THEN 'Other Sales' WHEN 'T' THEN 'Touring' ELSE 'n/a' END prd_line,
         CAST(prd_start_ts AS DATE) prd_start_dt,
         LEAD(CAST(prd_start_ts AS DATE)) OVER (PARTITION BY prd_key_clean ORDER BY prd_start_ts) next_start
  FROM base)
SELECT prd_id, cat_id, prd_key, prd_nm, prd_cost, prd_line, prd_start_dt,
       CASE WHEN next_start IS NULL THEN NULL ELSE date_sub(next_start,1) END prd_end_dt,
       current_timestamp() dwh_create_ts
FROM ordered''',
)
# Drop first, then recreate, so the cell can be re-run from a clean state.
for statement in prd_info_statements:
    spark.sql(statement)
preview("silver.crm_prd_info")

### silver.crm_sales_details

In [None]:
# Rebuild silver.crm_sales_details from bronze.crm_sales_details:
#   * parsed     - cast columns and parse the yyyyMMdd order/ship/due dates,
#   * fixed      - default missing sales to 0 and compute quantity * |price|,
#   * reconciled - repair sales and price so they agree with each other.
#
# Sales are replaced by computed_sales when they are missing/non-positive, or
# when they disagree with computed_sales AND the price is usable (non-NULL,
# non-zero). Without the price guard, a missing price makes computed_sales 0
# and a perfectly valid sales amount would be overwritten with 0.
#
# In the price CASE, `sls_sales` is the input column coming from `fixed`
# (not the repaired alias), so a missing/non-positive price is derived from
# the source sales amount divided by a non-zero quantity.
spark.sql("DROP TABLE IF EXISTS silver.crm_sales_details")
spark.sql('''CREATE TABLE silver.crm_sales_details AS
WITH parsed AS (
  SELECT CAST(sls_ord_num AS STRING) sls_ord_num, CAST(sls_prd_key AS STRING) sls_prd_key, CAST(sls_cust_id AS INT) sls_cust_id,
         to_date(CAST(sls_order_dt AS STRING),'yyyyMMdd') sls_order_dt,
         to_date(CAST(sls_ship_dt AS STRING),'yyyyMMdd') sls_ship_dt,
         to_date(CAST(sls_due_dt AS STRING),'yyyyMMdd') sls_due_dt,
         CAST(sls_sales AS INT) sls_sales, CAST(sls_quantity AS INT) sls_quantity, CAST(sls_price AS INT) sls_price
  FROM bronze.crm_sales_details),
fixed AS (
  SELECT sls_ord_num, sls_prd_key, sls_cust_id, sls_order_dt, sls_ship_dt, sls_due_dt,
         COALESCE(sls_sales,0) sls_sales, sls_quantity, sls_price,
         (sls_quantity * abs(COALESCE(sls_price,0))) computed_sales FROM parsed),
reconciled AS (
  SELECT sls_ord_num, sls_prd_key, sls_cust_id, sls_order_dt, sls_ship_dt, sls_due_dt,
         CASE WHEN sls_sales <= 0
                OR (sls_price IS NOT NULL AND sls_price <> 0 AND sls_sales <> computed_sales)
              THEN computed_sales ELSE sls_sales END sls_sales,
         sls_quantity,
         CASE WHEN sls_price IS NULL OR sls_price <= 0 THEN CAST(sls_sales / NULLIF(sls_quantity,0) AS INT) ELSE sls_price END sls_price
  FROM fixed)
SELECT *, current_timestamp() dwh_create_ts FROM reconciled''')
preview("silver.crm_sales_details")

### silver.erp_cust_az12

In [None]:
# Rebuild silver.erp_cust_az12 from bronze.erp_cust_az12:
#   * strip the 'NAS' prefix from cid when present,
#   * null out birth dates that lie in the future,
#   * normalise gender codes/words to 'Female' / 'Male' / 'n/a'.
# The LIKE pattern uses a single '%': this string is not %-formatted, so the
# former '%%' reached Spark as two wildcards (same matches, misleading text).
# bdate is cast to DATE before the comparison so the check does not depend on
# Spark's implicit string/date coercion rules.
spark.sql("DROP TABLE IF EXISTS silver.erp_cust_az12")
spark.sql('''CREATE TABLE silver.erp_cust_az12 AS
SELECT CASE WHEN cid LIKE 'NAS%' THEN SUBSTR(cid,4) ELSE cid END cid,
       CASE WHEN CAST(bdate AS DATE) > current_date() THEN NULL ELSE CAST(bdate AS DATE) END bdate,
       CASE WHEN UPPER(TRIM(gen)) IN ('F','FEMALE') THEN 'Female' WHEN UPPER(TRIM(gen)) IN ('M','MALE') THEN 'Male' ELSE 'n/a' END gen,
       current_timestamp() dwh_create_ts
FROM bronze.erp_cust_az12''')
preview("silver.erp_cust_az12")

### silver.erp_loc_a101

In [None]:
# Rebuild silver.erp_loc_a101 from bronze.erp_loc_a101:
#   * remove '-' from cid,
#   * expand the country codes DE / US / USA to full names,
#   * map blank AND NULL countries to 'n/a'; any other value is kept, trimmed.
# COALESCE(..., '') routes NULL through the same branch as the empty string;
# a NULL operand never matches a simple-CASE WHEN, so it previously fell to
# ELSE and stayed NULL, unlike the 'n/a' convention of the other silver tables.
spark.sql("DROP TABLE IF EXISTS silver.erp_loc_a101")
spark.sql('''CREATE TABLE silver.erp_loc_a101 AS
SELECT REPLACE(CAST(cid AS STRING),'-','') cid,
       CASE COALESCE(TRIM(CAST(cntry AS STRING)),'') WHEN 'DE' THEN 'Germany' WHEN 'US' THEN 'United States' WHEN 'USA' THEN 'United States'
            WHEN '' THEN 'n/a' ELSE TRIM(CAST(cntry AS STRING)) END cntry,
       current_timestamp() dwh_create_ts
FROM bronze.erp_loc_a101''')
preview("silver.erp_loc_a101")

### silver.erp_px_cat_g1v2

In [None]:
# Rebuild silver.erp_px_cat_g1v2 as a typed copy of the bronze table: every
# source column is cast to STRING and a load timestamp is appended.
px_cat_statements = (
    "DROP TABLE IF EXISTS silver.erp_px_cat_g1v2",
    "CREATE TABLE silver.erp_px_cat_g1v2 AS SELECT CAST(id AS STRING) id, CAST(cat AS STRING) cat, CAST(subcat AS STRING) subcat, CAST(maintenance AS STRING) maintenance, current_timestamp() dwh_create_ts FROM bronze.erp_px_cat_g1v2",
)
# Drop first, then recreate, so the cell can be re-run from a clean state.
for statement in px_cat_statements:
    spark.sql(statement)
preview("silver.erp_px_cat_g1v2")