In [0]:
# dbutils.widgets.removeAll()

# Notebook parameters as (widget name, default value, label) triples.
# Each triple is registered as a text widget, in this order.
_WIDGET_SPECS = [
    ("catalog", "juan_dev", "Unity Catalog catalog (required)"),
    ("schema", "ml_nba_demo", "Schema/database for demo artifacts"),
    ("feature_table", "nba_customer_features", "Feature table name (no catalog/schema)"),
    ("raw_table", "nba_customers_raw", "Raw data table name (no catalog/schema)"),
    ("rec_table", "nba_recommendations", "Recommendation output table name"),
    ("log_table", "nba_inference_log", "Inference log table name"),
    ("model_base", "nba_model", "Model base name"),
    ("experiment_path", "/Users/juan.lamadrid@databricks.com/experiments/nba_demo", "MLflow experiment path (optional)"),
]
for _widget_name, _widget_default, _widget_label in _WIDGET_SPECS:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

# Read the current widget values into module-level constants.
_read_widget = dbutils.widgets.get
CATALOG = _read_widget("catalog")
SCHEMA = _read_widget("schema")
FEATURE_TABLE = _read_widget("feature_table")
RAW_TABLE = _read_widget("raw_table")
REC_TABLE = _read_widget("rec_table")
LOG_TABLE = _read_widget("log_table")
MODEL_BASE = _read_widget("model_base")
# An empty experiment path is normalised to None.
EXPERIMENT_PATH = _read_widget("experiment_path") or None

# Three-level names (catalog.schema.object) used by the rest of the notebook.
_NAMESPACE = f"{CATALOG}.{SCHEMA}"
FEATURE_TABLE_FULL = f"{_NAMESPACE}.{FEATURE_TABLE}"
RAW_TABLE_FULL = f"{_NAMESPACE}.{RAW_TABLE}"
REC_TABLE_FULL = f"{_NAMESPACE}.{REC_TABLE}"
LOG_TABLE_FULL = f"{_NAMESPACE}.{LOG_TABLE}"
UC_MODEL_NAME = f"{_NAMESPACE}.{MODEL_BASE}"

In [0]:
# Point the MLflow client at the Unity Catalog model registry.
# A failure here is reported and the notebook carries on.
import mlflow

try:
    mlflow.set_registry_uri("databricks-uc")
except Exception as registry_err:
    registry_note = "Note: Using default registry URI. If you're on UC, this is usually fine."
    print(registry_note, str(registry_err))

In [0]:
# Create the target schema if it is missing, then make it the session default.
for _statement in (
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}",
    f"USE {CATALOG}.{SCHEMA}",
):
    spark.sql(_statement)

# Echo the resolved configuration so the run is self-describing.
print("Using configuration:")
_config_summary = [
    ("  Catalog       :", CATALOG),
    ("  Schema        :", SCHEMA),
    ("  Raw table     :", RAW_TABLE_FULL),
    ("  Feature table :", FEATURE_TABLE_FULL),
    ("  Recs table    :", REC_TABLE_FULL),
    ("  Log table     :", LOG_TABLE_FULL),
    ("  UC model name :", UC_MODEL_NAME),
    ("  Experiment    :", EXPERIMENT_PATH or "<not set>"),
]
for _label, _value in _config_summary:
    print(_label, _value)

In [0]:
import numpy as np
import pandas as pd

# Synthetic customer data. The seed and the order of the np.random calls below
# determine the generated values, so that order must not change.
np.random.seed(42)
n_customers = 10_000
customer_id = np.arange(1, n_customers + 1)

# Demographics
age = np.random.randint(18, 71, size=n_customers)
gender = np.random.choice(['M', 'F'], size=n_customers, p=[0.48, 0.52])
region = np.random.choice(
    ['Northeast', 'Midwest', 'South', 'West'],
    size=n_customers,
    p=[0.20, 0.23, 0.35, 0.22],
)
income = np.random.normal(80_000, 25_000, size=n_customers).clip(20_000, 250_000)

# Transactions (last month); spend is scaled up with income
num_purchases_last_month = np.random.poisson(lam=2.2, size=n_customers)
income_multiplier = 1 + (income / 250_000)
purchase_amount_last_month = np.random.gamma(shape=2.0, scale=120.0, size=n_customers) * income_multiplier

