# Import

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col,trim
from pyspark.sql.types import StringType, DateType
from pyspark.sql.window import Window


# Load Bronze Table

In [0]:
# Read the CRM product table from the bronze schema; the cells below
# transform this DataFrame step by step.
df = spark.read.table("workspace.bronze.crm_prd_info")

# Data Transformation 

## Trimming

In [0]:
# Strip leading/trailing whitespace from every string-typed column.
# This is done in ONE select() rather than withColumn() in a loop: each
# withColumn() call adds another projection to the query plan, which the
# PySpark docs warn against when touching many columns.
string_columns = {
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, StringType)
}
df = df.select(
    [
        # Alias back to the same name so column order and names are unchanged.
        trim(col(name)).alias(name) if name in string_columns else col(name)
        for name in df.columns
    ]
)


## Product Key parsing

In [0]:
# Split the raw prd_key into two parts:
#   * cat_id  - the first 5 characters, with "-" replaced by "_"
#   * prd_key - everything from character 7 onwards
# Order matters: cat_id must be derived BEFORE prd_key is overwritten,
# because both expressions read the original prd_key value.
raw_prd_key = col("prd_key")
df = (
    df
    .withColumn("cat_id", F.regexp_replace(raw_prd_key.substr(1, 5), "-", "_"))
    # Column.substr accepts Column arguments on all Spark versions, whereas
    # F.substring only accepts a Column length from PySpark 4.0 onwards.
    # Using the full string length as `len` simply means "to the end".
    .withColumn("prd_key", raw_prd_key.substr(F.lit(7), F.length(raw_prd_key)))
)


## Product Cost cleanup

In [0]:
# Substitute 0 wherever prd_cost is NULL; non-NULL costs pass through as-is.
cost_or_zero = F.coalesce(col("prd_cost"), F.lit(0))
df = df.withColumn("prd_cost", cost_or_zero)

## Product Line Normalization

In [0]:
# Translate the product-line code into a descriptive label.
# The code is compared for EQUALITY (after trim + upper-casing) instead of
# with a substring test: a substring test is order-dependent and mislabels
# any value that happens to contain more than one of the letters.
# NULLs and unrecognised values fall through to "n/a".
# NOTE(review): assumes bronze prd_line holds the bare codes M/R/S/T - confirm.
prd_line_code = F.upper(F.trim(col("prd_line")))
df = df.withColumn(
    "prd_line",
    F.when(prd_line_code == "M", "Mountain")
    .when(prd_line_code == "R", "Road")
    .when(prd_line_code == "S", "Other Sales")
    .when(prd_line_code == "T", "Touring")
    .otherwise("n/a"),
)

## Date Casting

In [0]:
# Cast the start and end date columns to DATE, replacing them in place.
df = (
    df
    .withColumn("prd_start_dt", col("prd_start_dt").cast("date"))
    .withColumn("prd_end_dt", col("prd_end_dt").cast("date"))
)

# Renaming columns

In [0]:
# Bronze column name -> silver column name.
RENAME_MAP = dict(
    prd_id="product_id",
    cat_id="category_id",
    prd_key="product_key",
    prd_nm="product_name",
    prd_cost="product_cost",
    prd_line="product_line",
    prd_start_dt="start_date",
    prd_end_dt="end_date",
)

# Apply the renames one at a time; withColumnRenamed is a no-op for any
# source column that is not present in the DataFrame.
for source_column in RENAME_MAP:
    df = df.withColumnRenamed(source_column, RENAME_MAP[source_column])

# Write to Silver Table

In [0]:
# Persist the transformed DataFrame as a Delta table in the silver schema,
# replacing whatever the table currently contains.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_products")
)

In [0]:
%sql 
-- Preview up to ten rows of the silver table written by the previous cell.
SELECT * FROM workspace.silver.crm_products LIMIT 10