In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, udf, upper, to_date
from datetime import datetime
from pyspark.sql.types import StringType

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 13, Finished, Available, Finished)

In [12]:
# Create Spark session if not already created
spark = SparkSession.builder.getOrCreate()

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 14, Finished, Available, Finished)

In [13]:
df = spark.read.table("sales_orders_bronze")

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 15, Finished, Available, Finished)

In [14]:
# Impute missing values
df_imputed = df.withColumn(
    "LastModifiedDate", coalesce(col("LastModifiedDate"), col("CreatedDate"))
)

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 16, Finished, Available, Finished)

In [15]:
# UDF to convert dates in 'yyyy/MM/dd' format to 'yyyy-MM-dd'
def fix_date_format(date_str):
    if date_str is None:
        return None
    try:
        # If already correct format, return as is
        if "-" in date_str:
            datetime.strptime(date_str, "%Y-%m-%d")
            return date_str
        # If has slashes, convert it
        elif "/" in date_str:
            dt = datetime.strptime(date_str, "%Y/%m/%d")
            return dt.strftime("%Y-%m-%d")
    except ValueError:
        return None
    return date_str

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 17, Finished, Available, Finished)

In [16]:
# Register UDF
fix_date_udf = udf(fix_date_format, StringType())

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 18, Finished, Available, Finished)

In [17]:
# Apply date transformation
df_cleaned = df_imputed.withColumn("LastModifiedDate", fix_date_udf(col("LastModifiedDate")))

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 19, Finished, Available, Finished)

In [18]:
# Transform SKU column to uppercase
df_transformed = df_cleaned.withColumn(
    "SKU", upper(col("SKU"))
)

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 20, Finished, Available, Finished)

In [19]:
# Change data type of the createdDate column
df_changed_data_type = df_transformed.withColumn("LastModifiedDate", to_date("LastModifiedDate", "yyyy-MM-dd"))

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 21, Finished, Available, Finished)

In [20]:
# Write the cleaned data to a new table
df_changed_data_type.write.format("delta").mode("overwrite").save("Tables/sales_order_silver")

StatementMeta(, 9999fab3-0f81-4986-8295-d01a828ef8a0, 22, Finished, Available, Finished)