<a href="https://colab.research.google.com/github/mmtondreau/HousingPrices/blob/main/House_Prices_Advanced_Regression_Techniques.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
! pip install pytorch_lightning

Collecting pytorch_lightning
  Downloading pytorch_lightning-2.5.1.post0-py3-none-any.whl.metadata (20 kB)
Collecting torchmetrics>=0.7.0 (from pytorch_lightning)
  Downloading torchmetrics-1.7.3-py3-none-any.whl.metadata (21 kB)
Collecting lightning-utilities>=0.10.0 (from pytorch_lightning)
  Downloading lightning_utilities-0.14.3-py3-none-any.whl.metadata (5.6 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=2.1.0->pytorch_lightning)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=2.1.0->pytorch_lightning)
  Dow

In [2]:
import os
import requests
import pytorch_lightning as pl
from torch.utils.data import random_split, DataLoader
import torch
from torch.utils.data import TensorDataset
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
from torchmetrics import Accuracy, MeanMetric, MeanAbsoluteError
import torch.nn as nn

from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.callbacks import ModelCheckpoint

In [3]:
# Target-encoding switch read by HPDataModule.generate_dataset: when True the
# SalePrice target is log1p-transformed, otherwise it is divided by 1000.
LOG_Y = True

In [17]:
class HPDataModule(pl.LightningDataModule):
    """Download the housing-price CSVs and expose them as tensor datasets.

    ``prepare_data`` writes ``train.csv`` and ``test.csv`` into ``data_dir``.
    ``setup`` converts the CSV rows into ``(features, target, Id)`` tensors and
    holds out 20% of them for validation. ``width`` (the number of feature
    columns) is set as a side effect of ``generate_dataset``.

    Args:
        data_dir: Directory the CSV files are written to and read from.
        batch_size: Batch size used by all three dataloaders.
    """

    REMOVE_FEATURES = []
    # Label column; it must never be part of the feature matrix.
    TARGET_COLUMN = "SalePrice"
    # Columns that receive a ``<col>_missing`` indicator and a median fill.
    MISSING_FLAG_COLUMNS = ("LotFrontage", "MasVnrArea", "GarageYrBlt")
    DATA_URL_ROOT = "https://raw.githubusercontent.com/mmtondreau/HousingPrices/refs/heads/main/"
    DATA_FILES = ("train.csv", "test.csv")
    DOWNLOAD_TIMEOUT_SECONDS = 30

    def __init__(self, data_dir: str = './', batch_size: int = 32):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size

    def prepare_data(self) -> None:
        """Download every file in ``DATA_FILES`` into ``data_dir``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        for file_name in self.DATA_FILES:
            response = requests.get(
                self.DATA_URL_ROOT + file_name, timeout=self.DOWNLOAD_TIMEOUT_SECONDS
            )
            # Fail here instead of saving an error page under a .csv name and
            # crashing later inside pandas with an unrelated message.
            response.raise_for_status()
            with open(os.path.join(self.data_dir, file_name), "wb") as file:
                file.write(response.content)

    def setup(self, stage: str) -> None:
        """Build the train/validation/test datasets (``stage`` is not used)."""
        train_data_df = pd.read_csv(os.path.join(self.data_dir, "train.csv"), index_col="Id")
        # NOTE(review): this reads train.csv a second time, so the "test" set
        # consists of the same rows the model is fitted and validated on, and the
        # downloaded test.csv is never read. Left unchanged because
        # generate_dataset requires a SalePrice column and identical category
        # levels; confirm whether test.csv provides them before switching.
        test_data_df = pd.read_csv(os.path.join(self.data_dir, "train.csv"), index_col="Id")

        self.test_dataset = self.generate_dataset(test_data_df)
        train_and_val_dataset = self.generate_dataset(train_data_df)

        self.train_dataset, self.val_dataset = train_test_split(
            train_and_val_dataset, test_size=0.2, random_state=42
        )

    def train_dataloader(self):
        """Shuffled loader over the training split."""
        return DataLoader(
            self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=4
        )

    def val_dataloader(self):
        """Loader over the validation split."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=4)

    def test_dataloader(self):
        """Loader over the test dataset."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers=4)

    def generate_dataset(self, df):
        """Turn a raw frame into a ``TensorDataset`` of (features, target, Id).

        The target is ``log1p(SalePrice)`` when the module-level ``LOG_Y`` flag
        is true and ``SalePrice / 1000`` otherwise. Also records the feature
        count in ``self.width``.
        """
        x_data_transformed = self.transform_data(df)
        y_raw = df[self.TARGET_COLUMN].astype(float)
        if LOG_Y:
            y_data = np.log1p(y_raw)
        else:
            y_data = y_raw / 1000.0

        z_data = df.index
        assert len(x_data_transformed) == len(y_data) == len(z_data), "Mismatch in dataset sizes!"
        self.width = x_data_transformed.shape[1]

        return TensorDataset(
            torch.tensor(x_data_transformed.to_numpy(dtype=np.float32, copy=False), dtype=torch.float32),
            torch.tensor(y_data.values, dtype=torch.float32).unsqueeze(1),
            torch.tensor(z_data.values, dtype=torch.int32),
        )

    def normalize_numeric(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with every numeric column z-scored.

        Integer and float columns are both scaled; columns with zero standard
        deviation are divided by 1 so they do not become NaN.
        """
        df = df.copy()  # do not modify the caller's frame
        num_cols = df.select_dtypes(include=["number"]).columns
        means = df[num_cols].mean()
        stds = df[num_cols].std().replace(0, 1)
        df[num_cols] = (df[num_cols] - means) / stds
        return df

    def one_hot(self, df, cols):
        """One-hot encode ``cols``, adding a NaN level and dropping the first level."""
        df = pd.get_dummies(df, columns=cols, dummy_na=True, drop_first=True)
        return df

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Build the all-numeric, NaN-free feature frame for ``df``.

        Steps: drop ``REMOVE_FEATURES`` and the target column, z-score the
        numeric columns, one-hot encode the categorical ones, add missing-value
        flags, then fill the remaining NaNs.
        """
        # The target is dropped here so the label cannot leak into the features.
        df = df.drop(columns=[*self.REMOVE_FEATURES, self.TARGET_COLUMN], errors="ignore")
        cat_cols = df.select_dtypes(include=["object", "string", "category"]).columns

        # Scale before encoding: the dtype get_dummies emits differs between
        # pandas versions, so scaling afterwards would z-score the indicator
        # columns on some versions and leave them as 0/1 on others.
        df = self.normalize_numeric(df)
        df = self.one_hot(df, cat_cols)

        # add missing-value flags (z-scoring keeps NaNs, so isna() still works)
        flagged_cols = [col for col in self.MISSING_FLAG_COLUMNS if col in df.columns]
        for col in flagged_cols:
            df[f"{col}_missing"] = df[col].isna().astype(float)

        # make sure everything is numeric (for torch)
        df = df.apply(pd.to_numeric, errors="coerce")

        # Median-fill only the flagged columns that actually exist, so removing
        # one of them via REMOVE_FEATURES no longer raises a KeyError.
        if flagged_cols:
            df[flagged_cols] = df[flagged_cols].fillna(df[flagged_cols].median())
        # final safety net for any straggling NaNs
        return df.fillna(0.0)

    def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        z-score **only** the continuous (float-typed) columns.
        Columns of any other dtype are returned unchanged.

        Alternative to ``normalize_numeric``; ``transform_data`` does not call it.
        """
        df_out = df.copy()

        # pick out just the float columns
        float_cols = df.select_dtypes(include=["float", "float64", "float32"]).columns
        if len(float_cols) == 0:  # nothing to do
            return df_out

        means = df_out[float_cols].mean()
        stds = df_out[float_cols].std().replace(0, 1)  # guard against constant cols

        df_out[float_cols] = (df_out[float_cols] - means) / stds
        return df_out

