In [None]:
import os
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [33]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *
from pyspark.sql.functions import asc, desc, dense_rank, col, when, count, avg, sum
from pyspark.sql.window import Window

In [2]:
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()
# ss = SparkSession \
#     .builder \
#     .appName("instacart") \
#     .config("spark.mongodb.input.uri", "mongodb://34.209.32.234:27017/shardonnay.instacart")\
#     .getOrCreate()

## Change data_path variable to local consolidated_df.csv file

In [3]:
# df = ss.read.format("com.mongodb.spark.sql.DefaultSource").load() #mongo
# df = ss.read.csv('../consolidated_df.csv', header=True, inferSchema=True) #local csv - full dataset
df = ss.read.csv('sample_consolidated_df.csv', header=True, inferSchema=True)

In [4]:
df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: double (nullable = true)
 |-- product_id: double (nullable = true)
 |-- add_to_cart_order: double (nullable = true)
 |-- reordered: double (nullable = true)



In [5]:
# df = df.drop('aisle_id', 'aisle', 'department_id', 'department')

In [6]:
df.write.saveAsTable('Instacart') # saving as table for complex queries with window functions

### Reference Stats

In [7]:
df.filter("eval_set == 'test'").select('user_id').distinct().count()

0

In [8]:
df.filter("eval_set == 'train'").select('user_id').distinct().count()

4861

In [9]:
df.filter("eval_set == 'prior'").select('user_id').distinct().count()

4861

In [10]:
df.select('user_id').distinct().count()

4861

In [11]:
# Number of unique prior user_id-product_id combinations
df.filter("eval_set == 'prior'").groupby('user_id', 'product_id').count().count()

394864

In [12]:
# Number of train (order_id-product_id) rows
df.filter("eval_set == 'train'").count()

80516

In [13]:
# Number of prior (order_id-product_id) rows
df.filter("eval_set == 'prior'").count()

1021157

### Constructing Target Variable

In [14]:
train_users = df.filter("eval_set == 'train'").select(col('user_id').alias('user_id2')).distinct()

In [15]:
final_index_df = df.filter("eval_set == 'prior'").select(
    'user_id', 'product_id').distinct()

In [16]:
final_index_df = final_index_df.join(train_users, on=(final_index_df.user_id == train_users.user_id2), how='inner') \
                .drop('user_id2')

In [17]:
final_index_df.count()

394864

In [18]:
last_orders_df = df.filter("eval_set == 'train'") \
                    .select(col('user_id').alias('user_id2'),
                            col('product_id').alias('product_id2'))

In [19]:
#join leaves out new products never ordered before in final(train) orders
df3 = final_index_df \
    .join(last_orders_df, 
          on=(final_index_df.user_id == last_orders_df.user_id2) 
          & (final_index_df.product_id == last_orders_df.product_id2), how="left")
df3.show()

+-------+----------+--------+-----------+
|user_id|product_id|user_id2|product_id2|
+-------+----------+--------+-----------+
|    252|   28321.0|    null|       null|
|    703|   48199.0|    null|       null|
|    749|   12614.0|    null|       null|
|    786|   10305.0|    null|       null|
|   1459|   26162.0|    null|       null|
|   1459|   48931.0|    null|       null|
|   1486|    1942.0|    null|       null|
|   1486|   39275.0|    null|       null|
|   1559|   40836.0|    null|       null|
|   1952|    9477.0|    1952|     9477.0|
|   2310|   47553.0|    null|       null|
|   2312|     277.0|    null|       null|
|   2367|   26666.0|    null|       null|
|   2409|   46941.0|    null|       null|
|   2564|   37067.0|    null|       null|
|   2917|   12797.0|    null|       null|
|   3194|   31717.0|    3194|    31717.0|
|   3217|    7350.0|    null|       null|
|   3314|    3270.0|    null|       null|
|   3314|   11182.0|    null|       null|
+-------+----------+--------+-----

In [20]:
feature_target_df = df3.withColumn('ordered_true', df3.user_id2.isNotNull()) \
    .select('user_id','product_id','ordered_true')
feature_target_df.show()

