In [None]:
%pip install pandas
from pyspark.sql import SparkSession



In [None]:
# Build (or reuse) the notebook's Spark session on a local master.
# Network timeout, executor heartbeat interval and the executor/driver
# memory sizes are all set explicitly before the session is created.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("local[*]")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.executor.memory", "6g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [None]:
"""# how to stop session

spark.stop()"""

'# how to stop session\n\nspark.stop()'

In [None]:
# CSV files that make up the dataset. Each one is loaded into a DataFrame
# stored under the part of its file name that precedes the first dot.
file_paths = [
    "Category.csv",
    "Customers.csv",
    "OrderDetails.csv",
    "Orders.csv",
    "Payments.csv",
    "Products.csv",
    "Shippers.csv",
    "Suppliers.csv",
]

# Read every file with a header row and let Spark infer the column types.
dataframes = dict(
    (csv_path.partition(".")[0], spark.read.csv(csv_path, header=True, inferSchema=True))
    for csv_path in file_paths
)

# Quick look at one of the loaded tables.
dataframes["Category"].show()

+----------+--------------------+------+
|CategoryID|        CategoryName|Active|
+----------+--------------------+------+
|      5001|Cleaning & Household|   Yes|
|      5002|Kitchen, Garden &...|   Yes|
|      5003|Foodgrains, Oil &...|   Yes|
|      5004|Gourmet & World Food|   Yes|
|      5005|           Baby Care|    No|
|      5006|Snacks & Branded ...|   Yes|
|      5007|Bakery, Cakes & D...|   Yes|
|      5008|    Beauty & Hygiene|   Yes|
|      5009|           Beverages|   Yes|
|      5010|   Eggs, Meat & Fish|    No|
|      5011| Fruits & Vegetables|   Yes|
+----------+--------------------+------+



In [None]:
# Print the inferred column types of the Orders and Customers tables,
# in that order.
for table_name in ("Orders", "Customers"):
    dataframes[table_name].printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- PaymentID: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipperID: integer (nullable = true)
 |-- ShipDate: string (nullable = true)
 |-- DeliveryDate: string (nullable = true)
 |-- Total_order_amount: double (nullable = true)

root
 |-- CustomerID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- PostalCode: integer (nullable = true)
 |-- Phone: long (nullable = true)
 |-- Email: string (nullable = true)
 |-- DateEntered: date (nullable = true)



In [None]:
# Question: which 5 cities have the highest revenue, and which products sell
# best in each of them?
# NOTE(review): only the city-level revenue is computed below; the
# "top products sold in each" part of the question is not covered by this cell.
from pyspark.sql.functions import sum,round

orders_df = dataframes["Orders"]
customers_df = dataframes["Customers"]

# Attach each order to its customer so order amounts can be grouped by city.
orders_with_city = orders_df.join(
    customers_df,
    orders_df["CustomerID"] == customers_df["CustomerID"],
    "inner",
)

# Total order amount per city, rounded to 2 decimals, largest first.
city_revenue = (
    orders_with_city
    .groupby("City")
    .agg(round(sum("Total_order_amount"), 2).alias("total_sales"))
    .orderBy("total_sales", ascending=False)
)
city_revenue.show(5)


+---------+-----------+
|     City|total_sales|
+---------+-----------+
|Amsterdam| 3825993.32|
|   Zurich| 3753449.87|
|  Belfast| 3732753.48|
| Brussels| 3485226.21|
|   Vienna| 2511282.69|
+---------+-----------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import sum, format_number

# Question: which combination of category and payment type generates the most
# revenue (rounded for display)?
#
# NOTE(review): Total_order_amount is a column of Orders and is summed here
# after the join to OrderDetails, so an order with several detail rows
# contributes its amount once per row. Confirm that this is the intended
# definition of revenue per category.
orders_df = dataframes["Orders"]
payments_df = dataframes["Payments"]
order_details_df = dataframes["OrderDetails"]
products_df = dataframes["Products"]
category_df = dataframes["Category"]

df = (
    orders_df
    .join(payments_df, orders_df["PaymentID"] == payments_df["PaymentID"], "inner")
    .join(order_details_df, orders_df["OrderID"] == order_details_df["OrderID"], "inner")
    .join(products_df, order_details_df["ProductID"] == products_df["ProductID"], "inner")
    .join(category_df, products_df["Category_ID"] == category_df["CategoryID"], "inner")
    .groupby("PaymentType", "CategoryName")
    .agg(sum("Total_order_amount").alias("Total_sales"))
    # Sort while Total_sales is still numeric; format_number below produces strings.
    .orderBy("Total_sales", ascending=False)
)

