In [25]:
#################### Walmart Sales Data Pipeline using Pyspark ##############################
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import logging


# Clean up any existing handlers (important in Colab!)
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# --- Setup Logging ---
log_file = 'walmart.log'
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Creating Spark session
spark = SparkSession.builder.appName("Walmart Sales Data Pipeline using Pyspark").getOrCreate()

# Loading TSVs into PySpark DataFrames

# Trying to load customers tsv file
try:
  customers_df = spark.read.csv("/content/customers.tsv", sep="\t", header=False, inferSchema=True)
  logging.info("Customers data loaded successfully")
except Exception as e:
  logging.error(f"Error loading Customer data: {e}")

# Trying to load sales transactions tsv file
try:
  salestxns_df = spark.read.csv("/content/salestxns.tsv", sep="\t", header=False, inferSchema=True)
  logging.info("Sales transactions data loaded successfully")
except Exception as e:
  logging.error(f"Error loading Sales data: {e}")

# Assign proper column names
customers_df = customers_df.toDF("Customer_Id", "Name", "City", "State", "Zip_Code")
salestxns_df = salestxns_df.toDF("Sales_Txn_Id", "Category_Id", "Category_Name","Product_Id",
                         "Product_Name", "Price", "Quantity", "Customer_Id"
)

# Preview dataframes
if customers_df:
    logger.info("Customers DataFrame:")
    print("Customers DataFrame:")
    customers_df.show(5)

if salestxns_df:
    logger.info("Sales Transactions DataFrame:")
    print("Sales Transactions DataFrame:")
    salestxns_df.show(5)

# --- Show log file contents ---
!cat walmart.log

Customers DataFrame:
+-----------+--------------+---------+-----+--------+
|Customer_Id|          Name|     City|State|Zip_Code|
+-----------+--------------+---------+-----+--------+
|      11039|   Mary Torres|   Caguas|   PR|     725|
|       5623|    Jose Haley| Columbus|   OH|   43207|
|       5829|    Mary Smith|  Houston|   TX|   77015|
|       6336|Richard Maddox|   Caguas|   PR|     725|
|       1708|Margaret Booth|Arlington|   TX|   76010|
+-----------+--------------+---------+-----+--------+
only showing top 5 rows

Sales Transactions DataFrame:
+------------+-----------+----------------+----------+--------------------+------+--------+-----------+
|Sales_Txn_Id|Category_Id|   Category_Name|Product_Id|        Product_Name| Price|Quantity|Customer_Id|
+------------+-----------+----------------+----------+--------------------+------+--------+-----------+
|           1|         43|Camping & Hiking|       957|Diamondback Women...|299.98|       1|      11599|
|           2|        

In [2]:
customers_df.count()

1244

In [3]:
salestxns_df.count()

172198

In [4]:
################# Data Cleaning and Transformation#####################

# Remove duplicates
customers_df = customers_df.dropDuplicates()
salestxns_df = salestxns_df.dropDuplicates()
logging.info("Duplicates removed successfully")
print("Duplicates removed successfully")


Duplicates removed successfully


In [5]:
# Dropping null rows
customers_df = customers_df.dropna()
salestxns_df = salestxns_df.dropna()
logging.info("Missing values handled successfully")
print("Missing values handled successfully")


Missing values handled successfully


In [26]:
# Renaming the column names to snake-case for consistency
customers_df = customers_df.select([col(c).alias(c.lower().replace(' ', '_')) for c in customers_df.columns])
salestxns_df = salestxns_df.select([col(c).alias(c.lower().replace(' ', '_')) for c in salestxns_df.columns])

logging.info("Column names renamed successfully")
print("Column names renamed successfully")

customers_df.show(5)
salestxns_df.show(5)


Column names renamed successfully
+-----------+--------------+---------+-----+--------+
|customer_id|          name|     city|state|zip_code|
+-----------+--------------+---------+-----+--------+
|      11039|   Mary Torres|   Caguas|   PR|     725|
|       5623|    Jose Haley| Columbus|   OH|   43207|
|       5829|    Mary Smith|  Houston|   TX|   77015|
|       6336|Richard Maddox|   Caguas|   PR|     725|
|       1708|Margaret Booth|Arlington|   TX|   76010|
+-----------+--------------+---------+-----+--------+
only showing top 5 rows

