In [None]:
import pandas as pd
# Star import is what supplies SparkSession for the Spark setup cell.
# NOTE(review): prefer `from pyspark.sql import SparkSession`; star imports hide where names come from.
from pyspark.sql import *
# NOTE(review): no bare name from this star import is used in the cells shown
# (graph objects are accessed through `go`); candidate for removal.
from plotly.graph_objs import *
from plotly.offline import download_plotlyjs, init_notebook_mode, iplot
# Initialise plotly's offline notebook mode so figures render inline.
init_notebook_mode()
import plotly.graph_objs as go
import datetime
import numpy as np

import warnings
# NOTE(review): a blanket 'ignore' also hides scikit-learn ConvergenceWarning /
# FutureWarning output from the modelling cells; consider narrowing this filter.
warnings.filterwarnings('ignore')


In [None]:
import os
import sys

# Make the sibling `instacart-ml` project (home of `common_utility`) importable,
# adding it to sys.path only once even if this cell is re-run.
module_path = os.path.abspath('../../instacart-ml')
if module_path not in sys.path:
    sys.path.append(module_path)


import common_utility.ModelEvaluation as me
import common_utility.PlotlyObject as plt

In [None]:
from glob import glob
import pyspark.sql.functions as F

def read_spark_csv(spark, path):
    """Read a CSV that has a header row into a Spark DataFrame, inferring column types.

    Args:
        spark: the active SparkSession.
        path: location passed straight to ``DataFrameReader.csv``.

    Returns:
        The Spark DataFrame produced by the reader.
    """
    reader = spark.read.option("header", "true")
    reader = reader.option("inferSchema", "true")
    return reader.csv(path)

# Local Spark session used for the feature pipeline.
# NOTE(review): with master local[*] all tasks presumably run inside the driver JVM, so
# spark.executor.memory likely has no effect; spark.driver.memory bounds the heap.
spark = (
    SparkSession.builder
    .appName("My Spark Application")
    .config("spark.master", "local[*]")
    .config("spark.driver.memory", "10g")
    .config("spark.executor.memory", "30g")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

# Also raise the JVM root log4j logger to ERROR via the (private) py4j gateway handle.
logger = spark._jvm.org.apache.log4j
logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)

In [None]:
# Load the raw Instacart CSVs. glob() returns entries in filesystem-dependent order,
# so each table is looked up by file name rather than by position in the list.
# NOTE(review): assumes the standard Kaggle file names (products.csv, orders.csv, ...).
data_list = sorted(glob("/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/data/*"))
data_files = {os.path.splitext(os.path.basename(p))[0]: p for p in data_list}

required_tables = ['products', 'orders', 'order_products__train',
                   'departments', 'aisles', 'order_products__prior']
missing_tables = [name for name in required_tables if name not in data_files]
if missing_tables:
    raise FileNotFoundError(f"Missing Instacart data files: {missing_tables}")

product = read_spark_csv(spark, data_files['products'])
order = read_spark_csv(spark, data_files['orders'])
order_products_train = read_spark_csv(spark, data_files['order_products__train'])
departments = read_spark_csv(spark, data_files['departments'])
aisles = read_spark_csv(spark, data_files['aisles'])
order_products_prior = read_spark_csv(spark, data_files['order_products__prior'])

In [None]:
#### Cohort 1 ####
# For every train-set user, build one candidate row per distinct product found in their
# prior orders; the label `reordered` is 1 when that product appears in their train order.
order_schema = ['order_id', 'user_id', 'eval_set', 'order_dow', 'order_hour_of_day', 'days_since_prior_order']
train_user = order.filter(F.col("eval_set") == 'train').select("user_id").distinct()
order_product = (
    order_products_prior
    .unionByName(order_products_train)
    .join(order.select(*order_schema), ['order_id'], 'inner')
    .join(train_user, ['user_id'], 'inner')
)