+-------+----------+------------+
|user_id|product_id|ordered_true|
+-------+----------+------------+
|    252|   28321.0|       false|
|    703|   48199.0|       false|
|    749|   12614.0|       false|
|    786|   10305.0|       false|
|   1459|   26162.0|       false|
|   1459|   48931.0|       false|
|   1486|    1942.0|       false|
|   1486|   39275.0|       false|
|   1559|   40836.0|       false|
|   1952|    9477.0|        true|
|   2310|   47553.0|       false|
|   2312|     277.0|       false|
|   2367|   26666.0|       false|
|   2409|   46941.0|       false|
|   2564|   37067.0|       false|
|   2917|   12797.0|       false|
|   3194|   31717.0|        true|
|   3217|    7350.0|       false|
|   3314|    3270.0|       false|
|   3314|   11182.0|       false|
+-------+----------+------------+
only showing top 20 rows



In [21]:
priors_df = df.filter("eval_set == 'prior'")
priors_df = priors_df.join(train_users, on=(priors_df.user_id == train_users.user_id2), how='inner') \
                .drop('user_id2')

priors_df.cache()

DataFrame[order_id: int, user_id: int, eval_set: string, order_number: int, order_dow: int, order_hour_of_day: int, days_since_prior_order: double, product_id: double, add_to_cart_order: double, reordered: double]

In [22]:
priors_df.count()

1021157

In [23]:
priors_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: double (nullable = true)
 |-- product_id: double (nullable = true)
 |-- add_to_cart_order: double (nullable = true)
 |-- reordered: double (nullable = true)



### Number of times a user ordered a product
### Average days since prior order
### Number of times user reordered each product

In [24]:
features = priors_df.groupby('user_id', 'product_id') \
    .agg(count('order_id').alias('usr_prod_ct'),
         avg('days_since_prior_order').alias('avg_days_since_ord_wnull'),
         sum('reordered').alias('num_reordered'))

features = features.withColumn('avg_days_since_ord', when(col('avg_days_since_ord_wnull').isNull(),365).otherwise(col('avg_days_since_ord_wnull'))) \
                    .drop('user_id2', 'product_id2','avg_days_since_ord_wnull')

features.cache()
# features.show()

DataFrame[user_id: int, product_id: double, usr_prod_ct: bigint, num_reordered: double, avg_days_since_ord: double]

### Number of times user ordered products in last 5 orders

In [25]:
window = Window.partitionBy('user_id') \
                .orderBy(desc('order_number')) \
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)

num_prod_ordl5 = priors_df.select('user_id', 'product_id', 'order_number',  
                                  dense_rank().over(window).alias('rank')) \
                .filter(col('rank') <= 5) \
                .groupby('user_id', 'product_id') \
                .count() \
                .withColumnRenamed('user_id','user_id2') \
                .withColumnRenamed('product_id','product_id2')

features = features.join(num_prod_ordl5, 
          on=(features.user_id == num_prod_ordl5.user_id2) 
          & (features.product_id == num_prod_ordl5.product_id2), how="left")

features = features.withColumn('num_prod_ordl5', when(col('count').isNull(),0).otherwise(col('count'))) \
                    .drop('user_id2', 'product_id2','count')


# features.show()

### Ratio of orders user ordered products in last 5 orders

In [26]:
ratio_prod_ordl5 = priors_df.select('user_id', 'product_id', 'order_number',  
          dense_rank().over(window).alias('rank')) \
                .filter(col('rank') <= 5) \
                .groupby('user_id', 'product_id') \
                .agg((count('order_number')/5).alias('ratio_wnull'))\
                .withColumnRenamed('user_id','user_id2') \
                .withColumnRenamed('product_id','product_id2')

features = features.join(ratio_prod_ordl5, 
          on=(features.user_id == ratio_prod_ordl5.user_id2) 
          & (features.product_id == ratio_prod_ordl5.product_id2), how="left")

features = features.withColumn('last5_ratio', when(col('ratio_wnull').isNull(),0).otherwise(col('ratio_wnull'))) \
                    .drop('user_id2', 'product_id2','ratio_wnull')

# features.show()

### Number of orders since a user last ordered a given item
done by generating chrononological order_num from order_id, and returns max order_num (grouped by user) - max order_num (grouped by user and product)

