In [0]:

# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("Sales Data Analysis") \
      .getOrCreate() 


In [0]:
df_sales = spark.read.csv("/FileStore/tables/Sales.csv", header=True, inferSchema=True)
df_product = spark.read.csv("/FileStore/tables/Product.csv", header=True, inferSchema=True)
df_customer = spark.read.csv("/FileStore/tables/Customer.csv", header=True, inferSchema=True)

In [0]:
# Display schema of each DataFrame
df_sales.printSchema()
df_product.printSchema()
df_customer.printSchema()

# Show first few rows of each DataFrame
df_sales.show(5)
df_product.show(5)
df_customer.show(5)

root
 |-- Order Line: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)

root
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)

root
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)

+----------

In [0]:
from pyspark.sql.functions import col

def standardize_column_names(df):
    """
    Standardizes column names by:
    - Converting to lowercase
    - Replacing spaces with underscores
    - Removing special characters
    """
    new_columns = [col(c).alias(c.lower().strip().replace(" ", "_")) for c in df.columns]
    return df.select(*new_columns)

# Apply this function to all DataFrames
df_products = standardize_column_names(df_product)
df_sales = standardize_column_names(df_sales)
df_customers = standardize_column_names(df_customer)

# Verify the new column names
print(df_products.columns)
print(df_sales.columns)
print(df_customers.columns)


['product_id', 'category', 'sub-category', 'product_name']
['order_line', 'order_id', 'order_date', 'ship_date', 'ship_mode', 'customer_id', 'product_id', 'sales', 'quantity', 'discount', 'profit']
['customer_id', 'customer_name', 'segment', 'age', 'country', 'city', 'state', 'postal_code', 'region']


In [0]:
from pyspark.sql.functions import col, sum

def check_missing_values(df):
    return df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Check missing values in each DataFrame
check_missing_values(df_products).show()
check_missing_values(df_sales).show()
check_missing_values(df_customers).show()


+----------+--------+------------+------------+
|product_id|category|sub-category|product_name|
+----------+--------+------------+------------+
|         0|       0|           0|           0|
+----------+--------+------------+------------+

+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
|order_line|order_id|order_date|ship_date|ship_mode|customer_id|product_id|sales|quantity|discount|profit|
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
|         0|       0|         0|        0|        0|          0|         0|    0|       0|       0|     0|
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+

+-----------+-------------+-------+---+-------+----+-----+-----------+------+
|customer_id|customer_name|segment|age|country|city|state|postal_code|region|
+-----------+-------------+-------+---+-------+----+-----+---------

In [0]:
from pyspark.sql.types import IntegerType, FloatType, DateType
from pyspark.sql.functions import to_date

df_sale = df_sales.withColumn("order_line", col("order_line").cast(IntegerType())) \
                   .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))\

# df_product = df_product.withColumn("price", col("price").cast(FloatType()))

# df_customer = df_customer.withColumn("customer_id", col("customer_id").cast(IntegerType()))


In [0]:
df_sale.show(5)

+----------+--------------+----------+----------+--------------+-----------+---------------+--------+--------+--------+--------+
|order_line|      order_id|order_date| ship_date|     ship_mode|customer_id|     product_id|   sales|quantity|discount|  profit|
+----------+--------------+----------+----------+--------------+-----------+---------------+--------+--------+--------+--------+
|         1|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|   CG-12520|FUR-BO-10001798|  261.96|       2|     0.0| 41.9136|
|         2|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|   CG-12520|FUR-CH-10000454|  731.94|       3|     0.0| 219.582|
|         3|CA-2016-138688|2016-06-12|2016-06-16|  Second Class|   DV-13045|OFF-LA-10000240|   14.62|       2|     0.0|  6.8714|
|         4|US-2015-108966|2015-10-11|2015-10-18|Standard Class|   SO-20335|FUR-TA-10000577|957.5775|       5|    0.45|-383.031|
|         5|US-2015-108966|2015-10-11|2015-10-18|Standard Class|   SO-20335|OFF-ST-10000760|  22.

In [0]:
df_sale.printSchema()

root
 |-- order_line: integer (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_mode: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)
 |-- profit: double (nullable = true)



In [0]:
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub-category: string (nullable = true)
 |-- product_name: string (nullable = true)



In [0]:
df_customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: integer (nullable = true)
 |-- region: string (nullable = true)



In [0]:
# Join sales with product data
df_sales_product = df_sales.join(df_products, on="product_id", how="left")

# Join the result with customer data
df_final = df_sales_product.join(df_customers, on="customer_id", how="left")

# Show final integrated dataset
df_final.show(5)


+-----------+---------------+----------+--------------+----------+----------+--------------+--------+--------+--------+--------+---------------+------------+--------------------+---------------+---------+---+-------------+---------------+----------+-----------+------+
|customer_id|     product_id|order_line|      order_id|order_date| ship_date|     ship_mode|   sales|quantity|discount|  profit|       category|sub-category|        product_name|  customer_name|  segment|age|      country|           city|     state|postal_code|region|
+-----------+---------------+----------+--------------+----------+----------+--------------+--------+--------+--------+--------+---------------+------------+--------------------+---------------+---------+---+-------------+---------------+----------+-----------+------+
|   CG-12520|FUR-BO-10001798|         1|CA-2016-152156|2016-11-08|2016-11-11|  Second Class|  261.96|       2|     0.0| 41.9136|      Furniture|   Bookcases|Bush Somerset Col...|    Claire Gute

#1. Total Sales for Each Product Category

