## 0) Config - Imports, Spark and MLFlow

In [0]:

# Imports
import os, matplotlib.pyplot as plt, mlflow, math, random, os
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorSlicer
from synapse.ml.lightgbm import LightGBMRegressor as LGBReg
from pyspark.sql.types import NumericType, StringType, BooleanType
from xgboost.spark import SparkXGBRegressor
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from mlflow.spark import log_model
from pyspark import StorageLevel
from pyspark.ml import Pipeline

In [0]:
CRASH_SAFE             = True

CV_TRIALS              = 10
EARLY_STOP_ROUNDS      = 15

OUT_BASE               = "XXX"
IMG_BASE               = "XXX"
DBFS_FILES_BASE        = "XXX"

os.makedirs(IMG_BASE, exist_ok=True)
os.makedirs(DBFS_FILES_BASE, exist_ok=True)

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 64*1024*1024)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

spark.conf.set("spark.sql.shuffle.partitions", 160 if CRASH_SAFE else 256)
spark.sparkContext.setCheckpointDir("/tmp/capstone_ai_chkpt")

In [0]:

# ML Flow config
MLFLOW_EXPERIMENT = "XXX"
MODEL_NAME        = "quality_model"
mlflow.set_experiment(MLFLOW_EXPERIMENT)

# Source table and key cols
SOURCE_TABLE        = "XXX"
ID_COL              = "XXX"
EXPOSURE_COL        = "XXX"
LOSS_COL            = "XXX"
LOSS_RATIO_TARGET   = "XXX"
STATE_COL           = "XXX"

RANDOM_SEED        = 42
random.seed(RANDOM_SEED)

PRIMARY_METRIC     = "mae_w"

##1) Define Helpers

In [0]:
# Split date features into date part int cols
def add_date_features(df, col):
    d=F.to_date(F.col(col))

    return (df.withColumn(f"{col}_year",F.year(d).cast("int"))
            .withColumn(f"{col}_month",F.month(d).cast("int"))
            .withColumn(f"{col}_dow",F.dayofweek(d).cast("int"))
            .withColumn(f"{col}_dom",F.dayofmonth(d).cast("int"))
            .withColumn(f"{col}_week",F.weekofyear(d).cast("int"))
            .withColumn(f"{col}_is_month_end",(F.dayofmonth(d)==F.dayofmonth(F.last_day(d))).cast("int"))
            .withColumn(f"{col}_age_days",F.datediff(F.current_date(), d).cast("int")))

# Random split by
def random_split_by_state(sdf, state_col="State", train=0.6, test=0.2, val=0.2, seed=42):
    w = Window.partitionBy(state_col)
    sdf2 = sdf.withColumn("_u", F.rand(seed)) 

    b1, b2 = train, train + test

    tr = sdf2.where(F.col("_u") < b1).drop("_u")
    te = sdf2.where((F.col("_u") >= b1) & (F.col("_u") < b2)).drop("_u")
    va = sdf2.where(F.col("_u") >= b2).drop("_u")

    return tr, te, va

# Apply frequency encoding to high cardinality categorical features
def apply_freq_encoding(df_in):
    df_out = df_in
    for c in cat_high:
        cnt_df, cnt_col = cnt_maps[c]
        freq_col = f"{c}_freq"
        df_out = (
            df_out.join(cnt_df, c, "left")
                  .withColumn(freq_col, F.col(cnt_col) / F.lit(train_n))
                  .drop(cnt_col)
        )
        if freq_col not in high_encoded_cols:
            high_encoded_cols.append(freq_col)
    return df_out

# Extract mapping from vector index to get original input col name
def get_feature_index_map(df, features_col="features"):
    meta = df.schema[features_col].metadata
    attrs = meta["ml_attr"]["attrs"]
    all_attrs = []
    for t in ["binary", "numeric", "nominal"]:
        all_attrs.extend(attrs.get(t, []))

    index_to_name = {a["idx"]: a["name"] for a in all_attrs}
    return index_to_name

# Remove OHE feature value suffix from feature name for base feature importance ranking
def strip_ohe_suffix(name: str) -> str:
    if name is None:
        return None
    pos = name.find("_oh")
    if pos == -1:
        return name
    return name[:pos+3]
    
