In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import *


In [None]:
%fs ls dbfs:/FileStore/tables/project1/

path,name,size,modificationTime
dbfs:/FileStore/tables/project1/menu_csv.txt,menu_csv.txt,98,1723468430000
dbfs:/FileStore/tables/project1/sales_csv.txt,sales_csv.txt,3465,1723462359000


In [None]:
df=spark.read.option("header",True).option("inferschema",True).csv("dbfs:/FileStore/tables/project1/sales_csv.txt")

In [None]:
df.display()

1,A,2023-01-01,India,Swiggy
2,A,2022-01-01T00:00:00.000+0000,India,Swiggy
2,A,2023-01-07T00:00:00.000+0000,India,Swiggy
3,A,2023-01-10T00:00:00.000+0000,India,Restaurant
3,A,2022-01-11T00:00:00.000+0000,India,Swiggy
3,A,2023-01-11T00:00:00.000+0000,India,Restaurant
2,B,2022-02-01T00:00:00.000+0000,India,Swiggy
2,B,2023-01-02T00:00:00.000+0000,India,Swiggy
1,B,2023-01-04T00:00:00.000+0000,India,Restaurant
1,B,2023-02-11T00:00:00.000+0000,India,Swiggy
3,B,2023-01-16T00:00:00.000+0000,India,zomato


In [None]:
df.printSchema()

root
 |-- 1: integer (nullable = true)
 |-- A: string (nullable = true)
 |--  2023-01-01: timestamp (nullable = true)
 |-- India: string (nullable = true)
 |-- Swiggy: string (nullable = true)



In [None]:
schema=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order" ,StringType(),True)
    ])

In [None]:
df=spark.read.option("header",True).option("inferschema",True).schema(schema).csv("dbfs:/FileStore/tables/project1/sales_csv.txt")

In [None]:
df.display()

product_id,customer_id,order_date,location,source_order
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy
3,B,2023-01-16,India,zomato


Year,month,Quarter

In [None]:
sales_df=df.withColumn("order_year",year(df.order_date)).withColumn("order_month",month(df.order_date)).withColumn("order_quarter",quarter(df.order_date))
sales_df.display()

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1
3,B,2023-01-16,India,zomato,2023,1,1


Menu Dataframe

In [None]:
schema2=StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True)
  
    ])

In [None]:
menu_df=spark.read.option("header",True).option("inferschema",True).schema(schema2).csv("dbfs:/FileStore/tables/project1/menu_csv.txt")

Total amount spent by each customer

In [None]:
help(sales_df.join)

Help on method join in module pyspark.sql.dataframe:

join(other: 'DataFrame', on: Union[str, List[str], pyspark.sql.column.Column, List[pyspark.sql.column.Column], NoneType] = None, how: Optional[str] = None) -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Joins with another :class:`DataFrame`, using the given join expression.
    
    .. versionadded:: 1.3.0
    
    .. versionchanged:: 3.4.0
        Support Spark Connect.
    
    Parameters
    ----------
    other : :class:`DataFrame`
        Right side of the join
    on : str, list or :class:`Column`, optional
        a string for the join column name, a list of column names,
        a join expression (Column), or a list of Columns.
        If `on` is a string or a list of strings indicating the name of the join column(s),
        the column(s) must exist on both sides, and this performs an equi-join.
    how : str, optional
        default ``inner``. Must be one of: ``inner``, ``cross``, ``outer``,
       

In [None]:
# Print schema and preview data
sales_df.printSchema()
sales_df.show(5)

menu_df.printSchema()
menu_df.show(5)


root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_quarter: integer (nullable = true)

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1| 

In [None]:

total_amount_spent=sales_df.join(menu_df,'product_id').groupBy('customer_id').agg({'price':'sum'}).orderBy('customer_id')
display(total_amount_spent)

customer_id,sum(price)
A,3960.0
B,3240.0
C,1800.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

Total amount Spent by each food category

In [None]:
total_amount_food=sales_df.join(menu_df,'product_id').groupBy('product_name').agg({'price':'sum'}).orderBy('product_name')
display(total_amount_food)

product_name,sum(price)
Biryani,480.0
Chowmin,3600.0
Dosa,1320.0
Pasta,1080.0
sandwich,5760.0


Databricks visualization. Run in Databricks to view.

Total sales by each month

In [None]:
monthly_Sales=sales_df.join(menu_df,'product_id').groupBy('order_month').agg({'price':'sum'}).orderBy('order_month')
monthly_Sales.display()

order_month,sum(price)
1,2460.0
2,2430.0
3,810.0
5,2460.0
6,2460.0
7,810.0
11,810.0


Databricks visualization. Run in Databricks to view.

Yearly sales

In [None]:
yearly_sales=sales_df.join(menu_df,'product_id').groupBy('order_year').agg({'price':'sum'}).orderBy('order_year')
yearly_sales.display()

order_year,sum(price)
2022,4350.0
2023,7890.0


Databricks visualization. Run in Databricks to view.

Quaterly sales

In [None]:
quaterly_sales=sales_df.join(menu_df,'product_id').groupBy('order_quarter').agg({'price':'sum'}).orderBy('order_quarter')
quaterly_sales.display()


order_quarter,sum(price)
1,5700.0
2,4920.0
3,810.0
4,810.0


Databricks visualization. Run in Databricks to view.

Total number of order by each category

In [None]:

most_order=sales_df.join(menu_df,'product_id')\
.groupBy('product_id','product_name')\
.agg(count('product_id').alias('product_count')).orderBy('product_count',ascending=0).drop('product_id')
most_order.display()

product_name,product_count
sandwich,48
Chowmin,24
Dosa,12
Biryani,6
Pasta,6


Databricks visualization. Run in Databricks to view.

Top Ordered Item

In [None]:
top_product=sales_df.join(menu_df,'product_id').groupBy('product_id','product_name').agg(count('product_id').alias('product_count')).orderBy('product_count',ascending=0).drop('product_id').limit(1)
display(top_product)

product_name,product_count
sandwich,48


Databricks visualization. Run in Databricks to view.

Frequency of customer visited to restaurant 


In [None]:
visited_res=sales_df.filter(sales_df.source_order=='Restaurant').groupBy('customer_id').agg(countDistinct('order_date'))
display(visited_res)


customer_id,count(order_date)
E,5
B,6
D,1
C,3
A,6


Databricks visualization. Run in Databricks to view.

Total sales by each country

In [None]:
sales_df.display()

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1
3,B,2023-01-16,India,zomato,2023,1,1


In [None]:
sales_country=sales_df.join(menu_df,'product_id').groupBy('location').agg({'price':'sum'})
sales_country.display()

location,sum(price)
India,3960.0
USA,2160.0
UK,6120.0


Databricks visualization. Run in Databricks to view.

Total sales by order source

In [None]:
sales_source=sales_df.join(menu_df,'product_id').groupBy('source_order').agg({'price':'sum'})
display(sales_source)

source_order,sum(price)
zomato,4920.0
Swiggy,4830.0
Restaurant,2490.0


Databricks visualization. Run in Databricks to view.