In [0]:
# We are creating a dataset for our model
# We'll use a 10% sample because 32 million rows might crash the free cluster memory
ml_data = spark.sql("""
    SELECT 
        try_cast(reordered AS INT) AS label,
        try_cast(order_dow AS INT) AS day_of_week,
        try_cast(order_hour_of_day AS INT) AS hour_of_day,
        try_cast(days_since_prior_order AS INT) AS days_since_last_order,
        try_cast(add_to_cart_order AS INT) AS cart_position
    FROM catalog1.instacart_db.order_products__prior op
    JOIN catalog1.instacart_db.orders o ON op.order_id = o.order_id
    WHERE reordered IS NOT NULL
""").sample(0.1) 

display(ml_data)

label,day_of_week,hour_of_day,days_since_last_order,cart_position
1,5,9,8,2
0,5,9,8,5
1,5,17,12,4
1,5,17,12,5
1,1,9,7,8
1,6,16,9,5
1,6,16,9,19
1,6,16,9,21
0,6,16,9,23
0,6,16,9,24


In [0]:
from pyspark.ml.feature import VectorAssembler

# Define which columns are our "features"
feature_cols = ['day_of_week', 'hour_of_day', 'days_since_last_order', 'cart_position']

# Create the Assembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the data
final_data = assembler.transform(ml_data)

# Select only the columns the model needs
final_data = final_data.select("features", "label")

display(final_data)

features,label
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""5.0"",""9.0"",""8.0"",""2.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""5.0"",""9.0"",""8.0"",""5.0""]}",0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""5.0"",""17.0"",""12.0"",""4.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""5.0"",""17.0"",""12.0"",""5.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""1.0"",""9.0"",""7.0"",""8.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""6.0"",""16.0"",""9.0"",""5.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""6.0"",""16.0"",""9.0"",""19.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""6.0"",""16.0"",""9.0"",""21.0""]}",1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""6.0"",""16.0"",""9.0"",""23.0""]}",0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""6.0"",""16.0"",""9.0"",""24.0""]}",0


In [0]:
# 70% for training, 30% for testing
train_data, test_data = final_data.randomSplit([0.7, 0.3])

print(f"Training Rows: {train_data.count()}")
print(f"Testing Rows: {test_data.count()}")

Training Rows: 2269260
Testing Rows: 973328


In [0]:
from pyspark.ml.classification import LogisticRegression

# 1. Initialize the model
# featuresCol and labelCol tell the model where to look
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 2. Fit the model (This is the "Training" part)
# This might take 1-2 minutes because it's processing 2 million rows
lr_model = lr.fit(train_data)

print("Training Complete! The model has learned the patterns in your data.")

Training Complete! The model has learned the patterns in your data.


In [0]:
# Use the model to predict reorders on the test set
predictions = lr_model.transform(test_data)

# Show the results
# 'prediction' is what the AI thinks, 'label' is the actual truth
display(predictions.select("features", "label", "prediction", "probability"))

features,label,prediction,probability
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""1.0""]}",1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.29244573367534976"",""0.7075542663246502""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""2.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.3003635691349929"",""0.6996364308650072""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""2.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.3003635691349929"",""0.6996364308650072""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""2.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.3003635691349929"",""0.6996364308650072""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""2.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.3003635691349929"",""0.6996364308650072""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""4.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.31655890699543565"",""0.6834410930045643""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""4.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.31655890699543565"",""0.6834410930045643""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""6.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.33321159125027744"",""0.6667884087497226""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""9.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.35898068845654185"",""0.6410193115434581""]}"
"{""type"":""0"",""size"":""4"",""indices"":[""3""],""values"":[""11.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.3766351732464315"",""0.6233648267535685""]}"


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 1. Setup the evaluator to look for "accuracy"
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# 2. Calculate accuracy
accuracy = evaluator.evaluate(predictions)

print(f"Model Accuracy: {accuracy * 100:.2f}%")

Model Accuracy: 59.81%


In [0]:
import pandas as pd

# Extract coefficients
# These represent how much each feature affects the "Reorder" decision
coeffs = lr_model.coefficients.toArray()
feature_names = ['Day of Week', 'Hour of Day', 'Days Since Last Order', 'Cart Position']

# Put them in a nice table
importance_df = pd.DataFrame({'Feature': feature_names, 'Coefficient': coeffs})
print("Feature Importance (Higher absolute number means more influence):")
print(importance_df.sort_values(by='Coefficient', ascending=False))

Feature Importance (Higher absolute number means more influence):
                 Feature  Coefficient
2  Days Since Last Order    -0.004258
0            Day of Week    -0.008967
1            Hour of Day    -0.012467
3          Cart Position    -0.037968


In [0]:
from pyspark.ml.linalg import Vectors

# Create a single row of data: [Day, Hour, DaysSinceLast, CartPos]
# Experiment: Change the 2.0 (Cart Position) to 20.0 to see the prediction change!
manual_data = [(Vectors.dense([1.0, 10.0, 7.0, 2.0]),)]
sample_df = spark.createDataFrame(manual_data, ["features"])

# Get the prediction
result = lr_model.transform(sample_df)

print("--- AI PREDICTION FOR MANUAL ORDER ---")
res = result.select("probability", "prediction").collect()[0]
prob_reorder = res.probability[1] * 100

print(f"Chance of Reorder: {prob_reorder:.2f}%")
print(f"Final Decision: {'REORDER' if res.prediction == 1.0 else 'NEW ITEM'}")

--- AI PREDICTION FOR MANUAL ORDER ---
Chance of Reorder: 66.42%
Final Decision: REORDER
