#  COST 231 MWM (OLS / Ridge / Lasso / ElasticNet) 

#### Imports — core utils, data wrangling, ML, and stats

In [8]:
# Data handling
import numpy as np
import pandas as pd
from IPython.display import display

# Core + utils
from math import sqrt
import re
import os

# ML (scikit-learn)
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.metrics import mean_squared_error, r2_score

# Parallel
from joblib import Parallel, delayed, dump

# Preventing BLAS oversubscription
# Each variable is set to "1" for this process's environment.
# NOTE(review): these assignments run after numpy / scikit-learn were imported above.
# If the underlying BLAS/OpenMP runtimes read the variables when they are loaded,
# the limits may only apply to processes started later — confirm, or move this
# block above the numeric imports.
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["NUMEXPR_MAX_THREADS"] = "1"

# Parallelism settings
# Passed as n_jobs to joblib.Parallel in the model-selection loop.
N_JOBS = -1  # Use all available cores

#### Data paths, loading, and preparation

In [9]:
# Paths (aligned with earlier prep)
SAVE_DIR   = 'Data+Files+Plots+etc'
TRAIN_CSV  = f'{SAVE_DIR}/train.csv'
TEST_CSV   = f'{SAVE_DIR}/test.csv'
FOLDS_NPY  = f'{SAVE_DIR}/train_folds.npy'

# Load splits
df_train = pd.read_csv(TRAIN_CSV)
df_test  = pd.read_csv(TEST_CSV)
fold_assignments = np.load(FOLDS_NPY)

# Feature/target setup (COST231-MWM baseline)
raw_feats  = ['distance','frequency','c_walls','w_walls']
target_col = 'PL'

# Validate the columns this cell is about to index *before* slicing, so a
# schema problem is reported as a readable ValueError naming the split
# instead of a bare KeyError from the DataFrame lookup below.
for _split_name, _frame in (("train", df_train), ("test", df_test)):
    _absent = [c for c in raw_feats + [target_col] if c not in _frame.columns]
    if _absent:
        raise ValueError(f"Missing required columns in {_split_name} split: {_absent}")

# Train/test matrices (raw features kept as DataFrames, targets as float arrays)
Xtr_raw = df_train[raw_feats].copy()
ytr_pl  = df_train[target_col].astype(float).values
Xte_raw = df_test[raw_feats].copy()
yte_pl  = df_test[target_col].astype(float).values

def slug(obj):
    """Turn a model name or config dict into a short, filename-safe tag.

    A non-empty dict becomes ``key=value`` pairs (keys sorted) joined by ``__``;
    float-like values are passed through ``float`` first. ``None``, ``{}`` and
    ``[]`` give an empty string; anything else is stringified. Every run of
    characters outside ``A-Za-z0-9._=-`` collapses to one underscore, and
    leading/trailing underscores are removed.
    """
    if isinstance(obj, dict) and obj:
        pairs = []
        for key in sorted(obj):
            value = obj[key]
            if isinstance(value, (float, np.floating)):
                value = float(value)
            pairs.append(f"{key}={value}")
        text = "__".join(pairs)
    elif obj in (None, {}, []):
        text = ""
    else:
        text = str(obj)
    cleaned = re.sub(r"[^A-Za-z0-9._=-]+", "_", text)
    return cleaned.strip("_")

# Schema guard: every required column must exist in BOTH splits
required_cols = ['PL', 'device_id', 'distance','frequency','c_walls','w_walls']
missing = [
    c for c in required_cols
    if not (c in df_train.columns and c in df_test.columns)
]
if missing:
    raise ValueError(f"Missing required columns in train/test: {missing}")

# Fold labels must line up one-to-one with the training rows
n_fold_labels, n_train_rows = len(fold_assignments), len(df_train)
if n_fold_labels != n_train_rows:
    raise ValueError(f"fold_assignments length {n_fold_labels} != df_train rows {n_train_rows}")

#### Physics-consistent linearization + helpers + model specs

In [10]:
# Linearize (Friis-adjusted): y_adj = PL - 20*log10(f)
d0 = 1.0
def z_of_d(d):
    """Return 10*log10(d/d0) for an array of distances.

    Distances are cast to float and floored at 1e-6 before the ratio is
    taken, so zero or negative entries map to a finite value.
    """
    safe_d = np.clip(d.astype(float), 1e-6, None)
    return 10.0 * np.log10(safe_d / d0)

def f_term(f):
    """Return the frequency term 20*log10(f) for an array of frequencies.

    Values are cast to float and floored at 1e-12 so the logarithm stays finite.
    """
    safe_f = np.clip(f.astype(float), 1e-12, None)
    return 20.0 * np.log10(safe_f)