+------------+-----------+----------------+----------+--------------------+------+--------+-----------+
|sales_txn_id|category_id|   category_name|product_id|        product_name| price|quantity|customer_id|
+------------+-----------+----------------+----------+--------------------+------+--------+-----------+
|           1|         43|Camping & Hiking|       957|Diamondback Women...|299.98|       1|      11599|
|           2|         48|    Water Spo

In [27]:
# Ensuring correct data types

customers_df = customers_df.withColumn("customer_id", col("customer_id").cast("int"))
salestxns_df = salestxns_df.withColumn("price", col("price").cast("double"))
salestxns_df = salestxns_df.withColumn("quantity", col("quantity").cast("int"))

logging.info("Data types ensured successfully")
print("Data types ensured successfully")

customers_df.printSchema()
salestxns_df.printSchema()


Data types ensured successfully
root
 |-- customer_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: integer (nullable = true)

root
 |-- sales_txn_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- customer_id: integer (nullable = true)



In [28]:
# Adding "total_amount" column in sales transaction
salestxns_df = salestxns_df.withColumn("total_amount", col("price") * col("quantity"))

logging.info("Total amount column added successfully")
print("Total amount column added successfully")
salestxns_df.show(5)

Total amount column added successfully
+------------+-----------+----------------+----------+--------------------+------+--------+-----------+------------+
|sales_txn_id|category_id|   category_name|product_id|        product_name| price|quantity|customer_id|total_amount|
+------------+-----------+----------------+----------+--------------------+------+--------+-----------+------------+
|           1|         43|Camping & Hiking|       957|Diamondback Women...|299.98|       1|      11599|      299.98|
|           2|         48|    Water Sports|      1073|Pelican Sunstream...|199.99|       1|        256|      199.99|
|           3|         24| Women's Apparel|       502|Nike Men's Dri-FI...|  50.0|       5|        256|       250.0|
|           4|         18|  Men's Footwear|       403|Nike Men's CJ Eli...|129.99|       1|        256|      129.99|
|           5|         40|     Accessories|       897|Team Golf New Eng...| 24.99|       2|       8827|       49.98|
+------------+-----------

In [29]:
# Join the DataFrames on relevant keys

joined_df = salestxns_df.join(customers_df, on="customer_id")
joined_df.show(5)

logging.info("Dataframes joined successfully")
print("Dataframes joined successfully")

joined_df.count()

+-----------+------------+-----------+----------------+----------+--------------------+------+--------+------------+-----------+-------+-----+--------+
|customer_id|sales_txn_id|category_id|   category_name|product_id|        product_name| price|quantity|total_amount|       name|   city|state|zip_code|
+-----------+------------+-----------+----------------+----------+--------------------+------+--------+------------+-----------+-------+-----+--------+
|       9488|          58|         45|         Fishing|      1004|Field & Stream Sp...|399.98|       1|      399.98| Mary Smith|Hialeah|   FL|   33012|
|       9488|          59|         43|Camping & Hiking|       957|Diamondback Women...|299.98|       1|      299.98| Mary Smith|Hialeah|   FL|   33012|
|        656|          78|          9|Cardio Equipment|       191|Nike Men's Free 5...| 99.99|       4|      399.96|Julie Smith| Pomona|   CA|   91767|
|        656|          79|         43|Camping & Hiking|       957|Diamondback Women...|2

17448

In [30]:
# Creating a temporary view

joined_df.createOrReplaceTempView("sales_data")
customers_df.createOrReplaceTempView("customer_data")
salestxns_df.createOrReplaceTempView("sales_trans_data")
logging.info("Temporary views created successfully")
print("Temporary views created successfully")
result=spark.sql("select * from sales_data")
result.show(5)
result.count()

Temporary views created successfully
+-----------+------------+-----------+----------------+----------+--------------------+------+--------+------------+-----------+-------+-----+--------+
|customer_id|sales_txn_id|category_id|   category_name|product_id|        product_name| price|quantity|total_amount|       name|   city|state|zip_code|
+-----------+------------+-----------+----------------+----------+--------------------+------+--------+------------+-----------+-------+-----+--------+
|       9488|          58|         45|         Fishing|      1004|Field & Stream Sp...|399.98|       1|      399.98| Mary Smith|Hialeah|   FL|   33012|
|       9488|          59|         43|Camping & Hiking|       957|Diamondback Women...|299.98|       1|      299.98| Mary Smith|Hialeah|   FL|   33012|
|        656|          78|          9|Cardio Equipment|       191|Nike Men's Free 5...| 99.99|       4|      399.96|Julie Smith| Pomona|   CA|   91767|
|        656|          79|         43|Camping & Hik