# Distinct (user_id, product_id) pairs from prior orders. select().distinct() gives the same
# pairs as collect_set + explode without building one array per user.
cohort_df = (
    order_product
    .filter(F.col("eval_set") == "prior")
    .select('user_id', 'product_id')
    .distinct()
)
cohort_schema = ["order_id", "product_id", 'user_id']
feature_schema = ['user_id', 'order_id', 'order_dow', 'order_hour_of_day', 'days_since_prior_order']

# Right join keeps every prior (user, product) pair; a non-null train order_id means it was re-bought.
train_df = (
    order_product
    .filter(F.col("eval_set") == 'train')
    .select(*cohort_schema)
    .join(cohort_df, ['user_id', 'product_id'], 'right')
    .withColumn("reordered", F.when(F.col("order_id").isNotNull(), 1).otherwise(0))
    .drop("order_id")
)

# One row of train-order features per user (dropDuplicates keeps a single row per user_id;
# presumably each user has exactly one train order), joined to the candidates and departments.
output_df = (
    order_product
    .filter(F.col("eval_set") == 'train')
    .dropDuplicates(["user_id"])
    .select(*feature_schema)
    .join(train_df, ['user_id'], 'inner')
    .join(product.select("product_id", 'department_id'), ['product_id'], 'inner')
)

output_path = "/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/train_df.parquet"
output_df.write.mode("overwrite").parquet(output_path)

# Product level -> Order Level; User_id, Product_id, Reordered


In [None]:
# Pull the cohort-1 table back into pandas for the scikit-learn / XGBoost cells below.
df = pd.read_parquet(path=output_path)
df.columns

In [None]:
#### Cohort 2 ####
# Extends the cohort-1 parquet with aisle_id, a per-(user, product) reorder_rate and a
# per-user standardized days-since-prior-order feature.
df2 = spark.read.parquet(output_path)

# Number of distinct prior orders per user (denominator of reorder_rate).
user_order = (
    order
    .filter(F.col("eval_set") == 'prior')
    .groupBy(['user_id'])
    .agg(F.count_distinct("order_id").alias("user_order_num"))
)

# aisle id & reorder rate per product for each user:
# reorder_rate = (rows in order_products_prior for the user/product, presumably one per order)
#                / (number of prior orders), rounded to 2 decimals.
reorder_freq = (
    order_products_prior
    .join(order.select("user_id", "order_id"), ["order_id"], "left")
    .groupBy(['user_id', "product_id"])
    .agg(F.count("order_id").alias("num_order"))
    .join(user_order, ["user_id"], "left")
    .withColumn("reorder_rate", F.round(F.col("num_order") / F.col("user_order_num"), 2))
    .join(product.select("product_id", "aisle_id"), ["product_id"], "inner")
)

# Per-user mean and standard deviation of days_since_prior_order over all of the user's orders.
temp = order.groupBy("user_id").agg(
    F.mean("days_since_prior_order").alias("mean_day"),
    F.stddev("days_since_prior_order").alias("std_day"),
)

# z-score of the train order's days_since_prior_order against the user's own history;
# keyed by user only (not by product).
scale_day_prior = (
    order
    .filter(F.col("eval_set") == "train")
    .join(temp, ["user_id"], "inner")
    .withColumn("scale_day_prior", (F.col("days_since_prior_order") - F.col("mean_day")) / F.col("std_day"))
    .select("user_id", "scale_day_prior")
)

# Missing z-scores (e.g. an undefined std) become 0; the raw day gap column is dropped.
output_df2 = (
    df2
    .join(reorder_freq.select("product_id", "user_id", "reorder_rate", "aisle_id"), ["user_id", "product_id"], "left")
    .join(scale_day_prior, ['user_id'], "left")
    .drop("days_since_prior_order")
    .fillna({'scale_day_prior': 0})
)

output_path2 = "/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/train_df2.parquet"
output_df2.write.mode("overwrite").parquet(output_path2)


