In [None]:
import torchvision.models as models
import os
from PIL import Image
import numpy as np
import collections
import time

import torch
import torch.nn as nn
from torch.utils.data import random_split, DataLoader, TensorDataset, WeightedRandomSampler
from torchvision import transforms
import io
from tqdm import tqdm

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive; later cells
# read spectrograms from, and write checkpoints to, paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models


class CNNModel(nn.Module):
    """EfficientNet-B0 backbone with a single-logit head for spectrogram classification."""

    def __init__(self, dropout: float = 0.0):
        """Builds the pre-trained backbone and swaps in a fresh binary head.

        Args:
            dropout (float): Probability used by the dropout layer that sits
                directly in front of the final linear layer.
        """
        super().__init__()

        # ImageNet-pretrained EfficientNet-B0 acts as the feature extractor.
        backbone = models.efficientnet_b0(weights='IMAGENET1K_V1')

        # Reuse the input width of the stock classifier's linear layer for the new head.
        head_in_features = backbone.classifier[1].in_features
        head = nn.Linear(head_in_features, 1)  # one logit -> binary classification

        # Small-variance weights and zero bias for the freshly created layer.
        nn.init.normal_(head.weight, mean=0.0, std=0.01)
        nn.init.zeros_(head.bias)

        # Same layout as before (index 0: dropout, index 1: linear) so that
        # state_dict keys stay identical.
        backbone.classifier = nn.Sequential(nn.Dropout(p=dropout), head)
        self.efficientnet = backbone

    def forward(self, spectrogram: torch.Tensor) -> torch.Tensor:
        """Runs the spectrogram batch through EfficientNet and the binary head.

        Args:
            spectrogram (torch.Tensor): Input tensor representing the spectrogram.

        Returns:
            torch.Tensor: Raw logit(s) for binary classification (no sigmoid applied).
        """
        logits = self.efficientnet(spectrogram)
        return logits