strip_ohe_suffix_udf = F.udf(strip_ohe_suffix)

# Calculate weighted baselines
def weighted_baseline_metrics(sdf, label_col, w_col):
    y  = F.col(label_col)
    ww = F.col(w_col)

    agg = sdf.agg(
        F.sum(y * ww).alias("sum_y_w"),
        F.sum((y * y) * ww).alias("sum_y2_w"),
        F.sum(ww).alias("sum_w")
    ).first().asDict()

    sum_w   = float(agg["sum_w"]) if agg["sum_w"] is not None else 0.0
    sum_y_w = float(agg["sum_y_w"]) if agg["sum_y_w"] is not None else 0.0
    sum_y2_w = float(agg["sum_y2_w"]) if agg["sum_y2_w"] is not None else 0.0

    if sum_w == 0.0:
        return {
            "baseline_mean": float("nan"),
            "baseline_mae_w": float("nan"),
            "baseline_mse_w": float("nan"),
            "baseline_rmse_w": float("nan")
        }

# Calculate metrics
def metrics_sdf(sdf, label, pred, w):
    y    = F.col(label)
    yhat = F.col(pred)
    ww   = F.col(w)

    r = (
        sdf.agg(
            (F.sum(F.abs(y - yhat) * ww) / F.sum(ww)).alias("mae_w"),
            (F.sum(((y - yhat) * (y - yhat)) * ww) / F.sum(ww)).alias("mse_w"),
            F.sqrt(F.sum(((y - yhat) * (y - yhat)) * ww) / F.sum(ww)).alias("rmse_w")
        )
        .first()
        .asDict()
    )

    return r

# Define model param space
def param_samples(lib: str = "xgb", trials: int = 5):
    if lib == "xgb":
        for _ in range(trials):
            yield dict(
                max_depth        = random.choice([4, 6, 8]),
                learning_rate    = random.choice([0.03, 0.06, 0.1]),
                subsample        = random.choice([0.7, 0.8, 0.9]),
                colsample_bytree = random.choice([0.6, 0.7, 0.8]),
                reg_alpha        = random.choice([0.0, 0.1, 0.5]),
                reg_lambda       = random.choice([0.5, 1.0, 2.0]),
                n_estimators     = random.choice([300, 600, 1000]),
            )
    else:  # LightGBM
        for _ in range(trials):
            yield dict(
                numLeaves       = random.choice([31, 63, 127]),
                learningRate    = random.choice([0.03, 0.06, 0.1]),
                baggingFraction = random.choice([0.7, 0.8, 0.9]),
                featureFraction = random.choice([0.6, 0.7, 0.8]),
                maxDepth        = random.choice([-1, 10, 20]),
                numIterations   = random.choice([300, 600, 1000]))

# Define CV evaluator
def cv_fit_eval(model, ds, val, label, weight, run_name):
    with mlflow.start_run(run_name=run_name):
        m = model.fit(ds)

        # score on validation set
        va_scored = (
            m.transform(
                val.select(
                    "features",
                    F.col(label).alias(label),
                    F.col(weight).alias(weight),
                )
            )
            .select(
                F.col("prediction").alias("pred"),
                F.col(label).alias("y"),
                F.col(weight).alias("w"),
            )
        )

        mets = metrics_sdf(
            va_scored.withColumnRenamed("y", label).withColumnRenamed("pred", "yhat"),
            label,
            "yhat",
            "w",
        )

        for k, v in mets.items():
            mlflow.log_metric(f"cv_{k}", float(v))

    return m, mets

