### Treść mini projektu 5

Zadanie polega na stworzeniu modelu rekurencyjnego który przewidywał będzie kompozytora danego utworu muzyki klasycznej w oparciu o jego zapis w formie sekwencji akordów. Akordy znormalizowane zostały do klucza C-dur lub a-mol w zależności od skali utworu (durowa/molowa).
Dane przygotowane są w postaci Pickli - https://docs.python.org/3/library/pickle.html w których znajduje się lista krotek z sekwencjami i odpowiadającymi im klasami - autorami odpowiednio: {0: 'bach', 1: 'beethoven', 2: 'debussy', 3: 'scarlatti', 4: 'victoria'} (train.pkl). W pliku test_no_target znajdują się testowe sekwencje, dla których predykcje mają Państwo przewidzieć.


Uwaga, utwory mogą być oczywiście różnych długości. W celu stworzenia batcha danych różnej długości, muszą je Państwo odpowiednio przygotować stosując tzw. padding. Przykładowo można się posiłkować tym tutorialem: https://suzyahyah.github.io/pytorch/2019/07/01/DataLoader-Pad-Pack-Sequence.html. W Państwa przypadku będzie to trochę łatwiejsze bo dotyczy problemu klasyfikacji sekwencji, a nie tłumaczenia sequence-to-sequence.

In [1]:
import pandas as pd
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
import numpy as np
import matplotlib.pyplot as plt
import torch.utils.data as data
from torch.utils.data import DataLoader

In [2]:
device = torch.device("cuda") 
device

device(type='cuda')

In [3]:
import pickle

In [5]:
with open('sample_data/train.pkl', 'rb') as f:
    dataset = pickle.load(f)

In [6]:
dataset[:] = [(torch.from_numpy(x[0]), x[1]) for x in dataset[:]]

In [7]:
torch.manual_seed(620)
A = int(len(dataset)*0.75)
B = len(dataset) - int(len(dataset)*0.75)
lengths = [A, B]
print(len(dataset))
print(lengths)
train_set, test_set = torch.utils.data.random_split(dataset, lengths)
type(train_set)


2939
[2204, 735]


torch.utils.data.dataset.Subset

In [8]:
def pad_collate(batch):
  (xx, yy) = zip(*batch)
  xx_pad = pad_sequence(xx, batch_first=True, padding_value=0)
  
  yy = torch.Tensor(yy)
  return xx_pad, yy

In [9]:
y_train_indices = train_set.indices

y_train = [dataset[i][1] for i in y_train_indices]
class_sample_count = np.array(
    [len(np.where(y_train == t)[0]) for t in np.unique(y_train)])

weight = 1. / class_sample_count
samples_weight = np.array([weight[t] for t in y_train])
samples_weight = torch.from_numpy(samples_weight)
sampler = torch.utils.data.sampler.WeightedRandomSampler(samples_weight.type('torch.DoubleTensor'), len(samples_weight))

train_loader = DataLoader(train_set, batch_size=32, sampler=sampler, num_workers = 0, collate_fn=pad_collate)


In [10]:
y_test_indices = test_set.indices

y_test = [dataset[i][1] for i in y_test_indices]
class_sample_count = np.array(
    [len(np.where(y_test == t)[0]) for t in np.unique(y_test)])

weight = 1. / class_sample_count
samples_weight = np.array([weight[t] for t in y_test])
samples_weight = torch.from_numpy(samples_weight)
sampler = torch.utils.data.sampler.WeightedRandomSampler(samples_weight.type('torch.DoubleTensor'), len(samples_weight))

test_loader = DataLoader(test_set, batch_size=32,shuffle = False, sampler=sampler, num_workers = 0, collate_fn=pad_collate)

In [None]:
# train_loader = data.DataLoader(train_set, batch_size=32, shuffle=True, num_workers = 0, collate_fn=pad_collate)
# test_loader = data.DataLoader(test_set, batch_size=32, shuffle=False, num_workers = 0, collate_fn=pad_collate)

In [11]:
class LSTMRegressor(nn.Module):

    def __init__(self, input_size, hidden_size, num_layers, out_size):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers)
        self.fc = nn.Linear(hidden_size, out_size)
        
    def init_hidden(self, batch_size):
        hidden = torch.zeros(self.num_layers, batch_size, self.hidden_size)
        state = torch.zeros(self.num_layers, batch_size, self.hidden_size)
        return hidden, state
    
    def forward(self, x, hidden):
        x = torch.transpose(x,0,1)
        all_outputs, hidden = self.lstm(x, hidden)
        out = all_outputs[-1]
        x = self.fc(out)
        return x, hidden
    

In [12]:
model = LSTMRegressor(1,50,2,5).to(device)
model

LSTMRegressor(
  (lstm): LSTM(1, 50, num_layers=2)
  (fc): Linear(in_features=50, out_features=5, bias=True)
)

