In [1]:
# Python-3
# spark-submit --num-executors 5 --driver-memory 6g

from   pyspark     import SparkContext
from   pyspark.sql import SQLContext
from   time        import mktime, strptime
import pyspark.sql.functions as fn
from   pyspark.sql.types     import IntegerType
from   pyspark.ml.feature    import VectorAssembler
from   pyspark.ml.regression import LinearRegression
from   pyspark.ml.evaluation import RegressionEvaluator
from   pyspark.ml.regression import DecisionTreeRegressor
from   pyspark.ml.regression import GBTRegressor
from   pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import StringIndexer
    
import json
import csv

In [2]:
JSON_DATA_TO_PREDICT = "./data_to_predict.json"
CSV_TRAINING_DATA    = "./historical_data.csv"
CSV_TMP_FILE         = "./tmp.csv"

In [3]:
training_df = sqlContext.read.csv(CSV_TRAINING_DATA,
                                    header=True,
                                    inferSchema=True)

In [4]:
training_df.head()

Row(market_id='1', created_at='2/6/15 22:24', actual_delivery_time='2/6/15 23:27', store_id='df263d996281d984952c07998dc54358', store_primary_category='american', order_protocol='1', total_items=4, subtotal=3441, num_distinct_items=4, min_item_price=557, max_item_price=1239, total_onshift_runners='33', total_busy_runners='14', total_outstanding_orders='21', estimated_order_place_duration=446, estimated_store_to_consumer_driving_duration='861')

In [5]:
training_df.printSchema()

root
 |-- market_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- actual_delivery_time: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- store_primary_category: string (nullable = true)
 |-- order_protocol: string (nullable = true)
 |-- total_items: integer (nullable = true)
 |-- subtotal: integer (nullable = true)
 |-- num_distinct_items: integer (nullable = true)
 |-- min_item_price: integer (nullable = true)
 |-- max_item_price: integer (nullable = true)
 |-- total_onshift_runners: string (nullable = true)
 |-- total_busy_runners: string (nullable = true)
 |-- total_outstanding_orders: string (nullable = true)
 |-- estimated_order_place_duration: integer (nullable = true)
 |-- estimated_store_to_consumer_driving_duration: string (nullable = true)



In [10]:
training_df.count()

197428

In [6]:
training_df.select('market_id').distinct().count()

7

In [7]:
training_df.select('store_id').distinct().count()

6743

In [8]:
training_df.select('market_id', 'store_id').distinct().count()

12717

In [82]:

training_df.select('store_id', 'store_primary_category')\
                    .where((training_df.store_id == "f0ade77b43923b38237db569b016ba25") &
                          (training_df.store_primary_category != "mexican")).show(20, False)
'''

training_df.select('store_id', 'store_primary_category')\
                   .where((training_df.store_id == training_df.store_id) & 
                         (training_df.store_primary_category != training_df.store_primary_category)).count()
'''



+--------------------------------+----------------------+
|store_id                        |store_primary_category|
+--------------------------------+----------------------+
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|indian                |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
|f0ade77b43923b38237db569b016ba25|NA                    |
+--------------------------------+----------------------+



"\n\ntraining_df.select('store_id', 'store_primary_category')                   .where((training_df.store_id == training_df.store_id) & \n                         (training_df.store_primary_category != training_df.store_primary_category)).count()\n"

In [9]:
training_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
market_id,197428,2.978706074597462,1.5248667244506329,1,
created_at,197428,,,1/21/15 15:22,2/9/15 6:00
actual_delivery_time,197428,,,1/21/15 15:58,
store_id,197428,,,0004d0b59e19461ff126e3a08a814c33,ffedf5be3a86e2ee281d54cdc97bc1cf
store_primary_category,197428,,,,vietnamese
order_protocol,197428,2.8823517433425137,1.5037712034995798,1,
total_items,197428,3.196390582896043,2.666546063599876,1,411
subtotal,197428,2682.331401827502,1823.0936878547845,0,27100
num_distinct_items,197428,2.6707913771096297,1.6302552413381561,1,20


In [83]:
# Drop rows with NA columns
training_df_1 = training_df.dropna()

In [84]:
# Drop rows with subtotal=0
training_df_2 = training_df_1[(training_df_1.subtotal>0)]

