# Regression Evaluation

Notebook for testing regression using the California Housing dataset


# Setup


In [None]:
import numpy as np
import pandas as pd
import scipy
import tqdm

# This is a draft---don't overengineer!
# NO renaming!
# NO refactoring!
# TODO: Remove this when the draft is done.

In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.metrics import mean_squared_error

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

## Config


In [None]:
config = {
    "random_state": 15943,
    "test_size": 0.2,
    "n_splits": 5,
    "scoring": "neg_mean_squared_error",
}

# Data


In [None]:
dataset = fetch_california_housing()

In [None]:
X = pd.DataFrame(dataset["data"], columns=dataset["feature_names"])
X

In [None]:
y = pd.DataFrame(dataset["target"], columns=dataset["target_names"])
y

In [None]:
df = pd.concat([X, y], axis=1)

## EDA


In [None]:
g = sns.PairGrid(df)
g.map_diag(sns.histplot, bins=32)
g.map_offdiag(sns.histplot, bins=32)

# Single-Feature Regression


## Set up


In [None]:
results = {}

### Select data


In [None]:
X_var = "MedInc"
y_var = "MedHouseVal"

In [None]:
X = pd.DataFrame(X[X_var])
y = pd.DataFrame(y[y_var])

In [None]:
n_features = X.shape[1]

In [None]:
fig = plt.figure()
ax = plt.gca()

ax.hist2d(
    X[X_var],
    y[y_var],
    bins=32,
)

ax.set_xlabel(X_var)
ax.set_ylabel(y_var)

### Split data


In [None]:
from sklearn.model_selection import train_test_split, cross_val_score, KFold

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=config["test_size"], random_state=config["random_state"]
)

In [None]:
cv = KFold(
    n_splits=config["n_splits"], shuffle=True, random_state=config["random_state"]
)

## Baseline


### Build


In [None]:
from sklearn.base import BaseEstimator

In [None]:
class Baseline(BaseEstimator):

    def fit(self, X, y):
        """Baseline is we just use the fraction of classifications as a binomial probability."""

        self.mean_ = y.mean()

    def predict(self, X):

        return np.full(X.shape[0], self.mean_)

In [None]:
# Make the estimator
model_name = "mean"
model = Baseline()

### Evaluate


In [None]:
result = {}

In [None]:
# Full prediction
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

In [None]:
result["mse"] = mean_squared_error(y_test, y_pred)
result["mse"]

In [None]:
# Crossval score
result["cross_val_score"] = cross_val_score(
    model, X_train, y_train, cv=cv, scoring=config["scoring"]
)
result["cross_val_score"]

In [None]:
results[model_name] = result

## Linear Regression


### Build


In [None]:
from sklearn.linear_model import LinearRegression

In [None]:
# Make the estimator
model_name = "linear_regression"
model = LinearRegression()

### Evaluate


In [None]:
result = {}

In [None]:
# Full prediction
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

In [None]:
result["mse"] = mean_squared_error(y_test, y_pred)
result["mse"]

In [None]:
# Crossval score
result["cross_val_score"] = cross_val_score(
    model, X_train, y_train, cv=cv, scoring=config["scoring"]
)
result["cross_val_score"]

In [None]:
fig = plt.figure()
ax = plt.gca()

h, x_edges, y_edges, mesh = ax.hist2d(
    X_test[X_var],
    y_test[y_var],
    bins=32,
)

y_pred_plot = model.predict(x_edges.reshape(-1, 1))
ax.plot(x_edges, y_pred_plot, color="r")

ax.set_xlabel(X_var)
ax.set_ylabel(y_var)

In [None]:
result["parameters"] = {"w": model.coef_[0][0], "b": model.intercept_[0]}

In [None]:
results[model_name] = result

## Model: Single Linear Layer

Same thing as traditional linear regression, but trained with gradient descent.


In [None]:
import torch
from torch import nn

In [None]:
# Get cpu, gpu or mps device for training.
DEVICE = (
    "cuda"
    if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available() else "cpu"
)
print(f"Using {DEVICE} device")

