## Food Demand Forecasting for Food Delivery Company



#### Project Descreption:

The majority of the raw ingredients used by a food delivery service are perishable, thus precisely forecasting daily and weekly demand is crucial for this kind of business. A warehouse with too much inventory runs the danger of wastage, while one with too little could experience out-of-stocks, which would force clients to turn to your rivals for assistance. The challenge is to forecast demand because the majority of raw materials must be replenished on a weekly basis and are perishable, making procurement planning crucial. 

The main goal of this project is to create an appropriate machine learning model to forecast the number of orders to gather raw materials for upcoming weeks. To achieve this, we should first gather data from HDFS. We should know the information about of fulfilment center like area, city etc., and meal information like category of food sub category of food price of the food or discount in particular week. By using this data, we can use any regression algorithm to forecast the quantity for upcoming weeks. 

#### Data Pre-processing

Data Pre-processing includes the following main tasks

- Import the libraries
- Loading the dataset from HDFS
- Exploratory Data Analysis
- Checking for Null Values
- Reading and merging files
- Dropping the columns
- Label Encoding
- Data Visualization
- Splitting the Dataset into train and test
- Model Building

The train data is splitted into 70/30 % train/test and model is build using Linear Regressor and Random Forest Tree Regressor.

In [1]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id

from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Import standard libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotnine as p9
%matplotlib inline
color = sns.color_palette()


In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName("FoodDemand").getOrCreate()

# Load a fulfilmentcenterinfo table from HDFS
center_data = spark.read.format("csv").option("header", "false").load("fooddemand/fulfilmentcenterinfo/part-m-00000")

# Columns of the fulfilmentcenterinfo DataFrame
center_data = center_data.withColumnRenamed("_c0", "center_id").withColumnRenamed("_c1", "city_code").withColumnRenamed("_c2", "region_code").withColumnRenamed("_c3", "center_type").withColumnRenamed("_c4", "op_area")
# Datatype of the fulfilmentcenterinfo DataFrame
center_data = center_data.withColumn("center_id", col("center_id").cast("integer")).withColumn("city_code", col("city_code").cast("integer")).withColumn("region_code", col("region_code").cast("integer")).withColumn("center_type", col("center_type").cast("string")).withColumn("op_area", col("op_area").cast("float"))

# Show the contents of the fulfilmentcenterinfo DataFrame
center_data.show()

23/04/11 16:57:15 WARN Utils: Your hostname, cis6180 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/04/11 16:57:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/11 16:57:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+---------+---------+-----------+-----------+-------+
|center_id|city_code|region_code|center_type|op_area|
+---------+---------+-----------+-----------+-------+
|       11|      679|         56|     TYPE_A|    3.7|
|       13|      590|         56|     TYPE_B|    6.7|
|      124|      590|         56|     TYPE_C|    4.0|
|       66|      648|         34|     TYPE_A|    4.1|
|       94|      632|         34|     TYPE_C|    3.6|
|       64|      553|         77|     TYPE_A|    4.4|
|      129|      593|         77|     TYPE_A|    3.9|
|      139|      693|         34|     TYPE_C|    2.8|
|       88|      526|         34|     TYPE_A|    4.1|
|      143|      562|         77|     TYPE_B|    3.8|
|      101|      699|         85|     TYPE_C|    2.8|
|       86|      699|         85|     TYPE_C|    4.0|
|       32|      526|         34|     TYPE_A|  

In [3]:
# Print the total number of rows of the fulfilmentcenterinfo DataFrame
print("Total number of rows: ", center_data.count())

Total number of rows:  77


In [4]:
# Print fulfilmentcenterinfo schema
center_data.printSchema()

