In [None]:
import torch
import os 
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
%run ~/violin-renderer/src/misc/parse.ipynb
# %run ~/violin-renderer/src/misc/randomizer.ipynb

In [None]:
# Home directory of the current user, resolved once here; it is the argument
# handed to load_from_paths when the datasets are loaded.
HOME_DIR = os.path.expanduser("~")

In [None]:
# Pick the device that the model and every tensor are moved onto.
# GPU index 2 is preferred, but only when that many GPUs actually exist:
# torch.device("cuda:2") can be constructed on any machine, so a missing GPU
# would otherwise only surface later, when something is moved onto it.
PREFERRED_GPU_INDEX = 2
if not torch.cuda.is_available():
    device = torch.device("cpu")
elif torch.cuda.device_count() > PREFERRED_GPU_INDEX:
    device = torch.device(f"cuda:{PREFERRED_GPU_INDEX}")
else:
    # Fewer GPUs than expected: fall back to the first one instead of failing.
    device = torch.device("cuda:0")

In [None]:
# Load all four dataset splits in one call.
# load_from_paths is not defined in this notebook; presumably it is provided by
# the parse.ipynb notebook executed with %run in the first cell — TODO confirm.
# Later cells iterate each *_X / *_y value piece by piece and index the rows of
# a piece by column, so each is expected to be a sequence of per-piece row lists.
training_X, training_y, testing_X, testing_y = load_from_paths(HOME_DIR)

In [None]:
class MLPMusicDataset(Dataset):
    """Pairs each input sample with the ground-truth sample at the same index.

    @param input_data    indexable collection of input samples; its length is the dataset length
    @param ground_truth  indexable collection of targets, looked up with the same index
    @param transform     optional callable applied to the input sample only
    """

    def __init__(self, input_data, ground_truth, transform=None):
        self.input_data, self.ground_truth = input_data, ground_truth
        self.transform = transform

    def __len__(self):
        # Only the inputs define the length; ground_truth is not consulted.
        return len(self.input_data)

    def __getitem__(self, idx):
        features = self.input_data[idx]
        target = self.ground_truth[idx]

        # Without a (truthy) transform the raw pair is returned as stored.
        if not self.transform:
            return features, target

        # The target is never transformed.
        return self.transform(features), target

In [None]:
# Flatten the per-piece datasets into long row lists: every piece contributes
# its rows in order, so a row no longer records which piece it came from.
def _flatten_pieces(piece_inputs, piece_truths):
    """Concatenate the rows of paired pieces into two flat lists.

    Pieces are consumed in pairs, so a surplus piece on either side is ignored.
    """
    flat_inputs, flat_truths = [], []
    for notes, targets in zip(piece_inputs, piece_truths):
        flat_inputs.extend(notes)
        flat_truths.extend(targets)
    return flat_inputs, flat_truths


all_input, all_truth = _flatten_pieces(training_X, training_y)
all_testing_input, all_testing_truth = _flatten_pieces(testing_X, testing_y)

In [None]:
# helper functions to scale data

# @param column The column to be scaled
# @returns the scaled column
def scale_data(column):
    """Min-max scale ``column`` linearly onto the interval [-1, 1].

    @param column  array-like of numbers to scale
    @returns       array of the same shape in which the minimum maps to -1 and
                   the maximum to 1. An empty column is returned unchanged, and
                   a column whose values are all equal maps to all zeros (the
                   midpoint of the target interval) because it has no spread
                   to scale by.
    """
    column = np.asarray(column)
    if column.size == 0:
        # np.min / np.max raise on empty input; there is nothing to scale.
        return column

    min_val = np.min(column)
    max_val = np.max(column)
    spread = max_val - min_val
    if spread == 0:
        # Dividing by a zero spread would fill the column with NaN.
        return np.zeros(column.shape)

    return (2 * (column - min_val) / spread) - 1

def scale_pitch(column):
    """Divide every value of ``column`` by 128.

    @param column  pitch values (presumably MIDI note numbers — confirm)
    @returns       the values divided by 128
    """
    pitch_divisor = 128
    scaled_column = column / pitch_divisor
    return scaled_column

def set_duration(ref, target):
    """Return ``target`` measured relative to ``ref``, i.e. ``target - ref``.

    The notebook calls this with column 0 as ``ref`` and column 1 as ``target``
    to turn the offset feature into a duration. Neither argument is modified.
    """
    return target - ref

