In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType, DateType

# Schema= StructType({
#     StructField("product_id", IntegerType(), True),
#     StructField("customer_id", StringType(), True),
#     StructField("order_date", DateType(), True),
#     StructField("location", StringType(), True),
#     StructField("source_order", StringType(), True)
# })

In [0]:
s_df=spark.table("sales")
s_df.show(truncate=False)

+--------------------------------+
|value                           |
+--------------------------------+
|1,A, 2023-01-01,India,Swiggy    |
|2,A, 2022-01-01,India,Swiggy    |
|2,A, 2023-01-07,India,Swiggy    |
|3,A, 2023-01-10,India,Restaurant|
|3,A, 2022-01-11,India,Swiggy    |
|3,A, 2023-01-11,India,Restaurant|
|2,B, 2022-02-01,India,Swiggy    |
|2,B, 2023-01-02,India,Swiggy    |
|1,B, 2023-01-04,India,Restaurant|
|1,B, 2023-02-11,India,Swiggy    |
|3,B, 2023-01-16,India,zomato    |
|3,B, 2022-02-01,India,zomato    |
|3,C, 2023-01-01,India,zomato    |
|1,C, 2023-01-01,UK,Swiggy       |
|6,C, 2022-01-07,UK,zomato       |
|3,D, 2023-02-16,UK,Restaurant   |
|5,D, 2022-02-01,UK,zomato,      |
|3,E, 2023-02-01,UK,Restaurant   |
|4,E, 2023-02-01,UK,Swiggy       |
|4,E, 2023-02-07,UK,Restaurant   |
+--------------------------------+
only showing top 20 rows


In [0]:
from pyspark.sql.functions import split, col

s_df_split = s_df.withColumn("cols", split("value", ","))
s_df_split.show()

