Imports:

In [None]:
import zipfile
import numpy as np
import torch
import torch.nn as nn
from torchsummary import summary
from torchvision import datasets, transforms as T
from efficientnet_pytorch import EfficientNet
import os
import torch.optim as optim
import pandas as pd
from torch.utils.data import Dataset
import copy
from sklearn.metrics import classification_report

Hyperparameters:

In [None]:
# Input resolution: images are square, so the width simply mirrors the height.
img_height = img_width = 224

# Optimisation settings.
batch_size = 32
n_epochs = 20
learning_rate = 5e-5
optimizer_name = "Adam"

# Value assigned to `requires_grad` on the pretrained weights in init_model().
train_base = True

# Size of the classification head's output layer.
num_classes = 10

# True when PyTorch can see a CUDA device; used to decide whether to call .cuda().
use_cuda = torch.cuda.is_available()

Image Transform Functions (For Later):

In [None]:
class MyAddGaussNoise(object):
    """Randomly add Gaussian (white) noise to a 2-D spectrogram array.

    Args:
        input_size: int (square) or 2-tuple giving the shape of the noise mask
            that is generated; it is added to the input, so it must be
            broadcast-compatible with the spectrogram passed to ``__call__``.
        mean: mean of the Gaussian noise.
        std: standard deviation of the noise. When ``None`` the deviation is
            derived per call as ``abs(min(spectrogram) * 0.03)``.
        add_noise_probability: probability in (0, 1] that noise is applied on
            a given call.

    Raises:
        AssertionError: if an argument has the wrong type or is out of range.
    """

    # Fraction of the minimum pixel value used as std when none is configured.
    DEFAULT_STD_FACTOR = 0.03

    def __init__(self, input_size, mean=0.0, std=None, add_noise_probability=1.0):
        assert isinstance(input_size, (int, tuple))
        assert isinstance(mean, (int, float))
        assert isinstance(std, (int, float)) or std is None
        assert isinstance(add_noise_probability, (float))

        if isinstance(input_size, int):
            self.input_size = (input_size, input_size)
        else:
            assert len(input_size) == 2
            self.input_size = input_size

        self.mean = mean

        if std is not None:
            assert std > 0.0
        self.std = std

        assert add_noise_probability > 0.0 and add_noise_probability <= 1.0
        self.add_noise_prob = add_noise_probability

    def __call__(self, spectrogram):
        """Return ``spectrogram`` plus a noise mask, or the input untouched.

        The input object itself is returned (no copy) when the random draw
        decides to skip the augmentation.
        """
        if np.random.random() > self.add_noise_prob:
            return spectrogram

        if self.std is None:
            # No explicit std: scale the noise to the magnitude of the
            # smallest pixel value of this particular spectrogram.
            std = np.abs(np.min(spectrogram) * self.DEFAULT_STD_FACTOR)
        else:
            # Honour the user-supplied std (previously this path crashed
            # because only the default branch defined the scale factor).
            std = self.std

        # generate a white noise mask
        gauss_mask = np.random.normal(self.mean,
                                      std,
                                      size=self.input_size).astype('float32')

        # add white noise to the sound spectrogram
        return spectrogram + gauss_mask

class MyRightShift(object):
    """Randomly shift a 2-D image to the right, padding with its minimum value.

    Args:
        input_size: int (square) or 2-tuple giving the shape of the output
            array; the image passed to ``__call__`` is expected to have the
            same shape.
        width_shift_range: largest shift to draw from. An int is a number of
            columns (1..width); a float is a fraction of the width in (0, 1].
        shift_probability: probability in (0, 1] that the shift is applied on
            a given call.

    Raises:
        AssertionError: if an argument has the wrong type or is out of range.
    """

    def __init__(self, input_size, width_shift_range, shift_probability=1.0):
        assert isinstance(input_size, (int, tuple))
        assert isinstance(width_shift_range, (int, float))
        assert isinstance(shift_probability, (float))

        if isinstance(input_size, int):
            self.input_size = (input_size, input_size)
        else:
            assert len(input_size) == 2
            self.input_size = input_size

        if isinstance(width_shift_range, int):
            assert width_shift_range > 0
            assert width_shift_range <= self.input_size[1]
            self.width_shift_range = width_shift_range
        else:
            assert width_shift_range > 0.0
            assert width_shift_range <= 1.0
            # A small fraction of a narrow image truncates to zero columns,
            # which would leave no valid shift to draw; keep at least one.
            self.width_shift_range = max(1, int(width_shift_range * self.input_size[1]))

        assert shift_probability > 0.0 and shift_probability <= 1.0
        self.shift_prob = shift_probability

    def __call__(self, image):
        """Return a right-shifted float32 copy of ``image``, or the input untouched.

        The input object itself is returned (no copy) when the random draw
        decides to skip the augmentation.
        """
        if np.random.random() > self.shift_prob:
            return image

        # create a new array filled with the min value
        shifted_image = np.full(self.input_size, np.min(image), dtype='float32')

        # randint's upper bound is exclusive and must exceed the lower bound,
        # so a range of 1 is widened to (1, 2) and always yields a 1-column
        # shift instead of raising ValueError. Larger ranges draw from
        # 1..width_shift_range-1 exactly as before.
        rand_position = np.random.randint(1, max(2, self.width_shift_range))

        # shift the image; slice assignment copies the data, so no explicit
        # copy of the source slice is needed
        shifted_image[:, rand_position:] = image[:, :-rand_position]

        return shifted_image

