In [0]:
# Import Libraries
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType  
from pyspark.sql.functions import year, month , quarter, sum, count, countDistinct

In [0]:
# Define Schema for Sales
SalesSchema = StructType([
    StructField('product_id',IntegerType(),True),
    StructField('customer_id',StringType(),True),
    StructField('order_date',DateType(),True),
    StructField('location',StringType(),True),
    StructField('source_orders',StringType(),True)
])

sales_df = spark.read.format('csv').schema(SalesSchema).option("inferSchema",'True').load('/FileStore/tables/sales_csv.txt')
display(sales_df)

product_id,customer_id,order_date,location,source_orders
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
# define schema for menu
MenuSchema = StructType([
    StructField('product_id',IntegerType(),True),
    StructField('product_name',StringType(),True),
    StructField('price',StringType(),True)
])

menu_df = spark.read.format("csv").schema(MenuSchema).option("inferSchema", 'True').load('/FileStore/tables/menu_csv.txt')
display(menu_df)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
sales_df = sales_df.withColumn('order_year', year(sales_df.order_date))
sales_df = sales_df.withColumn('order_month', month(sales_df.order_date))
sales_df = sales_df.withColumn('order_quarter', quarter(sales_df.order_date))
display(sales_df)

product_id,customer_id,order_date,location,source_orders,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
total_amount_spend_by_each_customer = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('customer_id')
    .agg(sum('price').alias('total_spent'))
    .orderBy('customer_id')
)

display(total_amount_spend_by_each_customer)

customer_id,total_spent
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_amount_spend_by_each_food_cat = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('product_name')
    .agg(sum('price').alias('total_spent'))
    .orderBy('product_name')
)

display(total_amount_spend_by_each_food_cat)

product_name,total_spent
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_amount_of_sales_each_month = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('order_month')
    .agg(sum('price').alias('total_spent'))
    .orderBy('order_month')
)

display(total_amount_of_sales_each_month)

order_month,total_spent
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_amount_of_sales_each_quartly = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('order_quarter')
    .agg(sum('price').alias('total_spent'))
    .orderBy('order_quarter')
)

display(total_amount_of_sales_each_quartly)

order_quarter,total_spent
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_amount_of_sales_each_quartly = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('order_year')
    .agg(sum('price').alias('total_spent'))
    .orderBy('order_year')
)

display(total_amount_of_sales_each_quartly)

order_year,total_spent
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
most_df = (sales_df
    .join(menu_df, 'product_id')
    .groupBy('product_name')
    .agg(count('product_id').alias('frequency'))
    .orderBy('frequency', ascending = 0) 
)

display(most_df)

product_name,frequency
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6
Biryani,6


Databricks visualization. Run in Databricks to view.

In [0]:
most_df = (sales_df
    .join(menu_df, 'product_id')
    .groupBy('product_name')
    .agg(count('product_id').alias('frequency'))
    .orderBy('frequency', ascending = 0) 
    .limit(5)
)

display(most_df)

product_name,frequency
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
most_df = (sales_df
    .join(menu_df, 'product_id')
    .groupBy('product_name')
    .agg(count('product_id').alias('frequency'))
    .orderBy('frequency', ascending = 0) 
    .limit(1)
)

display(most_df)

product_name,frequency
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
most_df = (sales_df
    .filter(sales_df.source_orders == 'Restaurant' )
    .join(menu_df, 'product_id')
    .groupBy('customer_id')
    .agg(countDistinct('order_date').alias('frequency'))
    .orderBy('customer_id')
)

display(most_df)

customer_id,frequency
A,6
B,6
C,3
D,1
E,5


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_by_each_country = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('location')
    .agg(sum('price').alias('total_spent_by_country'))
    
)

display(total_sales_by_each_country)

location,total_spent_by_country
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_by_each_source = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('source_orders')
    .agg(sum('price').alias('total_spent_by_source'))
)

display(total_sales_by_each_source)


source_orders,total_spent_by_source
zomato,4920.0
Swiggy,6330.0
Restaurant,3090.0


Databricks visualization. Run in Databricks to view.