# Browsing (last week)
browsing_minutes_last_week = np.random.gamma(shape=2.2, scale=25.0, size=n_customers)
categories = ['Beauty', 'Electronics', 'Fashion', 'Home', 'Grocery', 'Sports', 'Toys']
top_category = np.random.choice(categories, size=n_customers)

# Ground-truth NEXT BEST ACTION (rule-based for education).
# np.select takes the first matching condition, so the rules keep this priority:
#   1. heavy spender or frequent buyer -> Email
#   2. heavy browser or under 28       -> Push
#   3. aged 28..50                     -> SMS
#   4. everyone else                   -> Email
heavy_buyer = (purchase_amount_last_month > 600) | (num_purchases_last_month >= 6)
engaged_or_young = (browsing_minutes_last_week > 200) | (age < 28)
mid_aged = (age >= 28) & (age <= 50)
best_action = np.select(
    [heavy_buyer, engaged_or_young, mid_aged],
    ["Email", "Push", "SMS"],
    default="Email",
).tolist()

raw_pdf = pd.DataFrame({
    "customer_id": customer_id,
    "age": age,
    "gender": gender,
    "region": region,
    "income": income.round(2),
    "num_purchases_last_month": num_purchases_last_month,
    "purchase_amount_last_month": purchase_amount_last_month.round(2),
    "browsing_minutes_last_week": browsing_minutes_last_week.round(2),
    "top_category": top_category,
    "best_action": best_action,
})

# Persist as a Delta table, replacing any previous contents.
raw_sdf = spark.createDataFrame(raw_pdf)
(
    raw_sdf.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(RAW_TABLE_FULL)
)

display(spark.table(RAW_TABLE_FULL).limit(10))
print(f"Wrote {RAW_TABLE_FULL} with {raw_sdf.count():,} rows.")

In [0]:
from pyspark.sql import functions as F

raw = spark.table(RAW_TABLE_FULL)

# Class balance: rows per best_action, most frequent first
class_counts = raw.groupBy("best_action").count()
display(class_counts.orderBy(F.desc("count")))

# Numeric summary
num_cols = [
    "age",
    "income",
    "num_purchases_last_month",
    "purchase_amount_last_month",
    "browsing_minutes_last_week",
]
display(raw.select(num_cols).summary())

# Correlation (numeric features vs a numeric label proxy: Email=0, SMS=1, anything else=2)
action_col = F.col("best_action")
label_int = F.when(action_col == "Email", 0).when(action_col == "SMS", 1).otherwise(2)
corr_df = raw.select(*num_cols, label_int.alias("label_int"))

corrs = [(feature, corr_df.stat.corr(feature, "label_int")) for feature in num_cols]
spark.createDataFrame(corrs, ["feature", "corr_with_label"]).createOrReplaceTempView("feat_corrs")
display(spark.sql("SELECT * FROM feat_corrs ORDER BY ABS(corr_with_label) DESC"))

# Small sample to pandas for a couple of simple plots (matplotlib only)
sample_pdf = raw.limit(5000).toPandas()

import matplotlib.pyplot as plt

# Histogram of customer age
age_fig, age_ax = plt.subplots(figsize=(6, 4))
age_ax.hist(sample_pdf["age"], bins=25)
age_ax.set_title("Age Distribution")
age_ax.set_xlabel("Age")
age_ax.set_ylabel("Count")
display(age_fig)
plt.close(age_fig)

# Box plot of browsing minutes, one box per action
action_order = ["Email", "SMS", "Push"]
groups = []
for action in action_order:
    in_action = sample_pdf["best_action"] == action
    groups.append(sample_pdf[in_action]["browsing_minutes_last_week"])

box_fig, box_ax = plt.subplots(figsize=(6, 4))
box_ax.boxplot(groups, labels=["Email", "SMS", "Push"])
box_ax.set_title("Browsing Minutes vs. Best Action")
box_ax.set_xlabel("Best Action")
box_ax.set_ylabel("Browsing Minutes (last week)")
display(box_fig)
plt.close(box_fig)