In [None]:
class ModelHandler:
    """Handles the model training, evaluation, and inference pipeline.

    Attributes:
        device (torch.device): The device on which the model is executed (e.g., 'cpu' or 'cuda').
        model_path: Path to where .pth models should be saved.
    """

    def __init__(self,
                 model,
                 model_path: str,
                 optimizer: torch.optim.Optimizer,
                 loss_function: nn.Module,
                 steps_per_decay = 5,
                 lr_decay = 0.1):
        """Initializes the ModelHandler.

        Args:
            model_path (str | None): Path to the pre-trained model file (if available).
        """
        self.model = model
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_path = model_path
        self.optimizer = optimizer
        self.lr_scheduler = opt.lr_scheduler.StepLR(self.optimizer, step_size=steps_per_decay, gamma=lr_decay)
        self.loss_function = loss_function

    def train_step(self, dataloader):
        """Trains the model for a single epoch.

        Args:
            dataloader (torch.utils.data.DataLoader): DataLoader for the training dataset.
        """
        self.model.train()
        avg_loss, acc = 0, 0
        for in_tensor, labels in dataloader:
            in_tensor, labels = in_tensor.to(self.device), labels.to(self.device)
            labels = labels.float().unsqueeze(1)  # Ensure correct shape for BCE loss

            logits = self.model(in_tensor) # Feed input into model

            loss = self.loss_function(logits, labels)  # Calculate loss
            avg_loss += loss.item()  # Add to cumulative loss

            # Gradient descent
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            # Calculate batch accuracy and add it to cumulative accuracy
            prediction_classes = torch.round(torch.sigmoid(logits))
            batch_acc = torch.mean((prediction_classes == labels).float()).item()
            acc += batch_acc

        avg_loss /= len(dataloader)  # Calculate avg loss for epoch from cumulative loss
        acc /= len(dataloader)  # Calculate avg accuracy for epoch from cumulative accuracy
        train_results = {"avg_loss_per_batch": avg_loss, "avg_acc_per_batch": acc * 100}
        return train_results

    def val_step(self, dataloader):
        """Evaluates the model on the validation dataset.

        Args:
            dataloader (torch.utils.data.DataLoader): DataLoader for the validation dataset.
        """

        self.model.eval()
        with torch.inference_mode():
            avg_loss, acc = 0, 0
            for in_tensor, labels in dataloader:
                in_tensor, labels = in_tensor.to(self.device), labels.to(self.device)
                labels = labels.float().unsqueeze(1)  # Ensure correct shape for BCE loss

                logits = self.model(in_tensor)  # Feed input into model

                loss = self.loss_function(logits, labels)  # Calculate loss
                avg_loss += loss.item()  # Add to cumulative loss

                # Calculate batch accuracy and add it to cumulative accuracy
                prediction_classes = torch.round(torch.sigmoid(logits))
                batch_acc = torch.mean((prediction_classes == labels).float()).item()
                acc += batch_acc

            avg_loss /= len(dataloader)  # Calculate avg loss for each epoch from cumulative loss
            acc /= len(dataloader)  # Calculate avg accuracy for each epoch from cumulative accuracy
            valid_results = {"avg_loss_per_batch": avg_loss, "avg_acc_per_batch": acc * 100}
            return valid_results

    def train(self, train_loader, epochs: int, model_name: str):
        """Trains the model

        Args:
            train_loader: DataLoader for the training datasets
            epochs (int): Number of training epochs.
            model_name (str): Name to save the trained model.
        """
        self.model.to(self.device)
        training_results = {"epoch": [], "loss": [], "accuracy": []}
        validation_results = {"epoch": [], "loss": [], "accuracy": []}

        for epoch in range(epochs):

            # Train the model
            training_data = self.train_step(train_loader)
            training_results["epoch"].append(epoch)
            training_results["loss"].append(training_data["avg_loss_per_batch"])
            training_results["accuracy"].append(training_data["avg_acc_per_batch"])

            # Check the validation loss after training
            validation_data = self.val_step(val_loader)
            validation_results["epoch"].append(epoch)
            validation_results["loss"].append(validation_data["avg_loss_per_batch"])
            validation_results["accuracy"].append(validation_data["avg_acc_per_batch"])

            # Adjust learning rate if necessary
            if self.lr_scheduler:
                self.lr_scheduler.step()

            if epoch % 1 == 0:
                print(f"{epoch}:")
                print(f"LR: {self.optimizer.param_groups[0]['lr']}")
                print(f"Loss - {training_data['avg_loss_per_batch']:.5f} | Accuracy - {training_data['avg_acc_per_batch']:.2f}%")
                print(f"VLoss - {validation_data['avg_loss_per_batch']:.5f} | VAccuracy - {validation_data['avg_acc_per_batch']:.2f}%\n")

        self.save_model(model_state_dict=self.model.state_dict(), model_name=model_name)
        return training_results, validation_results


    def validate(self, val_loader, hyperparams: dict, save_best: bool = True) -> tuple[float, float]:
        """Validates the model on the validation dataset.

        Args:
            val_loader: DataLoader for the validation dataset.

        Returns:
            tuple: (validation accuracy, validation loss)
        """

        self.model.to(self.device)
        self.model.eval()

        val_losses_epoch, batch_sizes, accs = [], [], []
        best_acc = -1
        best_model_state = None  # Track the best model weights

        with torch.no_grad():
            for X_val, y_val in val_loader:
                X_val = X_val.to(self.device)
                y_val = y_val.to(self.device).float().unsqueeze(1)

                y_prediction_val = self.model(X_val)  # forward pass
                loss = self.loss_function(y_prediction_val, y_val)
                val_losses_epoch.append(loss.item())

                # Compute accuracy
                y_prediction_val = torch.sigmoid(y_prediction_val)  # Convert logits to probabilities
                prediction_classes = (y_prediction_val > 0.5).float()  # Convert to binary 0/1

                acc = torch.mean((prediction_classes == y_val).float()).item()
                accs.append(acc)
                batch_sizes.append(X_val.shape[0])

        # Compute final validation loss and accuracy
        val_loss = np.mean(val_losses_epoch)
        val_acc = np.average(accs, weights=batch_sizes)  # Weighted average accuracy

        print(f'Validation accuracy: {val_acc*100:.2f}% | Validation loss: {val_loss:.4f}')

        if save_best and val_acc > best_acc:
            best_acc = val_acc
            best_model_state = self.model.state_dict()

            # Create model filename using hyperparameters
            hyperparam_str = "_".join(f"{key}:{value}" for key, value in hyperparams.items())
            model_filename = f"model_{hyperparam_str}_{time.time()}.pth"

            # Save the best model
            save_path = os.path.join(self.model_path, model_filename)
            torch.save(best_model_state, save_path)
            print(f"Best model saved at: {save_path}")
        return val_acc, val_loss


    def evaluate(self, test_loader) -> float:
        """Evaluates the model on the test dataset.

        Args:
            test_loader: DataLoader for the test dataset.
        """
        self.model.to(self.device)
        self.model.eval()
        batch_sizes, accs = [], []
        with torch.no_grad():
            for X_test, y_test, in test_loader:
                X_test = X_test.to(self.device)
                y_test = y_test.to(self.device)

                prediction = self.model(X_test)
                batch_sizes.append(X_test.shape[0])

                prediction = torch.sigmoid(prediction)
                prediction_classes = (prediction > 0.5).float() # This converts to binary classes 0 and 1

                acc = torch.mean((prediction_classes == y_test).float()).item()
                accs.append(acc)

        # Return average accuracy
        return 0.0 if not accs else np.average(accs, weights=batch_sizes)


    def predict(self, spectrogram: torch.Tensor, model_name: str) -> int:
        """Performs inference on a single spectrogram.

        Args:
            spectrogram (torch.Tensor): Input spectrogram for inference.

        Returns:
            torch.Tensor: The predicted output from the model.
        """
        self.load_model(self.model_path +f"/{model_name}")
        spectrogram = spectrogram.unsqueeze(0).to(self.device)

        with torch.no_grad:
            logits = self.model(spectrogram)

            probability = torch.sigmoid(logits)

            prediction = (probability > 0.5).float() # Turn probability into binary classificaiton

        return prediction.item()


    def save_model(self, model_state_dict: collections.OrderedDict, model_name: str | None) -> None:
        """Saves the model to the specified file path.

        Args:
            path (str): Path to save the model file.
        """
        path = self.model_path + "/" + model_name
        torch.save(model_state_dict, path)


    def load_model(self, path: str) -> None:
        """Loads a model from the specified file path.

        Args:
            path (str): Path to the model file.
        """
        self.model.load_state_dict(torch.load(path))
        self.model.to(self.device)
        self.model.eval()