# Execute and identify best trials
def best_of_trials(train, val, label, weight, run_prefix="cv", objective="tweedie"):

    best_m, best_s, best_tag = None, {"mae_w": float("inf")}, None

    tr = train.withColumn("is_val", F.lit(False))
    va = val.withColumn("is_val", F.lit(True))
    parts_local = int(spark.conf.get("spark.sql.shuffle.partitions"))
    ds = tr.unionByName(va).repartition(parts_local).persist(StorageLevel.MEMORY_AND_DISK)
    _ = ds.count()

    trials = CV_TRIALS

    # LightGBM
    for i, ps in enumerate(param_samples("lgbm", trials), 1):
        params = dict(
            labelCol=label,
            featuresCol="features",
            validationIndicatorCol="is_val",
            earlyStoppingRound=EARLY_STOP_ROUNDS,
            seed=RANDOM_SEED,
            objective=("tweedie" if objective == "tweedie" else "regression"),
            metric="l1",
        )
        params.update(ps)
        model = LGBReg(**{k: v for k, v in params.items() if v is not None})

        m, s = cv_fit_eval(
            model,
            ds,
            val,
            label,
            weight,
            run_name=f"{run_prefix}_lgbm_{i}",
        )

        if s["mae_w"] < best_s["mae_w"]:
            best_m, best_s, best_tag = m, s, "lgbm"
        spark.catalog.clearCache()

    # XGBoost
    num_cores   = spark.sparkContext.defaultParallelism
    num_workers = max(2, num_cores // 2)

    for i, ps in enumerate(param_samples("xgb", trials), 1):
        params = dict(
            features_col="features",
            label_col=label,
            validation_indicator_col="is_val",
            early_stopping_rounds=EARLY_STOP_ROUNDS,
            missing=float("nan"),
            num_workers=num_workers,
            tree_method="hist",
            grow_policy="lossguide",
            max_bin=256,
            objective=("reg:tweedie" if objective == "tweedie" else "reg:squarederror"),
            eval_metric="mae",
        )
        params.update(ps)
        model = SparkXGBRegressor(**params)

        m, s = cv_fit_eval(
            model,
            ds,
            val,
            label,
            weight,
            run_name=f"{run_prefix}_xgb_{i}",
        )

        if s["mae_w"] < best_s["mae_w"]:
            best_m, best_s, best_tag = m, s, "xgb"
        spark.catalog.clearCache()

    ds.unpersist()

    return best_tag, best_m, best_s
        
# Evaluate best model
def eval_and_save(tag, label_col, pred_alias):

    global best_model

    model = best_model

    # Score
    test_pred = model.transform(
        dfe_te.select(
            "features",
            F.col(label_col).alias(label_col),
            F.col("model_weight").alias("mw"),
        )
    ).select(
        F.col("prediction").alias(pred_alias),
        F.col(label_col).alias("y"),
        F.col("mw").alias("w"),
    )

    # Metrics vs baseline 
    mets = metrics_sdf(test_pred, "y", pred_alias, "w")
    base = weighted_baseline_metrics(
        test_pred.select(F.col("y").alias("y"), F.col("w").alias("w")),
        "y",
        "w",
    )
    rel_improve = 1.0 - (mets["mae_w"] / max(base["baseline_mae_w"], 1e-9))

    # Save preds & deciles
    test_pred.write.mode("overwrite").parquet(f"{OUT_BASE}/{tag}_test_pred")
    deciles_and_lift(
        test_pred.select(F.col(pred_alias).alias("pred"), F.col("y"), F.col("w")),
        "pred",
        "y",
        "w",
    ).write.mode("overwrite").parquet(f"{OUT_BASE}/{tag}_test_lift")

    # Plot MAE vs baseline
    plt.figure()
    plt.bar(
        ["Baseline MAE_w", "Model MAE_w"],
        [base["baseline_mae_w"], mets["mae_w"]],
    )
    plt.ylabel("Exposure-weighted MAE")
    plt.title(f"{tag.upper()} — Test MAE vs Baseline")
    p1 = f"{IMG_BASE}/{tag}_mae_vs_baseline.png"
    plt.savefig(p1, bbox_inches="tight")
    plt.close()

    # Plot deciles calibration
    lift_df = spark.read.parquet(f"{OUT_BASE}/{tag}_test_lift").orderBy("decile").toPandas()
    p2 = f"{IMG_BASE}/{tag}_deciles.png"
    plt.figure()
    plt.plot(lift_df["decile"], lift_df["actual"], label="Actual")
    plt.plot(lift_df["decile"], lift_df["pred"], label="Predicted")
    plt.xlabel("Decile")
    plt.ylabel(label_col)
    plt.title(f"{tag.upper()} — Decile Calibration")
    plt.legend()
    plt.savefig(p2, bbox_inches="tight")
    plt.close()

    return dict(mets=mets, base=base, rel_improve=rel_improve, plots=[p1, p2] + seg_paths)
#Calculate deciles and lift
def deciles_and_lift(sdf, pred_col, label_col, w_col):
    qs = [i/10.0 for i in range(1,10)]
    quantiles = sdf.approxQuantile(pred_col, qs, 1e-3)
    bounds = [-float("inf")] + quantiles + [float("inf")]
    expr = None
    for i in range(10):
        low, high = bounds[i], bounds[i+1]
        cond = (F.col(pred_col) > F.lit(low)) & (F.col(pred_col) <= F.lit(high))
        expr = cond.cast("int")*F.lit(i+1) if expr is None else F.when(cond, i+1).otherwise(expr)
    scored = sdf.withColumn("decile", expr)
    w = F.col(w_col)
    agg = (scored.groupBy("decile")
           .agg(F.sum(w).alias("w"),
                F.sum(F.col(label_col)*w).alias("wy"),
                F.sum(F.col(pred_col)*w).alias("wyp")))
    return agg.select("decile",
                      (F.col("wy")/F.col("w")).alias("actual"),
                      (F.col("wyp")/F.col("w")).alias("pred"),
                      "w").orderBy("decile")

## 2) Load Data

In [0]:
# Load table
df = spark.table(SOURCE_TABLE)

# Shuttle partition optimziation
row_est = df.count()
parts = max(32, min(200, int(math.ceil(row_est / 300_000.0) * 64)))
spark.conf.set("spark.sql.shuffle.partitions", parts)
print(f"[Config] rows={row_est:,} | shuffle.partitions={parts}")

# Valid exposure rows
df = df.where((F.col(EXPOSURE_COL) > 0) & (~F.isnan(F.col(EXPOSURE_COL))) & F.col(EXPOSURE_COL).isNotNull())

# Derived targets
df = df.withColumn("offset_log_exposure", F.log(F.col(EXPOSURE_COL)))

# Parse date cols
date_cols = [c for c,t in df.dtypes if t in ("date","timestamp")]

for dc in date_cols: 
    df = add_date_features(df, dc)

df = df.drop(*date_cols)

# Encode bool cols as string (1 and null values only, no 0)
bool_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, BooleanType)]