In [None]:
# Convert the flattened row lists to arrays, then replace the offset feature
# (column 1) with a duration by subtracting column 0 from it.
# dtype=float is explicit because columns are overwritten in place, here and
# again when normalizing: assigning fractional values into an integer array
# silently truncates them, which is what would happen if every parsed value
# were an int.
# NOTE(review): this cell is not idempotent — re-running it without re-running
# the cell that builds all_input / all_truth subtracts column 0 a second time.
all_input = np.array(all_input, dtype=float)
all_truth = np.array(all_truth, dtype=float)

all_input[:, 1] = set_duration(all_input[:, 0], all_input[:, 1])
all_truth[:, 1] = set_duration(all_truth[:, 0], all_truth[:, 1])

In [None]:
# Normalize the input features in place.
# Columns 0 and 1 are each min-max scaled over all training rows at once.
for feature_col in (0, 1):
    all_input[:, feature_col] = scale_data(all_input[:, feature_col])

# The pitch column is scaled and then multiplied by zero: this variant of the
# model is trained as if no pitch information were available.
all_input[:, 2] = 0 * scale_pitch(all_input[:, 2])

In [None]:
# Wrap the training and testing data in datasets, then in batched loaders.
# Only the training loader shuffles.
# NOTE(review): in the visible cells all_testing_input / all_testing_truth reach
# this point without the duration conversion and scaling that were applied to
# all_input / all_truth — confirm that is intended.
BATCH_SIZE = 100

training_data = MLPMusicDataset(
    input_data=torch.Tensor(all_input),
    ground_truth=torch.Tensor(all_truth),
)
testing_data = MLPMusicDataset(
    input_data=torch.Tensor(all_testing_input),
    ground_truth=torch.Tensor(all_testing_truth),
)

