In [1]:
import torch
import matplotlib
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import SubsetRandomSampler, DataLoader, Subset, Dataset
from torchvision.transforms import v2
from PIL import Image
import torch.nn as nn
from torchvision.models import resnet34, resnet18, resnet50, alexnet, googlenet
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import time 
from tempfile import TemporaryDirectory
import os
import warnings
from torch.optim.lr_scheduler import CosineAnnealingLR, StepLR, ReduceLROnPlateau
import random
from torchsummary import summary
import numpy as np
import random
import torchvision.utils as utils
import math
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.preprocessing import label_binarize
from sklearn.metrics import precision_recall_curve
import wave
import pylab
from scipy import signal
from scipy.io import wavfile
from matplotlib.colors import Normalize
import itertools
from scipy.signal import spectrogram, find_peaks
from torch.optim.lr_scheduler import CosineAnnealingLR
import os
import torchvision.transforms.functional as TF
from torchvision.transforms.functional import invert
# Suppress every UserWarning for the remainder of the notebook session.
warnings.filterwarnings("ignore", category=UserWarning)
# Turn on cuDNN benchmark mode (cuDNN may auto-tune convolution algorithms;
# this mainly pays off when input shapes stay constant between batches).
torch.backends.cudnn.benchmark = True

### Dataset

In [2]:
# Run on the first CUDA GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [3]:
# Number of target classes; handed to the model constructor / replacement
# classification heads further down the notebook.
num_classes = 2

In [4]:
# Run configuration. These values select the architecture and are embedded in
# the dataset folder prefix as well as in the result / checkpoint paths.
model_name, batch_size = "CustomModel", 30
# f_selected is reported in Hz and explained_variance as a fraction (shown as
# a percentage) in the figure titles produced later in the notebook.
f_selected, explained_variance = 6500, 0.8

In [5]:
# Root data folder and the two per-class sub-folders beneath it.
data_dir = "./data/"
minor_folder, major_folder = (
    os.path.join(data_dir, class_folder) for class_folder in ("Minor", "Major")
)

In [6]:
# Folder-name prefix that identifies the image set generated for the selected
# f_selected / explained_variance combination.
prefix = f"PCA_f{f_selected}_expVar{explained_variance}"

In [7]:
folder_dir = "./data/audio-images/"
# Collect every sub-folder whose name starts with `prefix`. os.listdir() returns
# entries in arbitrary order, so the matches are sorted to make the later
# `matching_directory[0]` choice deterministic when several folders match.
matching_directory = sorted(
    d for d in os.listdir(folder_dir)
    if d.startswith(prefix) and os.path.isdir(os.path.join(folder_dir, d))
)

In [8]:
# Fail with an explicit message instead of a bare IndexError when no image
# folder was generated for the current configuration.
if not matching_directory:
    raise FileNotFoundError(
        f"No folder starting with '{prefix}' found in "
        f"{os.path.join(data_dir, 'audio-images')}."
    )

# Use the first matching folder as the image root for this run.
images_dir = os.path.join(data_dir, 'audio-images', matching_directory[0])
if not os.path.exists(images_dir):
    print(f'{images_dir} is not present.')
else:
    print(f'{images_dir} already present.')

./data/audio-images/PCA_f6500_expVar0.8_ncompMinor36_ncompMajor36 already present.


In [9]:
# NOTE(review): `images_dir` holds the split folders (see the directory listing
# in the following cell), so ImageFolder would treat each split folder as a
# class here. `dataset` is not referenced again in the visible part of the
# notebook — confirm whether this cell is still needed.
dataset = datasets.ImageFolder(root=images_dir)

In [10]:
# Show the entries of the selected image folder (the split sub-folders).
os.listdir(images_dir)

['test', 'val', 'train']

In [11]:
# Display names used as tick labels of the confusion matrix.
# NOTE(review): assumes label index 0 is 'Major' and 1 is 'Minor', i.e. the
# same order in which ImageFolder numbers the class folders — confirm against
# `train_dataset.classes`.
class_names = ['Major', 'Minor']

