# 1. Import necessary libraries #

In [0]:
# Import pyspark, SparkSession, and StructType
import pyspark
import pyspark.sql.functions as F

# From module
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Start (or reuse) the Spark session for this notebook, running in local mode
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SalesFoodAnalysis")
    .getOrCreate()
)
print("Spark Version: ", spark.version)

Spark Version:  3.3.2


# 2. Extract data #

In [0]:
# Create function for extract data
def extract(format_file, file_path, schema):
    """Load a dataset into a Spark DataFrame using an explicit schema.

    Args:
        format_file: Data source format name handed to ``spark.read.format``
            (the callers in this notebook pass ``"csv"``).
        file_path: Location of the file(s) to load.
        schema: Schema applied to the loaded data (the callers in this
            notebook pass a ``StructType``).

    Returns:
        The DataFrame produced by the reader.
    """
    # The "inferSchema" option is intentionally not set: an explicit schema is
    # always supplied, and it takes the place of inference, so the option was
    # redundant and only suggested the column types might be guessed.
    return spark.read.format(format_file).schema(schema).load(file_path)

In [0]:
# Source format handed to the reader
format_file = "csv"

# Location of the sales dataset
file_sales_csv = "dbfs:/FileStore/tables/dataset/sales.csv"

# Column layout of sales.csv, assembled field by field (every column nullable)
schmea_sales = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("customer_id", StringType(), True)
    .add("order_date", DateType(), True)
    .add("location", StringType(), True)
    .add("source_order", StringType(), True)
)

# Load sales.csv with that layout
df_sales = extract(format_file, file_sales_csv, schmea_sales)

# Render the loaded rows
display(df_sales)

product_id,customer_id,order_date,location,source_order
1,A,2023-01-01,India,Swiggy
2,A,2022-01-01,India,Swiggy
2,A,2023-01-07,India,Swiggy
3,A,2023-01-10,India,Restaurant
3,A,2022-01-11,India,Swiggy
3,A,2023-01-11,India,Restaurant
2,B,2022-02-01,India,Swiggy
2,B,2023-01-02,India,Swiggy
1,B,2023-01-04,India,Restaurant
1,B,2023-02-11,India,Swiggy


In [0]:
# Define format_file
format_file = "csv"

# Read dataset menu.csv
file_menu_csv = "dbfs:/FileStore/tables/dataset/menu.csv"

# Define schema of menu.csv
# price is declared as an integer (it was a string before) so that it can be
# aggregated directly instead of relying on implicit string-to-number casts.
schmea_menu = StructType(
    [
        StructField("product_id", IntegerType(), True),
        StructField("product_name", StringType(), True),
        StructField("price", IntegerType(), True),
    ]
)

# Read dataset into df_menu
df_menu = extract(format_file, file_menu_csv, schmea_menu)

# Display df_menu
display(df_menu)

product_id,product_name,price
1,PIZZA,100
2,Chowmin,150
3,sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


# 3. Transformation data #

In [0]:
# Enrich df_sales in a single chained expression:
#   - order_year / order_month / order_quarter are derived from order_date
#   - source_order is rewritten with initcap
df_sales = (
    df_sales
    .withColumn("order_year", F.year("order_date"))
    .withColumn("order_month", F.month("order_date"))
    .withColumn("order_quarter", F.quarter("order_date"))
    .withColumn("source_order", F.initcap("source_order"))
)

# Render the enriched rows
display(df_sales)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1,A,2023-01-01,India,Swiggy,2023,1,1
2,A,2022-01-01,India,Swiggy,2022,1,1
2,A,2023-01-07,India,Swiggy,2023,1,1
3,A,2023-01-10,India,Restaurant,2023,1,1
3,A,2022-01-11,India,Swiggy,2022,1,1
3,A,2023-01-11,India,Restaurant,2023,1,1
2,B,2022-02-01,India,Swiggy,2022,2,1
2,B,2023-01-02,India,Swiggy,2023,1,1
1,B,2023-01-04,India,Restaurant,2023,1,1
1,B,2023-02-11,India,Swiggy,2023,2,1


In [0]:
# Apply initcap to the product_name column of df_menu
df_menu = df_menu.withColumn("product_name", F.initcap("product_name"))

# Render the result
display(df_menu)

product_id,product_name,price
1,Pizza,100
2,Chowmin,150
3,Sandwich,120
4,Dosa,110
5,Biryani,80
6,Pasta,180


In [0]:
# Create Temp View
def create_temp_view(df, name_view: str) -> None:
    """Register ``df`` as a temporary view named ``name_view``.

    Delegates to ``df.createOrReplaceTempView``; nothing is returned.

    Args:
        df: Object exposing ``createOrReplaceTempView`` (the callers in this
            notebook pass Spark DataFrames).
        name_view: Name under which the view is registered.
    """
    df.createOrReplaceTempView(name_view)

