# Read Bronze data – create DataFrames

In [1]:
# Load the three raw bronze parquet files from the lakehouse into Spark DataFrames.
# All three files live in the same bronze folder, so the folder is named once.
BRONZE_DIR = "abfss://medallion@onelake.dfs.fabric.microsoft.com/medallion.Lakehouse/Files/bronze"

df_orders_raw = spark.read.parquet(BRONZE_DIR + "/orders_data.parquet")
df_returns_raw = spark.read.parquet(BRONZE_DIR + "/returns_data.xlsx.parquet")
df_inventory_raw = spark.read.parquet(BRONZE_DIR + "/inventory_data.parquet")



StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 3, Finished, Available, Finished)

In [4]:
# Preview the raw orders frame exactly as it was read from the bronze parquet file.
display(df_orders_raw)

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, df41c8c5-3b3d-40f3-bb66-d86022632195)

# Promote the first row to column headers for the returns DataFrame

In [7]:
# Promote the first row of the returns file to column headers.
#
# The returns parquet carries its real header as the first data row
# (presumably an artifact of the .xlsx -> parquet conversion -- confirm upstream).
#
# The untouched file is re-read here instead of transforming df_returns_raw in
# place. Rebinding df_returns_raw from itself made this cell non-idempotent:
# every re-run dropped one more row and promoted a genuine data row to header.
# Keep this path in sync with the bronze read cell at the top of the notebook.
RETURNS_BRONZE_PATH = "abfss://medallion@onelake.dfs.fabric.microsoft.com/medallion.Lakehouse/Files/bronze/returns_data.xlsx.parquet"
df_returns_headerless = spark.read.parquet(RETURNS_BRONZE_PATH)

# Extract the first row as the header; fail loudly if the file has no rows,
# because first() returns None for an empty DataFrame.
header_row = df_returns_headerless.first()
if header_row is None:
    raise ValueError("returns bronze file is empty - no header row to promote")
columns = [str(item).strip() for item in header_row]

# Drop the header row (index 0) from the data and apply the extracted names.
df_returns_raw = (
    df_returns_headerless.rdd
    .zipWithIndex()
    .filter(lambda row_and_index: row_and_index[1] > 0)
    .map(lambda row_and_index: row_and_index[0])
    .toDF(columns)
)

# Show the frame with its real column names
display(df_returns_raw)


StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7af2275a-e04d-494a-a1fc-70d4d99f424c)

In [2]:
# Preview the raw inventory frame as read from the bronze parquet file.
display(df_inventory_raw)

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3faf40ef-59f4-4c82-8a4d-3fdb67368206)

# Create Bronze Delta tables

In [8]:
# Persist each raw frame as a bronze Delta table, replacing any previous contents.
bronze_tables = {
    "bronze_orders": df_orders_raw,
    "bronze_returns": df_returns_raw,
    "bronze_inventory": df_inventory_raw,
}

for table_name, raw_frame in bronze_tables.items():
    raw_frame.write.format("delta").mode("overwrite").saveAsTable(table_name)

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 10, Finished, Available, Finished)

In [9]:
%%sql 
-- Spot-check: read back the bronze_inventory Delta table written by the previous cell.
select * from bronze_inventory

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 11, Finished, Available, Finished)

<Spark SQL result set with 15 rows and 7 fields>

# Clean the data – Silver layer preparation

## Cleaning Orders Data

In [12]:
# Look at a five-row sample of the raw orders before writing the cleaning rules.
display(df_orders_raw.limit(5))

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, efec62da-1afe-4e95-b47a-e2bc4dd04579)

In [15]:

# Build the silver orders table from the raw bronze orders frame.
#
# NOTE: the star imports are kept on purpose. The inventory and returns cells
# further down call col/when/lit/IntegerType/... unqualified and do not import
# them themselves, so they depend on these names being in the kernel namespace.
from pyspark.sql.functions import *
from pyspark.sql.types import *

