<div style="padding-top: 10px;  padding-bottom: 10px;">
  <img src="https://insightfactoryai.sharepoint.com/:i:/r/sites/insightfactory.ai/Shared%20Documents/E.%20Marketing/Company%20Logos%20and%20Style%20Guide/PNG/insightfactory.ai%20logo%20multiline%20reversed.png" alt='insightfactory.ai' width=150   style="display: block; margin: 0 auto" /> 
</div>

# Model Build

***Summary of process:*** This notebook builds the ML model from the engineered features. It produces a table containing the model's performance metrics and the version of the registered model. Where possible, record the pipeline version alongside the model version in the result so that the two can be matched.

***Input Tables:***  
- feature_selection.total_training_table

***Output Table:*** 
- model_config

Note: This is only an overview of the process. For details, please review the notebook thoroughly.

**Business Rules:** <br/>
\<Describe the Business Rules that are encapsulated in this Enrichment\>

**Dependencies:**<br/>
-

**Ownership:**<br/>
\<Indicate who owns this Enrichment ruleset\>


#### Modification Schedule

| Date | Who | Description |
| ---: | :--- | :--- |
| 2025-09-24 | Yifan Gu  | Initial version. |


#### Insight Factory Notebook Preparation

**(Do not modify/delete the following cell)**

In [0]:
%pip install pandas scikit-learn mlflow-skinny[databricks]
%pip install -U lightgbm
dbutils.library.restartPython()

In [0]:
%run "/InsightFactory/Helpers/ML Build (Unity Catalog) Entry"


# Notebook Start

### Input Parameters

All Notebook Parameters (if any) are contained in the dictionary variable 'params'.  There are two ways to get an individual parameter from params; in both cases the parameter name is case-sensitive:

  1) Use dot-notation - refer to the example below.

      params = { "Name": "Test", "Values": { "Title": "Results", "Results": [ { "Definition": "Core Sample", "Outcome": "Prospective" }, { "Definition": "Follow-up", "Outcome": "For review" } ] } }

      params.Name produces 'Test'<br/>
      params.Values.Title produces 'Results'<br/>
      params.Values.Results[0] produces { "Definition": "Core Sample", "Outcome": "Prospective" }<br/>
      params.Values.Results[1].Definition produces 'Follow-up'

  2) Use the search_dictionary function as follows:  var1 = search_dictionary(params, "parameter-name").  

      There is an optional third parameter to this function: value_to_return_if_not_found -  this is the value to return if the particular parameter is not found in params.<br/>
      **Note** that value_to_return_if_not_found can take on any type (string, int, boolean, struct, ...), e.g. search_dictionary(params, "IncorrectlyNamedParameter", False) will return the boolean False if "IncorrectlyNamedParameter" is not found in params.
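
The two access styles above can be tried outside the platform with a small stand-in. This is a minimal sketch, not the Insight Factory implementation: `DotDict` and `search_dictionary_sketch` are hypothetical names, and the recursive key search is an assumption about how `search_dictionary` locates a parameter.

```python
# Hypothetical stand-ins: the real `params` object and `search_dictionary`
# come from the Insight Factory helper notebook and may behave differently.

class DotDict(dict):
    """Dictionary whose keys can also be read as attributes (case-sensitive)."""

    def __getattr__(self, name):
        try:
            return _wrap(self[name])
        except KeyError as exc:
            raise AttributeError(name) from exc


def _wrap(value):
    """Wrap nested dicts (and dicts inside lists) so dot-notation keeps working."""
    if isinstance(value, dict):
        return DotDict(value)
    if isinstance(value, list):
        return [_wrap(v) for v in value]
    return value


def search_dictionary_sketch(data, key, value_to_return_if_not_found=None):
    """Depth-first search for `key`; return the default when it is absent."""
    if isinstance(data, dict):
        if key in data:
            return data[key]
        children = list(data.values())
    elif isinstance(data, list):
        children = data
    else:
        return value_to_return_if_not_found
    sentinel = object()  # distinguishes "not found" from a stored None/False
    for child in children:
        found = search_dictionary_sketch(child, key, sentinel)
        if found is not sentinel:
            return found
    return value_to_return_if_not_found


params = DotDict({
    "Name": "Test",
    "Values": {
        "Title": "Results",
        "Results": [
            {"Definition": "Core Sample", "Outcome": "Prospective"},
            {"Definition": "Follow-up", "Outcome": "For review"},
        ],
    },
})

print(params.Name)
print(params.Values.Results[1].Definition)
print(search_dictionary_sketch(params, "Title"))
print(search_dictionary_sketch(params, "IncorrectlyNamedParameter", False))
```

Passing an explicit default, as in the last call, avoids a failure when an optional parameter has not been supplied.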

**CAUTION:** There is another dictionary variable, 'config', that contains all of the configuration sent to this Notebook.  In most cases, you will have no use for 'config' but if you choose to use 'config' in this Notebook, note the following:
- Access the individual parameters within config by using the search_dictionary function e.g. search_dictionary(config, "ParameterName").  Dot-notation access **does not apply** to 'config'.
- Heed this **WARNING** - The individual parameter names within 'config' are subject to change outside of your control which may break your code.
<br/><br/>

### Enrichment Results

Add the code you need to perform your enrichment/extract in cell(s) below until the 'Notebook End' cell.

#### Important:
- Ensure that the result is stored in a PySpark dataframe named 'df_result', e.g. df_result = ..., containing the Model Name, Model Version, performance metrics, pipeline version (for the feature log) and model.
- Ensure that your models are registered under ml_catalog. Always name your model as `f'{ml_catalog}.{delta_schema_name}.{model_name}'`.

This produces the model and the model config table in your shared ml_catalog, for easy sharing and inference across environments.
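
As a quick illustration of the naming rule, the three-level name can be assembled and checked as follows. The values are hypothetical placeholders; in this notebook the corresponding variables are not defined in any cell and appear to be provided by the entry helper (the build cell uses `model_schema_name` for the middle part).

```python
# Hypothetical placeholder values, for illustration only.
ml_catalog = "ml_catalog_example"
delta_schema_name = "wagon_models"
model_name = "track_condition_lgbm"

registered_name = f"{ml_catalog}.{delta_schema_name}.{model_name}"

# A three-level name has exactly two separators: catalog.schema.model
assert registered_name.count(".") == 2
print(registered_name)
```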
<br/><br/>

### Running this Notebook directly in Databricks

This Notebook can be run directly from your Databricks Workspace.  If the Notebook relies on Notebook Parameters, please read the following instructions:
1) Add this line of code to a cell at the top of your Notebook and run that cell.<br/>
   ```dbutils.widgets.text('ParametersJSON', '{ "ModelName":"name","ModelAlias":"alias","ModelVersion":"version","ModelSchema":"DatabaseName for Model", "NotebookParameters": { "param1": "value1", "param2": "value2" } }')```
2) This will add a Parameter to the Notebook.  Simply replace (or remove) the pre-canned parameters, 'param1', 'param2' and their values with your own.
3) When you have finished running this Notebook directly in Databricks, comment out the line of code you added or delete the cell entirely.    
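
Before pasting an edited JSON string into the widget, it can help to confirm that it still parses. The sketch below only checks the string with the standard `json` module; how the helper notebook itself reads `ParametersJSON` is not shown in this notebook, so treat the key handling here as an assumption.

```python
import json

# Same shape as the widget default in step 1; every value is a placeholder.
raw = (
    '{ "ModelName":"name","ModelAlias":"alias","ModelVersion":"version",'
    '"ModelSchema":"DatabaseName for Model", '
    '"NotebookParameters": { "param1": "value1", "param2": "value2" } }'
)

payload = json.loads(raw)  # raises json.JSONDecodeError if the string is malformed
notebook_params = payload.get("NotebookParameters", {})

print(payload["ModelName"])
print(sorted(notebook_params))
```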

In [0]:
import numpy as np
import pandas as pd
from lightgbm import LGBMClassifier
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import RFECV
from sklearn.metrics import (
    accuracy_score,
    average_precision_score,
    f1_score,
    precision_recall_curve,
    roc_auc_score,
)
from sklearn.model_selection import StratifiedKFold, TimeSeriesSplit, train_test_split

In [0]:
df = spark.table("`09ad024f-822f-48e4-9d9e-b5e03c1839a2`.feature_selection.total_training_table")

num_cols = [
    'Wagon_Twist14m','Wagon_BounceFrt','Wagon_BounceRr','Wagon_BodyRockFrt','Wagon_BodyRockRr','Wagon_LP1',
    'Wagon_LP2','Wagon_LP3','Wagon_LP4','Wagon_Speed','Wagon_BrakeCylinder','Wagon_IntrainForce','Wagon_Acc1',
    'Wagon_Acc2','Wagon_Acc3','Wagon_Acc4','Wagon_Twist2m','Wagon_Acc1_RMS','Wagon_Acc2_RMS','Wagon_Acc3_RMS',
    'Wagon_Acc4_RMS','Wagon_Rail_Pro_L','Wagon_Rail_Pro_R','Wagon_SND','Wagon_VACC','Wagon_VACC_L','Wagon_VACC_R',
    'Wagon_Curvature','Wagon_Track_Offset','Wagon_SND_L','Wagon_SND_R','w_row_count','Tng_Tonnage'
]


gcols = ["Tc_BaseCode","Tc_BaseCode_Mapped","Tc_SectionBreakStartKM","Tc_r_date","p_key"]

pdf = df.toPandas()
pdf["Tc_r_date"] = pd.to_datetime(pdf["Tc_r_date"])
pdf["Wagon_RecordingDate"] = pd.to_datetime(pdf["Wagon_RecordingDate"])

pdf = pdf.sort_values(["Tc_r_date"] + gcols[:-1] + ["Wagon_RecordingDate"], kind="mergesort").reset_index(drop=True)

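# Weight each row by 1 / bag size so that every group carries the same total weight.
bag_size = pdf.groupby(gcols)["Tc_target"].transform("size")
pos = int(pdf["Tc_target"].sum())
neg = int(len(pdf) - pos)
class_w = neg / max(1, pos)
# Note: class_w multiplies every row, so it rescales all weights uniformly
# and does not by itself up-weight the positive class.
pdf["row_w"] = (1.0 / bag_size) * class_w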


X_all = (pdf[num_cols]
         .apply(pd.to_numeric, errors="coerce")
         .replace([np.inf,-np.inf], np.nan)
         #.fillna(0.0)  # left disabled: LightGBM handles NaN values natively
         .astype("double"))
y_all = pdf["Tc_target"].astype(int).to_numpy()

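# One row per distinct (group, target) combination, then a chronological split:
# the earliest ~80% of distinct dates go to training, the remaining dates to test.
gdf = pdf[gcols + ["Tc_target"]].drop_duplicates()
uniq_dates = np.sort(gdf["Tc_r_date"].unique())
cutoff_date = uniq_dates[int(len(uniq_dates)*0.8) - 1]
train_gmask = gdf["Tc_r_date"] <= cutoff_date
test_gmask  = gdf["Tc_r_date"]  > cutoff_date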

train_groups = set(map(tuple, gdf.loc[train_gmask, gcols].values))
test_groups  = set(map(tuple, gdf.loc[test_gmask,  gcols].values))

row_train_mask = pdf[gcols].apply(tuple, axis=1).isin(train_groups)
row_test_mask  = pdf[gcols].apply(tuple, axis=1).isin(test_groups)

X_tr, y_tr = X_all[row_train_mask], y_all[row_train_mask]
w_tr       = pdf.loc[row_train_mask, "row_w"].to_numpy()

X_te, y_te = X_all[row_test_mask],  y_all[row_test_mask]
pdf_tr     = pdf.loc[row_train_mask].reset_index(drop=True)
pdf_te     = pdf.loc[row_test_mask].reset_index(drop=True)
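
The cell above combines two ideas: a per-row weight of 1 / bag size, and a train/test split on a date cutoff so that whole groups fall on one side. The toy sketch below reproduces both on invented data; the column names `group_id`, `r_date` and `target` are illustrative only and are not the notebook's columns.

```python
import numpy as np
import pandas as pd

# Toy data: five dates, one group per date, two sensor rows per group (all invented).
toy = pd.DataFrame({
    "group_id": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5],
    "r_date": pd.to_datetime(
        ["2024-01-01"] * 2 + ["2024-02-01"] * 2 + ["2024-03-01"] * 2
        + ["2024-04-01"] * 2 + ["2024-05-01"] * 2
    ),
    "target": [0, 0, 1, 1, 0, 0, 0, 0, 1, 1],
})

# Each row gets 1 / (rows in its group), so every group sums to a weight of 1.
toy["row_w"] = 1.0 / toy.groupby("group_id")["target"].transform("size")

# Chronological cutoff: the earliest 80% of distinct dates form the training set.
uniq_dates = np.sort(toy["r_date"].unique())
cutoff = uniq_dates[int(len(uniq_dates) * 0.8) - 1]

train = toy[toy["r_date"] <= cutoff]
test = toy[toy["r_date"] > cutoff]

# No group may appear on both sides of the split.
assert set(train["group_id"]).isdisjoint(test["group_id"])
print(len(train), len(test))
```

Splitting on dates rather than on rows keeps later inspections out of the training data, which is the usual reason for preferring this kind of split with time-ordered records.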


In [0]:

tscv = TimeSeriesSplit(n_splits=5)

dates_tr_g = np.sort(gdf.loc[train_gmask, "Tc_r_date"].unique())

def rows_mask_for_fold(train_idx, valid_idx):
    """Map fold indices over the sorted training dates to boolean row masks on pdf_tr."""
    dtrain = set(dates_tr_g[train_idx])
    dvalid = set(dates_tr_g[valid_idx])
    gtrain = set(map(tuple, gdf.loc[train_gmask & gdf["Tc_r_date"].isin(dtrain), gcols].values))
    gvalid = set(map(tuple, gdf.loc[train_gmask & gdf["Tc_r_date"].isin(dvalid), gcols].values))
    mtrain = pdf_tr[gcols].apply(tuple, axis=1).isin(gtrain)
    mvalid = pdf_tr[gcols].apply(tuple, axis=1).isin(gvalid)
    return mtrain.values, mvalid.values

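def aggregate_to_group(df_rows, prob_col="p", method="noisyor", lambda_=0.05):
    """Pool row-level probabilities into one score P per group (mean, wavg, max or noisy-OR)."""
    # Age in days of each recording relative to Tc_r_date; only the "wavg" method uses it.
    age = (df_rows["Tc_r_date"] - df_rows["Wagon_RecordingDate"]).dt.days.clip(lower=0)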
    w_time = np.exp(-lambda_*age)
    df = df_rows.copy()
    df["p"] = df[prob_col].values
    df["w_time"] = w_time.values

    def agg_fn(g):
        if method == "mean":
            P = g["p"].mean()
        elif method == "wavg":
            P = np.average(g["p"], weights=g["w_time"])
        elif method == "max":
            P = g["p"].max()
        else:  # noisyor
            P = 1.0 - np.prod(1.0 - g["p"])
        return pd.Series({"P": P, "y": g["Tc_target"].iloc[0]})
    out = df.groupby(gcols).apply(agg_fn).reset_index()
    return out

lgb_base = LGBMClassifier(
    objective="binary",
    n_estimators=2000,
    learning_rate=0.03,
    num_leaves=31,
    min_child_samples=30,
    subsample=0.8, colsample_bytree=0.8,
    reg_lambda=1.0,
    scale_pos_weight=None,           
    feature_pre_filter=False,
    random_state=42, n_jobs=-1
)

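# For each time-ordered fold: fit on the earlier dates, score the later dates,
# pool to group level, and keep the threshold that maximises group-level F1.
ths = []
for tr_i, va_i in tscv.split(dates_tr_g):
    mtr, mva = rows_mask_for_fold(tr_i, va_i)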

    mdl = LGBMClassifier(**lgb_base.get_params())
    mdl.fit(X_tr[mtr], y_tr[mtr], sample_weight=w_tr[mtr])

    p_va_row = mdl.predict_proba(X_tr[mva])[:,1]
    rows_va = pdf_tr.loc[mva, gcols + ["Wagon_RecordingDate","Tc_target"]].copy()
    rows_va["p_row"] = p_va_row

    agg_va = aggregate_to_group(rows_va.rename(columns={"p_row":"p"}), method="noisyor", lambda_=0.05)

    prec, rec, thr = precision_recall_curve(agg_va["y"].values, agg_va["P"].values)
    f1 = 2*prec*rec/(prec+rec+1e-12)
    ths.append(float(thr[np.nanargmax(f1[:-1])]))

best_thr = float(np.median(ths))
print(f"Chosen threshold from CV (median on groups): {best_thr:.6f}")
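
The threshold selection used in each fold can be seen in isolation on a handful of invented scores. This sketch mirrors the F1 computation from the loop above; the labels, scores and the two extra fold thresholds are made up for illustration.

```python
import numpy as np
from sklearn.metrics import precision_recall_curve

# Invented group-level labels and pooled scores.
y_true = np.array([0, 0, 1, 0, 1, 1])
scores = np.array([0.10, 0.40, 0.35, 0.20, 0.80, 0.70])

prec, rec, thr = precision_recall_curve(y_true, scores)

# prec and rec have one more entry than thr (the final point has no threshold),
# so the last F1 value is dropped before taking the argmax.
f1 = 2 * prec * rec / (prec + rec + 1e-12)
best = float(thr[np.nanargmax(f1[:-1])])

# Taking the median across folds damps the effect of one unusual fold.
fold_thresholds = [0.30, best, 0.50]  # 0.30 and 0.50 are hypothetical fold results
chosen = float(np.median(fold_thresholds))
print(best, chosen)
```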


In [0]:
final_model = LGBMClassifier(**lgb_base.get_params())
final_model.fit(X_tr, y_tr, sample_weight=w_tr)

p_te_row = final_model.predict_proba(X_te)[:,1]
rows_te = pdf_te[gcols + ["Wagon_RecordingDate","Tc_target"]].copy()
rows_te["p_row"] = p_te_row

agg_te = aggregate_to_group(rows_te.rename(columns={"p_row":"p"}), method="noisyor", lambda_=0.05)
P = agg_te["P"].values
y = agg_te["y"].values

# Group-level test metrics at the CV-selected threshold
# (the metric functions are already imported in the first code cell).
pr_auc = float(average_precision_score(y, P))
y_hat = (P >= best_thr).astype(int)
f1 = float(f1_score(y, y_hat))
acc = float(accuracy_score(y, y_hat))

print("\n=== TEST (group-level) ===")
print("PR-AUC:", pr_auc)
print("F1    :", f1)
print("ACC   :", acc)
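
The group-level scores above come from pooling row probabilities with `aggregate_to_group`. The sketch below compares the four pooling rules on two invented groups; `group_id` and `age_days` are illustrative column names, and `lambda_ = 0.05` simply matches the default used in this notebook.

```python
import numpy as np
import pandas as pd

# Invented row-level probabilities for two groups.
rows = pd.DataFrame({
    "group_id": ["A", "A", "A", "B", "B"],
    "p":        [0.10, 0.20, 0.30, 0.05, 0.05],
    "age_days": [0, 10, 30, 0, 5],
})

lambda_ = 0.05
rows["w_time"] = np.exp(-lambda_ * rows["age_days"])  # older rows count less in "wavg"


def pool(g):
    """Return all four pooled scores for one group of rows."""
    return pd.Series({
        "mean": g["p"].mean(),
        "wavg": np.average(g["p"], weights=g["w_time"]),
        "max": g["p"].max(),
        "noisyor": 1.0 - np.prod(1.0 - g["p"]),  # P(at least one row is positive)
    })


pooled = rows.groupby("group_id")[["p", "w_time"]].apply(pool)
print(pooled.round(4))
```

Noisy-OR treats rows as independent evidence, so its pooled score is never below the group maximum and tends to rise as a group gains rows. That is worth keeping in mind when groups differ a lot in size.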

In [0]:
################# Update your output data for the model configuration here #################
# ==== imports ====
import json, time
import mlflow, mlflow.lightgbm
from mlflow.models.signature import infer_signature
from mlflow.tracking import MlflowClient
from sklearn.metrics import average_precision_score, roc_auc_score, f1_score, accuracy_score
from pyspark.sql import Row

registered_name = f"{ml_catalog}.{model_schema_name}.{model_name}"

with mlflow.start_run() as run:
    mlflow.log_params({
        "n_estimators": 2000,
        "learning_rate": 0.03,
        "num_leaves": 31,
        "min_child_samples": 30,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "reg_lambda": 1.0,
        "scale_pos_weight": None,
        "random_state": 42,
        "n_jobs":-1,
        "feature_pre_filter":False,
        #"selected_feat_cnt": len(selected_feats),
    })

    mlflow.log_metrics({
        "PR_AUC": pr_auc,
        "F1": f1,
        "ACC": acc
    })

    mlflow.log_dict(
        {#"selected_feats": selected_feats,
          "threshold": float(best_thr)},
        artifact_file="rfe_lightgbm.json"
    )

    ex_X = X_tr.head(5)
    sign = infer_signature(model_input=ex_X, model_output=final_model.predict_proba(ex_X)[:, 1])

    mlflow.lightgbm.log_model(
        final_model,
        artifact_path="model",                    
        signature=sign,
        input_example=ex_X.iloc[:1],
        registered_model_name=registered_name      
    )

    run_id = run.info.run_id
    print(f"[MLflow] run_id = {run_id}")

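# Look up the registry version created by this run. The new version may not be
# visible immediately, so retry up to 10 times, one second apart.
client = MlflowClient()
model_version = None
for _ in range(10):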
    mvs = [m for m in client.search_model_versions(f"name='{registered_name}'") if m.run_id == run_id]
    if mvs:
        model_version = int(mvs[0].version)
        break
    time.sleep(1)

print(f"[Registry] {registered_name} version = {model_version}")

df_result = spark.createDataFrame([
    Row(
        ModelName      = registered_name,                 
        ModelVersion   = model_version,                
        run_id          = run_id,
        test_pr_auc     = pr_auc,
        test_f1         = f1,
        test_acc        = acc,
        threshold       = float(best_thr),
        #selected_feats  = json.dumps(selected_feats),      
    )
])




# Notebook End

**(Do not modify/delete the following cell)**

#### Important:
1) Ensure that the result is stored in a PySpark dataframe named 'df_result', e.g. df_result = ..., containing the Model Name, Model Version, performance metrics and pipeline version (for the feature log).
2) Ensure that your models are registered under ml_catalog. Always name your model as `f'{ml_catalog}.{delta_schema_name}.{model_name}'`.

This produces the model and the model config table in your shared ml_catalog, for easy sharing and inference across environments.

In [0]:
%run "/InsightFactory/Helpers/ML Build (Unity Catalog) Exit"

# Testing or Debugging Zone