In [16]:
from pyspark.sql import SparkSession, functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType

# Local Spark session that uses every available core on this machine.
spark = SparkSession.builder.master("local[*]").appName("Sales_App").getOrCreate()

# One row per ordered item: which product, which customer, when, where, and
# through which ordering channel.
sales_schema = StructType(
    [
        StructField("Product_ID", IntegerType(), True),
        StructField("Customer_ID", StringType(), True),
        StructField("Order_date", DateType(), True),
        StructField("Location", StringType(), True),
        StructField("Source_order", StringType(), True),
    ]
)

# Product catalogue. Product_price is kept as StringType: the displayed menu
# rows show the values with a leading space (e.g. " 100").
# NOTE(review): confirm whether a numeric type combined with
# ignoreLeadingWhiteSpace=True would be preferable here.
menu_schema = StructType(
    [
        StructField("Product_ID", IntegerType(), True),
        StructField("Product_name", StringType(), True),
        StructField("Product_price", StringType(), True),
    ]
)

# The schema is supplied explicitly, so inferSchema is deliberately not passed:
# a user-specified schema takes precedence and the option would have no effect.
sales_df = spark.read.csv("./sales.csv.txt", schema=sales_schema)
menu_df = spark.read.csv("./menu.csv.txt", schema=menu_schema)

# Preview both inputs without truncating cell contents.
sales_df.show(10, truncate=False)
menu_df.show(10, truncate=False)

+----------+-----------+----------+--------+------------+
|Product_ID|Customer_ID|Order_date|Location|Source_order|
+----------+-----------+----------+--------+------------+
|1         |A          |2023-01-01|India   |Swiggy      |
|2         |A          |2022-01-01|India   |Swiggy      |
|2         |A          |2023-01-07|India   |Swiggy      |
|3         |A          |2023-01-10|India   |Restaurant  |
|3         |A          |2022-01-11|India   |Swiggy      |
|3         |A          |2023-01-11|India   |Restaurant  |
|2         |B          |2022-02-01|India   |Swiggy      |
|2         |B          |2023-01-02|India   |Swiggy      |
|1         |B          |2023-01-04|India   |Restaurant  |
|1         |B          |2023-02-11|India   |Swiggy      |
+----------+-----------+----------+--------+------------+
only showing top 10 rows

+----------+------------+-------------+
|Product_ID|Product_name|Product_price|
+----------+------------+-------------+
|1         | PIZZA      | 100         |
|2

In [17]:
# Question 1: Total amount spent by each customer

# Attach the menu entry to every sale, then add up the prices per customer and
# list the biggest spenders first.
Total_Amount_Spent_Per_Customer = (
    sales_df.join(
        menu_df,
        on=sales_df["Product_ID"] == menu_df["Product_ID"],
        how="inner",
    )
    .groupBy("Customer_ID")
    .agg(func.sum(func.col("Product_price")).alias("Total_Amount_Spent"))
    .sort(func.col("Total_Amount_Spent").desc())
)

Total_Amount_Spent_Per_Customer.show()

+-----------+------------------+
|Customer_ID|Total_Amount_Spent|
+-----------+------------------+
|          B|            4440.0|
|          A|            4260.0|
|          C|            2400.0|
|          E|            2040.0|
|          D|            1200.0|
+-----------+------------------+



In [18]:
# Question 2: Total amount spent on each food category

# Same sale-to-menu join as before, but the totals are grouped by the product
# name and sorted from highest to lowest revenue.
Total_Amount_Spent_Per_Food_Category = (
    sales_df.join(
        menu_df,
        on=sales_df["Product_ID"] == menu_df["Product_ID"],
        how="inner",
    )
    .groupBy("Product_name")
    .agg(func.sum(func.col("Product_price")).alias("Total_Amount_Spent"))
    .sort(func.col("Total_Amount_Spent").desc())
)

Total_Amount_Spent_Per_Food_Category.show()

+------------+------------------+
|Product_name|Total_Amount_Spent|
+------------+------------------+
|    sandwich|            5760.0|
|     Chowmin|            3600.0|
|       PIZZA|            2100.0|
|        Dosa|            1320.0|
|       Pasta|            1080.0|
|     Biryani|             480.0|
+------------+------------------+



