## LSTM на оригинальном датасете

Попробуем сделать модель LSTM, похожую на ту, что описана в соседнем Notebook, но для нашей текущей задачи

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np

Сделаем также пользовательский импорт

In [None]:
from decode_patterns import data_conversion

Загружаем датасет с помощью DataLoader

In [None]:
# по аналогии с предыдущим примером:
# первая строка -- ожидаемый выходной сигнал, вторая -- входные данные :)
# dataset_train = torch.tensor([
#     [[1,3,1,4,2,3,3,2], [1,0,0,1,8,4,7,2], [2,4,2,3,2,5,6,3]],
#     [[2,2,1,1,4,3,4,2], [2,0,4,2,2,4,2,8], [3,5,3,4,6,1,1,2]],
# ], dtype=torch.float)

# import dataset
drum, bass = data_conversion.make_lstm_dataset(height=64, limit=1000, patterns_file="decode_patterns/patterns.pairs.tsv")



# define shuffling of dataset
def shuffle(A, B, p=0.8):
    # take 80% to training, other to testing
    L = len(drum)
    idx = np.arange(L) < p*L
    np.random.shuffle(idx)
    yield A[idx]
    yield B[idx]
    yield A[np.logical_not(idx)]
    yield B[np.logical_not(idx)]
    
    
# we can select here a validation set
drum, bass, drum_validation, bass_validation = shuffle(drum, bass)
    
# and we can shuffle train and test set like this:
drum_train, bass_train, drum_test, bass_test = shuffle(drum, bass)

In [None]:
bass_train.shape, drum_train.shape, bass_test.shape, drum_test.shape

Модель определим в самом простом варианте, который только можно себе представить -- как в примере с конечным автоматом

In [None]:
# попробуем определить модель LSTM как конечный автомат
class DrumNBassLSTM(nn.Module):
    def __init__(self):
        super(DrumNBassLSTM, self).__init__()
        # one input neuron, one output neuron, one layer in LSTM block
        self.input_size = 14
        self.hidden_size = 36
        self.layer_count = 8
        self.lstm = nn.LSTM(self.input_size, self.hidden_size, self.layer_count)
    
    def forward(self, input):
        # пусть в input у нас приходит вектор размерности (64, 32, 14)
        # то есть 64 отсчёта, тридцать два примера (минибатч), 14 значение в каждом (барабанная партия)
        output, _ = self.lstm(input)
        # пробуем превратить это в классификацию (для NLLLoss)
        # output = F.log_softmax(output, dim=1)
        return output

In [None]:
# часть обучения
dnb_lstm = DrumNBassLSTM()

criterion = nn.MSELoss()
# определим функцию потерь, которая не хочет видеть интервалы в мелодии
def interval_penalty(melody):
    return (melody.sum(axis=1) > 1).int().sum()

# оценим также и разнообразие мелодии по её.. дисперсии?)
# def melody_variety(melody):
#     return 1/(1 + (melody.sum(axis=2) > 1).int())
    
# criterion = nn.NLLLoss() # -- этот товарищ требует, чтобы LSTM выдавал классы,
# criterion = nn.CrossEntropyLoss() # и этот тоже
# (числа от 0 до C-1), но как всё-таки его заставить это делать?...
optimizer = optim.SGD(dnb_lstm.parameters(), lr=0.001, momentum=0.9)

Найденные баги и их решения:

https://stackoverflow.com/questions/56741087/how-to-fix-runtimeerror-expected-object-of-scalar-type-float-but-got-scalar-typ

https://stackoverflow.com/questions/49206550/pytorch-error-multi-target-not-supported-in-crossentropyloss/49209628

https://stackoverflow.com/questions/56243672/expected-target-size-50-88-got-torch-size50-288-88

In [None]:
epoch_count = 300
batch_size = 32
shuffle_every_epoch = True
    
if shuffle_every_epoch:
    print(f"shuffle_every_epoch is on")
