In [2]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [3]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DateType

In [10]:
schema=StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True),
])

In [11]:
sales_df=spark.read.format("csv").option("InferSchema", "true").schema(schema).load("/content/sample_data/sales.csv.txt")
display(sales_df)

DataFrame[product_id: int, customer_id: string, order_date: date, location: string, source_order: string]

In [12]:
sales_df.show(10)

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
+----------+-----------+----------+--------+------------+
only showing top 10 rows



Deriving year, Month, Quarter

In [13]:
from pyspark.sql.functions import month, year, quarter

In [20]:
sales_df=sales_df.withColumn("order_year", year(sales_df.order_date))
sales_df=sales_df.withColumn("order_month", month(sales_df.order_date))
sales_df=sales_df.withColumn("order_quarter", quarter(sales_df.order_date))


In [21]:
sales_df.show(18)

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

In [24]:
schema=StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True),
])
menu_df=spark.read.format("csv").option("InferSchema", "true").schema(schema).load("/content/sample_data/menu.csv.txt")
display(menu_df)

DataFrame[product_id: int, product_name: string, price: string]

In [25]:
menu_df.show(10)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



# Total Amount Spent by each **Customer**

Using PySpark

In [29]:
total_amount_spent=(sales_df.join(menu_df,'product_id').groupby('customer_id').agg({'price':'sum'}).orderBy('customer_id'))
total_amount_spent.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



using SQL

In [32]:
sales_df.createOrReplaceTempView("Sales")
menu_df.createOrReplaceTempView("Menu")

In [37]:
joinDF = spark.sql("select s.customer_id, sum(m.price) as Price from Sales s, Menu m where s.product_id == m.product_id group by 1 order by 1") \
  .show(truncate=False)

+-----------+------+
|customer_id|Price |
+-----------+------+
|A          |4260.0|
|B          |4440.0|
|C          |2400.0|
|D          |1200.0|
|E          |2040.0|
+-----------+------+



# Total Amount Spent by Each Food Category

Using PySpark

In [38]:
total_amount_spent=(sales_df.join(menu_df,'product_id').groupby('product_name').agg({'price':'sum'}).orderBy('product_name'))
total_amount_spent.show()

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       PIZZA|    2100.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



using SQL

In [40]:
joinDF = spark.sql("select m.product_name, sum(m.price) as Price from Sales s, Menu m where s.product_id == m.product_id group by 1 order by 1") \
  .show(truncate=False)

+------------+------+
|product_name|Price |
+------------+------+
| Biryani    |480.0 |
| Chowmin    |3600.0|
| Dosa       |1320.0|
| PIZZA      |2100.0|
| Pasta      |1080.0|
| sandwich   |5760.0|
+------------+------+



# Total Amount of Sales in Each Month

Using PySpark

In [42]:
df1=(sales_df.join(menu_df,'product_id').groupby('order_month').agg({'price':'sum'}).orderBy('order_month'))
df1.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          5|    2960.0|
|          6|    2960.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



Using SQL

In [44]:
joinDF = spark.sql("select s.order_month, sum(m.price) as Price from Sales s, Menu m where s.product_id == m.product_id group by 1 order by 1") \
  .show(truncate=False)

+-----------+------+
|order_month|Price |
+-----------+------+
|1          |2960.0|
|2          |2730.0|
|3          |910.0 |
|5          |2960.0|
|6          |2960.0|
|7          |910.0 |
|11         |910.0 |
+-----------+------+



# Yearly Sales

using PySpark

In [45]:
df1=(sales_df.join(menu_df,'product_id').groupby('order_year').agg({'price':'sum'}).orderBy('order_year'))
df1.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



Using SQL

In [46]:
joinDF = spark.sql("select s.order_year, sum(m.price) as Price from Sales s, Menu m where s.product_id == m.product_id group by 1 order by 1") \
  .show(truncate=False)

+----------+------+
|order_year|Price |
+----------+------+
|2022      |4350.0|
|2023      |9990.0|
+----------+------+



