In [None]:
import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Scikit-Learn's Linear Regression

In [None]:
state = 4

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split, cross_validate, KFold
from sklearn.metrics import mean_squared_error

In [None]:
housing = pl.read_csv("Housing.csv")
housing.head()

In [None]:
categorical_cols = housing.select(pl.col(pl.String)).columns

encoder = OneHotEncoder()
encoded_array = encoder.fit_transform(housing[categorical_cols]).toarray().astype('int64')
encoder_features = encoder.get_feature_names_out().tolist()

housing_cat = pl.DataFrame(
    encoded_array,
    schema=encoder_features
).with_row_index(name="index")

housing_cat.head()

In [None]:
housing_int = housing.clone().select(pl.col(pl.Int64)).with_row_index(name="index")
housing_int.head()

In [None]:
housing_df = housing_int.join(
    other=housing_cat,
    on="index", 
    how="left" 
)

In [None]:
# Splitting data
x = housing_df[:, 2:].to_numpy()
y = housing_df[:, 0].to_numpy().reshape(-1, 1)

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.25, random_state=state)

In [None]:
# Standardisation
x_scaler = StandardScaler()
x_train_scaled = x_scaler.fit_transform(x_train)
x_test_scaled = x_scaler.transform(x_test)

y_scaler = StandardScaler()
y_train_scaled = y_scaler.fit_transform(y_train)
y_test_scaled = y_scaler.transform(y_test)

In [None]:
# Linear regression model
lin_reg = LinearRegression()
lin_reg.fit(x_train_scaled, y_train_scaled)

# Cross validation
cv = KFold(n_splits=5, shuffle=True, random_state=state)

# Applying the linear regression model on each fold
cv_results = cross_validate(
    lin_reg, 
    x_train_scaled, y_train_scaled,
    scoring="neg_mean_squared_error", 
    cv=cv,
    return_estimator=True
)

# Identify which fold produced the best score
best_fold_idx = cv_results['test_score'].round(3).argmax()

# Get the fitted model corresponding to the best fold
best_estimator = cv_results['estimator'][best_fold_idx]

# Extract the results
coefficients_scaled = best_estimator.coef_[0]
intercept_scaled = best_estimator.intercept_[0]
mse = -(cv_results['test_score'][best_fold_idx])

# Intercept
intercept_reshaped = intercept_scaled.reshape(1, -1)
intercept = y_scaler.inverse_transform(intercept_reshaped)[0, 0].round(2)

# Coefficients
coefficients_reshaped = coefficients_scaled.reshape(1, -1)
coefficients = x_scaler.inverse_transform(coefficients_reshaped)[0].round(3)

## Final Results
np.set_printoptions(suppress=True, precision=3)

print("### Best Fold Results ###")
print(f"Score (MSE): {mse.round(3)}")
print(f"Intercept: {intercept.round(3):,}")
print(f"Coefficients: {coefficients.round(2)}")

In [None]:
y_pred_test = best_estimator.predict(x_test_scaled)

final_mse = mean_squared_error(y_test_scaled, y_pred_test)
final_r2_score = best_estimator.score(x_test_scaled, y_test_scaled)

print(f"Final test MSE: {final_mse:.4f}")
print(f"Final test R-squared score: {final_r2_score:.4f}")

# PyTorch's Linear Neural Network

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import ParameterGrid

device = 'mps' if torch.mps.is_available() else 'cpu'

print(f"Device: {device}")

In [None]:
housing_df.head()

In [None]:
X = housing_df[:, 2:].to_numpy()
Y = housing_df[:, 1].to_numpy().reshape(-1, 1)

X_train, X_val_test, Y_train, Y_val_test = train_test_split(X, Y, test_size=0.25, random_state=state)

In [None]:
nrows = X_val_test.shape[0]
test_indices = np.arange(nrows) 

np.random.seed(1)
np.random.shuffle(test_indices)

test_val_split = round(nrows * 0.50)
val_idx = test_indices[:test_val_split] 
test_idx = test_indices[test_val_split:] 

X_test, Y_test = X_val_test[test_idx], Y_val_test[test_idx]
X_val, Y_val = X_val_test[val_idx], Y_val_test[val_idx]

In [None]:
X_scaler = StandardScaler()
X_train_scaled = X_scaler.fit_transform(X_train)
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)

X_val_scaled = X_scaler.transform(X_val)
X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32)

X_test_scaled = X_scaler.transform(X_test)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)

Y_scaler = StandardScaler()
Y_train_scaled = Y_scaler.fit_transform(Y_train)
Y_train_tensor = torch.tensor(Y_train_scaled, dtype=torch.float32)

Y_val_scaled = Y_scaler.transform(Y_val)
Y_val_tensor = torch.tensor(Y_val_scaled, dtype=torch.float32)

Y_test_scaled = Y_scaler.transform(Y_test)
Y_test_tensor = torch.tensor(Y_test_scaled, dtype=torch.float32)

In [None]:
training_data = TensorDataset(X_train_tensor, Y_train_tensor)
validation_data = TensorDataset(X_val_tensor, Y_val_tensor)