In [27]:
num_ords_since_last = ss.sql("select distinct product_id as product_id2, user_id as user_id2,\
        max(order_num) over (partition by user_id) - max(order_num) over (partition by user_id, product_id) as num_ords_since_last from\
        (select Instacart.order_id, Instacart.user_id, Instacart.product_id, rhs.order_num\
        from Instacart\
        left join\
        (select order_id, user_id, row_number() over (partition by user_id order by order_id) as order_num from\
        (select distinct order_id, user_id from Instacart where eval_set = 'prior') as iq) as rhs\
        on Instacart.order_id=rhs.order_id and Instacart.user_id=rhs.user_id\
        where eval_set = 'prior') as iq2")

features = features.join(num_ords_since_last, 
          on=(features.user_id == num_ords_since_last.user_id2) 
          & (features.product_id == num_ords_since_last.product_id2), how="left").drop('user_id2', 'product_id2')

In [28]:
# features.show()

### Rate of user item reorder: # of reorders of an item / # of orders since first time ordering item.
Get max(order_num) grouped by user_id, then min(order_num) grouped by user_id and product, subtract the two to get number of orders since first purchase of an item. Then sum(reordered) grouped by item, user to get the number of times an item was reordered by a user

In [29]:
reorder_rate = ss.sql("select product_id as product_id2, user_id as user_id2, \
        num_reorders/orders_since_first as reorder_rate_wnull from\
        (select distinct product_id, user_id,\
        max(order_num) over (partition by user_id) - min(order_num) over (partition by user_id, product_id) as orders_since_first,\
        sum(reordered) over (partition by user_id, product_id) as num_reorders from\
        (select Instacart.order_id, Instacart.user_id, Instacart.product_id, Instacart.reordered, rhs.order_num\
        from Instacart\
        left join\
        (select order_id, user_id, row_number() over (partition by user_id order by order_id) as order_num from\
        (select distinct order_id, user_id from Instacart where eval_set = 'prior') as iq) as rhs\
        on Instacart.order_id=rhs.order_id and Instacart.user_id=rhs.user_id\
        where eval_set = 'prior') as iq2) as iq3")

features = features.join(reorder_rate, 
          on=(features.user_id == reorder_rate.user_id2) 
          & (features.product_id == reorder_rate.product_id2), how="left").drop('user_id2', 'product_id2')

features = features.withColumn('reorder_rate', when(col('reorder_rate_wnull').isNull(),0).otherwise(col('reorder_rate_wnull'))) \
                    .drop('reorder_rate_wnull')

In [34]:
priors_df.unpersist()
features.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[user_id: int, product_id: double, usr_prod_ct: bigint, num_reordered: double, avg_days_since_ord: double, num_prod_ordl5: bigint, last5_ratio: double, num_ords_since_last: int, reorder_rate: double]

In [35]:
features.count()

394864

In [36]:
features.show(3)

+-------+----------+-----------+-------------+------------------+--------------+-----------+-------------------+-------------------+
|user_id|product_id|usr_prod_ct|num_reordered|avg_days_since_ord|num_prod_ordl5|last5_ratio|num_ords_since_last|       reorder_rate|
+-------+----------+-----------+-------------+------------------+--------------+-----------+-------------------+-------------------+
|    252|   28321.0|          1|          0.0|             365.0|             1|        0.2|                  3|                0.0|
|    703|   48199.0|          2|          1.0|               5.5|             2|        0.4|                  3|0.14285714285714285|
|    749|   12614.0|          1|          0.0|               7.0|             1|        0.2|                  6|                0.0|
+-------+----------+-----------+-------------+------------------+--------------+-----------+-------------------+-------------------+
only showing top 3 rows



In [37]:
feature_target_df.show(3)

+-------+----------+------------+
|user_id|product_id|ordered_true|
+-------+----------+------------+
|    252|   28321.0|       false|
|    703|   48199.0|       false|
|    749|   12614.0|       false|
+-------+----------+------------+
only showing top 3 rows



In [38]:
feature_target_df.count()

394864

In [39]:
feature_target_df2 = feature_target_df.withColumnRenamed('user_id', 'user_id2')\
                                      .withColumnRenamed('product_id', 'product_id2')
final_df = features.join(feature_target_df2, 
          on=(features.user_id == feature_target_df2.user_id2) 
          & (features.product_id == feature_target_df2.product_id2), how="left")
final_df = final_df.drop('user_id2', 'product_id2')