training_loader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True)
testing_loader = DataLoader(testing_data, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class MLP(nn.Module):
    """Single-hidden-layer perceptron: Linear -> BatchNorm1d -> ReLU -> Linear.

    @param input_size   number of features per input row
    @param hidden_size  width of the hidden layer
    @param output_size  number of values predicted per row
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Modules are built in forward order and stored under `layers`, so the
        # state_dict keys are layers.0 / layers.1 / layers.3 (ReLU has no state).
        stack = [
            nn.Linear(input_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        # Apply the whole stack to the batch.
        out = self.layers(x)
        return out

In [None]:
# Build the network: 3 input features per row, a hidden layer of width 4,
# and 2 predicted values per row.
model = MLP(input_size=3, hidden_size=4, output_size=2)

# Move the parameters onto the selected device (in place; the module itself is
# returned, which is what the cell displays).
model.to(device)

In [None]:
# Training criterion: mean squared error between predictions and ground truth.
loss = nn.MSELoss()

# Plain stochastic gradient descent over every parameter of the model.
LEARNING_RATE = 1e-6
optimizer = torch.optim.SGD(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
def train_model_loop(model, optimizer, dataloader, loss_module):
    """Train ``model`` for one full pass over ``dataloader``.

    @param model        The model to train; it is switched to train mode.
    @param optimizer    Optimizer that updates the model's parameters.
    @param dataloader   Iterable yielding (input_notes, truth) batches. Its
                        ``dataset`` length is used in the progress line.
    @param loss_module  Callable computing the loss from (predictions, truth).

    Every 50th batch the current loss and the number of samples processed so
    far are printed. Nothing is returned.
    """
    model.train()

    # Batches are moved to wherever the model's parameters live, so the function
    # does not depend on a notebook-level device variable being in sync.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    # Running count of processed samples; correct for any batch size, including
    # a shorter final batch.
    samples_seen = 0

    for batch, (input_notes, truth) in enumerate(dataloader):
        ## Step 1: Move the batch onto the model's device
        input_notes = input_notes.to(target_device)
        truth = truth.to(target_device)

        ## Step 2: Run the model on the input data
        preds = model(input_notes)

        ## Step 3: Calculate the loss
        batch_loss = loss_module(preds, truth)

        ## Step 4: Backpropagate
        # Gradients accumulate across backward() calls, so clear them first.
        optimizer.zero_grad()
        batch_loss.backward()

        ## Step 5: Update the parameters
        optimizer.step()

        ## Step 6: Report progress on every 50th batch
        samples_seen += len(input_notes)
        if batch % 50 == 0:
            print(f"loss: {batch_loss.item():>7f}  [{samples_seen:>5d}/{len(dataloader.dataset):>5d}]")

In [None]:
def train_model(epochs=50, save_path='mlp.pt'):
    """Train the notebook's ``model`` and save its weights.

    Uses the notebook-level ``model``, ``optimizer``, ``training_loader`` and
    ``loss`` objects.

    @param epochs     number of full passes over the training loader (default 50)
    @param save_path  file the model's state_dict is written to after the last
                      epoch (default 'mlp.pt', relative to the working directory)
    """
    for t in range(epochs):
        print(f"Epoch {t+1}\n-------------------------------")
        train_model_loop(model, optimizer, training_loader, loss)
    torch.save(model.state_dict(), save_path)

In [None]:
def generate_all_outputs(model_type):
    """Run the notebook's ``model`` over every testing piece and collect the outputs.

    Each piece of ``testing_X`` is preprocessed like the training input
    (column 1 becomes column 1 minus column 0, columns 0 and 1 are min-max
    scaled, column 2 is pitch-scaled) and passed through the model in one batch.

    @param model_type  "pitch" keeps the scaled pitch column; "no pitch" zeroes it.
    @returns  dict keyed by the paths listed in dataset-paths/testing-truth.txt
              (i-th line for the i-th piece). Each value is a list of rows: the
              model outputs followed by that note's original, unscaled pitch.
              Returns None, after printing a message, for any other model_type.
    """
    # Validate up front so an invalid choice does no file or model work.
    if model_type not in ("pitch", "no pitch"):
        print("Please choose the correct model")
        return None

    # The context manager closes the path file even if reading fails.
    with open('dataset-paths/testing-truth.txt', 'r') as file:
        test_paths = [line.strip() for line in file]

    testing_results = {}

    # Inference must run in eval mode: in train mode BatchNorm1d normalizes with
    # the statistics of the piece being predicted, updates its running
    # statistics, and rejects a piece that contains a single note. The previous
    # mode is restored afterwards so training can continue unaffected.
    was_training = model.training
    model.eval()
    try:
        # No gradients are needed for prediction.
        with torch.no_grad():
            for i, piece in enumerate(testing_X):
                # Explicit float dtype: the scaled values assigned below would
                # be truncated if the array were integer-typed.
                new_test_input = np.array(piece, dtype=float)
                new_test_input[:, 1] = set_duration(new_test_input[:, 0], new_test_input[:, 1])
                # NOTE(review): these min/max values are taken per piece, while
                # the training input is scaled over all pieces at once — confirm
                # the two are meant to differ.
                new_test_input[:, 0] = scale_data(new_test_input[:, 0])
                new_test_input[:, 1] = scale_data(new_test_input[:, 1])

                new_test_input[:, 2] = scale_pitch(new_test_input[:, 2])
                if model_type == "no pitch":
                    new_test_input[:, 2] = new_test_input[:, 2] * 0

                new_test_input = torch.Tensor(new_test_input).to(device)

                y_test = model(new_test_input).tolist()
                # Append the original pitch so each row can later be synthesized.
                for j, predicted_row in enumerate(y_test):
                    predicted_row.append(piece[j][2])

                testing_results[test_paths[i]] = y_test
    finally:
        model.train(was_training)

    return testing_results

In [None]:
def MSE_error(model_type):
    """Compute the mean squared error of the model's output for each testing piece.

    @param model_type  which model to use, "pitch" or "no pitch"; forwarded to
                       generate_all_outputs.
    @returns  list with one 0-dim tensor per piece, in the order of ``testing_y``:
              the MSE between the generated output and the ground truth.
    @raises ValueError  if generate_all_outputs rejects ``model_type``.
    """
    testing_results = generate_all_outputs(model_type)
    if testing_results is None:
        # generate_all_outputs returns None after printing its message.
        raise ValueError(f"unknown model_type: {model_type!r}")

    loss_values = []
    # Relies on testing_results keeping one entry per piece in testing order.
    for output_path, truth in zip(testing_results, testing_y):
        # The trailing pitch was only appended so the result can be synthesized;
        # it is not a prediction, so drop it before comparing.
        output = torch.Tensor([row[:-1] for row in testing_results[output_path]])

        # The model is trained on truth rows whose column 1 has been replaced by
        # (column 1 - column 0), so the testing truth needs the same conversion
        # before it is comparable with the predictions.
        truth = np.array(truth, dtype=float)
        truth[:, 1] = truth[:, 1] - truth[:, 0]
        truth = torch.Tensor(truth)

        # `loss` is nn.MSELoss() with its default 'mean' reduction, so the value
        # is already averaged over every element; dividing by the number of rows
        # again would no longer be an MSE.
        loss_values.append(loss(output, truth))

    return loss_values