In [None]:
# Mount Google Drive inside the Colab runtime. The feature files and model
# checkpoints referenced later in this notebook all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 14.12.2025
# NN models definitions, training and testing processes for VoiceMOS 2022 data
#
# Author : Mustafa Ozan Duman

import torch.nn as nn
import torch
from tqdm.notebook import tqdm
from torchvision.models import resnet18, ResNet18_Weights
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
import numpy as np
from sklearn.metrics import mean_squared_error
from scipy.stats import pearsonr, spearmanr
import os
import shutil

# --- Training hyperparameters (adjust as needed) ---
# Mini-batch size passed to every DataLoader in this notebook.
BATCH_SIZE = 32
# Step size handed to the Adam optimizer.
LEARNING_RATE = 0.001
# Number of epochs used by the training loop.
NUM_EPOCHS = 50

# --- Feature dimensions ---
# Shape of one feature matrix (rows x columns), excluding the batch and
# channel dimensions.
INPUT_ROWS = 2288
INPUT_COLS = 20
OUTPUT_DIM = 1 # For MOS prediction (a continuous score)


# model 1 classical CNN + MLP

class VoiceMOS_CNN(nn.Module):
    """
    Two-stage CNN feature extractor followed by an MLP regression head.

    Each convolutional stage is Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2x2):
    the convolution keeps the spatial size and the pooling halves it (rounding
    down). The flattened feature map feeds a 128-unit hidden layer and a final
    layer with OUTPUT_DIM units.

    Args:
        input_rows: Height of the single-channel input matrix. When None
            (default) the module-level INPUT_ROWS is used.
        input_cols: Width of the single-channel input matrix. When None
            (default) the module-level INPUT_COLS is used.

    Raises:
        ValueError: If either dimension is too small to survive the two
            2x2 pooling stages (i.e. smaller than 4).
    """

    def __init__(self, input_rows=None, input_cols=None):
        super(VoiceMOS_CNN, self).__init__()

        # Resolve the defaults at call time so the head always matches the
        # notebook's declared feature shape instead of a hard-coded number.
        if input_rows is None:
            input_rows = INPUT_ROWS
        if input_cols is None:
            input_cols = INPUT_COLS

        # We start with 1 input channel (since it's a magnitude matrix)
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        # Two floor-halvings in a row are the same as one floor-division by 4,
        # e.g. 2288 x 20 -> 1144 x 10 -> 572 x 5.
        pooled_rows = input_rows // 4
        pooled_cols = input_cols // 4
        if pooled_rows < 1 or pooled_cols < 1:
            raise ValueError(
                f"Input of size {input_rows}x{input_cols} is too small for two 2x2 pooling stages"
            )

        # Flattened size: 32 output channels of conv2 times the pooled area.
        self.flattened_size = 32 * pooled_rows * pooled_cols

        self.fc = nn.Sequential(
            nn.Linear(self.flattened_size, 128),
            nn.ReLU(),
            nn.Linear(128, OUTPUT_DIM) # Final output is a single MOS score
        )

    def forward(self, x):
        """Map a (batch, 1, rows, cols) tensor to (batch, OUTPUT_DIM) predictions."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.flatten(1) # Keep the batch axis, flatten channels and spatial dims
        x = self.fc(x)
        return x



# model 2 resnet

class VoiceMOS_ResNet(nn.Module):
    """
    ResNet-18 adapted for single-channel input and single-score MOS regression.

    Two layers of the torchvision ResNet-18 are replaced:
      * ``conv1`` becomes a 1-input-channel convolution with the same geometry
        (64 filters, 7x7 kernel, stride 2, padding 3, no bias);
      * ``fc`` becomes a linear layer with one output.

    Args:
        use_pretrained: When True, start from the IMAGENET1K_V1 weights. The
            new single-channel stem is then initialised with the sum of the
            pretrained RGB kernels, so the pretrained first-layer filters are
            kept instead of being replaced by random values. When False the
            whole network is randomly initialised.
    """
    def __init__(self, use_pretrained=True):
        super(VoiceMOS_ResNet, self).__init__()

        # Load weights: Use pre-trained weights to leverage learned features
        # from ImageNet, or set to None for training from scratch.
        weights = ResNet18_Weights.IMAGENET1K_V1 if use_pretrained else None
        self.resnet = resnet18(weights=weights)

        # 1. Replace the input layer so it accepts one channel instead of three.
        rgb_stem = self.resnet.conv1
        mono_stem = nn.Conv2d(
            in_channels=1,
            out_channels=64,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False
        )
        if use_pretrained:
            # Convolution is linear in its input channels, so summing the R, G
            # and B kernels gives the response the pretrained stem would have
            # produced for the same matrix replicated over three channels.
            with torch.no_grad():
                mono_stem.weight.copy_(rgb_stem.weight.sum(dim=1, keepdim=True))
        self.resnet.conv1 = mono_stem

        # 2. Replace the classification head with a one-output regression head.
        # in_features is the width coming out of the average pooling layer.
        num_ftrs = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_ftrs, 1)

    def forward(self, x):
        """Run the adapted ResNet on a (batch, 1, rows, cols) tensor; returns (batch, 1)."""
        return self.resnet(x)



# model 3 Bidirectional LSTM (labelled a "CNN-LSTM hybrid", but the class below contains no convolutional layers)

class VoiceMOS_LSTM(nn.Module):
    """
    Recurrent MOS regressor: stacked (by default bidirectional) LSTM, mean
    pooling over time, then a small MLP head that emits one score.

    The feature matrix is read as a sequence: rows are time steps and columns
    are the per-step features (2288 steps of 20 features for the inputs
    described in this notebook).
    """

    def __init__(self, input_size=20, hidden_size=128, num_layers=2, bidirectional=True):
        super().__init__()

        # H: hidden width per direction; D: number of directions.
        # The LSTM emits D * H features for every time step.
        self.H = hidden_size
        self.D = 1 if not bidirectional else 2

        # Recurrent encoder; batch_first makes it consume (batch, steps, features).
        recurrent_cfg = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
        )
        self.lstm = nn.LSTM(**recurrent_cfg)

        # Regression head applied to the time-averaged LSTM output.
        pooled_width = self.D * self.H
        head_layers = [
            nn.Linear(pooled_width, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Predict one score per item of a (batch, 1, steps, features) tensor."""
        # Drop the singleton channel axis -> (batch, steps, features).
        sequence = x.squeeze(1)

        # Keep only the per-step outputs; the final hidden/cell states are unused.
        step_outputs = self.lstm(sequence)[0]

        # Average over the time axis -> one (D * H)-wide vector per utterance.
        utterance_vector = step_outputs.mean(dim=1)

        return self.fc(utterance_vector)