# Per-split preprocessing pipelines.
# 'train' applies each augmentation (noise, right shift) with probability 0.5
# before converting to a tensor; 'valid' only converts to a tensor.
img_transforms = dict(
    train=T.Compose([
        MyAddGaussNoise(input_size=img_height, add_noise_probability=0.5),
        MyRightShift(input_size=img_height, width_shift_range=0.9, shift_probability=0.5),
        T.ToTensor(),
    ]),
    valid=T.Compose([
        T.ToTensor(),
    ]),
)

Custom Dataset Object (UrbanSound8kDataset):

In [None]:
class UrbanSound8kDataset(Dataset):
    """Map-style dataset over a three-column DataFrame.

    Each row is unpacked positionally as (feature array, label, fold); only
    the first two are returned to the caller.
    """

    def __init__(self, featuresdf, transform=None):
        """Store the frame and an optional callable applied to each feature."""
        assert isinstance(featuresdf, pd.DataFrame)
        assert len(featuresdf.columns) == 3

        self.featuresdf = featuresdf
        self.transform = transform

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return len(self.featuresdf)

    def __getitem__(self, index):
        """Return ``(features, label)`` for the row at positional ``index``.

        ``features`` is a float32 tensor expanded to 3 leading channels and
        ``label`` is a LongTensor scalar.
        """
        row_pos = index.tolist() if torch.is_tensor(index) else index

        # The third column (fold) is read but not needed here.
        features, class_id, _fold = self.featuresdf.iloc[row_pos]

        if self.transform is not None:
            features = self.transform(features)

        # A transform may already have produced a tensor; otherwise convert
        # the array ourselves.
        if not torch.is_tensor(features):
            features = torch.as_tensor(features.astype('float'))

        class_id = torch.as_tensor(np.array(class_id)).type(torch.LongTensor)

        # Replicate the single-channel input across 3 channels.
        three_channel = features.expand(3, -1, -1).float()

        return three_channel, class_id

Initialize Model Architecture Function:

In [None]:
# Build a pretrained EfficientNet-b0 with a fresh classification head
def init_model():
    """Create the model, optimizer and loss used for one training run.

    Reads the module-level settings ``train_base``, ``num_classes``,
    ``optimizer_name``, ``learning_rate`` and ``use_cuda``.

    Returns:
        tuple: ``(model, optimizer, criterion)``.

    Raises:
        ValueError: if ``optimizer_name`` is not a supported optimizer.
    """
    # Fail fast, before loading pretrained weights, instead of hitting an
    # unbound local at the return statement.
    if optimizer_name != 'Adam':
        raise ValueError(
            "Unsupported optimizer_name {!r}; only 'Adam' is implemented".format(optimizer_name)
        )

    model_transfer = EfficientNet.from_pretrained('efficientnet-b0')

    # Freeze or unfreeze the pretrained weights according to train_base
    for param in model_transfer.parameters():
        param.requires_grad = train_base
    in_features = model_transfer._fc.in_features

    # Replace the classifier head. The new layer is created after the loop
    # above, so it is trainable regardless of train_base. The nn.Sequential
    # wrapper is kept so state_dict keys stay compatible with saved checkpoints.
    model_transfer._fc = nn.Sequential(
        nn.Linear(in_features, num_classes),
        )

    # selecting loss function
    criterion_transfer = nn.CrossEntropyLoss()

    # Move the model to its final device before building the optimizer, as
    # recommended by the torch.optim documentation.
    if use_cuda:
        model_transfer = model_transfer.cuda()

    # using the Adam optimizer
    optimizer_transfer = optim.Adam(model_transfer.parameters(), lr=learning_rate)

    return model_transfer, optimizer_transfer, criterion_transfer

