In [0]:
bronze_path = "/mnt/bronze/superstore"
df_bronze = spark.read.format("delta").load(bronze_path)


In [0]:
#Add Total Cost Column
from pyspark.sql.functions import col

df_transformed = df_bronze.withColumn("TotalCost", col("Sales") - col("Profit"))
df_transformed.show(5)

+------+------------+----------+----------+--------------+-----------+-----------------+-----------+--------------+---------------+--------------------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+--------+-------------+--------------+------+------------------+
|Row_ID|    Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|    Customer Name|    Segment|          City|          State|             Country|Postal Code|Market|Region|      Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|  Profit|Shipping Cost|Order Priority|Row ID|         TotalCost|
+------+------------+----------+----------+--------------+-----------+-----------------+-----------+--------------+---------------+--------------------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+--------+-------------+--------------+------+---------------

In [0]:
#Convert Sales and Profit to Float

df_transformed = df_transformed.withColumn("Sales", col("Sales").cast("float"))
df_transformed = df_transformed.withColumn("Profit", col("Profit").cast("float"))

df_transformed.show(5)

+------+------------+----------+----------+--------------+-----------+-----------------+-----------+--------------+---------------+--------------------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+--------+-------------+--------------+------+------------------+
|Row_ID|    Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|    Customer Name|    Segment|          City|          State|             Country|Postal Code|Market|Region|      Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|  Profit|Shipping Cost|Order Priority|Row ID|         TotalCost|
+------+------------+----------+----------+--------------+-----------+-----------------+-----------+--------------+---------------+--------------------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+--------+-------------+--------------+------+---------------

In [0]:
#Filter Rows with Zero or Negative Profit

df_transformed = df_transformed.filter(col("Profit") > 0)
df_transformed.show(5)

+------+------------+----------+----------+--------------+-----------+------------------+-----------+-----------+-----------+-------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+------+-------------+--------------+------+------------------+
|Row_ID|    Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|       City|      State|Country|Postal Code|Market|Region|      Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|Shipping Cost|Order Priority|Row ID|         TotalCost|
+------+------------+----------+----------+--------------+-----------+------------------+-----------+-----------+-----------+-------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+------+-------------+--------------+------+------------------+
|  NULL|AG-2011-1390|2011-08-16|2011-08-21|Standard Class|

In [0]:
from pyspark.sql.functions import col, sum
#Group by Category and Region and perform aggregation

# Step 1: Clean the data by filtering out rows with non-numeric values in Sales, Quantity, or Discount
df_clean = df_bronze.filter(
    (col("Sales").rlike("^[0-9]*\.?[0-9]+$")) &  # Allow decimal numbers in Sales
    (col("Quantity").rlike("^[0-9]+$")) &  # Allow only integers in Quantity
    (col("Discount").rlike("^[0-9]*\.?[0-9]+$"))  # Allow decimal numbers in Discount
)

# Step 2: Cast the cleaned columns to appropriate numeric types
df_clean = df_clean.withColumn("Sales", col("Sales").cast("float")) \
                   .withColumn("Quantity", col("Quantity").cast("int")) \
                   .withColumn("Discount", col("Discount").cast("float"))

# Step 3: Drop rows with any null values in the numeric columns (after casting)
df_clean = df_clean.dropna(subset=["Sales", "Quantity", "Discount", "Profit"])

# Step 4: Group by Category and Region and perform aggregation
df_grouped = df_clean.groupBy("Category", "Region") \
    .agg(
        sum("Sales").alias("TotalSales"),
        sum("Profit").alias("TotalProfit"),
        sum("Quantity").alias("TotalQuantity"),
        sum("Discount").alias("TotalDiscount")
    )

# Step 5: Show the transformed data
df_grouped.show(5)


