In [0]:
#/FileStore/tables/sales_csv.txt
#/FileStore/tables/menu_csv.txt

In [0]:
from pyspark.sql.types import StructType, IntegerType, DateType, StringType, StructField

# Explicit column layout for the sales file; every field is declared nullable.
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("location", StringType(), True),
    StructField("source_order", StringType(), True)
])

# The schema is supplied explicitly, so the "inferschema" option is not set:
# Spark skips schema inference whenever .schema(...) is given, which made the
# option a misleading no-op.
sales_df = (
    spark.read.format("csv")
    .schema(schema)
    .load("/FileStore/tables/sales_csv-2.txt")
)
display(sales_df)

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
from pyspark.sql.functions import month, quarter, year

# Add the year, quarter and month of each order date as separate columns.
sales_df = (
    sales_df
    .withColumn("order_year", year("order_date"))
    .withColumn("order_quarter", quarter("order_date"))
    .withColumn("order_month", month("order_date"))
)
display(sales_df)

product_id,customer_id,order_date,location,source_order,order_year,order_quarter,order_month
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,1,2
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,1,2


In [0]:
# Delete the file at this DBFS path; the call's result is the cell output.
path_to_delete = "/FileStore/tables/menu_csv.txt"
dbutils.fs.rm(path_to_delete)

Out[4]: True

In [0]:
from pyspark.sql.types import StructType, IntegerType, DateType, StringType, StructField, DecimalType

# Explicit column layout for the menu file; every field is declared nullable.
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", IntegerType(), True)
])

# The schema is supplied explicitly, so the "inferschema" option is not set:
# Spark skips schema inference whenever .schema(...) is given, which made the
# option a misleading no-op.
menu_df = (
    spark.read.format("csv")
    .schema(schema)
    .load("/FileStore/tables/menu_csv-2.txt")
)
display(menu_df)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# Total of menu prices per customer, produced two equivalent ways from the
# same grouped join: the GroupedData.sum shortcut and a dict-style agg.
sales_with_price = sales_df.join(menu_df, on="product_id")
grouped_by_customer = sales_with_price.groupBy("customer_id")

total_amount_spent = grouped_by_customer.sum("price").orderBy("customer_id")
display(total_amount_spent)

total_amount_spent2 = grouped_by_customer.agg({"price": "sum"}).orderBy("customer_id")
display(total_amount_spent2)

customer_id,sum(price)
A,4260
B,4440
C,2400
D,1200
E,2040


Databricks visualization. Run in Databricks to view.

customer_id,sum(price)
A,4260
B,4440
C,2400
D,1200
E,2040


Databricks visualization. Run in Databricks to view.

In [0]:
# Total of menu prices per product name, produced two equivalent ways from the
# same grouped join: the GroupedData.sum shortcut and a dict-style agg.
sales_with_price = sales_df.join(menu_df, on="product_id")
grouped_by_product = sales_with_price.groupBy("product_name")

total_amount_spent_food_category = grouped_by_product.sum("price").orderBy("product_name")
display(total_amount_spent_food_category)

total_amount_spent_food_category2 = grouped_by_product.agg({"price": "sum"}).orderBy("product_name")
display(total_amount_spent_food_category2)

product_name,sum(price)
Biryani,480
Chowmin,3600
Dosa,1320
PIZZA,2100
Pasta,1080
sandwich,5760


Databricks visualization. Run in Databricks to view.

product_name,sum(price)
Biryani,480
Chowmin,3600
Dosa,1320
PIZZA,2100
Pasta,1080
sandwich,5760


Databricks visualization. Run in Databricks to view.

In [0]:
# Total of menu prices per order_month value, in ascending month order.
# Grouping is on the month number alone, so the same month of different
# years lands in one bucket.
monthly_amount_spent_food_category = (
    sales_df.join(menu_df, on="product_id")
    .groupBy("order_month")
    .sum("price")
    .orderBy("order_month")
)
display(monthly_amount_spent_food_category)

order_month,sum(price)
1,2960
2,2730
3,910
5,2960
6,2960
7,910
11,910


Databricks visualization. Run in Databricks to view.

