In [0]:
%sql
Drop schema if exists etl.silver cascade;
create schema etl.silver

In [0]:
# Drop existing table if exists
from pyspark.sql.functions import trim, col, when, upper, row_number
from pyspark.sql import Window


spark.sql("DROP TABLE IF EXISTS etl.silver.crm_cust_info") 

df_crm_cust_info = (spark.read.table("etl.bronze.crm_cust_info"))
                                       
w = Window.partitionBy(df_crm_cust_info.cst_id).orderBy(df_crm_cust_info.cst_create_date.desc())
                       
df_crm_cust_info = (df_crm_cust_info
                    .withColumn("flag_last", row_number().over (w)))

df = (df_crm_cust_info
      .withColumn("cst_firstname", trim("cst_firstname"))
      .withColumn("cst_lastname", trim("cst_lastname"))
      .withColumn("cst_marital_status", 
                  when(upper(trim(col("cst_marital_status"))) == "S", "Single")
                 .when(upper(trim("cst_marital_status")) == "M", "Married")
                 .otherwise("n/a"))
      .withColumn("cst_gndr", 
                  when(upper(trim(col("cst_gndr"))) == "F", "Female")
                  .when(upper(trim(col("cst_gndr"))) == "M", "Male")
                  .otherwise("n/a"))
      .select("cst_id", "cst_key", "cst_firstname", "cst_lastname", "cst_marital_status", "cst_gndr", "cst_create_date")
      .where(col("flag_last") == 1)
    )

df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.crm_cust_info")


In [0]:
from pyspark.sql.functions import substring, regexp_replace, length, isnull, when, upper, trim, to_date, lead
from pyspark.sql import Window

spark.sql("DROP TABLE IF EXISTS etl.silver.crm_prd_info")

df_crm_prd_info = (spark.read.table("etl.bronze.crm_prd_info"))

df = (df_crm_prd_info
      .withColumn("cat_id", regexp_replace(substring("prd_key", 1, 5), "-", "_"))
      .withColumn("prd_key", substring("prd_key", 7, length("prd_key")))
      .withColumn("prd_cost", 
                  when(col("prd_cost").isNull(), 0)
                  .otherwise(col("prd_cost")))
      .withColumn("prd_line", 
                  when(upper(trim(col("prd_line"))) == "M", "Mountain")
                  .when(upper(trim(col("prd_line"))) == "R", "Road")
                  .when(upper(trim(col("prd_line"))) == "S", "Other Sales")
                  .when(upper(trim(col("prd_line"))) == "T", "Touring")
                  .otherwise("n/a"))
      .withColumn("prd_start_dt", to_date("prd_start_dt", "yyyy-MM-dd"))
      .withColumn("prd_end_dt", to_date(lead("prd_start_dt").over(Window.partitionBy("prd_key").orderBy("prd_start_dt")) - 1, "yyyy-MM-dd"))

      .select("prd_id", "cat_id", "prd_key", "prd_nm", "prd_cost", "prd_line", "prd_start_dt", "prd_end_dt"))

df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.crm_prd_info")

In [0]:
from pyspark.sql.functions import length, to_date, isnull, abs

spark.sql("DROP TABLE IF EXISTS etl.silver.crm_sales_details")

df_crm_sales_details = (spark.read.table("etl.bronze.crm_sales_details"))

df = (df_crm_sales_details
      .withColumn("sls_order_dt", 
                  when((col("sls_order_dt") == 0) | (length(col("sls_order_dt")) != 8), None)
                  .otherwise(to_date(col("sls_order_dt").cast("string"), "yyyy-MM-dd")))
      .withColumn("sls_ship_dt", 
                  when((col("sls_ship_dt") == 0) | (length(col("sls_ship_dt")) != 8), None)
                  .otherwise(to_date(col("sls_ship_dt").cast("string"), "yyyy-MM-dd")))
      .withColumn("sls_due_dt",
                  when((col("sls_due_dt") == 0) | (length(col("sls_due_dt")) != 8), None)
                  .otherwise(to_date(col("sls_due_dt").cast("string"), "yyyy-MM-dd")))
      .withColumn("sls_sales",
                  when((col("sls_sales").isNull()) | (col("sls_sales") <= 0) | (col("sls_sales") != (col("sls_quantity") * abs(col("sls_price")))), (col("sls_quantity") * abs(col("sls_price")))))
      .withColumn("sls_price",
                  when((col("sls_price").isNull()) | (col("sls_price") <= 0 ), col("sls_sales") / col("sls_quantity")))
      
      .select("sls_ord_num", "sls_prd_key", "sls_cust_id", "sls_order_dt", "sls_ship_dt", "sls_due_dt", "sls_sales", "sls_quantity", "sls_price"))

df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.crm_sales_details")

In [0]:
from pyspark.sql.functions import current_date, upper, trim

spark.sql("DROP TABLE IF EXISTS etl.silver.erp_cust_az12")

df_erp_cust_az12 = (spark.read.table("etl.bronze.erp_cust_az12"))

df = (df_erp_cust_az12
      .withColumn("cid", 
                  when(col("cid").like('NAS%'), substring(col("cid"), 4, length(col("cid"))))
                  .otherwise(col("cid")))
      .withColumn("bdate",
                  when(col("bdate") > current_date(), None)
                  .otherwise(col("bdate")))
      .withColumn("gen", 
                  when(upper(trim(col("gen"))).isin('F', 'FEMALE'), "Female")
                  .when(upper(trim(col("gen"))).isin('M', 'MALE'), "Male")
                  .otherwise("n/a"))
      .select("cid", "bdate", "gen"))

df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.erp_cust_az12")

In [0]:
from pyspark.sql.functions import regexp_replace, trim, isnull, when, col

spark.sql("DROP TABLE IF EXISTS etl.silver.erp_loc_a101")

df_erp_loc_a101 = (spark.read.table("etl.bronze.erp_loc_a101"))

df =(df_erp_loc_a101
     .withColumn("cid", regexp_replace("cid", '-', ''))
     .withColumn("cntry", 
                 when(trim(col("cntry")) == "DE", "Germany")
                 .when(trim(col("cntry")).isin("US", "USA"), "United States")
                 .when((trim(col("cntry")) == "") | (col("cntry").isNull()), "n/a")
                 .otherwise(trim(col("cntry")))
                 )
     .select("cid", "cntry")
     )

df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.erp_loc_a101")

In [0]:
spark.sql("DROP TABLE IF EXISTS etl.silver.erp_px_cat_g1v2")

df_erp_px_cat_g1v2 = (spark.read.table("etl.bronze.erp_px_cat_g1v2"))

df = (df_erp_px_cat_g1v2
      .select("id", "cat", "subcat", "maintenance"))

df.write.mode("overwrite").format("delta").saveAsTable("etl.silver.erp_px_cat_g1v2")