<a href="https://colab.research.google.com/github/vydiep/MLProject/blob/main/CNN_Utils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import json
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from torch.nn import BCEWithLogitsLoss
from torch import optim
import matplotlib.pyplot as plt
import torch.nn as nn

In [None]:
def load_data(path):
    '''
    Read in data from a JSON file

    Parameters:
      path: The path to the JSON file. The file must hold an object with
        the keys "spectrograms" and "labels"

    Returns:
      X: The spectrograms data as a NumPy array
      y: The labels corresponding to the spectrogram data as a NumPy array

    Raises:
      KeyError: If either "spectrograms" or "labels" is missing from the file
    '''
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # The file is only needed for parsing, so the arrays are built after it
    # has been closed.
    X = np.array(data["spectrograms"])
    y = np.array(data["labels"])

    return X, y

def spect_tensor(path):
    '''
    Load the spectrograms from the JSON data, convert each one to a tensor,
    and replicate it across a new leading dimension of size 3

    Parameters:
      path: The path to the JSON file

    Returns:
      spectrograms: A list of spectrogram tensors, each holding three copies
        of the original spectrogram stacked along a new first dimension
      y: The labels returned by load_data

    Note:
      The three copies presumably stand in for RGB channels so the data can
      be fed to an image-style CNN -- confirm against the model definition
    '''
    X, y = load_data(path)

    # unsqueeze(0) adds the leading axis and repeat(3, 1, 1) copies the
    # spectrogram three times along it. The loop variable is named `spect`
    # so it does not shadow this function's own name.
    spectrograms = [
        torch.from_numpy(spect).unsqueeze(0).repeat(3, 1, 1) for spect in X
    ]

    return spectrograms, y

def split_data(path, test_size=0.2, val_size=0.2, random_state=42):
    '''
    Split the data in preparation for training, validation, and testing

    Parameters:
      path: The path to the JSON file
      test_size: The fraction of all the data held out for testing
      val_size: The fraction of the remaining (non-test) data held out for
        validation. With the defaults this is 0.2 * 0.8 = 16% of all the data
      random_state: The seed passed to both splits so they are reproducible

    Returns:
      X_train: The spectrograms that will be used for training
      X_test: The spectrograms that will be used for testing
      X_val: The spectrograms that will be used for validation
      y_train: The composer labels for training
      y_test: The composer labels for testing
      y_val: The composer labels for validation
    '''
    X, y = spect_tensor(path)

    # First carve off the test set, then split what is left into the final
    # training set and the validation set.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=val_size, random_state=random_state
    )

    return X_train, X_test, X_val, y_train, y_test, y_val

def get_loaders(path, batch_size=32):
    '''
    Convert the training, testing, and validation splits to float32 tensors,
    wrap each split in a data set, and create a data loader for each data set

    Parameters:
      path: The path to the JSON file
      batch_size: The number of samples per batch for all three loaders

    Returns:
      train_loader: An iterator that provides batches of training data
        (reshuffled on every pass)
      test_loader: An iterator that provides batches of testing data
      val_loader: An iterator that provides batches of validation data
    '''
    X_train, X_test, X_val, y_train, y_test, y_val = split_data(path)

    def make_loader(X, y, shuffle):
        # X is a list of equally shaped tensors, so it is stacked into one
        # batch tensor; y is converted to a float32 tensor to match.
        features = torch.stack(X).to(torch.float32)
        labels = torch.tensor(y).to(torch.float32)
        dataset = TensorDataset(features, labels)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    # Only the training data is shuffled; evaluation order does not matter.
    train_loader = make_loader(X_train, y_train, shuffle=True)
    test_loader = make_loader(X_test, y_test, shuffle=False)
    val_loader = make_loader(X_val, y_val, shuffle=False)

    return train_loader, test_loader, val_loader
  
def _run_epoch(model, loader, loss_fn, device, optimizer=None):
    '''
    Run the model over every batch in loader once

    Parameters:
      model: The model to run
      loader: An iterator that provides (X, y) batches
      loss_fn: The loss function applied to the flattened logits and labels
      device: Specify whether to work on CPU or GPU
      optimizer: If given, the weights are updated after every batch;
        if None, the pass is evaluation only

    Returns:
      loss: The mean of the per-batch losses
      accuracy: The fraction of samples whose thresholded prediction
        matches the label

    Raises:
      ValueError: If loader yields no samples
    '''
    loss_sum = 0.0
    correct = 0
    total = 0
    num_batches = 0

    for X, y in loader:
        X = X.to(device)
        y = y.to(device).float()

        if optimizer is not None:
            optimizer.zero_grad()

        # reshape(-1) instead of squeeze(): squeeze() turns a batch of one
        # sample into a 0-dim tensor, which the loss rejects against a
        # target of shape (1,).
        logits = model(X).reshape(-1)
        loss = loss_fn(logits, y)

        if optimizer is not None:
            loss.backward()
            optimizer.step()

        loss_sum += loss.item()
        num_batches += 1

        # A sigmoid output of at least 0.5 counts as the positive class.
        predicted = (logits.sigmoid() >= 0.5).float()
        total += y.size(0)
        correct += (predicted == y).sum().item()

    if num_batches == 0 or total == 0:
        raise ValueError('loader yielded no samples')

    return loss_sum / num_batches, correct / total


