In [1]:
################ Dmart analysis using pyspark #####################

from pyspark.sql import SparkSession
import logging
from functools import reduce
from pyspark.sql.functions import col,current_date,to_date


# Clean up any existing handlers (important in Colab!)
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# --- Setup Logging ---
log_file = 'dmart.log'
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.setLevel(logging.INFO)


# Creating Spark session
spark = SparkSession.builder.appName("Dmart analysis using pyspark").getOrCreate()


# Loading CSVs into PySpark DataFrames

# Trying to load customer csv file
try:
  customer_df = spark.read.csv("/content/Customer.csv", header=True, inferSchema=True)
  logging.info("Customer data loaded successfully")
except Exception as e:
  logging.error(f"Error loading Customer data: {e}")

# Trying to load products csv file
try:
  products_df = spark.read.csv("/content/Product.csv", header=True, inferSchema=True)
  logging.info("Product data loaded successfully")
except Exception as e:
  logging.error(f"Error loading Product data: {e}")

# Trying to load sales csv file
try:
  sales_df = spark.read.csv("/content/Sales.csv", header=True, inferSchema=True)
  logging.info("Sales data loaded successfully")
except Exception as e:
  logging.error(f"Error loading Sales data: {e}")


# Preview dataframes
if customer_df:
    logger.info("Customer DataFrame")
    customer_df.show(5)

if products_df:
    logger.info("Products DataFrame")
    products_df.show(5)

if sales_df:
    logger.info("Sales DataFrame")
    sales_df.show(5)

# --- Show log file contents ---
!cat dmart.log

+-----------+---------------+---------+---+-------------+---------------+--------------+-----------+------+
|Customer ID|  Customer Name|  Segment|Age|      Country|           City|         State|Postal Code|Region|
+-----------+---------------+---------+---+-------------+---------------+--------------+-----------+------+
|   CG-12520|    Claire Gute| Consumer| 67|United States|      Henderson|      Kentucky|      42420| South|
|   DV-13045|Darrin Van Huff|Corporate| 31|United States|    Los Angeles|    California|      90036|  West|
|   SO-20335| Sean O'Donnell| Consumer| 65|United States|Fort Lauderdale|       Florida|      33311| South|
|   BH-11710|Brosina Hoffman| Consumer| 20|United States|    Los Angeles|    California|      90032|  West|
|   AA-10480|   Andrew Allen| Consumer| 50|United States|        Concord|North Carolina|      28027| South|
+-----------+---------------+---------+---+-------------+---------------+--------------+-----------+------+
only showing top 5 rows

+--

In [None]:
customer_df.count()


793

In [None]:
products_df.count()

1862

In [None]:
sales_df.count()

9994

In [2]:
################# Data Cleaning and Transformation#####################

# Remove duplicates
customer_df = customer_df.dropDuplicates()
products_df = products_df.dropDuplicates()
sales_df = sales_df.dropDuplicates()
logging.info("Duplicates removed successfully")

In [3]:
# Correct data types (e.g., ensuring Order date is in a Date format, Quantity is an integer, etc.).
customer_df.printSchema()
customer_df.show(5)

products_df.printSchema()
products_df.show(5)

sales_df.printSchema()
sales_df.show(5)

logging.info("Data types corrected successfully")

root
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Region: string (nullable = true)

+-----------+-----------------+-----------+---+-------------+------------+------------+-----------+-------+
|Customer ID|    Customer Name|    Segment|Age|      Country|        City|       State|Postal Code| Region|
+-----------+-----------------+-----------+---+-------------+------------+------------+-----------+-------+
|   PO-19180|Philisse Overcash|Home Office| 46|United States|     Chicago|    Illinois|      60623|Central|
|   MD-17860|Michael Dominguez|  Corporate| 21|United States|Indianapolis|     Indiana|      46203|Central|
|   AA-10645|    Anna Andreadi|   Consumer| 32|United States|     Chester|Pennsylvania|      1901

In [4]:
# Handle missing or null values by either dropping or filling them.

# Identifying the null values
customer_df_with_missing_values = customer_df.filter(
    reduce(lambda a, b: a | b, [col(c).isNull() for c in customer_df.columns])
)
customer_df_with_missing_values.show()