In [None]:
class DataPipeline:
    """Processes datasets, including loading, splitting, and preparing for inference.

    This class provides methods for loading datasets, processing them for training,
    and preparing single instances for inference.

    Attributes:
        test_size (float): Proportion of the dataset to include in the test split.
        val_size (float): Proportion of the dataset to include for validation.
        audio_processor: AudioProcessor instance for handling audio processing.
        image_processor: ImageProcessor instance for handling spectrogram or extracted features processing.
    """

    def __init__(self, test_size: float, val_size: float):
        """Initializes the DatasetProcessor.

        Args:
            data_path (str): Path to the dataset file.
            test_size (float): Proportion of the dataset to include in the test split.
            audio_processor (AudioProcessor): Instance for handling audio processing.
            image_processor (ImageProcessor): Instance for handling spectrogram processing.
        """
        self.test_size = test_size
        self.val_size = val_size

    def load_dataset(self, spectrogram_folder: str = '/content/drive/MyDrive/RespiraCheck/Cough Data/spectrograms') -> TensorDataset:
        """Loads every spectrogram image into one in-memory TensorDataset.

        Images are read from the ``positive`` and ``negative`` sub-folders of
        ``spectrogram_folder`` and labelled 1 and 0 respectively.

        Args:
            spectrogram_folder (str): Directory containing the ``positive`` and
                ``negative`` sub-folders. Defaults to the project's Drive location.

        Returns:
            TensorDataset: ``(X, y)`` where ``X`` stacks the image tensors produced by
            ``image_to_tensor`` and ``y`` holds the integer labels (dtype ``torch.long``).

        Raises:
            ValueError: If no image files are found.
        """
        tensors = []
        labels = []

        for label_folder, label_value in zip(["positive", "negative"], [1, 0]):
            class_dir = os.path.join(spectrogram_folder, label_folder)

            # Sorted so the sample order does not depend on filesystem listing order.
            for image_name in tqdm(sorted(os.listdir(class_dir))):
                image_path = os.path.join(class_dir, image_name)
                if not os.path.isfile(image_path):
                    continue  # skip nested folders; only files can be images

                tensors.append(self.image_to_tensor(image_path))
                labels.append(label_value)

        if not tensors:
            raise ValueError(f"No spectrogram images found under {spectrogram_folder}")

        # One row per sample: features stacked along a new leading axis
        X = torch.stack(tensors)
        # Integer class label per sample
        y = torch.tensor(labels, dtype=torch.long)

        return TensorDataset(X, y)


    def image_to_tensor(self, image_path: str) -> torch.Tensor:
        """Reads a spectrogram image from disk and returns it as a tensor.

        Args:
            image_path (str): Path to the spectrogram image file.

        Returns:
            torch.Tensor: The image as a 3 x 224 x 224 tensor.
        """
        # Force three channels (drops any alpha channel) before tensor conversion.
        rgb_image = Image.open(image_path).convert("RGB")

        to_model_input = transforms.Compose([
            transforms.Resize((224, 224)),  # fixed spatial size fed to the network
            transforms.ToTensor(),  # PIL image -> tensor
        ])

        return to_model_input(rgb_image)

    def create_dataloaders(self, batch_size, dataset_path = None, upsample = True) -> tuple[DataLoader, DataLoader, DataLoader]:
        """Splits the dataset into training and test sets.

        Args:
            batch_size (int): The batch size for the DataLoader.
            dataset_path (str | None): Path to the TensorDataset file.

        Returns:
            tuple: (train_df, test_df) - The training and testing DataFrames.
        """
        if dataset_path:
            print(f"Loading dataset from {dataset_path}")
            dataset = torch.load(dataset_path, weights_only=False)
        else:
            print("Processing and loading dataset")
            dataset = self.load_dataset()

        # Calculate sizes
        test_size = round(self.test_size * len(dataset))
        val_size = round(self.val_size * len(dataset))
        train_size = round(len(dataset) - test_size - val_size)  # Remaining for training

        # Perform split
        train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

        # Upsample positive class
        if upsample:
            print("Upsampling data")
            labels = [label.item() for _, label in train_dataset]
            train_counts = {}
            for label in labels:
                train_counts[label] = train_counts.get(label, 0) + 1
            # print(train_counts)

            weights = torch.where(torch.tensor(labels) == 0, 1 / train_counts[0], 1 / train_counts[1])
            # print(labels[:5], weights[:5])

            wr_sampler = WeightedRandomSampler(weights, int(len(train_dataset) * 1.5))

            train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=wr_sampler)

        else:
            print("No upsampling")
            train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Create DataLoaders
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        # Count labels in train_loader
        train_counts = {}
        for _, labels in train_loader:
            for label in labels:
                train_counts[label.item()] = train_counts.get(label.item(), 0) + 1

        # print(train_counts)

        # Reduce memory footprint
        dataset, train_dataset, val_dataset, test_dataset = None, None, None, None

        return train_loader, val_loader, test_loader

