# Penalised, PCR, and Stepwise Models for HDB Resale Prices

This notebook implements the following modelling families:

Penalised Regression Models
- **LASSO**
- **Ridge**
- **Elastic Net**

Variable Selection
- **Forward Stepwise OLS using AIC**
- **Forward Stepwise OLS using BIC**

Common Settings
- Target variable: `log_resale_price = log(resale_price)`
- Train–Test Split: 80/20
- 5-Fold Cross-Validation for hyperparameter search
- Performance Metrics: **RMSE and MAE**
- Penalised models + PCR use **standardised predictors**
- Stepwise uses a **small interpretable feature set**

In [1]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import Lasso, Ridge, ElasticNet, LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.pipeline import Pipeline

## Load and Prepare Data

In [2]:
DATA_PATH = "../data/HDB_data_2021_sample.xlsx"

df = pd.read_excel(DATA_PATH)

# ensure resale_price exists
df = df.dropna(subset=["resale_price"])

# target variable: log(price)
df["log_resale_price"] = np.log(df["resale_price"])

# predictors for penalised models & PCR
drop_cols_full = ["resale_price", "log_resale_price", "year"]
drop_cols_full = [c for c in drop_cols_full if c in df.columns]

X_full = df.drop(columns=drop_cols_full)
y = df["log_resale_price"].values

feature_names_full = X_full.columns.tolist()

print("Number of observations:", X_full.shape[0])
print("Number of predictors (full):", X_full.shape[1])

Number of observations: 6000
Number of predictors (full): 228


## Train–Test Split & Scaling

In [3]:
# 80/20 split
X_full_train, X_full_test, y_train, y_test = train_test_split(
    X_full, y, test_size=0.2, random_state=42
)

# standardisation (important for penalised models and PCA)
scaler = StandardScaler()
X_full_train_scaled = scaler.fit_transform(X_full_train)
X_full_test_scaled = scaler.transform(X_full_test)

# performance metrics
def rmse(y_true, y_pred):
    return np.sqrt(mean_squared_error(y_true, y_pred))

def mae(y_true, y_pred):
    return mean_absolute_error(y_true, y_pred)

def r2(y_true, y_pred):
    return r2_score(y_true, y_pred)

## Helper: Penalised Model Runner

In [6]:
from sklearn.model_selection import GridSearchCV

def run_penalised_model(model_name, base_model, param_grid,
                        X_train, y_train, X_test, y_test, feature_names):

    print(f"\n=== {model_name} ===")

    # baseline model (default hyperparameters)
    base_model.fit(X_train, y_train)
    y_pred_base = base_model.predict(X_test)

    baseline_rmse = rmse(y_test, y_pred_base)
    baseline_mae  = mae(y_test, y_pred_base)
    baseline_r2   = r2(y_test, y_pred_base)

    print(f"{model_name} Baseline - Test RMSE: {baseline_rmse:.4f}, "
          f"MAE: {baseline_mae:.4f}, R²: {baseline_r2:.4f}")

    # hyperparameter tuning via CV
    grid = GridSearchCV(
        estimator=base_model,
        param_grid=param_grid,
        scoring="neg_mean_squared_error",  # RMSE-based CV
        cv=5,
        n_jobs=-1
    )
    grid.fit(X_train, y_train)

    best_model = grid.best_estimator_
    y_pred_best = best_model.predict(X_test)

    tuned_rmse = rmse(y_test, y_pred_best)
    tuned_mae  = mae(y_test, y_pred_best)
    tuned_r2   = r2(y_test, y_pred_best)

    print(f"{model_name} Tuned - Best Params: {grid.best_params_}")
    print(f"{model_name} Tuned - Test RMSE: {tuned_rmse:.4f}, "
          f"MAE: {tuned_mae:.4f}, R²: {tuned_r2:.4f}")

    # coefficients / feature importance (for penalised models)
    if hasattr(best_model, "coef_"):
        coefs = pd.Series(best_model.coef_, index=feature_names)

        print(f"\nTop 10 {model_name} Coefficients (by absolute magnitude):")
        print(coefs.abs().sort_values(ascending=False).head(10))
    else:
        print(f"\n{model_name} has no .coef_ attribute (skipping coefficient summary).")

    # return structured output
    return {
        "best_model": best_model,
        "baseline_rmse": baseline_rmse,
        "baseline_mae": baseline_mae,
        "baseline_r2": baseline_r2,
        "tuned_rmse": tuned_rmse,
        "tuned_mae": tuned_mae,
        "tuned_r2": tuned_r2,
        "best_params": grid.best_params_
    }