In [0]:
df_final.groupBy("category") \
    .agg(sum(col("quantity") * col("sales")).alias("total_sales")) \
    .orderBy(col("total_sales").desc()) \
    .show()


+---------------+------------------+
|       category|       total_sales|
+---------------+------------------+
|     Technology|4080261.5249999864|
|      Furniture|3859215.2288999907|
|Office Supplies| 3548585.318000002|
+---------------+------------------+



#2.Which customer has made the highest number of purchases?

In [0]:
df_final.groupBy("customer_name") \
    .agg(sum("quantity").alias("total_purchases")) \
    .orderBy(col("total_purchases").desc()) \
    .limit(1) \
    .show()


+----------------+---------------+
|   customer_name|total_purchases|
+----------------+---------------+
|Jonathan Doherty|            150|
+----------------+---------------+



#3.What is the average discount given on sales across all products?

In [0]:
from pyspark.sql.functions import avg
df_final.agg(avg("discount").alias("average_discount")).show()

+-------------------+
|   average_discount|
+-------------------+
|0.15620272163298934|
+-------------------+



#4.How many unique products were sold in each region?

In [0]:
from pyspark.sql.functions import countDistinct

df_final.groupBy("region") \
    .agg(countDistinct("product_id").alias("unique_products_sold")) \
    .show()


+-------+--------------------+
| region|unique_products_sold|
+-------+--------------------+
|  South|                1059|
|Central|                1316|
|   East|                1408|
|   West|                1536|
+-------+--------------------+



#5.What is the total profit generated in each state?

In [0]:
df_final.groupBy("state") \
    .agg(sum("profit").alias("total_profit")) \
    .orderBy(col("total_profit").desc()) \
    .show()


+-------------+------------------+
|        state|      total_profit|
+-------------+------------------+
|   California| 59398.31250000002|
|     New York|58177.834100000066|
|   Washington|24405.796599999983|
|        Texas| 20528.91100000002|
| Pennsylvania|13604.935000000007|
|      Georgia|12781.342599999998|
|      Arizona| 9563.200100000004|
|     Illinois| 9560.145599999993|
|    Wisconsin| 8569.869700000003|
|     Michigan|7752.2969000000085|
|    Minnesota| 7202.522500000001|
|     Virginia| 6940.111200000005|
|         Ohio| 5985.887000000001|
|Massachusetts|         5905.5446|
|     Kentucky| 4513.313999999998|
|    Tennessee| 3434.276499999999|
|     Delaware| 3336.382700000002|
|      Alabama|         2845.0624|
|      Indiana| 2707.349500000002|
|    Louisiana|         2659.2401|
+-------------+------------------+
only showing top 20 rows



#6.Which product sub-category has the highest sales?

In [0]:
df_final.groupBy("sub-category") \
    .agg(sum("quantity").alias("total_units_sold")) \
    .orderBy(col("total_units_sold").desc()) \
    .limit(1) \
    .show()


+------------+----------------+
|sub-category|total_units_sold|
+------------+----------------+
|     Binders|            5974|
+------------+----------------+



#7.What is the average age of customers in each segment?

In [0]:
df_final.groupBy("segment") \
    .agg(avg("age").alias("average_age")) \
    .show()


+-----------+------------------+
|    segment|       average_age|
+-----------+------------------+
|   Consumer| 44.60585628973223|
|Home Office| 43.28210880538418|
|  Corporate|44.816556291390725|
+-----------+------------------+



#8.How many orders were shipped in each shipping mode?

In [0]:
from pyspark.sql.functions import count

df_final.groupBy("ship_mode") \
    .agg(count("*").alias("total_orders_shipped")) \
    .orderBy(col("total_orders_shipped").desc()) \
    .show()


+--------------+--------------------+
|     ship_mode|total_orders_shipped|
+--------------+--------------------+
|Standard Class|                5968|
|  Second Class|                1945|
|   First Class|                1538|
|      Same Day|                 543|
+--------------+--------------------+



#9.What is the total quantity of products sold in each city?

In [0]:
df_final.groupBy("city") \
    .agg(sum("quantity").alias("total_quantity_sold")) \
    .orderBy(col("total_quantity_sold").desc()) \
    .show()


+-------------+-------------------+
|         city|total_quantity_sold|
+-------------+-------------------+
|New York City|               3217|
|  Los Angeles|               2756|
| Philadelphia|               2299|
|San Francisco|               1773|
|      Houston|               1425|
|      Seattle|               1371|
|      Chicago|               1153|
|     Columbus|                854|
|       Aurora|                611|
|    San Diego|                609|
|       Dallas|                602|
| Jacksonville|                362|
|      Detroit|                332|
|  Springfield|                282|
|    Rochester|                279|
|    Charlotte|                275|
|   Wilmington|                271|
|       Tucson|                257|
|      Phoenix|                256|
|        Dover|                256|
+-------------+-------------------+
only showing top 20 rows



#10.Which customer segment has the highest profit margin?

In [0]:
df_final = df_final.withColumn("price", (col("sales") / col("quantity")))

In [0]:
df_final.groupBy("segment") \
    .agg(avg(col("profit") / (col("quantity") * col("price"))).alias("average_profit_margin")) \
    .orderBy(col("average_profit_margin").desc()) \
    .show()

+-----------+---------------------+
|    segment|average_profit_margin|
+-----------+---------------------+
|Home Office|  0.14286958506103364|
|  Corporate|  0.12120260868746456|
|   Consumer|   0.1120495213315656|
+-----------+---------------------+