In [0]:
from pyspark.sql import functions as F

raw = spark.table(RAW_TABLE_FULL)

# Column handles reused by the derived-feature expressions below.
age_col = F.col("age")
spend_col = F.col("purchase_amount_last_month")
purchases_col = F.col("num_purchases_last_month")
action_col = F.col("best_action")

# Derived features
gender_ix_expr = F.when(F.col("gender") == "M", 1).otherwise(0)
age_bucket_expr = (
    F.when(age_col < 28, "young")
     .when((age_col >= 28) & (age_col <= 50), "mid")
     .otherwise("senior")
)
# Guard against division by zero for customers with no purchases.
avg_purchase_expr = F.when(purchases_col > 0, spend_col / purchases_col).otherwise(0.0)
high_spender_expr = (spend_col > 600).cast("int")

features_sdf = raw.select(
    "customer_id",
    "age",
    gender_ix_expr.alias("gender_ix"),
    "income",
    "num_purchases_last_month",
    "purchase_amount_last_month",
    avg_purchase_expr.alias("avg_purchase_value"),
    "browsing_minutes_last_week",
    high_spender_expr.alias("is_high_spender"),
    "region",
    "top_category",
    age_bucket_expr.alias("age_bucket"),
)

# Labels keyed by customer: Email=0, SMS=1, anything else=2
action_label_expr = F.when(action_col == "Email", 0).when(action_col == "SMS", 1).otherwise(2)
labels_sdf = raw.select(
    "customer_id",
    action_label_expr.alias("action_label"),
    "best_action",
)

# Persist engineered features to Delta (Feature Store will build on top of this)
(
    features_sdf.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable(FEATURE_TABLE_FULL)
)
display(spark.table(FEATURE_TABLE_FULL).limit(10))
display(labels_sdf.limit(10))

# We'll set fs_mode now; Cell 3.2 may update it to 'feature_store'
fs_mode = "delta"
print("Baseline features table written:", FEATURE_TABLE_FULL)

In [0]:
# Ensure the engineered features table has a valid PK column for Feature Store
from pyspark.sql import functions as F

feat_df = spark.table(FEATURE_TABLE_FULL)

# Count customer_id values that occur more than once, and rows with a null customer_id.
rows_per_customer = feat_df.groupBy("customer_id").count()
dups = rows_per_customer.filter("count > 1").count()
nulls = feat_df.filter(F.col("customer_id").isNull()).count()

print(f"Duplicates on customer_id: {dups}")
print(f"Null customer_id values : {nulls}")

# If needed, drop null keys and keep one row per customer_id, then rewrite the table.
needs_cleanup = dups > 0 or nulls > 0
if needs_cleanup:
    print("Fixing duplicates/nulls by dropping duplicates and null customer_id rows...")
    non_null_keys = feat_df.filter(F.col("customer_id").isNotNull())
    fixed = non_null_keys.dropDuplicates(["customer_id"])
    fixed.write.mode("overwrite").format("delta").saveAsTable(FEATURE_TABLE_FULL)
    feat_df = spark.table(FEATURE_TABLE_FULL)
    print("Rewrote features table without duplicates/nulls.")

# Enforce NOT NULL on customer_id; a failure is reported but not fatal.
not_null_ddl = f"ALTER TABLE {FEATURE_TABLE_FULL} ALTER COLUMN customer_id SET NOT NULL"
try:
    spark.sql(not_null_ddl)
    print("Set NOT NULL on customer_id.")
except Exception as not_null_err:
    print("Note: Could not set NOT NULL (might already be set):", str(not_null_err))

# Add PRIMARY KEY constraint (constraint name cannot contain dots)
CONSTRAINT_NAME = f"{FEATURE_TABLE}_pk"
add_pk_ddl = f"ALTER TABLE {FEATURE_TABLE_FULL} ADD CONSTRAINT {CONSTRAINT_NAME} PRIMARY KEY (customer_id)"
try:
    spark.sql(add_pk_ddl)
    print(f"Added PRIMARY KEY constraint {CONSTRAINT_NAME} on {FEATURE_TABLE_FULL}.")