else:
    print(f"shuffle_every_epoch is off")
    # shuffle train and test set:
    drum_train, bass_train, drum_test, bass_test = shuffle(drum, bass)
    drum_train = torch.tensor(drum_train, dtype=torch.float)
    bass_train = torch.tensor(bass_train, dtype=torch.float)
    drum_test = torch.tensor(drum_test, dtype=torch.float)
    drum_test = torch.tensor(drum_test, dtype=torch.float)
        
for epoch in range(epoch_count):  # loop over the dataset multiple times
    print(f"Epoch #{epoch}")
    if shuffle_every_epoch:
        # shuffle train and test set:
        drum_train, bass_train, drum_test, bass_test = shuffle(drum, bass)
        drum_train = torch.tensor(drum_train, dtype=torch.float)
        bass_train = torch.tensor(bass_train, dtype=torch.float)
        drum_test = torch.tensor(drum_test, dtype=torch.float)
        bass_test = torch.tensor(bass_test, dtype=torch.float)
        
    examples_count = drum_train.size()[0]
    examples_id = 0
    
    running_loss = 0.0
    runnint_count = 0
    batch_id = 0
    while examples_id < examples_count:
        batch_drum_train = drum_train[examples_id:examples_id + batch_size,:,:].transpose(0,1)
        batch_bass_train = bass_train[examples_id:examples_id + batch_size,:,:].transpose(0,1)
        # transpose нужен для обмена размерности батча и размерности шагов
        # print(f"i:{i}, batch_drum_train:{batch_drum_train.size()}, batch_bass_train:{batch_bass_train.size()}")

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        bass_outputs = dnb_lstm(batch_drum_train)
        
        loss = criterion(bass_outputs, batch_bass_train) + interval_penalty(bass_outputs)# + melody_variety(bass_outputs)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        runnint_count += 1
        period = 5
        if batch_id % period == 0 or examples_id + batch_size >= examples_count:
            print('[%d, %5d] train loss: %.7f' %
                  (epoch + 1, batch_id + 1, running_loss / runnint_count))
            running_loss = 0.0
            runnint_count = 1
            
        # update batch info
        examples_id += batch_size
        batch_id += 1
        
    # here we can insert measure error on test set

#should check accuracy on validation set
print('Finished Training')

In [None]:
batch_drum_train = drum_train[:1,:,:].transpose(0,1)
batch_bass_train = bass_train[:1,:,:].transpose(0,1)
with torch.no_grad():
    bass_outputs = dnb_lstm(batch_drum_train)

In [None]:
((bass_outputs[:,:,5:12].squeeze() + 1) / 2 > 0.5).int()

Попробуем сохранить результаты работы сети. На anaconda нет mido, поэтому сохраняем результаты работы просто в массивчик npy... Однако, как альтернатива, его можно поставить чере pip в conda:
https://github.com/mido/mido/issues/198

In [None]:
import mido
from decode_patterns.data_conversion import build_track, DrumMelodyPair, NumpyImage, Converter


converter = Converter((64,50))

batch_drum = torch.cat((drum_train, drum_test, torch.tensor(drum_validation))).transpose(0,1)
with torch.no_grad():
    bass_outputs = dnb_lstm(batch_drum)
    bass_outputs = ((bass_outputs.squeeze() + 1) / 2 > 0.5).int()
    
    for i in range(bass_outputs.size()[1]):
        img_dnb = torch.cat((batch_drum[:,i,:].int(),bass_outputs[:,i,:]), axis=1)
        print(f"img_dnb:{img_dnb.size()}")
        np_image = NumpyImage(image=img_dnb, tempo=120, instrument=34, denominator=1, min_note=24)
        pair = converter.convert_numpy_image_to_pair(np_image)   
        mid = build_track(pair, tempo=pair.tempo)
        mid.save(f"midi/npy/sample{i+1}.mid")