In [None]:
pip install noisereduce



In [None]:
import torch.optim as opt

# --- Training length ---------------------------------------------------------
EPOCHS: int = 20  # upper bound on the number of passes in the training loop

# --- StepLR schedule (handed to ModelHandler) --------------------------------
STEPS_PER_LR_DECAY: int = 20  # scheduler steps between two decays
LR_DECAY: float = 0.5  # multiplicative factor applied at each decay

# --- Model -------------------------------------------------------------------
DROPOUT: float = 0.5  # dropout probability in front of the classifier layer

# --- Objective ---------------------------------------------------------------
# Binary cross-entropy on raw logits (the sigmoid is applied inside the loss).
LOSS_FN: nn.Module = nn.BCEWithLogitsLoss()



In [None]:
# Stand-alone model instance built with the configured dropout probability.
model = CNNModel(dropout=DROPOUT)

In [None]:
# Build the train/validation/test loaders (15% test, 15% validation, rest train)
# with batch size 8.
# NOTE(review): the training cell further down rebinds `datapipeline` and all three
# loaders with its own batch size, so this slow load looks redundant — confirm.
datapipeline = DataPipeline(test_size=0.15, val_size=0.15)
train_loader, val_loader, test_loader = datapipeline.create_dataloaders(batch_size=8)


Processing and loading dataset


100%|██████████| 842/842 [00:24<00:00, 33.93it/s]
100%|██████████| 2986/2986 [01:16<00:00, 38.84it/s]


Upsampling data


In [None]:
import torch.optim as opt
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Define fixed hyperparameters
batch_size = 16  # Choose a single batch size
learning_rate = 0.001  # Set the learning rate

print(f"\n🚀 Training with batch size: {batch_size}, learning rate: {learning_rate}")

# Initialize model with the configured dropout (DROPOUT was defined in the
# hyperparameter cell but never reached the model that is actually trained)
cnn_model = CNNModel(dropout=DROPOUT)

# Choose optimizer (AdamW applies decoupled weight decay)
optimizer = opt.AdamW(params=cnn_model.parameters(), lr=learning_rate, weight_decay=1e-4)

# Learning rate scheduler (halve the LR if validation loss doesn't improve for 3 epochs).
# The deprecated `verbose` flag is dropped; the current LR is printed every epoch by train().
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)