In [None]:
# Replace the Spark handle `df2` with the cohort-2 table as a pandas DataFrame.
df2 = pd.read_parquet(path=output_path2)
df2.columns

In [None]:
# Feature Engineering: one-hot encode the categorical order/product columns.

from sklearn.preprocessing import OneHotEncoder

encoder_list = ['order_dow', 'order_hour_of_day', 'department_id']

# `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and removed in 1.4;
# try the new keyword first and fall back for older releases.
try:
    encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    encoder = OneHotEncoder(sparse=False)

encoded_data = encoder.fit_transform(df[encoder_list])
columns = encoder.get_feature_names_out(encoder_list)

# Align on df's own index, and drop dummy columns left by a previous run of this cell
# so re-running it does not append a duplicate copy.
one_hot_encoded_df = pd.DataFrame(encoded_data, columns=columns, index=df.index)
df = pd.concat([df.drop(columns=columns, errors='ignore'), one_hot_encoded_df], axis=1)


In [None]:
# train_test_split: hold out whole users, so no user contributes rows to both sets.
test_ratio = 0.2
split_seed = 42  # fixed so Restart & Run All reproduces the same split (and test_ids.pkl)
n_id = df['user_id'].nunique()
test_id = (
    df['user_id']
    .drop_duplicates()
    .sample(n=int(n_id * test_ratio), random_state=split_seed)
    .tolist()
)
is_test = df['user_id'].isin(test_id)
train_df = df[~is_test].reset_index(drop=True)
test_df = df[is_test].reset_index(drop=True)
assert len(train_df) + len(test_df) == len(df)

# Despite its name this is negatives / positives in the training rows; it is passed to
# XGBoost as scale_pos_weight for the imbalanced label.
positive_rate = train_df[train_df['reordered'] == 0].shape[0] / train_df[
    train_df['reordered'] == 1].shape[0]

input_var_list = columns.tolist() + ['days_since_prior_order']
label = 'reordered'

train_x = train_df[input_var_list]
test_x = test_df[input_var_list]
train_y = train_df[label]
test_y = test_df[label]

In [None]:
import pickle

path = '/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/'

# Snapshot the cohort-1 split.
test_df.to_parquet(f"{path}test_prior_training1.parquet", index=False)
train_df.to_parquet(f"{path}train_prior_training1.parquet", index=False)

# Save the test_id list to a file; the cohort-2 split reloads it to hold out the same users.
with open(f"{path}test_ids.pkl", 'wb') as handle:
    pickle.dump(test_id, handle)

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Baseline: class-weighted logistic regression on the cohort-1 features.
# NOTE(review): n_jobs presumably has no effect for a binary target with the default solver,
# and warnings are silenced globally at the top of the notebook, so a ConvergenceWarning
# at max_iter=150 would not be shown.
logreg = LogisticRegression(class_weight='balanced', max_iter=150, n_jobs=-1, verbose=0)
logreg.fit(train_x, train_y)

decision_threshold = 0.5
# Vectorized thresholding: same 0/1 labels as a per-row lambda, without one Python call per row.
test_df['log_prob'] = logreg.predict_proba(test_x)[:, 1]
test_df['log_pred'] = (test_df['log_prob'] > decision_threshold).astype(int)

train_df['log_prob'] = logreg.predict_proba(train_x)[:, 1]
train_df['log_pred'] = (train_df['log_prob'] > decision_threshold).astype(int)

In [None]:
import xgboost as xgb

# Gradient-boosted trees; scale_pos_weight (negatives / positives from the split cell)
# weights positive rows by that ratio to counter label imbalance.
clf = xgb.XGBClassifier(n_estimators=300, max_depth=6, n_jobs=-1, scale_pos_weight=positive_rate)
clf.fit(train_x, train_y)

