In [6]:
# Bronze ingestion: read the three raw source files from the lakehouse Files/BRONZE folder.
# OneLake root of the lakehouse (abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>);
# change it here instead of editing every path below.
LAKEHOUSE_ROOT = "abfss://e0cf1801-1533-430a-8805-b23abd9dafae@onelake.dfs.fabric.microsoft.com/3f0d51ed-691b-49c1-983f-b4042e721365"
BRONZE_DIR = f"{LAKEHOUSE_ROOT}/Files/BRONZE"

# CSV → DataFrame. No header option is set, so the header line arrives as the
# first data row (columns are named _c0, _c1, ...); it is promoted to column
# names in the header-handling step.
df_orders = spark.read.csv(f"{BRONZE_DIR}/orders_data.csv")


# JSON → DataFrame ("multiline" lets a single JSON record span several lines)
df_inventory = (
    spark.read
        .option("multiline", "true")
        .json(f"{BRONZE_DIR}/inventory_data.json")
)

# EXCEL → DataFrame: read from returns_data.xlsx.parquet, presumably a Parquet
# export of the Excel workbook (per the file name). Its first row is treated as
# the header in the header-handling step.
df_return = spark.read.parquet(f"{BRONZE_DIR}/returns_data.xlsx.parquet")


StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 8, Finished, Available, Finished)

In [7]:
# Quick look at the raw returns data as read from the bronze Parquet file.
display(df_return.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 108a2fa9-708d-4e1a-9931-51a6e6e70d35)

In [8]:
# Quick look at the raw inventory JSON records.
display(df_inventory.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8b9c5ac2-6194-40ef-849e-86908714a8b0)

In [9]:
# Quick look at the raw orders CSV (header line still present as a data row).
display(df_orders.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e1cf41de-f24c-4a9e-9741-d367161db0c7)

# PROMOTE THE FIRST ROW TO COLUMN HEADERS

In [10]:
# Promote the first row to column headers for the orders CSV and the returns data.

def promote_first_row_to_header(frame):
    """Return ``frame`` without its first row, using that row's values as column names.

    Header values are converted with ``str`` and stripped of surrounding whitespace.
    The row removed is index 0 of ``rdd.zipWithIndex()``, which numbers rows by
    partition index and then position within the partition; this is assumed to be
    the same row ``frame.first()`` returns (holds for a single-file read — verify
    if the source is ever split across several files).
    """
    header = [str(value).strip() for value in frame.first()]
    return (
        frame.rdd
        .zipWithIndex()
        .filter(lambda row_with_index: row_with_index[1] > 0)
        .keys()
        .toDF(header)
    )


# ORDERS: this cell reassigns df_orders, so guard against a re-run promoting a
# real data row. Spark's CSV reader (no header option) names columns _c0, _c1, ...;
# once the header has been applied those names are gone and this step is skipped.
if all(name.startswith("_c") and name[2:].isdigit() for name in df_orders.columns):
    df_orders = promote_first_row_to_header(df_orders)

# RETURNS: reads df_return and assigns a new name, so re-running is already safe.
df_returns = promote_first_row_to_header(df_return)


StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 12, Finished, Available, Finished)

In [11]:
# Orders after header promotion: first five rows.
display(df_orders.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4c287e65-5a91-4018-9c2f-29d47f97e0f8)

In [12]:
# Returns after header promotion: first five rows.
display(df_returns.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a190e7d0-a4b3-4ccb-9383-44178267e5ef)

In [13]:
# Persist the bronze DataFrames as Delta tables, replacing any previous contents.
# NOTE(review): return_bronze is written from df_return (first row still holds the
# header), not the header-promoted df_returns. The silver returns step promotes the
# header itself, so changing this mapping requires changing that step as well.
bronze_tables = {
    "order_bronze": df_orders,
    "return_bronze": df_return,
    "inventory_bronze": df_inventory,
}
for bronze_name, bronze_df in bronze_tables.items():
    bronze_df.write.saveAsTable(bronze_name, format="delta", mode="overwrite")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 15, Finished, Available, Finished)

## SILVER LAYER NOTEBOOK

### ORDERS

In [14]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 16, Finished, Available, Finished)

In [15]:
%%sql
-- Five sample rows from the bronze orders table.
SELECT *
FROM order_bronze
LIMIT 5

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 17, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 12 fields>

In [16]:
# Bronze orders as the input of the silver orders step.
orders = spark.read.table("order_bronze")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 18, Finished, Available, Finished)