# Model 4: Bidirectional GRU (labelled a "CNN-GRU hybrid", but the class below contains no convolutional layers)

class VoiceMOS_GRU(nn.Module):
    """
    Recurrent MOS regressor using a stacked (by default bidirectional) GRU.

    Same layout as the LSTM variant in this notebook, with nn.GRU as the
    recurrent encoder: GRU over the time axis, mean pooling over time, then a
    two-layer MLP head that emits one score.
    """

    def __init__(self, input_size=20, hidden_size=128, num_layers=2, bidirectional=True):
        super().__init__()

        # H: hidden width per direction; D: number of directions.
        self.H = hidden_size
        self.D = 1 if not bidirectional else 2

        # Recurrent encoder over (batch, steps, features) input.
        recurrent_cfg = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
        )
        self.gru = nn.GRU(**recurrent_cfg)

        # Regression head on the time-averaged GRU output (D * H features).
        head_layers = [
            nn.Linear(self.D * self.H, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Predict one score per item of a (batch, 1, steps, features) tensor."""
        # Remove the singleton channel axis, then encode every time step.
        step_outputs = self.gru(x.squeeze(1))[0]

        # Collapse the time axis by averaging and regress the score.
        utterance_vector = step_outputs.mean(dim=1)
        return self.fc(utterance_vector)



# Model 5: Transformer Encoder

import torch.nn as nn
import torch
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class VoiceMOS_Transformer(nn.Module):
    """
    Transformer-encoder MOS regressor.

    The (batch, 1, steps, features) input is treated as a sequence of `steps`
    tokens of width `d_model`, passed through a stack of
    TransformerEncoderLayer blocks, averaged over the sequence axis and mapped
    to one score by a small MLP head.

    Note on ordering: nn.TransformerEncoder does NOT add positional
    information by itself. Without it, and with mean pooling over the
    sequence, the prediction does not depend on the order of the time steps.
    Set ``use_positional_encoding=True`` to add fixed sinusoidal position
    encodings to the input. The default is False so that existing checkpoints
    keep producing the same predictions; the encoding has no parameters or
    buffers, so the state_dict layout is identical either way.

    Args:
        d_model: Feature width of each time step (20 for this notebook's input).
        nhead: Number of attention heads; must divide d_model.
        num_layers: Number of stacked encoder layers.
        use_positional_encoding: Add sinusoidal position encodings to the
            input sequence before the encoder.

    Raises:
        ValueError: If d_model is not divisible by nhead.
    """

    def __init__(self, d_model=20, nhead=10, num_layers=5, use_positional_encoding=False):
        super(VoiceMOS_Transformer, self).__init__()

        # Multi-head attention splits d_model evenly across the heads.
        if d_model % nhead != 0:
            raise ValueError("d_model must be divisible by nhead")

        self.use_positional_encoding = use_positional_encoding

        # Define one Encoder Layer
        encoder_layer = TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            batch_first=False # We will use (Sequence, Batch, Features) format
        )

        # Stack multiple layers
        self.transformer_encoder = TransformerEncoder(
            encoder_layer,
            num_layers=num_layers
        )

        # Prediction head on the sequence-averaged encoder output.
        self.fc = nn.Sequential(
            nn.Linear(d_model, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    @staticmethod
    def _sinusoidal_positions(seq_len, d_model, device, dtype):
        """Return fixed sin/cos position encodings of shape (seq_len, 1, d_model)."""
        positions = torch.arange(seq_len, dtype=torch.float32, device=device).unsqueeze(1)
        even_dims = torch.arange(0, d_model, 2, dtype=torch.float32, device=device)
        inv_freq = torch.pow(10000.0, -even_dims / d_model)
        angles = positions * inv_freq  # (seq_len, ceil(d_model / 2))

        encoding = torch.zeros(seq_len, d_model, dtype=torch.float32, device=device)
        encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer cosine column than sine columns.
        encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        return encoding.unsqueeze(1).to(dtype)

    def forward(self, x):
        """Predict one score per item of a (batch, 1, steps, d_model) tensor."""
        # (B, 1, S, F) -> (B, S, F) -> (S, B, F), the layout expected with batch_first=False.
        x = x.squeeze(1).permute(1, 0, 2)

        if self.use_positional_encoding:
            # Broadcast the (S, 1, F) encoding over the batch axis.
            x = x + self._sinusoidal_positions(x.size(0), x.size(2), x.device, x.dtype)

        # Encoder output keeps the (S, B, F) layout.
        transformer_output = self.transformer_encoder(x)

        # Average across the sequence axis -> (B, F).
        avg_pool = torch.mean(transformer_output, dim=0)

        return self.fc(avg_pool)


In [None]:
class MOSFeatureDataset(Dataset):
    """
    Dataset pairing pre-computed feature matrices with their MOS scores.

    Args:
        features_path: Path to a .npy file holding one feature matrix per sample.
        mos_scores_path: Path to a .npy file holding one score per sample.

    Raises:
        ValueError: If the two files do not contain the same number of samples.
    """

    def __init__(self, features_path, mos_scores_path):
        # SECURITY NOTE: allow_pickle=True lets np.load execute arbitrary code
        # embedded in the file. Only load feature files you created yourself.
        self.features = np.load(features_path, allow_pickle=True)
        self.mos_scores = np.load(mos_scores_path)

        # __len__ is driven by the scores, so a mismatch would otherwise surface
        # only as an IndexError mid-epoch or as silently ignored features.
        if len(self.features) != len(self.mos_scores):
            raise ValueError(
                f"Feature/score count mismatch: {len(self.features)} feature matrices in "
                f"{features_path} vs {len(self.mos_scores)} scores in {mos_scores_path}"
            )

    def __len__(self):
        """Number of (feature, score) pairs."""
        return len(self.mos_scores)

    def __getitem__(self, idx):
        """Return (feature, score): a float32 (1, H, W) tensor and a float32 scalar tensor."""
        # Add a leading channel dimension for the CNN-style models: (H, W) -> (1, H, W).
        feature = torch.from_numpy(self.features[idx]).float().unsqueeze(0)
        score = torch.tensor(self.mos_scores[idx]).float()
        return feature, score

In [None]:
# Root folder on Google Drive that holds the pre-computed feature files.
BASE_FEATURE_DIR = '/content/drive/MyDrive/BUU_PHD_THESIS/VoiceMOS_2022_features/'

# One sub-folder per data split.
TRAIN_FEAT_DIR = os.path.join(BASE_FEATURE_DIR, 'VoiceMOS_2022_train_data_features')
VAL_FEAT_DIR = os.path.join(BASE_FEATURE_DIR, 'VoiceMOS_2022_validation_data_features')
TEST_FEAT_DIR = os.path.join(BASE_FEATURE_DIR, 'VoiceMOS_2022_test_data_features')

# Permanent (Google Drive) checkpoint location for each model architecture.
CNN_MLP_CHECKPOINT_PATH = '/content/drive/MyDrive/BUU_PHD_THESIS/Voice_MOS_2022_data_trained_models/CNN_MLP_classic_mos_model.pth'
RESNET_CHECKPOINT_PATH = '/content/drive/MyDrive/BUU_PHD_THESIS/Voice_MOS_2022_data_trained_models/ResNet18_mos_model.pth'
LSTM_CHECKPOINT_PATH = '/content/drive/MyDrive/BUU_PHD_THESIS/Voice_MOS_2022_data_trained_models/LSTM_mos_model.pth'
GRU_CHECKPOINT_PATH = '/content/drive/MyDrive/BUU_PHD_THESIS/Voice_MOS_2022_data_trained_models/GRU_mos_model.pth'
TRANSFORMER_CHECKPOINT_PATH = '/content/drive/MyDrive/BUU_PHD_THESIS/Voice_MOS_2022_data_trained_models/TRANSFORMER_mos_model.pth'

# Each split folder contains 'mfcc_features.npy' and 'mos_scores.npy'.
train_dataset = MOSFeatureDataset(
    features_path=os.path.join(TRAIN_FEAT_DIR, 'mfcc_features.npy'),
    mos_scores_path=os.path.join(TRAIN_FEAT_DIR, 'mos_scores.npy')
)

val_dataset = MOSFeatureDataset(
    features_path=os.path.join(VAL_FEAT_DIR, 'mfcc_features.npy'),
    mos_scores_path=os.path.join(VAL_FEAT_DIR, 'mos_scores.npy')
)

test_dataset = MOSFeatureDataset(
    features_path=os.path.join(TEST_FEAT_DIR, 'mfcc_features.npy'),
    mos_scores_path=os.path.join(TEST_FEAT_DIR, 'mos_scores.npy')
)



# Only the training data is shuffled; validation and test keep file order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# --- Model and loss setup ---

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate all five candidate models and move each one to `device`.
# NOTE(review): every model is built even though a single one is selected for
# training later, and no random seed is set before construction or shuffling.
model_CNN_MLP = VoiceMOS_CNN().to(device)
model_ResNet = VoiceMOS_ResNet(use_pretrained=True).to(device)
model_LSTM = VoiceMOS_LSTM().to(device)
model_GRU = VoiceMOS_GRU().to(device)
model_Transformer = VoiceMOS_Transformer().to(device)


# Loss function (the optimizer is created later, once a model has been selected)
criterion = nn.MSELoss()


# --- Early Stopping Hyperparameters ---
PATIENCE = 10      # Number of epochs to wait for improvement before stopping
MIN_DELTA = 0.0001 # Minimum change required to qualify as improvement


def train_model(model, optimizer, checkpoint_path, start_epoch, best_val_loss_start, wait_counter_start, local_path):
    """
    Train `model` with early stopping, checkpointing the best validation epoch.

    Relies on the notebook-level `device`, `train_loader`, `val_loader`,
    `criterion`, `NUM_EPOCHS`, `PATIENCE` and `MIN_DELTA`.

    Args:
        model: Network to train (already on `device`).
        optimizer: Optimizer bound to `model`'s parameters.
        checkpoint_path: Permanent checkpoint location (Google Drive).
        start_epoch: Zero-based index of the first epoch to run. Training
            covers epochs ``start_epoch .. NUM_EPOCHS - 1``, so a resumed run
            continues where the checkpoint left off instead of starting a
            fresh block of NUM_EPOCHS epochs.
        best_val_loss_start: Best validation MSE seen so far (inf when fresh).
        wait_counter_start: Epochs without improvement carried over from a
            previous run.
        local_path: Fast local path the checkpoint is written to before being
            copied to `checkpoint_path`.

    Returns:
        None. The best weights are left on disk at `checkpoint_path`.
    """
    print(f"\n--- Starting Training on {device} from Epoch {start_epoch} ---")

    # Early-stopping state, seeded from the (possibly resumed) starting values.
    best_val_loss = best_val_loss_start
    wait_counter = wait_counter_start

    if start_epoch >= NUM_EPOCHS:
        print(f"  Nothing to train: start epoch {start_epoch} has already reached NUM_EPOCHS ({NUM_EPOCHS}).")
        return

    for epoch in range(start_epoch, NUM_EPOCHS):
        # --- A. Training phase ---
        model.train()
        train_loss = 0.0

        for features, scores in tqdm(train_loader, desc=f"Epoch {epoch+1} Train"):
            # Targets arrive as (B,); the models output (B, 1).
            features, scores = features.to(device), scores.to(device).unsqueeze(1)

            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, scores)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        # Mean of the per-batch losses.
        avg_train_loss = train_loss / len(train_loader)

        # --- B. Validation phase ---
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for features, scores in val_loader:
                features, scores = features.to(device), scores.to(device).unsqueeze(1)
                outputs = model(features)
                loss = criterion(outputs, scores)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(val_loader)

        # --- C. Checkpointing and early stopping ---
        print(f"\nEpoch {epoch+1}/{NUM_EPOCHS}:")
        print(f"  Training MSE: {avg_train_loss:.4f}")
        print(f"  Validation MSE: {avg_val_loss:.4f}")

        if avg_val_loss < best_val_loss - MIN_DELTA:
            best_val_loss = avg_val_loss
            wait_counter = 0

            # Everything needed to resume: weights, optimizer state and the
            # early-stopping bookkeeping. A checkpoint is only written on
            # improvement, so the stored wait_counter is always 0.
            checkpoint = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_loss': best_val_loss,
                'wait_counter': wait_counter
            }

            # Write to the fast local disk first, then copy the finished file
            # to Google Drive.
            torch.save(checkpoint, local_path)
            shutil.copyfile(local_path, checkpoint_path)

            print("  >>> Checkpoint saved (best validation MSE) to Drive and Local <<<")

        else:
            wait_counter += 1
            print(f"  No improvement for {wait_counter}/{PATIENCE} epochs.")

            if wait_counter >= PATIENCE:
                print(f"  --- Early Stopping! ---")
                break


def test_model(model, test_loader, model_path):
    """
    Evaluates the model on the test dataset.
    Loads model weights from the 'model_state_dict' key within the checkpoint dictionary.
    """
    print("\n--- Starting Testing on Test Dataset ---")

    # Load the best model weights from the specified path
    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Load the ENTIRE checkpoint dictionary
        checkpoint = torch.load(model_path, map_location=device)

        # ðŸŽ¯ CRITICAL FIX: Extract the model weights from the dictionary
        state_dict = checkpoint['model_state_dict']

        # Load weights into the currently initialized model architecture
        model.load_state_dict(state_dict)
        print(f"Successfully loaded model weights from: {model_path}")

    except FileNotFoundError:
        print(f"Error: Model checkpoint not found at {model_path}. Cannot test.")
        return
    except Exception as e:
        # A specific check for the case where the file might be old and saved without the dictionary wrapper
        if "Missing key(s)" in str(e) and "'model_state_dict'" in str(e):
             print("\nFATAL ERROR: The saved file is corrupted or not a valid structured checkpoint.")
        else:
            print(f"Error loading model state: {e}. Check if the model class matches the saved weights.")
        return

    model.eval() # Set model to evaluation mode
    all_predictions = []
    all_targets = []

    with torch.no_grad():
        for features, scores in tqdm(test_loader, desc="Testing"):
            features, scores = features.to(device), scores.to(device).unsqueeze(1)

            outputs = model(features)

            # Collect results
            all_predictions.extend(outputs.cpu().numpy().flatten())
            all_targets.extend(scores.cpu().numpy().flatten())

    # Convert to numpy arrays for metric calculation
    all_predictions = np.array(all_predictions)
    all_targets = np.array(all_targets)

    # Calculate Metrics
    test_mse = mean_squared_error(all_targets, all_predictions)
    test_rmse = np.sqrt(test_mse)
    pearson_r, _ = pearsonr(all_targets, all_predictions)
    spearman_r, _ = spearmanr(all_targets, all_predictions)

    print("\n=============================================")
    print("        MOS PREDICTION TEST RESULTS")
    print("=============================================")
    print(f"Model Tested: {type(model).__name__}")
    print(f"Final Test Samples: {len(all_targets)}")
    print(f"MSE (Mean Squared Error): {test_mse:.4f}")
    print(f"RMSE (Root Mean Squared Error): {test_rmse:.4f}")
    print(f"PCC (Pearson Correlation Coefficient): {pearson_r:.4f}")
    print(f"SCC (Spearman Rank Correlation): {spearman_r:.4f}")
    print("=============================================")


In [None]:

def load_checkpoint_and_resume( model, optimizer, checkpoint_path, local_path ):
    """
    Restore model, optimizer and early-stopping state from a saved checkpoint.

    The fast local copy is tried first. If it is missing, or exists but cannot
    be loaded (corrupt file, checkpoint of a different architecture, ...), the
    permanent Drive copy is tried next. When nothing can be loaded, training
    starts from scratch.

    Args:
        model: Network to load weights into.
        optimizer: Optimizer to load state into.
        checkpoint_path: Permanent checkpoint location (Google Drive).
        local_path: Local working copy of the checkpoint.

    Returns:
        Tuple ``(start_epoch, best_val_loss, wait_counter)``; equal to
        ``(0, inf, 0)`` when no checkpoint could be loaded.
    """
    start_epoch = 0
    best_val_loss = float('inf')
    wait_counter = 0

    # Map tensors onto the device the model already uses.
    try:
        target_device = next(model.parameters()).device
    except StopIteration:
        target_device = torch.device("cpu")

    # Candidate files in priority order, skipping missing files and duplicates.
    candidates = []
    for path in (local_path, checkpoint_path):
        if os.path.exists(path) and path not in candidates:
            candidates.append(path)

    resumed = False
    for load_path in candidates:
        print(f"Found checkpoint! Resuming training from {load_path}")
        try:
            checkpoint = torch.load(load_path, map_location=target_device)

            # Read the bookkeeping first so a malformed checkpoint fails before
            # the model or optimizer is modified.
            loaded_state = (
                checkpoint['epoch'] + 1,
                checkpoint['best_val_loss'],
                checkpoint['wait_counter'],
            )

            model.load_state_dict(checkpoint['model_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

            start_epoch, best_val_loss, wait_counter = loaded_state
            resumed = True
            print(f"Resumed from Epoch {start_epoch}, Best Loss: {best_val_loss:.4f}")
            break

        except Exception as e:
            # Best effort: report the problem and move on to the next candidate.
            print(f"Error loading checkpoint: {e}.")

    if candidates and not resumed:
        print("No usable checkpoint could be loaded. Starting from scratch.")

    # Make sure a local copy exists so the next save can reuse the fast path.
    if os.path.exists(checkpoint_path) and not os.path.exists(local_path):
        print("Copying Drive checkpoint to local path for faster saving...")
        shutil.copyfile(checkpoint_path, local_path)

    return start_epoch, best_val_loss, wait_counter


In [None]:
# Available checkpoint paths:
#   CNN_MLP_CHECKPOINT_PATH, RESNET_CHECKPOINT_PATH, LSTM_CHECKPOINT_PATH,
#   GRU_CHECKPOINT_PATH, TRANSFORMER_CHECKPOINT_PATH
#
# Available models:
#   model_CNN_MLP, model_ResNet, model_LSTM, model_GRU, model_Transformer


# --- Execution Variables ---

# Select a model and its matching checkpoint path from the names above.
selected_model = model_Transformer
selected_path = TRANSFORMER_CHECKPOINT_PATH

# Local working copy of the checkpoint. The file name is derived from the
# selected checkpoint so every model gets its own local file; with a shared
# name, switching models in one session would make the resume step pick up
# another model's local checkpoint in preference to the correct Drive copy.
local_save_path = os.path.join('/content', 'temp_' + os.path.basename(selected_path))

# --- Directory Creation Block (Must be before all training) ---
# Make sure the Drive folder that will receive the checkpoint exists.
destination_dir = os.path.dirname( selected_path )
os.makedirs(destination_dir, exist_ok=True)
print(f"Destination directory checked/created: {destination_dir}")

# 1. Initialize Optimizer (needs to be done before loading the checkpoint,
#    because the checkpoint restores the optimizer state in place)
optimizer = optim.Adam(selected_model.parameters(), lr=LEARNING_RATE)

# 2. Load Checkpoint and get starting parameters
start_epoch, best_val_loss_loaded, wait_counter_loaded = load_checkpoint_and_resume(
    selected_model, optimizer, selected_path, local_save_path
)

# 3. Start Training
train_model(
    selected_model,
    optimizer,
    selected_path,
    start_epoch,
    best_val_loss_loaded,
    wait_counter_loaded,
    local_save_path # Pass the local path for saving
)

# 4. Test the model
# The test_model function loads from selected_path, which is the final saved Drive location.
test_model(selected_model, test_loader, selected_path)

In [None]:
# Ask PyTorch to release cached GPU memory that is no longer in use, e.g.
# before selecting and training another model in the same session.
torch.cuda.empty_cache()
