In [139]:
import torch
import sys
sys.path.append('../model')
sys.path.append('../../preprocessed_dataset/')
import math

from dataset_loader import GrooveMidiDataset
from Subset_Creators.subsetters import GrooveMidiSubsetter


In [140]:
class PositionalEncoding(torch.nn.Module):
    r"""Inject some information about the relative or absolute position of the tokens
        in the sequence. The positional encodings have the same dimension as
        the embeddings, so that the two can be summed. Here, we use sine and cosine
        functions of different frequencies.
    .. math::
        \text{PosEncoder}(pos, 2i) = sin(pos/10000^(2i/d_model))
        \text{PosEncoder}(pos, 2i+1) = cos(pos/10000^(2i/d_model))
        \text{where pos is the word position and i is the embed idx)
    Args:
        d_model: the embed dim (required).
        dropout: the dropout value (default=0.1).
        max_len: the max. length of the incoming sequence (default=5000).
    Examples:
        >>> pos_encoder = PositionalEncoding(d_model)
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = torch.nn.Dropout(p=dropout)
        
        pe = torch.zeros(max_len, d_model) # shape (max_len=5000, d_model)
        position = torch.arange(0, max_len, dtype=torch.float) # Shape (max_len=5000)
        position = position.unsqueeze(1) # Shape (5000, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # Shape (d_model/2)
        pe[:, 0::2] = torch.sin(position * div_term) 
        if d_model % 2 is not 0:
            pe[:, 1::2] = torch.cos(position * div_term)[:,:-1]
        else: 
            pe[:, 1::2] = torch.cos(position * div_term)
        # Insert a new dimension for batch size
        pe = pe.unsqueeze(0) # Shape (1, 5000, d_model)
        pe = pe.transpose(0, 1) # Shape (5000, 1, d_model) 
        self.register_buffer('pe', pe)

    def forward(self, x):
        r"""Inputs of forward function
        Args:
            x: the sequence fed to the positional encoder model (required).
        Shape:
            x: [sequence length, batch size, embed dim]
            output: [sequence length, batch size, embed dim]
        Examples:
            >>> output = pos_encoder(x)
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [256]:
class Transformer (torch.nn.Module):
    def __init__(self, d_model_enc, d_model_dec, nhead_enc,nhead_dec, dim_feedforward, dropout, num_encoder_layers, num_decoder_layers,max_len):
        super(Transformer,self).__init__()
        
        self.PositionalEncoder = PositionalEncoding(d_model_enc, dropout, max_len)
        
        norm_encoder = torch.nn.LayerNorm(d_model_enc)
        encoder_layer = torch.nn.TransformerEncoderLayer(d_model_enc, nhead_enc, dim_feedforward,dropout)
        self.Encoder = torch.nn.TransformerEncoder(encoder_layer, num_encoder_layers, norm_encoder)
        
        self.MemoryMap = torch.nn.Linear(d_model_enc, d_model_dec, bias=False)
        
        norm_decoder = torch.nn.LayerNorm(d_model_dec)
        decoder_layer = torch.nn.TransformerDecoderLayer(d_model_dec, nhead_dec, dim_feedforward, dropout)
        self.Decoder = torch.nn.TransformerDecoder(decoder_layer, num_decoder_layers, norm_decoder)
        
    def forward(self, src=None, tgt=None, _mem=None, only_encoder=False, only_decoder=False):
        
        x = self.PositionalEncoder(src)
        
        """
        
        if only_encoder:
            memory = self.Encoder(x)
            return memory
        
        if only_decoder: 
            mask = (torch.triu(torch.ones(tgt.shape[0], tgt.shape[0])) == 1)             # future mask
            out = self.Decoder(tgt,_mem, tgt_mask=mask)
            out = torch.reshape(out, (tgt_len, N, 3, d_model // 3))
            return out
        
        """
        print("x", x.shape)
        memory = self.Encoder(x)
        print("memory", memory.shape)
        memory_map = self.MemoryMap(memory)
        print("memory_map", memory_map.shape)

        mask = (torch.triu(torch.ones(tgt.shape[0], tgt.shape[0])) == 1)             # future mask
        print("mask", mask.shape)
        out = self.Decoder(tgt, memory_map, tgt_mask=mask)
        print("out", out.shape)
        out = torch.reshape(out, (tgt.shape[0], tgt.shape[1], 3, d_model_dec // 3))
            
        return out

## test random

In [242]:
d_model_enc = 27
d_model_dec = 27
nhead_enc = 3
nhead_dec = 3
dim_feedforward = d_model*10
dropout = 0.1
num_encoder_layers = 5
num_decoder_layers = 6
max_len=32



TM = Transformer(d_model_enc,d_model_dec, nhead_enc, nhead_dec, dim_feedforward, dropout, num_encoder_layers, num_decoder_layers,max_len)

N = 24
src_len = 12
tgt_len = 12

src = torch.rand(src_len, N, d_model_enc)
tgt = torch.rand(tgt_len, N, d_model_dec)

print(TM.forward(src,tgt).shape)

x torch.Size([12, 24, 27])
memory torch.Size([12, 24, 27])
memory_map torch.Size([12, 24, 27])
mask torch.Size([12, 12])
out torch.Size([12, 24, 27])
torch.Size([12, 24, 3, 9])


## test gmd

In [219]:
filters = {"beat_type": ["beat"],
           "time_signature" : ["4-4"],
           "master_id":["drummer9/session1/9"]}

mso_parameters = {"sr": 44100,
                  "n_fft": 1024,
                  "win_length": 1024,
                  "hop_length": 441,
                  "n_bins_per_octave": 16,
                  "n_octaves": 9,
                  "f_min": 40,
                  "mean_filter_size": 22
                 }

voices_parameters = {"voice_idx": [2], # closed hihat
                     "min_n_voices_to_remove": 1,
                     "max_n_voices_to_remove": 1,
                     "prob": [1],
                     "k": 1}


# train subset
pickle_source_path = '../../preprocessed_dataset/datasets_extracted_locally/GrooveMidi/hvo_0.4.2/Processed_On_17_05_2021_at_22_32_hrs'
subset_name = 'GrooveMIDI_processed_train'
metadata_csv_filename = 'metadata.csv'
hvo_pickle_filename = 'hvo_sequence_data.obj'

gmd_subsetter = GrooveMidiSubsetter(
    pickle_source_path=pickle_source_path,
    subset=subset_name,
    hvo_pickle_filename=hvo_pickle_filename,
    list_of_filter_dicts_for_subsets=[filters],
)
_, subset_list = gmd_subsetter.create_subsets()

subset_info = {"pickle_source_path": pickle_source_path,
               "subset": subset_name,
               "metadata_csv_filename": metadata_csv_filename,
               "hvo_pickle_filename": hvo_pickle_filename,
               "filters": filters}


train_data = GrooveMidiDataset(subset=subset_list[0], subset_info=subset_info, mso_parameters=mso_parameters,
                                     max_aug_items=100, voices_parameters=voices_parameters)

# test 
pickle_source_path = '../../preprocessed_dataset/datasets_extracted_locally/GrooveMidi/hvo_0.4.2/Processed_On_17_05_2021_at_22_32_hrs'
subset_name = 'GrooveMIDI_processed_test'
metadata_csv_filename = 'metadata.csv'
hvo_pickle_filename = 'hvo_sequence_data.obj'

filters = {"beat_type": ["beat"],
           "time_signature" : ["4-4"],
           "master_id":["drummer9/session1/7"]}


gmd_subsetter = GrooveMidiSubsetter(
    pickle_source_path=pickle_source_path,
    subset=subset_name,
    hvo_pickle_filename=hvo_pickle_filename,
    list_of_filter_dicts_for_subsets=[filters],
)
_, subset_list = gmd_subsetter.create_subsets()

subset_info = {"pickle_source_path": pickle_source_path,
               "subset": subset_name,
               "metadata_csv_filename": metadata_csv_filename,
               "hvo_pickle_filename": hvo_pickle_filename,
               "filters": filters}


test_data = GrooveMidiDataset(subset=subset_list[0], subset_info=subset_info, mso_parameters=mso_parameters,
                                    max_aug_items=100, voices_parameters=voices_parameters)



In [257]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'


from torch.utils.data import DataLoader

train_dataloader = DataLoader(train_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)


d_model_enc = 16
d_model_dec = 27
nhead_enc = 4
nhead_dec = 3
dim_feedforward = d_model*10
dropout = 0.1
num_encoder_layers = 5
num_decoder_layers = 6
max_len=32



TM = Transformer(d_model_enc,d_model_dec, nhead_enc, nhead_dec, dim_feedforward, dropout, num_encoder_layers, num_decoder_layers,max_len)



model = TM.to(device)

learning_rate = 1e-3
batch_size = 64
epochs = 5

loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [267]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)    
    for batch, (X,y,idx) in enumerate(dataloader):
        X = X.reshape((32,64,16)) # reorder dimensions
        y = y.reshape((32,64,27)) # reorder dimensions
        
        # Compute prediction and loss
        y_shifted = torch.zeros([1,64,27])
        y_shifted = torch.cat((y_shifted, y), dim=0)
        pred = model(X,y_shifted)
        print(pred.shape, y.shape)
        pred_l = pred.reshape([64,33,27])
        y_l = y.reshape([64,32,27])
        
        # sum 3 different losses
        loss = loss_fn(pred_l, y_l)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            

def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y, idx in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= size
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

            

In [268]:
print(train_dataloader.__len__())
print(test_dataloader.__len__())

2
3


In [269]:
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)
print("Done!")


Epoch 1
-------------------------------
x torch.Size([32, 64, 16])
memory torch.Size([32, 64, 16])
memory_map torch.Size([32, 64, 27])
mask torch.Size([33, 33])
out torch.Size([33, 64, 27])
torch.Size([33, 64, 27]) torch.Size([32, 64, 27])


ValueError: Expected target size (64, 27), got torch.Size([64, 32, 27])

In [210]:
enumerate(train_dataloader)


<enumerate at 0x7fc7ece46798>