## Penalised Models: LASSO, Ridge, Elastic Net

In [7]:
results = []  # store final results

# LASSO
lasso_base = Lasso(max_iter=5000, random_state=42)
lasso_grid = {"alpha": [0.0005, 0.001, 0.01, 0.1, 1.0]}

lasso_res = run_penalised_model(
    "LASSO", lasso_base, lasso_grid,
    X_full_train_scaled, y_train,
    X_full_test_scaled, y_test,
    feature_names_full
)

results.append({
    "Model": "LASSO",
    "Test_RMSE": lasso_res["tuned_rmse"],
    "Test_MAE": lasso_res["tuned_mae"],
    "Test_R2": lasso_res["tuned_r2"],
    "Best_Params": lasso_res["best_params"]
})


# Ridge
ridge_base = Ridge()
ridge_grid = {"alpha": [0.1, 1.0, 10.0, 100.0]}

ridge_res = run_penalised_model(
    "Ridge", ridge_base, ridge_grid,
    X_full_train_scaled, y_train,
    X_full_test_scaled, y_test,
    feature_names_full
)

results.append({
    "Model": "Ridge",
    "Test_RMSE": ridge_res["tuned_rmse"],
    "Test_MAE": ridge_res["tuned_mae"],
    "Test_R2": ridge_res["tuned_r2"],
    "Best_Params": ridge_res["best_params"]
})


# Elastic Net
enet_base = ElasticNet(max_iter=5000, random_state=42)
enet_grid = {"alpha": [0.0005, 0.001, 0.01, 0.1, 1.0],
             "l1_ratio": [0.2, 0.5, 0.8]}

enet_res = run_penalised_model(
    "ElasticNet", enet_base, enet_grid,
    X_full_train_scaled, y_train,
    X_full_test_scaled, y_test,
    feature_names_full
)

results.append({
    "Model": "Elastic Net",
    "Test_RMSE": enet_res["tuned_rmse"],
    "Test_MAE": enet_res["tuned_mae"],
    "Test_R2": enet_res["tuned_r2"],
    "Best_Params": enet_res["best_params"]
})


=== LASSO ===
LASSO Baseline - Test RMSE: 0.3236, MAE: 0.2571, R²: -0.0018
LASSO Tuned - Best Params: {'alpha': 0.0005}
LASSO Tuned - Test RMSE: 0.0774, MAE: 0.0599, R²: 0.9426

Top 10 LASSO Coefficients (by absolute magnitude):
floor_area_sqm           0.178885
Remaining_lease          0.154745
Dist_CBD                 0.062814
mature                   0.031116
Dist_nearest_GHawker     0.030710
storey_range_01.TO.03    0.027989
flat_type_3.ROOM         0.027887
max_floor_lvl            0.026193
Dist_nearest_station     0.025844
postal_2digits_44        0.024547
dtype: float64

=== Ridge ===
Ridge Baseline - Test RMSE: 0.0773, MAE: 0.0595, R²: 0.9428
Ridge Tuned - Best Params: {'alpha': 1.0}
Ridge Tuned - Test RMSE: 0.0773, MAE: 0.0595, R²: 0.9428

Top 10 Ridge Coefficients (by absolute magnitude):
floor_area_sqm             0.162985
Remaining_lease            0.153364
Dist_CBD                   0.097727
Dist_nearest_GAI_jc        0.073005
Dist_nearest_university    0.071881
Dist_near

## Stepwise Feature Subset (Interpretable Variables)

