# Initialization

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

In [0]:
# Bronze column name -> Silver column name. Applied as the last
# transformation step, after all derivations have run on the bronze names.
RENAME_MAP = dict(
    prd_id="product_id",
    cat_id="category_id",
    prd_key="product_key",
    prd_nm="product_name",
    prd_cost="product_cost",
    prd_line="product_line",
    prd_start_dt="start_date",
    prd_end_dt="end_date",
)

# Read Bronze table

In [0]:
# Source table for this notebook: the raw CRM product records in the bronze layer.
BRONZE_TABLE = "workspace.bronze.crm_prd_info"
df = spark.read.table(BRONZE_TABLE)

# Silver Transformations

## Trimming

In [0]:
# Strip leading/trailing whitespace from every top-level string column.
# This is done in a single select rather than a withColumn loop: each
# withColumn call adds another projection to the logical plan, while one
# select yields the same columns (same names, same order) in one projection.
string_columns = {
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, StringType)
}
df = df.select(
    [
        trim(col(name)).alias(name) if name in string_columns else col(name)
        for name in df.columns
    ]
)

## Derivations and Normalization

In [0]:
# Product-line codes are matched case-insensitively; build the upper-cased
# expression once instead of repeating it in every branch below.
prd_line_code = F.upper(col("prd_line"))

df = (
    df
    # Category id: the first 5 characters of the raw product key with "-"
    # replaced by "_". This must run before prd_key is overwritten below.
    .withColumn(
        "cat_id",
        F.regexp_replace(F.substring(col("prd_key"), 1, 5), "-", "_")
    )

    # Product key: everything from character 7 onward (character 6 is
    # presumably a separator — confirm against the bronze data).
    # Column.substr accepts Column arguments for both start and length, so this
    # also works on PySpark versions where F.substring only accepts plain ints.
    .withColumn(
        "prd_key",
        col("prd_key").substr(F.lit(7), F.length(col("prd_key")))
    )

    # Replace null cost with 0
    .withColumn(
        "prd_cost",
        F.coalesce(col("prd_cost"), F.lit(0))
    )

    # Expand the one-letter product-line code into a readable label.
    # Anything else, including null, falls through to "n/a".
    .withColumn(
        "prd_line",
        F.when(prd_line_code == "M", "Mountain")
         .when(prd_line_code == "R", "Road")
         .when(prd_line_code == "S", "Other Sales")
         .when(prd_line_code == "T", "Touring")
         .otherwise("n/a")
    )

    # Cast start date to a plain date
    .withColumn(
        "prd_start_dt",
        col("prd_start_dt").cast(DateType())
    )
)

## Fixing end date

In [0]:
# Recompute the end date from the data itself: within each product key, a row
# ends the day before the next row (ordered by start date) begins. The last
# row of each key has no following row, so its end date is null.
window_spec = Window.partitionBy("prd_key").orderBy("prd_start_dt")
next_start_dt = F.lead("prd_start_dt").over(window_spec)
df = df.withColumn(
    "prd_end_dt",
    F.date_sub(next_start_dt, 1),
)


## Renaming

In [0]:
# Switch from bronze column names to their silver names, one mapping entry at
# a time, in the order the entries appear in RENAME_MAP.
for bronze_name in RENAME_MAP:
    silver_name = RENAME_MAP[bronze_name]
    df = df.withColumnRenamed(bronze_name, silver_name)

# Writing Silver Table

In [0]:
# Persist the transformed frame as a Delta table, replacing whatever the
# table held before.
# NOTE(review): unlike the bronze read, the target name carries no catalog
# prefix, so it resolves against the session's current catalog — confirm that
# this is intended.
silver_writer = df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("silver.crm_products")

In [0]:
%sql
-- Inspect the freshly written silver table.
SELECT *
FROM silver.crm_products