In [0]:
from pyspark .sql.types import StructType,StructField,IntegerType,StructType,DateType,StringType
from pyspark.sql import SparkSession

spark=SparkSession.builder.master("local").appName("Sales Analysis").getOrCreate()
schema_define=StructType([StructField("product_id",IntegerType(),True),
                          StructField("customer_id",StringType(),True),
                          StructField("order_date",DateType(),True),
                          StructField("location",StringType(),True),
                          StructField("source_order",StringType(),True)  
                         ]
                        )

sales_df=spark.read.csv('/FileStore/sales.csv',inferSchema=True,schema=schema_define)

sales_df.show()
display(sales_df)

'''
/dbfs/FileStore/menu.csv

/dbfs/FileStore/sales.csv
'''

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


'\n/dbfs/FileStore/menu.csv\n\n/dbfs/FileStore/sales.csv\n'

In [0]:
from pyspark.sql.functions import month, year, quarter

sales_df=sales_df.withColumn("order_year", year(sales_df.order_date))
sales_df=sales_df.withColumn("order_month", month(sales_df.order_date))
sales_df=sales_df.withColumn("order_quarter", quarter(sales_df.order_date))
display(sales_df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
schema_define=StructType([StructField("product_id",IntegerType(),True),
                          StructField("product_name",StringType(),True),
                          StructField("price",StringType(),True),
                        ]
                        )

menu_df=spark.read.csv('/FileStore/menu.csv',inferSchema=True,schema=schema_define)
display(menu_df)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


Databricks visualization. Run in Databricks to view.

In [0]:
total_amount_spent=(sales_df.join(menu_df,'product_id').groupBy('customer_id').agg({'price':'sum'}).orderBy('customer_id'))
total_amount_spent.show()
display(total_amount_spent)

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



customer_id,sum(price)
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_food_category=(sales_df.join(menu_df,'product_id').groupBy('product_name').agg({'price':'sum'}).orderBy('product_name'))
total_sales_food_category.show()
display(total_sales_food_category)

+------------+----------+
|product_name|sum(price)|
+------------+----------+
|     Biryani|     480.0|
|     Chowmin|    3600.0|
|        Dosa|    1320.0|
|       PIZZA|    2100.0|
|       Pasta|    1080.0|
|    sandwich|    5760.0|
+------------+----------+



product_name,sum(price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
PIZZA,2100.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_each_month=(sales_df.join(menu_df,'product_id').groupBy('order_month').agg({'price':'sum'}).orderBy('order_month'))
display(total_sales_each_month)


order_month,sum(price)
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_year_wise=(sales_df.join(menu_df,'product_id').groupBy('order_year').agg({'price':'sum'}).orderBy('order_year'))
display(total_sales_year_wise)

order_year,sum(price)
2022,4350.0
2023,9990.0


Databricks visualization. Run in Databricks to view.

In [0]:
total_sales_qurt_wise=(sales_df.join(menu_df,'product_id').groupBy('order_quarter').agg({'price':'sum'}).alias('Sum_amount').orderBy('order_quarter'))
display(total_sales_qurt_wise)

order_quarter,sum(price)
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

each_product_sales=(sales_df.join(menu_df,'product_id')
                    .groupBy('product_id','product_name')
                    .agg(count('product_id').alias('product_count'))
                    .orderBy('product_count',ascending=0)
                    .drop('product_id')
)
each_product_sales.show()

display(each_product_sales)

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6
Pasta,6


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import count

top_5_order=(sales_df.join(menu_df,'product_id')
             .groupBy('product_id','product_name')
             .agg(count('product_id').alias('product_count'))
             .orderBy('product_count',ascending=0)
             .drop('product_id')
             .limit(5)
            )
display(top_5_order)            

product_name,product_count
sandwich,48
Chowmin,24
PIZZA,21
Dosa,12
Biryani,6


In [0]:
from pyspark.sql.functions import count

top_order_item=(sales_df.join(menu_df,'product_id').groupBy('product_id','product_name')
             .agg(count('product_id').alias('product_count'))
             .orderBy('product_count',ascending=0)
             .drop('product_id')
             .limit(1)
            )
display(top_order_item)      

product_name,product_count
sandwich,48


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import countDistinct

df_restaurant=(sales_df.filter(sales_df.source_order=='Restaurant')
               .groupBy('customer_id')
               .agg(countDistinct('order_date').alias('No of Visit'))
               )
display(df_restaurant)

customer_id,No of Visit
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import countDistinct

total_sales_country_wise = (sales_df.join(menu_df,'product_id')
                            .groupBy('location')
                            .agg({'price':'sum'})
                            )
display(total_sales_country_wise)                            

location,sum(price)
India,4860.0
USA,2460.0
UK,7020.0


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import countDistinct

sales_df.printSchema()
total_sales_from_ord_source = (sales_df.join(menu_df,'product_id')
                            .groupBy('source_order')
                            .agg({'price':'sum'})
                            )
display(total_sales_from_ord_source)  

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_quarter: integer (nullable = true)



source_order,sum(price)
zomato,4810.0
Swiggy,6330.0
Restaurant,3090.0
zomato,110.0


Databricks visualization. Run in Databricks to view.