except Exception as pk_err:
    pk_already_present = "already exists" in str(pk_err).lower()
    if not pk_already_present:
        print("PK add failed (check privileges / UC support). Proceeding with Delta-only if FS unavailable.")
        print("Details:", str(pk_err))
    else:
        print(f"PRIMARY KEY constraint {CONSTRAINT_NAME} already exists; continuing.")

In [0]:
# Register and write the features through Feature Store when it can be used.
# Any failure leaves fs_mode at "delta", i.e. the plain Delta table is the feature source.
fs_mode = "delta"
try:
    from databricks.feature_store import FeatureStoreClient
    fs = FeatureStoreClient()

    # (Re)create FS table entry if needed; a failure is reported and then ignored.
    create_kwargs = dict(
        name=FEATURE_TABLE_FULL,
        primary_keys=["customer_id"],
        schema=spark.table(FEATURE_TABLE_FULL).schema,
        description="Customer features for NBA model (demo)",
    )
    try:
        fs.create_table(**create_kwargs)
        print("Feature Store table entry created.")
    except Exception as create_err:
        print("Feature Store table may already exist; proceeding. Details:", str(create_err))

    # Write features into FS-managed table
    fs.write_table(
        name=FEATURE_TABLE_FULL,
        df=spark.table(FEATURE_TABLE_FULL),
        mode="overwrite",
    )
    fs_mode = "feature_store"
    print(f"Feature Store write OK → {FEATURE_TABLE_FULL}")
except Exception as fs_err:
    print("Feature Store not available; continuing with Delta-only features.")
    print("Reason:", str(fs_err))

print("Feature persistence mode:", fs_mode)
display(spark.table(FEATURE_TABLE_FULL).limit(10))

In [0]:
# Define the feature lists (used consistently across train & inference)
NUMERIC_FEATURES = [
    "age","gender_ix","income",
    "num_purchases_last_month","purchase_amount_last_month","avg_purchase_value",
    "browsing_minutes_last_week","is_high_spender"
]
CATEGORICALS = ["region","top_category","age_bucket"]

# Assemble training dataframe (Spark → Pandas).
# The Feature Store path is taken only when the previous cell confirmed it works
# (fs_mode == "feature_store"). Otherwise `fs` may not exist, so the features are
# joined straight from the Delta table instead of failing with a NameError.
if fs_mode == "feature_store":
    from databricks.feature_store import FeatureLookup
    feature_lookups = [
        FeatureLookup(
            table_name=FEATURE_TABLE_FULL,
            feature_names=NUMERIC_FEATURES + CATEGORICALS,
            lookup_key="customer_id"
        )
    ]
    training_set = fs.create_training_set(df=labels_sdf, feature_lookups=feature_lookups, label="action_label")
    train_sdf = training_set.load_df()
else:
    # Delta-only fallback: inner join of labels and features on the shared key.
    feats = spark.table(FEATURE_TABLE_FULL).select("customer_id", *NUMERIC_FEATURES, *CATEGORICALS)
    train_sdf = labels_sdf.join(feats, on="customer_id", how="inner")

train_pdf = train_sdf.toPandas()

# Build design matrix (one-hot categoricals; the first level of each categorical is dropped)
import pandas as pd
y_all = train_pdf["action_label"].astype(int).values
X_all = train_pdf[NUMERIC_FEATURES + CATEGORICALS].copy()
X_all = pd.get_dummies(X_all, columns=CATEGORICALS, drop_first=True)

# Persist training column order for downstream inference alignment
TRAIN_COLUMNS = list(X_all.columns)
print("Training matrix shape:", X_all.shape)
print("Label shape:", y_all.shape)
print("First 5 training columns:", TRAIN_COLUMNS[:5])

In [0]:
import mlflow, mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import numpy as np, matplotlib.pyplot as plt, itertools

# Use the experiment chosen through the `experiment_path` widget (read into
# EXPERIMENT_PATH in the configuration cell). It is deliberately NOT reassigned
# here, so the widget value is honoured; when it is empty/None the current
# default experiment is used.
if EXPERIMENT_PATH:
    mlflow.set_experiment(EXPERIMENT_PATH)