17448

In [11]:
# 1. Total Number of Customers:
# How many unique customers are there in the dataset?
# Using pyspark
result = customers_df.select("customer_id").distinct().count()
print("Total Number of Customers:", result)
# Using spark sql
result =spark.sql("select count(distinct(customer_id)) from customer_data")
result .show()
logger.info("Total Number of Customers calculated successfully")

Total Number of Customers: 1244
+---------------------------+
|count(DISTINCT customer_id)|
+---------------------------+
|                       1244|
+---------------------------+



In [12]:
from pyspark.sql.functions import sum,round,count,desc,avg,col

In [13]:
# 2. Total Sales by State:
# What is the total sales amount for each state?
# Using pyspark
print("Total Sales by State:")
result = joined_df.groupBy("state").agg(round(sum("total_amount"),2).alias("total_sales"))
result.show(5)
# Using spark sql
result= spark.sql("select state,round(sum(total_amount),2) as total_sales from sales_data group by state")
result.show(5)
logger.info("Total Sales by State calculated successfully")

Total Sales by State:
+-----+-----------+
|state|total_sales|
+-----+-----------+
|   AZ|   48702.68|
|   SC|    4144.68|
|   LA|   24449.42|
|   MN|     3549.6|
|   NJ|   52303.09|
+-----+-----------+
only showing top 5 rows

+-----+-----------+
|state|total_sales|
+-----+-----------+
|   AZ|   48702.68|
|   SC|    4144.68|
|   LA|   24449.42|
|   MN|     3549.6|
|   NJ|   52303.09|
+-----+-----------+
only showing top 5 rows



In [14]:
# 3. Top 10 Most Purchased Products:
# Which are the top 10 most purchased products based on the quantity sold?
# Using Pyspark
print("Top 10 most purchased products:")
result=salestxns_df.groupBy("product_id","product_name")\
        .agg(sum("quantity").alias("total_quantity"))\
        .orderBy(desc("total_quantity"))\
        .limit(10)
result.show(10)
# Usingspark sql
result=spark.sql("select product_id,product_name,sum(quantity) as total_quantity \
                  from sales_trans_data \
                  group by product_id,product_name \
                  order by total_quantity \
                 desc limit 10")
result.show(10)
logger.info("Top 10 most purchased products calculated successfully")


Top 10 most purchased products:
+----------+--------------------+--------------+
|product_id|        product_name|total_quantity|
+----------+--------------------+--------------+
|       365|Perfect Fitness P...|         73698|
|       502|Nike Men's Dri-FI...|         62956|
|      1014|O'Brien Men's Neo...|         57803|
|       191|Nike Men's Free 5...|         36680|
|       627|Under Armour Girl...|         31735|
|       403|Nike Men's CJ Eli...|         22246|
|      1004|Field & Stream Sp...|         17325|
|      1073|Pelican Sunstream...|         15500|
|       957|Diamondback Women...|         13729|
|       977|ENO Atlas Hammock...|           998|
+----------+--------------------+--------------+

+----------+--------------------+--------------+
|product_id|        product_name|total_quantity|
+----------+--------------------+--------------+
|       365|Perfect Fitness P...|         73698|
|       502|Nike Men's Dri-FI...|         62956|
|      1014|O'Brien Men's Neo...|   

In [15]:
# 4. Average Transaction Value:
# What is the average price of transactions across all sales?
# Using Pyspark
print("Average Transaction Value:")
result=joined_df.agg(round(avg(col("price")*col("quantity")),2).alias("Avg_transaction_value"))
result.show(5)
# Using spark sql
result=spark.sql("select round(avg(price*quantity),2) as Avg_transaction_value from sales_data")
result.show(5)
logger.info("Average Transaction Value calculated successfully")

Average Transaction Value:
+---------------------+
|Avg_transaction_value|
+---------------------+
|               198.57|
+---------------------+

+---------------------+
|Avg_transaction_value|
+---------------------+
|               198.57|
+---------------------+



In [16]:
# 5. Top 5 Customers by Expenditure:
# Who are the top 5 customers by total amount spent?
# Using pyspark
print("Top 5 Customers by Expenditure:")
result=joined_df.groupBy("customer_id","name")\
        .agg(round(sum("total_amount"),2).alias("total_expenditure"))\
        .orderBy(desc("total_expenditure"))\
        .limit(5)