yn_true = {"Y","1","true","True"} 

for c in bool_cols:
    df = df.withColumn(
        c,
        F.when(F.col(c).isNull(), F.lit("__MISSING__"))
         .when(F.lower(F.col(c)).isin(*yn_true), F.lit("True"))
         .otherwise(F.lit("False")))

# Define exposure length-based weight
df = df.withColumn("model_weight", F.col(EXPOSURE_COL))

[Config] rows=522,026 | shuffle.partitions=128


## 3) State-Aware Random Split

In [0]:
# State-aware random train/test/val split
train_df, test_df, val_df = random_split_by_state(df, state_col=STATE_COL, seed=RANDOM_SEED)

## 4) Preprocessing (Feature Encoding, Vector Assembly, Materialization)

In [0]:
# Exclusion cols (IDs, targets)
EXCLUDE = {
    ID_COL,
    STATE_COL,
    EXPOSURE_COL,
    LOSS_RATIO_TARGET,
    LOSS_COL,
    "offset_log_exposure",
    "model_weight",
    "state_weight",
    "EmailAddress"
    "tier",
    "credit_score",
    "tenure_years",
    "renewal_month"
}

# Identify col datatype
numeric_cols = [
    f.name for f in train_df.schema.fields
    if isinstance(f.dataType, NumericType) and f.name not in EXCLUDE ]

string_cols = [
    f.name for f in train_df.schema.fields
    if isinstance(f.dataType, StringType) and f.name not in EXCLUDE ]

# Identify low-card vs high-card categoricals
row_counts = train_df.select([F.approx_count_distinct(c) for c in string_cols]).collect()[0]
card_row = {c: row_counts[i] for i, c in enumerate(string_cols)}

LOW_CARD_MAX = 40  # threshold for OHE