+----------+------+------------------+------------------+-------------+-----------------+
|  Category|Region|        TotalSales|       TotalProfit|TotalQuantity|    TotalDiscount|
+----------+------+------------------+------------------+-------------+-----------------+
| Furniture|Canada|10595.279964447021|2613.2400000000002|           78|              0.0|
|Technology|  EMEA|  300854.583026886|         17494.443|         2259|189.1000056862831|
| Furniture|  East| 205540.3473367691| 2501.816200000002|         2151| 90.6000018119812|
|Technology|Africa| 322367.0430994034|44129.492999999995|         2031|143.1999975964427|
|Technology|  East| 264872.0816922188|        47439.5576|         1927|76.30000080168247|
+----------+------+------------------+------------------+-------------+-----------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import col

# Add a Profit Margin Column 
df_grouped = df_transformed.withColumn("ProfitMargin", (col("Profit") / col("Sales")) * 100)

# Show the result
df_grouped.show(5)


+------+------------+----------+----------+--------------+-----------+------------------+-----------+-----------+-----------+-------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+------+-------------+--------------+------+------------------+------------------+
|Row_ID|    Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|       City|      State|Country|Postal Code|Market|Region|      Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|Shipping Cost|Order Priority|Row ID|         TotalCost|      ProfitMargin|
+------+------------+----------+----------+--------------+-----------+------------------+-----------+-----------+-----------+-------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+------+-------------+--------------+------+------------------+------------------+
|

In [0]:
# Remove Duplicate Rows based on 'Category' and 'Region' 
df_grouped = df_transformed.dropDuplicates(["Category", "Region"])

df_grouped.show(5)


+------+--------------+----------+----------+--------------+-----------+------------------+--------+-------------------+-------------+-------------+-----------+------+------------+----------------+---------+------------+--------------------+------+--------+--------+-------+-------------+--------------+------+------------------+
|Row_ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name| Segment|               City|        State|      Country|Postal Code|Market|      Region|      Product ID| Category|Sub-Category|        Product Name| Sales|Quantity|Discount| Profit|Shipping Cost|Order Priority|Row ID|         TotalCost|
+------+--------------+----------+----------+--------------+-----------+------------------+--------+-------------------+-------------+-------------+-----------+------+------------+----------------+---------+------------+--------------------+------+--------+--------+-------+-------------+--------------+------+------------------+
|  NULL|  

In [0]:
#Rename Columns for Clarity
df_grouped = df_transformed.withColumnRenamed("Sales", "TotalSales") \
                                .withColumnRenamed("Profit", "TotalProfit")
                                
df_grouped.show(5)

+------+------------+----------+----------+--------------+-----------+------------------+-----------+-----------+-----------+-------+-----------+------+------+----------------+---------------+------------+--------------------+----------+--------+--------+-----------+-------------+--------------+------+------------------+
|Row_ID|    Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Customer Name|    Segment|       City|      State|Country|Postal Code|Market|Region|      Product ID|       Category|Sub-Category|        Product Name|TotalSales|Quantity|Discount|TotalProfit|Shipping Cost|Order Priority|Row ID|         TotalCost|
+------+------------+----------+----------+--------------+-----------+------------------+-----------+-----------+-----------+-------+-----------+------+------+----------------+---------------+------------+--------------------+----------+--------+--------+-----------+-------------+--------------+------+------------------+
|  NULL|AG-2011-1390|2011-08-16

In [0]:
from pyspark.sql.functions import col, year, to_date

# Add a Year Column 
df_grouped = df_bronze.withColumn("Year", year(to_date(col("Order Date"), "MM/dd/yyyy")))

# Show the result
df_grouped.show(5)


+------+------------+----------+----------+--------------+-----------+-----------------+-----------+--------------+---------------+--------------------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+--------+-------------+--------------+------+----+
|Row_ID|    Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|    Customer Name|    Segment|          City|          State|             Country|Postal Code|Market|Region|      Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|  Profit|Shipping Cost|Order Priority|Row ID|Year|
+------+------------+----------+----------+--------------+-----------+-----------------+-----------+--------------+---------------+--------------------+-----------+------+------+----------------+---------------+------------+--------------------+------+--------+--------+--------+-------------+--------------+------+----+
|  NULL|AE-2011-9160|2011-10-03|2011-