result.show(5)
# Using spark sql
result=spark.sql("select customer_id,name,round(sum(total_amount),2) as total_expenditure \
                  from sales_data \
                  group by customer_id,name \
                  order by total_expenditure \
                  desc limit 5")
result.show(5)
logger.info("Top 5 Customers by Expenditure calculated successfully")

Top 5 Customers by Expenditure:
+-----------+-----------------+-----------------+
|customer_id|             name|total_expenditure|
+-----------+-----------------+-----------------+
|       9371|   Mary Patterson|          9299.03|
|        664|    Bobby Jimenez|          8394.26|
|      12431|        Mary Rios|          8073.15|
|      10591| Deborah Humphrey|          7889.05|
|       9271|Christopher Smith|          7665.25|
+-----------+-----------------+-----------------+

+-----------+-----------------+-----------------+
|customer_id|             name|total_expenditure|
+-----------+-----------------+-----------------+
|       9371|   Mary Patterson|          9299.03|
|        664|    Bobby Jimenez|          8394.26|
|      12431|        Mary Rios|          8073.15|
|      10591| Deborah Humphrey|          7889.05|
|       9271|Christopher Smith|          7665.25|
+-----------+-----------------+-----------------+



In [17]:
# 6. Product Purchases by a Specific Customer:
# List all products purchased by a specific customer (e.g., customer with ID 256),
# including the product name, quantity, and total amount spent.

# Using pyspark
print("Product Purchases by a Specific Customer:")
result = (
    joined_df.filter(col("customer_id") == 9371)
    .withColumn("total_amount", col("price") * col("quantity"))
    .groupBy("product_id", "product_name")
    .agg(
        sum("quantity").alias("total_quantity"),
        sum("total_amount").alias("total_spent")
    )
    .orderBy(desc("total_spent"))
)

result.show(truncate=False)

# Using spark sql
result = spark.sql("select product_id,product_name,\
        sum(quantity) AS total_quantity,\
        sum(price * quantity) AS total_spent\
    from sales_data\
    where customer_id = 9371\
    group by product_id, product_name\
    order by total_spent desc")
result.show()
logger.info("Product Purchases by a Specific Customer calculated successfully")

Product Purchases by a Specific Customer:
+----------+---------------------------------------------+--------------+------------------+
|product_id|product_name                                 |total_quantity|total_spent       |
+----------+---------------------------------------------+--------------+------------------+
|1004      |Field & Stream Sportsman 16 Gun Fire Safe    |8             |3199.84           |
|365       |Perfect Fitness Perfect Rip Deck             |26            |1559.74           |
|957       |Diamondback Women's Serene Classic Comfort Bi|4             |1199.92           |
|403       |Nike Men's CJ Elite 2 TD Football Cleat      |6             |779.94            |
|1014      |O'Brien Men's Neoprene Life Vest             |14            |699.72            |
|502       |Nike Men's Dri-FIT Victory Golf Polo         |9             |450.0             |
|1073      |Pelican Sunstream 100 Kayak                  |2             |399.98            |
|191       |Nike Men's Free 

In [20]:
# 7. Monthly Sales Trends:
# Assuming there is a date field, analyze the sales trends over the months.
# Which month had the highest sales?
# Using pyspark
print("Monthly Sales Trends:")
from pyspark.sql.functions import month
"""monthly_sales = joined_df.groupBy("month") \
                        .agg(sum("total_amount").alias("total_sales")) \
                        .orderBy(desc("total_sales"))

# Using spark sql
result = spark.sql("select MONTH(txn_date) AS month\
                    SUM(price * quantity) AS total_sales\
                    from sales_dtata\
                    group by MONTH(txn_date)\
                    order by total_sales desc")"""

logger.info("Monthly sales trend identified")

Monthly Sales Trends:


In [21]:
# 8. Category with Highest Sales:
# Which product category generated the highest total sales revenue?

# Using pyspark
print("Category with Highest Sales:")
category_sales = joined_df.groupBy("category_name") \
                         .agg(round(sum("total_amount"),2).alias("total_sales")) \
                         .orderBy(desc("total_sales"))
category_sales.show(1)
# Using spark sql
category_sales = spark.sql("select category_name,\
                    round(SUM(price * quantity),2) AS total_sales\
                    from sales_data\
                    group by category_name\
                    order by total_sales desc")
category_sales.show(1)
logger.info("Category with highest sales identified")

Category with Highest Sales:
+-------------+-----------+
|category_name|total_sales|
+-------------+-----------+
|      Fishing|   701964.9|
+-------------+-----------+
only showing top 1 row