def train(model, train_loader, val_loader, k_epochs, device,
          loss_ckpt_path='/content/drive/Shareddrives/MLProject/MusicNet/models/cnn-drop-loss.pth',
          acc_ckpt_path='/content/drive/Shareddrives/MLProject/MusicNet/models/cnn-drop-acc.pth'):
    '''
    Training and validation loop for the model

    Parameters:
      model: The model to train. It must expose a `history` dict with the
        list-valued keys "train_loss", "train_acc", "val_loss", and "val_acc"
      train_loader: An iterator that provides batches of training data
      val_loader: An iterator that provides batches of validation data
      k_epochs: The number of iterations to train and validate for
      device: Specify whether to work on CPU or GPU. Only the batches are
        moved here; the model is presumably already on this device
      loss_ckpt_path: Where to save the weights whenever the validation loss
        reaches a new minimum
      acc_ckpt_path: Where to save the weights whenever the validation
        accuracy reaches a new maximum

    Returns:
      No returns

    Note:
      Keeping track of the model's loss and accuracies-- append them to
      model.history. The model is left in eval mode when training finishes
    '''
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    best_val_loss = float('inf')
    best_val_acc = 0.0

    for epoch in range(k_epochs):

        print(f'Epoch: {epoch}')

        model.train()
        train_loss, train_accuracy = _run_epoch(
            model, train_loader, loss_fn, device, optimizer
        )

        model.history["train_loss"].append(train_loss)
        model.history["train_acc"].append(train_accuracy)

        # Validation needs no gradients, so the graph is not recorded.
        model.eval()
        with torch.no_grad():
            validation_loss, validation_accuracy = _run_epoch(
                model, val_loader, loss_fn, device
            )

        model.history["val_loss"].append(validation_loss)
        model.history["val_acc"].append(validation_accuracy)

        # Two independent checkpoints: best by loss and best by accuracy.
        if validation_loss < best_val_loss:
            best_val_loss = validation_loss
            torch.save(model.state_dict(), loss_ckpt_path)

        if validation_accuracy > best_val_acc:
            best_val_acc = validation_accuracy
            torch.save(model.state_dict(), acc_ckpt_path)

        print(f'Train Loss: {train_loss:.4f} Train Accuracy: {train_accuracy:.2f}')
        print(f'Validation Loss: {validation_loss:.4f} Validation Accuracy: {validation_accuracy:.2f}')


def plot(model, path,
         title='Loss and Accuracy Histories for CNN Model With Dropout Layer'):
    '''
    Plot the loss and accuracy history of the model

    Parameters:
      model: The model to plot. Its `history` dict must hold the keys
        "train_loss", "val_loss", "train_acc", and "val_acc"
      path: The path to save the graphs
      title: The overall title drawn above both graphs

    Returns:
      No returns

    Note:
      Displays two graphs side by side:
        - One of the loss history for training and validation
        - One of the accuracy history for training and validation
      The figure is saved to path before it is shown
    '''
    plt.figure(figsize=(16, 5))

    # Plot loss history
    plt.subplot(1, 2, 1)
    plt.plot(model.history["train_loss"], label='Train Loss')
    plt.plot(model.history["val_loss"], label='Validation Loss')
    plt.title('Loss History')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    # Plot accuracy history
    plt.subplot(1, 2, 2)
    plt.plot(model.history["train_acc"], label='Train Accuracy')
    plt.plot(model.history["val_acc"], label='Validation Accuracy')
    plt.title('Accuracy History')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.suptitle(title)
    plt.savefig(path)
    plt.show()

def test(model, test_loader, device):
    '''
    Testing loop for the model

    Parameters:
      model: The model to evaluate
      test_loader: An iterator that provides batches of testing data
      device: Specify whether to work on CPU or GPU

    Returns:
      accuracy: The fraction of test samples classified correctly

    Raises:
      ValueError: If test_loader yields no samples

    Note:
      Prints the accuracy as a whole-number percentage (rounded down) and
      leaves the model in eval mode
    '''
    # Switch layers such as dropout to inference behavior; a freshly built
    # or freshly loaded model starts out in training mode.
    model.eval()

    correct = 0
    total = 0
    # torch.no_grad creates an environment in which we do NOT store the
    # computational graph. We don't need to do this because we don't care about
    # gradients unless we're training
    with torch.no_grad():
        for X, y in test_loader:
            X = X.to(device)
            y = y.to(device)

            # run the whole batch through the model
            y_hat = model(X)

            # a sigmoid output of at least 0.5 is the positive class
            pred = (y_hat.sigmoid() >= 0.5).float()
            pred = pred.view(-1)

            # accumulate the counts needed for the accuracy
            total += y.size(0)
            correct += (pred == y).sum().item()

    if total == 0:
        raise ValueError('test_loader yielded no samples')

    print(f'Test accuracy: {100 * correct // total}%')
    return correct / total


# The name starts with "test", so pytest would try to collect this helper as
# a test case wherever it is imported; this flag opts it out.
test.__test__ = False