df_orders = (
    df_orders_raw

    # 1. Clean column names
    .withColumnRenamed("Order_ID", "OrderID")
    .withColumnRenamed("cust_id", "CustomerID")
    .withColumnRenamed("Product_Name", "ProductName")
    .withColumnRenamed("Qty", "Quantity")
    .withColumnRenamed("Order_Date", "OrderDate")
    .withColumnRenamed("Order_Amount$", "OrderAmount")
    .withColumnRenamed("Delivery_Status", "DeliveryStatus")
    .withColumnRenamed("Payment_Mode", "PaymentMode")
    .withColumnRenamed("Ship_Address", "ShipAddress")
    .withColumnRenamed("Promo_Code", "PromoCode")
    .withColumnRenamed("Feedback_Score", "FeedbackScore")

    # 2. Normalize Quantity: spelled-out words ('one', 'Two', ...) become integers;
    #    anything else is cast to int (non-numeric text becomes NULL).
    .withColumn("Quantity",
        when(lower(col("Quantity")) == "one", 1)
        .when(lower(col("Quantity")) == "two", 2)
        .when(lower(col("Quantity")) == "three", 3)
        .otherwise(col("Quantity").cast(IntegerType()))
    )

    # 3. Standardize OrderDate: try each known pattern and keep the first that parses.
    #    Order matters for ambiguous dash dates: dd-MM-yyyy is tried before MM-dd-yyyy.
    #    coalesce() already yields a date, so no extra to_date() wrapper is needed.
    .withColumn("OrderDate",
        coalesce(
            to_date(col("OrderDate"), "yyyy/MM/dd"),
            to_date(col("OrderDate"), "dd-MM-yyyy"),
            to_date(col("OrderDate"), "MM-dd-yyyy"),
            to_date(col("OrderDate"), "yyyy.MM.dd"),
            to_date(col("OrderDate"), "dd/MM/yyyy"),
            to_date(col("OrderDate"), "dd.MM.yyyy"),
            to_date(col("OrderDate"), "MMMM dd yyyy")
        )
    )

    # 4. Clean and convert OrderAmount.
    #    The previous pattern "[$₹Rs. USD, INR]" was a character class that also
    #    deleted '.', turning "1200.50" into 120050.0, and it left lower-case
    #    currency text in place. Instead: drop thousands separators, then extract
    #    the first (optionally signed) decimal number. No match -> "" -> NULL.
    .withColumn("OrderAmount",
        regexp_extract(
            regexp_replace(col("OrderAmount"), ",", ""),
            r"(-?\d+(?:\.\d+)?)",
            1
        ).cast(DoubleType())
    )

    # 5. Standardize PaymentMode: letters only, lower-case
    .withColumn("PaymentMode", lower(regexp_replace(col("PaymentMode"), "[^a-zA-Z]", "")))

    # 6. Standardize DeliveryStatus: letters and spaces only, lower-case
    .withColumn("DeliveryStatus", lower(regexp_replace(col("DeliveryStatus"), "[^a-zA-Z ]", "")))

    # 7. Validate email with a simple regex; non-matching values become NULL
    .withColumn("Email", when(col("Email").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$"), col("Email")).otherwise(None))

    # 8. Clean address: remove the special characters #, @, ! and $
    .withColumn("ShipAddress", regexp_replace(col("ShipAddress"), r"[#@!$]", ""))

    # 9. FeedbackScore: convert to double (unparseable values become NULL)
    .withColumn("FeedbackScore", col("FeedbackScore").cast(DoubleType()))

    # 10. Fill nulls with defaults where a default is meaningful
    .fillna({"Quantity": 0, "OrderAmount": 0.0, "DeliveryStatus": "unknown", "PaymentMode": "unknown"})

    # 11. Drop rows with no CustomerID or ProductName
    .na.drop(subset=["CustomerID", "ProductName"])

    # 12. Keep one row per OrderID
    .dropDuplicates(["OrderID"])
)

display(df_orders.limit(6))

# Save cleaned Orders data
df_orders.write.mode("overwrite").format("delta").saveAsTable("silver_orders")


StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9e48a447-dccf-4763-b35a-8a077e0c1823)

# silver - inventory 

In [16]:
# Look at a five-row sample of the raw inventory before writing the cleaning rules.
display(df_inventory_raw.limit(5))

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 745f1273-215a-4c2a-bbbd-7d88f1a26dc0)

In [20]:
# Build the silver inventory table from the raw bronze inventory frame.
# Relies on col/when/lit/... and the Spark types already being imported by an
# earlier cell (this cell has no imports of its own).

# Stock text normalised once: trimmed for the numeric test, and additionally
# lower-cased for the spelled-out numbers so that "Twenty Five" or " 25 " are
# recognised instead of silently becoming NULL.
stock_trimmed = trim(col("stock"))
stock_words = lower(stock_trimmed)