root
 |-- center_id: integer (nullable = true)
 |-- city_code: integer (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- center_type: string (nullable = true)
 |-- op_area: float (nullable = true)



In [5]:
# Load a mealinfo table from HDFS
meal_data = spark.read.format("csv").option("header", "false").load("fooddemand/mealinfo/part-m-00000")

# Columns of the mealinfo DataFrame
meal_data = meal_data.withColumnRenamed("_c0", "meal_id").withColumnRenamed("_c1", "category").withColumnRenamed("_c2", "cuisine")
# Datatype of the mealinfo DataFrame
meal_data = meal_data.withColumn("meal_id", col("meal_id").cast("integer")).withColumn("category", col("category").cast("string")).withColumn("cuisine", col("cuisine").cast("string"))

# Show the contents of the mealinfo DataFrame
meal_data.show()

+-------+------------+-------+
|meal_id|    category|cuisine|
+-------+------------+-------+
|   1885|   Beverages|   Thai|
|   1993|   Beverages|   Thai|
|   2539|   Beverages|   Thai|
|   1248|   Beverages| Indian|
|   2631|   Beverages| Indian|
|   1311|      Extras|   Thai|
|   1062|   Beverages|Italian|
|   1778|   Beverages|Italian|
|   1803|      Extras|   Thai|
|   1198|      Extras|   Thai|
|   2707|   Beverages|Italian|
|   1847|        Soup|   Thai|
|   1438|        Soup|   Thai|
|   2494|        Soup|   Thai|
|   2760|Other Snacks|   Thai|
|   2490|       Salad|Italian|
|   1109|   Rice Bowl| Indian|
|   2290|   Rice Bowl| Indian|
|   1525|Other Snacks|   Thai|
|   2704|Other Snacks|   Thai|
+-------+------------+-------+
only showing top 20 rows



In [6]:
# Print the total number of rows of the mealinfo DataFrame
print("Total number of rows: ", meal_data.count())

Total number of rows:  51


In [7]:
# Print mealinfo schema
meal_data.printSchema()

root
 |-- meal_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- cuisine: string (nullable = true)



In [8]:
# Load a train table from HDFS
train_data = spark.read.format("csv").option("header", "false").load("fooddemand/train/part-m-00000")

# Columns of the train DataFrame
train_data = train_data.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "week").withColumnRenamed("_c2", "center_id").withColumnRenamed("_c3", "meal_id").withColumnRenamed("_c4", "checkout_price").withColumnRenamed("_c5", "base_price").withColumnRenamed("_c6", "email_for_promotion").withColumnRenamed("_c7", "homepage_featured").withColumnRenamed("_c8", "num_orders")
# Datatype of the fulfilmentcenterinfo DataFrame
train_data = train_data.withColumn("id", col("id").cast("integer")).withColumn("week", col("week").cast("integer")).withColumn("center_id", col("center_id").cast("integer")).withColumn("meal_id", col("meal_id").cast("integer")).withColumn("checkout_price", col("checkout_price").cast("float")).withColumn("base_price", col("base_price").cast("float")).withColumn("email_for_promotion", col("email_for_promotion").cast("integer")).withColumn("homepage_featured", col("homepage_featured").cast("integer")).withColumn("num_orders", col("num_orders").cast("integer"))

# Show the contents of the train DataFrame
train_data.show()

+-------+----+---------+-------+--------------+----------+-------------------+-----------------+----------+
|     id|week|center_id|meal_id|checkout_price|base_price|email_for_promotion|homepage_featured|num_orders|
+-------+----+---------+-------+--------------+----------+-------------------+-----------------+----------+
|1379560|   1|       55|   1885|        136.83|    152.29|                  0|                0|       177|
|1466964|   1|       55|   1993|        136.83|    135.83|                  0|                0|       270|
|1346989|   1|       55|   2539|        134.86|    135.86|                  0|                0|       189|
|1338232|   1|       55|   2139|         339.5|    437.53|                  0|                0|        54|
|1448490|   1|       55|   2631|         243.5|     242.5|                  0|                0|        40|
|1270037|   1|       55|   1248|        251.23|    252.23|                  0|                0|        28|
|1191377|   1|       55|   1

In [9]:
# Print the total number of rows of the train DataFrame
print("Total number of rows: ", train_data.count())

Total number of rows:  456548


In [10]:
# Print train data schema
train_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- center_id: integer (nullable = true)
 |-- meal_id: integer (nullable = true)
 |-- checkout_price: float (nullable = true)
 |-- base_price: float (nullable = true)
 |-- email_for_promotion: integer (nullable = true)
 |-- homepage_featured: integer (nullable = true)
 |-- num_orders: integer (nullable = true)



In [11]:
# Load a test table from HDFS
test_data = spark.read.format("csv").option("header", "false").load("fooddemand/test/part-m-00000")

# Columns of the test DataFrame
test_data = test_data.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "week").withColumnRenamed("_c2", "center_id").withColumnRenamed("_c3", "meal_id").withColumnRenamed("_c4", "checkout_price").withColumnRenamed("_c5", "base_price").withColumnRenamed("_c6", "email_for_promotion").withColumnRenamed("_c7", "homepage_featured")
# Datatype of the fulfilmentcenterinfo DataFrame
test_data = test_data.withColumn("id", col("id").cast("integer")).withColumn("week", col("week").cast("integer")).withColumn("center_id", col("center_id").cast("integer")).withColumn("meal_id", col("meal_id").cast("integer")).withColumn("checkout_price", col("checkout_price").cast("float")).withColumn("base_price", col("base_price").cast("float")).withColumn("email_for_promotion", col("email_for_promotion").cast("integer")).withColumn("homepage_featured", col("homepage_featured").cast("integer"))

# Show the contents of the test DataFrame
test_data.show()