In [85]:
# Drop rows where min_item_price is -ve
training_df_2 = training_df_2[(training_df_2.min_item_price>0)]

In [86]:
# Drop rows where max_item_price is 0, assuming spoons and forks are not part of the order
training_df_2 = training_df_2[(training_df_2.max_item_price>0)]

In [87]:
# Drop rows where total_onshift_runners is -ve
training_df_2 = training_df_2[(training_df_2.total_onshift_runners>0)]

In [88]:
# Drop rows where total_busy_runners is -ve
training_df_2 = training_df_2[(training_df_2.total_busy_runners>0)]

In [89]:
# Drop rows where total_outstanding_orders is -ve
training_df_2 = training_df_2[(training_df_2.total_outstanding_orders>0)]

In [90]:
# Drop rows where estimated_order_place_duration is 0 - is not practical
training_df_2 = training_df_2[(training_df_2.estimated_order_place_duration>0)]

In [91]:
# Drop rows where estimated_store_to_consumer_driving_duration is 0 - is not practical
training_df_2 = training_df_2[(training_df_2.estimated_store_to_consumer_driving_duration>0)]

In [92]:
training_df_2.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
market_id,173763,2.771835955576122,1.3293983875033508,1,
created_at,173763,,,1/21/15 15:22,2/9/15 6:00
actual_delivery_time,173763,,,1/21/15 15:58,
store_id,173763,,,0004d0b59e19461ff126e3a08a814c33,ffeabd223de0d4eacb9a3e6e53e5448d
store_primary_category,173763,,,,vietnamese
order_protocol,173763,2.89506747728101,1.5141108306262807,1,
total_items,173763,3.1263329937903928,2.235394513014637,1,48
subtotal,173763,2711.4806144000736,1827.2401288913038,109,26800
num_distinct_items,173763,2.655174001369682,1.609267475185531,1,20


In [93]:
# Drop rows with "NA" strings in columns
training_df_3 = training_df_2[(training_df_2.market_id != "NA") & 
                             (training_df_2.actual_delivery_time != "NA") &
                             (training_df_2.store_primary_category != "NA") &
                             (training_df_2.order_protocol != "NA")]

In [94]:
training_df_3.count()

169007

In [95]:
training_df_3.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
market_id,169007,2.7670451519759536,1.3285725401233055,1,6
created_at,169007,,,1/21/15 15:22,2/9/15 6:00
actual_delivery_time,169007,,,1/21/15 16:17,2/9/15 7:47
store_id,169007,,,0004d0b59e19461ff126e3a08a814c33,ffeabd223de0d4eacb9a3e6e53e5448d
store_primary_category,169007,,,afghan,vietnamese
order_protocol,169007,2.9110036862378483,1.5120985217914669,1,7
total_items,169007,3.121361837083671,2.2289142672365427,1,48
subtotal,169007,2709.61501002917,1826.8005545484448,109,26800
num_distinct_items,169007,2.651771820102126,1.6072491612831503,1,20


In [96]:
# Convert datetime string to seconds
print(int(mktime(strptime("2/6/15 22:24", "%m/%d/%y %H:%M"))))

1423290240


In [97]:
def rdd_datetime_to_sec(input_time):
    return int(mktime(strptime(input_time, "%m/%d/%y %H:%M")))

udf_rdd_datetime_to_sec = fn.udf(rdd_datetime_to_sec, IntegerType())  # LongType() not available for now

In [98]:
training_df_3 = training_df_3.withColumn('created_at',
                                        udf_rdd_datetime_to_sec(fn.col('created_at')))
training_df_3 = training_df_3.withColumn('actual_delivery_time',
                                        udf_rdd_datetime_to_sec(fn.col('actual_delivery_time')))
training_df_3 = training_df_3.withColumn('total_delivery_duration',
                                        training_df_3['actual_delivery_time'] - training_df_3['created_at'])