# Ensure UC registry when available (safe if already set)
try:
    mlflow.set_registry_uri("databricks-uc")
    print("MLflow registry set to Unity Catalog.")
except Exception as e:
    raise RuntimeError("Unity Catalog MLflow registry is required for this workflow.") from e

# Train/test split (stratified so each action keeps its share in both parts)
X_train, X_test, y_train, y_test = train_test_split(
    X_all, y_all, test_size=0.25, random_state=42, stratify=y_all
)

# Autolog still records params/metrics, but model logging is left to the explicit
# log_model call below so the run does not end up with two models named "model".
mlflow.sklearn.autolog(log_models=False)

from sklearn.ensemble import RandomForestClassifier

with mlflow.start_run() as run:
    model = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
    model.fit(X_train, y_train)

    # Basic eval
    y_pred = model.predict(X_test)
    TEST_ACCURACY = float(accuracy_score(y_test, y_pred))
    mlflow.log_metric("test_accuracy", TEST_ACCURACY)
    mlflow.log_text(classification_report(y_test, y_pred), "classification_report.txt")

    # Confusion matrix artifact
    cm = confusion_matrix(y_test, y_pred, labels=[0,1,2])
    plt.figure(figsize=(4,4))
    plt.imshow(cm, interpolation='nearest'); plt.title("Confusion Matrix (0=Email,1=SMS,2=Push)")
    plt.colorbar()
    ticks = np.arange(3); labs = ["Email","SMS","Push"]
    plt.xticks(ticks, labs, rotation=45); plt.yticks(ticks, labs)
    # Cell text switches to white on the darker half of the colour scale.
    thresh = cm.max()/2.0
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, f"{cm[i,j]:d}", ha="center",
                     color="white" if cm[i, j] > thresh else "black")
    plt.ylabel("True"); plt.xlabel("Pred"); plt.tight_layout()
    mlflow.log_figure(plt.gcf(), "confusion_matrix.png"); plt.close()

    # ✅ Log model using the modern parameter `name`, not deprecated `artifact_path`.
    # The input example replaces the one autolog used to attach.
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        signature=mlflow.models.infer_signature(X_train, model.predict(X_train)),
        input_example=X_train.head(5)
    )

    RUN_ID = run.info.run_id
    MODEL_RUNS_URI = f"runs:/{RUN_ID}/model"   # <-- use this for registration (works with UC)

print("Run ID:", RUN_ID)
print("Model runs URI:", MODEL_RUNS_URI)
print("Test accuracy:", TEST_ACCURACY)

In [0]:
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register the run's model under the UC model name.
# new_version stays None until registration succeeds; any failure is re-raised as RuntimeError.
new_version = None
try:
    # ✅ UC registration: use runs:/<run_id>/model
    registered = mlflow.register_model(name=UC_MODEL_NAME, model_uri=MODEL_RUNS_URI)
    new_version = int(registered.version)
    print(f"Registered to UC: {UC_MODEL_NAME} v{new_version}")
except Exception as registration_err:
    raise RuntimeError("Unity Catalog model registration failed. Verify catalog/schema permissions and UC configuration.") from registration_err



In [0]:
from mlflow.tracking import MlflowClient
import mlflow

client = MlflowClient()
model_name = UC_MODEL_NAME

# Fail fast when the registration cell did not produce a version number.
if new_version is not None:
    new_v = int(new_version)
else:
    raise RuntimeError("Model registration did not return a version. Re-run the registration cell.")

def get_alias_version_or_none(name, alias):
    """Return the version number `alias` points to for registered model `name`.

    The lookup goes through the module-level `client`. Any exception raised by
    the lookup or the int conversion is swallowed and reported as None.
    """
    try:
        return int(client.get_model_version_by_alias(name, alias).version)
    except Exception:
        return None

def get_metric_for_version(name, version, metric_key="test_accuracy"):
    """Return metric `metric_key` from the run that produced model `name` at `version`.

    Returns None when `version` is None or when the run has no such metric.
    """
    if version is not None:
        source_run_id = client.get_model_version(name, str(version)).run_id
        run_metrics = mlflow.get_run(source_run_id).data.metrics
        return run_metrics.get(metric_key)
    return None