In [40]:
features.unpersist()
final_df.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[user_id: int, product_id: double, usr_prod_ct: bigint, num_reordered: double, avg_days_since_ord: double, num_prod_ordl5: bigint, last5_ratio: double, num_ords_since_last: int, reorder_rate: double, ordered_true: boolean]

In [41]:
final_df.show(3)

+-------+----------+-----------+-------------+------------------+--------------+-----------+-------------------+-------------------+------------+
|user_id|product_id|usr_prod_ct|num_reordered|avg_days_since_ord|num_prod_ordl5|last5_ratio|num_ords_since_last|       reorder_rate|ordered_true|
+-------+----------+-----------+-------------+------------------+--------------+-----------+-------------------+-------------------+------------+
|    252|   28321.0|          1|          0.0|             365.0|             1|        0.2|                  3|                0.0|       false|
|    703|   48199.0|          2|          1.0|               5.5|             2|        0.4|                  3|0.14285714285714285|       false|
|    749|   12614.0|          1|          0.0|               7.0|             1|        0.2|                  6|                0.0|       false|
+-------+----------+-----------+-------------+------------------+--------------+-----------+-------------------+------------

In [42]:
final_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- product_id: double (nullable = true)
 |-- usr_prod_ct: long (nullable = false)
 |-- num_reordered: double (nullable = true)
 |-- avg_days_since_ord: double (nullable = true)
 |-- num_prod_ordl5: long (nullable = true)
 |-- last5_ratio: double (nullable = true)
 |-- num_ords_since_last: integer (nullable = true)
 |-- reorder_rate: double (nullable = true)
 |-- ordered_true: boolean (nullable = true)



# Random Forest Modeling

In [43]:
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

In [44]:
dfnum = final_df.withColumn('ordered_true',final_df['ordered_true'].cast(IntegerType()))
# dfnum.show(3)
# Create feature vector and label column.
va = VectorAssembler(outputCol="features", inputCols=dfnum.columns[0:-1]) #except the last col.
dfpoints = va.transform(dfnum).select("features", "ordered_true").withColumnRenamed('ordered_true', 'label')
# dfpoints.show(3)
#Divide the dataset into training and testing sets.
splits = dfpoints.randomSplit([0.8, 0.2])
carttrain = splits[0]
cartvalid = splits[1]
# Cache the data. 
# cache() : the algorithm is interative and training and data sets are going to be reused many times.
final_df.unpersist()
carttrain.cache()
cartvalid.cache()
# carttrain.show(3)

DataFrame[features: vector, label: int]

In [45]:
# Create a RandomForestClassifer and build a model using training dataset.
rf = RandomForestClassifier(maxDepth=10)
rfmodel = rf.fit(carttrain)

In [46]:
# Evaluate the model using MulticlassClassificationEvaluator and test data.
# Caclulate F1 score as evaluation metric.
rfpredicts = rfmodel.transform(cartvalid)
rfpredicts.show() # this is the DF that shows the model workings
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(rfpredicts)
print('F1 = %.4f' % f1_score)
# Unpersist the datasets.

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(9,[0,1,2,4],[487...|    0|[19.4076663222311...|[0.97038331611155...|       0.0|
|(9,[0,1,2,4],[565...|    0|[19.4510245957505...|[0.97255122978752...|       0.0|
|(9,[0,1,2,4],[985...|    0|[19.5084176413045...|[0.97542088206522...|       0.0|
|(9,[0,1,2,4],[124...|    0|[19.5127240763142...|[0.97563620381571...|       0.0|
|(9,[0,1,2,4],[154...|    0|[19.4193516723237...|[0.97096758361618...|       0.0|
|(9,[0,1,2,4],[200...|    0|[19.4594296795297...|[0.97297148397648...|       0.0|
|(9,[0,1,2,7],[103...|    0|[19.7711117130909...|[0.98855558565454...|       0.0|
|(9,[0,1,2,7],[135...|    0|[19.5017467871574...|[0.97508733935787...|       0.0|
|[1486.0,39275.0,1...|    0|[19.3491773807736...|[0.96745886903868...|       0.0|
|[1952.0,9477.0,

In [None]:
carttrain.unpersist()
cartvalid.unpersist()
# Stop SparkContext & SparkSession.

In [None]:
sc.stop()
ss.stop()