df_inventory = (
    df_inventory_raw
    # 1. Clean column names
    .withColumnRenamed("productName", "ProductName")
    .withColumnRenamed("cost_price", "CostPrice")
    .withColumnRenamed("last_stocked", "LastStocked")

    # 2. Clean stock column: convert to integer.
    #    Digits are cast directly; known number words are mapped; everything
    #    else (including NULL and blank) becomes NULL.
    #    "twenty five" must be tested before "twenty", which it contains.
    .withColumn("Stock",
        when(stock_trimmed.rlike("^[0-9]+$"), stock_trimmed.cast(IntegerType()))
        .when(stock_words.rlike(".*twenty five.*"), lit(25))
        .when(stock_words.rlike(".*twenty.*"), lit(20))
        .when(stock_words.rlike(".*eighteen.*"), lit(18))
        .when(stock_words.rlike(".*fifteen.*"), lit(15))
        .when(stock_words.rlike(".*twelve.*"), lit(12))
        .otherwise(lit(None))
        .cast(IntegerType())
    )

    # 3. Clean LastStocked: unify '.', '/' separators to '-' and parse as yyyy-MM-dd.
    #    Only year-first values match this pattern.
    .withColumn("LastStocked", to_date(
        regexp_replace("LastStocked", "[./]", "-"), "yyyy-MM-dd"
    ))

    # 4. Clean CostPrice: extract the first numeric value and convert to double
    .withColumn("CostPrice",
        regexp_extract(col("CostPrice"), r"(\d+\.?\d*)", 1).cast(DoubleType())
    )

    # 5. Clean Warehouse: replace special characters with spaces, trim, capitalize each word
    .withColumn("Warehouse",
        initcap(trim(regexp_replace(col("warehouse"), r"[^a-zA-Z0-9\s]", " ")))
    )

    # 6. Standardize Available: yes/y/true -> True, no/n/false -> False, else NULL
    .withColumn("Available",
        when(lower(col("available")).isin("yes", "y", "true"), lit(True))
        .when(lower(col("available")).isin("no", "n", "false"), lit(False))
        .otherwise(None)
    )

)

# Display the cleaned data
display(df_inventory)

# Save to the Silver layer
df_inventory.write.mode("overwrite").format("delta").saveAsTable("silver_inventory")

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 762e65c3-2fca-49d3-81cf-104bf1ee3a7c)

# silver - return data

In [21]:
# Preview the returns frame before applying the silver cleaning rules.
display(df_returns_raw)

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3230dea0-8db2-4ff4-ac25-32a78f72469a)

In [27]:

# Build the silver returns table from df_returns_raw.
# Relies on the Spark functions/types imported by an earlier cell.

# Step 1: Standardize column names (a rename is a no-op when the source name is absent)
returns_column_renames = [
    ("Return_ID", "ReturnID"),
    ("Order_ID", "OrderID"),
    ("Customer_ID", "CustomerID"),
    ("Return_Reason", "ReturnReason"),
    ("Return_Date", "ReturnDate"),
    ("Refund_Status", "RefundStatus"),
    ("Pickup_Address", "PickupAddress"),
    ("Return_Amount", "ReturnAmount"),
]

returns_renamed = df_returns_raw
for raw_name, clean_name in returns_column_renames:
    returns_renamed = returns_renamed.withColumnRenamed(raw_name, clean_name)

# Step 2: Column-level cleaning expressions, named so the chain below reads as a summary

# ReturnDate: unify '.', '/' separators to '-' and parse as day-month-year
return_date_clean = to_date(regexp_replace("ReturnDate", r"[./]", "-"), "dd-MM-yyyy")

# RefundStatus: keep letters only, lower-case
refund_status_clean = lower(regexp_replace(col("RefundStatus"), r"[^a-zA-Z]", ""))

# ReturnAmount: first numeric run, whatever currency text surrounds it
return_amount_clean = regexp_extract(col("ReturnAmount"), r"(\d+\.?\d*)", 1).cast(DoubleType())

# PickupAddress: special characters become spaces, then trim and capitalize each word
pickup_address_clean = initcap(trim(regexp_replace(col("PickupAddress"), r"[^a-zA-Z0-9\s]", " ")))

