##Read from bronze

In [0]:
%sql
drop table if exists workspace.silver.crm_production

In [0]:
##Initilaization

import pyspark.sql.functions as F
from pyspark.sql.types import StringType, DateType
from pyspark.sql.functions import col, trim
from pyspark.sql.window import Window

In [0]:
df = spark.table("workspace.bronze.crm_prd_info")

In [0]:
df.display()

prd_id,prd_key,prd_nm,prd_cost,prd_line,prd_start_dt,prd_end_dt
210,CO-RF-FR-R92B-58,HL Road Frame - Black- 58,,R,2003-07-01,
211,CO-RF-FR-R92R-58,HL Road Frame - Red- 58,,R,2003-07-01,
212,AC-HE-HL-U509-R,Sport-100 Helmet- Red,12.0,S,2011-07-01,2007-12-28
213,AC-HE-HL-U509-R,Sport-100 Helmet- Red,14.0,S,2012-07-01,2008-12-27
214,AC-HE-HL-U509-R,Sport-100 Helmet- Red,13.0,S,2013-07-01,
215,AC-HE-HL-U509,Sport-100 Helmet- Black,12.0,S,2011-07-01,2007-12-28
216,AC-HE-HL-U509,Sport-100 Helmet- Black,14.0,S,2012-07-01,2008-12-27
217,AC-HE-HL-U509,Sport-100 Helmet- Black,13.0,S,2013-07-01,
218,CL-SO-SO-B909-M,Mountain Bike Socks- M,3.0,M,2011-07-01,2007-12-28
219,CL-SO-SO-B909-L,Mountain Bike Socks- L,3.0,M,2011-07-01,2007-12-28


##Transform

###Triming

In [0]:
for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df = df.withColumn(field.name, trim(col(field.name)))

###Product line Normalization


In [0]:
df = (
    df
    # create a column as a category_id based on the prd_key >> for joining in gold
    .withColumn(
        "category_id",
        F.regexp_replace(F.substring(F.col("prd_key"), 1, 5), "-", "_")
    )
    # normalization
    .withColumn(
        "prd_line",
        F.when(F.upper(col("prd_line")) == "M", "Mountain")
         .when(F.upper(col("prd_line")) == "R", "Road")
         .when(F.upper(col("prd_line")) == "S", "Other Sales")
         .when(F.upper(col("prd_line")) == "T", "Touring")
         .otherwise("n/a")
    )
)

In [0]:
df.select("prd_line").distinct().display()

prd_line
Road
Other Sales
Mountain
""
Touring


In [0]:
df.display()

prd_id,prd_key,prd_nm,prd_cost,prd_line,prd_start_dt,prd_end_dt,category_id
210,CO-RF-FR-R92B-58,HL Road Frame - Black- 58,,Road,2003-07-01,,CO_RF
211,CO-RF-FR-R92R-58,HL Road Frame - Red- 58,,Road,2003-07-01,,CO_RF
212,AC-HE-HL-U509-R,Sport-100 Helmet- Red,12.0,Other Sales,2011-07-01,2007-12-28,AC_HE
213,AC-HE-HL-U509-R,Sport-100 Helmet- Red,14.0,Other Sales,2012-07-01,2008-12-27,AC_HE
214,AC-HE-HL-U509-R,Sport-100 Helmet- Red,13.0,Other Sales,2013-07-01,,AC_HE
215,AC-HE-HL-U509,Sport-100 Helmet- Black,12.0,Other Sales,2011-07-01,2007-12-28,AC_HE
216,AC-HE-HL-U509,Sport-100 Helmet- Black,14.0,Other Sales,2012-07-01,2008-12-27,AC_HE
217,AC-HE-HL-U509,Sport-100 Helmet- Black,13.0,Other Sales,2013-07-01,,AC_HE
218,CL-SO-SO-B909-M,Mountain Bike Socks- M,3.0,Mountain,2011-07-01,2007-12-28,CL_SO
219,CL-SO-SO-B909-L,Mountain Bike Socks- L,3.0,Mountain,2011-07-01,2007-12-28,CL_SO