In [17]:
# Silver orders: rename bronze columns to PascalCase, normalise values and types,
# fill / drop missing data and de-duplicate on OrderID.
orders_df = (
    orders
    .withColumnRenamed("Order_ID", "OrderID")
    .withColumnRenamed("cust_id", "CustomerID")
    .withColumnRenamed("Product_Name", "ProductName")
    .withColumnRenamed("Qty", "Quantity")
    .withColumnRenamed("Order_Date", "OrderDate")
    .withColumnRenamed("Order_Amount$", "OrderAmount")
    .withColumnRenamed("Delivery_Status", "DeliveryStatus")
    .withColumnRenamed("Payment_Mode", "PaymentMode")
    .withColumnRenamed("Ship_Address", "ShipAddress")
    .withColumnRenamed("Promo_Code", "PromoCode")
    .withColumnRenamed("Feedback_Score", "FeedbackScore")

    # ---- Quantity Standardization ----
    # Spelled-out quantities are compared after trimming and lower-casing, so a
    # value like " Two" maps to 2 instead of failing the integer cast (and then
    # being filled with 0 below). Everything else is cast to int.
    .withColumn(
        "Quantity",
        when(lower(trim(col("Quantity"))) == "one", 1)
        .when(lower(trim(col("Quantity"))) == "two", 2)
        .when(lower(trim(col("Quantity"))) == "three", 3)
        .otherwise(col("Quantity").cast(IntegerType()))
    )

    # ---- OrderDate Parsing ----
    # Formats are tried in order and the first successful parse wins, so an
    # ambiguous value such as 03-04-2024 is read as dd-MM-yyyy.
    .withColumn(
        "OrderDate",
        coalesce(
            to_date(col("OrderDate"), "yyyy/MM/dd"),
            to_date(col("OrderDate"), "dd-MM-yyyy"),
            to_date(col("OrderDate"), "MM-dd-yyyy"),
            to_date(col("OrderDate"), "yyyy.MM.dd"),
            to_date(col("OrderDate"), "dd/MM/yyyy"),
            to_date(col("OrderDate"), "dd.MM.yyyy")
        )
    )

    # ---- Clean Order Amount ----
    # Strip currency markers and thousands separators before the double cast.
    .withColumn(
        "OrderAmount",
        regexp_replace(col("OrderAmount"), r"(Rs\.|USD|INR|₹|,|\$)", "").cast(DoubleType())
    )

    # ---- Payment Mode Cleaning ----
    # Lower-case, then keep letters only.
    .withColumn(
        "PaymentMode",
        regexp_replace(lower(col("PaymentMode")), r"[^A-Za-z]", "")
    )

    # ---- Delivery Status Cleaning ----
    # Lower-case, then keep letters only.
    .withColumn(
        "DeliveryStatus",
        regexp_replace(lower(col("DeliveryStatus")), r"[^A-Za-z]", "")
    )

    # ---- Clean Address ----
    .withColumn(
        "ShipAddress",
        regexp_replace(col("ShipAddress"), r"[#@!$]", "")
    )

    # ---- Email Validation ----
    # Emails that do not match the basic pattern become null.
    .withColumn(
        "Email",
        when(
            col("Email").rlike(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[A-Za-z]{2,}$"),
            col("Email")
        ).otherwise(None)
    )

    # ---- Feedback Score ----
    .withColumn("FeedbackScore", col("FeedbackScore").cast(DoubleType()))

    # ---- Customer ID Standardization ----
    # Same normalisation the silver returns step applies (trim + upper-case), so the
    # same customer written with different case or padding is treated as one ID.
    .withColumn("CustomerID", trim(upper(col("CustomerID"))))

    # ---- Fill Missing Values ----
    .fillna({
        "Quantity": 0,
        "OrderAmount": 0.0,
        "DeliveryStatus": "unknown",
        "PaymentMode": "unknown"
    })

    # ---- Remove invalid rows ----
    # Drops nulls only; empty strings are kept.
    .dropna(subset=["CustomerID", "ProductName"])

    # ---- Remove duplicate Orders ----
    # Which of several rows sharing an OrderID is kept is not guaranteed.
    .dropDuplicates(["OrderID"])
)


StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 19, Finished, Available, Finished)

In [18]:
# Silver orders after cleaning: first five rows.
display(orders_df.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f7f28e93-d642-4fb7-a207-58e67bee7841)