In [18]:
# Run the download and dataset construction now: setup() calls
# generate_dataset(), which sets dm.width, and the model cell reads dm.width.
dm = HPDataModule()
dm.prepare_data()
dm.setup(stage="fit")

In [None]:


# x, y, z = next(iter(dm.train_dataloader()))
# print(x[0])
# print(y[0])

# df = dm.df

# print(df[df["ID"] == z[0].item()].to_string())

In [19]:
import matplotlib.pyplot as plt
from pytorch_lightning.callbacks import Callback

class PlotMetricsCallback(Callback):
    """Record per-epoch loss/MAE and plot the curves when training ends.

    Values are read from ``trainer.callback_metrics`` under the keys
    ``train_loss``, ``train_mae``, ``val_loss`` and ``val_mae``. An epoch is
    recorded only when both metrics of a phase are present. The figure is
    written to ``metrics_plot.png`` in the working directory and displayed.
    """

    def __init__(self):
        super().__init__()
        self.train_losses = []
        self.val_losses = []
        self.train_maes = []
        self.val_maes = []

    def on_train_epoch_end(self, trainer, pl_module):
        """Append this epoch's training loss and MAE, if both were logged."""
        train_loss = trainer.callback_metrics.get("train_loss")
        train_mae = trainer.callback_metrics.get("train_mae")
        if train_loss is not None and train_mae is not None:
            self.train_losses.append(train_loss.item())
            self.train_maes.append(train_mae.item())

    def on_validation_epoch_end(self, trainer, pl_module):
        """Append this epoch's validation loss and MAE, if both were logged."""
        # The pre-training sanity check fires this hook too; recording it would
        # put an extra point at the front of the validation curves and shift
        # them by one epoch relative to the training curves.
        if trainer.sanity_checking:
            return
        val_loss = trainer.callback_metrics.get("val_loss")
        val_mae = trainer.callback_metrics.get("val_mae")
        if val_loss is not None and val_mae is not None:
            self.val_losses.append(val_loss.item())
            self.val_maes.append(val_mae.item())

    def on_train_end(self, trainer, pl_module):
        """Plot loss and MAE per epoch, save the figure and show it."""
        print("[PlotMetricsCallback] Training ended. Preparing to plot...")
        # Truncate to the shortest series so all four curves share one x-axis.
        min_len = min(len(self.train_losses), len(self.val_losses), len(self.train_maes), len(self.val_maes))
        print(f"Collected {min_len} epochs of data.")

        if min_len == 0:
            print("[PlotMetricsCallback] No metrics collected. Skipping plot.")
            return

        epochs = range(1, min_len + 1)

        fig, (loss_ax, mae_ax) = plt.subplots(1, 2, figsize=(12, 5))

        loss_ax.plot(epochs, self.train_losses[:min_len], label="Train Loss")
        loss_ax.plot(epochs, self.val_losses[:min_len], label="Val Loss")
        loss_ax.set_xlabel("Epoch")
        loss_ax.set_ylabel("Loss")
        loss_ax.set_title("Loss over Epochs")
        loss_ax.legend()

        mae_ax.plot(epochs, self.train_maes[:min_len], label="Train MAE")
        mae_ax.plot(epochs, self.val_maes[:min_len], label="Val MAE")
        mae_ax.set_xlabel("Epoch")
        mae_ax.set_ylabel("MAE")
        mae_ax.set_title("MAE over Epochs")
        mae_ax.legend()

        fig.tight_layout()
        plot_path = "metrics_plot.png"
        # Write the file before announcing it; previously nothing was saved and
        # `plt.show` without parentheses never displayed the figure either.
        fig.savefig(plot_path)
        plt.show()
        plt.close(fig)
        print(f"[PlotMetricsCallback] Saved plot to {os.path.abspath(plot_path)}")



