In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create SparkSession object
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
# read csv files
df_train = spark.read.csv("data/train.csv",inferSchema=True, header=True)
df_meal = spark.read.csv("data/meal_info.csv",inferSchema=True, header=True)
df_cntr = spark.read.csv("data/fulfilment_center_info.csv",inferSchema=True, header=True)

# Check data

In [5]:
# check the schema of the dataframe
df_train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- center_id: integer (nullable = true)
 |-- meal_id: integer (nullable = true)
 |-- checkout_price: double (nullable = true)
 |-- base_price: double (nullable = true)
 |-- emailer_for_promotion: integer (nullable = true)
 |-- homepage_featured: integer (nullable = true)
 |-- num_orders: integer (nullable = true)



In [6]:
# View data
df_train.show(5)

+-------+----+---------+-------+--------------+----------+---------------------+-----------------+----------+
|     id|week|center_id|meal_id|checkout_price|base_price|emailer_for_promotion|homepage_featured|num_orders|
+-------+----+---------+-------+--------------+----------+---------------------+-----------------+----------+
|1379560|   1|       55|   1885|        136.83|    152.29|                    0|                0|       177|
|1466964|   1|       55|   1993|        136.83|    135.83|                    0|                0|       270|
|1346989|   1|       55|   2539|        134.86|    135.86|                    0|                0|       189|
|1338232|   1|       55|   2139|         339.5|    437.53|                    0|                0|        54|
|1448490|   1|       55|   2631|         243.5|     242.5|                    0|                0|        40|
+-------+----+---------+-------+--------------+----------+---------------------+-----------------+----------+
only showi

In [7]:
df_train.columns

['id',
 'week',
 'center_id',
 'meal_id',
 'checkout_price',
 'base_price',
 'emailer_for_promotion',
 'homepage_featured',
 'num_orders']

In [8]:
# count num rows and cols
(df_train.count(), len(df_train.columns))

(180744, 9)

In [9]:
# check the schema of the dataframe
df_meal.printSchema()

root
 |-- meal_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- cuisine: string (nullable = true)



In [10]:
# View data
df_meal.show(5)

+-------+---------+-------+
|meal_id| category|cuisine|
+-------+---------+-------+
|   1885|Beverages|   Thai|
|   1993|Beverages|   Thai|
|   2539|Beverages|   Thai|
|   1248|Beverages| Indian|
|   2631|Beverages| Indian|
+-------+---------+-------+
only showing top 5 rows



In [11]:
# count num rows and cols
(df_meal.count(), len(df_meal.columns))

(51, 3)

In [12]:
# check the schema of the dataframe
df_cntr.printSchema()