# Always mark latest as @staging for smoke tests / endpoints
try:
    client.set_registered_model_alias(model_name, "staging", str(new_v))
    print(f"Set alias @{model_name}@staging → v{new_v}")
except Exception as e:
    print("Alias set (staging) warning:", str(e))

# Champion vs Challenger decision by test_accuracy.
# A missing metric is ranked as -inf; explicit None checks keep a genuine 0.0 score intact.
champ_v = get_alias_version_or_none(model_name, "champion")
new_acc = get_metric_for_version(model_name, new_v)
champ_acc = get_metric_for_version(model_name, champ_v)  # None when there is no champion
new_score = float("-inf") if new_acc is None else new_acc
champ_score = float("-inf") if champ_acc is None else champ_acc

if champ_v is None:
    client.set_registered_model_alias(model_name, "champion", str(new_v))
    print(f"No champion existed. Set @{model_name}@champion → v{new_v}")
elif champ_v == new_v:
    # Re-running this cell: the new version already holds @champion. Leave the
    # aliases alone so @prev_champion keeps pointing at the real previous champion.
    print(f"v{new_v} is already @champion; aliases left unchanged.")
elif new_score >= champ_score:
    client.set_registered_model_alias(model_name, "prev_champion", str(champ_v))
    client.set_registered_model_alias(model_name, "champion", str(new_v))
    print(f"Promoted new champion: v{new_v} (prev champion was v{champ_v})")
else:
    # Reached only when champ_score is a real number greater than new_score.
    client.set_registered_model_alias(model_name, "challenger", str(new_v))
    print(f"Kept champion v{champ_v}; set challenger → v{new_v} (new_acc={new_score:.4f} < champ_acc={champ_score:.4f})")

In [0]:
import mlflow.pyfunc
import pandas as pd

# Take a fresh slice of features to score
features_df = spark.table(FEATURE_TABLE_FULL).limit(5000).toPandas()

# Build inference matrix matching training columns
numeric_cols = [
    "age","gender_ix","income",
    "num_purchases_last_month","purchase_amount_last_month","avg_purchase_value",
    "browsing_minutes_last_week","is_high_spender"
]
categorical_cols = ["region","top_category","age_bucket"]
X_infer = features_df[numeric_cols + categorical_cols].copy()

# One-hot encode WITHOUT drop_first. drop_first removes whichever level sorts first
# in *this* slice, which is not necessarily the level dropped at training time.
# Encoding every level and reindexing to TRAIN_COLUMNS drops exactly the columns
# the model never saw, fills levels absent from this slice with 0, and fixes the order.
X_infer = pd.get_dummies(X_infer, columns=categorical_cols, drop_first=False)
X_infer = X_infer.reindex(columns=TRAIN_COLUMNS, fill_value=0)

# Load model by alias (works for UC & workspace registries)
loaded_model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")

pred_int = loaded_model.predict(X_infer)
inv_map = {0:"Email",1:"SMS",2:"Push"}
pred_str = [inv_map[int(i)] for i in pred_int]

recs_pdf = pd.DataFrame({
    "customer_id": features_df["customer_id"].values,
    "recommended_action": pred_str,
    # Timezone-aware UTC "now"; replaces the deprecated Timestamp.utcnow().
    "scored_at_ts": pd.Timestamp.now(tz="UTC")
})
recs_sdf = spark.createDataFrame(recs_pdf)
recs_sdf.write.mode("overwrite").format("delta").saveAsTable(REC_TABLE_FULL)

display(spark.table(REC_TABLE_FULL).limit(10))
print(f"Wrote recommendations → {REC_TABLE_FULL}")

In [0]:
from pyspark.sql import functions as F

# Build an inference log joining predictions + features (simulate a daily batch)
preds = spark.table(REC_TABLE_FULL)
feats = spark.table(FEATURE_TABLE_FULL).select(
    "customer_id",
    "age","gender_ix","income",
    "num_purchases_last_month","purchase_amount_last_month","avg_purchase_value",
    "browsing_minutes_last_week","is_high_spender",
    "region","top_category","age_bucket"
)