cat_low  = [c for c in string_cols if card_row[c] <= LOW_CARD_MAX] 
cat_high = [c for c in string_cols if card_row[c] > LOW_CARD_MAX]

# Frequency-encode high-card categoricals
train_n = train_df.count()
high_encoded_cols = []

cnt_maps = {}
for c in cat_high:
    cnt_col = f"{c}_cnt"
    cnt_maps[c] = (
        train_df.groupBy(c).agg(F.count("*").alias(cnt_col)),
        cnt_col
    )

dfe_tr_enc = apply_freq_encoding(train_df) 
dfe_te_enc = apply_freq_encoding(test_df) 
dfe_va_enc = apply_freq_encoding(val_df)

# StringIndexer + OneHotEncoder for Llow-card categoricals
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in cat_low
]

encoders = [
    OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_oh")
    for c in cat_low
]

ohe_cols = [f"{c}_oh" for c in cat_low]

# Assemble feature vector and define pipeline
feature_cols = numeric_cols + high_encoded_cols + ohe_cols

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="keep"
)

pipe = Pipeline(stages=indexers + encoders + [assembler])

# Fit preprocessing model
prep_model = pipe.fit(dfe_tr_enc)

# Transform train/test/val splits
dfe_tr = prep_model.transform(dfe_tr_enc)
dfe_te = prep_model.transform(dfe_te_enc) 
dfe_va = prep_model.transform(dfe_va_enc)



## 5) Log Baselines

In [0]:
mlflow.end_run()
# Calculate and log baseline metrics for each split
splits = {
    "train": dfe_tr.select(LOSS_RATIO_TARGET, "model_weight"),
    "test":  dfe_te.select(LOSS_RATIO_TARGET, "model_weight"),
    "val":   dfe_va.select(LOSS_RATIO_TARGET, "model_weight"),
}

with mlflow.start_run(run_name="capstone_baselines"):

    for split_name, sdf in splits.items():
            base = weighted_baseline_metrics(
                sdf.where(F.col(LOSS_RATIO_TARGET).isNotNull()),
                LOSS_RATIO_TARGET,
                "model_weight",
            )
            for k, v in base.items():
                mlflow.log_metric(f"{split_name}_loss_ratio_{k}", float(v))

## 6) Fit Fixed Regression Model for Feature Importances

In [0]:
# Define feature selection splits
train_fs = dfe_tr.where(F.col(LOSS_RATIO_TARGET).isNotNull())
val_fs   = dfe_va.where(F.col(LOSS_RATIO_TARGET).isNotNull())

# Map feature index -> name
index_to_name = get_feature_index_map(train_fs, features_col="features")