decision_threshold = 0.5
# Predict the test set results; thresholding is vectorized instead of a per-row lambda.
test_df['xgb_prob'] = clf.predict_proba(test_x)[:, 1]
test_df['xgb_pred'] = (test_df['xgb_prob'] > decision_threshold).astype(int)

train_df['xgb_prob'] = clf.predict_proba(train_x)[:, 1]
train_df['xgb_pred'] = (train_df['xgb_prob'] > decision_threshold).astype(int)

In [None]:
path = '/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/'

# Persist cohort-1 predictions/probabilities.
test_df.to_parquet(f"{path}test_predictions1.parquet", index=False)
train_df.to_parquet(f"{path}train_predictions1.parquet", index=False)

# Pickle both fitted cohort-1 models.
for model_file, fitted in (('loreg1.pkl', logreg), ('clf1.pkl', clf)):
    with open(path + model_file, 'wb') as f:
        pickle.dump(fitted, f)

In [None]:
# ROC traces for both models on train and test (cohort-1 features), overlaid in one chart.
trace1, trace2, trace3, trace4 = (
    me.create_roc_trace(frame, label, prob_col, trace_name)
    for frame, prob_col, trace_name in (
        (train_df, 'log_prob', 'train_logistic1'),
        (test_df, 'log_prob', 'test_logistic1'),
        (train_df, 'xgb_prob', 'train_xgb1'),
        (test_df, 'xgb_prob', 'test_xgb1'),
    )
)
data = [trace1, trace2, trace3, trace4]

me.create_overlay_roc_curve(data)

In [None]:
# Summary metrics per model/split. Each model must be scored with its OWN hard labels:
# the XGBoost rows pair `xgb_pred` with `xgb_prob` (pairing them with `log_pred` would
# mix the logistic model's predictions into the XGBoost summary).
t1 = me.ClassifierModelEvaluation(train_df, 'logistic_train', label=label,
                                  pred_col='log_pred', prob_col='log_prob').model_summary("logistic_train")
t2 = me.ClassifierModelEvaluation(test_df, 'logistic_test', label=label,
                                  pred_col='log_pred', prob_col='log_prob').model_summary("logistic_test")
t3 = me.ClassifierModelEvaluation(train_df, 'xgb_train', label=label,
                                  pred_col='xgb_pred', prob_col='xgb_prob').model_summary("xgb_train")
t4 = me.ClassifierModelEvaluation(test_df, 'xgb_test', label=label,
                                  pred_col='xgb_pred', prob_col='xgb_prob').model_summary("xgb_test")
pd.concat([t1, t2, t3, t4], axis=0).round(3)

In [None]:
# Evaluate each model/split at decision thresholds 0.40, 0.41, ..., 0.64.
threshold_list = [step / 100 for step in range(40, 65)]
b1, b2, b3, b4 = (
    me.create_model_evaluation_by_threshold(frame, threshold_list, model_name, label, prob_col)
    for frame, model_name, prob_col in (
        (train_df, 'logistic_train', 'log_prob'),
        (test_df, 'logistic_test', 'log_prob'),
        (train_df, 'xgb_train', 'xgb_prob'),
        (test_df, 'xgb_test', 'xgb_prob'),
    )
)

In [None]:
# Persist the cohort-1 threshold tables, one parquet file per model/split.
for table, file_name in zip(
    (b1, b2, b3, b4),
    ('b1_logistic_train_evaluation.parquet', 'b2_logistic_test_evaluation.parquet',
     'b3_xgb_train_evaluation.parquet', 'b4_xgb_test_evaluation.parquet'),
):
    table.to_parquet(path + file_name, index=False)

In [None]:
# Interactive threshold tables for cohort 1: one table trace per model/split, switched with a
# dropdown. Only the first table is visible initially.
data = [plt.create_table_trace(i.round(3).drop('model_name', axis=1)) for i in [b1, b2, b3, b4]]
data[0].visible = True

