In [None]:
def evaluate_mean_average_accuracy(y_truth, y_pred):
    """Score a predicted sequence against the ground truth, position by position.

    The two sequences are paired element-wise with ``zip`` (so the longer one
    is truncated to the shorter). Walking the pairs in order, every position
    whose prediction equals the truth contributes
    ``(correct predictions so far / 1-based position) / number of pairs``
    to the score; positions with a wrong prediction contribute nothing.

    Args:
        y_truth: iterable of ground-truth labels.
        y_pred: iterable of predicted labels, aligned with ``y_truth``.

    Returns:
        The accumulated score; ``0`` when nothing matches or the input is empty.
    """
    hits = [truth == pred for truth, pred in zip(y_truth, y_pred)]
    total = len(hits)
    correct_so_far = 0
    score = 0
    for position, hit in enumerate(hits, start=1):
        if hit == 1:
            correct_so_far += 1
            # Running accuracy at this position, averaged over the whole sequence.
            score += (correct_so_far / position) / total
    return score

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from models.LSTMEncoderDecoder import LSTMEncoderDecoder

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# File the training cell saves the best model weights to and the testing cell loads from.
MODEL_PATH = "./models/lstm_encoder_decoder_1.pt"

In [None]:
from data.SpotifyDataset import SpotifyDataset

# Build one SpotifyDataset per split (train, then val, then test); every split
# is paired with the same track-feature file.
train_set, val_set, test_set = (
    SpotifyDataset(split_csv, "./data/track_feats.csv")
    for split_csv in ("./data/train_data.csv", "./data/val_data.csv", "./data/test_data.csv")
)

# Lookup by split name, used by the training and testing cells.
datasets = dict(train=train_set, val=val_set, test=test_set)

In [None]:
### TRAINING BLOCK
# Trains the LSTM encoder-decoder one example at a time (batch_size=1), runs a
# validation pass after every epoch, and saves the weights to MODEL_PATH whenever
# the validation mean average accuracy (MAA) beats the best value seen so far.
model = LSTMEncoderDecoder(encode_size=67, decode_size=29).to(device)
loss_fn = nn.CrossEntropyLoss()

learning_rate = 1e-3
weight_decay = 2.5e-3
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999), weight_decay=weight_decay)

num_epochs = 1
best_maa = 0
losses = []        # training loss of every example, across all epochs (Python floats)
epoch_losses = []  # mean training loss of each epoch (Python floats)
for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch+1, num_epochs))
    for phase in ['train', 'val']:
        print("Running {} phase...".format(phase))
        is_train = phase == 'train'
        # Switch train/eval mode once per phase instead of once per example.
        model.train(is_train)
        total_maa = []
        total_loss = []
        # Shuffling only matters for optimisation; validation order is irrelevant.
        dataloader = DataLoader(datasets[phase], batch_size=1, shuffle=is_train)
        # Autograd is only needed while training; disabling it for validation
        # avoids building a graph for every validation example.
        with torch.set_grad_enabled(is_train):
            for i, (X_encode, X_decode, y) in enumerate(dataloader):
                if i % 25 == 0:
                    print("Calculating example {} / {}".format(i, len(dataloader)))
                # NOTE(review): squeeze() drops every size-1 dim, not just the
                # batch dim — confirm sequences always have more than one step.
                X_encode, X_decode, y = X_encode.to(device), X_decode.to(device), y.squeeze().to(device)
                scores = model(X_encode, X_decode).squeeze()
                if is_train:
                    optimizer.zero_grad()
                    loss = loss_fn(scores, y)
                    loss.backward()
                    optimizer.step()
                    # Keep a plain float: storing the tensor itself would keep
                    # device memory and autograd references alive all epoch.
                    loss_value = loss.item()
                    total_loss.append(loss_value)
                    losses.append(loss_value)
                else:
                    y_pred = torch.argmax(scores, dim=1)
                    maa = evaluate_mean_average_accuracy(y, y_pred)
                    total_maa.append(maa)
        if is_train:
            epoch_loss = sum(total_loss) / len(total_loss)
            # Report the 1-based epoch number, matching the "Epoch i/n" header.
            print("Epoch {} Avg. Loss: {}".format(epoch + 1, epoch_loss))
            epoch_losses.append(epoch_loss)
        else:
            epoch_maa = sum(total_maa) / len(total_maa)
            # "Best MAA" here is the best value from previous epochs.
            print("Epoch {} Avg. MAA: {}, Best MAA: {}".format(epoch + 1, epoch_maa, best_maa))
            if epoch_maa > best_maa:
                # Checkpoint only on improvement so MODEL_PATH holds the best weights.
                torch.save(model.state_dict(), MODEL_PATH)
                best_maa = epoch_maa
    print()
print()
print("List of avg. loss across epochs: ")
print(epoch_losses)
print("List of losses across batches: ")
print(losses)

In [None]:
### TESTING BLOCK
# Reloads the checkpoint written by the training cell into a fresh model and
# reports the mean average accuracy (MAA) averaged over the test split.
test_model = LSTMEncoderDecoder(encode_size=67, decode_size=29).to(device)
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
test_model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
# Put the model in inference mode before scoring.
test_model.eval()

with torch.no_grad():
    dataloader = DataLoader(datasets["test"], batch_size=1, shuffle=False)
    total_maa = []
    for X_encode, X_decode, labels in dataloader:
        # Use this example's own labels; the previous code read a stale `y`
        # left over from the training cell, so every example was mis-scored.
        X_encode, X_decode, y = X_encode.to(device), X_decode.to(device), labels.squeeze().to(device)
        scores = test_model(X_encode, X_decode).squeeze()
        y_pred = torch.argmax(scores, dim=1)
        maa = evaluate_mean_average_accuracy(y, y_pred)
        total_maa.append(maa)
    print("Average MAA over test set: {}".format(sum(total_maa) / len(total_maa)))