+--------------------+--------------------+
|               value|                cols|
+--------------------+--------------------+
|1,A, 2023-01-01,I...|[1, A,  2023-01-0...|
|2,A, 2022-01-01,I...|[2, A,  2022-01-0...|
|2,A, 2023-01-07,I...|[2, A,  2023-01-0...|
|3,A, 2023-01-10,I...|[3, A,  2023-01-1...|
|3,A, 2022-01-11,I...|[3, A,  2022-01-1...|
|3,A, 2023-01-11,I...|[3, A,  2023-01-1...|
|2,B, 2022-02-01,I...|[2, B,  2022-02-0...|
|2,B, 2023-01-02,I...|[2, B,  2023-01-0...|
|1,B, 2023-01-04,I...|[1, B,  2023-01-0...|
|1,B, 2023-02-11,I...|[1, B,  2023-02-1...|
|3,B, 2023-01-16,I...|[3, B,  2023-01-1...|
|3,B, 2022-02-01,I...|[3, B,  2022-02-0...|
|3,C, 2023-01-01,I...|[3, C,  2023-01-0...|
|1,C, 2023-01-01,U...|[1, C,  2023-01-0...|
|6,C, 2022-01-07,U...|[6, C,  2022-01-0...|
|3,D, 2023-02-16,U...|[3, D,  2023-02-1...|
|5,D, 2022-02-01,U...|[5, D,  2022-02-0...|
|3,E, 2023-02-01,U...|[3, E,  2023-02-0...|
|4,E, 2023-02-01,U...|[4, E,  2023-02-0...|
|4,E, 2023-02-07,U...|[4, E,  20

In [0]:
s_df_split.select("cols").show(5, truncate=False)

+--------------------------------------+
|cols                                  |
+--------------------------------------+
|[1, A,  2023-01-01, India, Swiggy]    |
|[2, A,  2022-01-01, India, Swiggy]    |
|[2, A,  2023-01-07, India, Swiggy]    |
|[3, A,  2023-01-10, India, Restaurant]|
|[3, A,  2022-01-11, India, Swiggy]    |
+--------------------------------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import trim, when, col, get
s_df_final = s_df_split.select(
    get(col("cols"), 0).alias("product_id"),
    get(col("cols"), 1).alias("customer_id"),
    get(col("cols"), 2).alias("order_date"),
    get(col("cols"), 3).alias("location"),
    get(col("cols"), 4).alias("source_order")
)


s_df_clean = (
    s_df_final
    .withColumn("product_id",
        when(trim(col("product_id")) == "", None)
        .otherwise(trim(col("product_id")))
    )
    .withColumn("order_date",
        when(trim(col("order_date")) == "", None)
        .otherwise(trim(col("order_date")))
    )
)

from pyspark.sql.types import IntegerType, DateType

sales_df = (
    s_df_clean
    .withColumn("product_id", col("product_id").cast(IntegerType()))
    .withColumn("order_date", col("order_date").cast(DateType()))
)

# Step 4: Check
sales_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- location: string (nullable = true)
 |-- source_order: string (nullable = true)



In [0]:
sales_df.show()

+----------+-----------+----------+--------+------------+
|product_id|customer_id|order_date|location|source_order|
+----------+-----------+----------+--------+------------+
|         1|          A|2023-01-01|   India|      Swiggy|
|         2|          A|2022-01-01|   India|      Swiggy|
|         2|          A|2023-01-07|   India|      Swiggy|
|         3|          A|2023-01-10|   India|  Restaurant|
|         3|          A|2022-01-11|   India|      Swiggy|
|         3|          A|2023-01-11|   India|  Restaurant|
|         2|          B|2022-02-01|   India|      Swiggy|
|         2|          B|2023-01-02|   India|      Swiggy|
|         1|          B|2023-01-04|   India|  Restaurant|
|         1|          B|2023-02-11|   India|      Swiggy|
|         3|          B|2023-01-16|   India|      zomato|
|         3|          B|2022-02-01|   India|      zomato|
|         3|          C|2023-01-01|   India|      zomato|
|         1|          C|2023-01-01|      UK|      Swiggy|
|         6|  

Deriving year month, quarter

In [0]:
from pyspark.sql.functions import month, year, quarter

sales_df=sales_df.withColumn("order_year",year(sales_df.order_date))
sales_df=sales_df.withColumn("order_month",month(sales_df.order_date))
sales_df=sales_df.withColumn("order_quarter",quarter(sales_df.order_date))
display(sales_df)

product_id,customer_id,order_date,location,source_order,order_year,order_month,order_quarter
1.0,A,2023-01-01,India,Swiggy,2023.0,1.0,1.0
2.0,A,2022-01-01,India,Swiggy,2022.0,1.0,1.0
2.0,A,2023-01-07,India,Swiggy,2023.0,1.0,1.0
3.0,A,2023-01-10,India,Restaurant,2023.0,1.0,1.0
3.0,A,2022-01-11,India,Swiggy,2022.0,1.0,1.0
3.0,A,2023-01-11,India,Restaurant,2023.0,1.0,1.0
2.0,B,2022-02-01,India,Swiggy,2022.0,2.0,1.0
2.0,B,2023-01-02,India,Swiggy,2023.0,1.0,1.0
1.0,B,2023-01-04,India,Restaurant,2023.0,1.0,1.0
1.0,B,2023-02-11,India,Swiggy,2023.0,2.0,1.0


Menu DF

In [0]:
m_df=spark.table("menu")

m_df_split = m_df.withColumn("cols", split("value", ","))

m_df_final = m_df_split.select(
    get(col("cols"), 0).alias("product_id"),
    get(col("cols"), 1).alias("product_name"),
    get(col("cols"), 2).alias("price")
)


m_df_clean = (
    m_df_final
    .withColumn("product_id",
        when(trim(col("product_id")) == "", None)
        .otherwise(trim(col("product_id")))
    )
    .withColumn("price",
        when(trim(col("price")) == "", None)
        .otherwise(trim(col("price")))
    )
)

from pyspark.sql.types import IntegerType, DateType

menu_df = (
    m_df_clean
    .withColumn("product_id", col("product_id").cast(IntegerType()))
    .withColumn("price", col("price").cast(IntegerType()))
)

# Step 4: Check
menu_df.printSchema()



root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: integer (nullable = true)



In [0]:
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



Total Amount spent by each customer

In [0]:
from pyspark.sql.functions import sum
total_amount_spent_by_cust = (sales_df.join(menu_df,'product_id').groupBy('customer_id').agg(sum('price').alias('total_spent')).orderBy('customer_id'))

display(total_amount_spent_by_cust)

customer_id,total_spent
A,4260
B,4440
C,2400
D,1200
E,2040


Databricks visualization. Run in Databricks to view.

Total Amount spent on each food category

In [0]:
total_amount_spent_acc_foodCateory=sales_df.join(menu_df,'product_id').groupBy('product_name').agg(sum('price').alias('total_spent')).orderBy('product_name')

display(total_amount_spent_acc_foodCateory)

product_name,total_spent
Biryani,480
Chowmin,3600
Dosa,1320
PIZZA,2100
Pasta,1080
sandwich,5760


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Total amount of sales in each month

In [0]:
total_sales_by_month = sales_df.join(menu_df,'product_id').groupBy(month('order_date').alias('Month')).agg(sum('price').alias('Total_sales'));

display(total_sales_by_month)

Month,Total_sales
1,2960
6,2960
3,910
11,910
2,2730
7,910
5,2960


Databricks visualization. Run in Databricks to view.

Yearly Sales

In [0]:
total_sales_yearly = sales_df.join(menu_df,'product_id').groupBy(year('order_date').alias('Year')).agg(sum('price').alias('Yearly_sales'));

display(total_sales_yearly)

Year,Yearly_sales
2023,9990
2022,4350


Databricks visualization. Run in Databricks to view.

Quarterly Sales

In [0]:
total_sales_quarterly = sales_df.join(menu_df,'product_id').groupBy(quarter('order_date').alias('Quarter')).agg(sum('price').alias('Quarterly_sales'));

display(total_sales_quarterly)

Quarter,Quarterly_sales
1,6600
4,910
3,910
2,5920


Databricks visualization. Run in Databricks to view.

Total number of order by each category

How many times each product purchased?

In [0]:
from pyspark.sql.functions import count
count_by_purchase = sales_df.join(menu_df,'product_id').groupBy('product_name').agg(count('customer_id').alias('purchase_count'));

display(count_by_purchase)

product_name,purchase_count
sandwich,48
Dosa,12
Biryani,6
PIZZA,21
Chowmin,24
Pasta,6


Databricks visualization. Run in Databricks to view.

Top 5 ordered items

In [0]:
top_five_ordered_item = sales_df.join(menu_df,'product_id').groupby('product_id','product_name').agg(count('customer_id').alias('purchase_count')).orderBy('purchase_count',ascending=False).limit(5)

display(top_five_ordered_item)

product_id,product_name,purchase_count
3,sandwich,48
2,Chowmin,24
1,PIZZA,21
4,Dosa,12
5,Biryani,6


Databricks visualization. Run in Databricks to view.

Top Ordered Item

In [0]:
top_ordered_item = sales_df.join(menu_df,'product_id').groupby('product_id','product_name').agg(count('customer_id').alias('purchase_count')).orderBy('purchase_count',ascending=False).limit(1)

display(top_ordered_item)

product_id,product_name,purchase_count
3,sandwich,48


Databricks visualization. Run in Databricks to view.

Frequency of customer visited to the resturant

In [0]:
from pyspark.sql.functions import countDistinct
frequency_per_resturant = sales_df.filter(sales_df.source_order=='Restaurant').groupBy('customer_id').agg(countDistinct('order_date').alias('Frequency_of_customers'));

display(frequency_per_resturant)

customer_id,Frequency_of_customers
A,6
B,6
D,1
E,5
C,3


Databricks visualization. Run in Databricks to view.

Total Sales by country

In [0]:
from pyspark.sql.functions import sum
total_sales_country = sales_df.join(menu_df,'product_id').groupBy('location').agg({'price':'sum'}).orderBy('location');


display(total_sales_country)

location,sum(price)
India,4860
UK,7020
USA,2460


Databricks visualization. Run in Databricks to view.

Total Sales by restaurant

In [0]:
from pyspark.sql.functions import sum
total_sales_source = sales_df.join(menu_df,'product_id').groupBy('source_order').agg(sum('price').alias('Sales')).orderBy('source_order');


display(total_sales_source)

source_order,Sales
Restaurant,3090
Swiggy,6330
zomato,4920


Databricks visualization. Run in Databricks to view.