with mlflow.start_run(run_name="fs_fixed_lgb_loss_ratio"):
    # Train fixed LightGBM model for feature importances
    lgb_fs_params = dict(
        featuresCol="features",
        labelCol=LOSS_RATIO_TARGET,
        objective="regression",
        numIterations=300,
        learningRate=0.05,
        numLeaves=31,
        maxDepth=-1,
        baggingFraction=0.8,
        featureFraction=0.8,
        seed=RANDOM_SEED,
    )

    # Log FS model hyperparams
    for p, v in lgb_fs_params.items():
        mlflow.log_param(f"fs_{p}", v)
    mlflow.log_param("fs_target", LOSS_RATIO_TARGET)

    # Train
    fs_model = LGBReg(**lgb_fs_params).fit(train_fs)
    
    # Score
    val_scored_fs = fs_model.transform(
        val_fs.select(
            "features",
            F.col(LOSS_RATIO_TARGET).alias("y"),
            F.col("model_weight").alias("w"),
        )
    ).select(
        F.col("prediction").alias("yhat"),
        F.col("y"),
        F.col("w"),
    )
    # Metrics vs baseline
    val_mets_fs = metrics_sdf(
                    sdf = val_scored_fs,
                    label = "y",
                    pred = "yhat",
                    w="w",
                )

    mlflow.log_metric("fs_fixed_val_mae",  float(val_mets_fs["mae_w"]))
    mlflow.log_metric("fs_fixed_val_mse",  float(val_mets_fs["mse_w"]))
    mlflow.log_metric("fs_fixed_val_rmse", float(val_mets_fs.get("rmse_w", 0.0)))

    print("[FS] Fixed model (all features) – val metrics:")
    print("      mae          =", val_mets_fs["mae_w"])
    print("      mse          =", val_mets_fs["mse_w"])
    print("      rmse          =", val_mets_fs["rmse_w"])

    k_results = []
    k_results.append(("ALL", val_mets_fs["mae_w"], None))

    # Build feature importance table at index + base_feature level
    importances = fs_model.getFeatureImportances()

    fi_idx = []
    for idx, imp in enumerate(importances):
        if imp is None:
            continue
        name = index_to_name.get(idx, f"idx_{idx}")
        fi_idx.append((idx, name, float(imp)))

    fi_idx_df = spark.createDataFrame(fi_idx, ["idx", "feature_name", "importance"])

    # Normalized importance over all dims
    total_imp = fi_idx_df.agg(F.sum("importance")).first()[0]
    fi_idx_df = fi_idx_df.withColumn(
        "importance_norm",
        F.col("importance") / F.lit(total_imp)
    )

    # Collapse OHE levels to base_feature
    fi_idx_df = fi_idx_df.withColumn(
        "base_feature",
        strip_ohe_suffix_udf(F.col("feature_name"))
    )

    fi_base = (
        fi_idx_df
        .groupBy("base_feature")
        .agg(
            F.sum("importance_norm").alias("importance_sum"),
            F.avg("importance_norm").alias("importance_mean"),
            F.countDistinct("idx").alias("n_indices"),
        )
        .orderBy(F.col("importance_sum").desc())
    )

    mlflow.log_metric("fs_n_dims", fi_idx_df.count())
    mlflow.log_metric("fs_n_base_features", fi_base.count())

[FS] Fixed model (all features) – val metrics:
      mae          = 0.08394603081957387
      mse          = 0.08013010663960451
      rmse          = 0.28307261725501554


## 7) Select Top K Features for Regression

In [0]:
# Evaluate candidate top-K sets
candidate_ks = [50, 75, 150, 300]
fs_by_k      = {}

for k in candidate_ks:
    print(f"Evaluating with Top {k} Features")

    # Top-K base features by importance_sum
    top_k_base = (
        fi_base.orderBy(F.col("importance_sum").desc())
                .limit(k)
                .select("base_feature")
                .rdd.flatMap(lambda r: r)
                .collect()
    )

    # All indices where base_feature is in that top-K set
    indices_for_k = (
        fi_idx_df
        .where(F.col("base_feature").isin(top_k_base))
        .select("idx")
        .rdd.flatMap(lambda r: r)
        .collect()
    )

    indices_for_k = sorted(set(indices_for_k))
    fs_by_k[k] = indices_for_k

    print(f"K={k}: {len(top_k_base)} base features → {len(indices_for_k)} vector indices")

    mlflow.log_metric(f"fs_topk_{k}_n_base_features", float(len(top_k_base)))
    mlflow.log_metric(f"fs_topk_{k}_n_indices",       float(len(indices_for_k)))

    # Slice features for train/val for this K
    slicer_k = VectorSlicer(
        inputCol="features",
        outputCol="features_fs",
        indices=indices_for_k,
    )

    tr_k = slicer_k.transform(train_fs).drop("features").withColumnRenamed("features_fs", "features")
    va_k = slicer_k.transform(val_fs).drop("features").withColumnRenamed("features_fs", "features")

    # Fixed LightGBM model for scoring with K features
    print(f"Training LightGBM model for K={k}")
    lr_k = LGBReg(
        featuresCol="features",
        labelCol=LOSS_RATIO_TARGET,
        weightCol="model_weight",
        objective="regression",
        numIterations=150,
        learningRate=0.05,
        numLeaves=31,
        maxDepth=-1,
        baggingFraction=0.8,
        featureFraction=0.8,
        seed=RANDOM_SEED,
    ).fit(tr_k)

    va_scored_k = lr_k.transform(
        va_k.select(
            "features",
            F.col(LOSS_RATIO_TARGET).alias("y"),
            F.col("model_weight").alias("w")
            ) 
        ).select(
            F.col("prediction").alias("yhat"),
            F.col("y"),
            F.col("w"),
        )


    mets_k = metrics_sdf(
        sdf = va_scored_k,
        label = "y",
        pred = "yhat",
        w = "w",
    )

    print(f"K={k} → val_mae={mets_k['mae_w']:.6f}")
    mlflow.log_metric(f"fs_topk_{k}_val_mae", float(mets_k["mae_w"]))
    if "rmse_w" in mets_k:
        mlflow.log_metric(f"fs_topk_{k}_val_rmse", float(mets_k["rmse_w"]))

    k_results.append((k, mets_k["mae_w"], indices_for_k))

