Import Libraries

In [0]:
import pyspark
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DateType
from pyspark.sql.functions import *

Define The Schema for Sales

In [0]:
schema = StructType([
    StructField("product_id", IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location", StringType(),True),
    StructField("source_order", StringType(),True)
])
sales = spark.read.format('csv').option("inferschema",'true').schema(schema).load('/FileStore/sales_csv.txt')
sales

Out[66]: DataFrame[product_id: int, customer_id: string, order_date: date, location: string, source_order: string]

In [0]:
sales.display()

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


Extracting Month, Quarter, Day from Year

In [0]:
sales = sales.withColumn("Year", year(col('order_date')))\
    .withColumn("Month", month(col('order_date')))\
    .withColumn("Quarter", quarter(col('order_date')))\
    .withColumn("Day", dayofmonth(col('order_date')))\
    .withColumn("Month_Name", date_format(col('order_date'), 'MMMM'))\
    .withColumn("Day_of_Week", date_format(col('order_date'), 'EEEE'))

    

In [0]:
sales.display()

product_id,customer_id,order_date,location,source_order,Year,Month,Quarter,Day,Month_Name,Day_of_Week
1,A,2023-01-01,India,Swiggy,2023,1,1,1,January,Sunday
2,A,2022-01-01,India,Swiggy,2022,1,1,1,January,Saturday
2,A,2023-01-07,India,Swiggy,2023,1,1,7,January,Saturday
3,A,2023-01-10,India,Restaurant,2023,1,1,10,January,Tuesday
3,A,2022-01-11,India,Swiggy,2022,1,1,11,January,Tuesday
3,A,2023-01-11,India,Restaurant,2023,1,1,11,January,Wednesday
2,B,2022-02-01,India,Swiggy,2022,2,1,1,February,Tuesday
2,B,2023-01-02,India,Swiggy,2023,1,1,2,January,Monday
1,B,2023-01-04,India,Restaurant,2023,1,1,4,January,Wednesday
1,B,2023-02-11,India,Swiggy,2023,2,1,11,February,Saturday


Defining The schema of Menu Table

In [0]:
schema = StructType([
    StructField("product_id", IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True),
])
menu = spark.read.format('csv').option("inferschema",'true').schema(schema).load('/FileStore/menu_csv.txt')
menu

Out[70]: DataFrame[product_id: int, product_name: string, price: string]

In [0]:
display(menu)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


Type casting the price from string to integer and removing any string part.

In [0]:
menu = menu.withColumn('price',regexp_replace(col('price'), '[^\d]','').cast('int'))

In [0]:
display(menu)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


Creating View to work with SQL

In [0]:
sales.createOrReplaceTempView('sales')
menu.createOrReplaceTempView('menu')

In [0]:
%sql
SELECT s.customer_id, sum(m.price) FROM sales AS s
JOIN menu AS m
WHERE s.product_id = m.product_id
GROUP BY 1
ORDER BY 2 DESC

customer_id,sum(price)
B,4440
A,4260
C,2400
E,2040
D,1200


In [0]:
total_amount_spent_by_each_customer = sales.join(menu, 'product_id') \
                                           .groupBy('customer_id') \
                                           .agg({'price': 'sum'}) \
                                           .orderBy(desc('sum(price)'))

total_amount_spent_by_each_customer.display()

customer_id,sum(price)
B,4440
A,4260
C,2400
E,2040
D,1200


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT product_name, SUM(price) AS total_amount_spent
FROM sales
JOIN menu ON sales.product_id = menu.product_id
GROUP BY product_name
ORDER BY total_amount_spent DESC;

product_name,total_amount_spent
sandwich,5760
Chowmin,3600
PIZZA,2100
Dosa,1320
Pasta,1080
Biryani,480


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_by_each_category = sales.join(menu, 'product_id') \
                                           .groupBy('product_name') \
                                           .agg({'price': 'sum'}).alias('Total Sales') \
                                           .orderBy(desc('sum(price)'))

total_sales_by_each_category.display()

product_name,sum(price)
sandwich,5760
Chowmin,3600
PIZZA,2100
Dosa,1320
Pasta,1080
Biryani,480


In [0]:

total_sales_by_each_month = sales.join(menu, 'product_id') \
                                           .groupBy('month_name','Month') \
                                           .agg({'price': 'sum'}) \
                                           .orderBy('Month').drop('Month')

total_sales_by_each_month.display()

month_name,sum(price)
January,2960
February,2730
March,910
May,2960
June,2960
July,910
November,910


Databricks visualization. Run in Databricks to view.

In [0]:
total_amount_spent_by_each_quarter = sales.join(menu, 'product_id') \
                                           .groupBy('Quarter') \
                                           .agg({'price': 'sum'}) \
                                           .orderBy('Quarter')

total_amount_spent_by_each_quarter.display()

Quarter,sum(price)
1,6600
2,5920
3,910
4,910


Databricks visualization. Run in Databricks to view.

In [0]:
total_amount_spent_by_each_year = sales.join(menu, 'product_id') \
                                           .groupBy('Year') \
                                           .agg({'price': 'sum'}) \
                                           .orderBy('Year')

total_amount_spent_by_each_year.display()

Year,sum(price)
2022,4350
2023,9990


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_by_each_day = sales.join(menu, 'product_id') \
                                           .groupBy('day_of_week','Day') \
                                           .agg({'price': 'sum'}) \
                                           .orderBy('Day').drop('Day')

total_sales_by_each_day.display()

day_of_week,sum(price)
Sunday,540
Saturday,300
Wednesday,550
Tuesday,610
Friday,300
Tuesday,300
Monday,300
Thursday,200
Sunday,200
Wednesday,200


Databricks visualization. Run in Databricks to view.

In [0]:
Orders_by_product_category= sales.join(menu, 'product_id') \
                                           .groupBy('product_id','product_name') \
                                           .agg(count('product_id').alias('Orders')) \
                                           .orderBy('Orders', ascending = 0).drop('product_id')
display(Orders_by_product_category)

product_name,Orders
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6
Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
Top_3_Orders_by_product_category= sales.join(menu, 'product_id') \
                                           .groupBy('product_id','product_name') \
                                           .agg(count('product_id').alias('Orders')) \
                                           .orderBy('Orders', ascending = 0).drop('product_id').limit(3)
display(Top_3_Orders_by_product_category)

product_name,Orders
sandwich,48
Chowmin,24
PIZZA,21


Databricks visualization. Run in Databricks to view.

In [0]:
Highest_ordered_item= sales.join(menu, 'product_id') \
                                           .groupBy('product_id','product_name') \
                                           .agg(count('product_id').alias('Orders')) \
                                           .orderBy('Orders', ascending = 0).drop('product_id').limit(1)
display(Highest_ordered_item)

product_name,Orders
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
frequency_resturant = sales.filter(sales.source_order == 'Restaurant').groupBy('customer_id').agg(countDistinct('order_date'))
frequency_resturant.display()

customer_id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

In [0]:
sales = sales.withColumn("location", when(sales["location"] == "UK", "United Kingdom").otherwise(sales["location"]))
sales = sales.withColumn("location", when(sales["location"] == "USA", "United States").otherwise(sales["location"]))



In [0]:
total_sales_by_country = sales.join(menu, 'product_id') \
                                           .groupBy('location') \
                                           .agg(sum('price').alias('Total Sales'))\
                                           .orderBy('Total Sales')

total_sales_by_country.display()

location,Total Sales
United States,2460
India,4860
United Kingdom,7020


Databricks visualization. Run in Databricks to view.

In [0]:
Total_Revenue = total_amount_spent_by_each_customer.agg(sum('sum(price)').alias('Total Revenue'))
Total_Revenue.display()

Total Revenue
14340


Databricks visualization. Run in Databricks to view.

In [0]:
total_orders = Orders_by_product_category.agg(sum("Orders").alias("Total_Orders"))
total_orders.display()

Total_Orders
117


Databricks visualization. Run in Databricks to view.