# Frequency terms per split, and targets with that term subtracted
ftr_tr = f_term(Xtr_raw['frequency'].values)
ftr_te = f_term(Xte_raw['frequency'].values)
ytr_adj = ytr_pl - ftr_tr
yte_adj = yte_pl - ftr_te

# Linear feature maps (COST231-MWM baseline: distance + wall counts)
cols = ['z_d','c_walls','w_walls']

def _linear_design(raw):
    """Build the float design matrix [z_d, c_walls, w_walls] from a raw feature frame."""
    frame = pd.DataFrame({
        'z_d': z_of_d(raw['distance'].values),
        'c_walls': raw['c_walls'].values,
        'w_walls': raw['w_walls'].values
    }, columns=cols)
    return frame.values.astype(float)

Xtr_lin = _linear_design(Xtr_raw)
Xte_lin = _linear_design(Xte_raw)

# Param labels (for reporting)
# Order matches the coefficient vector assembled in the selection loop:
# [intercept, z_d, c_walls, w_walls].
# NOTE(review): the models are fit on the frequency-adjusted target
# (PL - 20*log10(f)), so the first entry is the intercept of that adjusted
# model rather than a raw path loss at d0 — confirm the label is intended.
# NOTE(review): "Brick"/"Wood" presumably describe c_walls / w_walls; verify
# against the dataset documentation.
param_names = [
    'PL(d0) [dB]', 'Path loss exponent (n)',
    'Brick Wall Loss (L_c) [dB]', 'Wood Wall Loss (L_w) [dB]'
]

# Helpers
def unscale_coefficients(pipeline):
    """Express a fitted pipeline's linear model in the original feature units.

    Parameters
    ----------
    pipeline : object exposing ``named_steps``
        A fitted pipeline whose LAST step is a linear estimator with
        ``coef_`` and ``intercept_``. It may contain a step named
        ``'standardscaler'`` with ``mean_`` and ``scale_``.

    Returns
    -------
    (float, numpy.ndarray)
        Intercept and coefficient vector in unscaled feature space.

    Raises
    ------
    ValueError
        If the final step has no ``coef_`` attribute.
    """
    steps = pipeline.named_steps
    # Take the estimator positionally (final step) instead of probing a fixed
    # list of step names with `or`: that approach returns None for any other
    # regressor name and depends on the estimator object being truthy.
    est = list(steps.values())[-1] if len(steps) else None
    if est is None or not hasattr(est, "coef_"):
        raise ValueError("pipeline's final step is not a fitted linear estimator (missing coef_)")

    beta_scaled = np.asarray(est.coef_, dtype=float)
    scaler = steps.get('standardscaler')
    if scaler is None:
        # No scaling step: coefficients are already in original units.
        return float(est.intercept_), beta_scaled.copy()

    # A scaler fitted with centering or scaling disabled exposes None for the
    # corresponding attribute; treat that as "no shift" / "unit scale".
    mu = getattr(scaler, "mean_", None)
    sig = getattr(scaler, "scale_", None)
    mu = np.zeros_like(beta_scaled) if mu is None else np.asarray(mu, dtype=float)
    sig = np.ones_like(beta_scaled) if sig is None else np.asarray(sig, dtype=float)

    # y = b0 + sum(beta_s * (x - mu) / sig)  =>  beta = beta_s / sig,
    # intercept = b0 - sum(beta_s * mu / sig)
    beta_orig = beta_scaled / sig
    intercept_orig = float(est.intercept_ - np.sum(beta_scaled * mu / sig))
    return intercept_orig, beta_orig

def fold_indices(folds, k):
    """Split row positions by fold label.

    Returns ``(tr_idx, val_idx)``: positions whose label differs from ``k``
    and positions whose label equals ``k``, in that order.
    """
    held_out = np.nonzero(folds == k)[0]
    remaining = np.nonzero(folds != k)[0]
    return remaining, held_out

def rmse_r2_on_PL(y_true_pl, y_pred_adj, fterm):
    """Return (RMSE, R2) in the PL domain.

    The adjusted-domain prediction has ``fterm`` added back before being
    compared with ``y_true_pl``.
    """
    pl_hat = y_pred_adj + fterm
    mse = mean_squared_error(y_true_pl, pl_hat)
    return sqrt(mse), r2_score(y_true_pl, pl_hat)