Train Loop Function:

In [None]:
# Training / validation loop with best-validation-loss checkpointing
def train_model(n_epochs, loaders, model, optimizer, criterion, fold_k, data_type, model_name, save_path):
    """Train ``model`` and record per-epoch metrics.

    Args:
        n_epochs: number of passes over ``loaders['train']``.
        loaders: mapping with 'train' and 'valid' iterables yielding
            ``(data, target)`` batches.
        model: the network to optimise (trained in place).
        optimizer: optimizer already bound to ``model``'s parameters.
        criterion: loss callable taking ``(output, target)``.
        fold_k, data_type, model_name: labels copied into the report columns
            'Test Fold', 'Data Representation' and 'Model'.
        save_path: file that receives ``model.state_dict()`` whenever the
            validation loss improves.

    Returns:
        tuple: ``(model, train_report)``. ``model`` carries the weights of the
        LAST epoch; the best-validation-loss weights are the ones stored at
        ``save_path``. ``train_report`` is a DataFrame with one row per epoch.

    Notes:
        Reported losses are the running mean of the per-batch losses. They are
        no longer divided by the dataset size, which mis-scaled the values and
        relied on module-level ``train_ds`` / ``val_ds`` variables.
    """
    # Run batches on whatever device the model lives on, so the loop does not
    # depend on a module-level CUDA flag.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # initialize tracker for minimum validation loss
    valid_loss_min = float('inf')
    epochs = []
    trainingloss = []
    validationloss = []
    valaccuracy = []
    learningrates = []

    for epoch in range(1, n_epochs + 1):
        # running means of the batch losses and accuracy counters
        train_loss = 0.0
        valid_loss = 0.0
        correct = 0
        total = 0

        ######################
        # training the model #
        ######################
        model.train()
        for batch_idx, (data, target) in enumerate(loaders['train']):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # incremental mean; .item() yields a plain float so no graph or
            # device tensor is kept alive
            train_loss += (loss.item() - train_loss) / (batch_idx + 1)

        ########################
        # validating the model #
        ########################
        model.eval()
        # no gradients are needed for evaluation
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(loaders['valid']):
                data, target = data.to(device), target.to(device)

                output = model(data)
                loss = criterion(output, target)
                valid_loss += (loss.item() - valid_loss) / (batch_idx + 1)

                # compare predictions
                pred = output.argmax(dim=1)
                correct += (pred == target.view_as(pred)).sum().item()
                total += data.size(0)

        # accuracy is undefined for an empty validation loader
        valid_acc = correct / total if total else float('nan')
        current_lr = optimizer.param_groups[0]['lr']

        trainingloss.append(train_loss)
        validationloss.append(valid_loss)
        valaccuracy.append(valid_acc)
        epochs.append(epoch)
        learningrates.append(current_lr)

        # printing training/validation statistics
        print('Fold: {} \tEpoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f} \tValidation Accuracy: {:.6f}'.format(
            fold_k,
            epoch,
            train_loss,
            valid_loss,
            valid_acc
            ))

        ## saving the model if validation loss has decreased
        if valid_loss < valid_loss_min:
            torch.save(model.state_dict(), save_path)
            valid_loss_min = valid_loss

    # GATHER TRAINING RESULTS IN DATAFRAME
    zipped_data = list(zip(epochs, learningrates, trainingloss, validationloss, valaccuracy))
    train_report = pd.DataFrame(zipped_data, columns=['Epoch', 'Learning Rate', 'Training Loss', 'Validation Loss', 'Validation Accuracy'])
    train_report['Test Fold'] = fold_k
    train_report['Model'] = model_name
    train_report['Data Representation'] = data_type
    train_report = train_report[['Model', 'Data Representation', 'Test Fold', 'Epoch', 'Learning Rate', 'Training Loss', 'Validation Loss', 'Validation Accuracy']]

    # return trained model and the per-epoch report
    return model, train_report