In [12]:
# Locate the three split folders below the selected image root ...
train_path = os.path.join(images_dir, "train")
val_path = os.path.join(images_dir, "val")
test_path = os.path.join(images_dir, "test")

# ... and wrap each one in an ImageFolder (no transform attached at this stage).
train_dataset = datasets.ImageFolder(train_path)
val_dataset = datasets.ImageFolder(val_path)
test_dataset = datasets.ImageFolder(test_path)

In [13]:
# Every pipeline below is applied to each training image, so the training set
# ends up with one sample per pipeline per source image.
# `antialias` is a boolean flag; it was previously passed as the string 'True'.
custom_transformations_train = [
    # 1) Resized original.
    transforms.Compose([
        v2.Resize((224, 224), antialias=True),
        transforms.ToTensor(),
    ]),
    # 2) Resized, then converted to grayscale replicated over 3 channels.
    transforms.Compose([
        v2.Resize((224, 224), antialias=True),
        transforms.ToTensor(),
        transforms.Grayscale(num_output_channels=3),
    ]),
    # 3) Resized and colour-inverted.
    transforms.Compose([
        v2.Resize((224, 224), antialias=True),
        invert,
        transforms.ToTensor(),
    ]),
    # 4) Resized with a random rectangle always erased (p=1) and filled with 255.
    transforms.Compose([
        v2.Resize((224, 224), antialias=True),
        v2.RandomErasing(p=1, scale=(0.02, 0.2), ratio=(0.3, 3.3), value=255),
        transforms.ToTensor(),
    ]),
]

# Validation / test images are only resized and converted to tensors; no
# normalisation step is applied (a v2.Normalize step was previously commented
# out at this position).
custom_transformations_val_test = [
    transforms.Compose([
        v2.Resize((224, 224), antialias=True),
        transforms.ToTensor(),
    ]),
]

In [14]:
class AugmentedDataset(Dataset):
    """Dataset view over an in-memory sequence of ``(image, target)`` pairs.

    Args:
        augmented_data: Indexable collection of ``(image, target)`` tuples.
        transform: Optional callable applied to the image on every access.
    """

    def __init__(self, augmented_data, transform=None):
        self.transform = transform
        self.augmented_data = augmented_data

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.augmented_data)

    def __getitem__(self, idx):
        """Return ``(image, target)`` at ``idx``, transforming the image if a transform is set."""
        sample, label = self.augmented_data[idx]
        # A falsy transform (e.g. None) leaves the stored image untouched.
        return (self.transform(sample) if self.transform else sample), label

In [15]:
def transform_dataset(dataset, custom_transformations):
    """Materialise every transformation of every sample as a PIL image.

    Args:
        dataset: Indexable dataset yielding ``(image, target)`` pairs; each
            image must be accepted by the given transformations.
        custom_transformations: Iterable of callables; each one is expected to
            return something ``transforms.ToPILImage`` can convert (the
            pipelines in this notebook return tensors).

    Returns:
        A list of ``(PIL image, target)`` tuples holding
        ``len(dataset) * len(custom_transformations)`` entries, grouped by
        source sample in dataset order.
    """
    # The converter carries no per-image state, so it is built once instead of
    # once per (sample, transformation) pair.
    to_pil = transforms.ToPILImage()
    augmented_data = []
    for i in range(len(dataset)):
        original_img, target = dataset[i]  # image as loaded by the dataset (not a tensor)
        for augment_transform in custom_transformations:
            augmented_img = to_pil(augment_transform(original_img))
            augmented_data.append((augmented_img, target))
    return augmented_data

In [16]:
# Expand the training split: one stored sample per training pipeline per image.
augmented_train_dataset = transform_dataset(
    dataset=train_dataset,
    custom_transformations=custom_transformations_train,
)

In [None]:
# Validation and test splits go through the single resize-only pipeline.
augmented_val_dataset = transform_dataset(
    dataset=val_dataset,
    custom_transformations=custom_transformations_val_test,
)
augmented_test_dataset = transform_dataset(
    dataset=test_dataset,
    custom_transformations=custom_transformations_val_test,
)

