In [None]:
# Create (or reuse an already running) SparkSession for the "Sales" application

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Sales")
    .getOrCreate()
)

#Sales DataFrame


In [None]:
#Reading Sales files into Sales DataFrame and Defining the Schema for the dataFrame
from pyspark.sql.types import *

# Explicit schema for the sales file; every column is declared nullable.
schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('customer_id', StringType(), True),
    StructField('order_date', DateType(), True),
    StructField('location', StringType(), True),
    StructField('source_order', StringType(), True),
])

# The schema is passed explicitly, so 'inferSchema' is intentionally not set:
# requesting inference alongside a user-supplied schema is redundant and only
# obscures which column types are actually applied.
sales_df = (
    spark.read.format("csv")
    .option('header', 'true')   # first line of the file holds the column names
    .schema(schema)
    .load('dbfs:/FileStore/tables/project/sales_csv.txt')
)
sales_df.display()

product_id,customer_id,order_date,location,source_order
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy
3,B,2023-01-16,India,zomato


In [None]:
# Print the schema of sales_df (column names, data types and nullability)
sales_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)



In [None]:
# Derive the order year, month and quarter from the order_date column

from pyspark.sql.functions import year,month,quarter

# Each withColumn adds (or, on a re-run, replaces) one derived integer column.
sales_df = (
    sales_df
    .withColumn('order_year', year(sales_df['order_date']))
    .withColumn('order_month', month(sales_df['order_date']))
    .withColumn('order_quarter', quarter(sales_df['order_date']))
)
sales_df.display()

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1
3,B,2023-01-16,India,zomato,2023,1,1


In [None]:
# Print the schema again to verify the three columns derived from order_date
# (order_year, order_month, order_quarter) are present
sales_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_quarter: integer (nullable = true)



In [None]:
#Reading Menu files into menu DataFrame and Defining the Schema for the dataFrame

from pyspark.sql.types import *

# Explicit schema for the menu file; every column is declared nullable.
# price is declared numeric (DoubleType) instead of StringType: every later
# aggregation sums this column, and with a string column Spark had to cast it
# implicitly (the sums were already reported as doubles, e.g. 3960.0).
schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('price', DoubleType(), True),
])

# The schema is passed explicitly, so 'inferSchema' is intentionally not set:
# requesting inference alongside a user-supplied schema is redundant.
menu_df = (
    spark.read.format("csv")
    .option('header', 'true')   # first line of the file holds the column names
    .schema(schema)
    .load('dbfs:/FileStore/tables/project/menu_csv.txt')
)
menu_df.display()

product_id,product_name,price
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [None]:
#1 Total amount spent by each customer
# The join on product_id is an inner join, so sales rows whose product_id has
# no entry in menu_df do not contribute to the totals.

total_amount_spent = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('customer_id')
    .agg({'price': 'sum'})
    .sort('customer_id')
)
total_amount_spent.display()

customer_id,sum(price)
A,3960.0
B,3240.0
C,1800.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [None]:
#2 Total amount spent on each food category (grouped by product_name)

total_amount_food_category = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('product_name')
    .agg({'price': 'sum'})
    .sort('product_name')
)
total_amount_food_category.display()

product_name,sum(price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [None]:
#3 Total amount of sales per month
# The grouping key is order_month alone, so the same month of different years
# is pooled into a single row.

total_sales_per_month = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('order_month')
    .agg({'price': 'sum'})
    .sort('order_month')
)
total_sales_per_month.display()

order_month,sum(price)
1,2460.0
2,2430.0
3,810.0
5,2460.0
6,2460.0
7,810.0
11,810.0


Databricks visualization. Run in Databricks to view.

In [None]:
#4 Yearly sales (sum of price per order_year, oldest year first)

yearly_sales = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('order_year')
    .agg({'price': 'sum'})
    .sort('order_year')
)
yearly_sales.display()

order_year,sum(price)
2022,4350.0
2023,7890.0


Databricks visualization. Run in Databricks to view.

In [None]:
#5 Quarterly Sales
# The grouping key is order_quarter alone, so the same quarter of different
# years is pooled into a single row.

# Stored under its own name: this result was previously assigned to
# `yearly_sales`, which silently overwrote the per-year totals.
quarterly_sales = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('order_quarter')
    .agg({'price': 'sum'})
    .orderBy('order_quarter')
)
quarterly_sales.display()

order_quarter,sum(price)
1,5700.0
2,4920.0
3,810.0
4,810.0


Databricks visualization. Run in Databricks to view.

In [None]:
#6 Count of orders per product_id (lowest count first)

counts_of_product_id = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('product_id')
    .agg({'product_id': 'count'})
    .sort('count(product_id)')
)
counts_of_product_id.display()

product_id,count(product_id)
6,6
5,6
4,12
2,24
3,48


In [None]:
#6 Count of orders per product, reported by product_name (highest count first)
from pyspark.sql.functions import count

# product_id is part of the grouping key and is dropped only after counting,
# so the displayed result has just product_name and product_count.
counts_of_product_id = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('product_id', 'product_name')
    .agg(count('product_id').alias('product_count'))
    .sort('product_count', ascending=False)
    .drop('product_id')
)
counts_of_product_id.display()

product_name,product_count
sandwich,48
Chowmin,24
Dosa,12
Biryani,6
Pasta,6


Databricks visualization. Run in Databricks to view.

In [None]:
#7 Top ordered item (the single product with the highest order count)

from pyspark.sql.functions import count

# Sort by count descending, then by product_name ascending as a tie-breaker:
# without a secondary key, limit(1) could return any of several products that
# share the highest count, making the result non-deterministic.
top_item = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('product_id', 'product_name')
    .agg(count('product_id').alias('product_count'))
    .orderBy(['product_count', 'product_name'], ascending=[False, True])
    .drop('product_id')
    .limit(1)
)
top_item.display()



product_name,product_count
sandwich,48


In [None]:
#8 Frequency of customer visits to the restaurant
# Counts the distinct order dates per customer among rows whose source_order
# is 'Restaurant'.
from pyspark.sql.functions import countDistinct

df = (
    sales_df
    .where(sales_df['source_order'] == 'Restaurant')
    .groupBy('customer_id')
    .agg(countDistinct('order_date'))
)
df.display()

customer_id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

In [None]:
# Total sales by each country (sum of price per location)

(
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('location')
    .agg({'price': 'sum'})
    .display()
)

location,sum(price)
India,3960.0
USA,2160.0
UK,6120.0


Databricks visualization. Run in Databricks to view.

In [None]:
# Total sales by each order source (sum of price per source_order)

(
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('source_order')
    .agg({'price': 'sum'})
    .display()
)

source_order,sum(price)
zomato,4920.0
Swiggy,4830.0
Restaurant,2490.0


Databricks visualization. Run in Databricks to view.