In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Sales df
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True ),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True)
])

sales_df = spark.read.format('csv').option('inferSchema', True).schema(schema).load('/FileStore/tables/sales_csv.txt')
display(sales_df.limit(10))

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
sales_df = sales_df.withColumn('Order_Year', year('order_date')).withColumn('Order_Month', month('order_date')).withColumn('Order_Quarter', quarter('order_date'))

display(sales_df.limit(10))

product_id,customer_id,order_date,location,source_order,Order_Year,Order_Month,Order_Quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
#  Menu df
MenuSchema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('Prodcut_name', StringType(), True),
    StructField('Price', StringType(), True)
])

menu_df = spark.read.format('csv').option('inferSchema', True).schema(MenuSchema).load('/FileStore/tables/menu_csv.txt')
display(menu_df)

product_id,Prodcut_name,Price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# Total amount spent by each customer
total_amount_spent = sales_df.join(menu_df, 'product_id').groupBy('customer_id').agg(sum('Price').alias('Total_Spent')).orderBy('customer_id')

display(total_amount_spent)

customer_id,Total_Spent
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


In [0]:
# Total amount spent by each food category
total_amount_spent_food = sales_df.join(menu_df, 'product_id').groupBy('Prodcut_name').agg(sum('Price').alias('TotalSpentByFood'))

display(total_amount_spent_food)

Prodcut_name,TotalSpentByFood
Pasta,1080.0
PIZZA,2100.0
sandwich,5760.0
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0


In [0]:
#Total amount of sales in each month

Total_sales_month = sales_df.join(menu_df, 'product_id').groupBy('Order_Month').agg(sum('Price'))

display(Total_sales_month)

Order_Month,sum(Price)
1,2960.0
6,2960.0
3,910.0
5,2960.0
7,910.0
11,910.0
2,2730.0


In [0]:
# Yearly sales
Total_sales_year = sales_df.join(menu_df, 'product_id').groupBy('Order_Year').agg(sum('Price'))

display(Total_sales_year)

Order_Year,sum(Price)
2023,9990.0
2022,4350.0


In [0]:
# Quaterly sales
Total_sales_quarter = sales_df.join(menu_df, 'product_id').groupBy('Order_Quarter').agg(sum('Price'))

display(Total_sales_quarter)

Order_Quarter,sum(Price)
1,6600.0
3,910.0
4,910.0
2,5920.0


In [0]:
# Total number of category by each category

In [0]:
#how many times each prodcut purchase

product_count = sales_df.join(menu_df, 'product_id').groupBy('Prodcut_name').agg((count('customer_id')).alias('Count')).orderBy('Count', ascending = 0)

display(product_count)

Prodcut_name,Count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6
Biryani,6


In [0]:
    # Top 5 ordered items

top5 = sales_df.join(menu_df, 'product_id').groupBy('Prodcut_name').agg((count('customer_id')).alias('Count')).orderBy('Count', ascending = 0).limit(5)

display(top5)


Prodcut_name,Count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6


In [0]:
# frequency of customer visited

freq_customer = sales_df.filter(sales_df.source_order == 'Restaurant').groupBy('customer_id').agg(countDistinct('order_date')).orderBy('customer_id')


display(freq_customer)

customer_id,count(order_date)
A,6
B,6
C,3
D,1
E,5


In [0]:
# Total sales by each country

total_sales_country = sales_df.join(menu_df, 'product_id').groupBy('location').agg((sum('Price')).alias('Price')).orderBy('Price', ascending=0)

display(total_sales_country)

location,Price
UK,7020.0
India,4860.0
USA,2460.0


In [0]:
#total sales by order source

total_sales_source = sales_df.join(menu_df, 'product_id').groupBy('source_order').agg((sum('Price')).alias('Price')).orderBy('Price', ascending=0)

display(total_sales_source)

source_order,Price
Swiggy,6330.0
zomato,4920.0
Restaurant,3090.0