# Left join keeps every prediction even if its feature row is missing.
log_df = (
    preds.alias("p")
    .join(feats.alias("f"), on="customer_id", how="left")
    .withColumn("log_date", F.current_date())
)

# Append: each run of this cell adds another batch to the log table.
(log_df.write.mode("append").format("delta").saveAsTable(LOG_TABLE_FULL))

display(spark.table(LOG_TABLE_FULL).orderBy(F.desc("scored_at_ts")).limit(10))

# 1) Action distribution over time
action_dist = (
    spark.table(LOG_TABLE_FULL)
    .groupBy("log_date", "recommended_action")
    .count()
    .orderBy("log_date", "recommended_action")
)
display(action_dist)

# 2) Simple mean-drift check vs earliest day for numeric features
NUMERIC_FEATURES = [
    "age","gender_ix","income",
    "num_purchases_last_month","purchase_amount_last_month","avg_purchase_value",
    "browsing_minutes_last_week","is_high_spender"
]
log_tbl = spark.table(LOG_TABLE_FULL)
dates = [r["log_date"] for r in log_tbl.select("log_date").distinct().orderBy("log_date").collect()]
# Drift needs two distinct days. With a single day, baseline and current are the
# same rows and every delta is trivially 0, so report that instead.
if len(dates) >= 2:
    baseline = dates[0]
    cur = dates[-1]
    base_means = (log_tbl.filter(F.col("log_date")==baseline)
                  .select([F.mean(c).alias(c) for c in NUMERIC_FEATURES])
                  .collect()[0].asDict())
    cur_means = (log_tbl.filter(F.col("log_date")==cur)
                 .select([F.mean(c).alias(c) for c in NUMERIC_FEATURES])
                 .collect()[0].asDict())
    rows = []
    for c in NUMERIC_FEATURES:
        # A missing or null mean is treated as 0.
        b = float(base_means.get(c, 0) or 0)
        t = float(cur_means.get(c, 0) or 0)
        rows.append((c, b, t, (t-b)))
    spark.createDataFrame(rows, ["feature","baseline_mean","current_mean","delta"]).createOrReplaceTempView("drift_means")
    display(spark.sql("SELECT * FROM drift_means ORDER BY ABS(delta) DESC"))
else:
    print("Not enough days logged yet to compute drift deltas.")

In [0]:
# In the Databricks UI: Serving → Create Endpoint → Select your registered model by alias (@champion)
# Then call it like below (fill in your workspace URL, PAT, and endpoint name).
# Many teams instead just load by alias in scheduled jobs.

import pandas as pd, json, requests

DATABRICKS_HOST = "https://<your-workspace-url>"  # e.g., https://abc-123.cloud.databricks.com
TOKEN = "<DATABRICKS_PAT>"
ENDPOINT_NAME = "<your-endpoint-name>"

# Build a single-row payload that matches the training columns
features_one = spark.table(FEATURE_TABLE_FULL).limit(1).toPandas()

numeric_cols = [
    "age","gender_ix","income",
    "num_purchases_last_month","purchase_amount_last_month","avg_purchase_value",
    "browsing_minutes_last_week","is_high_spender"
]
categorical_cols = ["region","top_category","age_bucket"]
X_row = features_one[numeric_cols + categorical_cols].copy()

# Encode WITHOUT drop_first. On a single row, drop_first removes the only level of
# every categorical, so the row would always be sent as the baseline category no
# matter what its real values are. Encoding every level and reindexing to
# TRAIN_COLUMNS keeps the row's actual levels, zero-fills the rest and fixes the order.
X_row = pd.get_dummies(X_row, columns=categorical_cols, drop_first=False)
X_row = X_row.reindex(columns=TRAIN_COLUMNS, fill_value=0)

payload = {"dataframe_records": X_row.to_dict(orient="records")}
url = f"{DATABRICKS_HOST}/serving-endpoints/{ENDPOINT_NAME}/invocations"
headers = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}

print("Fill in DATABRICKS_HOST, TOKEN, ENDPOINT_NAME and uncomment the POST to invoke.")
# resp = requests.post(url, headers=headers, data=json.dumps(payload), timeout=15)
# print(resp.status_code, resp.text)