In [0]:
# sales_csv - /FileStore/tables/sales_csv-3.txt

# menu_csv - /FileStore/tables/menu_csv-2.txt


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col,min,max,sum,rank, dense_rank,lit,to_date, year, month, quarter, count, countDistinct
from pyspark.sql import functions as F


### Sales df

Customer_df -:
    product_id

    customer_id

    order_date

    location

    source_order



product_df :-

    product_id
    
    product_name
    
    price


In [0]:
spark = SparkSession.builder.appName('PySpark_Project').getOrCreate()

In [0]:
schema_customer = StructType([
  StructField('product_id',IntegerType(),True),
  StructField('customer_id',StringType(), True),
  StructField('order_date', DateType(), True),
  StructField('location', StringType(), True),
  StructField('source_order', StringType(), True)
])

sales_df = spark.read.format('csv').option('inferschema','true').schema(schema_customer).load('/FileStore/tables/sales_csv-3.txt')
sales_df.show(10)

# Adding new column year, month, quarter

sales_df = sales_df.withColumn('order_year', year('order_date'))\
                   .withColumn('order_month', month('order_date'))\
                   .withColumn('order_quarter', quarter('order_date'))

sales_df.display()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
+----------+-----------+----------+--------+------------+
only showing top 10 rows



product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


##Menu_df

In [0]:

schema_menu = StructType([
  StructField('product_id', IntegerType(), True),
  StructField('product_name', StringType(), True),
  StructField('price', FloatType(), True)
])

menu_df = spark.read.format('csv').option('inferschema','true').schema(schema_menu).load('/FileStore/tables/menu_csv-2.txt')
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|100.0|
|         2|     Chowmin|150.0|
|         3|    sandwich|120.0|
|         4|        Dosa|110.0|
|         5|     Biryani| 80.0|
|         6|       Pasta|180.0|
+----------+------------+-----+



######## KPI

Total Amount spent by each customer

Total Amount spent by each food category

Total Amount of sales in each month

Yearly sales

Quaterly sales

total number of order by each category

Top 5 ordered items

Top ordered items

frequecy of customer visited

Total sales by each country

total sales by order source

In [0]:
sales_df.printSchema()
menu_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_quarter: integer (nullable = true)

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: float (nullable = true)



###### Total Amount spent by each customer

In [0]:
df_join = sales_df.alias('s').join(menu_df.alias('m'), col('s.product_id') == col('m.product_id'),'inner')

customer_spent = df_join\
                  .groupBy(col('s.customer_id')).agg(sum(col('m.price')).alias('total_spent'))\
                  .select(col('s.customer_id'),col('total_spent'))\
                  .orderBy(col('s.customer_id'))
display(customer_spent)

customer_id,total_spent
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

###### Total Amount Spent By each food category

In [0]:
food_spent = sales_df.alias('s').join(menu_df.alias('m'), col('s.product_id') == col('m.product_id'), 'inner')\
              .groupBy(col('m.product_name')).agg(sum(col('m.price')).alias('total_food_spent'))
              
food_spent.display()

product_name,total_food_spent
Pasta,1080.0
PIZZA,2100.0
sandwich,5760.0
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0


Databricks visualization. Run in Databricks to view.

###### Total Amount for sale in each

In [0]:
month_sale = df_join.groupBy(col('s.order_month')).agg(sum(col('price')).alias('total_month_spent')).orderBy(col('order_month'))
month_sale.display()

order_month,total_month_spent
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

###### Yearly Sales

In [0]:
yearly_sale = df_join.groupBy(col('order_year')).agg(sum(col('price')).alias('yearly_sale'))
yearly_sale.display()

order_year,yearly_sale
2023,9990.0
2022,4350.0


Databricks visualization. Run in Databricks to view.

###### Quarterly Sales

In [0]:
quarter_sale = df_join.groupBy(col('order_quarter')).agg(sum(col('price')).alias('quarter_sale'))\
                .orderBy('order_quarter')
quarter_sale.display()

order_quarter,quarter_sale
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

####### Total Number of order by each category

In [0]:
category_order = df_join.groupBy('product_name').count()
category_order.select('product_name',col('count').alias('Order_count')).display()

product_name,Order_count
Pasta,6
PIZZA,21
sandwich,48
Biryani,6
Chowmin,24
Dosa,12


Databricks visualization. Run in Databricks to view.

###### Top 5 ordered items

In [0]:
food_spent = sales_df.alias('s').join(menu_df.alias('m'), col('s.product_id') == col('m.product_id'), 'inner')\
              .groupBy(col('m.product_name')).agg(count(col('*')).alias('order_count')).orderBy(col('order_count').desc()).limit(5)
      
              
food_spent.display()

product_name,order_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Pasta,6


Databricks visualization. Run in Databricks to view.

##### Top Ordered item

In [0]:
food_spent = sales_df.alias('s').join(menu_df.alias('m'), col('s.product_id') == col('m.product_id'), 'inner')\
              .groupBy(col('m.product_name')).agg(count(col('*')).alias('order_count')).orderBy(col('order_count').desc()).limit(1)
      
              
food_spent.display()

product_name,order_count
sandwich,48


Databricks visualization. Run in Databricks to view.

###### Customer Frequecy to order from Restaurant

In [0]:
customer_frequency = sales_df.filter(col('source_order') == 'Restaurant').groupBy('customer_id').agg(countDistinct('order_date').alias('Customer_Frequency')).orderBy('customer_id')
customer_frequency.display()

customer_id,Customer_Frequency
A,6
B,6
C,3
D,1
E,5


Databricks visualization. Run in Databricks to view.

###### Total sale by each Country

In [0]:
sale_country = df_join.groupBy('location').agg(sum('price').alias("total_sale")).orderBy(col("total_sale").desc())
sale_country.display()

location,total_sale
UK,7020.0
India,4860.0
USA,2460.0


Databricks visualization. Run in Databricks to view.

####### Total sale By order Source

In [0]:
sale_orderSource = df_join.groupBy('source_order').agg(sum('price').alias('total_sale_per_source_order')).orderBy("total_sale_per_source_order")
sale_orderSource.display()

source_order,total_sale_per_source_order
Restaurant,3090.0
zomato,4920.0
Swiggy,6330.0


Databricks visualization. Run in Databricks to view.