In [14]:

from pyspark.sql import functions as F

def _read_headered_csv(path):
    """Read the CSV at `path`, using its first row as header and letting Spark infer column types."""
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(path)


# Raw (bronze) inputs: one DataFrame per source file.
customers_df = _read_headered_csv("Files/Customer.csv")
orders_df = _read_headered_csv("Files/Orders.csv")
products_df = _read_headered_csv("Files/Product.csv")

# Preview each raw frame, in the same order they were loaded.
for raw_df in (customers_df, orders_df, products_df):
    display(raw_df)

StatementMeta(, 1d2d743c-a890-45ea-abaa-605ca326f359, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9152b2db-0e61-4eeb-a70e-dc7c93680bb4)

SynapseWidget(Synapse.DataFrame, 4e00e5ff-8b0e-494e-b0dd-5cacf8d16f58)

SynapseWidget(Synapse.DataFrame, a72fa14f-a2c2-4783-9b8d-7cd32e52b8ce)

## **Clean Customers Table**

In [15]:
# Build the cleaned customer table.
# Every text column is trimmed *before* case-normalisation: Spark's CSV reader
# keeps leading/trailing whitespace by default, so without the trim a value
# like " usa" would become " Usa" and stay distinct from "Usa".
customers_clean = (customers_df
    .dropDuplicates(["CustomerID"])  # keep a single row per CustomerID
    .withColumn("Name", F.trim(F.col("Name")))
    .withColumn("Email", F.lower(F.trim(F.col("Email"))))
    .withColumn("Country", F.initcap(F.trim(F.col("Country"))))
    .withColumn("Segment", F.upper(F.trim(F.col("Segment"))))
)

display(customers_clean)


StatementMeta(, 1d2d743c-a890-45ea-abaa-605ca326f359, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1b077ff2-6df1-4faa-8148-91fe51ee83ac)

## **Clean Products Table**


In [16]:
# Build the cleaned product table.
# "Name" is renamed to "ProductName" so it does not collide with the customer
# "Name" column once both tables are joined onto orders.
# Category is trimmed before initcap: Spark's CSV reader keeps surrounding
# whitespace by default, so a padded value would otherwise remain a separate
# category from its unpadded equivalent.
products_clean = (products_df
    .dropDuplicates(["ProductID"])  # keep a single row per ProductID
    .withColumnRenamed("Name", "ProductName")
    .withColumn("ProductName", F.trim(F.col("ProductName")))
    .withColumn("Category", F.initcap(F.trim(F.col("Category"))))
)

display(products_clean)



StatementMeta(, 1d2d743c-a890-45ea-abaa-605ca326f359, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 652ba68e-7fbd-418d-ba03-f47f5de172e9)

## **Clean Orders Table**

In [19]:
# Items are already flattened in the orders table, so rows are de-duplicated
# on the (OrderID, ProductID) pair rather than on OrderID alone.
orders_dedup = orders_df.dropDuplicates(["OrderID", "ProductID"])

# Column expressions applied to the de-duplicated rows.
order_date_expr = F.to_date("OrderDate", "yyyy-MM-dd")
line_total_expr = F.col("Quantity") * F.col("UnitPrice")  # recompute Total from its parts

orders_clean = (
    orders_dedup
    .withColumn("OrderDate", order_date_expr)
    .withColumn("Total", line_total_expr)
)

display(orders_clean)


StatementMeta(, 1d2d743c-a890-45ea-abaa-605ca326f359, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 25a71145-8ad3-4099-ba49-2ea6bd9578ae)

In [20]:
# Print the column names and types of orders_clean to confirm the result of
# the cleaning step (e.g. that OrderDate is now a date column).
orders_clean.printSchema()

StatementMeta(, 1d2d743c-a890-45ea-abaa-605ca326f359, 22, Finished, Available, Finished)

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- Status: string (nullable = true)



In [21]:
# Columns kept in the Silver orders table, in output order.
silver_order_columns = [
    "OrderID", "OrderDate", "CustomerID", "Name", "Country", "Segment",
    "ProductID", "ProductName", "Category",
    "Quantity", "UnitPrice", "Total", "LoadDate",
]

# Left joins keep every order line even when the customer or product lookup
# has no matching row.
orders_with_customers = orders_clean.join(customers_clean, "CustomerID", "left")
orders_with_products = orders_with_customers.join(products_clean, "ProductID", "left")

# Stamp each row with the load time, then project to the Silver column set.
silver_orders = (
    orders_with_products
    .withColumn("LoadDate", F.current_timestamp())
    .select(*silver_order_columns)
)

display(silver_orders)




StatementMeta(, 1d2d743c-a890-45ea-abaa-605ca326f359, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, bacbbe05-a07b-4071-a2d9-94517e19aff1)

In [22]:
# Persist the Silver layer as Delta tables (overwriting any previous load) so
# the Gold layer transformations can read from them.
silver_tables = (
    ("Silver_Customers", customers_clean),
    ("Silver_Products", products_clean),
    ("Silver_Orders", silver_orders),
)

for table_name, table_df in silver_tables:
    table_df.write.format("delta").mode("overwrite").saveAsTable(table_name)

print("✅ Silver tables created: Silver_Customers, Silver_Products, Silver_Orders")


StatementMeta(, 1d2d743c-a890-45ea-abaa-605ca326f359, 24, Finished, Available, Finished)

✅ Silver tables created: Silver_Customers, Silver_Products, Silver_Orders
