<a href="https://colab.research.google.com/github/isaacgoff/DL_Project_2022/blob/master/Isaac_Deep_Learning_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive (Must allow access manually when prompted)
# The mount point is /content/drive; the training and testing cells below read
# their data from paths underneath it (/content/drive/MyDrive/DL_data/).
from google.colab import drive
drive.mount('/content/drive')

Models Class

In [None]:
from torch import nn
from torch.nn.functional import softmax
import torchvision.models as models

# List of models to choose from. Currently in list:
#   * Basic 4 layer CNN
#   * AlexNet
#   * VGG16
#   * ResNet18
class Models():
    """Factory for the classifiers supported by this project.

    Every returned network takes a single-channel image batch and produces
    ``num_output_classes`` outputs per sample.

    Args:
        model_name: One of the names in ``model_list``.

    Raises:
        ValueError: If ``model_name`` is not a supported model.
    """

    def __init__(self, model_name: str):
        self.model_list = ['Basic_4_Layer_CNN', 'Alex_Net', 'VGG_16', 'Res_Net_18']
        self.input_model = model_name
        self.num_output_classes = 11
        if self.input_model not in self.model_list:
            raise ValueError('Model list does not contain model "%s"' %(model_name))

    def choose_model(self):
        """Build and return a freshly initialised (not pretrained) model.

        For the torchvision architectures the first convolution is replaced by
        a 1-input-channel version and the final linear layer is resized to
        ``num_output_classes`` outputs.

        Returns:
            The constructed ``nn.Module``.

        Raises:
            ValueError: If ``input_model`` was changed to an unsupported name
                after construction.
        """
        num_classes = self.num_output_classes
        if self.input_model == 'Basic_4_Layer_CNN':
            model = Basic_4_Layer_CNN()
        elif self.input_model == 'Alex_Net':
            model = models.alexnet(pretrained=False, progress=False)
            model.classifier[6] = nn.Linear(in_features=4096, out_features=num_classes, bias=True)
            model.features[0] = nn.Conv2d(1, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
        elif self.input_model == 'VGG_16':
            model = models.vgg16(pretrained=False, progress=False)
            model.classifier[6] = nn.Linear(in_features=4096, out_features=num_classes, bias=True)
            # Keep VGG's own 3x3 / stride-1 / padding-1 stem and only change the
            # input channel count.  An AlexNet-style 11x11 stride-4 stem reduces
            # a 128x128 input to 31x31, which VGG's five 2x2 max-pools then
            # shrink to zero size, so the forward pass fails.
            model.features[0] = nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        elif self.input_model == 'Res_Net_18':
            model = models.resnet18(pretrained=False, progress=False)
            model.fc = nn.Linear(in_features=512, out_features=num_classes, bias=True)
            model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        else:
            # Only reachable if input_model was reassigned after __init__ validated it.
            raise ValueError('Model list does not contain model "%s"' % (self.input_model))
        return model


class Basic_4_Layer_CNN(nn.Module):
    """Small CNN: four conv/avg-pool stages followed by three linear layers.

    The dimension comments below assume a 1 x 128 x 128 input; the first
    linear layer (30 * 8 * 8 inputs) requires that spatial size.

    Args:
        num_classes: Number of output classes (defaults to the project's 11).
    """

    def __init__(self, num_classes: int = 11):
        super().__init__()
        self.net = nn.Sequential(                                       # Dimension starts with 1 of 128 x 128
            # larger kernel CNN layers
            nn.Conv2d(1, 6, kernel_size=5, padding=2), nn.ReLU(),       # Dimension becomes 6 of 128 x 128
            nn.AvgPool2d(kernel_size=2, stride=2),                      # Dimension now 6 of 64 x 64
            nn.Conv2d(6, 16, kernel_size=5, padding=2), nn.ReLU(),      # Dimension now 16 of 64 x 64
            nn.AvgPool2d(kernel_size=2, stride=2),                      # Dimension now 16 of 32 x 32
            # smaller kernel CNN layers
            nn.Conv2d(16, 24, kernel_size=3, padding=1), nn.ReLU(),     # Dimension now 24 of 32 x 32
            nn.AvgPool2d(kernel_size=2, stride=2),                      # Dimension now 24 of 16 x 16
            nn.Conv2d(24, 30, kernel_size=3, padding=1), nn.ReLU(),     # Dimension now 30 of 16 x 16
            nn.AvgPool2d(kernel_size=2, stride=2),                      # Dimension now 30 of 8 x 8
            # fully connected layers
            nn.Flatten(),
            nn.Linear(30 * 8 * 8, 200), nn.ReLU(),
            nn.Linear(200, 100), nn.ReLU(),
            nn.Linear(100, num_classes)                                 # One output per class
        )

    def forward(self, x):
        """Return raw class scores (logits), one row per input sample.

        No softmax is applied here: nn.CrossEntropyLoss, which the training
        loop uses, expects unnormalised logits and normalises internally, and
        the torchvision models used alongside this one also emit logits.
        The argmax (predicted class) is the same either way; apply softmax to
        the result if probabilities are needed.
        """
        return self.net(x)

Create Dataset Module

In [None]:
import os
import torch
import librosa
import numpy as np
import json
from torch.utils.data import Dataset
import torch.nn.functional as F


def create_dataset(audio_input_path, json_path):
    """Build a spectrogram dataset from every file in a folder of audio clips.

    Each file is loaded at its native sampling rate, converted to a 128-band
    mel spectrogram in dB (relative to that clip's peak), and labelled with the
    ``instrument_family`` found in the metadata under the file's base name
    (file name without extension).

    Args:
        audio_input_path: Directory containing the audio files.
        json_path: Path to the JSON metadata file keyed by sample name.

    Returns:
        An ``AudioSpectogramDataset`` holding the stacked spectrograms and the
        one-hot (11-class, float32) labels.

    Raises:
        ValueError: If the directory contains no files.
        KeyError: If a file has no entry in the metadata.
    """
    # Load JSON file data; the context manager closes the file even on a parse error
    with open(json_path, 'rb') as metadata_file:
        metadata = json.load(metadata_file)

    data = []
    labels = []
    # Loop through files and store spectrogram and instrument family for each sample
    for file_name in os.listdir(audio_input_path):
        # splitext copes with any extension length, unlike slicing off 4 characters
        sample_key = os.path.splitext(file_name)[0]
        labels.append(metadata[sample_key]['instrument_family'])
        # load the waveform y and sampling rate sr (join works with or without a trailing slash)
        y, sr = librosa.load(os.path.join(audio_input_path, file_name), sr=None)
        # convert to 2 dimensional spectrogram format
        spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=8000, hop_length=502)
        # Convert raw power to dB
        S_dB = librosa.power_to_db(spectrogram, ref=np.max)
        data.append(S_dB)

    if not data:
        raise ValueError(f'No audio files found in "{audio_input_path}"')

    # np.stack needs every spectrogram to have the same shape, i.e. equal-length clips
    data_np = torch.tensor(np.stack(data))
    labels = F.one_hot(torch.tensor(labels, dtype=torch.long), num_classes=11).type(torch.float32)
    return AudioSpectogramDataset(data_np, labels)


