# INITIALIZATION

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim, length
from pyspark.sql.window import Window

In [0]:
# Raw CRM column name -> standardized Silver column name.
# Consumed by the renaming step, which applies each pair via withColumnRenamed.
RENAME_MAP = dict(
    prd_id="product_id",
    prd_key="product_key",
    prd_nm="product_name",
    prd_cost="product_cost",
    prd_line="product_line",
    prd_start_dt="start_date",
    prd_end_dt="end_date",
    cat_id="category_id",
)

# READ FROM BRONZE TABLE


In [0]:
# Load the raw CRM product table from the Bronze layer into a DataFrame.
bronze_table_name = "workspace.bronze.crm_prd_info_raw"
df = spark.table(bronze_table_name)

# Silver Data Transformations

## Trimming

In [0]:
# Trim leading/trailing whitespace on every string column so that messy text
# does not break later filters and joins.
# All columns are projected in ONE select() rather than one withColumn() call
# per column: each withColumn() adds its own projection to the query plan, and
# stacking them in a loop can produce very large plans. Column names and
# column order are unchanged; non-string columns pass through untouched.
df = df.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
)

## Derivations and Normalization

In [0]:
# Derive fields and normalize codes with basic cleanup.
# Upper-cased product line code, computed once and reused by every branch below.
prd_line_code = F.upper(col("prd_line"))

df = (
    df
    # Extract category id: the first 5 characters of the raw product key,
    # with "-" replaced by "_". Must run BEFORE prd_key is overwritten below.
    .withColumn(
        "cat_id",
        F.regexp_replace(F.substring(col("prd_key"), 1, 5), "-", "_")
    )

    # Extract the product key part: everything from character 7 onward.
    # Column.substr is used instead of F.substring because F.substring only
    # accepts a Column for its length argument on newer PySpark releases,
    # whereas Column.substr accepts Column arguments on older ones as well.
    # Using the full string length as the length simply means "to the end".
    .withColumn(
        "prd_key",
        col("prd_key").substr(F.lit(7), F.length(col("prd_key")))
    )

    # Replace null cost with 0
    .withColumn(
        "prd_cost",
        F.coalesce(col("prd_cost"), F.lit(0))
    )

    # Normalize product line codes to more readable labels; any code that is
    # not M/R/S/T (including null) falls through to "n/a".
    .withColumn(
        "prd_line",
        F.when(prd_line_code == "M", "Mountain")
         .when(prd_line_code == "R", "Road")
         .when(prd_line_code == "S", "Other Sales")
         .when(prd_line_code == "T", "Touring")
         .otherwise("n/a")
    )

    # Cast start date into a real date type
    .withColumn(
        "prd_start_dt",
        col("prd_start_dt").cast(DateType())
    )
)

## Fixing End Date

In [0]:
# Partition by product key and order each partition by start date.
window_spec = Window.partitionBy("prd_key").orderBy("prd_start_dt")

# Start date of the following row within the same product key
# (null for the last row of each partition).
next_start_dt = F.lead("prd_start_dt").over(window_spec)

# Set the end date to the next start date minus 1 day.
df = df.withColumn("prd_end_dt", F.date_sub(next_start_dt, 1))

## Renaming

In [0]:
# Apply the raw -> standardized column names defined in RENAME_MAP, one at a time.
for raw_name in RENAME_MAP:
    df = df.withColumnRenamed(raw_name, RENAME_MAP[raw_name])

# Writing the Silver Table

In [0]:
# Persist the cleaned + standardized DataFrame as the Silver Delta table,
# replacing any existing contents.
# NOTE(review): the target is schema-qualified only ("silver.crm_products"),
# while the Bronze read uses a full catalog path — confirm the session's
# current catalog is the intended one.
silver_writer = df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("silver.crm_products")

In [0]:
%sql
-- Quick check: preview the rows now stored in the Silver products table
SELECT
  *
FROM
  silver.crm_products