Importing necessary modules

In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [2]:
import pyspark
import pyspark.sql.functions as F
import os

A spark session has been created

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spark project')\
        .getOrCreate()

24/03/11 13:25:28 WARN Utils: Your hostname, DESKTOP-77VPNBL resolves to a loopback address: 127.0.1.1; using 172.25.173.185 instead (on interface eth0)
24/03/11 13:25:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/11 13:25:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Importing necessary datasets

In [4]:
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True)
])

In [5]:
sales_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("../dataset/sales.csv.txt")
display(sales_df)

DataFrame[product_id: int, customer_id: string, order_date: date, location: string, source_order: string]

In [6]:
sales_df.show()

                                                                                

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

In [7]:
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True)
])

In [8]:
menu_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("../dataset/menu.csv.txt")
display(sales_df)

DataFrame[product_id: int, customer_id: string, order_date: date, location: string, source_order: string]

In [9]:
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



Deriving year, month and quarter from the given dates

In [10]:
from pyspark.sql.functions import month, quarter, year

In [11]:
sales_df = sales_df.withColumn('order_year', year(sales_df['order_date']))

In [12]:
sales_df = sales_df.withColumn('order_quarter', quarter(sales_df['order_date']))

In [13]:
sales_df = sales_df.withColumn('order_month', month(sales_df['order_date']))

In [14]:
sales_df.show()

+----------+-----------+----------+--------+------------+----------+-------------+-----------+
|product_id|customer_id|order_date|location|source_order|order_year|order_quarter|order_month|
+----------+-----------+----------+--------+------------+----------+-------------+-----------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|            1|          1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|            1|          1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|            1|          1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|            1|          1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|            1|          1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|            1|          1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|            1|          2|
|         2|          B|2023-01-02|   India|      

Question 1: Total amount spent by each customer

In [15]:
total_amount_spent = (sales_df.join(menu_df,'product_id').groupby('customer_id').agg({'price':'sum'}).orderBy('customer_id'))

In [16]:
total_amount_spent.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



Question 2: Total amount spent by each food category

In [17]:
total_amount_food = (sales_df.join(menu_df, 'product_id').groupby('product_name').agg({'price':'sum'}).orderBy('product_name'))

In [18]:
total_amount_food.show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       PIZZA|    2100.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



Question 3: Total amount of sales in each month

In [24]:
total_amount_rev = (sales_df.join(menu_df, 'product_id').groupby('order_month').agg({'price':'sum'}).orderBy('order_month'))

In [25]:
total_amount_rev.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          5|    2960.0|
|          6|    2960.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



Question 4: Yearly sales

In [26]:
yearly_sales = (sales_df.join(menu_df, 'product_id').groupby('order_year').agg({'price':'sum'}).orderBy('order_year'))

In [27]:
yearly_sales.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



Question 5: Quarterly sales

In [28]:
quar_sales = (sales_df.join(menu_df, 'product_id').groupby('order_quarter').agg({'price':'sum'}).orderBy('order_quarter'))

In [29]:
quar_sales.show()

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            2|    5920.0|
|            3|     910.0|
|            4|     910.0|
+-------------+----------+



Question 6: How many times each product has been purchased

In [34]:
count_df = (sales_df.join(menu_df,'product_id').groupby('product_id','product_name').agg(F.count('product_id').alias('product_count')).orderBy('product_count', ascending=0).drop('product_id'))

In [35]:
count_df.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



Question 7: Top five ordered items

In [36]:
count_df.limit(5).show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
+------------+-------------+



Question 8: Top ordered item

In [37]:
count_df.limit(1).show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
+------------+-------------+



Question 9: Frequency of the customer visited to the restaurant

In [40]:
freq = (sales_df.filter(sales_df.source_order == 'Restaurant').groupby('customer_id').agg(F.countDistinct('order_date')).orderBy('customer_id'))

In [41]:
freq.show()

+-----------+-----------------+
|customer_id|count(order_date)|
+-----------+-----------------+
|          A|                6|
|          B|                6|
|          C|                3|
|          D|                1|
|          E|                5|
+-----------+-----------------+



Question 10: Total sales by each country

In [42]:
sales_country = (sales_df.join(menu_df, 'product_id').groupby('location').agg({'price':'sum'}))

In [43]:
sales_country.show()

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|     USA|    2460.0|
|      UK|    7020.0|
+--------+----------+



Question 11: Total sales by order_source

In [45]:
sales_order = (sales_df.join(menu_df, 'product_id').groupby('source_order').agg({'price':'sum'}))

In [46]:
sales_order.show()

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4920.0|
|      Swiggy|    6330.0|
|  Restaurant|    3090.0|
+------------+----------+