In [12]:
X_full_train_scaled_df = pd.DataFrame(
    X_full_train_scaled,
    columns=feature_names_full
)

## Compute AIC/BIC

In [13]:
def compute_ic(y_true, y_pred, k, criterion="AIC"):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    n = y_true.shape[0]

    rss = np.sum((y_true - y_pred)**2)
    sigma2 = rss / n

    loglik = -0.5 * n * (np.log(2 * np.pi * sigma2) + 1)

    if criterion.upper() == "AIC":
        return 2 * k - 2 * loglik
    else:
        return k * np.log(n) - 2 * loglik

## Forward Stepwise (AIC/BIC)

In [14]:
def forward_stepwise_ic(X, y, criteria=("AIC", "BIC"), tol=1e-4, verbose=False):
    # pre-convert for speed
    X_np = X.values
    y_np = y.values if isinstance(y, pd.Series) else y
    feature_names = list(X.columns)
    n_features = len(feature_names)

    results = {}

    for criterion in criteria:

        print(f"\n=== Forward Stepwise Selection using {criterion} ===")
        
        selected = []
        remaining = feature_names.copy()
        best_ic = np.inf

        while remaining:
            ic_candidates = []

            for feat in remaining:
                trial_features = selected + [feat]
                idx = [X.columns.get_loc(f) for f in trial_features]
                X_trial = X_np[:, idx]

                model = LinearRegression()
                model.fit(X_trial, y_np)
                y_pred = model.predict(X_trial)

                k = len(trial_features) + 1
                ic_val = compute_ic(y_np, y_pred, k, criterion)
                r2_val = r2_score(y_np, y_pred)

                ic_candidates.append((ic_val, feat, r2_val))

                if verbose:
                    print(f"{criterion}: {feat:>25s} -> {ic_val:.4f}, R²={r2_val:.4f}")

            ic_candidates.sort(key=lambda x: x[0])
            best_new_ic, best_feat, best_r2 = ic_candidates[0]

            if best_new_ic + tol < best_ic:
                selected.append(best_feat)
                remaining.remove(best_feat)
                best_ic = best_new_ic

                print(f"--> Added {best_feat}, {criterion}={best_new_ic:.4f}, R²={best_r2:.4f}")
            else:
                print(f"No improvement in {criterion}. Stopping.")
                break

        results[criterion] = {
            "selected": selected,
            "best_ic": best_ic
        }

    return results

In [16]:
stepwise_results = forward_stepwise_ic(
    X_full_train_scaled_df,
    y_train,
    criteria=("AIC", "BIC"),
    verbose=False   # True if you want every trial printed
)


=== Forward Stepwise Selection using AIC ===
--> Added floor_area_sqm, AIC=-379.9940, R²=0.4656
--> Added max_floor_lvl, AIC=-3148.6260, R²=0.7000
--> Added Dist_CBD, AIC=-3903.5809, R²=0.7437
--> Added Remaining_lease, AIC=-5969.3479, R²=0.8334
--> Added LRT, AIC=-6607.1883, R²=0.8542
--> Added unique_no_mrt_0.5km, AIC=-6963.2867, R²=0.8647
--> Added flat_model_dbss, AIC=-7339.3979, R²=0.8749
--> Added town_SENGKANG, AIC=-7589.6355, R²=0.8813
--> Added storey_range_01.TO.03, AIC=-7770.2382, R²=0.8858
--> Added mature, AIC=-7933.3352, R²=0.8896
--> Added postal_2digits_44, AIC=-8063.1946, R²=0.8926
--> Added Dist_nearest_station, AIC=-8238.8809, R²=0.8965
--> Added Nearest_A_hospital_ownership_Private, AIC=-8392.0937, R²=0.8998
--> Added Dist_nearest_GHawker, AIC=-8539.5560, R²=0.9029
--> Added flat_model_terrace, AIC=-8668.7121, R²=0.9055
--> Added storey_range_04.TO.06, AIC=-8793.4797, R²=0.9080
--> Added town_BUKIT.TIMAH, AIC=-8916.5435, R²=0.9103
--> Added beach_within_2km, AIC=-9

