<a href="https://colab.research.google.com/github/ubaidillah-chem/fouling-ml/blob/main/03_MLP_model_CV.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import TensorDataset, DataLoader

# Load dataset
dataset = pd.read_csv('gdrive/MyDrive/dataset_filtered_by_top_pca_loadings.csv').drop(index=range(389, 432))
X = dataset.drop(columns=['Rf']).values.astype('float64')
y = dataset['Rf'].values.astype('float64').reshape(-1, 1)
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

In [None]:
# Define model
class MLPModel(nn.Module):
    def __init__(self, input_dim):
        super(MLPModel, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 1),  # Output: predicted Rf
            nn.Softplus()  # Constrain the prediction to strictly positive values
        )

    def forward(self, x):
        return self.model(x)

model = MLPModel(input_dim=X.shape[1])
print(model)

In [None]:
# K-Fold Cross-Validation Setup
k = 5
kf = KFold(n_splits=k, shuffle=True, random_state=42)

all_preds = []
all_targets = []
fold_results = []
fold_loss_history = []  # Store loss history for each fold

for fold, (train_index, val_index) in enumerate(kf.split(X)):
    print(f"\n====== Fold {fold+1}/{k} ======")

    # Split data
    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y[train_index], y[val_index]

    # Scale
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)

    # Convert to tensors
    X_train_tensor = torch.tensor(X_train).float()
    y_train_tensor = torch.tensor(y_train).float()
    X_val_tensor = torch.tensor(X_val).float()
    y_val_tensor = torch.tensor(y_val).float()

    # DataLoaders
    train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=64, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val_tensor, y_val_tensor), batch_size=64)

    # Model, loss, optimizer
    model = MLPModel(input_dim=X.shape[1])
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    train_losses, val_losses, val_r2_scores = [], [], []
    best_val_loss = float('inf')
    no_improve = 0

    epochs = 100
    patience = 10

    for epoch in range(epochs):
        # Train
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            pred = model(xb)
            loss = criterion(pred, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        avg_train_loss = train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Validate
        model.eval()
        val_loss = 0.0
        y_val_pred_all, y_val_true_all = [], []
        with torch.no_grad():
            for xb, yb in val_loader:
                pred = model(xb)
                loss = criterion(pred, yb)
                val_loss += loss.item()
                y_val_pred_all.append(pred.numpy())
                y_val_true_all.append(yb.numpy())

        avg_val_loss = val_loss / len(val_loader)
        val_losses.append(avg_val_loss)

        y_val_pred_all = np.vstack(y_val_pred_all)
        y_val_true_all = np.vstack(y_val_true_all)
        r2 = r2_score(y_val_true_all, y_val_pred_all)
        val_r2_scores.append(r2)

        print(f"Epoch {epoch+1}: Train Loss = {avg_train_loss:.5f}, Val Loss = {avg_val_loss:.5f}, Val R² = {r2:.5f}")

        # Early stopping
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            no_improve = 0
            best_model_state = model.state_dict()
            best_epoch = epoch
        else:
            no_improve += 1

        if no_improve >= patience:
            epoch_remarks = f"Early stopping at epoch {epoch+1}"
            print(epoch_remarks)
            break

        if epoch == epochs - 1:
            epoch_remarks = f"No early stopping ({epochs} epochs)"
            print(epoch_remarks)

    # Store loss history for this fold
    fold_loss_history.append({
        'fold': fold+1,
        'train_losses': train_losses,
        'val_losses': val_losses,
        'best_epoch': best_epoch,
        'early_stop': 'early stopping' in epoch_remarks.lower()
    })

    # Save fold results
    model.load_state_dict(best_model_state)
    model.eval()
    with torch.no_grad():
        y_val_pred = model(X_val_tensor).numpy().flatten()
        y_val_true = y_val.flatten()

        # Store results for overall analysis and plotting
        all_preds.extend(y_val_pred)
        all_targets.extend(y_val_true)

        mse_final = mean_squared_error(y_val, y_val_pred)
        r2_final = r2_score(y_val, y_val_pred)
        print(f"Final Fold MSE: {mse_final:.5f}, R²: {r2_final:.5f}")
        fold_results.append({'fold': fold+1, 'mse': mse_final, 'r2': r2_final, 'epoch_remarks': epoch_remarks.lower()})


In [None]:
# ====== Plot all training curves after cross-validation is complete ======
plt.figure(figsize=(15, 8))
for i, fold_data in enumerate(fold_loss_history):
    plt.subplot(2, 3, i+1)  # Create a 2x3 grid of plots (for 5 folds)

    epochs = range(1, len(fold_data['train_losses'])+1)
    plt.plot(epochs, fold_data['train_losses'], label='Training Loss')
    plt.plot(epochs, fold_data['val_losses'], label='Validation Loss')

    plt.title(f'Fold {fold_data["fold"]} Loss Curves')
    plt.xlabel('Epoch')
    plt.ylabel('MSE Loss')
    plt.legend()
    plt.grid(True)

# Add an additional plot showing all validation losses together
plt.subplot(2, 3, 6)
for fold_data in fold_loss_history:
    epochs = range(1, len(fold_data['val_losses'])+1)
    plt.plot(epochs, fold_data['val_losses'], label=f'Fold {fold_data["fold"]}')
plt.title('All Folds Validation Loss Comparison')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Report cross-validated results
all_mse = [r['mse'] for r in fold_results]
all_r2 = [r['r2'] for r in fold_results]
print("\n===== Cross-Validation Results =====")
for r in fold_results:
    print(f"Fold {r['fold']}: MSE = {r['mse']:.5f}, R² = {r['r2']:.5f}, {r['epoch_remarks']}")
print(f"\nAverage MSE: {np.mean(all_mse):.5f}, Std MSE: {np.std(all_mse):.5f}")
print(f"Average R²:  {np.mean(all_r2):.5f}, Std R²:  {np.std(all_r2):.5f}")


In [None]:
# Plot actual vs predicted
plt.figure(figsize=(7, 7))
plt.scatter(all_targets, all_preds, alpha=0.6, edgecolor='k')
plt.plot([min(all_targets), max(all_targets)],
         [min(all_targets), max(all_targets)],
         color='red', linestyle='--', label='Perfect Prediction (y = x)')
plt.xlabel('Actual Rf')
plt.ylabel('Predicted Rf')
plt.title('Actual vs. Predicted Rf (Cross-Validation)')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()


In [None]:
import seaborn as sns

# Compute residuals
residuals = np.array(all_targets) - np.array(all_preds)

# Plot residuals vs. actual values
plt.figure(figsize=(8, 6))
plt.scatter(all_targets, residuals, alpha=0.6, edgecolor='k')
plt.axhline(y=0, color='red', linestyle='--', label='Zero Residual')
plt.xlabel('Actual Rf')
plt.ylabel('Residual (Actual − Predicted)')
plt.title('Residuals vs. Actual Rf')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()