class AudioSpectogramDataset(Dataset):
    """Map-style dataset that pairs each entry of ``data`` with the matching entry of ``labels``.

    Both containers are stored as given and indexed along their first axis.
    """

    def __init__(self, data, labels):
        self.data, self.labels = data, labels

    def __len__(self):
        # Number of samples is the size of the leading dimension of the data.
        sample_count = self.data.shape[0]
        return sample_count

    def __getitem__(self, index):
        features = self.data[index]
        target = self.labels[index]
        return features, target


Training Module

In [None]:
import argparse
import torch
from torch import nn
from datetime import datetime
from torch.utils.data import DataLoader
from copy import deepcopy


def main():
    """Train a model on spectrograms, track the best epoch, and report results.

    Reads its settings from the command line, builds the training and
    validation datasets from Google Drive, trains with SGD and cross-entropy
    loss, and after every epoch evaluates on the validation set.  The weights
    and row-normalised confusion matrix of the epoch with the highest
    validation accuracy are kept; at the end the per-epoch curves and that
    confusion matrix are plotted and, if requested, the weights are saved.
    """
    parser = argparse.ArgumentParser(description='Train the individual instrument identification model')
    parser.add_argument('-f')  # Required for argument parser to work in Colab
    parser.add_argument('--train_folder', type=str, default='debug/')
    parser.add_argument('--val_folder', type=str, default='debug/')
    parser.add_argument('--model', type=str, default='Res_Net_18')
    parser.add_argument('--batch_size', type=int, default=128)
    parser.add_argument('--lr', type=float, default=.01)
    parser.add_argument('--num_epochs', type=int, default=20)
    parser.add_argument('--status_interval', type=int, default=1)
    parser.add_argument('--model_name', type=str, default='unspecified')
    parser.add_argument('--save_model', type=str, default='False')
    args = parser.parse_args()

    # Fail fast: with no epochs there is no best model to report, and a zero
    # interval would divide by zero in the status check below.
    if args.num_epochs < 1:
        parser.error('--num_epochs must be at least 1')
    if args.status_interval < 1:
        parser.error('--status_interval must be at least 1')

    save_trained_model = args.save_model.lower() == 'true'

    drive_path = '/content/drive/MyDrive/DL_data/'
    json_path_tng = f'{drive_path}nsynth-train/examples.json'
    json_path_val = f'{drive_path}nsynth-valid/examples.json'
    audio_input_path_tng = f'{drive_path}nsynth-train/{args.train_folder}'
    audio_input_path_val = f'{drive_path}nsynth-valid/{args.val_folder}'

    # Select GPU for runtime if available
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(torch.cuda.get_device_name(device))
    else:
        device = torch.device("cpu")
        print('No GPU selected')

    start = datetime.now()
    # Create datasets
    tng_dataset = create_dataset(audio_input_path_tng, json_path_tng)
    val_dataset = create_dataset(audio_input_path_val, json_path_val)

    # Create Data Loaders.  Training samples are reshuffled every epoch so that
    # SGD batches do not follow the directory listing order; validation order
    # does not affect the metrics, so it stays fixed.
    tng_dataloader = DataLoader(tng_dataset, batch_size=args.batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False)

    print(f'\nDatasets created in {datetime.now()-start}')

    # Load model
    net = Models(args.model).choose_model().to(device)

    def init_weights(m):
        # Xavier-initialise the weights of every linear and convolutional layer
        if isinstance(m, (nn.Linear, nn.Conv2d)):
            nn.init.xavier_uniform_(m.weight)

    net.apply(init_weights)

    optimizer = torch.optim.SGD(net.parameters(), lr=args.lr)
    loss = nn.CrossEntropyLoss()

    num_classes = 11
    epoch_results = []
    max_val_acc = None
    best_model_state = None
    best_confusion_matrix = None
    for epoch in range(args.num_epochs):
        epoch_tng_loss = 0
        epoch_tng_acc = 0
        epoch_val_loss = 0
        epoch_val_acc = 0
        epoch_result = {'epoch': epoch}

        # Training Loop
        net.train()
        n = 0
        for img_batch, label_batch in tng_dataloader:
            optimizer.zero_grad()
            # Add the channel axis: (batch, H, W) -> (batch, 1, H, W)
            img_batch = img_batch.to(device).unsqueeze(1)
            label_batch = label_batch.to(device)

            predicted_labels = net(img_batch)

            tng_loss = loss(predicted_labels, label_batch)
            tng_loss.backward()
            optimizer.step()
            epoch_tng_loss += tng_loss.item()
            with torch.no_grad():
                # Labels are one-hot, so argmax recovers the class index
                epoch_tng_acc += (predicted_labels.argmax(dim=1) == label_batch.argmax(dim=1)).sum().item()
            n += len(label_batch)

        # Loss is averaged over batches, accuracy over samples
        epoch_tng_loss /= len(tng_dataloader)
        epoch_tng_acc /= n
        epoch_result['tng_loss'] = epoch_tng_loss
        epoch_result['tng_acc'] = epoch_tng_acc

        # Validation Loop
        net.eval()
        n = 0
        # Rows are true classes, columns are predicted classes
        confusion_matrix = torch.zeros(num_classes, num_classes)
        with torch.no_grad():
            for img_batch, label_batch in val_dataloader:
                img_batch = img_batch.to(device).unsqueeze(1)
                label_batch = label_batch.to(device)

                predicted_labels = net(img_batch)

                val_loss = loss(predicted_labels, label_batch)
                epoch_val_loss += val_loss.item()

                true_idx = label_batch.argmax(dim=1).cpu()
                pred_idx = predicted_labels.argmax(dim=1).cpu()
                epoch_val_acc += (pred_idx == true_idx).sum().item()
                n += len(label_batch)

                # Count every (true, predicted) pair of the batch at once by
                # encoding the pair as a single flat index into the matrix.
                pair_counts = torch.bincount(true_idx * num_classes + pred_idx,
                                             minlength=num_classes * num_classes)
                confusion_matrix += pair_counts.reshape(num_classes, num_classes)

        epoch_val_loss /= len(val_dataloader)
        epoch_val_acc /= n
        epoch_result['val_loss'] = epoch_val_loss
        epoch_result['val_acc'] = epoch_val_acc
        # Normalise each row by its sample count; rows for classes that never
        # occur are divided by 1 so they stay 0 instead of becoming NaN.
        label_counts = confusion_matrix.sum(dim=1, keepdim=True)
        confusion_matrix /= label_counts.clamp(min=1)

        epoch_results.append(epoch_result)
        if epoch % args.status_interval == 0:
            print(f'epoch {epoch} completed: Training Loss = {epoch_tng_loss} //'
                  f' Training Score = {epoch_tng_acc} // Validation Score = {epoch_val_acc}')

        # Keep the state of the epoch with the highest validation accuracy
        # (the first epoch always qualifies; later ones must strictly improve).
        if max_val_acc is None or epoch_val_acc > max_val_acc:
            max_val_acc = epoch_val_acc
            best_model_state = deepcopy(net.state_dict())
            best_confusion_matrix = confusion_matrix

    # Call function to generate performance data
    plot_model_results(epoch_results)

    # Display confusion matrix
    print(f'Confusion Matrix:\n {best_confusion_matrix}')
    plot_confusion_matrix(best_confusion_matrix)

    # Save the best model state for future use
    if save_trained_model:
        torch.save(best_model_state, f'{drive_path}nsynth-models/{args.model_name}')

    end = datetime.now()
    print(f'\nelapsed time: {end - start}')


