# 02 — Build Silver Layer

In [None]:
import os

from pyspark.sql import SparkSession, Window, functions as F

# Single Spark session shared by every cell below; getOrCreate() returns the
# active session if one already exists.
spark = (
    SparkSession.builder
    .appName("silver-dataproc")
    .getOrCreate()
)

# Root under which bronze/ is read and silver/ is written. Overridable through
# the GCS_BASE environment variable; note the fallback is a plain path, not gs://.
GCS_BASE = os.environ.get("GCS_BASE", "/user/nb")
print("GCS_BASE:", GCS_BASE)

### Table: `silver.crm_cust_info`

**Changes & Why**
- Trim IDs and names; drop rows whose `cst_id` is NULL or blank; standardize marital status & gender.
- Deduplicate to the row with the latest `cst_create_date` per `cst_id` (SCD‑1).
- Add audit timestamp.

**Columns (post‑transform):**

| Column | Type | Nullable | Description |
|---|---|---|---|
| `cst_id` | `string` | Yes | Natural key, trimmed and cast to string; NULL/blank ids dropped; one row per id (latest create_date) |
| `cst_key` | `string` | Yes | Business key to map ERP |
| `cst_firstname` | `string` | Yes | Trimmed |
| `cst_lastname` | `string` | Yes | Trimmed |
| `cst_marital_status` | `string` | Yes | Standardized: Single/Married/n/a |
| `cst_gndr` | `string` | Yes | Standardized: Male/Female/n/a |
| `cst_create_date` | `date` | Yes | Original create date |
| `dwh_create_date` | `timestamp` | Yes | Audit timestamp |

In [None]:
from pyspark.sql import functions as F, Window

# Load
b = spark.read.parquet(f"{GCS_BASE}/bronze/crm_cust_info")

# Normalize IDs/names first and remove NULL/blank IDs.
# cst_id is kept as a trimmed string (explicit cast below).
clean = (
    b.withColumn("cst_id", F.trim(F.col("cst_id")).cast("string"))
     .withColumn("cst_firstname", F.trim(F.col("cst_firstname")))
     .withColumn("cst_lastname",  F.trim(F.col("cst_lastname")))
     .filter(F.col("cst_id").isNotNull() & (F.col("cst_id") != ""))   # drop NULL/blank IDs
)

# Keep one row per cst_id: the most recent cst_create_date wins. NULL dates sort
# last, so a NULL-dated row survives only when the id has no dated row.
# NOTE(review): ties on cst_create_date are resolved arbitrarily by row_number().
w = Window.partitionBy("cst_id").orderBy(F.col("cst_create_date").desc_nulls_last())
dedup = clean.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")

# Standardize attributes; anything unrecognised (including NULL) becomes "n/a".
marital_code = F.upper(F.trim(F.col("cst_marital_status")))
gender_code = F.upper(F.trim(F.col("cst_gndr")))
out = (
    dedup
    .withColumn(
        "cst_marital_status",
        F.when(marital_code.isin("S", "SINGLE"), "Single")
         .when(marital_code.isin("M", "MARRIED"), "Married")
         .otherwise(F.lit("n/a"))
    )
    .withColumn(
        "cst_gndr",
        F.when(gender_code.isin("F", "FEMALE"), "Female")
         .when(gender_code.isin("M", "MALE"), "Male")
         .otherwise(F.lit("n/a"))
    )
    .withColumn("dwh_create_date", F.current_timestamp())
)

# Write
out_path = f"{GCS_BASE}/silver/crm_cust_info"
out.write.mode("overwrite").parquet(out_path)

# Quick check: read the written table back once and reuse it for view + count.
silver = spark.read.parquet(out_path)
silver.createOrReplaceTempView("silver_crm_cust_info")
print("Rows:", silver.count())
spark.sql("SELECT * FROM silver_crm_cust_info LIMIT 10").show(truncate=False)


### Table: `silver.crm_prd_info`

**Changes & Why**
- Derive `cat_id`; decode `prd_line`.
- Effective dating: `prd_end_dt` is the next `prd_start_dt` for the same `prd_key` minus one day (NULL for the latest version).
- Add audit timestamp.

**Columns (post‑transform):**

| Column | Type | Nullable | Description |
|---|---|---|---|
| `prd_id` | `int` | Yes | Natural product id |
| `cat_id` | `string` | Yes | First 5 characters of the raw `prd_key`, with `-` replaced by `_` |
| `prd_key` | `string` | Yes | Clean business key: raw `prd_key` from the 7th character onward |
| `prd_nm` | `string` | Yes | Product name |
| `prd_cost` | `int` | Yes | Unit cost |
| `prd_line` | `string` | Yes | Decoded label |
| `prd_start_dt` | `date` | Yes | Start date |
| `prd_end_dt` | `date` | Yes | Effective end date via LEAD-1 |
| `dwh_create_date` | `timestamp` | Yes | Audit timestamp |

In [None]:
b = spark.read.parquet(f"{GCS_BASE}/bronze/crm_prd_info")

# Split the raw prd_key:
#   cat_id        - first 5 characters, with '-' replaced by '_'
#   prd_key_clean - everything from the 7th character on; substring() without a
#                   length runs to the end, so keys of any length are kept whole.
staged = (
    b.withColumn("cat_id", F.regexp_replace(F.substring("prd_key", 1, 5), "-", "_"))
     .withColumn("prd_key_clean", F.expr("substring(prd_key, 7)"))
)

# Decode the one-letter product line; unknown codes and NULL become "n/a".
line_code = F.upper(F.trim("prd_line"))

# Versions of the same product, oldest first (NULL start dates last).
w = Window.partitionBy("prd_key_clean").orderBy(F.col("prd_start_dt").asc_nulls_last())

out = (staged
    .withColumn("prd_line", F.when(line_code == "M", "Mountain")
                             .when(line_code == "R", "Road")
                             .when(line_code == "S", "Other Sales")
                             .when(line_code == "T", "Touring")
                             .otherwise(F.lit("n/a")))
    .withColumn("prd_start_dt", F.to_date("prd_start_dt"))
    # A version ends the day before the next version starts; the latest version
    # has no successor, so lead() yields NULL and so does prd_end_dt.
    .withColumn("prd_end_dt", F.date_sub(F.lead("prd_start_dt").over(w), 1))
    .withColumn("prd_key", F.col("prd_key_clean")).drop("prd_key_clean")
    .withColumn("dwh_create_date", F.current_timestamp())
)

out_path = f"{GCS_BASE}/silver/crm_prd_info"
out.write.mode("overwrite").parquet(out_path)

# Quick check: read the written table back once and reuse it for view + count.
silver = spark.read.parquet(out_path)
silver.createOrReplaceTempView("silver_crm_prd_info")
print("Rows:", silver.count())
spark.sql("SELECT * FROM silver_crm_prd_info LIMIT 10").show(truncate=False)

### Table: `silver.crm_sales_details`

**Changes & Why**
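- Parse `yyyyMMdd` dates (0 or malformed → NULL); recompute `sls_sales` as quantity × |price| when it is missing, non-positive, or inconsistent; backfill a missing or non-positive `sls_price` as sales ÷ quantity; add audit timestamp.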

**Columns (post‑transform):**

| Column | Type | Nullable | Description |
|---|---|---|---|
| `sls_ord_num` | `string` | Yes | Order number |
| `sls_prd_key` | `string` | Yes | Product key |
| `sls_cust_id` | `int` | Yes | Customer id |
| `sls_order_dt` | `date` | Yes | Parsed from YYYYMMDD |
| `sls_ship_dt` | `date` | Yes | Parsed from YYYYMMDD |
| `sls_due_dt` | `date` | Yes | Parsed from YYYYMMDD |
| `sls_sales` | `double` | Yes | Recomputed if inconsistent |
| `sls_quantity` | `int` | Yes | Units |
| `sls_price` | `double` | Yes | Backfilled if missing or non-positive |
| `dwh_create_date` | `timestamp` | Yes | Audit timestamp |

In [None]:
from pyspark.sql import functions as F

b = spark.read.parquet(f"{GCS_BASE}/bronze/crm_sales_details")


def parse_yyyymmdd(col):
    """Parse column `col` (int or string in yyyyMMdd form) into a DATE.

    The value is cast to string first so integer-typed columns parse too.
    """
    return F.to_date(F.col(col).cast("string"), "yyyyMMdd")


def len_is_8(c):
    """True where the text form of column `c` is exactly 8 characters long."""
    return F.length(F.col(c).cast("string")) == 8


def is_zero(c):
    """True where column `c` equals 0 (the 'no date' marker in the source)."""
    return F.col(c) == F.lit(0)


def clean_yyyymmdd(c):
    """NULL when column `c` is 0 or not 8 characters long, else its parsed yyyyMMdd date."""
    return F.when(is_zero(c) | (~len_is_8(c)), F.lit(None)).otherwise(parse_yyyymmdd(c))


# Normalize numeric columns to doubles once to avoid type issues
qty   = F.col("sls_quantity").cast("double")
price = F.col("sls_price").cast("double")
sales = F.col("sls_sales").cast("double")

# Amount implied by the line itself: quantity x |price|.
sales_from_price = qty * F.abs(price)

# Null-safe validity flags: `isNotNull() & (x > 0)` is FALSE (never NULL) for a
# NULL x, so the flags compose without three-valued-logic surprises.
sales_ok = sales.isNotNull() & (sales > 0)
derived_ok = sales_from_price.isNotNull() & (sales_from_price > 0)

fixed_sales = (
    # Price or quantity is unusable (NULL / 0): the derived amount would be NULL or
    # 0, so trust the recorded amount rather than overwrite a valid one with 0.
    F.when(sales_ok & ~derived_ok, sales)
    # Recorded amount missing, non-positive, or inconsistent with qty x |price|.
    .when(~sales_ok | (sales != sales_from_price), sales_from_price)
    .otherwise(sales)
)

out = (
    b
    # Clean dates: NULL if 0 or malformed, else parsed from yyyyMMdd
    .withColumn("sls_order_dt", clean_yyyymmdd("sls_order_dt"))
    .withColumn("sls_ship_dt",  clean_yyyymmdd("sls_ship_dt"))
    .withColumn("sls_due_dt",   clean_yyyymmdd("sls_due_dt"))

    # Recompute sales if missing/invalid
    .withColumn("sls_sales", fixed_sales)

    # Backfill price if missing/non-positive: (fixed) sales / qty. The when()
    # yields NULL for qty 0/NULL; it replaces F.nullif, which PySpark only
    # provides from 3.5.0 onwards.
    .withColumn(
        "sls_price",
        F.when(price.isNull() | (price <= 0),
               F.when(qty != 0, F.col("sls_sales") / qty)
        ).otherwise(price)
    )

    .withColumn("dwh_create_date", F.current_timestamp())
)

out_path = f"{GCS_BASE}/silver/crm_sales_details"
out.write.mode("overwrite").parquet(out_path)

# Quick check: read the written table back once and reuse it for view + count.
silver = spark.read.parquet(out_path)
silver.createOrReplaceTempView("silver_crm_sales_details")
print("Rows:", silver.count())
spark.sql("SELECT * FROM silver_crm_sales_details LIMIT 10").show(truncate=False)


### Table: `silver.erp_cust_az12`

**Changes & Why**
- Strip the `NAS` prefix from `cid`; set future `bdate` values to NULL; normalize gender to Female/Male/n/a; add audit timestamp.

**Columns (post‑transform):**

| Column | Type | Nullable | Description |
|---|---|---|---|
| `cid` | `string` | Yes | NAS prefix stripped where present |
| `bdate` | `date` | Yes | Future dates set to NULL |
| `gen` | `string` | Yes | Normalized gender |
| `dwh_create_date` | `timestamp` | Yes | Audit timestamp |

In [None]:
from pyspark.sql import functions as F

b = spark.read.parquet(f"{GCS_BASE}/bronze/erp_cust_az12")

# cid: ids starting with "NAS" keep only the part from the 4th character on.
cid_clean = (
    F.when(F.col("cid").startswith("NAS"), F.expr("substring(cid,4)"))
     .otherwise(F.col("cid"))
)

# bdate: birth dates after today are blanked to NULL; everything else passes through.
bdate_clean = (
    F.when(F.col("bdate") > F.current_date(), F.lit(None))
     .otherwise(F.col("bdate"))
)

# gen: case/whitespace-insensitive F/FEMALE and M/MALE; anything else -> "n/a".
gen_code = F.upper(F.trim("gen"))
gen_label = (
    F.when(gen_code.isin("F", "FEMALE"), F.lit("Female"))
     .when(gen_code.isin("M", "MALE"), F.lit("Male"))
     .otherwise(F.lit("n/a"))
)

out = (
    b.withColumn("cid", cid_clean)
     .withColumn("bdate", bdate_clean)
     .withColumn("gen", gen_label)
     .withColumn("dwh_create_date", F.current_timestamp())
)

out_path = f"{GCS_BASE}/silver/erp_cust_az12"
view_name = "silver_erp_cust_az12"
out.write.mode("overwrite").parquet(out_path)
spark.read.parquet(out_path).createOrReplaceTempView(view_name)
print("Rows:", spark.read.parquet(out_path).count())
spark.sql(f"SELECT * FROM {view_name} LIMIT 10").show(truncate=False)

### Table: `silver.erp_loc_a101`

**Changes & Why**
- Remove hyphens in `cid`; map country labels; add audit timestamp.

**Columns (post‑transform):**

| Column | Type | Nullable | Description |
|---|---|---|---|
| `cid` | `string` | Yes | Hyphens removed |
| `cntry` | `string` | Yes | `DE` → Germany, `US`/`USA` → United States, blank/NULL → n/a; other values kept (trimmed) |
| `dwh_create_date` | `timestamp` | Yes | Audit timestamp |

In [None]:
from pyspark.sql import functions as F

b = spark.read.parquet(f"{GCS_BASE}/bronze/erp_loc_a101")

# Country codes are compared after trimming; unmapped values pass through trimmed.
cntry_trimmed = F.trim("cntry")
cntry_label = (
    F.when(cntry_trimmed == "DE", "Germany")
     .when(cntry_trimmed.isin("US", "USA"), "United States")
     .when((cntry_trimmed == "") | F.col("cntry").isNull(), "n/a")
     .otherwise(cntry_trimmed)
)

out = (
    b.withColumn("cid", F.regexp_replace("cid", "-", ""))   # strip every hyphen
     .withColumn("cntry", cntry_label)
     .withColumn("dwh_create_date", F.current_timestamp())
)

out_path = f"{GCS_BASE}/silver/erp_loc_a101"
view_name = "silver_erp_loc_a101"
out.write.mode("overwrite").parquet(out_path)
spark.read.parquet(out_path).createOrReplaceTempView(view_name)
print("Rows:", spark.read.parquet(out_path).count())
spark.sql(f"SELECT * FROM {view_name} LIMIT 10").show(truncate=False)

### Table: `silver.erp_px_cat_g1v2`

**Changes & Why**
- Carry over mapping; add audit timestamp.

**Columns (post‑transform):**

| Column | Type | Nullable | Description |
|---|---|---|---|
| `id` | `string` | Yes | Category id |
| `cat` | `string` | Yes | Category |
| `subcat` | `string` | Yes | Subcategory |
| `maintenance` | `string` | Yes | Maintenance attribute |
| `dwh_create_date` | `timestamp` | Yes | Audit timestamp |

In [None]:
from pyspark.sql import functions as F

# Category mapping is carried over unchanged; only the audit timestamp is added.
b = spark.read.parquet(f"{GCS_BASE}/bronze/erp_px_cat_g1v2")
out = b.withColumn("dwh_create_date", F.current_timestamp())

out_path = f"{GCS_BASE}/silver/erp_px_cat_g1v2"
view_name = "silver_erp_px_cat_g1v2"
out.write.mode("overwrite").parquet(out_path)
spark.read.parquet(out_path).createOrReplaceTempView(view_name)
print("Rows:", spark.read.parquet(out_path).count())
spark.sql(f"SELECT * FROM {view_name} LIMIT 10").show(truncate=False)

## Summary
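- Wrote each Silver table as Parquet under `{GCS_BASE}/silver/<table>` (overwrite mode) and registered a matching `silver_<table>` temporary view for quick inspection.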