In [19]:
# Persist silver orders as a Delta table, replacing any previous contents.
orders_df.write.saveAsTable("silver_orders", format="delta", mode="overwrite")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 21, Finished, Available, Finished)

### SILVER INVENTORY

In [20]:
%%sql
-- Five sample rows from the bronze inventory table.
SELECT *
FROM inventory_bronze
LIMIT 5

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 22, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 7 fields>

In [21]:
# Bronze inventory as the input of the silver inventory step.
inventory = spark.read.table("inventory_bronze")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 23, Finished, Available, Finished)

In [22]:
# Silver inventory: rename columns and clean stock, restock date, cost price,
# warehouse and availability values.

# Helper expressions over the raw stock value:
#   _stock_trimmed  — value without surrounding spaces, for the digits-only check
#   _stock_letters  — lower-cased letters only, so "twenty five" / "Twenty-Five"
#                     both become "twentyfive" before word matching
_stock_trimmed = trim(col("stock"))
_stock_letters = regexp_replace(lower(col("stock")), "[^a-z]", "")

inventory_new_df = (
    inventory
    .withColumnRenamed("productName","ProductName")
    .withColumnRenamed("last_stocked","lastStocked")
    .withColumnRenamed("cost_price","CostPrice")

    # ---- Stock cleanup ----
    # Digits-only values (ignoring padding) are cast to int; null/blank stays null.
    # Spelled-out values are matched on their letters only; previously "twenty five"
    # fell through to the "twenty" branch and became 20. "twentyfive" is tested
    # before "twenty" because it contains it. Anything else becomes null.
    .withColumn(
        "Stock",
        when(_stock_trimmed.rlike("^[0-9]+$"), _stock_trimmed.cast(IntegerType()))
        .when(col("stock").isNull() | (_stock_trimmed == ""), lit(None))
        .otherwise(
            when(_stock_letters.contains("twentyfive"), lit(25))
            .when(_stock_letters.contains("twenty"), lit(20))
            .when(_stock_letters.contains("eighteen"), lit(18))
            .when(_stock_letters.contains("fifteen"), lit(15))
            .when(_stock_letters.contains("twelve"), lit(12))
            .otherwise(lit(None))
        )
        .cast(IntegerType())
    )

    # ---- lastStocked date cleanup ----
    # Normalise "." and "/" separators to "-", then parse as yyyy-MM-dd.
    .withColumn(
        "lastStocked",
        to_date(
            regexp_replace("lastStocked", r"[./]", "-"),
            "yyyy-MM-dd"
        )
    )

    # ---- CostPrice extraction ----
    # Remove thousands separators first (as the orders amount cleaning does);
    # otherwise "1,200.50" would extract only "1".
    .withColumn(
        "CostPrice",
        regexp_extract(regexp_replace(col("CostPrice"), ",", ""), r"(\d+\.?\d*)", 1).cast(DoubleType())
    )

    # ---- Warehouse cleanup ----
    # Keep letters, digits and spaces, trim, then title-case.
    .withColumn(
        "Warehouse",
        initcap(trim(regexp_replace(col("warehouse"), r"[^a-zA-Z0-9 ]", "")))
    )

    # ---- Available cleanup ----
    # Map yes/no style flags (case- and padding-insensitive) to booleans; others -> null.
    .withColumn(
        "Available",
        when(lower(trim(col("available"))).isin("yes", "y", "true"), lit(True))
        .when(lower(trim(col("available"))).isin("no", "n", "false"), lit(False))
        .otherwise(lit(None))
    )
)


StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 24, Finished, Available, Finished)

In [23]:
# Silver inventory after cleaning: first five rows.
display(inventory_new_df.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 25, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9bc63db6-c866-4687-81b8-6d112e5e9dfe)

In [24]:
# Persist silver inventory as a Delta table, replacing any previous contents.
inventory_new_df.write.saveAsTable("silver_inventory", format="delta", mode="overwrite")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 26, Finished, Available, Finished)

### SILVER RETURNS

In [25]:
%%sql
-- Five sample rows from the bronze returns table.
SELECT *
FROM return_bronze
LIMIT 5

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 27, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 9 fields>

In [26]:
# Bronze returns as the input of the silver returns step.
return_df = spark.read.table("return_bronze")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 28, Finished, Available, Finished)

In [27]:
# Raw bronze returns: first five rows (header still in the first row).
display(return_df.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 683d04dd-1fec-4f3a-94f5-fe33f98413a4)