In [None]:
class TorchLinearEstimator(BaseEstimator):

    def __init__(self, lr=1e-2, n_epoch=1000):

        self.lr = lr
        self.n_epoch = n_epoch

    def fit(self, X, y):

        X = torch.Tensor(X).to(DEVICE)
        y = torch.Tensor(y).to(DEVICE)

        # Create parameters and turn on gradient tracking
        w = torch.rand(n_features).to(DEVICE).requires_grad_()
        b = torch.rand(1).to(DEVICE).requires_grad_()

        # Training loop
        losses = []
        ws = []
        bs = []
        i_best = None
        for i in tqdm.tqdm(range(self.n_epoch)):

            # Make the prediction
            y_pred = self.linear_model(X, w, b)

            # Get the loss
            loss = self.loss(y_pred, y)

            # Calculate the gradient
            loss.backward()

            # Modify the parameters
            w.data -= w.grad.data * self.lr
            b.data -= b.grad.data * self.lr

            # Zero the gradient
            w.grad = None
            b.grad = None

            # Mark when the training ceases improving and story a copy of the parameters
            if i_best is None:
                if i > 0:
                    if float(loss) > float(losses[-1]):
                        i_best = i
                        w_best = w.clone()
                        b_best = b.clone()

            # Store
            losses.append(loss.cpu().detach().numpy())
            ws.append(w.cpu().detach().numpy())
            bs.append(b.cpu().detach().numpy())

        # If i_best is still None give the final value
        if i_best is None:
            i_best = i
            w_best = w.clone()
            b_best = b.clone()

        self.i_best_ = i_best
        self.w_ = w_best
        self.b_ = b_best
        self.losses_ = np.array(losses)
        self.ws_ = np.array(ws)
        self.bs_ = np.array(bs)

        return self

    def predict(self, X):

        X = torch.Tensor(X).to(DEVICE)

        # Make the prediction
        y_pred = self.linear_model(X, self.w_, self.b_)

        return y_pred

    def linear_model(self, X, weights=None, bias=None):
        """The model itself."""

        return X @ weights + bias

    def loss(self, y_pred, y_actual):

        return ((y_pred - y_actual) ** 2.0).mean()

In [None]:
# Make the estimator
model_name = "linear_model"
model = TorchLinearEstimator(n_epoch=1000)

### Evaluate


In [None]:
result = {}

In [None]:
# Full prediction
model.fit(X_train.values, y_train.values)
y_pred = model.predict(X_test.values)

In [None]:
result["mse"] = mean_squared_error(y_test.values, y_pred.cpu().detach().numpy())
result["mse"]

In [None]:
fig = plt.figure()
ax = plt.gca()

# Data
h, x_edges, y_edges, mesh = ax.hist2d(
    X_train[X_var],
    y_train[y_var],
    bins=32,
    cmap="Greys",
)

# Change in fit parameters
norm = plt.Normalize(0, len(model.ws_))
cmap = sns.color_palette("flare", as_cmap=True)
for i, w in enumerate(model.ws_):

    if i == 0 or i == len(model.ws_) - 1:
        label = i
    else:
        label = None

    ax.plot(x_edges, w * x_edges + model.bs_[i], color=cmap(norm(i)), label=label)

# Linear regression best fit
l_params = results["linear_regression"]["parameters"]
ax.plot(x_edges, x_edges * l_params["w"] + l_params["b"], color="b")

ax.legend()

ax.set_xlabel(X_var)
ax.set_ylabel(y_var)

In [None]:
# Explore the range of viable parameters
ws = np.linspace(0.0, 1.0, 32)
bs = np.linspace(0.0, 2.0, 32)
loss_map = []
for w in tqdm.tqdm(ws):
    loss_row = []
    for b in bs:
        y_pred = w * X_train.values + b
        loss = ((y_pred - y_train.values) ** 2.0).mean()
        loss_row.append(loss)
    loss_map.append(loss_row)
loss_map = np.array(loss_map)

In [None]:
fig = plt.figure()
ax = plt.gca()

p = ax.pcolormesh(ws, bs, loss_map.transpose(), cmap="Greys")
ax.scatter(
    model.ws_,
    model.bs_,
    c=cmap(norm(np.arange(model.ws_.size))),
)

ax.scatter(l_params["w"], l_params["b"])

plt.colorbar(p)

ax.set_xlabel("w")
ax.set_ylabel("b")

In [None]:
# Plot the training progress (loss curve)
fig = plt.figure()
ax = plt.gca()

ax.plot(
    range(len(model.losses_)),
    model.losses_,
)

ax.scatter(model.i_best_, model.losses_[model.i_best_])

ax.set_xlabel("epoch")
ax.set_ylabel("mse")