In [None]:
# Wrap the in-memory (PIL image, target) lists so that images are converted to
# tensors on access.
# NOTE(review): each name is rebound from a list to a Dataset, so running this
# cell a second time would wrap an already wrapped dataset.
augmented_train_dataset = AugmentedDataset(
    augmented_data=augmented_train_dataset,
    transform=transforms.ToTensor(),
)
augmented_val_dataset = AugmentedDataset(
    augmented_data=augmented_val_dataset,
    transform=transforms.ToTensor(),
)
augmented_test_dataset = AugmentedDataset(
    augmented_data=augmented_test_dataset,
    transform=transforms.ToTensor(),
)

In [None]:
# Only the training loader is shuffled. Shuffling the validation / test loaders
# does not change the aggregated metrics and only makes the evaluation order
# differ from run to run, so it is disabled for them.
train_loader = DataLoader(augmented_train_dataset, batch_size=batch_size, num_workers=2, pin_memory=True, shuffle=True)
val_loader = DataLoader(augmented_val_dataset, batch_size=batch_size, num_workers=2, pin_memory=True, shuffle=False)
test_loader = DataLoader(augmented_test_dataset, batch_size=batch_size, num_workers=2, pin_memory=True, shuffle=False)

### Model

In [None]:
def plot_metrics(train_accuracy_list, train_loss_list, val_accuracy_list, val_loss_list, learning_rate_list, model_name):
    """Draw, save and show the loss / accuracy learning curves of one run.

    Two panels are produced (loss on the left, accuracy on the right); each
    one carries the learning rate as a dashed red curve on a secondary log
    axis. The figure is written into the run's folder below ``./results``.

    Args:
        train_accuracy_list: Training accuracy per epoch.
        train_loss_list: Training loss per epoch.
        val_accuracy_list: Validation accuracy per epoch.
        val_loss_list: Validation loss per epoch.
        learning_rate_list: Learning rate per epoch.
        model_name: Name used in the output path and the figure title.

    Reads the notebook globals ``batch_size``, ``f_selected`` and
    ``explained_variance`` for the output path and the title.
    """

    def overlay_learning_rate(ax):
        """Add the learning-rate twin axis to ``ax``; return the merged legend entries."""
        lr_ax = ax.twinx()
        lr_ax.semilogy(np.arange(len(learning_rate_list)), learning_rate_list, 'r--', label='Learning Rate')
        lr_ax.set_ylabel('Learning Rate')
        curve_handles, curve_labels = ax.get_legend_handles_labels()
        lr_handles, lr_labels = lr_ax.get_legend_handles_labels()
        return curve_handles + lr_handles, curve_labels + lr_labels

    results_dict = f'./results/{model_name}_batchsize{batch_size}_PCA_f{f_selected}_expVar{explained_variance}'
    if not os.path.exists(results_dict):
        os.makedirs(results_dict)

    fig, axs = plt.subplots(1, 2, figsize=(12, 5))
    loss_ax, acc_ax = axs

    # Left panel: loss curves plus learning rate.
    loss_ax.plot(train_loss_list, label='Training Loss')
    loss_ax.plot(val_loss_list, label='Validation Loss')
    loss_ax.set_title('Loss Curves')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_legend = overlay_learning_rate(loss_ax)

    # Right panel: accuracy curves (values are percentages) plus learning rate.
    acc_ax.plot(train_accuracy_list, label='Training Accuracy')
    acc_ax.plot(val_accuracy_list, label='Validation Accuracy')
    acc_ax.set_title('Accuracy Curves')
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.set_ylim([0, 110])
    acc_legend = overlay_learning_rate(acc_ax)

    # One combined legend per panel, placed underneath the axes.
    for panel, (handles, labels) in ((loss_ax, loss_legend), (acc_ax, acc_legend)):
        panel.legend(handles, labels, loc='upper center', bbox_to_anchor=(0.5, -0.15), fancybox=True, shadow=True, ncol=3)

    fig.suptitle(fr'Learning Curves - {model_name}, $f_{{sel}}: {f_selected}$ Hz, Batch size: {batch_size}, Explained variance: {explained_variance*100}%', fontsize=14)

    plt.tight_layout()
    plt.savefig(f'{results_dict}/learningCurves_{model_name}_batchsize{batch_size}_PCA_f{f_selected}_expVar{explained_variance}.png')
    plt.show()
    