In [None]:
# Designing model architecture
class LinearModel(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()  # Inherit constructor from parent class (nn.Module) to initialise the parent class

        # Build sequential layer
        self.sequential_layer = nn.Sequential(
            nn.Dropout(p=0.5),  # 50% of the input features will be randomly set to zero
            nn.Linear(input_size, 10),  # Linear layer 1
            nn.ReLU(),  # Rectified linear unit (activation function)
            nn.Linear(10, output_size),  # Linear layer 2
        )

    # The forward pass simply calls the sequential container
    def forward(self, x):
        return self.sequential_layer(x)

In [None]:
class ModelTrainer:
    # Initialise trainer with the core components
    def __init__(self, model, loss_criterion, optimiser):
        self.model = model
        self.loss_criterion = loss_criterion
        self.optimiser = optimiser

    # Behaviours within a singular training step
    def training_step(self, x, y):
        # Set model to training mode
        self.model.train()

        # Forward pass (i.e., create prediction based on features)
        y_hat = self.model(x)

        # Calculate loss
        loss = self.loss_criterion(y_hat, y)

        # Backwards pass
        loss.backward()

        # Optimisation
        self.optimiser.step()

        # Reset gradient to zero
        self.optimiser.zero_grad()

        return loss.item()

    # Trains the model over multiple epochs using the provided loaded data
    def training_process(self, training_batch, epochs, print_loss=False):
        total_train_loss = []

        for epoch in range(epochs):
            train_batch_loss = []

            for x_train_batch, y_train_batch in training_batch:
                x_train_batch = x_train_batch.to(device)
                y_train_batch = y_train_batch.to(device)

                train_loss = self.training_step(x_train_batch, y_train_batch)
                train_batch_loss.append(train_loss)

            total_train_loss.append(torch.tensor(train_batch_loss).mean().item())

            if print_loss == True:
                if epoch == 0:
                    print(f"Epoch {epoch}/{epochs} | Train Loss: {total_train_loss[-1]:.3f}")
                if (epoch + 1) % 1000 == 0:
                    print(
                        f"Epoch {epoch + 1}/{epochs} | Train Loss: {total_train_loss[-1]:.3f}"
                    )
            else:
                pass

        return total_train_loss

In [None]:
def validation_process(model, validation_batch, loss_criterion, epochs, print_loss=False):
    total_val_loss = []
    
    for epoch in range(epochs):
        with torch.no_grad():
            val_batch_loss = []
            
            for x_val_batch, y_val_batch in validation_batch:
                # Move validation data to the device
                x_val_batch = x_val_batch.to(device)
                y_val_batch = y_val_batch.to(device)
                
                model.eval()
                
                y_hat = model(x_val_batch)
                val_loss = loss_criterion(y_hat, y_val_batch).item()
                val_batch_loss.append(val_loss)
                
            total_val_loss.append(torch.tensor(val_batch_loss).mean().item())
        
        if print_loss == True:
            if epoch == 0:
                print(
                        f"Epoch {epoch} / {epochs} | Validation loss: {total_val_loss[-1]:.3f}"
                    )
                if (epoch + 1) % 1000 == 0:
                    print(
                        f"Epoch {epoch + 1}/{epochs} | Validation loss: {total_val_loss[-1]:.3f}"
                    )
    
    return total_val_loss

In [None]:
### Hyperparameter tuning ###
# Define hyperparameter grid
param_grid = {
    'lr': [1e-4, 1e-3],
    'weight_decay': [1e-5, 1e-4],
    'batch_size': [4, 8],
    'epochs' : [2000, 4000]
}

# Perform grid search to find the best parameters
best_params = None
best_loss = float('inf')  # Initialise best_loss as infinity

# Go through a grid with all possible combinations of the specified hyperparameters
for params in ParameterGrid(param_grid):
    print(f"Testing parameters: {params}")

    # Get data loaders for current batch size
    train_batch_tuning = DataLoader(dataset=training_data, batch_size=params['batch_size'], shuffle=True)

    # Initialise model, loss function, and optimiser with current parameters
    model = LinearModel(input_size=X.shape[1], output_size=1)
    model.to(device) # Move the model to the device (if on cuda)

    tune_mse_loss = nn.MSELoss(reduction="mean")
    optimiser = optim.SGD(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay'])

    # Train the model
    train_loss = ModelTrainer(model=model, loss_criterion=tune_mse_loss, optimiser=optimiser
    ).training_process(training_batch=train_batch_tuning, epochs=params['epochs'], print_loss = False)

    train_loss_mean = np.mean(train_loss).item()

    # Update best parameters if current validation loss is lower
    if train_loss_mean < best_loss:
        best_loss = train_loss_mean
        best_params = params

In [None]:
print(f"The best parameters: {best_params} with the lowest loss of {best_loss:.3f}")

In [None]:
val_batch_tuning = DataLoader(dataset=validation_data, batch_size=params['batch_size'], shuffle=True)

# Validate the model
val_loss = validation_process(model=model, validation_batch=val_batch_tuning, loss_criterion=mse_loss, epochs=n_epochs)

val_loss_mean = np.mean(val_loss).item()

In [None]:
def testing_process(model, loss_criterion, test_batches):
    with torch.no_grad():
        total_test_loss = []

        for x_test_batch, y_test_batch in test_batches:
            # Move validation data to the device
            x_test_batch = x_test_batch.to(device)
            y_test_batch = y_test_batch.to(device)

            model.eval()

            y_hat = model(x_test_batch)
            test_loss = loss_criterion(y_hat, y_test_batch).item()
            total_test_loss.append(test_loss)
        
    return total_test_loss


In [None]:
model = LinearModel(input_size=X.shape[1], output_size=1)
test_mse_loss = nn.MSELoss(reduction="mean")
testing_data = TensorDataset(X_test_tensor, Y_test_tensor)
test_batches = DataLoader(dataset=testing_data, batch_size=best_params['batch_size'], shuffle=True)
test_optimiser = optim.SGD(model.parameters(), lr=best_params['lr'], weight_decay=best_params['weight_decay'])

total_test_loss = testing_process(model=model, loss_criterion=mse_loss, test_batches=test_batches)
print(f"Average MSE: {np.mean(total_test_loss):.3f}")

In [None]:
fig, ax = plt.subplots()

ax.scatter(np.arange(len(X_test_tensor)), total_test_loss)
ax.set_xlabel("Epochs")
ax.set_ylabel("MSE Loss")