var_list = ['logistic_train', 'logistic_test', 'xgb_train', 'xgb_test']
visible_list = plt.visible_true_false_list(len(var_list), 1)
# One dropdown entry per table; each sets the trace visibility mask produced by the helper.
buttons = [
    {'label': var_list[i], 'method': 'update', 'args': [{'visible': visible_list[i]}]}
    for i in range(len(visible_list))
]

updatemenus = [
    dict(active=-1,
         x=0.0,
         xanchor='left',
         y=1.33,
         yanchor='top',
         direction='down',
         buttons=buttons,
         )
]

# The bold tag must be closed with </b>; the previous '<b>...<b>' opened it twice.
layout = go.Layout(title='<b>Model Performance - Threshold Table</b>',
                   updatemenus=updatemenus,
                   height=600,
                   width=900)
fig = go.Figure(data=data, layout=layout)
fig.show()

### Next Steps:
- Feature engineering: add new predictors.
- Hyperparameter tuning: for XGBoost we tried `n_estimators` = 500 and 1000 and `max_depth` = 7 and 8, but these settings did not improve the model much. Next, we check whether feature engineering improves performance.

# Feature Engineering

Consider adding the features below and evaluating the model performance:
- aisle id
- reorder rate per product for each user
- scaled days since prior order (standardized per user)

In [None]:
# Preview the first rows of the cohort-2 table.
df2.head(n=5)

In [None]:
# Feature Engineering (cohort 2): one-hot encode the categorical columns, now including aisle_id.

from sklearn.preprocessing import OneHotEncoder

encoder_list = ['order_dow', 'order_hour_of_day', 'department_id', 'aisle_id']

# `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and removed in 1.4;
# try the new keyword first and fall back for older releases.
try:
    encoder = OneHotEncoder(sparse_output=False)
except TypeError:
    encoder = OneHotEncoder(sparse=False)

encoded_data = encoder.fit_transform(df2[encoder_list])
columns = encoder.get_feature_names_out(encoder_list)

# Align on df2's own index, and drop dummy columns left by a previous run of this cell
# so re-running it does not append a duplicate copy.
one_hot_encoded_df2 = pd.DataFrame(encoded_data, columns=columns, index=df2.index)
df2 = pd.concat([df2.drop(columns=columns, errors='ignore'), one_hot_encoded_df2], axis=1)

In [None]:
## train_test_split

# Reuse the cohort-1 held-out users so both feature sets are scored on the same people.
# (Local file written by this notebook; pickle must never be loaded from untrusted sources.)
with open(path + 'test_ids.pkl', 'rb') as f:
    test_id = pickle.load(f)

in_test = df2['user_id'].isin(test_id)
train_df = df2[~in_test].reset_index(drop=True)
test_df = df2[in_test].reset_index(drop=True)

# Negatives / positives in the training rows; fed to XGBoost as scale_pos_weight.
n_negative = train_df[train_df['reordered'] == 0].shape[0]
n_positive = train_df[train_df['reordered'] == 1].shape[0]
positive_rate = n_negative / n_positive

# Model inputs: the one-hot dummies plus the two new numeric features.
input_var_list = [*columns.tolist(), 'reorder_rate', 'scale_day_prior']
label = 'reordered'

train_x, test_x = train_df[input_var_list], test_df[input_var_list]
train_y, test_y = train_df[label], test_df[label]

In [None]:
import pickle

# Snapshot the cohort-2 split before modelling.
test_df.to_parquet(f"{path}test_prior_training2.parquet", index=False)
train_df.to_parquet(f"{path}train_prior_training2.parquet", index=False)

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Class-weighted logistic regression on the cohort-2 features.
# NOTE(review): n_jobs presumably has no effect for a binary target with the default solver,
# and warnings are silenced globally, so a ConvergenceWarning at max_iter=250 would not be shown.
logreg = LogisticRegression(class_weight='balanced', max_iter=250, n_jobs=-1, verbose=0)
logreg.fit(train_x, train_y)