# Fold list: only non-negative labels become validation folds.
# Rows with a negative label are never validated, but fold_indices still places
# them on the training side (label != k).
_fold_labels = (int(k) for k in np.unique(fold_assignments))
unique_folds = sorted(k for k in _fold_labels if k >= 0)
folds = [fold_indices(fold_assignments, k) for k in unique_folds]

# Model factories
def make_OLS(_):
    """Unregularized baseline: a pipeline holding only LinearRegression (config ignored)."""
    return make_pipeline(LinearRegression())


def make_Ridge(cfg):
    """StandardScaler followed by Ridge with ``cfg["alpha"]``."""
    regressor = Ridge(alpha=cfg["alpha"], random_state=42)
    return make_pipeline(StandardScaler(), regressor)


def make_Lasso(cfg):
    """StandardScaler followed by Lasso with ``cfg["alpha"]``."""
    regressor = Lasso(alpha=cfg["alpha"], max_iter=20000, random_state=42)
    return make_pipeline(StandardScaler(), regressor)


def make_ElasticNet(cfg):
    """StandardScaler followed by ElasticNet with ``cfg["alpha"]`` and ``cfg["l1_ratio"]``."""
    regressor = ElasticNet(
        alpha=cfg["alpha"],
        l1_ratio=cfg["l1_ratio"],
        max_iter=20000,
        random_state=42,
    )
    return make_pipeline(StandardScaler(), regressor)

# Hyper-parameter grids: one config dict per candidate
# NOTE(review): the recorded CV table shows Ridge selecting alpha=1000, the upper
# edge of its grid — consider widening the range.
ridge_grid = [{"alpha": a} for a in np.logspace(-4, 3, 15)]
lasso_grid = [{"alpha": a} for a in np.logspace(-4, 1, 15)]
enet_grid  = [
    {"alpha": a, "l1_ratio": r}
    for a in np.logspace(-4, 1, 10)
    for r in (0.2, 0.5, 0.8)
]

# Spec list: (display name, pipeline factory, list of candidate configs).
# OLS has nothing to tune, so its grid is a single empty config.
specs = [
    ("OLS", make_OLS, [{}]),
    ("Ridge", make_Ridge, ridge_grid),
    ("Lasso", make_Lasso, lasso_grid),
    ("ElasticNet", make_ElasticNet, enet_grid),
]

#### Time-aware K-fold CV on the train split to pick the best config per model variant, then refit each variant on the full train split

In [11]:
def eval_cfg(factory, cfg):
    """Cross-validate one configuration over the precomputed folds.

    For each (train, validation) index pair a fresh pipeline from
    ``factory(cfg)`` is fit on the adjusted target and scored in the PL domain.
    Returns a dict with the config, mean validation RMSE, its sample standard
    deviation (0.0 when fewer than two folds), and mean validation R2. The
    means are NaN when there are no folds.
    """
    fold_scores = []
    for fit_rows, held_rows in folds:
        model = factory(cfg)
        model.fit(Xtr_lin[fit_rows], ytr_adj[fit_rows])
        held_pred = model.predict(Xtr_lin[held_rows])
        fold_scores.append(rmse_r2_on_PL(ytr_pl[held_rows], held_pred, ftr_tr[held_rows]))

    rmses = [score[0] for score in fold_scores]
    r2s = [score[1] for score in fold_scores]

    if rmses:
        rmse_mean = float(np.mean(rmses))
        r2_mean = float(np.mean(r2s))
    else:
        rmse_mean = np.nan
        r2_mean = np.nan
    rmse_sd = float(np.std(rmses, ddof=1)) if len(rmses) > 1 else 0.0

    return {
        "cfg": cfg,
        "rmse_val_mean": rmse_mean,
        "rmse_val_sd":   rmse_sd,
        "r2_val_mean":   r2_mean,
    }

def _cv_rank_key(e):
    """Ordering for CV results: lower mean RMSE, then lower sd, then higher R2."""
    return (e["rmse_val_mean"], e["rmse_val_sd"], -e["r2_val_mean"])


results = []