In [17]:
def evaluate_stepwise_model(X_train, y_train, X_test, y_test, selected):
    Xtr = X_train[selected]
    Xte = X_test[selected]

    model = LinearRegression()
    model.fit(Xtr, y_train)
    y_pred = model.predict(Xte)

    return {
        "RMSE": rmse(y_test, y_pred),
        "MAE": mae(y_test, y_pred),
        "R2": r2(y_test, y_pred),
        "model": model
    }

In [20]:
# after scaling
X_full_train_scaled_df = pd.DataFrame(
    X_full_train_scaled,
    columns=feature_names_full
)

X_full_test_scaled_df = pd.DataFrame(
    X_full_test_scaled,
    columns=feature_names_full
)

# evaluate AIC-selected model
aic_feats = stepwise_results["AIC"]["selected"]

aic_eval = evaluate_stepwise_model(
    X_full_train_scaled_df, y_train,
    X_full_test_scaled_df, y_test,
    aic_feats
)

# evaluate BIC-selected model
bic_feats = stepwise_results["BIC"]["selected"]

bic_eval = evaluate_stepwise_model(
    X_full_train_scaled_df, y_train,
    X_full_test_scaled_df, y_test,
    bic_feats
)

## Summary Table

In [21]:
results_df = pd.DataFrame(results)
results_df.sort_values("Test_RMSE")

Unnamed: 0,Model,Test_RMSE,Test_MAE,Test_R2,Best_Params
2,Elastic Net,0.077115,0.05936,0.943114,"{'alpha': 0.0005, 'l1_ratio': 0.2}"
1,Ridge,0.077322,0.05951,0.942808,{'alpha': 1.0}
0,LASSO,0.077444,0.059862,0.942628,{'alpha': 0.0005}


In [24]:
aic_eval["RMSE"], aic_eval["MAE"], aic_eval["R2"], bic_eval["RMSE"], bic_eval["MAE"], bic_eval["R2"]

(np.float64(0.07732082902614039),
 0.05971598207135164,
 0.9428099443364082,
 np.float64(0.07910787705223886),
 0.06166227947239733,
 0.9401358286432576)

In [28]:
print("AIC selected:", len(stepwise_results["AIC"]["selected"]), "features")
print("AIC-selected features:")
for f in stepwise_results["AIC"]["selected"]:
    print(f)

print("-------------------------------------------------------------------")

print("BIC selected:", len(stepwise_results["BIC"]["selected"]), "features")
print("\nBIC-selected features:")
for f in stepwise_results["BIC"]["selected"]:
    print(f)

AIC selected: 122 features
AIC-selected features:
floor_area_sqm
max_floor_lvl
Dist_CBD
Remaining_lease
LRT
unique_no_mrt_0.5km
flat_model_dbss
town_SENGKANG
storey_range_01.TO.03
mature
postal_2digits_44
Dist_nearest_station
Nearest_A_hospital_ownership_Private
Dist_nearest_GHawker
flat_model_terrace
storey_range_04.TO.06
town_BUKIT.TIMAH
beach_within_2km
unique_no_mrt_1km
postal_2digits_57
flat_type_2.ROOM
storey_range_07.TO.09
postal_2digits_76
EWL
flat_model_new.generation
flat_model_simplified
storey_range_10.TO.12
postal_2digits_79
postal_2digits_39
Dist_nearest_polytechnic
month_Jan
total_dwelling_units
flat_model_premium.apartment
town_GEYLANG
CCL
flat_model_maisonette
flat_type_3.ROOM
storey_range_13.TO.15
storey_range_16.TO.18
ADF_within_1km
flat_model_model.a.maisonette
postal_2digits_50
Dist_nearest_GAI_secondary_school
postal_2digits_47
Dist_nearest_CC
town_KALLANG.WHAMPOA
flat_type_4.ROOM
flat_model_apartment
flat_type_1.ROOM
no_malls_0.5km
flat_model_improved
month_Feb
X