In [None]:
def evaluate_model(model, loader, criterion, title):
    """Evaluate ``model`` on ``loader`` and plot a confusion matrix and PR curve.

    Prints the mean loss and the accuracy (in percent), then draws a
    row-normalised confusion matrix next to a precision-recall curve, saves
    the figure into the run's folder below ``./results`` and shows it.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: DataLoader yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        title: Prefix for the figure title and the output file name.

    Reads the notebook globals ``model_name``, ``batch_size``, ``f_selected``,
    ``explained_variance``, ``device`` and ``class_names``. The figure title
    always labels the metrics as "Test" regardless of the loader passed in.
    """
    results_dict = f'./results/{model_name}_batchsize{batch_size}_PCA_f{f_selected}_expVar{explained_variance}'
    # savefig() does not create missing folders, and this function may run
    # before anything else has created the results folder.
    os.makedirs(results_dict, exist_ok=True)

    all_preds = []
    all_labels = []
    all_probs = []
    running_loss = 0.0
    running_corrects = 0

    model.eval()
    # Pure inference: disable autograd so no graph is recorded for any batch.
    with torch.no_grad():
        for inputs, labels in loader:
            # Swap the last two axes (height / width, assuming NCHW batches),
            # matching what the training loop feeds the model.
            inputs = inputs.permute(0, 1, 3, 2)
            inputs = inputs.to(device)
            labels = labels.to(device)
            loss, preds, outputs = get_loss_preds(model, criterion, inputs, labels)
            # The criterion returns a batch mean; weight it by the batch size so
            # that a smaller last batch is averaged correctly below.
            running_loss += loss.item() * inputs.size(0)
            running_corrects += int(torch.sum(preds == labels).item())
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            all_probs.append(probabilities.cpu().numpy())

    num_samples = len(loader.dataset)
    loss = running_loss / num_samples
    acc = 100 * running_corrects / num_samples
    print(f'Loss: {loss}, Accuracy: {acc}')

    fig, axs = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: confusion matrix normalised over the true labels (rows).
    conf_matrix = confusion_matrix(all_labels, all_preds, normalize='true')
    sns.heatmap(conf_matrix,  cmap="YlGnBu", annot=True, cbar=False,
                xticklabels=class_names, yticklabels=class_names, ax = axs[0])
    axs[0].set_xlabel('Predicted labels')
    axs[0].set_ylabel('True labels')
    axs[0].set_title(f'Confusion Matrix')

    # Right panel: precision-recall curve, scoring with the probability of
    # class index 1.
    all_probs = np.concatenate(all_probs)
    precision, recall, _ = precision_recall_curve(all_labels, all_probs[:, 1])
    axs[1].plot(recall, precision, lw=2, label = 'Precision-Recall curve')
    axs[1].set_xlabel('Recall')
    axs[1].set_ylabel('Precision')
    axs[1].set_title('Precision-Recall Curve')
    axs[1].legend()
    axs[1].grid(True)

    plt.suptitle( title + fr' {model_name}, $f_{{sel}}: {f_selected}$ Hz, Batch size: {batch_size}, Explained variance: {explained_variance*100}%' "\n" 
                    fr'Test Loss: {loss:.3f}, Test Accuracy: {acc:.3f}%', fontsize=14)
    plt.tight_layout()
    plt.savefig(f'{results_dict}/{title}_confusion_PRC_{model_name}_batchsize{batch_size}_PCA_f{f_selected}_expVar{explained_variance}.png')
    plt.show()