for name, factory, grid in specs:
    # Score every candidate config; configs are distributed across worker processes
    jobs = (delayed(eval_cfg)(factory, cfg) for cfg in grid)
    evals = Parallel(n_jobs=N_JOBS, prefer="processes")(jobs)

    # Winner is the first entry after ranking
    evals_sorted = sorted(evals, key=_cv_rank_key)
    best = evals_sorted[0]
    best_cfg = best["cfg"]

    # Refit the winning config on the entire TRAIN split
    final_pipe = factory(best_cfg)
    final_pipe.fit(Xtr_lin, ytr_adj)

    # Held-out TEST score, reported in the PL domain
    yte_pred_adj = final_pipe.predict(Xte_lin)
    rmse_te, r2_te = rmse_r2_on_PL(yte_pl, yte_pred_adj, ftr_te)

    # Coefficients in original feature units (model lives in the adjusted domain)
    b0, b = unscale_coefficients(final_pipe)
    coef_vec = np.concatenate([[b0], b]).astype(float)
    coeffs = pd.Series(coef_vec, index=param_names)

    # Tag carries the config only when there is one (OLS has an empty config)
    model_tag = f"{name}__{slug(best_cfg)}" if best_cfg else name

    cv_summary = {key: best[key] for key in ("rmse_val_mean", "rmse_val_sd", "r2_val_mean")}
    results.append({
        "model": name,
        "model_tag": model_tag,
        "best_cfg": best_cfg,
        "cv": cv_summary,
        "test": {"rmse": float(rmse_te), "r2": float(r2_te)},
        "final_pipe": final_pipe,
        "coeffs": coeffs
    })

def mwm_short_tag(name: str) -> str:
    """Prefix a model name with ``MWM_`` for use in summary tables."""
    return "MWM_{}".format(name)

# CV-only summary table (TRAIN split only): one row per model variant
rows = []
for r in results:
    stored_cfg = r.get("best_cfg", {})
    # Non-dict configs are wrapped so the .get lookups below still work
    cfg = stored_cfg if isinstance(stored_cfg, dict) else {"cfg": str(stored_cfg)}
    cv_stats = r["cv"]
    rows.append({
        "model":            mwm_short_tag(r["model"]),
        "is_best":          False,  # filled in once best_overall is known
        "cv_rmse_val_mean": cv_stats["rmse_val_mean"],
        "cv_rmse_val_sd":   cv_stats["rmse_val_sd"],
        "cv_r2_val_mean":   cv_stats["r2_val_mean"],
        "alpha":            cfg.get("alpha", np.nan),
        "l1_ratio":         cfg.get("l1_ratio", np.nan),
    })

_sort_cols = ["cv_rmse_val_mean", "cv_rmse_val_sd", "model"]
mwm_cv_table = pd.DataFrame(rows).sort_values(_sort_cols).reset_index(drop=True)

# Overall winner: lowest mean CV RMSE, ties broken by lower sd, then higher R2
best_overall = min(
    results,
    key=lambda r: (r["cv"]["rmse_val_mean"], r["cv"]["rmse_val_sd"], -r["cv"]["r2_val_mean"]),
)
best_long_tag = best_overall["model_tag"]
_best_short = mwm_short_tag(best_overall["model"])
mwm_cv_table["is_best"] = mwm_cv_table["model"].eq(_best_short)

display(mwm_cv_table)

Unnamed: 0,model,is_best,cv_rmse_val_mean,cv_rmse_val_sd,cv_r2_val_mean,alpha,l1_ratio
0,MWM_ElasticNet,True,10.965104,1.143518,0.65418,0.059948,0.5
1,MWM_Ridge,False,10.97348,1.175072,0.653303,1000.0,
2,MWM_Lasso,False,10.973923,1.175701,0.653266,0.002683,
3,MWM_OLS,False,10.973923,1.175962,0.653265,,


#### Using the CV summary above (mean ± sd across folds, best model highlighted): save the BEST model's residuals on the held-out test split, then its out-of-fold (OOF) residuals on the train split.

In [None]:
# Unpack the CV-selected entry
best_name     = best_overall["model"]      # OLS / Ridge / Lasso / ElasticNet
best_cfg      = best_overall["best_cfg"]
best_long_tag = best_overall["model_tag"]
best_tag_short = mwm_short_tag(best_name)

# TEST residuals from the pipeline already refit on the full TRAIN split
best_pipe = best_overall["final_pipe"]
yte_pred_adj = best_pipe.predict(Xte_lin)
PL_pred_test = yte_pred_adj + ftr_te      # back to the PL domain
resid_test   = yte_pl - PL_pred_test

# "time" is optional in the input; an all-NaN column is written when it is absent
_test_time = df_test.get("time", pd.Series(index=df_test.index, dtype=float)).values
_test_passthrough = ["device_id", "distance", "frequency", "c_walls", "w_walls"]

_test_cols = {
    "model":   "COST231_MWM_BEST",
    "variant": best_tag_short,
    "split":   "test",
    "row_id":  np.arange(len(df_test), dtype=int),
    "time":    _test_time,
}
_test_cols.update((c, df_test[c].values) for c in _test_passthrough)
_test_cols.update(PL_true=yte_pl, PL_pred=PL_pred_test, resid_db=resid_test)
mwm_test_df = pd.DataFrame(_test_cols)