# Create ModelHandler
# NOTE(review): the handler also attaches a StepLR scheduler to this optimizer, so two
# schedulers adjust the same learning rate — confirm this is intended.
model_handler = ModelHandler(model=cnn_model,
                             model_path="/content/drive/MyDrive/RespiraCheck/Cough Data",
                             optimizer=optimizer,
                             loss_function=LOSS_FN,
                             steps_per_decay=STEPS_PER_LR_DECAY,
                             lr_decay=LR_DECAY)

# Load dataset with the chosen batch size
datapipeline = DataPipeline(test_size=0.15, val_size=0.15)
train_loader, val_loader, test_loader = datapipeline.create_dataloaders(batch_size=batch_size)

# Early stopping setup
patience = 5
best_val_loss = float("inf")
epochs_since_improve = 0
best_model = None
best_state = None  # CPU snapshot of the weights with the best validation accuracy
best_acc = 0.0

# Training loop
for epoch in range(EPOCHS):
    print(f"\n🔄 Epoch {epoch+1}/{EPOCHS}")

    # Train for exactly one epoch per outer iteration
    training_results, validation_results = model_handler.train(train_loader=train_loader, epochs=1, model_name="CNN_EfficientNet")

    # Validate
    val_acc, val_loss = model_handler.validate(val_loader, {"batch_size": batch_size, "lr": learning_rate})

    # Scheduler step
    scheduler.step(val_loss)

    print(f"✅ Validation accuracy: {val_acc*100:.2f}% | Validation loss: {val_loss:.4f}")

    # Snapshot the weights when accuracy improves. Keeping a reference to the handler
    # is not enough: its model keeps training, so the "best" model would silently
    # become the last one. This runs before the early-stopping check so the final
    # epoch is considered too.
    if val_acc > best_acc:
        best_acc = val_acc
        best_state = {name: tensor.detach().cpu().clone()
                      for name, tensor in cnn_model.state_dict().items()}

    # Early stopping check
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_since_improve = 0
    else:
        epochs_since_improve += 1
        if epochs_since_improve >= patience:
            print("⏹️ Early stopping triggered!")
            break  # Stop training

# Final Testing
if best_state is not None:
    # Restore the best-scoring weights, persist them, then score on the test set
    cnn_model.load_state_dict(best_state)
    best_model = model_handler
    best_model.save_model(model_state_dict=best_state, model_name="CNN_EfficientNet")
    test_acc = best_model.evaluate(test_loader)
    print(f"\n🎯 Test accuracy: {test_acc*100:.2f}% 🚀 Best model saved!")



🚀 Training with batch size: 16, learning rate: 0.001
Processing and loading dataset


100%|██████████| 842/842 [00:20<00:00, 41.34it/s]
100%|██████████| 2986/2986 [00:50<00:00, 58.90it/s]


Upsampling data

🔄 Epoch 1/20
0:
LR: 0.001
Loss - 0.69161 | Accuracy - 55.26%
VLoss - 0.72579 | VAccuracy - 36.01%

Validation accuracy: 36.06% | Validation loss: 0.7258
Best model saved at: /content/drive/MyDrive/RespiraCheck/Cough Data/model_batch_size:16_lr:0.001_1740873099.7350264.pth
✅ Validation accuracy: 36.06% | Validation loss: 0.7258

🔄 Epoch 2/20
0:
LR: 0.001
Loss - 0.67354 | Accuracy - 58.09%
VLoss - 0.80803 | VAccuracy - 29.41%

Validation accuracy: 29.44% | Validation loss: 0.8080
Best model saved at: /content/drive/MyDrive/RespiraCheck/Cough Data/model_batch_size:16_lr:0.001_1740873121.3861074.pth
✅ Validation accuracy: 29.44% | Validation loss: 0.8080

🔄 Epoch 3/20
0:
LR: 0.001
Loss - 0.66645 | Accuracy - 60.04%
VLoss - 0.67412 | VAccuracy - 56.75%

Validation accuracy: 56.79% | Validation loss: 0.6741
Best model saved at: /content/drive/MyDrive/RespiraCheck/Cough Data/model_batch_size:16_lr:0.001_1740873143.2397268.pth
✅ Validation accuracy: 56.79% | Validation loss: 0

In [None]:
# AudioProcessor / SpectrogramProcessor are not defined anywhere in this notebook, and
# DataPipeline.__init__ only accepts the two split fractions, so the pipeline is built
# from those alone.
# NOTE(review): if the processor-based pipeline lives in another module, import it
# instead — confirm which DataPipeline is intended here.
datapipeline = DataPipeline(test_size=0.15, val_size=0.15)
train_loader, val_loader, test_loader = datapipeline.create_dataloaders(batch_size=32)

NameError: name 'AudioProcessor' is not defined