In [28]:
# return_bronze was saved with its header still in the first row, so promote that
# row to column names here: the stringified, stripped values become the header ...
returns_header = list(map(lambda cell: str(cell).strip(), return_df.first()))

# ... and every row except position 0 is kept as data.
returns_df_new = (
    return_df.rdd
    .zipWithIndex()                      # (Row, position) pairs
    .filter(lambda pair: pair[1] != 0)   # drop position 0, the header row
    .keys()                              # back to plain Rows
    .toDF(returns_header)
)

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 30, Finished, Available, Finished)

In [29]:
# Returns after header promotion: first five rows.
display(returns_df_new.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5bbf434f-bbdf-49a9-8ebb-9e8bc6db7961)

In [30]:
# Silver returns: rename columns to PascalCase and clean the date, refund status,
# amount, address, product name and customer ID columns.
# NOTE(review): this cell reads and reassigns returns_df_new, so it is not safe to
# re-run on its own. A second pass would feed the already-parsed ReturnDate back
# through the dd-MM-yyyy parse. Re-run the header-promotion cell first.
returns_df_new = (
    returns_df_new
        .withColumnRenamed("Return_ID", "ReturnID")
        .withColumnRenamed("Order_ID", "OrderID")
        .withColumnRenamed("Customer_ID", "CustomerID")
        .withColumnRenamed("Return_Reason", "ReturnReason")
        .withColumnRenamed("Return_Date", "ReturnDate")
        .withColumnRenamed("Refund_Status", "RefundStatus")
        .withColumnRenamed("Pickup_Address", "PickupAddress")
        .withColumnRenamed("Return_Amount", "ReturnAmount")

        # ---- Return Date ----
        .withColumn(
            "ReturnDate",
            to_date(
                regexp_replace(col("ReturnDate"), r"[./]", "-"),  # Normalize separators
                "dd-MM-yyyy"
            )
        )

        # ---- Refund Status ----
        # Keep letters only, then lower-case.
        .withColumn(
            "RefundStatus",
            lower(regexp_replace(col("RefundStatus"), r"[^a-zA-Z]", ""))
        )

        # ---- Return Amount ----
        # Remove thousands separators first; otherwise "1,250.00" would extract only "1".
        .withColumn(
            "ReturnAmount",
            regexp_extract(regexp_replace(col("ReturnAmount"), ",", ""), r"(\d+\.?\d*)", 1)
                .cast(DoubleType())
        )

        # ---- Pickup Address ----
        # Keep letters, digits and whitespace, trim, then title-case.
        .withColumn(
            "PickupAddress",
            initcap(trim(regexp_replace(col("PickupAddress"), r"[^a-zA-Z0-9\s]", "")))
        )

        # ---- Product Name ----
        .withColumn(
            "Product",
            initcap(trim(regexp_replace(col("Product"), r"[^a-zA-Z0-9\s]", "")))
        )

        # ---- Customer ID Standardization ----
        .withColumn(
            "CustomerID",
            trim(upper(col("CustomerID")))
        )
)

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 32, Finished, Available, Finished)

In [31]:
# Silver returns after cleaning: first five rows.
display(returns_df_new.limit(num=5))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 33, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d41add26-33a5-436b-a57a-af5dfcec900a)

In [32]:
# Persist silver returns as a Delta table, replacing any previous contents.
returns_df_new.write.saveAsTable("silver_return", format="delta", mode="overwrite")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 34, Finished, Available, Finished)

## GOLD LAYER TABLES

In [33]:
# Silver inputs for the gold layer, aliased o / r / i so the joins below can
# use qualified column references.
order = spark.read.table("silver_orders").alias("o")
returns = spark.read.table("silver_return").alias("r")
inventory = spark.read.table("silver_inventory").alias("i")


StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 35, Finished, Available, Finished)

In [34]:
# Two sample rows from each silver input: orders, returns, inventory.
for silver_preview in (order, returns, inventory):
    display(silver_preview.limit(2))

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 36, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 66ffd839-74ab-4654-a349-9e2603615530)

SynapseWidget(Synapse.DataFrame, bd189b6f-0be9-4eb0-8620-e16b65c5ffa9)

SynapseWidget(Synapse.DataFrame, 93bea017-135f-4b68-9c38-f1bd59f34115)