In [19]:
# Question 3: Total amount of sales in each month

# The grouping key is the month number alone, so orders from different years
# that fall in the same calendar month are added together.
Total_Amount_Of_Sales_Each_Month = (
    sales_df.withColumn("Month", func.month("Order_date"))
    .join(
        menu_df,
        on=sales_df["Product_ID"] == menu_df["Product_ID"],
        how="inner",
    )
    .groupBy(func.col("Month").alias("Order_Month"))
    .agg(func.sum(func.col("Product_price")).alias("Total_Amount_Spent"))
    .sort(func.col("Order_Month").asc())
)

Total_Amount_Of_Sales_Each_Month.show()

+-----------+------------------+
|Order_Month|Total_Amount_Spent|
+-----------+------------------+
|          1|            2960.0|
|          2|            2730.0|
|          3|             910.0|
|          5|            2960.0|
|          6|            2960.0|
|          7|             910.0|
|         11|             910.0|
+-----------+------------------+



In [20]:
# Question 4: Total amount of sales per year

# Derive the calendar year of each order, price every sale via the menu, and
# total the prices per year in chronological order.
Total_Amount_Of_Sales_Yearly = (
    sales_df.withColumn("Year", func.year("Order_date"))
    .join(
        menu_df,
        on=sales_df["Product_ID"] == menu_df["Product_ID"],
        how="inner",
    )
    .groupBy(func.col("Year").alias("Order_Year"))
    .agg(func.sum("Product_price").alias("Total_Amount_Spent"))
    .sort(func.col("Order_Year").asc())
)

Total_Amount_Of_Sales_Yearly.show()

+----------+------------------+
|Order_Year|Total_Amount_Spent|
+----------+------------------+
|      2022|            4350.0|
|      2023|            9990.0|
+----------+------------------+



In [21]:
# Question 5: Total amount of sales per quarter

# The quarter (1-4) is taken from the order date; as the year is not part of
# the grouping key, the same quarter of different years is summed together.
# The "Quater" spelling in the variable and column names is kept as-is so that
# anything referring to them keeps working.
Total_Amount_Of_Sales_Quaterly = (
    sales_df.withColumn("Quater", func.quarter("Order_date"))
    .join(
        menu_df,
        on=sales_df["Product_ID"] == menu_df["Product_ID"],
        how="inner",
    )
    .groupBy(func.col("Quater").alias("Order_Quater"))
    .agg(func.sum("Product_price").alias("Total_Amount_Spent"))
    .sort(func.col("Order_Quater").asc())
)

Total_Amount_Of_Sales_Quaterly.show()

+------------+------------------+
|Order_Quater|Total_Amount_Spent|
+------------+------------------+
|           1|            6600.0|
|           2|            5920.0|
|           3|             910.0|
|           4|             910.0|
+------------+------------------+



In [22]:
# Question 6: How many times each product has been purchased

# Joining on the column name keeps a single Product_ID column in the result,
# so no explicit drop of the menu-side key is needed before grouping.
No_Of_Times_Each_Product_Purchased = (
    sales_df.join(menu_df, on="Product_ID", how="inner")
    .groupBy("Product_ID", "Product_name")
    .agg(func.count("Product_ID").alias("Total_Items_Purchased"))
    .sort(func.col("Total_Items_Purchased").desc())
)

No_Of_Times_Each_Product_Purchased.show()

+----------+------------+---------------------+
|Product_ID|Product_name|Total_Items_Purchased|
+----------+------------+---------------------+
|         3|    sandwich|                   48|
|         2|     Chowmin|                   24|
|         1|       PIZZA|                   21|
|         4|        Dosa|                   12|
|         5|     Biryani|                    6|
|         6|       Pasta|                    6|
+----------+------------+---------------------+



In [23]:
# Question 7: Find the top 5 ordered items