products_df_with_missing_values = products_df.filter(
    reduce(lambda a, b: a | b, [col(c).isNull() for c in products_df.columns])
)
products_df_with_missing_values.show()

sales_df_with_missing_values = sales_df.filter(
    reduce(lambda a, b: a | b, [col(c).isNull() for c in sales_df.columns])
)
sales_df_with_missing_values.show()

logging.info("Null values identified successfully")

+-----------+-------------+-------+---+-------+----+-----+-----------+------+
|Customer ID|Customer Name|Segment|Age|Country|City|State|Postal Code|Region|
+-----------+-------------+-------+---+-------+----+-----+-----------+------+
+-----------+-------------+-------+---+-------+----+-----+-----------+------+

+----------+--------+------------+------------+
|Product ID|Category|Sub-Category|Product Name|
+----------+--------+------------+------------+
+----------+--------+------------+------------+

+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
|Order Line|Order ID|Order Date|Ship Date|Ship Mode|Customer ID|Product ID|Sales|Quantity|Discount|Profit|
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+



In [5]:
# Dropping the null rows

customer_df=customer_df.dropna()
products_df=products_df.dropna()
sales_df=sales_df.dropna()

logging.info("Null values removed successfully")

In [6]:
# Renaming the column names to snake-case for consistency

customer_df = customer_df.select([col(c).alias(c.lower().replace(' ', '_')) for c in customer_df.columns])
customer_df.show(5)
products_df = products_df.select([col(c).alias(c.lower().replace(' ', '_').replace('-', '_')) for c in products_df.columns])
products_df.show(5)
sales_df = sales_df.select([col(c).alias(c.lower().replace(' ', '_')) for c in sales_df.columns])
sales_df.show(5)

logging.info("Column names renamed successfully")

+-----------+-----------------+-----------+---+-------------+------------+------------+-----------+-------+
|customer_id|    customer_name|    segment|age|      country|        city|       state|postal_code| region|
+-----------+-----------------+-----------+---+-------------+------------+------------+-----------+-------+
|   PO-19180|Philisse Overcash|Home Office| 46|United States|     Chicago|    Illinois|      60623|Central|
|   MD-17860|Michael Dominguez|  Corporate| 21|United States|Indianapolis|     Indiana|      46203|Central|
|   AA-10645|    Anna Andreadi|   Consumer| 32|United States|     Chester|Pennsylvania|      19013|   East|
|   DB-13615|    Doug Bickford|   Consumer| 19|United States| Los Angeles|  California|      90045|   West|
|   KD-16615|         Ken Dana|  Corporate| 41|United States|  Scottsdale|     Arizona|      85254|   West|
+-----------+-----------------+-----------+---+-------------+------------+------------+-----------+-------+
only showing top 5 rows

+--

In [7]:
# Identifying invalid dates

sales_df=sales_df.withColumn("order_date",to_date(col("order_date"), "yyyy-MM-dd"))
sales_df=sales_df.withColumn("ship_date",to_date(col("ship_date"), "yyyy-MM-dd"))
invalid_dates = sales_df.filter((col("order_date") > col("ship_date"))|
                                (col("order_date").isNull())|
                                (col("ship_date").isNull())|
                                (col("order_date") > current_date())|
                                (col("ship_date") > current_date())
                                )
invalid_dates.show()


logging.info("Invalid dates identified successfully")


+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
|order_line|order_id|order_date|ship_date|ship_mode|customer_id|product_id|sales|quantity|discount|profit|
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+
+----------+--------+----------+---------+---------+-----------+----------+-----+--------+--------+------+



In [8]:
# Filling the missing values with dafault

customer_df= customer_df.fillna({
    "age":0,
    "country":"unknown",
    "state":"unknown",
    "city":"unknown",
    "postal_code":0,
    "region":"unknown"
})
products_df=products_df.fillna({
    "category":"unknown",
    "sub_category":"unknown",
    "product_name":"unknown"
})
sales_df=sales_df.fillna({
    "ship_mode":"unknown",
    "sales":0.0,
    "quantity":0,
    "discount":0.0,
    "profit":0.0
})

logging.info("Missing values filled successfully")


In [9]:
# Join the DataFrames on relevant keys (Product ID and Customer ID).

