In [0]:
# import libraries
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import month,year,quarter,sum,asc,desc,count,countDistinct


In [0]:
# Schema for the sales file; every column is declared nullable.
schema = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("customer_id", StringType(), True)
    .add("order_date", DateType(), True)
    .add("location", StringType(), True)
    .add("source_order", StringType(), True)
)


In [0]:
# Read the sales CSV with the explicit schema defined for it.
# "inferSchema" is deliberately not set: Spark only infers column types when
# no schema is supplied, so pairing it with .schema(...) is a misleading no-op.
sales_df = (
    spark.read.format("csv")
    .schema(schema)
    .load("/FileStore/tables/sales_csv.txt")
)

In [0]:
# Derive year, month and quarter columns from order_date, then show the result.
sales_df = (
    sales_df
    .withColumn("order_year", year("order_date"))
    .withColumn("order_month", month("order_date"))
    .withColumn("order_quarter", quarter("order_date"))
)
display(sales_df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
# Schema for the menu file; every column is declared nullable.
# This rebinds `schema`, which held the sales schema up to this point.
# NOTE(review): price is declared as a string although later cells sum it;
# a numeric type is probably intended -- confirm against the file contents.
schema = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("product_name", StringType(), True)
    .add("price", StringType(), True)
)

In [0]:
# Read the menu CSV with the explicit schema defined for it.
# "inferSchema" is deliberately not set: Spark only infers column types when
# no schema is supplied, so pairing it with .schema(...) is a misleading no-op.
menu_df = (
    spark.read.format("csv")
    .schema(schema)
    .load("/FileStore/tables/menu_csv.txt")
)

In [0]:
# Render the menu DataFrame as a table.
display(menu_df)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# Total amount spent by each customer.
# Join sales to the menu on the shared product_id key. Joining by column name
# (instead of an equality expression) keeps a single product_id column in
# df_joined, so any later reference to product_id is unambiguous.
df_joined = sales_df.join(menu_df, on="product_id", how="inner")

# `sum` here is pyspark.sql.functions.sum (imported at the top), not the builtin.
df1 = (
    df_joined
    .groupBy("customer_id")
    .agg(sum("price").alias("total_amount"))
    .orderBy(asc("customer_id"))
)
display(df1)


customer_id,total_amount
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Total amount spent per food category (product name), sorted by name.
df2 = (
    df_joined
    .groupBy("product_name")
    .agg(sum("price").alias("total_amount"))
    .orderBy(asc("product_name"))
)
display(df2)


product_name,total_amount
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Total sales amount per order month, in ascending month order.
# Grouping is on order_month alone, so the same month of different years
# falls into one bucket.
df3 = (
    df_joined
    .groupBy("order_month")
    .agg(sum("price").alias("total_amount"))
    .orderBy(asc("order_month"))
)
display(df3)

order_month,total_amount
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Yearly sales: total amount per order year, earliest year first.
df4 = (
    df_joined
    .groupBy("order_year")
    .agg(sum("price").alias("total_amount"))
    .orderBy(asc("order_year"))
)
# Show the aggregated DataFrame. `total_amount` is only a column alias, not a
# Python variable, so display(total_amount) raises NameError.
display(df4)


order_year,total_amount
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Number of times each product was purchased, least-purchased first.
df5 = (
    df_joined
    .groupBy("product_name")
    .agg(count("*").alias("purchase_count"))
    .orderBy("purchase_count")
)
display(df5)


product_name,purchase_count
Pasta,6
Biryani,6
Dosa,12
PIZZA,21
Chowmin,24
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
# Top 5 most-ordered items: count purchases per product, sort descending,
# and keep the first five rows.
df6 = (
    df_joined
    .groupBy("product_name")
    .agg(count("*").alias("purchase_count"))
    .orderBy(desc("purchase_count"))
    .limit(5)
)
display(df6)



product_name,purchase_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6


In [0]:
# Single most-ordered item: same ranking as the top-5 query, limited to one row.
df7 = (
    df_joined
    .groupBy("product_name")
    .agg(count("*").alias("purchase_count"))
    .orderBy(desc("purchase_count"))
    .limit(1)
)
display(df7)



product_name,purchase_count
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
# How often each customer visited the restaurant: the number of distinct
# order dates among rows whose source_order is "Restaurant".
df8 = (
    sales_df
    .where(sales_df["source_order"] == "Restaurant")
    .groupBy("customer_id")
    .agg(countDistinct("order_date"))
)
display(df8)


customer_id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

In [0]:
# Total sales by each country (location).
# The alias must be applied to the aggregated column inside agg(); calling
# .alias() on the resulting DataFrame only names the DataFrame and leaves the
# column called "sum(price)".
df9 = (
    df_joined
    .groupBy("location")
    .agg(sum("price").alias("total_sales"))
)
display(df9)


location,sum(price)
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.