decision_threshold = 0.5
# Vectorized thresholding: same 0/1 labels as a per-row lambda, without one Python call per row.
test_df['log_prob'] = logreg.predict_proba(test_x)[:, 1]
test_df['log_pred'] = (test_df['log_prob'] > decision_threshold).astype(int)

train_df['log_prob'] = logreg.predict_proba(train_x)[:, 1]
train_df['log_pred'] = (train_df['log_prob'] > decision_threshold).astype(int)

In [None]:
import xgboost as xgb

# Gradient-boosted trees on the cohort-2 features; scale_pos_weight (negatives / positives)
# weights positive rows by that ratio to counter label imbalance.
clf = xgb.XGBClassifier(n_estimators=300, max_depth=6, n_jobs=-1, scale_pos_weight=positive_rate)
clf.fit(train_x, train_y)

decision_threshold = 0.5
# Predict the test set results; thresholding is vectorized instead of a per-row lambda.
test_df['xgb_prob'] = clf.predict_proba(test_x)[:, 1]
test_df['xgb_pred'] = (test_df['xgb_prob'] > decision_threshold).astype(int)

train_df['xgb_prob'] = clf.predict_proba(train_x)[:, 1]
train_df['xgb_pred'] = (train_df['xgb_prob'] > decision_threshold).astype(int)

In [None]:
# Save the test & train DataFrame with predictions and probabilities
test_df.to_parquet(f"{path}test_predictions2.parquet", index=False)
train_df.to_parquet(f"{path}train_predictions2.parquet", index=False)

# Pickle both fitted cohort-2 models.
for model_file, fitted in (('loreg2.pkl', logreg), ('clf2.pkl', clf)):
    with open(path + model_file, 'wb') as f:
        pickle.dump(fitted, f)

In [None]:
# ROC traces for both models on train and test (cohort-2 features), overlaid in one chart.
trace5, trace6, trace7, trace8 = (
    me.create_roc_trace(frame, label, prob_col, trace_name)
    for frame, prob_col, trace_name in (
        (train_df, 'log_prob', 'train_logistic2'),
        (test_df, 'log_prob', 'test_logistic2'),
        (train_df, 'xgb_prob', 'train_xgb2'),
        (test_df, 'xgb_prob', 'test_xgb2'),
    )
)
data = [trace5, trace6, trace7, trace8]

me.create_overlay_roc_curve(data)

In [None]:
# Summary metrics per model/split for cohort 2. The XGBoost rows pair `xgb_pred` with
# `xgb_prob`; pairing them with `log_pred` would mix the logistic model's hard labels
# into the XGBoost summary.
t5 = me.ClassifierModelEvaluation(train_df, 'logistic_train', label=label,
                                  pred_col='log_pred', prob_col='log_prob').model_summary("logistic_train")
t6 = me.ClassifierModelEvaluation(test_df, 'logistic_test', label=label,
                                  pred_col='log_pred', prob_col='log_prob').model_summary("logistic_test")
t7 = me.ClassifierModelEvaluation(train_df, 'xgb_train', label=label,
                                  pred_col='xgb_pred', prob_col='xgb_prob').model_summary("xgb_train")
t8 = me.ClassifierModelEvaluation(test_df, 'xgb_test', label=label,
                                  pred_col='xgb_pred', prob_col='xgb_prob').model_summary("xgb_test")
pd.concat([t5, t6, t7, t8], axis=0).round(3)

In [None]:
# Evaluate each cohort-2 model/split at decision thresholds 0.40, 0.41, ..., 0.64.
threshold_list = [step / 100 for step in range(40, 65)]
b5, b6, b7, b8 = (
    me.create_model_evaluation_by_threshold(frame, threshold_list, model_name, label, prob_col)
    for frame, model_name, prob_col in (
        (train_df, 'logistic_train', 'log_prob'),
        (test_df, 'logistic_test', 'log_prob'),
        (train_df, 'xgb_train', 'xgb_prob'),
        (test_df, 'xgb_test', 'xgb_prob'),
    )
)