Training Script (Load Data From Disk, Train on All 10 Folds, Evaluate on Test Data, Generate Training and Test Metric Reports):

In [None]:
# Representation Name
data_type = 'approxGT'

# Model Name
model_name = 'EfficientNet-b0'

# Number of cross-validation folds; the 'fold' column is expected to hold the
# values 1..n_folds. Kept separate from num_classes, which only happens to
# share the same value.
n_folds = 10

class_names = ['air_conditioner', 'car_horn', 'children_playing', 'dog_bark', 'drilling', 'engine_idling', 'gun_shot', 'jackhammer', 'siren', 'street_music']

# Create the output folder up front so a missing directory cannot discard the
# results after all folds have been trained.
os.makedirs('results', exist_ok=True)

# Load the features once instead of once per fold.
# NOTE(review): unpickling can execute arbitrary code - only load trusted files.
featuresdf = pd.read_pickle('approxGT_224.pkl')

# Per-fold results, concatenated once after the loop
train_reports = []
metrics_reports = []
fold_accuracy_frames = []

# Perform n_folds-fold validation: each fold is held out once
for fold_k in range(1, n_folds + 1):
    checkpoint_path = model_name + '_' + data_type + '_fold' + str(fold_k) + '.pt'

    model_transfer, optimizer_transfer, criterion_transfer = init_model()

    train_ds = UrbanSound8kDataset(featuresdf[featuresdf['fold'] != fold_k], transform=img_transforms['train'])
    val_ds = UrbanSound8kDataset(featuresdf[featuresdf['fold'] == fold_k], transform=img_transforms['valid'])

    # Creating loaders for the dataset
    loaders_transfer = {
        'train': torch.utils.data.DataLoader(train_ds, batch_size, shuffle=True),
        'valid': torch.utils.data.DataLoader(val_ds, batch_size, shuffle=False)
    }

    # TRAIN THE MODEL AND KEEP ITS PER-EPOCH REPORT
    _, fold_train_report = train_model(n_epochs, loaders_transfer, model_transfer, optimizer_transfer, criterion_transfer, fold_k, data_type, model_name, checkpoint_path)
    train_reports.append(fold_train_report)

    # Drop the last-epoch model before building a fresh one for evaluation
    del model_transfer, optimizer_transfer, criterion_transfer

    # RELOAD BEST CHECKPOINTED MODEL FOR VALIDATION RESULTS
    model_transfer = init_model()[0]
    model_transfer.load_state_dict(torch.load(checkpoint_path))
    model_transfer.eval()

    # PERFORM FINAL INFERENCE ON VALIDATION SET
    preds = []
    targets = []

    # no gradients are needed for inference
    with torch.no_grad():
        for data, target in loaders_transfer['valid']:
            # honour the same CUDA switch as the rest of the file so the
            # script also runs on CPU-only machines
            if use_cuda:
                data = data.cuda()
            output = model_transfer(data)
            prediction = torch.argmax(output, dim=1)
            preds.append(prediction.cpu().numpy())
            targets.append(target.cpu().numpy())

    targets = np.concatenate(targets)
    preds = np.concatenate(preds)

    metrics_report_dict = classification_report(targets, preds, target_names=class_names, output_dict=True)
    # accuracy is a scalar, not a per-class row: store it in its own table
    fold_acc_dict = {'Test Fold': fold_k, 'Accuracy': metrics_report_dict.pop('accuracy')}
    fold_accuracy_frames.append(pd.DataFrame(fold_acc_dict, index=[0]))

    fold_metrics = pd.DataFrame(metrics_report_dict).rename_axis('metric').reset_index()
    fold_metrics.insert(0, 'Test Fold', fold_k)
    fold_metrics.insert(0, 'Data Representation', data_type)
    fold_metrics.insert(0, 'Model', model_name)
    metrics_reports.append(fold_metrics)

del featuresdf

train_report = pd.concat(train_reports)
metrics_report = pd.concat(metrics_reports)
fold_accuracies = pd.concat(fold_accuracy_frames)

train_report.to_csv('results/TrainReportbyEpoch_' + model_name + '_' + data_type + '.csv', index=False)
metrics_report.to_csv('results/TestMetricsbyClass_' + model_name + '_' + data_type + '.csv', index=False)
fold_accuracies.to_csv('results/FoldAccuracies_' + model_name + '_' + data_type + '.csv', index=False)