In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn
import xgboost as xgb

import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
import dask.array as da
import dask.distributed

import os
import warnings
import datetime as dt

pd.options.mode.chained_assignment = None  # default='warn'

In [33]:
cluster = dask.distributed.LocalCluster()
client = dask.distributed.Client(
    n_workers=4, 
    threads_per_worker=1,
    memory_limit='16GB'
    )

Perhaps you already have a cluster running?
Hosting the HTTP server on port 56632 instead


## <center>Instacart Model Notebook</center>

A simple decision tree model to predict product reorders with the instacart dataset.

#### Making Changes to the Pipeline

- If making changes to the pipeline, change the all_data variable to FALSE (two cells below). 
- As there is over 3 million rows it takes way too long to use the whole dataset when testing.

#### Using F1-Score

- F1 score is a great metric of model performance, balancing precision and recall. This is great in our case as we have >80% of products that are not reordered and therefore raw accuracy would be misleading.

#### Memory Limits

- Be very conscious of memory limits. I delete dataframes as I go to stay within Colab/Kaggle limits.
- Given the size of the dataset these limits are reached very easily

## Loading Data

In [34]:
# Data variables
PATH = ".././archive/"
missing_value_formats = ["n.a.","?","NA","n/a", "na", "--","-"]

for file in os.listdir(PATH):
    dfname = file.split('.')[0].replace("__","_")
    globals()[dfname] = pd.read_csv(PATH + file, na_values = missing_value_formats)
    print(dfname)

products
orders
all_order_products


#### Creating Features on Timestamp

In [35]:
orders['timestamp'] =  pd.to_datetime(orders['timestamp'])
orders = orders.sort_values(["user_id", "timestamp"])

orders["order_number"] = orders.groupby('user_id').cumcount()
orders["order_dow"] = orders["timestamp"].dt.dayofweek
orders["order_hour_of_day"] = orders["timestamp"].dt.hour
orders['days_since_prior_order'] = (orders["timestamp"] - orders.groupby('user_id')['timestamp'].shift(1)).dt.round('1d').dt.days
orders['days_since_prior_order'].fillna(0.0, inplace=True)
orders = orders.drop(columns=["timestamp"])
orders.head()

Unnamed: 0,order_id,user_id,order_number,order_dow,order_hour_of_day,days_since_prior_order
0,2539329,1,0,2,8,0.0
1,2398795,1,1,3,7,15.0
2,473747,1,2,3,12,21.0
3,2254736,1,3,4,7,29.0
4,431534,1,4,4,15,28.0


#### Creating Prior + Train Set

In [36]:
# Creating 'reordered' column
all_order_products = pd.merge(all_order_products, orders[["order_id","user_id","order_number"]], on="order_id")
all_order_products = all_order_products.sort_values(["user_id","order_number"])
all_order_products['reordered'] = all_order_products.duplicated(subset=['user_id','product_id'])

# Creating Training and Prior Sets
order_products_prior = all_order_products[all_order_products.groupby(['user_id'])['order_number'].transform(max) != all_order_products['order_number']]
order_products_train = all_order_products[all_order_products.groupby(['user_id'])['order_number'].transform(max) == all_order_products['order_number']]

del all_order_products

### Dask?

In [37]:
products = dd.from_pandas(products, npartitions=4)
orders = dd.from_pandas(orders, npartitions=4)
order_products_prior = dd.from_pandas(order_products_prior, npartitions=4)
order_products_train = dd.from_pandas(order_products_train, npartitions=4)

In [38]:
# Set all_data to false if testing. 
# Using all the data takes a long time to train
all_data = True
if not all_data:
    ids = orders["user_id"].unique()[:100].compute()
    orders = orders[orders["user_id"].isin(ids)]

#creating a dataframe that will contain only prior information
op = dd.merge(orders, order_products_prior, on='order_id', how='inner', suffixes=('', '_y'))
op = op.drop([x for x in op.columns if x.endswith("_y")], axis=1)

# Creating features related to the users. i.e using user_id

In [39]:
#Total number of orders placed by each users
users = op.groupby(by='user_id')['order_number'].aggregate('max').to_frame('u_num_of_orders').reset_index()

In [40]:
#average number of products bought by the user in each purchase.

#1. First getting the total number of products in each order.
total_prd_per_order = op.groupby(by=['user_id', 'order_id'])['product_id'].aggregate('count').to_frame('total_products_per_order').reset_index()

#2. Getting the average products purchased by each user
avg_products = total_prd_per_order.groupby(by=['user_id'])['total_products_per_order'].mean().to_frame('u_avg_prd').reset_index()

#deleting the total_prd_per_order dataframe
del total_prd_per_order

In [41]:
#dow (Day of week) of most orders placed by each user 
dow = op.groupby('user_id')['order_dow'].agg('mean').to_frame('dow_most_orders_u').reset_index()

In [42]:
#hour of day when most orders placed by each user
hod = op.groupby('user_id')['order_hour_of_day'].agg('mean').to_frame('hod_most_orders_u').reset_index()

In [43]:
# Merging the user created features.

#1. merging avg_products with users
users = users.merge(avg_products, on='user_id', how='left')
#deleting avg_products
del avg_products