# Run main() only when this code is executed as the top-level script/cell,
# not when it is imported.
if __name__ == '__main__':
    main()


Training Plots Module

In [None]:
import matplotlib.pyplot as plt


def plot_model_results(epoch_results):
    """Draw loss and accuracy curves for training and validation on one figure.

    Args:
        epoch_results: Iterable of dicts, one per epoch, each providing the
            keys ``epoch``, ``tng_loss``, ``val_loss``, ``tng_acc`` and
            ``val_acc``.

    The y-axis is fixed to the range 0-3 and the figure is shown immediately.
    """
    metric_keys = ("epoch", "tng_loss", "val_loss", "tng_acc", "val_acc")

    # Transpose the per-epoch records into one list per metric.
    series = {key: [] for key in metric_keys}
    for record in epoch_results:
        for key in metric_keys:
            series[key].append(record[key])
    x_values = series["epoch"]

    # (metric key, legend label) in the order the curves are drawn
    curves = (
        ("tng_loss", 'Training Loss'),
        ("val_loss", 'Validation Loss'),
        ("tng_acc", 'Training Accuracy'),
        ("val_acc", 'Validation Accuracy'),
    )
    for key, legend_label in curves:
        plt.plot(x_values, series[key], label=legend_label)

    plt.title('Model Results by Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Loss and Accuracy')
    plt.legend()
    plt.axis([0, len(x_values), 0, 3])
    plt.show()
    # To write the figure to Drive instead of showing it:
    # plt.savefig(f'/content/drive/MyDrive/DL_data/plot-results.png', dpi=150, bbox_inches='tight', facecolor='gray')
    # plt.clf()