In [99]:
# More exploration
training_df_3.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
market_id,169007,2.7670451519759536,1.3285725401233055,1,6
created_at,169007,1.4231087064691994E9,694850.7354175277,1421882520,1424268000
actual_delivery_time,169007,1.423111562577763E9,694896.7488149876,1421885820,1424414700
store_id,169007,,,0004d0b59e19461ff126e3a08a814c33,ffeabd223de0d4eacb9a3e6e53e5448d
store_primary_category,169007,,,afghan,vietnamese
order_protocol,169007,2.9110036862378483,1.5120985217914669,1,7
total_items,169007,3.121361837083671,2.2289142672365427,1,48
subtotal,169007,2709.61501002917,1826.8005545484448,109,26800
num_distinct_items,169007,2.651771820102126,1.6072491612831503,1,20


In [100]:
training_df_3[training_df_3.total_delivery_duration == 332460].show()

+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+-----------------------+
|market_id|created_at|actual_delivery_time|            store_id|store_primary_category|order_protocol|total_items|subtotal|num_distinct_items|min_item_price|max_item_price|total_onshift_runners|total_busy_runners|total_outstanding_orders|estimated_order_place_duration|estimated_store_to_consumer_driving_duration|total_delivery_duration|
+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+--------

In [101]:
training_df_3[training_df_3.estimated_order_place_duration == 2715].show()

+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+-----------------------+
|market_id|created_at|actual_delivery_time|            store_id|store_primary_category|order_protocol|total_items|subtotal|num_distinct_items|min_item_price|max_item_price|total_onshift_runners|total_busy_runners|total_outstanding_orders|estimated_order_place_duration|estimated_store_to_consumer_driving_duration|total_delivery_duration|
+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+--------

In [102]:
training_df_3[training_df_3.min_item_price == 14700].show()

+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+-----------------------+
|market_id|created_at|actual_delivery_time|            store_id|store_primary_category|order_protocol|total_items|subtotal|num_distinct_items|min_item_price|max_item_price|total_onshift_runners|total_busy_runners|total_outstanding_orders|estimated_order_place_duration|estimated_store_to_consumer_driving_duration|total_delivery_duration|
+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+--------

In [103]:
training_df_3[training_df_3.subtotal == 26800].show()

+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+-----------------------+
|market_id|created_at|actual_delivery_time|            store_id|store_primary_category|order_protocol|total_items|subtotal|num_distinct_items|min_item_price|max_item_price|total_onshift_runners|total_busy_runners|total_outstanding_orders|estimated_order_place_duration|estimated_store_to_consumer_driving_duration|total_delivery_duration|
+---------+----------+--------------------+--------------------+----------------------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+--------

In [104]:
# Map store_id string to unique number
stringindexer = StringIndexer().setInputCol("store_id").setOutputCol("store_id_int")
modelc = stringindexer.fit(training_df_3)
training_df_4 = modelc.transform(training_df_3)

In [105]:
# Map store_primary_category to unique number
stringindexer = StringIndexer().setInputCol("store_primary_category").setOutputCol("store_primary_category_int")
modelc = stringindexer.fit(training_df_4)
training_df_4 = modelc.transform(training_df_4)

In [106]:
# Recheck
training_df_4.select('store_id').distinct().count()

5430

In [107]:
training_df_4.select('store_id_int').distinct().count()

5430

In [108]:
training_df_4.select('store_primary_category').distinct().count()

73

In [109]:
training_df_4.select('store_primary_category_int').distinct().count()

73

In [110]:
training_df_4.printSchema()