# Choose best K by val_mae_w, slice final features on dfe_tr/va/te
k_results_sorted = sorted(k_results, key=lambda t: t[1])
best_K, best_mae, best_indices = k_results_sorted[0]

print(f"Best K={best_K} with val_mae={best_mae:.6f}")
mlflow.log_param("fs_best_K", int(best_K))
mlflow.log_metric("fs_best_K_val_mae", float(best_mae))
mlflow.log_metric("fs_best_K_n_indices", float(len(best_indices)))



Evaluating with Top 50 Features
K=50: 50 base features → 323 vector indices
Training LightGBM model for K=50
K=50 → val_mae=0.081251
Evaluating with Top 75 Features
K=75: 75 base features → 403 vector indices
Training LightGBM model for K=75
K=75 → val_mae=0.081219
Evaluating with Top 150 Features
K=150: 150 base features → 631 vector indices
Training LightGBM model for K=150
K=150 → val_mae=0.081142
Evaluating with Top 300 Features
K=300: 300 base features → 954 vector indices
Training LightGBM model for K=300
K=300 → val_mae=0.081207
Best K=150 with val_mae=0.081142


## 8) CV & Best Regression Model Selection

In [0]:
# CV to tune hyperparams and select best model

# Final slicing for full train/val/test
final_slicer = VectorSlicer(
    inputCol="features",
    outputCol="features_fs",
    indices=best_indices,
)

dfe_tr_r = final_slicer.transform(dfe_tr).drop("features").withColumnRenamed("features_fs", "features")
dfe_va_r = final_slicer.transform(dfe_va).drop("features").withColumnRenamed("features_fs", "features")
dfe_te_r = final_slicer.transform(dfe_te).drop("features").withColumnRenamed("features_fs", "features")

tr_sub = dfe_tr_r.where(F.col(LOSS_RATIO_TARGET).isNotNull())
va_sub = dfe_va_r.where(F.col(LOSS_RATIO_TARGET).isNotNull())

tr = tr_sub.select(
    "features",
    F.col(LOSS_RATIO_TARGET).alias("label"),
    F.col("model_weight").alias("weight"),
)
va = va_sub.select(
    "features",
    F.col(LOSS_RATIO_TARGET).alias("label"),
    F.col("model_weight").alias("weight"),
)

# Fit cv
lib_tag, model, score = best_of_trials(
    tr,
    va,
    "label",
    "weight",
    run_prefix=f"lr_cv",
    objective=("squared"),
)

best_model = model



## 9) Register Best Model

In [0]:
with mlflow.start_run(run_name="register best model"):
    log_model(
        spark_model=best_model,
        artifact_path="capstone_model_final",
        registered_model_name="capstone_model_final"
    )

2025/11/17 17:50:46 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
Successfully registered model 'capstone_model_final'.
2025/11/17 17:51:31 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: capstone_model_final, version 1
Created version '1' of model 'capstone_model_final'.


## 10) Generate Reports

In [0]:
reports = {}

reports["lr"] = eval_and_save("lr", LOSS_RATIO_TARGET, "lr_pred")

print("Artifacts under:", DBFS_FILES_BASE)
print("\n[LR] Test regression metrics:", reports["lr"]["mets"])



Artifacts under: /FileStore/TB/Capstone_Project/img

[LR] Test regression metrics: {'mae_w': 0.08393868865272433, 'mse_w': 0.080602327222287, 'rmse_w': 0.2839054899474242}