# Drop Temp View
def drop_temp_view(name_view: str) -> None:
    """Drop the temporary view named ``name_view``.

    Uses the catalog of the module-level ``spark`` session; the result of the
    catalog call is not returned.

    Args:
        name_view: Name of the temporary view to drop.
    """
    spark.catalog.dropTempView(name_view)

# 4. Load and Analytics #

In [0]:
# Expose both DataFrames to Spark SQL: menu first, then sales
create_temp_view(df=df_menu, name_view="menu")
create_temp_view(df=df_sales, name_view="sales")

## 4.1 Total Amount by each Customer ##

In [0]:
# Define query: total amount per customer, ordered by customer.
# price is cast to DOUBLE explicitly so the total does not depend on implicit
# type coercion of the price column.
query = ''' SELECT customer_id AS Customer, SUM(CAST(m.price AS DOUBLE)) AS TotalAmount
         FROM menu m join sales s on m.product_id = s.product_id
         GROUP BY s.customer_id
         ORDER BY s.customer_id '''

# Run query
total_amount_each_cus = spark.sql(query)

# Show result
display(total_amount_each_cus)

Customer,TotalAmount
A,4260.0
B,4440.0
C,2400.0
D,1200.0
E,2040.0


Databricks visualization. Run in Databricks to view.

## 4.2. Total Amount by Food Category ##

In [0]:
# Define query: total amount per product name, largest total first.
# price is cast to DOUBLE explicitly so the total does not depend on implicit
# type coercion of the price column.
query = ''' SELECT m.product_name AS FoodCategory, SUM(CAST(m.price AS DOUBLE)) AS TotalAmount
         FROM menu m join sales s on m.product_id = s.product_id
         GROUP BY m.product_name
         ORDER BY TotalAmount DESC '''

# Run query
total_amount_each_foodcate = spark.sql(query)

# Show result
display(total_amount_each_foodcate)

FoodCategory,TotalAmount
Sandwich,5760.0
Chowmin,3600.0
Pizza,2100.0
Dosa,1320.0
Pasta,1080.0
Biryani,480.0


Databricks visualization. Run in Databricks to view.

## 4.3. Monthly Sales ##

In [0]:
# Define query: total amount per order_month, ordered by month.
# price is cast to DOUBLE explicitly so the total does not depend on implicit
# type coercion of the price column.
# NOTE(review): the grouping uses order_month only, so the same month of
# different years is pooled into one row - confirm this is intended.
query = ''' SELECT s.order_month AS Monthly, SUM(CAST(m.price AS DOUBLE)) AS TotalAmount
         FROM menu m join sales s on m.product_id = s.product_id
         GROUP BY s.order_month
         ORDER BY s.order_month '''

# Run query
total_amount_monthly = spark.sql(query)

# Show result
display(total_amount_monthly)

Monthly,TotalAmount
1,2960.0
2,2730.0
3,910.0
5,2960.0
6,2960.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

## 4.4. Number Of Orders by Each SourceOrder  ##

In [0]:
# Define query: number of orders per source_order, busiest source first.
# GROUP BY already yields one row per source_order, so no DISTINCT is needed.
query = ''' SELECT source_order AS SourceOrder, COUNT(*) AS NumberOfOrderBySource
            FROM sales
            GROUP BY source_order
            ORDER BY NumberOfOrderBySource DESC '''

# Run query
number_of_order_bysource = spark.sql(query)

# Show result
display(number_of_order_bysource)

SourceOrder,NumberOfOrderBySource
Swiggy,51
Zomato,39
Restaurant,27


Databricks visualization. Run in Databricks to view.

## 4.5. Total Number of Orders ##

In [0]:
# Define query: total number of rows in sales.
# COUNT(*) counts every row; COUNT(source_order) would leave out rows whose
# source_order is NULL and so under-report the total.
query = ''' SELECT COUNT(*) AS NumberOfOrder
            FROM sales '''

# Run query
number_of_orders = spark.sql(query)

# Show result
display(number_of_orders)

NumberOfOrder
117


Databricks visualization. Run in Databricks to view.

# 5. Saving parquet file #

In [0]:
# The SQL analysis is finished: remove the temporary views (sales, then menu)
drop_temp_view(name_view="sales")
drop_temp_view(name_view="menu")

In [0]:
# Export parquet file
# mode("overwrite") makes this cell re-runnable: with the default save mode the
# write raises an error as soon as the target path already exists.
df_sales.write.mode("overwrite").parquet("dbfs:/FileStore/tables/sales.parquet")
df_menu.write.mode("overwrite").parquet("dbfs:/FileStore/tables/menu.parquet")