joined_df = sales_df.join(customer_df, on="customer_id").join(products_df, on="product_id")
joined_df.show(5)

logging.info("Dataframes joined successfully")

# Show the contents of log file
!cat dmart.log

joined_df.count()

+---------------+-----------+----------+--------------+----------+----------+--------------+-------+--------+--------+----------+----------------+---------+---+-------------+-------------+--------------+-----------+-------+---------------+------------+--------------------+
|     product_id|customer_id|order_line|      order_id|order_date| ship_date|     ship_mode|  sales|quantity|discount|    profit|   customer_name|  segment|age|      country|         city|         state|postal_code| region|       category|sub_category|        product_name|
+---------------+-----------+----------+--------------+----------+----------+--------------+-------+--------+--------+----------+----------------+---------+---+-------------+-------------+--------------+-----------+-------+---------------+------------+--------------------+
|OFF-ST-10002974|   TS-21610|       128|US-2017-107272|2017-11-05|2017-11-12|Standard Class|243.992|       7|     0.2|    30.499|    Troy Staebel| Consumer| 33|United States|    

9994

In [10]:
# Creating a temporary view

joined_df.createOrReplaceTempView("sales_data")
customer_df.createOrReplaceTempView("customer_data")

result=spark.sql("select * from sales_data")
result.show(5)
logging.info("Temporary views created successfully")
result.count()

+---------------+-----------+----------+--------------+----------+----------+--------------+-------+--------+--------+----------+----------------+---------+---+-------------+-------------+--------------+-----------+-------+---------------+------------+--------------------+
|     product_id|customer_id|order_line|      order_id|order_date| ship_date|     ship_mode|  sales|quantity|discount|    profit|   customer_name|  segment|age|      country|         city|         state|postal_code| region|       category|sub_category|        product_name|
+---------------+-----------+----------+--------------+----------+----------+--------------+-------+--------+--------+----------+----------------+---------+---+-------------+-------------+--------------+-----------+-------+---------------+------------+--------------------+
|OFF-ST-10002974|   TS-21610|       128|US-2017-107272|2017-11-05|2017-11-12|Standard Class|243.992|       7|     0.2|    30.499|    Troy Staebel| Consumer| 33|United States|    

9994

In [11]:
################# Run Queries on the Pyspark #####################

# 1.What is the total sales for each product category?

# Using pyspark
from pyspark.sql import functions as F

joined_df.groupBy("category") \
        .agg(F.round(F.sum("sales"),2).alias("Total_Sales_in_Rupees")) \
        .show()

# Using spark sql
result = spark.sql("SELECT category, round(SUM(sales),2) AS total_sales_in_Rupees FROM sales_data GROUP BY category")
result.show()
logging.info("Query 1 executed successfully")

+---------------+---------------------+
|       category|Total_Sales_in_Rupees|
+---------------+---------------------+
|Office Supplies|            719047.03|
|      Furniture|             741999.8|
|     Technology|            836154.03|
+---------------+---------------------+

+---------------+---------------------+
|       category|total_sales_in_Rupees|
+---------------+---------------------+
|Office Supplies|            719047.03|
|      Furniture|             741999.8|
|     Technology|            836154.03|
+---------------+---------------------+



In [12]:
# 2. Which customer has made the highest number of purchases?

# Using Pyapark
joined_df.groupBy("customer_name") \
        .agg(F.count("order_id").alias("purchase_count")) \
        .orderBy(F.desc("purchase_count")) \
        .limit(1) \
        .show()

# Using spark sql
result = spark.sql("SELECT customer_name, COUNT(order_id) AS purchase_count FROM sales_data GROUP BY customer_name ORDER BY purchase_count DESC LIMIT 1")
result.show()
logging.info("Query 2 executed successfully")

+-------------+--------------+
|customer_name|purchase_count|
+-------------+--------------+
|William Brown|            37|
+-------------+--------------+

+-------------+--------------+
|customer_name|purchase_count|
+-------------+--------------+
|William Brown|            37|
+-------------+--------------+



In [14]:
# 3. What is the average discount given on sales across all products?

# using pyspark
joined_df.agg(F.round(F.avg("discount"),2).alias("Avg_Discount")).show()