# Quartely Sales

In [47]:
df1=(sales_df.join(menu_df,'product_id').groupby('order_quarter').agg({'price':'sum'}).orderBy('order_quarter'))
df1.show()

+-------------+----------+
|order_quarter|sum(price)|
+-------------+----------+
|            1|    6600.0|
|            2|    5920.0|
|            3|     910.0|
|            4|     910.0|
+-------------+----------+



In [48]:
joinDF = spark.sql("select s.order_quarter, sum(m.price) as Price from Sales s, Menu m where s.product_id == m.product_id group by 1 order by 1") \
  .show(truncate=False)

+-------------+------+
|order_quarter|Price |
+-------------+------+
|1            |6600.0|
|2            |5920.0|
|3            |910.0 |
|4            |910.0 |
+-------------+------+



# How many times each product was purchased

In [53]:
df1=(sales_df.join(menu_df,'product_id').groupby('product_id','product_name').agg({'product_id':'count'}).drop('product_id')) #.orderBy('order_quarter'))
df1.show()

+------------+-----------------+
|product_name|count(product_id)|
+------------+-----------------+
|     Biryani|                6|
|    sandwich|               48|
|        Dosa|               12|
|     Chowmin|               24|
|       PIZZA|               21|
|       Pasta|                6|
+------------+-----------------+



In [56]:
joinDF = spark.sql("select s.product_id, m.product_name, count(s.product_id) as product_count from Sales s, Menu m where s.product_id == m.product_id group by 1,2 order by 3 desc") \
  .show(truncate=False)

+----------+------------+-------------+
|product_id|product_name|product_count|
+----------+------------+-------------+
|3         | sandwich   |48           |
|2         | Chowmin    |24           |
|1         | PIZZA      |21           |
|4         | Dosa       |12           |
|5         | Biryani    |6            |
|6         | Pasta      |6            |
+----------+------------+-------------+



# Frequency of Customer Visited to Restaurant

In [60]:
from pyspark.sql.functions import countDistinct
df=(sales_df.filter(sales_df.source_order=='Restaurant').groupBy('customer_id').agg(countDistinct('order_date')))
df.show()

+-----------+-----------------+
|customer_id|count(order_date)|
+-----------+-----------------+
|          E|                5|
|          B|                6|
|          D|                1|
|          C|                3|
|          A|                6|
+-----------+-----------------+



In [63]:
joinDF = spark.sql("select customer_id, count (distinct order_date) as frequency from sales where source_order='Restaurant' group by 1") \
  .show(truncate=False)

+-----------+---------+
|customer_id|frequency|
+-----------+---------+
|E          |5        |
|B          |6        |
|D          |1        |
|C          |3        |
|A          |6        |
+-----------+---------+



# Total Sales by Each Country

In [64]:
df1=(sales_df.join(menu_df,'product_id').groupby('location').agg({'price':'sum'}))
df1.show()

+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|     USA|    2460.0|
|      UK|    7020.0|
+--------+----------+



In [65]:
joinDF = spark.sql("select s.location, sum (m.price) as total_sales from sales s join menu m on s.product_id=m.product_id group by 1") \
  .show(truncate=False)

+--------+-----------+
|location|total_sales|
+--------+-----------+
|India   |4860.0     |
|USA     |2460.0     |
|UK      |7020.0     |
+--------+-----------+



# Total Sales by Order Source

In [66]:
df1=(sales_df.join(menu_df,'product_id').groupby('source_order').agg({'price':'sum'}))
df1.show()

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4920.0|
|      Swiggy|    6330.0|
|  Restaurant|    3090.0|
+------------+----------+



In [67]:
joinDF = spark.sql("select s.source_order, sum (m.price) as total_sales from sales s join menu m on s.product_id=m.product_id group by 1") \
  .show(truncate=False)

+------------+-----------+
|source_order|total_sales|
+------------+-----------+
|zomato      |4920.0     |
|Swiggy      |6330.0     |
|Restaurant  |3090.0     |
+------------+-----------+