Confusion Matrix Module

In [None]:
import itertools
import numpy as np
import matplotlib.pyplot as plt

def plot_confusion_matrix(cm, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    """Render a confusion matrix as an annotated heat map on figure 2.

    Args:
        cm: 11 x 11 matrix indexed as [true class, predicted class].  Row and
            column 9 are removed before plotting, leaving the ten classes
            named on the axes (presumably class 9 does not occur in the
            evaluated data - confirm).
        normalize: Accepted but not used by this function.
        title: Title placed above the plot.
        cmap: Matplotlib colour map for the heat map.
    """
    kept_classes = [0, 1, 2, 3, 4, 5, 6, 7, 8, 10]
    cm = cm[kept_classes, :]
    cm = cm[:, kept_classes]
    print('Confusion matrix')
    classes = ('bass', 'brass', 'flute', 'guitar', 'keyboard', 'mallet', 'organ', 'reed', 'string', 'vocal')

    plt.figure(2, figsize=(11, 11))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Write each cell's value on top of it; dark cells (> thresh) get white text.
    thresh = 0.5
    n_rows, n_cols = cm.shape[0], cm.shape[1]
    for row in range(n_rows):
        for col in range(n_cols):
            value = cm[row, col]
            cell_text = f"{value:.3f}"
            text_color = "white" if value > thresh else "black"
            plt.text(col, row, cell_text, horizontalalignment="center", color=text_color)

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

Testing Module

In [None]:
import argparse
import torch
from torch import nn
from datetime import datetime
from torch.utils.data import DataLoader
# from create_dataset import create_dataset
# from Models import Models

# from Confusion_matrix_graphic import plot_confusion_matrix


def main():
    """Evaluate a saved model and report accuracy plus a confusion matrix.

    Reads its settings from the command line, builds the evaluation dataset
    from Google Drive, loads the saved weights into a freshly constructed
    model of the requested type, runs inference without gradients, and prints
    and plots the row-normalised confusion matrix and the overall accuracy.
    """
    parser = argparse.ArgumentParser(description='Test the individual instrument identification model')
    parser.add_argument('-f')  # Required for argument parser to work in Colab
    parser.add_argument('--test_folder', type=str, default='small_audio/')
    parser.add_argument('--batch_size', type=int, default=128)
    parser.add_argument('--model_name', type=str,  default='res_net')
    parser.add_argument('--model_type', type=str, default='Res_Net_18')
    args = parser.parse_args()

    # Dataset and model paths
    drive_path = '/content/drive/MyDrive/DL_data/'
    # NOTE: evaluation currently reads from the nsynth-valid folder; the
    # nsynth-test paths are kept here for when that split is used instead.
    # json_path_test = f'{drive_path}nsynth-test/examples.json'
    # audio_input_path_test = f'{drive_path}nsynth-test/{args.test_folder}'
    json_path_test = f'{drive_path}nsynth-valid/examples.json'
    audio_input_path_test = f'{drive_path}nsynth-valid/{args.test_folder}'
    model_path = f'{drive_path}nsynth-models/{args.model_name}'

    # Select GPU for runtime if available
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(torch.cuda.get_device_name(device))
    else:
        device = torch.device("cpu")
        print('No GPU selected')

    start = datetime.now()

    # Create dataset
    test_dataset = create_dataset(audio_input_path_test, json_path_test)

    # Create Data Loader
    test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)

    print(f'\nDatasets created in {datetime.now()-start}')

    # Create instance of model and load saved weights
    model = Models(args.model_type).choose_model().to(device)
    model.load_state_dict(torch.load(f'{model_path}', map_location=device))

    # Inference loop
    print("Beginning inference loop\n.....")
    num_classes = 11
    test_score = 0
    n = 0
    # Rows are true classes, columns are predicted classes
    confusion_matrix = torch.zeros(num_classes, num_classes)
    model.eval()
    with torch.no_grad():
        for img_batch, label_batch in test_dataloader:
            # Add the channel axis: (batch, H, W) -> (batch, 1, H, W)
            img_batch = img_batch.to(device).unsqueeze(1)
            label_batch = label_batch.to(device)

            predicted_labels = model(img_batch)

            # Labels are one-hot, so argmax recovers the class index
            true_idx = label_batch.argmax(dim=1).cpu()
            pred_idx = predicted_labels.argmax(dim=1).cpu()

            # Report this batch's own count, not the running total
            batch_correct = (pred_idx == true_idx).sum().item()
            test_score += batch_correct
            print(f'Correct predictions in batch: {batch_correct}\n')

            # Count every (true, predicted) pair of the batch at once by
            # encoding the pair as a single flat index into the matrix.
            pair_counts = torch.bincount(true_idx * num_classes + pred_idx,
                                         minlength=num_classes * num_classes)
            confusion_matrix += pair_counts.reshape(num_classes, num_classes)

            n += len(label_batch)

    # Normalise each row by its sample count; rows for classes that never
    # occur are divided by 1 so they stay 0 instead of becoming NaN.
    label_counts = confusion_matrix.sum(dim=1, keepdim=True)
    confusion_matrix /= label_counts.clamp(min=1)
    print(f'Confusion Matrix:\n {confusion_matrix}')
    # Use %run not !python3 to get cm to display in colab
    plot_confusion_matrix(confusion_matrix)

    test_score = test_score / n
    print(f'Final test accuracy: {test_score}')

    end = datetime.now()
    print(f'\nelapsed time: {end - start}')


# Run main() only when this code is executed as the top-level script/cell,
# not when it is imported.
if __name__ == '__main__':
    main()