In [35]:
# Pre-gold: each order enriched with its matching return(s) and inventory row(s).
# Both joins are left joins, so the result has one row per
# (order, matching return, matching inventory row) combination: an order can
# appear several times, and its Stock/CostPrice repeat across those rows.
df_pregold = (
    order
    .join(returns, col("o.OrderID") == col("r.OrderID"), "left")
    .join(inventory, col("o.ProductName") == col("i.ProductName"), "left")
    .select(
        *[
            col(qualified).alias(output_name)
            for qualified, output_name in [
                ("o.ProductName", "ProductName"),
                ("o.OrderID", "OrderID"),
                ("o.CustomerID", "CustomerID"),
                ("o.OrderAmount", "OrderAmount"),
                ("i.Stock", "Stock"),
                ("i.CostPrice", "CostPrice"),
                ("r.ReturnID", "ReturnID"),
            ]
        ]
    )
)

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 37, Finished, Available, Finished)

In [37]:
# Inspect the pre-gold join output (no explicit row limit applied here).
display(df_pregold)

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 39, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a5d9eef4-1d8b-4a89-be1b-01d8e7ab4b0e)

In [38]:
# Gold: one row per ProductName.
#
# df_pregold has one row per (order, matching return, matching inventory row),
# so an order with 2 returns and 3 inventory rows appears 6 times. Aggregating it
# directly inflated Total_Orders, Total_Returns and Total_Revenue, and
# sum("Stock") added a product's stock once for every joined order row. Each
# metric is therefore computed at its own grain and the results are joined on
# ProductName.

# Order grain: the order columns are identical across the fan-out copies of an
# order, so dropping duplicates restores exactly one row per order.
gold_order_metrics = (
    df_pregold
    .select("ProductName", "OrderID", "CustomerID", "OrderAmount")
    .dropDuplicates()
    .groupBy("ProductName")
    .agg(
        count("OrderID").alias("Total_Orders"),
        countDistinct("CustomerID").alias("Unique_Customers"),
        sum("OrderAmount").alias("_revenue"),
        avg("OrderAmount").alias("_avg_order_value"),
    )
)

# Return grain: each ReturnID counted once per product.
gold_return_metrics = (
    df_pregold
    .where(col("ReturnID").isNotNull())
    .groupBy("ProductName")
    .agg(countDistinct("ReturnID").alias("Total_Returns"))
)

# Inventory grain: stock and cost per product taken straight from silver_inventory,
# independent of how many orders the product has.
gold_stock_metrics = (
    spark.table("silver_inventory")
    .groupBy("ProductName")
    .agg(
        sum("Stock").alias("Total_Stock"),
        avg("CostPrice").alias("_avg_cost"),
    )
)

df_gold = (
    gold_order_metrics
    .join(gold_return_metrics, on="ProductName", how="left")
    .join(gold_stock_metrics, on="ProductName", how="left")
    # Products without returns get 0, as count() produced before.
    .fillna({"Total_Returns": 0})
    .select(
        "ProductName",
        "Total_Orders",
        "Unique_Customers",
        "Total_Returns",
        # Guarded so a product with no non-null OrderID yields null instead of
        # dividing by zero.
        round(
            when(col("Total_Orders") > 0, col("Total_Returns") / col("Total_Orders") * 100),
            2,
        ).alias("Return_Rate%"),
        round(col("_revenue"), 2).alias("Total_Revenue"),
        round(col("_avg_order_value"), 2).alias("Avg_Order_Value"),
        "Total_Stock",
        round(col("_avg_cost"), 2).alias("Avg_Cost"),
        # Net Profit = Total Revenue - (Total Stock * Avg Cost), from unrounded values.
        round(col("_revenue") - col("Total_Stock") * col("_avg_cost"), 2).alias("Net_Profit"),
    )
)

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 40, Finished, Available, Finished)

In [39]:
# Inspect the product-level gold metrics before they are written to gold_retail.
display(df_gold)

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 41, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f64c4806-8e12-4008-b31d-c51739cf3778)

In [40]:
# Persist the gold metrics as a Delta table, replacing any previous contents.
df_gold.write.saveAsTable("gold_retail", format="delta", mode="overwrite")

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 42, Finished, Available, Finished)

In [43]:
%%sql
-- Five sample rows from the gold retail table.
SELECT *
FROM gold_retail
LIMIT 5

StatementMeta(, b6d66d4c-06a6-4442-9cbd-29aa6629f453, 45, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 10 fields>