In [0]:
from silver.Transform_Functions.Trim import func_trim_col
from pyspark.sql import functions as F
from pyspark.sql.functions import trim,col,length

# Reading from the Bronze Table

In [0]:
# Load the raw CRM sales details from the bronze layer; every later cell works on `df`.
df = spark.read.table("workspace.bronze.crm_sales_details_raw")

In [0]:
# Print the column names and data types of the frame as read from the bronze table.
df.printSchema()

In [0]:
# Ad-hoc inspection: show the rows whose sls_quantity is greater than 1.
(
    df
    .where(col("sls_quantity") > 1)
    .display()
)

# Data Transformation

## Trim whitespace

In [0]:
# Apply the shared trimming helper imported from silver.Transform_Functions.Trim.
# NOTE(review): presumably this strips leading/trailing whitespace from the string
# columns and returns a new DataFrame -- confirm against the helper's implementation.
df = func_trim_col(df)

## Date columns: change data type from integer to date

In [0]:
# Convert the yyyyMMdd-encoded sales date columns to DateType.
# A value becomes NULL when it equals 0 or when its string form is not exactly
# 8 characters long; every other value is parsed with the "yyyyMMdd" pattern.
# NOTE(review): an 8-character value that is not a real calendar date still reaches
# to_date; whether that yields NULL or raises is assumed to depend on the session's
# ANSI setting -- confirm for this runtime.
date_columns = ["sls_order_dt", "sls_ship_dt", "sls_due_dt"]
current_types = dict(df.dtypes)

for date_column in date_columns:
    # Keep the cell re-runnable: a column that is already a date must not go through
    # the integer-oriented checks below a second time.
    if current_types.get(date_column) == "date":
        continue

    raw_value = col(date_column)
    # Cast explicitly, once: the length check and the parser both use the string form.
    text_value = raw_value.cast("string")
    is_missing = (raw_value == 0) | (length(text_value) != 8)

    df = df.withColumn(
        date_column,
        F.when(is_missing, None).otherwise(F.to_date(text_value, "yyyyMMdd")),
    )

In [0]:
# Print the schema again to check the resulting types of the sls_*_dt columns
# after the date conversion cell.
df.printSchema()

## Correct Price column values

In [0]:
# Repair unusable prices.
# A price is kept as-is only when it is present and strictly positive. Otherwise it is
# re-derived as sls_sales / sls_quantity, and only where sls_quantity is non-zero;
# all remaining rows end up with a NULL price.
price_is_usable = col("sls_price").isNotNull() & (col("sls_price") > 0)

# No otherwise() here: rows that do not match the condition evaluate to NULL,
# which also keeps the division away from zero quantities.
price_from_sales = F.when(
    col("sls_quantity") != 0,
    col("sls_sales") / col("sls_quantity"),
)

df = df.withColumn(
    "sls_price",
    F.when(price_is_usable, col("sls_price")).otherwise(price_from_sales),
)

## Renaming the columns

In [0]:
# Mapping of bronze column name -> silver column name, consumed by the rename cell.
sales_table_header = dict(
    sls_ord_num="order_number",
    sls_prd_key="product_key",
    sls_cust_id="customer_id",
    sls_order_dt="order_date",
    sls_ship_dt="ship_date",
    sls_due_dt="due_date",
    sls_sales="sales_amount",
    sls_quantity="quantity",
    sls_price="price",
)


In [0]:
# Apply every bronze -> silver rename from sales_table_header, one column at a time.
# withColumnRenamed is a no-op for a name that is not present in the frame.
for source_name, target_name in sales_table_header.items():
    df = df.withColumnRenamed(existing=source_name, new=target_name)

In [0]:
# Preview the transformed and renamed frame before it is written out.
df.display()

# Write into Silver Schema

In [0]:
# Persist the transformed frame as the Delta table silver.crm_sales, replacing any
# existing contents (mode "overwrite").
# NOTE(review): the bronze read uses a three-part name (workspace.bronze...), while
# this two-part name resolves against the session's current catalog -- confirm that
# this is the intended target.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver.crm_sales")
)


In [0]:
%sql
-- Spot-check: read back five rows from the silver.crm_sales table.
SELECT *
FROM silver.crm_sales
LIMIT 5;