+-------------+-----------+
|category_name|total_sales|
+-------------+-----------+
|      Fishing|   701964.9|
+-------------+-----------+
only showing top 1 row



In [22]:
# 9. State-wise Sales Comparison:
# Compare the total sales between two specific states (e.g., Texas vs. Ohio).
# Which state had higher sales?
# Using pyspark
print("State-wise Sales Comparison:")
states_to_compare=["TX","OH"]
state_sales = joined_df.filter(col("state").isin(states_to_compare))\
                       .groupBy("state") \
                       .agg(round(sum("total_amount"),2).alias("total_sales")) \
                       .orderBy(desc("total_sales"))
state_sales.show()
# Using spark sql
state_sales = spark.sql("select state,\
                    round(SUM(price * quantity),2) AS total_sales\
                    from sales_data\
                    where state in ('TX', 'OH')\
                    group by state\
                    order by total_sales desc")
state_sales.show()
logger.info("State-wise sales comparison identified")

State-wise Sales Comparison:
+-----+-----------+
|state|total_sales|
+-----+-----------+
|   TX|   184629.3|
|   OH|   82342.95|
+-----+-----------+

+-----+-----------+
|state|total_sales|
+-----+-----------+
|   TX|   184629.3|
|   OH|   82342.95|
+-----+-----------+



In [23]:
# 10. Detailed Customer Purchase Report:
# Generate a detailed report showing each customer along with their total purchases,
# the total number of transactions they have made, and
# the average transaction value.
# Using pyspark
print("Detailed Customer Purchase Report:")
customer_report = (joined_df.groupBy("customer_id", "name")
                   .agg(round(sum("total_amount"),2).alias("total_purchases"),
                    count("sales_txn_id").alias("num_transactions"),
                    round(avg("total_amount"), 2).alias("avg_transaction_value")
                   )
                   .orderBy(col("total_purchases").desc())
                  )
customer_report.show(5)
# Using spark sql
customer_report = spark.sql("select customer_id,name,\
                    round(sum(total_amount),2) AS total_purchases,\
                    count(sales_txn_id) AS num_transactions,\
                    round(avg(total_amount),2) as avg_transaction_value\
                    from sales_data\
                    group by customer_id,name\
                    order by total_purchases desc")
customer_report.show(5)

# customer_report.coalesce(1).write.mode("overwrite").option("header", True).csv("output/customer_report_csv")
customer_report.toPandas().to_csv("customer_report.csv", index=False)


logger.info("Customer report created successfully")


Detailed Customer Purchase Report:
+-----------+-----------------+---------------+----------------+---------------------+
|customer_id|             name|total_purchases|num_transactions|avg_transaction_value|
+-----------+-----------------+---------------+----------------+---------------------+
|       9371|   Mary Patterson|        9299.03|              44|               211.34|
|        664|    Bobby Jimenez|        8394.26|              39|               215.24|
|      12431|        Mary Rios|        8073.15|              36|               224.25|
|      10591| Deborah Humphrey|        7889.05|              45|               175.31|
|       9271|Christopher Smith|        7665.25|              35|               219.01|
+-----------+-----------------+---------------+----------------+---------------------+
only showing top 5 rows

+-----------+-----------------+---------------+----------------+---------------------+
|customer_id|             name|total_purchases|num_transactions|avg_tr

In [48]:
################### Creating All results into one excel report ######################
import os
import pandas as pd
from pyspark.sql.functions import round as _round, countDistinct, count, avg, sum as _sum, date_format


# 1. Total Number of Customers
query1 = "SELECT COUNT(DISTINCT customer_id) AS unique_customers FROM customer_data"
unique_customers_df = spark.sql(query1)
unique_customers_df.show()


# 2. Total Sales by State
query2 = "SELECT state, ROUND(SUM(total_amount),2) AS total_sales FROM sales_data GROUP BY state ORDER BY total_sales DESC"
sales_by_state_df = spark.sql(query2)
sales_by_state_df.show()


# 3. Top 10 Most Purchased Products
query3 = "SELECT product_id, product_name, SUM(quantity) AS total_quantity_sold FROM sales_data GROUP BY product_id, product_name ORDER BY total_quantity_sold DESC LIMIT 10"
top10_products_df = spark.sql(query3)
top10_products_df.show()


# 4. Average Transaction Value
query4 = "SELECT ROUND(AVG(total_amount),2) AS avg_transaction_value FROM sales_data"
avg_txn_df = spark.sql(query4)
avg_txn_df.show()