root
 |-- market_id: string (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- actual_delivery_time: integer (nullable = true)
 |-- store_id: string (nullable = true)
 |-- store_primary_category: string (nullable = true)
 |-- order_protocol: string (nullable = true)
 |-- total_items: integer (nullable = true)
 |-- subtotal: integer (nullable = true)
 |-- num_distinct_items: integer (nullable = true)
 |-- min_item_price: integer (nullable = true)
 |-- max_item_price: integer (nullable = true)
 |-- total_onshift_runners: string (nullable = true)
 |-- total_busy_runners: string (nullable = true)
 |-- total_outstanding_orders: string (nullable = true)
 |-- estimated_order_place_duration: integer (nullable = true)
 |-- estimated_store_to_consumer_driving_duration: string (nullable = true)
 |-- total_delivery_duration: integer (nullable = true)
 |-- store_id_int: double (nullable = false)
 |-- store_primary_category_int: double (nullable = false)



In [111]:
training_df_4 = training_df_4.withColumn("market_id", training_df_4["market_id"].cast(IntegerType()))
training_df_4 = training_df_4.withColumn("order_protocol", training_df_4["order_protocol"].cast(IntegerType()))

training_df_4 = training_df_4.withColumn("total_onshift_runners", training_df_4["total_onshift_runners"].cast(IntegerType()))
training_df_4 = training_df_4.withColumn("total_busy_runners", training_df_4["total_busy_runners"].cast(IntegerType()))

training_df_4 = training_df_4.withColumn("total_outstanding_orders", training_df_4["total_outstanding_orders"].cast(IntegerType()))
training_df_4 = training_df_4.withColumn("estimated_store_to_consumer_driving_duration", training_df_4["estimated_store_to_consumer_driving_duration"].cast(IntegerType()))

training_df_4.printSchema()

root
 |-- market_id: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- actual_delivery_time: integer (nullable = true)
 |-- store_id: string (nullable = true)
 |-- store_primary_category: string (nullable = true)
 |-- order_protocol: integer (nullable = true)
 |-- total_items: integer (nullable = true)
 |-- subtotal: integer (nullable = true)
 |-- num_distinct_items: integer (nullable = true)
 |-- min_item_price: integer (nullable = true)
 |-- max_item_price: integer (nullable = true)
 |-- total_onshift_runners: integer (nullable = true)
 |-- total_busy_runners: integer (nullable = true)
 |-- total_outstanding_orders: integer (nullable = true)
 |-- estimated_order_place_duration: integer (nullable = true)
 |-- estimated_store_to_consumer_driving_duration: integer (nullable = true)
 |-- total_delivery_duration: integer (nullable = true)
 |-- store_id_int: double (nullable = false)
 |-- store_primary_category_int: double (nullable = false)



In [112]:
# Drop string columns as we created respective columns with numerical values
training_df_4 = training_df_4.drop('store_id', 'store_primary_category')

In [113]:
# Drop actual_delivery_time as we have computed total_delivery_duration
training_df_4 = training_df_4.drop('actual_delivery_time')

In [114]:
# Check for more bad values before we go into training
training_df_4.select([fn.count(fn.when(fn.isnan(c) | fn.col(c).isNull(), c)).alias(c) for c in training_df_4.columns]).show()

+---------+----------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+-----------------------+------------+--------------------------+
|market_id|created_at|order_protocol|total_items|subtotal|num_distinct_items|min_item_price|max_item_price|total_onshift_runners|total_busy_runners|total_outstanding_orders|estimated_order_place_duration|estimated_store_to_consumer_driving_duration|total_delivery_duration|store_id_int|store_primary_category_int|
+---------+----------+--------------+-----------+--------+------------------+--------------+--------------+---------------------+------------------+------------------------+------------------------------+--------------------------------------------+-----------------------+------------+--------------------------+
|        0|         0|             0|          0|       0|

In [116]:
# Check correlation of other columns to total_delivery_duration

import six
for i in training_df_4.columns:
    if not(isinstance(training_df_4.select(i).take(1)[0][0], six.string_types)):
        print("Correlation of " + i + " to TDD: " + str(training_df_4.stat.corr('total_delivery_duration',i)))

Correlation of market_id to TDD: -0.040637489176712484
Correlation of created_at to TDD: 0.03170726796590593
Correlation of order_protocol to TDD: -0.0596805739749755
Correlation of total_items to TDD: 0.1156719187431689
Correlation of subtotal to TDD: 0.1705534115818244
Correlation of num_distinct_items to TDD: 0.12734199460556248
Correlation of min_item_price to TDD: 0.011397874146431723
Correlation of max_item_price to TDD: 0.10703629454011691
Correlation of total_onshift_runners to TDD: 0.06919976791637909
Correlation of total_busy_runners to TDD: 0.08566350421619037
Correlation of total_outstanding_orders to TDD: 0.15785403047962324
Correlation of estimated_order_place_duration to TDD: 0.08118412980888919
Correlation of estimated_store_to_consumer_driving_duration to TDD: 0.18672186991657688
Correlation of total_delivery_duration to TDD: 1.0
Correlation of store_id_int to TDD: 0.04025896012881745
Correlation of store_primary_category_int to TDD: 0.0019016363801349994


In [None]:
# No clear correlation exists between features and target

In [117]:
# Drop min_item_price and max_item_price as they are factored into subtotal
training_df_4 = training_df_4.drop('min_item_price', 'max_item_price')

In [118]:
training_df_4.printSchema()

root
 |-- market_id: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- order_protocol: integer (nullable = true)
 |-- total_items: integer (nullable = true)
 |-- subtotal: integer (nullable = true)
 |-- num_distinct_items: integer (nullable = true)
 |-- total_onshift_runners: integer (nullable = true)
 |-- total_busy_runners: integer (nullable = true)
 |-- total_outstanding_orders: integer (nullable = true)
 |-- estimated_order_place_duration: integer (nullable = true)
 |-- estimated_store_to_consumer_driving_duration: integer (nullable = true)
 |-- total_delivery_duration: integer (nullable = true)
 |-- store_id_int: double (nullable = false)
 |-- store_primary_category_int: double (nullable = false)



In [119]:
training_df_4.repartition(1).write.csv(CSV_TMP_FILE, header=True)

In [120]:
feature_list = training_df_4.columns
feature_list.remove('total_delivery_duration')

In [121]:
vectorAssembler = VectorAssembler(inputCols = feature_list, outputCol = 'features')
vectorized_training_df = vectorAssembler.transform(training_df_4)
vectorized_training_df = vectorized_training_df.select(['features', 'total_delivery_duration'])

In [122]:
splits = vectorized_training_df.randomSplit([0.7, 0.3])
split_input_train_df      = splits[0]
split_input_validation_df = splits[1]


In [None]:
''' 
############################################################################################
# We will try multiple models and pick the one with best prediction accuracy (lowest error)#
############################################################################################
'''

In [123]:
# LinearRegression

print("Linear Regression...")
lr = LinearRegression(featuresCol = 'features', labelCol='total_delivery_duration', maxIter=5, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(split_input_train_df)

# Predict on validation set

lr_predictions = lr_model.transform(split_input_validation_df)
test_result    = lr_model.evaluate(split_input_validation_df)
rmse = test_result.rootMeanSquaredError
print("RMSE on validation data: %f" % rmse)


Linear Regression...
RMSE on validation data: 1795.896644


In [128]:
# DecisionTree

dt_regressor   = DecisionTreeRegressor(featuresCol ='features', labelCol = 'total_delivery_duration', maxBins=5450)
dt_model       = dt_regressor.fit(split_input_train_df)

# Predict on validation set

dt_predictions = dt_model.transform(split_input_validation_df)
dt_evaluator   = RegressionEvaluator(labelCol="total_delivery_duration", predictionCol="prediction", metricName="rmse")
rmse           = dt_evaluator.evaluate(dt_predictions)
print("RMSE on validation data = %f" % rmse)


RMSE on validation data = 1898.238355


In [129]:
# GradientBoostedTree

gbt_regressor   = GBTRegressor(featuresCol='features', labelCol='total_delivery_duration', maxIter=5, maxBins=5450)
gbt_model       = gbt_regressor.fit(split_input_train_df)

# Predict on validation set

gbt_predictions = gbt_model.transform(split_input_validation_df)
gbt_evaluator   = RegressionEvaluator(labelCol="total_delivery_duration", predictionCol="prediction", metricName="rmse")
rmse            = gbt_evaluator.evaluate(gbt_predictions)
print("RMSE on validation data = %f" % rmse)


RMSE on validation data = 1975.730051


In [127]:
# RandomForestRegressor

rf_regressor   = RandomForestRegressor(featuresCol = 'features', labelCol = 'total_delivery_duration', maxBins=5450)
rf_model       = rf_regressor.fit(split_input_train_df)

# Predict on validation set

rf_predictions = rf_model.transform(split_input_validation_df)
rf_evaluator   = RegressionEvaluator(labelCol="total_delivery_duration", predictionCol="prediction", metricName="rmse")
rmse           = rf_evaluator.evaluate(rf_predictions)
print("RMSE on validation data = %f" % rmse)

RMSE on validation data = 1824.092780


In [None]:
# All models are performing more or less the same.

In [130]:
# Let us use RandomForestRegressor

rf_model.save("./dd_rf_model")

In [None]:
# Now on to actual predictions

In [131]:
# Load data to predict
predict_df = spark.read.json(JSON_DATA_TO_PREDICT)

In [132]:
predict_df.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- delivery_id: string (nullable = true)
 |-- estimated_order_place_duration: string (nullable = true)
 |-- estimated_store_to_consumer_driving_duration: string (nullable = true)
 |-- market_id: string (nullable = true)
 |-- max_item_price: string (nullable = true)
 |-- min_item_price: string (nullable = true)
 |-- num_distinct_items: string (nullable = true)
 |-- order_protocol: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- store_primary_category: string (nullable = true)
 |-- subtotal: string (nullable = true)
 |-- total_busy_runners: string (nullable = true)
 |-- total_items: string (nullable = true)
 |-- total_onshift_runners: string (nullable = true)
 |-- total_outstanding_orders: string (nullable = true)



In [133]:
predict_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
created_at,54778,,,2015-02-18 14:50:28,2015-02-25 05:59:49
delivery_id,54778,,,0000300773fe015f870914b42528541b,fffffcc42d1bf7ab742a4fa2d820f9f0
estimated_order_place_duration,54778,308.11267297090075,88.93788812977652,0,446
estimated_store_to_consumer_driving_duration,54778,547.7654061752515,220.45971487843556,0,
market_id,54778,3.0313600352112675,1.5316060843815849,1,
max_item_price,54778,1166.7953557997737,574.5947264759789,0,999
min_item_price,54778,683.7573843513819,538.4969945313895,-30,999
num_distinct_items,54778,2.6919018584103105,1.6502038175106004,1,9
order_protocol,54778,2.8850169740343152,1.5120872918297972,1,


In [None]:
# Apply same process as historical data to convert/map

In [134]:
# Drop rows with NA columns
predict_df_1 = predict_df.dropna()

In [135]:
# Drop rows with subtotal=0
predict_df_2 = predict_df_1[(predict_df_1.subtotal>0)]

In [136]:
# Drop rows where min_item_price is -ve
predict_df_2 = predict_df_2[(predict_df_2.min_item_price>0)]

In [137]:
# Drop rows where max_item_price is 0, assuming spoons and forks are not part of the order
predict_df_2 = predict_df_2[(predict_df_2.max_item_price>0)]

In [138]:
# Drop rows where total_onshift_runners is -ve
predict_df_2 = predict_df_2[(predict_df_2.total_onshift_runners>0)]

In [139]:
# Drop rows where total_busy_runners is -ve
predict_df_2 = predict_df_2[(predict_df_2.total_busy_runners>0)]

In [140]:
# Drop rows where total_outstanding_orders is -ve
predict_df_2 = predict_df_2[(predict_df_2.total_outstanding_orders>0)]

In [141]:
# Drop rows where estimated_order_place_duration is 0 - is not practical
predict_df_2 = predict_df_2[(predict_df_2.estimated_order_place_duration>0)]

In [143]:
# Drop rows where estimated_store_to_consumer_driving_duration is 0 - is not practical
predict_df_2 = predict_df_2[(predict_df_2.estimated_store_to_consumer_driving_duration>0)]

In [185]:
# Drop rows with "NA" strings in columns
predict_df_3 = predict_df_2[(predict_df_2.market_id != "NA") & 
                             (predict_df_2.store_primary_category != "NA") &
                             (predict_df_2.order_protocol != "NA")]

In [186]:
def rdd_datetimesec_to_sec(input_time):
    return int(mktime(strptime(input_time, "%Y-%m-%d %H:%M:%S")))

udf_rdd_datetimesec_to_sec = fn.udf(rdd_datetimesec_to_sec, IntegerType())  # LongType() not available for now

In [187]:
predict_df_3 = predict_df_3.withColumn('created_at',
                                        udf_rdd_datetimesec_to_sec(fn.col('created_at')))


In [188]:
predict_df_3.show(2)

+----------+--------------------+------------------------------+--------------------------------------------+---------+--------------+--------------+------------------+--------------+--------+--------------------+----------------------+--------+------------------+-----------+---------------------+------------------------+
|created_at|         delivery_id|estimated_order_place_duration|estimated_store_to_consumer_driving_duration|market_id|max_item_price|min_item_price|num_distinct_items|order_protocol|platform|            store_id|store_primary_category|subtotal|total_busy_runners|total_items|total_onshift_runners|total_outstanding_orders|
+----------+--------------------+------------------------------+--------------------------------------------+---------+--------------+--------------+------------------+--------------+--------+--------------------+----------------------+--------+------------------+-----------+---------------------+------------------------+
|1424600864|3b04e68c88349a77

In [189]:
# Map store_id string to unique number
stringindexer = StringIndexer().setInputCol("store_id").setOutputCol("store_id_int")
modelc = stringindexer.fit(predict_df_3)
predict_df_4 = modelc.transform(predict_df_3)

In [190]:
# Map store_primary_category to unique number
stringindexer = StringIndexer().setInputCol("store_primary_category").setOutputCol("store_primary_category_int")
modelc = stringindexer.fit(predict_df_4)
predict_df_4 = modelc.transform(predict_df_4)

In [191]:
predict_df_4 = predict_df_4.withColumn("market_id", predict_df_4["market_id"].cast(IntegerType()))
predict_df_4 = predict_df_4.withColumn("order_protocol", predict_df_4["order_protocol"].cast(IntegerType()))

predict_df_4 = predict_df_4.withColumn("total_onshift_runners", predict_df_4["total_onshift_runners"].cast(IntegerType()))
predict_df_4 = predict_df_4.withColumn("total_busy_runners", predict_df_4["total_busy_runners"].cast(IntegerType()))

predict_df_4 = predict_df_4.withColumn("total_outstanding_orders", predict_df_4["total_outstanding_orders"].cast(IntegerType()))
predict_df_4 = predict_df_4.withColumn("estimated_store_to_consumer_driving_duration", predict_df_4["estimated_store_to_consumer_driving_duration"].cast(IntegerType()))

predict_df_4.printSchema()

root
 |-- created_at: integer (nullable = true)
 |-- delivery_id: string (nullable = true)
 |-- estimated_order_place_duration: string (nullable = true)
 |-- estimated_store_to_consumer_driving_duration: integer (nullable = true)
 |-- market_id: integer (nullable = true)
 |-- max_item_price: string (nullable = true)
 |-- min_item_price: string (nullable = true)
 |-- num_distinct_items: string (nullable = true)
 |-- order_protocol: integer (nullable = true)
 |-- platform: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- store_primary_category: string (nullable = true)
 |-- subtotal: string (nullable = true)
 |-- total_busy_runners: integer (nullable = true)
 |-- total_items: string (nullable = true)
 |-- total_onshift_runners: integer (nullable = true)
 |-- total_outstanding_orders: integer (nullable = true)
 |-- store_id_int: double (nullable = false)
 |-- store_primary_category_int: double (nullable = false)



In [192]:
# Drop string columns as we created respective columns with numerical values
# Drop min_item_price and max_item_price as they are factored into subtotal

predict_df_4 = predict_df_4.drop('store_id', 'store_primary_category', 'min_item_price', 'max_item_price')

In [193]:
predict_df_4 = predict_df_4.withColumn("subtotal", predict_df_4["subtotal"].cast(IntegerType()))
predict_df_4 = predict_df_4.withColumn("num_distinct_items", predict_df_4["num_distinct_items"].cast(IntegerType()))
predict_df_4 = predict_df_4.withColumn("estimated_order_place_duration", predict_df_4["estimated_order_place_duration"].cast(IntegerType()))

In [194]:
predict_df_4 = predict_df_4.withColumn("total_items", predict_df_4["total_items"].cast(IntegerType()))

In [195]:
predict_df_4.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
created_at,46945,1.4245901166887422E9,162509.80336319067,1424302170,1424872789
delivery_id,46945,,,0000300773fe015f870914b42528541b,fffeab713c91634ae0fd5b1b91f8d72c
estimated_order_place_duration,46945,306.3789540952178,88.00053913282085,212,1095
estimated_store_to_consumer_driving_duration,46945,549.1419320481415,218.89783374010833,3,1550
market_id,46945,2.8158909362019386,1.3382430164957273,1,6
num_distinct_items,46945,2.665587389498349,1.6271062619174674,1,19
order_protocol,46945,2.9120673128128662,1.5166440105620012,1,7
platform,46945,,,android,other
subtotal,46945,2738.385195441474,1902.9686622701602,5,29925


In [196]:
# Use same features as in historical data

pvectorAssembler = VectorAssembler(inputCols = feature_list, outputCol = 'features')
vectorized_predict_df = pvectorAssembler.transform(predict_df_4)
vectorized_predict_df = vectorized_predict_df.select(['features'])

In [197]:
model_predictions = rf_model.transform(vectorized_predict_df)


In [200]:
feature_list

['market_id',
 'created_at',
 'order_protocol',
 'total_items',
 'subtotal',
 'num_distinct_items',
 'total_onshift_runners',
 'total_busy_runners',
 'total_outstanding_orders',
 'estimated_order_place_duration',
 'estimated_store_to_consumer_driving_duration',
 'store_id_int',
 'store_primary_category_int']

In [199]:
model_predictions.show(20, False)

+-----------------------------------------------------------------------------+------------------+
|features                                                                     |prediction        |
+-----------------------------------------------------------------------------+------------------+
|[4.0,1.424600864E9,1.0,4.0,4500.0,2.0,9.0,7.0,6.0,446.0,504.0,4029.0,8.0]    |3465.0249460172577|
|[1.0,1.424784045E9,1.0,2.0,3150.0,2.0,4.0,4.0,4.0,446.0,528.0,3101.0,11.0]   |3357.190960044998 |
|[1.0,1.424598221E9,1.0,3.0,6815.0,3.0,5.0,3.0,3.0,446.0,837.0,3101.0,11.0]   |3649.212464795469 |
|[1.0,1.424750963E9,1.0,2.0,850.0,2.0,11.0,11.0,16.0,446.0,698.0,3866.0,2.0]  |4903.903967341965 |
|[1.0,1.424613443E9,1.0,1.0,1700.0,1.0,11.0,10.0,10.0,446.0,502.0,2737.0,11.0]|4933.500536249489 |
|[1.0,1.424866799E9,1.0,1.0,2179.0,1.0,20.0,24.0,28.0,446.0,54.0,2737.0,11.0] |4820.148417630822 |
|[1.0,1.424600672E9,1.0,1.0,1800.0,1.0,26.0,26.0,20.0,446.0,54.0,2737.0,11.0] |4820.148417630822 |
|[3.0,1.42

In [201]:
predict_df_4.count()

46945

In [202]:
model_predictions.count()

46945

In [230]:
from pyspark.sql.functions import monotonically_increasing_id

df1 = predict_df_4.select('delivery_id').withColumn("id", monotonically_increasing_id())
df2 = model_predictions.select('prediction').withColumnRenamed('prediction', 'predicted_delivery_seconds').withColumn("id", monotonically_increasing_id())

# Perform a join on the ids.
prediction_results_df = df1.join(df2, "id", "left").drop("id")
prediction_results_df = prediction_results_df.withColumn("predicted_delivery_seconds", prediction_results_df["predicted_delivery_seconds"].cast(IntegerType()))

prediction_results_df.show(20, False)

+--------------------------------+--------------------------+
|delivery_id                     |predicted_delivery_seconds|
+--------------------------------+--------------------------+
|3b04e68c88349a776013fcae3bc5aea6|3465.0249460172577        |
|cffcd18445407b8027ed4484ff46726c|3357.190960044998         |
|9d65036171bc0463ebb48bf125ac5cbf|3649.212464795469         |
|bebd60f139989879bd86c6047babce83|4903.903967341965         |
|89bca4b8fbabcbfd0cbf07fb7c6271cc|4933.500536249489         |
|4061b453014cf028b2213145a460753c|4820.148417630822         |
|d71ac2be630dbd0d88ef0a9e3a559329|4820.148417630822         |
|4af2232da5619ef3fd923ff8346ee804|3020.21441861829          |
|27fb5c0a87e11d6a7ba573109cfe743f|3227.306399728086         |
|0a5116feb4f6b2be8356b78022adfc9e|3680.485142012863         |
|b9978bc9d8d3383f7c0287eeeaa66838|3607.851322448472         |
|ad825b60391a15fc86e44af29ffc1a4a|3195.142697285199         |
|777e747d8029109c521c793884d3e300|3108.0000648227983        |
|1f63330

In [233]:
# Save results to file
prediction_results_df.repartition(1).write.format('csv').options(delimiter='\t').save('./prediction_results')