#2. merging dow with users.
users = users.merge(dow, on='user_id', how='left')
#deleting dow
del dow

#3. merging hod with users
users = users.merge(hod, on='user_id', how='left')
#deleting dow
del hod

# Creating features related to the products using product_id.

In [44]:
#number of time a product was purchased.
prd = op.groupby('product_id')['order_id'].agg('count').to_frame('prd_count_p').reset_index()

In [45]:
#products reorder ratio.
reorder_p = op.groupby(by='product_id')['reordered'].agg('mean').to_frame('p_reordered_ratio').reset_index()

In [46]:
#merging the reorder_p with prd
prd = prd.merge(reorder_p, on='product_id', how='left')
#deleting reorder_p
del reorder_p

# Creating user-product features.

In [47]:
#how many times a user bought the same product.
uxp = op.groupby(by=['user_id', 'product_id'])['order_id'].agg('count').to_frame('uxp_times_bought').reset_index()

In [48]:
#reorder ratio of the user for each product.
reorder_uxp = op.groupby(by=['user_id', 'product_id'])['reordered'].agg('mean').to_frame('uxp_reordered_ratio').reset_index()

In [49]:
#merging the two dataframes into one
uxp = uxp.merge(reorder_uxp, on=['user_id', 'product_id'], how='left')
#deleting reorder_uxp
del reorder_uxp

# Merging all the features into data DF.

In [50]:
#merging users df into uxp
data = uxp.merge(users, on='user_id', how='left')

#merging products df into data
data = data.merge(prd, on='product_id', how='left')
data = data.merge(order_products_train[["user_id", "order_id"]].drop_duplicates(), on='user_id')

#deleting unwanted dfs
del [users, prd, uxp]

# Creating Training + Validation

In [51]:
#merging the information from the order_proucts_train df into the data_train.
data_train = data.merge(order_products_train[['product_id', 'order_id', 'reordered']], on=['product_id', 'order_id'], how='left')
del data

In [52]:
#filling the NAN values
data_train['reordered'] = data_train['reordered'].fillna(0.0)
data_train['reordered'] = data_train['reordered'].astype(int)

#deleting eval_set, order_id as they are not needed for training.
data_train = data_train.drop(['order_id'], axis=1)

In [53]:
#deleting unwanted df
del [order_products_prior, order_products_train, orders]

In [54]:
#merging the aisles and department ids to with the train and test data
data_train = data_train.merge(products[['product_id', 'aisle_id', 'department_id']], on='product_id', how='left')

#setting user_id and product_id as index. (Not Supported in Dask)
# data_train = data_train.set_index(['user_id', 'product_id'])
data_train = data_train.reset_index(drop=True)

In [55]:
#creating training and validation set
train, valid = train_test_split(data_train, test_size=0.1)
del data_train



# Building XGB Model

In [56]:
#creating data and labels
X_train, y_train = train[[x for x in train.columns if x not in ["reordered","user_id","product_id"]]], train['reordered']

#setting boosters parameters
parameters = {
    'eval_metric' : 'logloss',
    'max_depth' : 5,
    'colsample_bytree' : 0.4,
    'subsample' : 0.8
}

#instantiating the model
xgb_clf = xgb.dask.DaskXGBClassifier(objective='binary:logistic', parameters=parameters, num_boost_round=10) #change to 10

In [57]:
#training model
model = xgb_clf.fit(X_train, y_train)

2022-04-03 14:41:32,625 - tornado.application - ERROR - Uncaught exception GET /system/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='127.0.0.1:8787', method='GET', uri='/system/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "/Users/brendanartley/dev/instacartproject/venv/lib/python3.8/site-packages/tornado/websocket.py", line 954, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  File "/Users/brendanartley/dev/instacartproject/venv/lib/python3.8/site-packages/tornado/web.py", line 3173, in wrapper
    return method(self, *args, **kwargs)
  File "/Users/brendanartley/dev/instacartproject/venv/lib/python3.8/site-packages/bokeh/server/views/ws.py", line 149, in open
    raise ProtocolError("Token is expired.")
bokeh.protocol.exceptions.ProtocolError: Token is expired.
  from pandas import MultiIndex, Int64Index
  from pandas import MultiIndex, Int64Index
  from pandas import MultiIndex, In

# Making Validation Predictions

In [58]:
#creating data and labels
X_valid, y_valid = valid[[x for x in valid.columns if x not in ["reordered","user_id","product_id"]]], valid["reordered"]

#setting a threshold.?
y_pred = (xgb_clf.predict_proba(X_valid)[:, 1] >= 0.21).astype('int')

#saving the prediction as a new column in data_test
valid['prediction'] = y_pred

In [59]:
#valid f1_score
print('F1 score: ', sklearn.metrics.f1_score(valid['reordered'].compute(), valid['prediction'].compute()))

  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):


F1 score:  0.3963658816265326


In [60]:
# Writing DF to disk for later eval
with warnings.catch_warnings():
    warnings.simplefilter('ignore')

    #writing validation file to disk
    valid.reset_index().to_csv("validation.csv", single_file=True)

  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
  elif isinstance(data.columns, (pd.Int64Index, pd.RangeIndex)):
