# Instacart Machine Learning Models
# This notebook builds ML models to predict product reorders.

In [0]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, count, sum, avg, stddev, min, max


In [0]:
# Prepare training data
#
# Builds one row per line of instacart.order_products_train, enriched with:
#   * order context from instacart.orders (user, day of week, hour, days since prior order),
#   * the product's aisle and department from instacart.products,
#   * per-user aggregates computed over every row of instacart.orders
#     (distinct order count, mean days_since_prior_order),
#   * per-product aggregates computed over instacart.order_products_prior
#     (share of rows flagged reordered, total row count).
#
# All joins are plain (inner) JOINs, so a training row is dropped when its
# order, product, user or product-stat counterpart is missing.
# NOTE(review): that includes products that never appear in
# order_products_prior -- confirm this row loss is intended.
# NOTE(review): the user aggregates read all of instacart.orders; if that table
# also contains the orders being labelled here, the stats include them -- confirm.
# `spark` is presumably the SparkSession provided by the notebook runtime; it
# is not created in this file.
train_data = spark.sql("""
  SELECT 
    op.order_id,
    op.product_id,
    op.add_to_cart_order,
    op.reordered,
    o.user_id,
    o.order_dow,
    o.order_hour_of_day,
    o.days_since_prior_order,
    p.aisle_id,
    p.department_id,
    -- Add user-based features
    up.user_total_orders,
    up.user_avg_days_between_orders,
    -- Add product-based features
    pp.product_reorder_rate,
    pp.product_order_count
  FROM instacart.order_products_train op
  JOIN instacart.orders o ON op.order_id = o.order_id
  JOIN instacart.products p ON op.product_id = p.product_id
  -- Join with pre-calculated user stats
  JOIN (
    SELECT 
      user_id,
      COUNT(DISTINCT order_id) as user_total_orders,
      AVG(days_since_prior_order) as user_avg_days_between_orders
    FROM instacart.orders
    GROUP BY user_id
  ) up ON o.user_id = up.user_id
  -- Join with pre-calculated product stats
  JOIN (
    SELECT
      product_id,
      SUM(CAST(reordered as INT)) / COUNT(*) as product_reorder_rate,
      COUNT(*) as product_order_count
    FROM instacart.order_products_prior
    GROUP BY product_id
  ) pp ON op.product_id = pp.product_id
""")


In [0]:
# Quick look at the assembled training set.

# Summary statistics produced by DataFrame.describe().
summary_stats = train_data.describe()
summary_stats.show()

# Row count per value of the `reordered` label, to gauge class imbalance.
label_counts = train_data.groupBy("reordered").count()
label_counts.show()


In [0]:
# Feature definitions and the preprocessing stages shared by the model pipelines.

# Columns treated as categorical: string-indexed, then one-hot encoded.
cat_cols = ["order_dow", "aisle_id", "department_id"]
# Columns passed to the assembler unchanged.
num_cols = [
    "add_to_cart_order", "order_hour_of_day", "days_since_prior_order",
    "user_total_orders", "user_avg_days_between_orders",
    "product_reorder_rate", "product_order_count",
]

# The indexers are deliberately left unfitted: a Pipeline fits its estimator
# stages on the DataFrame handed to Pipeline.fit(), so the category vocabulary
# is learned from the training rows only instead of from the full dataset
# (which would let held-out rows influence preprocessing and would trigger
# Spark jobs as soon as this cell runs).
# handleInvalid="keep" routes categories that were not seen at fit time to an
# extra index rather than raising when held-out data is transformed.
# The loop variable is named `name` to avoid shadowing the imported
# pyspark.sql.functions.col.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_idx", handleInvalid="keep")
    for name in cat_cols
]
encoders = [
    OneHotEncoder(inputCol=name + "_idx", outputCol=name + "_enc")
    for name in cat_cols
]

# Final model inputs: raw numeric columns followed by the one-hot vectors.
encoded_cols = [name + "_enc" for name in cat_cols]
feature_cols = num_cols + encoded_cols

# Concatenates all feature columns into the single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")


In [0]:
# Split the rows into training and evaluation sets, weighted 0.8 / 0.2,
# using a fixed seed of 42.
split_weights = [0.8, 0.2]
train, test = train_data.randomSplit(split_weights, seed=42)


In [0]:
# Random Forest: assemble the pipeline, fit it on the training split, then
# score the held-out split.
rf_classifier = RandomForestClassifier(
    labelCol="reordered",
    featuresCol="features",
    numTrees=100,
)
# Preprocessing stages first, classifier last.
rf_stages = [*indexers, *encoders, assembler, rf_classifier]
rf_pipeline = Pipeline(stages=rf_stages)
rf_model = rf_pipeline.fit(train)

# Predictions on the held-out rows.
rf_predictions = rf_model.transform(test)

# No metricName is passed, so the evaluator's default metric is used; the
# result is reported below as AUC. The evaluator is reused for the other model.
evaluator = BinaryClassificationEvaluator(labelCol="reordered")
rf_auc = evaluator.evaluate(rf_predictions)
print(f"Random Forest AUC: {rf_auc}")


In [0]:
# Logistic Regression: same preprocessing stages, different final estimator.
lr_classifier = LogisticRegression(
    labelCol="reordered",
    featuresCol="features",
)
lr_stages = [*indexers, *encoders, assembler, lr_classifier]
lr_pipeline = Pipeline(stages=lr_stages)
lr_model = lr_pipeline.fit(train)

# Predictions on the held-out rows.
lr_predictions = lr_model.transform(test)

# Scored with the `evaluator` created earlier in the notebook so both models
# are measured with the same metric.
lr_auc = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression AUC: {lr_auc}")


In [0]:
# Persist whichever fitted pipeline scored higher; on a tie the Logistic
# Regression model is kept. Any model already stored at the path is overwritten.
best_model_path = "/models/product_reorder_prediction"
rf_wins = rf_auc > lr_auc
winning_model = rf_model if rf_wins else lr_model
winning_model.write().overwrite().save(best_model_path)
if rf_wins:
    print(f"Random Forest model saved to {best_model_path}")
else:
    print(f"Logistic Regression model saved to {best_model_path}")
