In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

Read from Bronze table crm_prd_info


In [0]:
# Load the raw CRM product table from the bronze layer and preview it.
bronze_table_name = 'workspace.bronze.crm_prd_info'
df = spark.table(bronze_table_name)
df.show()


In [0]:
%sql
-- Inspect the column names and data types of the bronze source table.
DESCRIBE TABLE workspace.bronze.crm_prd_info

Trim leading and trailing whitespace from the values of every string column in the table. The schema is iterated so that no column has to be listed by hand.


In [0]:

# Trim leading/trailing whitespace from every string column in ONE projection.
# Calling withColumn() once per column inside a loop stacks a new projection
# onto the query plan for each call; a single select() keeps the plan flat.
# Non-string columns pass through unchanged and column order is preserved.
df = df.select(
    [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
)

df.show()

Parsing the Product Key

In [0]:
# Example only -- intentionally disabled. Do not uncomment without review.
# If enabled, the two statements below would split "prd_key" into:
#   - "cat_id":  the first 5 characters of "prd_key", with "-" replaced by "_"
#   - "prd_key": the remainder of the key, starting at character 7
#
# NOTE(review): the rename cell later in this notebook lists "cat_id", but that
# column only exists if the first statement below is enabled -- confirm whether
# the silver table is supposed to carry a category id.
# NOTE(review): F.substring() accepts a Column for its length argument only in
# newer PySpark releases -- verify against the cluster version before enabling.

#df = df.withColumn("cat_id", F.regexp_replace(F.substring(col("prd_key"), 1, 5), "-", "_"))
#df = df.withColumn("prd_key", F.substring(col("prd_key"), 7, F.length(col("prd_key"))))

df.show()


In [0]:
# Cost cleanup: a missing (NULL) product cost is stored as 0.
cost_or_zero = F.coalesce(col("prd_cost"), F.lit(0))
df = df.withColumn("prd_cost", cost_or_zero)
df.show()

In [0]:
# Product line normalization: expand the single-letter code (matched
# case-insensitively) into a readable name; anything else becomes "n/a".
line_code = F.upper(col("prd_line"))

product_line_name = (
    F.when(line_code == "M", "Mountain")
    .when(line_code == "R", "Road")
    .when(line_code == "S", "Other Sales")
    .when(line_code == "T", "Touring")
    .otherwise("n/a")
)

df = df.withColumn("prd_line", product_line_name)


In [0]:
# Rename columns from the CRM source abbreviations to descriptive
# silver-layer names (source name -> target name).
Rename_map = {
    "prd_id": "product_id",
    "cat_id": "category_id",
    "prd_key": "product_number",
    "prd_nm": "product_name",
    "prd_cost": "product_cost",
    "prd_line": "product_line",
    "prd_start_dt": "product_start_date",
    "prd_end_dt": "product_end_date"
}

# withColumnRenamed() is a no-op when the source column is absent, so an entry
# such as "cat_id" is skipped silently if that column was never derived.
for old_name, new_name in Rename_map.items():
    df = df.withColumnRenamed(old_name, new_name)

# Preview once, after every rename has been applied. Calling show() inside the
# loop triggered a separate Spark job and printout for each renamed column.
df.show(5)

Write to Silver layer

In [0]:
# Persist the cleaned products as a Delta table in the silver layer,
# replacing whatever a previous run wrote.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.crm_products")
)

In [0]:
# Sanity check: read a few rows back from the freshly written silver table.
sanity_query = "select * from workspace.silver.crm_products limit 10"
result = spark.sql(sanity_query)
display(result)