In [0]:
# Current user: full login (e-mail address) of whoever is running this notebook.
# The domain is kept here on purpose: the next cell prints this value as the
# "Full email" and derives the short user name from it by splitting on "@".
current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
print(current_user)


In [0]:
# Extract part before '@' and make it safe to use as an unquoted schema name.
# A login such as "first.last@example.com" would otherwise inject an extra dot
# (or another illegal character) into every catalog.schema.table identifier
# that is built from user_name.
local_part = current_user.split("@")[0]
user_name = "".join(
    ch if (ch.isascii() and ch.isalnum()) or ch == "_" else "_"
    for ch in local_part
)

print("Full email:", current_user)
print("User name:", user_name)

In [0]:
from pyspark.sql import functions as F
import pandas as pd
from scipy import stats
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature

In [0]:
# ============================
# CONFIG
# ============================
# Unity Catalog namespace: a fixed catalog plus one schema per notebook user.
CATALOG = "dbacademy"
SCHEMA = user_name

# Fully qualified (catalog.schema.table) names of the tables this notebook writes.
BASELINE_TABLE, CURRENT_TABLE, OUTPUT_TABLE = (
    f"{CATALOG}.{SCHEMA}.{table}"
    for table in ("baseline_data", "current_data", "drift_summary")
)

# Registered-model name, in the same three-level form as the tables.
MODEL_NAME = ".".join([CATALOG, SCHEMA, "demo_drift_model"])

# Features to test, grouped by the kind of drift test applied to them.
NUMERIC_COLS = ["feature_num"]
CATEGORICAL_COLS = ["feature_cat"]

# Significance level: p-values below this count as statistically significant.
ALPHA = 0.05

# Make sure the target schema exists before any table is written to it.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

In [0]:
# ============================
# SYNTHETIC DATA GENERATION
# ============================

# Seed NumPy's global generator so every run draws exactly the same samples.
np.random.seed(42)

_N_ROWS = 1000
_CATEGORIES = ["A", "B", "C"]

# Baseline sample: numeric feature ~ N(50, 10), categories weighted towards "A".
baseline_num = np.random.normal(50, 10, _N_ROWS)
baseline_cat = np.random.choice(_CATEGORIES, _N_ROWS, p=[0.6, 0.3, 0.1])
baseline_df = pd.DataFrame(dict(feature_num=baseline_num, feature_cat=baseline_cat))

# Current sample: drift is simulated with a higher mean / wider spread for the
# numeric feature and different category probabilities.
current_num = np.random.normal(60, 12, _N_ROWS)
current_cat = np.random.choice(_CATEGORIES, _N_ROWS, p=[0.4, 0.2, 0.4])
current_df = pd.DataFrame(dict(feature_num=current_num, feature_cat=current_cat))

In [0]:
# Persist both pandas frames as tables in the user's schema, replacing any
# previous contents (overwrite mode).
for _frame, _table_name in ((baseline_df, BASELINE_TABLE), (current_df, CURRENT_TABLE)):
    spark.createDataFrame(_frame).write.mode("overwrite").saveAsTable(_table_name)

In [0]:
# ============================
# DRIFT FUNCTIONS
# ============================

def population_stability_index(expected, actual, buckets=10):
    """Population Stability Index (PSI) of ``actual`` relative to ``expected``.

    Bin edges are the ``buckets + 1`` evenly spaced percentiles (0..100) of
    ``expected``. Both samples are histogrammed on those edges and the index is
    ``sum((e - a) * ln((e + eps) / (a + eps)))`` over the per-bin proportions,
    with ``eps = 1e-6`` guarding against empty bins.

    Args:
        expected: 1-D array-like of numeric baseline values.
        actual: 1-D array-like of numeric values compared against the baseline.
        buckets: Number of percentile bins derived from ``expected``.

    Returns:
        The PSI as a NumPy float; 0 when both samples have identical bin
        proportions.

    Raises:
        ValueError: If either sample is empty.
    """
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    if expected.size == 0 or actual.size == 0:
        raise ValueError("population_stability_index requires non-empty samples")

    breakpoints = np.percentile(expected, np.linspace(0, 100, buckets + 1))

    # np.histogram ignores values outside [first edge, last edge]. Without this
    # clip, observations beyond the baseline's min/max would be dropped and the
    # proportions of `actual` would no longer sum to 1. Clipping assigns them
    # to the outermost bins instead.
    actual = np.clip(actual, breakpoints[0], breakpoints[-1])

    expected_percents = np.histogram(expected, bins=breakpoints)[0] / expected.size
    actual_percents = np.histogram(actual, bins=breakpoints)[0] / actual.size

    eps = 1e-6  # avoids log(0) and division by zero for empty bins
    psi = np.sum(
        (expected_percents - actual_percents)
        * np.log((expected_percents + eps) / (actual_percents + eps))
    )
    return psi