In [None]:
class Block(nn.Module):
    """Fully connected building block: Linear -> BatchNorm1d -> ReLU -> Dropout.

    Args:
        input_size: Number of input features of the linear layer.
        hidden_units: Number of output features of the linear layer.
        dropout: Drop probability applied after the activation.
    """

    def __init__(self, input_size, hidden_units, dropout=0.1):
        super().__init__()
        self.layer = nn.Linear(in_features=input_size, out_features=hidden_units)
        self.drop = nn.Dropout(p=dropout)
        self.batchNorm = nn.BatchNorm1d(num_features=hidden_units)
        self.activation = F.relu

    def forward(self, x):
        """Apply the four stages in order and return the result."""
        normalized = self.batchNorm(self.layer(x))
        return self.drop(self.activation(normalized))


class HPModel(pl.LightningModule):
    """Stack of ``Block`` layers followed by a linear output layer.

    Args:
        num_features: Width of the input feature vector.
        num_classes: Width of the output layer.
        hidden_units: Output width of each ``Block``, in order. May be empty,
            in which case the model is a single linear layer.
    """

    def __init__(self, num_features, num_classes, hidden_units):
        super().__init__()
        # torch.Tensor(32, n) returns uninitialised memory (possibly NaN/inf);
        # a zero batch gives a well-defined example input instead.
        self.example_input_array = torch.zeros(32, num_features)

        all_layers = []
        in_features = num_features
        for hidden_unit in hidden_units:
            all_layers.append(Block(input_size=in_features, hidden_units=hidden_unit))
            in_features = hidden_unit
        # in_features is the last hidden width, or num_features when
        # hidden_units is empty (hidden_units[-1] would raise IndexError there).
        all_layers.append(nn.Linear(in_features, num_classes))
        self.layers = nn.Sequential(*all_layers)

    def forward(self, x):
        """Run ``x`` through the layer stack."""
        return self.layers(x)

In [None]:
# Hyperparameters handed to the Lightning module below.
# NOTE(review): "batch_size" is not passed to HPDataModule, which was created
# earlier with its default batch_size=32 — confirm which value is intended.
config = {
    "hidden_units": [128, 64, 12],
    "learning_rate": 0.001,
    "batch_size": 64,
}
# NOTE(review): HPLitModule is not defined in any cell visible here (only
# HPModel is); on a fresh kernel this presumably raises NameError unless the
# class is provided elsewhere — confirm.
# dm.width is the feature count recorded by HPDataModule.generate_dataset.
model = HPLitModule(config, num_features=dm.width)

In [None]:
 trainer = pl.Trainer(
    devices="auto",
    accelerator="auto",
    max_epochs=100,
    callbacks=[
        PlotMetricsCallback(),
        EarlyStopping(
            monitor="ptl/val_loss", mode="min", patience=5, min_delta=0.0001
        ),
        ModelCheckpoint(
            monitor="ptl/val_loss", mode="min", filename="{epoch}-{val_loss:.2f}"
        ),
    ],
)
trainer.fit(model, datamodule=dm)

trainer.test(model, datamodule=dm)

model.eval()

x, y, _ = next(iter(dm.test_dataloader()))
y_hat = model(x)
print(x)
print(torch.flatten(y))
print(torch.flatten(y_hat))

In [None]:
# from PIL import Image
# Image.open("metrics_plot.png")