# NOTE(review): the stored cell output shows a `residuals__COST231_MWM__BEST__test.csv`
# path, which differs from the one built here — re-run the cell and confirm which
# filename downstream steps expect.
test_path = f"{SAVE_DIR}/residuals__COST231__BEST__test.csv"
mwm_test_df.to_csv(test_path, index=False)
print(f"\n[TEST] Saved best COST231_MWM test residuals: {test_path}")

# Out-of-fold residuals for the BEST variant (train split only)
factory_for_best = next(make for (label, make, _grid) in specs if label == best_name)

# Each row is predicted by a model that did not see its fold; rows that belong
# to no validation fold keep NaN and are dropped by the mask below.
y_pred_adj_oof = np.full(len(ytr_adj), np.nan, dtype=float)
for tr_idx, val_idx in folds:
    pipe = factory_for_best(best_cfg)
    pipe.fit(Xtr_lin[tr_idx], ytr_adj[tr_idx])
    y_pred_adj_oof[val_idx] = pipe.predict(Xtr_lin[val_idx])

mask = ~np.isnan(y_pred_adj_oof)

PL_pred_oof = y_pred_adj_oof[mask] + ftr_tr[mask]   # back to the PL domain
resid_oof   = ytr_pl[mask] - PL_pred_oof

# "time" is optional in the input; an all-NaN column is written when it is absent
_oof_time = df_train.get("time", pd.Series(index=df_train.index, dtype=float)).values[mask]
_oof_passthrough = ["device_id", "distance", "frequency", "c_walls", "w_walls"]

_oof_cols = {
    "model":   "COST231_MWM_BEST",
    "variant": best_tag_short,
    "split":   "oof",
    "row_id":  np.arange(len(df_train), dtype=int)[mask],
    "fold":    fold_assignments.astype(int)[mask],
    "time":    _oof_time,
}
_oof_cols.update((c, df_train[c].values[mask]) for c in _oof_passthrough)
_oof_cols.update(PL_true=ytr_pl[mask], PL_pred=PL_pred_oof, resid_db=resid_oof)
mwm_oof_df = pd.DataFrame(_oof_cols)

# NOTE(review): the stored cell output shows a `residuals__COST231_MWM__BEST__oof.csv`
# path and a "COST231_MWM" message, which differ from what this code produces —
# re-run the cell and confirm which filename downstream steps expect.
oof_path = f"{SAVE_DIR}/residuals__COST231__BEST__oof.csv"
mwm_oof_df.to_csv(oof_path, index=False)
print(f"\n[OOF] Saved best COST231 OOF residuals: {oof_path}")


[TEST] Saved best COST231_MWM test residuals: Data+Files+Plots+etc/residuals__COST231_MWM__BEST__test.csv

[OOF] Saved best COST231_MWM OOF residuals: Data+Files+Plots+etc/residuals__COST231_MWM__BEST__oof.csv


#### Final held-out evaluation on the 20% test split for each model variant at its CV-selected configuration

In [13]:
# Final test evaluation (held-out split): one row per model variant,
# each scored with the pipeline refit at its CV-selected config
test_rows = [
    {
        "Model": res["model"],
        "Test RMSE": float(res["test"]["rmse"]),
        "Test R2":   float(res["test"]["r2"]),
    }
    for res in results
]

# Frame + display
test_df = pd.DataFrame(test_rows)
display(test_df)

Unnamed: 0,Model,Test RMSE,Test R2
0,OLS,12.070405,0.589023
1,Ridge,12.070322,0.589029
2,Lasso,12.069744,0.589068
3,ElasticNet,12.061215,0.589648


#### Coefficient table (all models, final fits, original units)

In [14]:
# One column of coefficients per model, rows labelled by parameter name
_model_names = [res['model'] for res in results]
_coef_series = [res['coeffs'] for res in results]
coef_df = pd.concat(_coef_series, axis=1)
coef_df.columns = _model_names

print("\n Harmonized Coefficients (final all-train fits, original units) ")
display(coef_df)


 Harmonized Coefficients (final all-train fits, original units) 


Unnamed: 0,OLS,Ridge,Lasso,ElasticNet
PL(d0) [dB],-32.747465,-32.669933,-32.759432,-29.604407
Path loss exponent (n),4.22721,4.219631,4.229175,3.93589
Brick Wall Loss (L_c) [dB],7.800822,7.805277,7.793905,7.858416
Wood Wall Loss (L_w) [dB],1.694792,1.703944,1.690098,1.999144