+-------+----+---------+-------+--------------+----------+-------------------+-----------------+
|     id|week|center_id|meal_id|checkout_price|base_price|email_for_promotion|homepage_featured|
+-------+----+---------+-------+--------------+----------+-------------------+-----------------+
|1028232| 146|       55|   1885|        158.11|    159.11|                  0|                0|
|1127204| 146|       55|   1993|        160.11|    159.11|                  0|                0|
|1212707| 146|       55|   2539|        157.14|    159.14|                  0|                0|
|1082698| 146|       55|   2631|        162.02|    162.02|                  0|                0|
|1400926| 146|       55|   1248|        163.93|    163.93|                  0|                0|
|1284113| 146|       55|   1778|        190.15|    190.15|                  0|                0|
|1197966| 146|       55|   1062|        191.09|    192.09|                  0|                0|
|1132739| 146|       55|   270

In [12]:
# Print the total number of rows of the train DataFrame
print("Total number of rows: ", test_data.count())

Total number of rows:  32573


In [13]:
# Print test data schema
test_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- center_id: integer (nullable = true)
 |-- meal_id: integer (nullable = true)
 |-- checkout_price: float (nullable = true)
 |-- base_price: float (nullable = true)
 |-- email_for_promotion: integer (nullable = true)
 |-- homepage_featured: integer (nullable = true)



### Merging_data train and meal_data dataset by using common key id:

#### The meal_id column in train_data is similar to meal_id in meal_data dataset. Let us merge these two datasets using common key meal_id and name the table as data_train.


In [14]:
data_train = train_data.join(meal_data, on='meal_id', how='outer')
data_test = test_data.join(meal_data, on='meal_id', how='outer')

### Merging data_train and center_data dataset by using common key center_id:

#### The center_id column in data_train is similar to center_id in center_data dataset. Let us merge these two datasets, using common key center_id and store it back in data_train. 


In [15]:
data_train = data_train.join(center_data, on='center_id', how='outer')
data_test = data_test.join(center_data, on='center_id', how='outer')
data_train.show()



+---------+-------+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-------+---------+-----------+-----------+-------+
|center_id|meal_id|     id|week|checkout_price|base_price|email_for_promotion|homepage_featured|num_orders| category|cuisine|city_code|region_code|center_type|op_area|
+---------+-------+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-------+---------+-----------+-----------+-------+
|       26|   1062|1215035|   1|        161.08|    161.08|                  0|                0|       324|Beverages|Italian|      515|         77|     TYPE_C|    3.0|
|       26|   1062|1004523|   2|        174.66|    174.66|                  0|                0|       418|Beverages|Italian|      515|         77|     TYPE_C|    3.0|
|       26|   1062|1315636|   3|        177.54|    178.54|                  0|                0|       337|Beverages|Italian|      515|         77|     TYPE_C| 

                                                                                

In [16]:
# Drop columns “center_id” and “meal_id” as they are not required for the further process.
data_train = data_train.drop('meal_id', 'center_id')
data_test = data_test.drop('meal_id', 'center_id')
data_train.show(5)



+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-------+---------+-----------+-----------+-------+
|     id|week|checkout_price|base_price|email_for_promotion|homepage_featured|num_orders| category|cuisine|city_code|region_code|center_type|op_area|
+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-------+---------+-----------+-----------+-------+
|1215035|   1|        161.08|    161.08|                  0|                0|       324|Beverages|Italian|      515|         77|     TYPE_C|    3.0|
|1004523|   2|        174.66|    174.66|                  0|                0|       418|Beverages|Italian|      515|         77|     TYPE_C|    3.0|
|1315636|   3|        177.54|    178.54|                  0|                0|       337|Beverages|Italian|      515|         77|     TYPE_C|    3.0|
|1001462|   4|        178.54|    177.54|                  0|                0|       324|Beverages|I

                                                                                