# show() returns None, so its result must not be assigned back to `df`.
# The formatted column is applied only to the displayed frame; `df` keeps the
# numeric Total_sales column and stays usable after this cell.
df.withColumn("Total_sales", format_number("Total_sales", 2)).show()

+-----------+--------------------+-------------+
|PaymentType|        CategoryName|  Total_sales|
+-----------+--------------------+-------------+
|Credit Card|    Beauty & Hygiene|97,304,550.00|
|Credit Card|Gourmet & World Food|55,572,799.65|
|Credit Card|Kitchen, Garden &...|48,010,825.65|
|     PayPal|    Beauty & Hygiene|37,524,812.43|
|Credit Card|Snacks & Branded ...|32,024,975.60|
|Credit Card|Cleaning & Household|31,825,670.35|
|Credit Card|Foodgrains, Oil &...|30,254,102.70|
|     Wallet|    Beauty & Hygiene|25,311,386.00|
|Net banking|    Beauty & Hygiene|22,855,738.00|
|     PayPal|Gourmet & World Food|22,174,660.24|
|     PayPal|Kitchen, Garden &...|18,431,310.30|
|     Wallet|Gourmet & World Food|16,275,342.00|
|Net banking|Gourmet & World Food|15,071,544.00|
|     Wallet|Kitchen, Garden &...|13,564,482.00|
|     PayPal|Cleaning & Household|12,788,036.14|
|     PayPal|Snacks & Branded ...|12,334,985.28|
|Net banking|Kitchen, Garden &...|11,389,617.00|
|     PayPal|Foodgra

In [None]:
# Compare customer order frequency vs. order value (repeat customers vs. one-timers).

from pyspark.sql.functions import count, sum, avg,lit


def _summarize_segment(segment, label):
    """Collapse a per-customer frame into one summary row tagged with `label`.

    `segment` must carry the CustomerID, OrderCount, TotalSpent and
    AvgOrderValue columns; the result has NumCustomers, AvgOrderCount,
    AvgTotalSpent, AvgOrderValue and CustomerType, in that order.
    """
    return segment.agg(
        count("CustomerID").alias("NumCustomers"),
        avg("OrderCount").alias("AvgOrderCount"),
        avg("TotalSpent").alias("AvgTotalSpent"),
        avg("AvgOrderValue").alias("AvgOrderValue"),
    ).withColumn("CustomerType", lit(label))


# One row per customer: number of orders, total spend and mean order amount.
per_customer_metrics = [
    count("OrderID").alias("OrderCount"),
    sum("Total_order_amount").alias("TotalSpent"),
    avg("Total_order_amount").alias("AvgOrderValue"),
]
customer_orders = dataframes["Orders"].groupBy("CustomerID").agg(*per_customer_metrics)

# Split customers by how many orders they placed.
one_time_customers = customer_orders.filter("OrderCount = 1")
repeat_customers = customer_orders.filter("OrderCount > 1")

summary_one_time = _summarize_segment(one_time_customers, "One-Time Customer")
summary_repeat = _summarize_segment(repeat_customers, "Repeat Customer")

# Both summaries come from the same helper, so their columns line up
# position by position, which is what union() matches on.
final_summary = summary_one_time.union(summary_repeat)
report_columns = ["CustomerType", "NumCustomers", "AvgOrderCount", "AvgTotalSpent", "AvgOrderValue"]
final_summary.select(*report_columns).show(truncate=False)


+-----------------+------------+-----------------+------------------+------------------+
|CustomerType     |NumCustomers|AvgOrderCount    |AvgTotalSpent     |AvgOrderValue     |
+-----------------+------------+-----------------+------------------+------------------+
|One-Time Customer|1           |1.0              |36306.0           |36306.0           |
|Repeat Customer  |524         |9.538167938931299|175348.93664122143|18287.947302524637|
+-----------------+------------+-----------------+------------------+------------------+



In [None]:
# Question: which product categories generate the highest profit margins for
# customers whose Country is "United States"?
from pyspark.sql.functions import sum, round, col

orders_df = dataframes["Orders"]
order_details_df = dataframes["OrderDetails"]
products_df = dataframes["Products"]
category_df = dataframes["Category"]
customers_df = dataframes["Customers"]

# Orders -> order lines -> products -> categories, plus the ordering customer.
joined = (
    orders_df
    .join(order_details_df, orders_df["OrderID"] == order_details_df["OrderID"], "inner")
    .join(products_df, order_details_df["ProductID"] == products_df["ProductID"], "inner")
    .join(category_df, products_df["Category_ID"] == category_df["CategoryID"], "inner")
    .join(customers_df, orders_df["CustomerID"] == customers_df["CustomerID"], "inner")
)