In [None]:
# Crossval score
result["cross_val_score"] = cross_val_score(
    model, X_train.values, y_train.values, cv=cv, scoring=config["scoring"]
)
result["cross_val_score"]

In [None]:
results[model_name] = result

## Model: Single Linear Layer (using nn.Sequential)


In [None]:
from torch import optim
from torch.utils.data import TensorDataset, DataLoader

In [None]:
class TorchEstimator(BaseEstimator):

    def __init__(
        self,
        net,
        lr: float = 1e-2,
        epochs: int = 100,
        batch_size: int = 64,
        device: str = DEVICE,
        optimizer=optim.Adam,
    ):

        self.net = net.to(device)
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.device = device
        self.optimizer = optimizer

    def fit(
        self,
        X: np.ndarray,
        y: pd.Series,
        X_val: np.ndarray = None,
        y_val: pd.Series = None,
    ) -> "TorchEstimator":

        self.net.train()

        # Prep data
        X = torch.Tensor(X).to(self.device)
        y = torch.Tensor(y.values).to(self.device)
        dataset = TensorDataset(X, y)
        dataloader = DataLoader(dataset, batch_size=self.batch_size)

        # Prep validation data
        if X_val is not None and y_val is not None:
            X_val = torch.Tensor(X_val).to(self.device)
            y_val = torch.Tensor(y_val.values).to(self.device)
            dataset_val = TensorDataset(X_val, y_val)
            dataloader_val = DataLoader(dataset_val, batch_size=self.batch_size)
            self.losses_val_ = []

        # Initialize parameters
        params = [
            nn.init.uniform_(param.requires_grad_(), a=0, b=1)
            for param in self.net.parameters()
        ]

        optimizer = self.optimizer(params, lr=self.lr)

        # Training loop
        self.losses_ = []
        for i in tqdm.tqdm(range(self.epochs)):
            self.net.train()
            loss = 0.0
            for j, (X_j, y_j) in enumerate(dataloader):

                # Make the prediction
                pred_j = self.net(X_j)

                # Get the loss
                loss_j = self.loss_fn(pred_j, y_j)

                # Backpropagation
                optimizer.zero_grad()
                loss_j.backward()
                optimizer.step()

                loss += loss_j.cpu().detach().numpy() * len(y_j)
            # Store for later use
            loss /= len(y)
            self.losses_.append(loss)

            # Evaluation for validation data
            self.net.eval()
            with torch.no_grad():
                loss_val = 0.0
                if X_val is not None and y_val is not None:
                    for j, (X_val_j, y_val_j) in enumerate(dataloader_val):

                        # Make the prediction
                        pred_val_j = self.net(X_val_j)

                        # Get the loss
                        loss_val_j = self.loss_fn(pred_val_j, y_val_j)

                        loss_val += loss_val_j.cpu().detach().numpy() * len(y_val_j)

                    # Store for later use
                    loss_val /= len(y_val)
                    self.losses_val_.append(loss_val)

        return self

    def predict(self, X):

        X = torch.Tensor(X).to(self.device)

        # Make the prediction
        self.net.eval()
        pred = self.net(X)

        # Convert the prediction into a classification
        y_pred = (pred.cpu().detach().numpy() > 0.5).astype("int")

        return y_pred

    def loss_fn(self, pred, y_actual):
        return ((pred - y_actual) ** 2.0).mean()

In [None]:
class NetVisualizer:

    def __init__(self, net, X, y):

        self.net = net
        self.X = X
        self.y = y

    def plot_layer(i, ax):

        ax.plot()

### Build


In [None]:
class CustomOptimizer:

    def __init__(self, params, lr=1e-4):
        self.params = params
        self.lr = lr

    def zero_grad(self):
        for param in self.params:
            param.grad = None

    def step(self):
        for param in self.params:
            param.data -= param.grad.data * self.lr

In [None]:
# Make the estimator
model_name = "linear_net"
model = nn.Sequential(
    nn.Linear(n_features, 1),
)
model = TorchEstimator(net=model, optimizer=CustomOptimizer)

## Compare Models


In [None]:
# Format data
dfs = []
for key, value in results.items():

    df = pd.DataFrame(value)
    df["model_name"] = key
    dfs.append(df)
results_df = pd.concat(dfs)

In [None]:
fig = plt.figure(figsize=(len(results) * 2, 2))
ax = plt.gca()

sns.swarmplot(
    data=results_df,
    x="model_name",
    y="cross_val_score",
)

ax.set_ylabel(config["scoring"])