root
 |-- center_id: integer (nullable = true)
 |-- city_code: integer (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- center_type: string (nullable = true)
 |-- op_area: double (nullable = true)



In [13]:
# View data
df_cntr.show(5)

+---------+---------+-----------+-----------+-------+
|center_id|city_code|region_code|center_type|op_area|
+---------+---------+-----------+-----------+-------+
|       11|      679|         56|     TYPE_A|    3.7|
|       13|      590|         56|     TYPE_B|    6.7|
|      124|      590|         56|     TYPE_C|    4.0|
|       66|      648|         34|     TYPE_A|    4.1|
|       94|      632|         34|     TYPE_C|    3.6|
+---------+---------+-----------+-----------+-------+
only showing top 5 rows



In [14]:
# count num rows and cols
(df_cntr.count(), len(df_cntr.columns))

(77, 5)

# Assignment

In [15]:
# For sql, Create view of the dataframes
df_train.createOrReplaceTempView('train')
df_meal.createOrReplaceTempView('meal')
df_cntr.createOrReplaceTempView('cntr')

# Q1 What are the distinct number of meal categories and cuisines?

In [16]:
# View rows
spark.sql('select * from meal limit 5')

meal_id,category,cuisine
1885,Beverages,Thai
1993,Beverages,Thai
2539,Beverages,Thai
1248,Beverages,Indian
2631,Beverages,Indian


In [17]:
spark.sql('select DISTINCT category from meal')

category
Salad
Desert
Biryani
Rice Bowl
Sandwich
Pizza
Beverages
Other Snacks
Soup
Starters


In [18]:
spark.sql('select DISTINCT cuisine from meal')

cuisine
Thai
Indian
Continental
Italian


In [39]:
spark.sql("select COUNT(DISTINCT category) AS num_category , COUNT(DISTINCT cuisine) AS num_cuisine from meal")

num_category,num_cuisine
14,4


# Q2 Which center_id has the highest num_orders?

In [40]:
# View rows
spark.sql('select * from train limit 5')

id,week,center_id,meal_id,checkout_price,base_price,emailer_for_promotion,homepage_featured,num_orders
1379560,1,55,1885,136.83,152.29,0,0,177
1466964,1,55,1993,136.83,135.83,0,0,270
1346989,1,55,2539,134.86,135.86,0,0,189
1338232,1,55,2139,339.5,437.53,0,0,54
1448490,1,55,2631,243.5,242.5,0,0,40


In [43]:
spark.sql('select center_id , SUM(num_orders) AS tot_order from train GROUP BY center_id ORDER BY tot_order DESC LIMIT 10' )

center_id,tot_order
13,1742220
43,1557942
10,1346533
137,1287312
52,1188327
174,1158331
67,1104886
11,1088253
27,955839
104,951334


In [44]:
spark.sql('select center_id , SUM(num_orders) AS tot_order from train GROUP BY center_id ORDER BY tot_order DESC LIMIT 1')

center_id,tot_order
13,1742220


# Q3 What is the top selling cuisine at the center_id that had the highest num_orders?

In [46]:
# For center id 13 
spark.sql('select meal.cuisine , COUNT(train.meal_id) AS tot_orders from  train LEFT JOIN meal ON train.meal_id = meal.meal_id where train.center_id == 13 GROUP BY meal.cuisine ORDER BY tot_orders DESC')

cuisine,tot_orders
Thai,893
Indian,729
Italian,665
Continental,547


# Q4 What is the average op_area per center_type?

In [47]:
spark.sql('select center_type , ROUND(AVG(op_area),2) AS avg_op_area from cntr GROUP BY center_type ORDER BY avg_op_area DESC' )

center_type,avg_op_area
TYPE_B,4.77
TYPE_A,4.08
TYPE_C,3.16


# Q5 Which center_type had the highest revenue? (Revenue is total sum of checkout_price*num_orders)

In [48]:
spark.sql('select cntr.center_type , ROUND(SUM(train.checkout_price * train.num_orders),2) AS Revenue from  train LEFT JOIN cntr ON train.center_id = cntr.center_id GROUP BY cntr.center_type ORDER BY Revenue DESC')

center_type,Revenue
TYPE_A,7276203201.87
TYPE_B,3172968529.4
TYPE_C,2251833991.37


# Q6 Which is the top ordered cuisine in terms of num_orders?

In [50]:
spark.sql('select meal.cuisine , SUM(train.num_orders) AS tot_orders from  train LEFT JOIN meal ON train.meal_id = meal.meal_id GROUP BY meal.cuisine ORDER BY tot_orders DESC')

cuisine,tot_orders
Italian,17166334
Thai,14058488
Indian,10979934
Continental,6766188


# Q7 What are the num_orders per cuisine per week?

In [51]:
spark.sql('select meal.cuisine , train.week, COUNT(train.num_orders) AS num_orders from  train LEFT JOIN meal ON train.meal_id = meal.meal_id GROUP BY meal.cuisine,train.week ORDER BY meal.cuisine,train.week ')

cuisine,week,num_orders
Continental,1,618
Continental,2,620
Continental,3,616
Continental,4,621
Continental,5,631
Continental,6,640
Continental,7,610
Continental,8,580
Continental,9,643
Continental,10,645


# Q8 Which center_id gave the highest number of discounts? (Discount is considered when checkout_price is less than base_price)

In [52]:
# Top 10 centers 
spark.sql('select center_id,SUM(if(train.checkout_price < train.base_price,1,0)) AS num_discount from  train GROUP BY center_id ORDER BY num_discount DESC LIMIT 10')

center_id,num_discount
13,1509
30,1495
137,1462
27,1462
153,1455
10,1454
51,1453
174,1447
36,1445
104,1443
