In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RetailETL").getOrCreate()
file_path = "/FileStore/tables/Sample___Superstore.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show(5)
df.printSchema()

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|   CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|      42420| South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset 

In [0]:
from pyspark.sql.functions import col
#Changing string to numeric columns
df = df.withColumn("Sales", col("Sales").cast("double")) \
       .withColumn("Quantity", col("Quantity").cast("int")) \
       .withColumn("Discount", col("Discount").cast("double"))

df.printSchema()


root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)



In [0]:
df = df.dropDuplicates()
print(f"Total records after cleaning: {df.count()}")


Total records after cleaning: 9994


In [0]:
from pyspark.sql.functions import col, sum

df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()


+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row ID|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Customer Name|Segment|Country|City|State|Postal Code|Region|Product ID|Category|Sub-Category|Product Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|     0|       0|         0|        0|        0|          0|            0|      0|      0|   0|    0|          0|     0|         0|       0|           0|           0|  300|      20|      11|     0|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+



In [0]:
df = df.fillna({
    "Sales": 0.0,           # If sales is null, assuming 0
    "Quantity": 1,          # Assuming 1 item minimum if missing
    "Discount": 0.0         # If no discount, assuming 0%
})


In [0]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()


+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|Row ID|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Customer Name|Segment|Country|City|State|Postal Code|Region|Product ID|Category|Sub-Category|Product Name|Sales|Quantity|Discount|Profit|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+
|     0|       0|         0|        0|        0|          0|            0|      0|      0|   0|    0|          0|     0|         0|       0|           0|           0|    0|       0|       0|     0|
+------+--------+----------+---------+---------+-----------+-------------+-------+-------+----+-----+-----------+------+----------+--------+------------+------------+-----+--------+--------+------+



In [0]:
from pyspark.sql.functions import sum

sales_by_region = df.groupBy("Region") \
    .agg(sum("Sales").alias("Total_Sales")) \
    .orderBy("Total_Sales", ascending=False)

sales_by_region.show()


+-------+------------------+
| Region|       Total_Sales|
+-------+------------------+
|   West| 713471.3445000012|
|   East| 672194.0540000005|
|Central|497800.87279999955|
|  South|388983.58499999985|
+-------+------------------+



In [0]:
from pyspark.sql.functions import round

category_summary = df.groupBy("Category") \
    .agg(
        round(sum("Sales"), 2).alias("Total_Sales"),
        round(sum("Profit"), 2).alias("Total_Profit")
    ).orderBy("Total_Sales", ascending=False)

category_summary.show()


+---------------+-----------+------------+
|       Category|Total_Sales|Total_Profit|
+---------------+-----------+------------+
|     Technology|  835900.07|    145388.3|
|      Furniture|  733046.86|    19686.43|
|Office Supplies|  703502.93|   120632.88|
+---------------+-----------+------------+



In [0]:
from pyspark.sql.functions import date_format

monthly_sales = df.withColumn("Month", date_format("Order Date", "yyyy-MM")) \
    .groupBy("Month") \
    .agg(round(sum("Sales"), 2).alias("Monthly_Sales")) \
    .orderBy("Month")

monthly_sales.show(10)


+-------+-------------+
|  Month|Monthly_Sales|
+-------+-------------+
|2014-01|     14161.35|
|2014-02|      4119.82|
|2014-03|      55526.2|
|2014-04|     28139.56|
|2014-05|     23634.67|
|2014-06|      34509.0|
|2014-07|     33500.87|
|2014-08|     27603.51|
|2014-09|     81496.81|
|2014-10|     31394.94|
+-------+-------------+
only showing top 10 rows



In [0]:
sales_by_region.write.mode("overwrite").parquet("/FileStore/processed/sales_by_region")
category_summary.write.mode("overwrite").parquet("/FileStore/processed/category_summary")
monthly_sales.write.mode("overwrite").parquet("/FileStore/processed/monthly_sales")


In [0]:

sales_by_region.write.mode("overwrite").option("header", "true").csv("/FileStore/export_csv/sales_by_region")
category_summary.write.mode("overwrite").option("header", "true").csv("/FileStore/export_csv/category_summary")
monthly_sales.write.mode("overwrite").option("header", "true").csv("/FileStore/export_csv/monthly_sales")


In [0]:
%fs ls /FileStore/export_csv/sales_by_region

path,name,size,modificationTime
dbfs:/FileStore/export_csv/sales_by_region/_SUCCESS,_SUCCESS,0,1743792690000
dbfs:/FileStore/export_csv/sales_by_region/_committed_1997021815839773224,_committed_1997021815839773224,112,1743792690000
dbfs:/FileStore/export_csv/sales_by_region/_started_1997021815839773224,_started_1997021815839773224,0,1743792690000
dbfs:/FileStore/export_csv/sales_by_region/part-00000-tid-1997021815839773224-215315b7-c6b3-43a5-ade2-15ecc1660058-53-1-c000.csv,part-00000-tid-1997021815839773224-215315b7-c6b3-43a5-ade2-15ecc1660058-53-1-c000.csv,117,1743792690000


In [0]:
%fs ls /FileStore/export_csv/category_summary

path,name,size,modificationTime
dbfs:/FileStore/export_csv/category_summary/_SUCCESS,_SUCCESS,0,1743792694000
dbfs:/FileStore/export_csv/category_summary/_committed_6132953377316251626,_committed_6132953377316251626,112,1743792694000
dbfs:/FileStore/export_csv/category_summary/_started_6132953377316251626,_started_6132953377316251626,0,1743792693000
dbfs:/FileStore/export_csv/category_summary/part-00000-tid-6132953377316251626-b017a375-8e03-4fab-9916-1ec04d7348aa-59-1-c000.csv,part-00000-tid-6132953377316251626-b017a375-8e03-4fab-9916-1ec04d7348aa-59-1-c000.csv,129,1743792694000


In [0]:
%fs ls /FileStore/export_csv/monthly_sales

path,name,size,modificationTime
dbfs:/FileStore/export_csv/monthly_sales/_SUCCESS,_SUCCESS,0,1743792698000
dbfs:/FileStore/export_csv/monthly_sales/_committed_3280995565486109757,_committed_3280995565486109757,112,1743792697000
dbfs:/FileStore/export_csv/monthly_sales/_started_3280995565486109757,_started_3280995565486109757,0,1743792697000
dbfs:/FileStore/export_csv/monthly_sales/part-00000-tid-3280995565486109757-b76fbb61-62b4-4eda-a7a1-414d0600d923-65-1-c000.csv,part-00000-tid-3280995565486109757-b76fbb61-62b4-4eda-a7a1-414d0600d923-65-1-c000.csv,831,1743792697000