In [13]:
def get_accuracy(model, data):
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
      for x, targets in test_loader:
          x = x.type(torch.float).to(device).unsqueeze(2)
          targets = targets.to(device)
          hidden, state = model.init_hidden(x.size(0))
          hidden, state = hidden.to(device), state.to(device)

          output, last_hidden = model(x, (hidden,state))
          pred = output.max(1, keepdim=True)[1]
          correct += pred.eq(targets.view_as(pred)).sum().item()
          total += x.shape[0]
    model.train()
    return correct / total

In [14]:
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)
loss_fun = nn.CrossEntropyLoss()

# Training loop
for epoch in range(750):
    epoch_losses = []
    for x, targets in train_loader:
        x = x.type(torch.float).to(device).unsqueeze(2)
        targets = targets.to(device)
        hidden, state = model.init_hidden(x.size(0))
        hidden, state = hidden.to(device), state.to(device) 
        preds, last_hidden = model(x, (hidden,state))
        loss = loss_fun(preds, targets.type(torch.int64))
        loss.backward()
        epoch_losses.append(loss.item())
        optimizer.step()
        optimizer.zero_grad()
    if epoch % 10 == 0:
        loss_mean = np.array(epoch_losses).mean()
        test_acc = get_accuracy(model, test_set)
        print(f"Epoch: {epoch}, loss_mean: {loss_mean:.3}, loss: {loss.item():.3}, test_acc: {test_acc:.3}")

Epoch: 0, loss_mean: 1.59, loss: 1.56, test_acc: 0.226
Epoch: 10, loss_mean: 1.58, loss: 1.58, test_acc: 0.23
Epoch: 20, loss_mean: 1.56, loss: 1.64, test_acc: 0.244
Epoch: 30, loss_mean: 1.55, loss: 1.55, test_acc: 0.229
Epoch: 40, loss_mean: 1.58, loss: 1.55, test_acc: 0.267
Epoch: 50, loss_mean: 1.53, loss: 1.42, test_acc: 0.282
Epoch: 60, loss_mean: 1.53, loss: 1.49, test_acc: 0.273
Epoch: 70, loss_mean: 1.55, loss: 1.6, test_acc: 0.267
Epoch: 80, loss_mean: 1.54, loss: 1.73, test_acc: 0.282
Epoch: 90, loss_mean: 1.52, loss: 1.53, test_acc: 0.288
Epoch: 100, loss_mean: 1.52, loss: 2.1, test_acc: 0.282
Epoch: 110, loss_mean: 1.54, loss: 1.63, test_acc: 0.287
Epoch: 120, loss_mean: 1.54, loss: 1.47, test_acc: 0.273
Epoch: 130, loss_mean: 1.54, loss: 1.57, test_acc: 0.265
Epoch: 140, loss_mean: 1.53, loss: 1.5, test_acc: 0.252
Epoch: 150, loss_mean: 1.54, loss: 1.55, test_acc: 0.261
Epoch: 160, loss_mean: 1.53, loss: 1.49, test_acc: 0.25
Epoch: 170, loss_mean: 1.55, loss: 1.56, test_a

In [17]:
classes = ('0', '1', '2', '3','4')

In [18]:
correct_pred = {classname: 0 for classname in classes}
total_pred = {classname: 0 for classname in classes}


with torch.no_grad():
    for x, targets in test_loader:
        x = x.type(torch.float).to(device).unsqueeze(2)
        targets = targets
        hidden, state = model.init_hidden(x.size(0))
        hidden, state = hidden.to(device), state.to(device) 

        outputs, last_hidden = model(x, (hidden,state))
        outputs = outputs.cpu()
        _, predictions = torch.max(outputs, 1)
        for target, prediction in zip(targets, predictions):
            target = target.item()
            prediction = prediction.item()
            if target == prediction:
                correct_pred[str(int(target))] += 1
            total_pred[str(int(target))] += 1

  
for classname, correct_count in correct_pred.items():
    accuracy = 100 * float(correct_count) / total_pred[classname]
    print("Accuracy for class {:5s} is: {:.1f} %".format(classname, 
                                                   accuracy))

Accuracy for class 0     is: 81.6 %
Accuracy for class 1     is: 68.4 %
Accuracy for class 2     is: 44.4 %
Accuracy for class 3     is: 73.8 %
Accuracy for class 4     is: 73.1 %


In [20]:
with open('sample_data/test_no_target.pkl', 'rb') as f:
    test_dataset = pickle.load(f)

test_dataset = [(torch.from_numpy(x)) for x in test_dataset]

In [21]:
import os
def get_pred(model, data):
    model.eval()
    f = open("results.csv", "w")
    pred = []
    test_all_data = torch.utils.data.DataLoader(data, batch_size=1)
    print(test_all_data)
    for i, data in enumerate(test_all_data, 0):
        x = data
        x = x.type(torch.float).to(device).unsqueeze(2)
        hidden, state = model.init_hidden(x.size(0))
        hidden, state = hidden.to(device), state.to(device)
        output, last_hidden = model(x, (hidden,state))
        

        pred = output.max(1, keepdim=True)[1]
        f.write("{}\n".format( pred.item()))
    f.close()
    print(i)
get_pred(model, test_dataset)

<torch.utils.data.dataloader.DataLoader object at 0x7f00669ed390>
1102