# 5. Top 5 Customers by Expenditure
query5 = "SELECT c.customer_id, c.name, ROUND(SUM(s.total_amount),2) AS total_spent FROM sales_trans_data s JOIN customer_data c ON s.customer_id = c.customer_id GROUP BY c.customer_id, c.name ORDER BY total_spent DESC LIMIT 5"
top5_customers_df = spark.sql(query5)
top5_customers_df.show()


# 6. Product Purchases by Specific Customer (ID=256)
customer_id = 9371
query6 = """select product_id,product_name,\
        sum(quantity) AS total_quantity,\
        sum(price * quantity) AS total_spent\
    from sales_data\
    where customer_id = 9371\
    group by product_id, product_name\
    order by total_spent desc"""
products_by_customer_df = spark.sql(query6)
products_by_customer_df.show()


# 8. Category with Highest Sales
query8 = "select category_name,\
                    round(SUM(price * quantity),2) AS total_sales\
                    from sales_data\
                    group by category_name\
                    order by total_sales desc"
top_category_df = spark.sql(query8)
top_category_df.show()


# 9. State-wise Sales Comparison (Texas vs Ohio)
query9 = "select state,\
                    round(SUM(price * quantity),2) AS total_sales\
                    from sales_data\
                    where state in ('TX', 'OH')\
                    group by state\
                    order by total_sales desc"
state_compare_df = spark.sql(query9)
state_compare_df.show()


# 10. Detailed Customer Purchase Report
query10 = "select customer_id,name,\
                    round(sum(total_amount),2) AS total_purchases,\
                    count(sales_txn_id) AS num_transactions,\
                    round(avg(total_amount),2) as avg_transaction_value\
                    from sales_data\
                    group by customer_id,name\
                    order by total_purchases desc"
detailed_customer_report_df = spark.sql(query10)
detailed_customer_report_df.show()



output_dir = "/content/output"
os.makedirs(output_dir, exist_ok=True)


# Example save
unique_customers_df.toPandas().to_csv(f"{output_dir}/01_unique_customers.csv", index=False)


# Save all to Excel
writer = pd.ExcelWriter(f"{output_dir}/sales_report.xlsx", engine='openpyxl')
unique_customers_df.toPandas().to_excel(writer, sheet_name='01_unique_customers', index=False)
sales_by_state_df.toPandas().to_excel(writer, sheet_name='02_sales_by_state', index=False)
top10_products_df.toPandas().to_excel(writer, sheet_name='03_top10_products', index=False)
avg_txn_df.toPandas().to_excel(writer, sheet_name='04_avg_txn', index=False)
top5_customers_df.toPandas().to_excel(writer, sheet_name='05_top5_customers', index=False)
products_by_customer_df.toPandas().to_excel(writer, sheet_name='06_products_cust_256', index=False)
top_category_df.toPandas().to_excel(writer, sheet_name='08_top_category', index=False)
state_compare_df.toPandas().to_excel(writer, sheet_name='09_state_comparison', index=False)
detailed_customer_report_df.toPandas().to_excel(writer, sheet_name='10_detailed_report', index=False)
writer.close()


print("Saved Excel report at /content/output/sales_report.xlsx")
files.download(f"{output_dir}/sales_report.xlsx")

+----------------+
|unique_customers|
+----------------+
|            1244|
+----------------+

+-----+-----------+
|state|total_sales|
+-----+-----------+
|   PR| 1408999.74|
|   CA|  503205.49|
|   NY|  219390.35|
|   TX|   184629.3|
|   IL|  116223.17|
|   FL|   93359.15|
|   PA|   88213.58|
|   MI|   83347.09|
|   OH|   82342.95|
|   NJ|   52303.09|
|   MD|   51982.49|
|   AZ|   48702.68|
|   NV|   47103.61|
|   NC|   45275.89|
|   CO|   40321.13|
|   GA|   38056.33|
|   HI|   35682.81|
|   MO|   34749.06|
|   TN|   33690.12|
|   VA|   30488.97|
+-----+-----------+
only showing top 20 rows

+----------+--------------------+-------------------+
|product_id|        product_name|total_quantity_sold|
+----------+--------------------+-------------------+
|       365|Perfect Fitness P...|               7474|
|       502|Nike Men's Dri-FI...|               6405|
|      1014|O'Brien Men's Neo...|               5899|
|       191|Nike Men's Free 5...|               3698|
|       627|Under Ar

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>