###Cleanup product cost

In [0]:
df = df.withColumn("prd_cost", F.coalesce(col("prd_cost"), F.lit(0)))

In [0]:
df.display()


prd_id,prd_key,prd_nm,prd_cost,prd_line,prd_start_dt,prd_end_dt,category_id
210,CO-RF-FR-R92B-58,HL Road Frame - Black- 58,0,Road,2003-07-01,,CO_RF
211,CO-RF-FR-R92R-58,HL Road Frame - Red- 58,0,Road,2003-07-01,,CO_RF
212,AC-HE-HL-U509-R,Sport-100 Helmet- Red,12,Other Sales,2011-07-01,2007-12-28,AC_HE
213,AC-HE-HL-U509-R,Sport-100 Helmet- Red,14,Other Sales,2012-07-01,2008-12-27,AC_HE
214,AC-HE-HL-U509-R,Sport-100 Helmet- Red,13,Other Sales,2013-07-01,,AC_HE
215,AC-HE-HL-U509,Sport-100 Helmet- Black,12,Other Sales,2011-07-01,2007-12-28,AC_HE
216,AC-HE-HL-U509,Sport-100 Helmet- Black,14,Other Sales,2012-07-01,2008-12-27,AC_HE
217,AC-HE-HL-U509,Sport-100 Helmet- Black,13,Other Sales,2013-07-01,,AC_HE
218,CL-SO-SO-B909-M,Mountain Bike Socks- M,3,Mountain,2011-07-01,2007-12-28,CL_SO
219,CL-SO-SO-B909-L,Mountain Bike Socks- L,3,Mountain,2011-07-01,2007-12-28,CL_SO


###Renaming Columns

In [0]:

RENAME_MAP = {
    "prd_id": "product_id",
    "prd_key": "product_number",
    "prd_nm": "product_name",
    "prd_cost": "product_cost",
    "prd_line": "product_line",
    "prd_start_dt": "start_date",
    "prd_end_dt": "end_date"
}
for old,new in RENAME_MAP.items():
    df = df.withColumnRenamed(old,new)

In [0]:
# redefine product_number >> for joining in gold
substring_col = F.substring(F.col("product_number"), 7, F.length(F.col("product_number")) - 6)
df = df.withColumn("product_number", F.regexp_replace(substring_col, "-", "_"))
      


In [0]:
df.display()

product_id,product_number,product_name,product_cost,product_line,start_date,end_date,category_id
210,FR_R92B_58,HL Road Frame - Black- 58,0,Road,2003-07-01,,CO_RF
211,FR_R92R_58,HL Road Frame - Red- 58,0,Road,2003-07-01,,CO_RF
212,HL_U509_R,Sport-100 Helmet- Red,12,Other Sales,2011-07-01,2007-12-28,AC_HE
213,HL_U509_R,Sport-100 Helmet- Red,14,Other Sales,2012-07-01,2008-12-27,AC_HE
214,HL_U509_R,Sport-100 Helmet- Red,13,Other Sales,2013-07-01,,AC_HE
215,HL_U509,Sport-100 Helmet- Black,12,Other Sales,2011-07-01,2007-12-28,AC_HE
216,HL_U509,Sport-100 Helmet- Black,14,Other Sales,2012-07-01,2008-12-27,AC_HE
217,HL_U509,Sport-100 Helmet- Black,13,Other Sales,2013-07-01,,AC_HE
218,SO_B909_M,Mountain Bike Socks- M,3,Mountain,2011-07-01,2007-12-28,CL_SO
219,SO_B909_L,Mountain Bike Socks- L,3,Mountain,2011-07-01,2007-12-28,CL_SO


##Write into silver

In [0]:
df.write.mode("overwrite").saveAsTable("workspace.silver.crm_production")