## Initialization

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

## Read the Bronze Table

In [0]:
# Load the raw Bronze product table; every later cell refines this `df` in place.
df = spark.read.table("workspace.bronze.crm_prd_info")

In [0]:
# Quick look at the first rows of the raw Bronze data.
display(df.limit(10))

In [0]:
## Count missing values per column
from pyspark.sql.functions import col, count, when, isnan

# isnan() only makes sense for floating-point columns, so Date/String/other columns
# are checked with isNull() alone, while float/double columns also count NaN values.
_NAN_CAPABLE_TYPES = ('double', 'float')


def _is_missing(column_name):
    """Return a boolean Column: true where `column_name` is null (or NaN for float/double)."""
    missing = col(column_name).isNull()
    if df.schema[column_name].dataType.typeName() in _NAN_CAPABLE_TYPES:
        missing = missing | isnan(col(column_name))
    return missing


# count() skips nulls, so when(cond, <literal>) yields one counted value per missing row.
null_counts = df.select(
    [count(when(_is_missing(name), name)).alias(name) for name in df.columns]
)
null_counts.show()

## Silver Transformation
### Trimming

In [0]:
# Strip leading/trailing whitespace from every string column; other types pass through.
string_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
for name in string_columns:
    df = df.withColumn(name, trim(col(name)))

## Product Key Parsing

In [0]:
# Column names after trimming, to confirm the schema before parsing prd_key.
[field.name for field in df.schema.fields]

In [0]:
## Split the raw product key into a category id and the product key proper.
# cat_id: first 5 characters of prd_key with "-" normalized to "_".
# Must run before prd_key is overwritten below, since it reads the original value.
df = df.withColumn(
    "cat_id",
    F.regexp_replace(F.substring(col("prd_key"), 1, 5), "-", "_"),
)
# prd_key: everything from position 7 onward (position 6 is skipped — presumably a
# separator, TODO confirm). The length argument may overshoot; Spark just returns
# the remainder. Column.substr accepts Column arguments on every PySpark version,
# whereas functions.substring only accepts a Column length from PySpark 4.0.
df = df.withColumn(
    "prd_key",
    col("prd_key").substr(F.lit(7), F.length(col("prd_key"))),
)
df.display()

## Cost Cleanup

In [0]:
# Treat a missing product cost as zero instead of propagating nulls downstream.
zero_cost = F.lit(0)
df = df.withColumn("prd_cost", F.coalesce(col("prd_cost"), zero_cost))

## Product Line Normalization

In [0]:
# Map single-letter product-line codes (matched case-insensitively) to display labels;
# null or unrecognized codes fall through to "n/a".
PRODUCT_LINE_LABELS = {
    "M": "Mountain",
    "R": "Road",
    "S": "Other Sales",
    "T": "Touring",
}

line_code = F.upper(col("prd_line"))
line_label = None
for code, label in PRODUCT_LINE_LABELS.items():
    # The first branch starts the CASE WHEN chain; later ones extend it in dict order.
    if line_label is None:
        line_label = F.when(line_code == code, label)
    else:
        line_label = line_label.when(line_code == code, label)

df = df.withColumn("prd_line", line_label.otherwise("n/a"))

## Date Casting

In [0]:
# Cast both validity dates to DATE so start_date and end_date share one type in the
# silver table (previously only the start date was cast).
df = df.withColumn("prd_start_dt", col("prd_start_dt").cast(DateType()))
# Guarded because the rename step also tolerates prd_end_dt being absent.
if "prd_end_dt" in df.columns:
    df = df.withColumn("prd_end_dt", col("prd_end_dt").cast(DateType()))

## Renaming Columns

In [0]:
# Bronze column name -> Silver column name.
RENAME_MAP = dict(
    prd_id="product_id",
    cat_id="category_id",
    prd_key="product_number",
    prd_nm="product_name",
    prd_cost="product_cost",
    prd_line="product_line",
    prd_start_dt="start_date",
    prd_end_dt="end_date",
)
# Renames are applied one at a time in insertion order; withColumnRenamed leaves
# the frame unchanged when a source name is not present.
for bronze_name in RENAME_MAP:
    df = df.withColumnRenamed(bronze_name, RENAME_MAP[bronze_name])

## Sanity Checks

In [0]:
# Fail fast if any expected silver column is missing. withColumnRenamed silently
# ignores source names that don't exist, so a typo or an upstream schema change
# would otherwise reach the table overwrite unnoticed.
missing_columns = sorted(set(RENAME_MAP.values()) - set(df.columns))
assert not missing_columns, f"silver columns missing after rename: {missing_columns}"
df.limit(10).display()

## Writing Silver Table

In [0]:
# Replace the silver table's contents with the cleaned product data as a Delta table.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_products")
)

## Sanity Check of the Written Silver Table

In [0]:
%sql
-- Spot-check the rows just written to the silver table.
SELECT *
FROM workspace.silver.crm_products
LIMIT 10