In [0]:
# Total of menu prices per order_year value, in ascending year order.
yearly_amount_spent_food_category = (
    sales_df.join(menu_df, on="product_id")
    .groupBy("order_year")
    .sum("price")
    .orderBy("order_year")
)
display(yearly_amount_spent_food_category)

order_year,sum(price)
2022,4350
2023,9990


In [0]:
# Total of menu prices per order_quarter value, in ascending quarter order.
# Grouping is on the quarter number alone, so the same quarter of different
# years lands in one bucket.
quarterly_amount_spent_food_category = (
    sales_df.join(menu_df, on="product_id")
    .groupBy("order_quarter")
    .sum("price")
    .orderBy("order_quarter")
)
display(quarterly_amount_spent_food_category)

order_quarter,sum(price)
1,6600
2,5920
3,910
4,910


In [0]:
from pyspark.sql.functions import count

# Number of sales rows per menu item, largest count first.
product_counts = (
    sales_df.join(menu_df, on="product_id")
    .groupBy("product_id", "product_name")
    .agg(count("product_id").alias("product_count"))
)
most_purchased_df = product_counts.orderBy("product_count", ascending=False)
display(most_purchased_df)

product_id,product_name,product_count
3,sandwich,48
2,Chowmin,24
1,PIZZA,21
4,Dosa,12
5,Biryani,6
6,Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

# Five most frequently ordered menu items.
# Counts can tie (two products share the lowest count in the full ranking),
# and sorting on product_count alone would leave it arbitrary which tied
# product survives the limit. product_name is therefore used as a secondary,
# ascending sort key so the top five are reproducible.
product_counts = (
    sales_df.join(menu_df, on="product_id")
    .groupBy("product_id", "product_name")
    .agg(count("product_id").alias("product_count"))
)
most_purchased_df = (
    product_counts
    .orderBy(["product_count", "product_name"], ascending=[False, True])
    .drop("product_id")
    .limit(5)
)
display(most_purchased_df)

product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6


In [0]:
from pyspark.sql.functions import count

# Single most frequently ordered menu item (name and count only).
product_counts = (
    sales_df.join(menu_df, on="product_id")
    .groupBy("product_id", "product_name")
    .agg(count("product_id").alias("product_count"))
)
most_purchased_df = (
    product_counts
    .orderBy("product_count", ascending=False)
    .drop("product_id")
    .limit(1)
)
display(most_purchased_df)

product_name,product_count
sandwich,48


In [0]:
from pyspark.sql.functions import countDistinct

# For orders whose source_order is "Restaurant", count the distinct order
# dates per customer and list the customers with the most dates first.
restaurant_orders = sales_df.filter(sales_df["source_order"] == "Restaurant")
df = (
    restaurant_orders
    .groupBy("customer_id")
    .agg(countDistinct("order_date").alias("frequency"))
    .orderBy("frequency", ascending=False)
)
display(df)

customer_id,frequency
B,6
A,6
E,5
C,3
D,1


Databricks visualization. Run in Databricks to view.

In [0]:
# Total of menu prices per customer, highest total first.
# "sum(price)" is the column name Spark generates for the grouped sum.
spend_per_customer = (
    sales_df.join(menu_df, on="product_id")
    .groupBy("customer_id")
    .sum("price")
)
total_amount_spent = spend_per_customer.orderBy("sum(price)", ascending=False)
display(total_amount_spent)

customer_id,sum(price)
B,4440
A,4260
C,2400
E,2040
D,1200


In [0]:
# Total of menu prices per sales location (no explicit ordering applied).
total_ampunt_per_country = (
    sales_df.join(menu_df, "product_id")
    .groupBy("location")
    .sum("price")
)

display(total_ampunt_per_country)

location,sum(price)
India,4860
USA,2460
UK,7020


Databricks visualization. Run in Databricks to view.

In [0]:
# Total of menu prices per order source (no explicit ordering applied).
total_ampunt_per_source = (
    sales_df.join(menu_df, "product_id")
    .groupBy("source_order")
    .sum("price")
)

display(total_ampunt_per_source)

source_order,sum(price)
zomato,4920
Swiggy,6330
Restaurant,3090


Databricks visualization. Run in Databricks to view.