In [None]:
# Persist the cohort-2 threshold tables, one parquet file per model/split.
for table, file_name in zip(
    (b5, b6, b7, b8),
    ('b5_logistic_train_evaluation.parquet', 'b6_logistic_test_evaluation.parquet',
     'b7_xgb_train_evaluation.parquet', 'b8_xgb_test_evaluation.parquet'),
):
    table.to_parquet(path + file_name, index=False)

In [None]:
# Interactive threshold tables for cohort 2: one table trace per model/split, switched with a
# dropdown. Only the first table is visible initially.
data = [plt.create_table_trace(i.round(3).drop('model_name', axis=1)) for i in [b5, b6, b7, b8]]
data[0].visible = True

var_list = ['logistic_train', 'logistic_test', 'xgb_train', 'xgb_test']
visible_list = plt.visible_true_false_list(len(var_list), 1)
# One dropdown entry per table; each sets the trace visibility mask produced by the helper.
buttons = [
    {'label': var_list[i], 'method': 'update', 'args': [{'visible': visible_list[i]}]}
    for i in range(len(visible_list))
]

updatemenus = [
    dict(active=-1,
         x=0.0,
         xanchor='left',
         y=1.33,
         yanchor='top',
         direction='down',
         buttons=buttons,
         )
]

# The bold tag must be closed with </b>; the previous '<b>...<b>' opened it twice.
layout = go.Layout(title='<b>Model Performance - Threshold Table</b>',
                   updatemenus=updatemenus,
                   height=600,
                   width=900)
fig = go.Figure(data=data, layout=layout)
fig.show()

In [None]:
import plotly.subplots as sp

# Cohort-1 vs cohort-2 ROC traces side by side, one panel per model/split.
# Subplot titles are given in row-major order, matching the (row, col) keys below.
panel_traces = {
    (1, 1): (trace1, trace5),  # logistic, train
    (1, 2): (trace2, trace6),  # logistic, test
    (2, 1): (trace3, trace7),  # xgb, train
    (2, 2): (trace4, trace8),  # xgb, test
}
fig = sp.make_subplots(
    rows=2, cols=2,
    subplot_titles=('Logistic regression - train', 'Logistic regression - test',
                    'XGBoost - train', 'XGBoost - test'),
)

for (row, col), traces in panel_traces.items():
    for trace in traces:
        fig.add_trace(trace, row=row, col=col)

fig.update_layout(title='ROC curves: cohort 1 vs cohort 2 features')
fig.show()

In [None]:
import plotly.express as px

# Importances of the last fitted XGBoost model (cohort 2), aggregated back over the one-hot
# levels of each source column.
feature_data = pd.DataFrame({
    'Feature': clf.feature_names_in_,
    'Importance': clf.feature_importances_,
})
# Strip a trailing "_<digits>" one-hot level suffix, e.g. 'aisle_id_12' -> 'aisle_id'.
feature_data['normalized_Feature'] = feature_data['Feature'].str.replace(r'(_\d+)$', '', regex=True)

# Sum first, then round: rounding every level to 5 decimals before summing loses precision
# for columns with many levels (such as aisle_id). Sort so the largest bars come first.
feature_sum = (
    feature_data.groupby('normalized_Feature')['Importance'].sum()
    .round(5)
    .sort_values(ascending=False)
    .reset_index(name='feature_sum')
)

# Creating a bar chart using Plotly
fig = px.bar(feature_sum, x='normalized_Feature', y='feature_sum',
             title='Normalized Feature Importance',
             labels={'normalized_Feature': 'Feature', 'feature_sum': 'Sum of Importance'})

# Show the plot
fig.show()