# Using spark sql
result = spark.sql("SELECT round(AVG(discount),2) AS average_discount FROM sales_data")
result.show()
logging.info("Query 3 executed successfully")

+------------+
|Avg_Discount|
+------------+
|        0.16|
+------------+

+----------------+
|average_discount|
+----------------+
|            0.16|
+----------------+



In [13]:
# 4. How many unique products were sold in each region?

# Using Pyspark
joined_df.groupBy("region") \
        .agg(F.countDistinct("product_name").alias("Unique_Products_Sold")) \
        .show()

# Using spark sql
result = spark.sql("SELECT region, COUNT(DISTINCT(product_name)) AS unique_products FROM sales_data GROUP BY region")
result.show()
logging.info("Query 4 executed successfully")

+-------+--------------------+
| region|Unique_Products_Sold|
+-------+--------------------+
|  South|                1037|
|Central|                1289|
|   East|                1374|
|   West|                1505|
+-------+--------------------+

+-------+---------------+
| region|unique_products|
+-------+---------------+
|  South|           1037|
|Central|           1289|
|   East|           1374|
|   West|           1505|
+-------+---------------+



In [15]:
# 5. What is the total profit generated in each state?

# Using Pyspark
joined_df.groupBy("state") \
        .agg(F.round(F.sum("profit"),2).alias("Total_Profit")) \
        .show()

# Using spark sql
result = spark.sql("SELECT state, round(SUM(profit),2) AS total_profit FROM sales_data GROUP BY state")
result.show()
logging.info("Query 5 executed successfully")

+--------------------+------------+
|               state|Total_Profit|
+--------------------+------------+
|                Utah|     1818.19|
|           Minnesota|     7202.52|
|                Ohio|     5985.89|
|              Oregon|      234.05|
|            Arkansas|      -62.95|
|               Texas|    20528.91|
|        Pennsylvania|    13604.93|
|         Connecticut|       533.5|
|            Nebraska|     1166.02|
|              Nevada|      278.07|
|          Washington|     24405.8|
|            Illinois|     9560.15|
|            Oklahoma|      829.02|
|District of Columbia|      490.96|
|            Delaware|     3336.38|
|          New Mexico|     1340.14|
|            Missouri|     2212.87|
|        Rhode Island|      2276.7|
|             Georgia|    12781.34|
|            Virginia|     6940.11|
+--------------------+------------+
only showing top 20 rows

+--------------------+------------+
|               state|total_profit|
+--------------------+------------+
| 

In [16]:
# 6. Which product sub-category has the highest sales?

# Using Pyspark
joined_df.groupBy("sub_category") \
        .agg(F.round(F.sum("sales"),2).alias("Total_Sales")) \
        .orderBy(F.desc("Total_Sales")) \
        .limit(1) \
        .show()

# Using spark sql
result = spark.sql("SELECT sub_category, round(SUM(sales),2) AS total_sales FROM sales_data GROUP BY sub_category ORDER BY total_sales DESC LIMIT 1")
result.show()
logging.info("Query 6 executed successfully")

+------------+-----------+
|sub_category|Total_Sales|
+------------+-----------+
|      Phones|  330007.05|
+------------+-----------+

+------------+-----------+
|sub_category|total_sales|
+------------+-----------+
|      Phones|  330007.05|
+------------+-----------+



In [17]:
# 7. What is the average age of customers in each segment?

# Using Pyspark
joined_df.groupBy("segment") \
        .agg(F.round(F.avg("age"),2).alias("Average_Age")) \
        .show()

# Using spark sql
result = spark.sql("SELECT segment, round(AVG(age),2) AS average_age FROM customer_data GROUP BY segment")
result.show()
logging.info("Query 7 executed successfully")


+-----------+-----------+
|    segment|Average_Age|
+-----------+-----------+
|   Consumer|      44.61|
|Home Office|      43.28|
|  Corporate|      44.82|
+-----------+-----------+

+-----------+-----------+
|    segment|average_age|
+-----------+-----------+
|   Consumer|       44.7|
|Home Office|      43.81|
|  Corporate|      44.47|
+-----------+-----------+



In [18]:
# 8. How many orders were shipped in each shipping mode?