In [0]:
from pyspark.sql.functions import col, year, to_date, sum
#Filter the Data for a Specific Year

# Step 1: Add the Year Column to the Original DataFrame
df = df_bronze.withColumn("Year", year(to_date(col("Order Date"), "MM/dd/yyyy")))

# Step 2: Perform the aggregation including 'Year'
df_transformed = df.groupBy("Category", "Region", "Year") \
    .agg(
        sum("Sales").alias("TotalSales"),
        sum("Profit").alias("TotalProfit"),
        sum("Quantity").alias("TotalQuantity"),
        sum("Discount").alias("TotalDiscount")
    )

# Step 3: Filter the Data for a Specific Year
df_grouped = df_transformed.filter(col("Year") == 2012)

# Show the result
df_grouped.show(5)


+----------+-------+----+----------+------------------+-------------+------------------+
|  Category| Region|Year|TotalSales|       TotalProfit|TotalQuantity|     TotalDiscount|
+----------+-------+----+----------+------------------+-------------+------------------+
| Furniture|Oceania|2012|100519.008| 8623.818000000001|        607.0|26.199999999999978|
|Technology| Africa|2012| 64734.582| 6320.741999999999|        404.0|36.699999999999996|
|Technology|  North|2012|126353.563|25098.863000000005|        809.0| 7.377999999999993|
| Furniture| Canada|2012|   1600.68|            290.19|         16.0|               0.0|
|Technology|Oceania|2012| 89761.767|         14203.827|        688.0|23.499999999999986|
+----------+-------+----+----------+------------------+-------------+------------------+
only showing top 5 rows



In [0]:
#Sort Data by Total Sales
df_grouped = df_transformed.orderBy(col("TotalSales").desc())
df_grouped.show()

+---------------+-------+----+------------------+------------------+------------------+------------------+
|       Category| Region|Year|        TotalSales|       TotalProfit|     TotalQuantity|     TotalDiscount|
+---------------+-------+----+------------------+------------------+------------------+------------------+
|     Technology|Central|2014|343261.71891999996| 46266.45321999999|            2600.0| 93.55999999999997|
|Office Supplies|Central|2014| 313521.1169999999|36457.128400000016|          9564.338| 406.1000000000015|
|     Technology|Central|2013|      281501.66124| 39571.08374000002|2230.7799999999997| 76.50000000000003|
|      Furniture|Central|2014|281108.88969999994|15576.571300000007|          2778.252|129.51999999999995|
|Office Supplies|Central|2013|249453.54200000004|36849.007399999995|          7735.108|285.80000000000035|
|     Technology|Central|2012|237291.81162000002|       31373.85432|            1791.0| 61.14200000000001|
|      Furniture|Central|2013|       

In [0]:
#
df_final = df_grouped.withColumn("HighDiscount", col("TotalDiscount") > 50)
df_final.show(5)

+---------------+-------+----+------------------+------------------+------------------+------------------+------------+
|       Category| Region|Year|        TotalSales|       TotalProfit|     TotalQuantity|     TotalDiscount|HighDiscount|
+---------------+-------+----+------------------+------------------+------------------+------------------+------------+
|     Technology|Central|2014|343261.71891999996| 46266.45321999999|            2600.0| 93.55999999999997|        true|
|Office Supplies|Central|2014| 313521.1169999999|36457.128400000016|          9564.338| 406.1000000000015|        true|
|     Technology|Central|2013|      281501.66124| 39571.08374000002|2230.7799999999997| 76.50000000000003|        true|
|      Furniture|Central|2014|281108.88969999994|15576.571300000007|          2778.252|129.51999999999995|        true|
|Office Supplies|Central|2013|249453.54200000004|36849.007399999995|          7735.108|285.80000000000035|        true|
+---------------+-------+----+----------

In [0]:
#Save Transformed Data in Silver Zone
silver_zone_path = "/mnt/silver/superstore"
df_final.write.format("delta").mode("overwrite").save(silver_zone_path)




In [0]:
dbutils.notebook.exit(silver_zone_path)
