In [5]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:03[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840628 sha256=c9050f5fe99109989742133f62a48e4e7eac86e69b5ad3e70fbb66b736781ed9
  Stored in directory: /Users/vivek/Library/Caches/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.3

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new

In [90]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, StructField, DateType
from pyspark.sql.functions import col,month,year,quarter,count,countDistinct

In [30]:
spark = SparkSession.builder.master("local").appName("project").getOrCreate()

In [46]:
schema = StructType([
    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order",StringType(),True), 
])

In [52]:
sales_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("/Users/vivek/Desktop/pyspark/pyspark project/sales.csv")

In [53]:
sales_df.show()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

## feature engineering

In [54]:
sales_df = sales_df.withColumn("order_year",year(sales_df.order_date))
sales_df = sales_df.withColumn("order_month",month(sales_df.order_date))
sales_df = sales_df.withColumn("order_quarter",quarter(sales_df.order_date))

In [55]:
sales_df.show()

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

## define schema


In [65]:
menu_schema = StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True),

])

menu_df = spark.read.format("csv").option("inferschema","true").schema(menu_schema).load("/Users/vivek/Desktop/pyspark/pyspark project/menu.csv")

In [66]:
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



## insights

In [71]:
#total amount spent by each customer

total_amount_spent = (sales_df.join(menu_df,"product_id").groupBy("customer_id").agg({'price':'sum'}).orderBy("customer_id"))
total_amount_spent.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



In [79]:
#total spent by each country
total_spent_by_each_food_country = (sales_df.join(menu_df,"product_id").groupBy("location").agg({"price":"sum"}).orderBy("location"))
total_spent_by_each_food_country.show()


+--------+----------+
|location|sum(price)|
+--------+----------+
|   India|    4860.0|
|      UK|    7020.0|
|     USA|    2460.0|
+--------+----------+



In [80]:
#total amount of sales in each month

sales = (sales_df.join(menu_df,"product_id").groupBy("order_month").agg({'price':'sum'}).orderBy("order_month"))
sales.show()

+-----------+----------+
|order_month|sum(price)|
+-----------+----------+
|          1|    2960.0|
|          2|    2730.0|
|          3|     910.0|
|          5|    2960.0|
|          6|    2960.0|
|          7|     910.0|
|         11|     910.0|
+-----------+----------+



In [81]:
#total amount of sales in each month

sales = (sales_df.join(menu_df,"product_id").groupBy("order_year").agg({'price':'sum'}).orderBy("order_year"))
sales.show()

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2022|    4350.0|
|      2023|    9990.0|
+----------+----------+



In [87]:
#how many times each product was purchased

product_insight = (sales_df.join(menu_df,"product_id").groupBy("product_id","product_name").agg(count('product_id').alias('product_count')).orderBy("product_count",ascending=0))
product_insight.show()

+----------+------------+-------------+
|product_id|product_name|product_count|
+----------+------------+-------------+
|         3|    sandwich|           48|
|         2|     Chowmin|           24|
|         1|       PIZZA|           21|
|         4|        Dosa|           12|
|         5|     Biryani|            6|
|         6|       Pasta|            6|
+----------+------------+-------------+



In [93]:
#frequency of cust visited to restaurant
freq_view = (sales_df.filter(sales_df.source_order=='Restaurant').groupBy("customer_id").agg(countDistinct("order_date")))
freq_view.show()

+-----------+--------------------------+
|customer_id|count(DISTINCT order_date)|
+-----------+--------------------------+
|          E|                         5|
|          B|                         6|
|          D|                         1|
|          C|                         3|
|          A|                         6|
+-----------+--------------------------+



In [95]:
#spent by each customer


spent_view = (sales_df.join(menu_df,"product_id").groupBy("customer_id").agg({'price':'sum'}))
spent_view.show()

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          E|    2040.0|
|          B|    4440.0|
|          D|    1200.0|
|          C|    2400.0|
|          A|    4260.0|
+-----------+----------+



In [None]:
#spent by order_source




order_view = (sales_df.join(menu_df,"product_id").groupBy("source_order").agg({'price':'sum'}))
order_view.show()

+------------+----------+
|source_order|sum(price)|
+------------+----------+
|      zomato|    4920.0|
|      Swiggy|    6330.0|
|  Restaurant|    3090.0|
+------------+----------+