# Using Pyspark
joined_df.groupBy("ship_mode") \
        .agg(F.count("order_id").alias("Order_Count")) \
        .show()

# Using spark sql
result = spark.sql("SELECT ship_mode, COUNT(order_id) AS order_count FROM sales_data GROUP BY ship_mode")
result.show()
logging.info("Query 8 executed successfully")

+--------------+-----------+
|     ship_mode|Order_Count|
+--------------+-----------+
|   First Class|       1538|
|      Same Day|        543|
|  Second Class|       1945|
|Standard Class|       5968|
+--------------+-----------+

+--------------+-----------+
|     ship_mode|order_count|
+--------------+-----------+
|   First Class|       1538|
|      Same Day|        543|
|  Second Class|       1945|
|Standard Class|       5968|
+--------------+-----------+



In [19]:
# 9. What is the total quantity of products sold in each city?

# Using Pyspark
joined_df.groupBy("city") \
        .agg(F.sum("quantity").alias("Total_Quantity")) \
        .show()

# Using spark sql
result = spark.sql("SELECT city, SUM(quantity) AS total_quantity FROM sales_data GROUP BY city")
result.show()
logging.info("Query 9 executed successfully")


+---------------+--------------+
|           city|Total_Quantity|
+---------------+--------------+
|          Tyler|            22|
|    Springfield|           282|
|  Bowling Green|            89|
|         Auburn|           214|
|North Las Vegas|            29|
|        Phoenix|           256|
|  Lake Elsinore|            35|
|         Monroe|           184|
| Pembroke Pines|            48|
|       Westland|            54|
|    Lindenhurst|            42|
|         Marion|            77|
|          Omaha|            43|
|   Fort Collins|            55|
|        Everett|            24|
|     Greensboro|            51|
|   Lincoln Park|            28|
|       Franklin|           184|
|         Dallas|           602|
|      Encinitas|            87|
+---------------+--------------+
only showing top 20 rows

+---------------+--------------+
|           city|total_quantity|
+---------------+--------------+
|          Tyler|            22|
|    Springfield|           282|
|  Bowling Green|

In [20]:
# 10. Which customer segment has the highest profit margin?

# Using Pyspark
joined_df.groupBy("segment") \
        .agg(F.round(F.sum("profit")/F.sum("sales"),2).alias("Profit_Margin")) \
        .orderBy(F.desc("Profit_Margin")) \
        .limit(1) \
        .show()

# Using spark sql
result = spark.sql("SELECT segment, round(SUM(profit)/SUM(sales),2) AS profit_margin FROM sales_data GROUP BY segment ORDER BY profit_margin DESC LIMIT 1")
result.show()
logging.info("Query 10 executed successfully")

+-----------+-------------+
|    segment|Profit_Margin|
+-----------+-------------+
|Home Office|         0.14|
+-----------+-------------+

+-----------+-------------+
|    segment|profit_margin|
+-----------+-------------+
|Home Office|         0.14|
+-----------+-------------+



In [21]:
# log file
!cat dmart.log

2025-05-09 09:52:23,308 - INFO - Customer data loaded successfully
2025-05-09 09:52:24,282 - INFO - Product data loaded successfully
2025-05-09 09:52:25,218 - INFO - Sales data loaded successfully
2025-05-09 09:52:25,219 - INFO - Customer DataFrame
2025-05-09 09:52:25,694 - INFO - Products DataFrame
2025-05-09 09:52:26,142 - INFO - Sales DataFrame
2025-05-09 09:53:01,177 - INFO - Duplicates removed successfully
2025-05-09 09:53:07,198 - INFO - Data types corrected successfully
2025-05-09 09:53:20,020 - INFO - Null values identified successfully
2025-05-09 09:53:29,145 - INFO - Null values removed successfully
2025-05-09 09:53:34,442 - INFO - Column names renamed successfully
2025-05-09 09:53:45,794 - INFO - Invalid dates identified successfully
2025-05-09 09:53:55,225 - INFO - Missing values filled successfully
2025-05-09 09:54:06,052 - INFO - Dataframes joined successfully
2025-05-09 09:54:17,878 - INFO - Temporary views created successfully
2025-05-09 09:54:28,078 - INFO - Query 1 ex