def chi_square_test(expected, actual):
    """Chi-square test comparing the category frequencies of two samples.

    The value counts of both samples are aligned on the union of their
    categories (a category missing from one sample counts as 0) and passed as a
    two-row contingency table to ``scipy.stats.chi2_contingency``.

    Args:
        expected: pandas Series holding the baseline category labels.
        actual: pandas Series holding the category labels to compare.

    Returns:
        Tuple ``(chi2, p)``: the test statistic and its p-value.
    """
    baseline_freq = expected.value_counts()
    observed_freq = actual.value_counts()

    # Every category seen in either sample becomes one column of the table.
    categories = list({*baseline_freq.index, *observed_freq.index})
    contingency = [
        freq.reindex(categories, fill_value=0)
        for freq in (baseline_freq, observed_freq)
    ]

    statistic, p_value, *_ = stats.chi2_contingency(contingency)
    return statistic, p_value

In [0]:
# ============================
# DRIFT TESTS
# ============================

# The in-memory pandas frames are compared directly.
baseline_pd, current_pd = baseline_df, current_df

# One tuple per feature: (feature, type, test_result, effect_size, drift_flag).
results = []

# Numeric features: two-sample Kolmogorov-Smirnov test plus PSI. A feature is
# flagged only when the KS p-value is significant AND the PSI exceeds 0.1.
for num_feature in NUMERIC_COLS:
    reference, observed = baseline_pd[num_feature], current_pd[num_feature]
    ks_statistic, ks_pvalue = stats.ks_2samp(reference, observed)
    psi_score = population_stability_index(reference, observed)

    if ks_pvalue < ALPHA and psi_score > 0.1:
        numeric_flag = "DRIFT"
    else:
        numeric_flag = "NO_DRIFT"

    results.append(
        (
            num_feature,
            "numeric",
            f"KS={ks_statistic:.4f}, p={ks_pvalue:.4f}",
            f"PSI={psi_score:.4f}",
            numeric_flag,
        )
    )

# Categorical features: chi-square test on category frequencies; no separate
# effect size is recorded for these rows.
for cat_feature in CATEGORICAL_COLS:
    chi2_statistic, chi2_pvalue = chi_square_test(baseline_pd[cat_feature], current_pd[cat_feature])

    if chi2_pvalue < ALPHA:
        categorical_flag = "DRIFT"
    else:
        categorical_flag = "NO_DRIFT"

    results.append(
        (
            cat_feature,
            "categorical",
            f"Chi2={chi2_statistic:.4f}, p={chi2_pvalue:.4f}",
            None,
            categorical_flag,
        )
    )


In [0]:
# Write drift results
result_columns = ["feature", "type", "test_result", "effect_size", "drift_flag"]
results_df = pd.DataFrame(results, columns=result_columns)

# Every column holds text (or null), so the Spark schema is stated explicitly
# instead of being inferred from the data. Inference fails when a column is
# entirely null (e.g. effect_size when there are no numeric features) or when
# there are no result rows at all. Column names are back-quoted so none of
# them can be mistaken for a SQL keyword.
results_schema = ", ".join(f"`{name}` string" for name in result_columns)
spark_results = spark.createDataFrame(results_df, schema=results_schema)
spark_results.write.mode("overwrite").saveAsTable(OUTPUT_TABLE)

display(spark.table(OUTPUT_TABLE))

In [0]:
# ============================
# AUTOMATED RETRAINING
# ============================

# Check if any drift detected
drift_detected = results_df["drift_flag"].eq("DRIFT").any()

if drift_detected:
    print("🚨 Drift detected! Triggering automated retraining...")

    # Synthetic classification dataset for retraining demo. The features are
    # wrapped in a DataFrame with named columns (f0..f4) so that the training
    # data, the model signature and the logged input example all share one schema.
    X_raw, y = make_classification(n_samples=2000, n_features=5, random_state=42)
    X = pd.DataFrame(X_raw, columns=[f"f{i}" for i in range(X_raw.shape[1])])
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train logistic regression and score it on the held-out split.
    model = LogisticRegression(max_iter=500)
    model.fit(X_train, y_train)
    preds = model.predict_proba(X_test)[:, 1]  # probability of the positive class
    auc = roc_auc_score(y_test, preds)

    # Example input: the first five training rows, re-indexed from 0.
    example_input = X_train.head(5).reset_index(drop=True)

    # Signature (input + output schema). The output is taken from predict()
    # because that is the method the logged sklearn model exposes through
    # pyfunc by default; a predict_proba output would not match what the
    # registered model actually returns.
    signature = infer_signature(X_train, model.predict(X_train))

    # Log to MLflow with signature + example and register the model.
    # NOTE(review): MODEL_NAME is a three-level catalog.schema.model name;
    # depending on the MLflow/runtime version the registry URI may have to be
    # pointed at Unity Catalog first — confirm for the target workspace.
    mlflow.set_experiment("/Shared/drift_retraining_demo")
    with mlflow.start_run():
        mlflow.log_metric("AUC", auc)
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            registered_model_name=MODEL_NAME,
            signature=signature,
            input_example=example_input
        )

    print(f"✅ New model trained, logged, and registered with AUC={auc:.4f}")

else:
    print("✅ No drift detected. Skipping retraining.")







