In [5]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Sales Analysis")
    .master("local[*]")
    .getOrCreate()
)
spark

In [10]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
schema=StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True)
])

sales_df = spark.read.format("csv").schema(schema).load("data/input/sales.csv")
sales_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)



In [11]:
sales_df.show()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

In [16]:
#Derive year, month, quarter

from pyspark.sql.functions import *
sales_df = sales_df.withColumn("order_year", year(sales_df.order_date))
sales_df = sales_df.withColumn("order_month", month(sales_df.order_date))
sales_df = sales_df.withColumn("order_quarter", quarter(sales_df.order_date))
sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

In [19]:
#create menu dataframe
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
schema_menu=StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True),
])

menu_df = spark.read.format("csv").schema(schema_menu).load("data/input/menu.csv")
menu_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)



In [20]:
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



In [24]:
#Total Amount spent by each customer
total_amount_cust = sales_df.join(menu_df, "product_id").groupBy("customer_id").agg(sum("price").alias("total_price")).orderBy("customer_id")
total_amount_cust.show()

+-----------+-----------+
|customer_id|total_price|
+-----------+-----------+
|          A|     4260.0|
|          B|     4440.0|
|          C|     2400.0|
|          D|     1200.0|
|          E|     2040.0|
+-----------+-----------+



In [25]:
#Total Amount spent by each food category
total_amount_category = sales_df.join(menu_df, "product_id").groupBy("product_name").agg(sum("price").alias("total_price")).orderBy("total_price")
total_amount_category.show()

+------------+-----------+
|product_name|total_price|
+------------+-----------+
|     Biryani|      480.0|
|       Pasta|     1080.0|
|        Dosa|     1320.0|
|       PIZZA|     2100.0|
|     Chowmin|     3600.0|
|    sandwich|     5760.0|
+------------+-----------+



In [27]:
#Total Amount of sales in each month
total_amount_month = sales_df.join(menu_df, "product_id").groupBy("order_month").agg(sum("price").alias("total_price")).orderBy("order_month")
total_amount_month.show()

+-----------+-----------+
|order_month|total_price|
+-----------+-----------+
|          1|     2960.0|
|          2|     2730.0|
|          3|      910.0|
|          5|     2960.0|
|          6|     2960.0|
|          7|      910.0|
|         11|      910.0|
+-----------+-----------+



In [28]:
#Yearly Sales
total_amount_year = sales_df.join(menu_df, "product_id").groupBy("order_year").agg(sum("price").alias("total_price")).orderBy("order_year")
total_amount_year.show()

+----------+-----------+
|order_year|total_price|
+----------+-----------+
|      2022|     4350.0|
|      2023|     9990.0|
+----------+-----------+



In [32]:
#Quarterly Sales
total_amount_quarter = sales_df.join(menu_df, "product_id").groupBy("order_quarter","order_year").agg(sum("price").alias("total_price")).orderBy("order_year","order_quarter")
total_amount_quarter.show()

+-------------+----------+-----------+
|order_quarter|order_year|total_price|
+-------------+----------+-----------+
|            1|      2022|     2150.0|
|            2|      2022|     1440.0|
|            3|      2022|      380.0|
|            4|      2022|      380.0|
|            1|      2023|     4450.0|
|            2|      2023|     4480.0|
|            3|      2023|      530.0|
|            4|      2023|      530.0|
+-------------+----------+-----------+



In [33]:
#How many times each product has been purchased
product_purchase_cnt = sales_df.join(menu_df, "product_id").groupBy("product_name").agg(count("product_id").alias("Count"))
product_purchase_cnt.show()

+------------+-----+
|product_name|Count|
+------------+-----+
|       Pasta|    6|
|       PIZZA|   21|
|    sandwich|   48|
|     Biryani|    6|
|     Chowmin|   24|
|        Dosa|   12|
+------------+-----+



In [40]:
#Top 5 ordered items
top_5_orders = sales_df.join(menu_df, "product_id").groupBy("product_name").agg(count("product_id").alias("Count")).orderBy("Count",ascending=0).limit(5)
top_5_orders.show()

+------------+-----+
|product_name|Count|
+------------+-----+
|    sandwich|   48|
|     Chowmin|   24|
|       PIZZA|   21|
|        Dosa|   12|
|       Pasta|    6|
+------------+-----+



In [41]:
#Top ordered Item
top_order = sales_df.join(menu_df, "product_id").groupBy("product_name").agg(count("product_id").alias("Count")).orderBy("Count",ascending=0).limit(1)
top_order.show()

+------------+-----+
|product_name|Count|
+------------+-----+
|    sandwich|   48|
+------------+-----+



In [53]:
#Frequency of customer visited restaurant
customer_frequency = sales_df.filter(sales_df.source_order == "Restaurant").groupBy("customer_id").agg(countDistinct("order_date").alias("Count")).orderBy("Count")
customer_frequency.show()

+-----------+-----+
|customer_id|Count|
+-----------+-----+
|          D|    1|
|          C|    3|
|          E|    5|
|          B|    6|
|          A|    6|
+-----------+-----+



In [43]:
#Total sales by each country
total_sales_country = sales_df.join(menu_df, "product_id").groupBy("location").agg(sum("price").alias("total_price")).orderBy("total_price")
total_sales_country.show()

+--------+-----------+
|location|total_price|
+--------+-----------+
|     USA|     2460.0|
|   India|     4860.0|
|      UK|     7020.0|
+--------+-----------+



In [44]:
#Total sales by order_source
total_sales_source = sales_df.join(menu_df, "product_id").groupBy("source_order").agg(sum("price").alias("total_price")).orderBy("total_price")
total_sales_source.show()

+------------+-----------+
|source_order|total_price|
+------------+-----------+
|  Restaurant|     3090.0|
|      zomato|     4920.0|
|      Swiggy|     6330.0|
+------------+-----------+