In [None]:
def get_loss_preds(model, criterion, inputs, labels):
    """Run a forward pass and return ``(loss, predicted class indices, raw outputs)``.

    Args:
        model: Callable mapping ``inputs`` to per-class scores.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        inputs: Batch passed to the model.
        labels: Target class indices for the batch.
    """
    scores = model(inputs)
    # The predicted class is the index of the largest score along dim 1.
    predicted = torch.max(scores, dim=1).indices
    return criterion(scores, labels), predicted, scores

In [None]:
def _run_phase(model, criterion, loader, optimizer=None):
    """Run one full pass over ``loader``; return ``(mean_loss, accuracy_percent)``.

    With an ``optimizer`` the model is put in train mode and updated after
    every batch. Without one the model is put in eval mode and the pass runs
    with autograd disabled.
    """
    training = optimizer is not None
    if training:
        model.train()
    else:
        model.eval()

    running_loss = 0.0
    running_corrects = 0
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            # Swap the last two axes (height / width, assuming NCHW batches).
            inputs = inputs.permute(0, 1, 3, 2)
            inputs = inputs.to(device)
            labels = labels.to(device)
            if training:
                optimizer.zero_grad()
            loss, preds, _ = get_loss_preds(model, criterion, inputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            # The criterion returns a batch mean; weight it by the batch size.
            running_loss += loss.item() * inputs.size(0)
            running_corrects += int(torch.sum(preds == labels).item())

    num_samples = len(loader.dataset)
    return running_loss / num_samples, 100 * running_corrects / num_samples


def train_model(model, criterion, optimizer, scheduler, num_epochs=25, model_name = "CustomModel"):
    """Train ``model`` and checkpoint it whenever validation accuracy improves.

    An evaluation-only pass over both splits is recorded as "epoch 0" and
    saved as the initial checkpoint. Each following epoch trains on the
    training loader, evaluates on the validation loader, steps the scheduler
    and overwrites the checkpoint when the validation accuracy beats the best
    value seen so far.

    Args:
        model: Network to train (already on ``device``).
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating ``model``; the logged learning rate is
            read from its first parameter group.
        scheduler: Learning-rate scheduler, stepped once per epoch without
            arguments.
        num_epochs: Number of training epochs.
        model_name: Name embedded in the checkpoint folder.

    Returns:
        ``(model, train_accuracy_list, train_loss_list, val_accuracy_list,
        val_loss_list, learning_rate_list)``; every list has
        ``num_epochs + 1`` entries (index 0 is the pre-training evaluation).
        ``model`` is returned in its final state, not the best checkpoint.

    Reads the notebook globals ``train_loader``, ``val_loader``,
    ``batch_size``, ``f_selected``, ``explained_variance`` and ``device``.
    """
    since = time.time()

    checkpoint_dir_prefix = f'./checkpoints/{model_name}_batchsize{batch_size}_PCA_f{f_selected}_expVar{explained_variance}'
    checkpoint_dir = f'./checkpoints/{model_name}_batchsize{batch_size}_PCA_f{f_selected}_expVar{explained_variance}/checkpoint.pth'

    os.makedirs(checkpoint_dir_prefix, exist_ok=True)
    # Drop a stale checkpoint from a previous run. Only the checkpoint file
    # itself is checked: the folder may hold other files without it.
    if os.path.isfile(checkpoint_dir):
        os.remove(checkpoint_dir)

    loaders = {"train": train_loader, "val": val_loader}
    train_accuracy_list = []
    val_accuracy_list = []
    train_loss_list = []
    val_loss_list = []
    learning_rate_list = []

    def record_epoch(train_stats, val_stats):
        """Append one epoch's (loss, accuracy) pairs and the current learning rate."""
        train_loss_list.append(train_stats[0])
        train_accuracy_list.append(train_stats[1])
        val_loss_list.append(val_stats[0])
        val_accuracy_list.append(val_stats[1])
        # Read the rate from the optimizer that was passed in, not from a
        # notebook-level global.
        learning_rate_list.append(optimizer.param_groups[0]["lr"])

    def report_epoch(epoch_idx):
        """Print the metrics stored at position ``epoch_idx``."""
        print(f'Epoch {epoch_idx}/{num_epochs}')
        print('-' * 10)
        print(f'Training Loss: {train_loss_list[epoch_idx]:.4f}, Validation Loss: {val_loss_list[epoch_idx]:.4f}, Training Acc: {train_accuracy_list[epoch_idx]:.4f}, Validation Acc: {val_accuracy_list[epoch_idx]:.4f}, Learning Rate: {learning_rate_list[epoch_idx]}')

    def save_checkpoint(epoch_idx):
        """Write model and optimizer state for ``epoch_idx`` to the checkpoint file."""
        print("Model saved.")
        torch.save({
            'epoch': epoch_idx,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
        }, checkpoint_dir)

    # Epoch 0: evaluate the untrained model on both splits (eval mode, no
    # updates) to get a baseline and an initial checkpoint.
    record_epoch(
        _run_phase(model, criterion, loaders["train"]),
        _run_phase(model, criterion, loaders["val"]),
    )
    best_acc = val_accuracy_list[0]
    report_epoch(0)
    save_checkpoint(0)

    for epoch in range(1, num_epochs + 1):
        train_stats = _run_phase(model, criterion, loaders["train"], optimizer)
        val_stats = _run_phase(model, criterion, loaders["val"])
        scheduler.step()
        # The learning rate is logged after the scheduler step, i.e. the value
        # that the next epoch will use.
        record_epoch(train_stats, val_stats)
        report_epoch(epoch)

        val_acc = val_stats[1]
        if val_acc > best_acc:
            best_acc = val_acc
            save_checkpoint(epoch)

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    # Fixed the format spec: ':4f' is a minimum width, '.4f' is four decimals.
    print(f'Best val Acc: {best_acc:.4f}')
    return model, train_accuracy_list, train_loss_list, val_accuracy_list, val_loss_list, learning_rate_list

In [None]:
class CustomModel(nn.Module):
    """Small CNN classifier: three conv blocks followed by two linear layers.

    Each conv block is conv -> ReLU -> batch norm -> 2x2 max-pool. The first
    convolution uses stride 2, so the spatial size is reduced four times in
    total before flattening.

    Args:
        num_classes: Number of output logits produced by the final layer.
        IMAGE_HEIGHT: Height of the input images in pixels.
        IMAGE_WIDTH: Width of the input images in pixels.
    """

    def __init__(self, num_classes, IMAGE_HEIGHT, IMAGE_WIDTH):
        super(CustomModel, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=2, padding=1)
        self.relu = nn.ReLU()
        self.batchnorm1 = nn.BatchNorm2d(32)
        self.maxpool1 = nn.MaxPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.batchnorm2 = nn.BatchNorm2d(64)
        self.maxpool2 = nn.MaxPool2d(kernel_size=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.batchnorm3 = nn.BatchNorm2d(128)
        self.maxpool3 = nn.MaxPool2d(kernel_size=2)
        self.flatten = nn.Flatten()
        # Size the first linear layer from the exact feature-map size. For
        # sizes divisible by 16 (e.g. 224 -> 14) this equals the previous
        # `size // 16`, and it is also correct for sizes such as 31.
        flat_features = 128 * self._reduced_size(IMAGE_HEIGHT) * self._reduced_size(IMAGE_WIDTH)
        self.fc1 = nn.Linear(flat_features, 256)
        self.dropout = nn.Dropout(0.2)
        self.batchnorm4 = nn.BatchNorm1d(256)
        # Honour `num_classes` instead of hard-coding two outputs.
        self.fc2 = nn.Linear(256, num_classes)

    @staticmethod
    def _reduced_size(size):
        """Return the spatial size left after conv1 and the three max-pools.

        conv1 (kernel 3, stride 2, padding 1) maps ``size`` to
        ``(size + 1) // 2``; conv2 / conv3 keep the size; each 2x2 max-pool
        floors a division by two.
        """
        return ((size + 1) // 2) // 8

    def forward(self, x):
        """Map a ``(N, 3, H, W)`` batch to ``(N, num_classes)`` logits."""
        x = self.conv1(x)
        x = self.relu(x)
        x = self.batchnorm1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.batchnorm2(x)
        x = self.maxpool2(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.batchnorm3(x)
        x = self.maxpool3(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.batchnorm4(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
# Build the network selected by `model_name`. The ResNets start from ImageNet
# weights and get a freshly initialised classification head; `weights=` is
# used because the `pretrained` flag is deprecated in torchvision.
if (model_name == "ResNet18"):
    model = resnet18(weights="IMAGENET1K_V1")
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    torch.nn.init.xavier_uniform_(model.fc.weight)
elif (model_name == "ResNet34"):
    model = resnet34(weights="IMAGENET1K_V1")
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    torch.nn.init.xavier_uniform_(model.fc.weight)
elif (model_name == "CustomModel"):
    model = CustomModel(num_classes=num_classes, IMAGE_HEIGHT =224, IMAGE_WIDTH = 224)
else:
    # Without this branch an unknown name would surface as a NameError (or
    # silently reuse a stale `model` left in the kernel).
    raise ValueError(f"Unsupported model_name: {model_name!r}")
model = model.to(device)

In [None]:
# Show torchsummary's layer-by-layer overview of the model for one 3x224x224 input.
summary(model, input_size=(3, 224, 224))

In [None]:
# Optimisation hyper-parameters.
lr = 1e-4
weight_decay = 1e-3
epochs = 20

# Cross-entropy loss over the class logits.
criterion = nn.CrossEntropyLoss()
# Adam with weight decay; the learning rate is multiplied by 0.1 every 5 epochs.
optimizer_ft = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
scheduler = StepLR(optimizer_ft, step_size=5, gamma=0.1)

In [None]:
# Train the model and keep the per-epoch metric histories for plotting.
(
    model,
    train_accuracy_list,
    train_loss_list,
    val_accuracy_list,
    val_loss_list,
    learning_rate_list,
) = train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer_ft,
    scheduler=scheduler,
    num_epochs=epochs,
    model_name=model_name,
)

In [None]:
# Plot and save the learning curves of the run that just finished.
plot_metrics(
    train_accuracy_list=train_accuracy_list,
    train_loss_list=train_loss_list,
    val_accuracy_list=val_accuracy_list,
    val_loss_list=val_loss_list,
    learning_rate_list=learning_rate_list,
    model_name=model_name,
)

In [None]:
checkpoint_dir = f'./checkpoints/{model_name}_batchsize{batch_size}_PCA_f{f_selected}_expVar{explained_variance}/checkpoint.pth'

# Baseline ("Raw") evaluation: rebuild the architecture without loading the
# trained weights and score it on the test set. It only runs when a non-empty
# checkpoint exists for this configuration.
if os.path.exists(checkpoint_dir) and os.path.getsize(checkpoint_dir) > 0:
    if (model_name == "ResNet18"):
        # `weights=` replaces the deprecated `pretrained=True` flag.
        model = resnet18(weights="IMAGENET1K_V1")
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        torch.nn.init.xavier_uniform_(model.fc.weight)
    elif (model_name == "ResNet34"):
        model = resnet34(weights="IMAGENET1K_V1")
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        torch.nn.init.xavier_uniform_(model.fc.weight)
    elif (model_name == "CustomModel"):
        model = CustomModel(num_classes=num_classes, IMAGE_HEIGHT =224, IMAGE_WIDTH = 224)
    else:
        # An unknown name would otherwise evaluate whatever `model` is left in
        # the kernel and label it as the raw baseline.
        raise ValueError(f"Unsupported model_name: {model_name!r}")
    model = model.to(device)

    title = 'Raw'
    evaluate_model(model, test_loader, criterion, title)

In [None]:
# Load the best checkpoint written during training and evaluate it on the test
# set. `map_location` remaps the stored tensors onto the active device, so a
# checkpoint saved on a GPU can still be loaded on a CPU-only machine.
checkpoint = torch.load(checkpoint_dir, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
title = "Fine-tuned"
evaluate_model(model, test_loader, criterion, title)