# Count the purchases per product and keep the five most-purchased ones.
# Product_ID is used as a secondary sort key: several products can share the
# same purchase count, and without a tie-breaker the rows that survive
# limit(5) would not be guaranteed to be the same from run to run.
Top_Five_Order = (
    sales_df.join(menu_df, sales_df["Product_ID"] == menu_df["Product_ID"])
    # Remove the menu-side key so "Product_ID" is unambiguous below.
    .drop(menu_df["Product_ID"])
    .groupBy(func.col("Product_ID"), func.col("Product_name"))
    .agg(func.count(func.col("Product_ID")).alias("Total_Items_Purchased"))
    .orderBy(
        func.desc(func.col("Total_Items_Purchased")),
        func.asc(func.col("Product_ID")),
    )
    .limit(5)
)

Top_Five_Order.show()

+----------+------------+---------------------+
|Product_ID|Product_name|Total_Items_Purchased|
+----------+------------+---------------------+
|         3|    sandwich|                   48|
|         2|     Chowmin|                   24|
|         1|       PIZZA|                   21|
|         4|        Dosa|                   12|
|         5|     Biryani|                    6|
+----------+------------+---------------------+



In [24]:
# Question 8: Find how often each customer visited

# Every row of the sales data counts as one visit; customers with the most
# rows are listed first.
Frequency_Of_Customer_Visited = (
    sales_df.groupBy("Customer_ID")
    .agg(func.count("Customer_ID").alias("Customer_Frequency"))
    .sort(func.col("Customer_Frequency").desc())
)

Frequency_Of_Customer_Visited.show()

+-----------+------------------+
|Customer_ID|Customer_Frequency|
+-----------+------------------+
|          B|                36|
|          A|                33|
|          E|                18|
|          C|                18|
|          D|                12|
+-----------+------------------+



In [25]:
# Question 9: Find how often each customer visited the restaurant

# Only rows whose Source_order is exactly "Restaurant" are counted, i.e. orders
# placed through other sources are left out.
Frequency_Of_Customer_Visited_To_Restaurant = (
    sales_df.where(sales_df["Source_order"] == "Restaurant")
    .groupBy("Customer_ID")
    .agg(func.count("Customer_ID").alias("Customer_Frequency"))
    .sort(func.col("Customer_Frequency").desc())
)

Frequency_Of_Customer_Visited_To_Restaurant.show()

+-----------+------------------+
|Customer_ID|Customer_Frequency|
+-----------+------------------+
|          A|                 9|
|          E|                 6|
|          B|                 6|
|          D|                 3|
|          C|                 3|
+-----------+------------------+



In [26]:
# Question 10: Find the total sales for each country

# The Location column of the sales data is reported as "Country"; prices come
# from the joined menu and are summed per location, highest total first.
Total_Sales_By_Each_Country = (
    sales_df.join(
        menu_df,
        on=sales_df["Product_ID"] == menu_df["Product_ID"],
        how="inner",
    )
    .groupBy(func.col("Location").alias("Country"))
    .agg(func.sum("Product_price").alias("Total_Amount_Of_Sales"))
    .sort(func.col("Total_Amount_Of_Sales").desc())
)

Total_Sales_By_Each_Country.show()

+-------+---------------------+
|Country|Total_Amount_Of_Sales|
+-------+---------------------+
|     UK|               7020.0|
|  India|               4860.0|
|    USA|               2460.0|
+-------+---------------------+



In [27]:
# Question 11: Find the total sales for each order source

# Source_order is reported as "Source"; the joined menu prices are summed per
# source and sorted from the largest total down.
Total_Sales_By_Order_Source = (
    sales_df.join(
        menu_df,
        on=sales_df["Product_ID"] == menu_df["Product_ID"],
        how="inner",
    )
    .groupBy(func.col("Source_order").alias("Source"))
    .agg(func.sum("Product_price").alias("Total_Ordered_Amount"))
    .sort(func.col("Total_Ordered_Amount").desc())
)

Total_Sales_By_Order_Source.show()

+----------+--------------------+
|    Source|Total_Ordered_Amount|
+----------+--------------------+
|    Swiggy|              6330.0|
|    zomato|              4920.0|
|Restaurant|              3090.0|
+----------+--------------------+