# Product: special characters are removed outright, then trim and capitalize each word
product_clean = initcap(trim(regexp_replace(col("Product"), r"[^a-zA-Z0-9\s]", "")))

# CustomerID: upper-case and trim
customer_id_clean = trim(upper(col("CustomerID")))

df_returns = (
    returns_renamed
    .withColumn("ReturnDate", return_date_clean)
    .withColumn("RefundStatus", refund_status_clean)
    .withColumn("ReturnAmount", return_amount_clean)
    .withColumn("PickupAddress", pickup_address_clean)
    .withColumn("Product", product_clean)
    .withColumn("CustomerID", customer_id_clean)
)

# Step 3: Show cleaned Silver data
display(df_returns)

# Step 4: Save to Silver Delta Table
df_returns.write.format("delta").mode("overwrite").saveAsTable("silver_returns")

StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ad2a6ecd-f214-40f7-b068-42993b79a98f)

# Gold aggregate

In [43]:
# Gold layer: per-product KPIs built from the three silver tables.
#
# Returns and inventory are collapsed to one row per join key BEFORE joining.
# Joining the row-level tables directly fans out order rows: every inventory
# row for a product (and every return row for an order) duplicates the order,
# which inflates order counts and revenue, and sum("Stock") was being added
# once per order row instead of once per product.
from pyspark.sql.functions import *

# STEP 1: Load cleaned Silver tables
orders = spark.table("silver_orders")
returns = spark.table("silver_returns")
inventory = spark.table("silver_inventory")

# STEP 2: One row per order with its number of return records
returns_per_order = (
    returns
    .groupBy("OrderID")
    .agg(count("ReturnID").alias("ReturnCount"))
)

# STEP 3: One row per product with its total stock and average cost price
inventory_per_product = (
    inventory
    .groupBy("ProductName")
    .agg(
        sum("Stock").alias("Total_Stock"),
        avg("CostPrice").alias("AvgCostRaw"),
    )
)

# STEP 4: Order-level frame (still exactly one row per order after the join);
#         orders without any return get ReturnCount = 0
df_enriched = (
    orders
    .join(returns_per_order, on="OrderID", how="left")
    .select(
        "ProductName",
        "OrderID",
        "CustomerID",
        "OrderAmount",
        coalesce(col("ReturnCount"), lit(0)).alias("ReturnCount"),
    )
)

# STEP 5: Order-side aggregates by product (unrounded; rounding happens once at the end)
order_kpis = (
    df_enriched.groupBy("ProductName")
    .agg(
        count("OrderID").alias("Total_Orders"),
        countDistinct("CustomerID").alias("Unique_Customers"),
        sum("ReturnCount").alias("Total_Returns"),
        sum(when(col("ReturnCount") > 0, 1).otherwise(0)).alias("ReturnedOrders"),
        sum("OrderAmount").alias("RevenueRaw"),
        avg("OrderAmount").alias("AvgOrderRaw"),
    )
)

# STEP 6: Attach the per-product inventory figures and compute the final KPIs.
#         Return_Rate_% is the share of orders that have at least one return.
#         NOTE(review): Net_Profit keeps the original formula, revenue minus
#         (stock on hand x average cost). Confirm whether cost of goods sold
#         (quantity sold x cost) was intended instead.
df_kpi = (
    order_kpis
    .join(inventory_per_product, on="ProductName", how="left")
    .select(
        "ProductName",
        "Total_Orders",
        "Unique_Customers",
        "Total_Returns",
        round(col("ReturnedOrders") / col("Total_Orders") * 100, 2).alias("Return_Rate_%"),
        round(col("RevenueRaw"), 2).alias("Total_Revenue"),
        round(col("AvgOrderRaw"), 2).alias("Avg_Order_Value"),
        "Total_Stock",
        round(col("AvgCostRaw"), 2).alias("Avg_Cost"),
        round(col("RevenueRaw") - col("Total_Stock") * col("AvgCostRaw"), 2).alias("Net_Profit"),
    )
)

# STEP 7: Display results
display(df_kpi)

# STEP 8: Save to Gold Delta table.
#         The table name is kept for compatibility; no month dimension is computed here.
df_kpi.write.mode("overwrite").format("delta").saveAsTable("gold_product_month_kpis")


StatementMeta(, a61c4fa9-6f5f-46ca-bdd7-f70add0891f2, 45, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 724c43d7-b4fe-410f-933f-3a2c46d5f558)