In [17]:
data_train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- checkout_price: float (nullable = true)
 |-- base_price: float (nullable = true)
 |-- email_for_promotion: integer (nullable = true)
 |-- homepage_featured: integer (nullable = true)
 |-- num_orders: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- city_code: integer (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- center_type: string (nullable = true)
 |-- op_area: float (nullable = true)



In [18]:
data_test.printSchema()

root
 |-- id: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- checkout_price: float (nullable = true)
 |-- base_price: float (nullable = true)
 |-- email_for_promotion: integer (nullable = true)
 |-- homepage_featured: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- city_code: integer (nullable = true)
 |-- region_code: integer (nullable = true)
 |-- center_type: string (nullable = true)
 |-- op_area: float (nullable = true)



In [19]:
# Label coding for to convert each text category to numbers in order for the machine to process those using mathematical equations

# Define a list of input columns and output columns
input_cols = ['category', 'cuisine', 'center_type']
output_cols = ['category_index', 'cuisine_index', 'center_type_index']

# Create a list of StringIndexer objects
indexers = [StringIndexer(inputCol=input_col, outputCol=output_col) for input_col, output_col in zip(input_cols, output_cols)]

# Fit and transform the data using the list of StringIndexer objects
for indexer in indexers:
    data_train = indexer.fit(data_train).transform(data_train)
    
data_train.show()




+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-------+---------+-----------+-----------+-------+--------------+-------------+-----------------+
|     id|week|checkout_price|base_price|email_for_promotion|homepage_featured|num_orders| category|cuisine|city_code|region_code|center_type|op_area|category_index|cuisine_index|center_type_index|
+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-------+---------+-----------+-----------+-------+--------------+-------------+-----------------+
|1215035|   1|        161.08|    161.08|                  0|                0|       324|Beverages|Italian|      515|         77|     TYPE_C|    3.0|           0.0|          0.0|              1.0|
|1004523|   2|        174.66|    174.66|                  0|                0|       418|Beverages|Italian|      515|         77|     TYPE_C|    3.0|           0.0|          0.0|              1.0|
|1315636|   3| 

                                                                                

In [20]:
# Drop columns 'category', 'cuisine', 'center_type' as they are not required for the further process.
data_train = data_train.drop('category', 'cuisine', 'center_type')
data_train.show(5)



+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-----------+-------+--------------+-------------+-----------------+
|     id|week|checkout_price|base_price|email_for_promotion|homepage_featured|num_orders|city_code|region_code|op_area|category_index|cuisine_index|center_type_index|
+-------+----+--------------+----------+-------------------+-----------------+----------+---------+-----------+-------+--------------+-------------+-----------------+
|1215035|   1|        161.08|    161.08|                  0|                0|       324|      515|         77|    3.0|           0.0|          0.0|              1.0|
|1004523|   2|        174.66|    174.66|                  0|                0|       418|      515|         77|    3.0|           0.0|          0.0|              1.0|
|1315636|   3|        177.54|    178.54|                  0|                0|       337|      515|         77|    3.0|           0.0|          0.0|              1.0

                                                                                

### Splitting the Dataset into  Train set and Test set 

In [21]:
# assemble the feature columns into a single feature vector column named 'features'
assembler = VectorAssembler(inputCols=['week', 'checkout_price', 'base_price', 'email_for_promotion', 'homepage_featured', 'city_code', 'region_code', 'op_area', 'category_index', 'cuisine_index', 'center_type_index'], outputCol='features')
data_train = assembler.transform(data_train)


In [22]:
# split the data into training and test sets
train, test = data_train.randomSplit([0.7, 0.3], seed=42)


### Model Building


- Train and test model algorithms
- Evaluation of Model
- Predicting the output using the model


#### Linear Regression

In [23]:
# train the linear regressor on the training data
lr = LinearRegression(featuresCol='features', labelCol='num_orders')
lr_model = lr.fit(train)

# Make predictions on the test set
predictions_lr = lr_model.transform(test)


[Stage 126:>                                                        (0 + 4) / 4]

23/04/11 16:57:55 WARN Instrumentation: [161d0946] regParam is zero, which might cause numerical instability and overfitting.


[Stage 131:>                                                        (0 + 4) / 4]

23/04/11 16:57:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/04/11 16:57:57 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/04/11 16:58:01 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

In [24]:
# Evaluate the lr_model's performance
evaluator_lr = RegressionEvaluator(labelCol='num_orders', predictionCol='prediction', metricName='rmse')
rmse_lr = evaluator_lr.evaluate(predictions_lr)
print('RMSE for Linear Regressor:', rmse_lr)




RMSE for Linear Regressor: 344.9834455470793


                                                                                

#### Random Forest Regressor

In [25]:

# A RandomForestRegressor model is created with 20 trees, a maximum depth of 5, and a random seed of 42.

# train the random forest regressor on the training data
rf = RandomForestRegressor(featuresCol='features', labelCol='num_orders', numTrees=20, maxDepth=5, seed=42)
rf_model = rf.fit(train)

# Make predictions on the test set
predictions_rf = rf_model.transform(test)




In [26]:
# The RegressionEvaluator is used to evaluate the predictions by calculating the root mean squared error (RMSE) 
# between the predicted values and the actual values. 
# The RMSE is printed to the console.

# Evaluate the rf_model's predictions
evaluator_rf = RegressionEvaluator(labelCol='num_orders', predictionCol='prediction', metricName='rmse')
rmse_rf = evaluator_rf.evaluate(predictions_rf)

print(f"Root Mean Squared Error (RMSE) for Random Forest Regressor: {rmse_rf}")


[Stage 216:>                                                        (0 + 4) / 4]

Root Mean Squared Error (RMSE) for Random Forest Regressor: 281.85328071613617


                                                                                