In [0]:
%sql
-- Reset cell: remove any predictions table left over from a previous run.
-- NOTE(review): this runs before the model is loaded and scored; if the
-- orchestrator cell fails, the table stays missing until a successful rerun.
DROP TABLE IF EXISTS
  workspace.gold.ecommerce_predictions;

In [0]:
# ----------------------------
# ORCHESTRATOR NOTEBOOK (ALL-IN-ONE)
# ----------------------------

import mlflow
import mlflow.pyfunc
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# ----------------------------
# 0. Spark Session
# ----------------------------
# getOrCreate() hands back the already-running session when there is one
# (as in a Databricks notebook) rather than starting a new one.
spark = SparkSession.builder.getOrCreate()

# ----------------------------
# 1. Drop Existing Predictions Table (Safe)
# ----------------------------
# Three-part table name that step 8 writes the predictions to.
output_table = "workspace.gold.ecommerce_predictions"
# NOTE(review): the drop happens before the model is loaded and the data scored,
# so a failure in any later step leaves no predictions table at all. The write
# in step 8 already uses mode("overwrite").
spark.sql("DROP TABLE IF EXISTS " + output_table)
print(f"✅ Dropped table if it existed: {output_table}")

# ----------------------------
# 2. Load Production Model from MLflow
# ----------------------------
# Registered model name, plus the alias identifying which version to score with.
model_name, model_alias = "ecommerce_purchase_prediction_gb", "production"
print(f"✅ Loading model: {model_name} ({model_alias})")
# "models:/<name>@<alias>" resolves the alias to a concrete registered version.
model = mlflow.pyfunc.load_model("models:/{}@{}".format(model_name, model_alias))
print("✅ Model loaded successfully")

# ----------------------------
# 3. Load Gold Feature Table
# ----------------------------
gold_table = "workspace.ecommerce.gold_user_item_features"
df_gold = spark.table(gold_table)

# ----------------------------
# 4. Select Features + Identifiers
# ----------------------------
# Model inputs, selected by name so the column order handed to the model is fixed.
feature_cols = ["interaction_score", "funnel_depth", "engagement_score"]
# Carried through to the output table; "converted" becomes actual_label in step 7.
id_cols = ["visitorid", "itemid", "converted"]

df_input = df_gold.select(id_cols + feature_cols)

# ----------------------------
# 5. Convert to Pandas for Prediction
# ----------------------------
# NOTE(review): toPandas() collects every row onto the driver (millions of rows in
# the logged run); consider mlflow.pyfunc.spark_udf if driver memory becomes tight.
pdf = df_input.toPandas()
# The row count is taken from the collected frame instead of a separate
# df_gold.count(), which would launch an extra Spark job over the same table.
# The select above does not filter rows, so the two counts are identical.
print(f"✅ Loaded Gold table: {gold_table} with {len(pdf)} rows")
X_input = pdf[feature_cols]
print(f"✅ Feature matrix shape: {X_input.shape}")

# ----------------------------
# 6. Run Predictions
# ----------------------------
# Rows scoring at or above this value get predicted_label = 1.
PREDICTION_THRESHOLD = 0.5

raw_pred = model.predict(X_input)

# A pyfunc model may return a 1-D array/Series or a DataFrame. Normalise to a 1-D
# float array:
#   - one column is used as-is;
#   - two columns are treated as [P(class 0), P(class 1)] and the second is kept.
# NOTE(review): the two-column case assumes the positive class is the last
# column, as predict_proba orders classes — confirm against the logged model.
pred_frame = pd.DataFrame(raw_pred)
if pred_frame.shape[1] == 1:
    y_pred_prob = pred_frame.iloc[:, 0].to_numpy(dtype=float)
elif pred_frame.shape[1] == 2:
    y_pred_prob = pred_frame.iloc[:, 1].to_numpy(dtype=float)
else:
    raise ValueError(
        f"Expected 1 or 2 prediction columns from the model, got {pred_frame.shape[1]}"
    )

if len(y_pred_prob) != len(pdf):
    raise ValueError(
        f"Model returned {len(y_pred_prob)} predictions for {len(pdf)} input rows"
    )
if y_pred_prob.size and (y_pred_prob.min() < 0.0 or y_pred_prob.max() > 1.0):
    raise ValueError("Model output falls outside [0, 1]; it cannot be stored as predicted_prob")

# NOTE(review): if this model was logged with mlflow.sklearn, the pyfunc predict()
# calls the estimator's predict() by default and returns class labels rather
# than probabilities. The 0.0 scores in the saved output suggest this. Relogging
# with pyfunc_predict_fn="predict_proba" yields two columns, which the
# normalisation above already handles.
if y_pred_prob.size and ((y_pred_prob == 0.0) | (y_pred_prob == 1.0)).all():
    print("⚠️ Every model output is exactly 0 or 1 - these look like class labels, not probabilities")

y_pred_label = (y_pred_prob >= PREDICTION_THRESHOLD).astype(int)
print("✅ Predictions completed")

# ----------------------------
# 7. Attach Predictions
# ----------------------------
# NOTE(review): astype(int) raises if "converted" holds nulls — confirm the gold
# table guarantees a non-null label.
pdf["actual_label"] = pdf["converted"].astype(int)
pdf["predicted_label"] = y_pred_label
pdf["predicted_prob"] = y_pred_prob  # already a float array from the normalisation above

# Reorder columns: identifiers, labels and score first, then the raw features.
final_pdf = pdf[
    ["visitorid", "itemid", "actual_label", "predicted_label", "predicted_prob"] + feature_cols
]

# Convert back to Spark DataFrame
final_predictions = spark.createDataFrame(final_pdf)
print("✅ Predictions joined with identifiers")

# ----------------------------
# 8. Write to Gold Predictions Table
# ----------------------------
# "overwrite" replaces any data already stored under output_table with this run's rows.
(
    final_predictions.write
    .mode("overwrite")
    .saveAsTable(output_table)
)
print(f"✅ Predictions saved to: {output_table}")

# ----------------------------
# 9. Validation: Show first 5 rows and schema
# ----------------------------
# Read back through the catalog (not the in-memory DataFrame) to confirm the write landed.
spark.table(output_table).show(n=5)
spark.sql("DESCRIBE TABLE " + output_table).show()

print("🎉 ORCHESTRATION COMPLETE SUCCESSFULLY")


✅ Dropped table if it existed: workspace.gold.ecommerce_predictions
✅ Loading model: ecommerce_purchase_prediction_gb (production)
✅ Model loaded successfully
✅ Loaded Gold table: workspace.ecommerce.gold_user_item_features with 4620187 rows
✅ Feature matrix shape: (4620187, 3)
✅ Predictions completed
✅ Predictions joined with identifiers
✅ Predictions saved to: workspace.gold.ecommerce_predictions
+---------+------+------------+---------------+--------------+-----------------+------------+----------------+
|visitorid|itemid|actual_label|predicted_label|predicted_prob|interaction_score|funnel_depth|engagement_score|
+---------+------+------------+---------------+--------------+-----------------+------------+----------------+
|   796052|116706|           0|              0|           0.0|                1|           1|               1|
|   980263|215429|           0|              0|           0.0|                1|           1|               1|
|   326683| 61926|           0|            