# Cast both price columns to float before subtracting them.
# NOTE(review): Profit_margin is Sale_Price - Market_Price per joined row;
# Quantity is not used anywhere in this cell. Confirm this is the intended
# definition of profit margin.
priced = (
    joined
    .withColumn("Sale_Price", col("Sale_Price").cast("float"))
    .withColumn("Market_Price", col("Market_Price").cast("float"))
    .withColumn("Profit_margin", col("Sale_Price") - col("Market_Price"))
)

# Despite its name, rdd2 is a DataFrame (the filtered result of the joins above).
rdd2 = priced.filter(priced["Country"] == "United States")

# Sum of Profit_margin per category, rounded to 2 decimals, largest first.
profit_by_category = rdd2.groupby("CategoryName").agg(
    round(sum("Profit_margin"), 2).alias("total_profit")
)
highest_profit_categories = profit_by_category.orderBy("total_profit", ascending=False)
highest_profit_categories.show(truncate=False)

+------------------------+------------+
|CategoryName            |total_profit|
+------------------------+------------+
|Fruits & Vegetables     |-298.0      |
|Eggs, Meat & Fish       |-1317.0     |
|Bakery, Cakes & Dairy   |-1921.0     |
|Snacks & Branded Foods  |-2971.0     |
|Beverages               |-3579.0     |
|Baby Care               |-5815.0     |
|Foodgrains, Oil & Masala|-6295.0     |
|Cleaning & Household    |-10891.0    |
|Gourmet & World Food    |-17424.0    |
|Kitchen, Garden & Pets  |-71311.0    |
|Beauty & Hygiene        |-73982.0    |
+------------------------+------------+



In [None]:
# What is the customer retention rate based on repeat orders?
# customer retention rate = (Number of Customers with more than one order) / (Total Number of Unique Customers)
# "Unique customers" here are the CustomerIDs that appear in Orders.

# Imported in this cell so it does not depend on an import made by an earlier cell.
from pyspark.sql.functions import count

rdd3 = dataframes["Orders"].groupBy("CustomerID").agg(count("OrderID").alias("OrderCount"))
repeat_customers = rdd3.filter("OrderCount > 1")

# groupBy already yields one row per CustomerID, so a plain count gives the
# number of unique customers without an extra distinct() pass.
total_customers = rdd3.count()
repeat_customers_count = repeat_customers.count()

# Guard against an empty Orders table, which would otherwise raise ZeroDivisionError.
retention_rate = (repeat_customers_count / total_customers) * 100 if total_customers else 0.0
print("Customer Retention Rate:", retention_rate, "%")


Customer Retention Rate: 99.80952380952381 %


In [None]:
# Print the column types Spark inferred for the Products table.
products_df = dataframes["Products"]
products_df.printSchema()

root
 |-- ProductID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Category_ID: string (nullable = true)
 |-- Sub_Category: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Sale_Price: integer (nullable = true)
 |-- Market_Price: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Rating: integer (nullable = true)



In [None]:
# Question: which sub-categories do well on both quantity sold and average rating?
# NOTE: `sum` and `avg` are the pyspark.sql.functions versions imported by
# earlier cells; this cell does not import them itself.
order_details_df = dataframes["OrderDetails"]
products_df = dataframes["Products"]

# Pair every order line with its product to reach Sub_Category and Rating.
rdd4 = order_details_df.join(
    products_df,
    order_details_df["ProductID"] == products_df["ProductID"],
    "inner",
)

# Total quantity and mean rating per sub-category. The result is ordered by
# quantity only; the rating is reported but does not affect the ordering.
performance = (
    rdd4
    .groupBy("Sub_Category")
    .agg(
        sum("Quantity").alias("TotalQuantitySold"),
        avg("Rating").alias("AverageRating"),
    )
    .orderBy("TotalQuantitySold", ascending=False)
)
performance.show()


+--------------------+-----------------+------------------+
|        Sub_Category|TotalQuantitySold|     AverageRating|
+--------------------+-----------------+------------------+
|           Skin Care|            25638| 3.042914979757085|
|   Health & Medicine|            12676| 3.001655629139073|
|Storage & Accesso...|            10031|3.1317427385892116|
|           Hair Care|             9803|2.9058823529411764|
|    Masalas & Spices|             9569| 2.903050108932462|
|   Fragrances & Deos|             9233| 3.089861751152074|
|  Crockery & Cutlery|             9137| 3.026284348864994|
|    Bath & Hand Wash|             9097|2.9588100686498855|
|Snacks, Dry Fruit...|             8598|2.9841075794621026|
| Ready To Cook & Eat|             8342| 2.839240506329114|
|  Drinks & Beverages|             8088|3.0140664961636827|
|Sauces, Spreads &...|             8053|2.9563492063492065|
|Cooking & Baking ...|             7454|3.1127717391304346|
|Chocolates & Bisc...|             6643|