# Init

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Read from bronze

In [0]:
# Load the raw CRM sales details from the bronze layer and preview them.
bronze_table_name = "workspace.bronze.crm_sales_details"
df = spark.table(bronze_table_name)

display(df)

# Trimming


In [0]:
# Strip leading/trailing whitespace from every string column of the bronze frame.
#
# `df1` is seeded with `df` and each trim is chained onto `df1`, so the trims
# accumulate across columns. Building each step from `df` instead would discard
# all previous trims and leave only the last string column trimmed. Seeding
# `df1` also keeps it defined when the table has no string columns.
df1 = df
for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df1 = df1.withColumn(field.name, trim(col(field.name)))

display(df1)

# Date handling

In [0]:

# Convert the raw yyyyMMdd date columns into real dates.
# A value whose textual length is not exactly 8 is replaced by NULL;
# anything else is parsed with the "yyyyMMdd" pattern.
date_columns = ("sls_order_dt", "sls_ship_dt", "sls_due_dt")

df2 = df1
for date_column in date_columns:
    raw_value = col(date_column)
    parsed_value = to_date(raw_value.cast("string"), "yyyyMMdd")
    df2 = df2.withColumn(
        date_column,
        when(length(raw_value) != 8, None).otherwise(parsed_value),
    )

display(df2)

# Price corrections

In [0]:
# Price corrections: clean the three sales measures and make them consistent.
#
# Step 1 - treat NULL or negative measures as "missing" and store them as 0.
cleaned = df2
for measure in ("sls_sales", "sls_quantity", "sls_price"):
    cleaned = cleaned.withColumn(
        measure,
        when((col(measure) < 0) | col(measure).isNull(), lit(0)).otherwise(col(measure)),
    )

# Step 2 - reconcile the measures so that sales = price * quantity.
#
# All three expressions below are evaluated against the same `cleaned` row in
# one `withColumns` call. Chaining `withColumn` calls makes each step read the
# column the previous step just overwrote: sales would be zeroed whenever price
# or quantity is missing, and the price/quantity back-fill could then never
# happen, wiping rows that were recoverable.
#
#   price & quantity usable -> sales    = price * quantity
#   sales & quantity usable -> price    = sales / quantity   (back-fill)
#   sales & price usable    -> quantity = sales / price      (back-fill)
#   fewer than two usable   -> all three measures are 0
sales, quantity, price = col("sls_sales"), col("sls_quantity"), col("sls_price")
has_sales, has_quantity, has_price = sales > 0, quantity > 0, price > 0

# NOTE(review): because of the division branches, price and quantity come out
# as a fractional type, and a back-filled quantity may be non-integral.
# Confirm whether quantity should be rounded to a whole number.
reconciled = {
    "sls_sales": (
        when(has_price & has_quantity, price * quantity)
        .when(has_sales & (has_price | has_quantity), sales)
        .otherwise(lit(0))
    ),
    "sls_price": (
        when(has_price & (has_quantity | has_sales), price)
        .when(has_sales & has_quantity, sales / quantity)
        .otherwise(lit(0))
    ),
    "sls_quantity": (
        when(has_quantity & (has_price | has_sales), quantity)
        .when(has_sales & has_price, sales / price)
        .otherwise(lit(0))
    ),
}

df3 = cleaned.withColumns(reconciled)

display(df3)

# display(df2.filter(col("sls_price")<0))

# Renaming Columns

In [0]:
# Mapping from the bronze (source-system) column names to the silver names.
rename_map = {
    # identifiers
    "sls_ord_num": "order_id",
    "sls_prd_key": "product_id",
    "sls_cust_id": "customer_id",
    # dates
    "sls_order_dt": "order_date",
    "sls_ship_dt": "ship_date",
    "sls_due_dt": "due_date",
    # measures
    "sls_sales": "sales_amount",
    "sls_quantity": "quantity",
    "sls_price": "price",
}

# Apply the renames one at a time, starting from the price-corrected frame.
df4 = df3
for source_name in rename_map:
    target_name = rename_map[source_name]
    df4 = df4.withColumnRenamed(source_name, target_name)

display(df4)

# Writing to Silver Table

In [0]:
# Persist the cleaned frame as the silver table, replacing any existing contents.
silver_table_name = "workspace.silver.crm_sales"
silver_writer = df4.write.mode("overwrite")
silver_writer.saveAsTable(silver_table_name)

# Sanity check for silver table

In [0]:
%sql
-- Preview a few rows of the silver table as a quick sanity check.
SELECT *
FROM workspace.silver.crm_sales
LIMIT 10