In [429]:
import torch
import pandas as pd
import numpy as np
import ast
import itertools
import math

## Data processing

In [430]:
# load the data
data_train = pd.read_csv('../train.csv')
data_train['beat_pitch'] = data_train['beat_pitch'].apply(ast.literal_eval)
data_train['pitches'] = data_train['pitches'].apply(ast.literal_eval)

In [431]:
data_train.shape

(363, 4)

In [432]:
# data augmentation, move all the notes one octave up or down
def pitch_augmentation(beat_pitch):
    new_beat_pitch_higher = []
    new_beat_pitch_lower = []
    for bar in beat_pitch:
        new_bar_higher = []
        new_bar_lower = []
        for (pos, pitch) in bar:
            new_bar_higher.append((pos, pitch+12))
            new_bar_lower.append((pos, pitch-12))
        new_beat_pitch_higher.append(new_bar_higher)
        new_beat_pitch_lower.append(new_bar_lower)
    return new_beat_pitch_higher, new_beat_pitch_lower

length = data_train.shape[0]
for i in range(length):
    beat_pitch = data_train['beat_pitch'][i]
    new_beat_pitch_higher, new_beat_pitch_lower = pitch_augmentation(beat_pitch)
    # append the new beat pitch to the original dataframe, leave all the other columns empty
    data_train = data_train.append({'beat_pitch': new_beat_pitch_higher}, ignore_index=True)
    data_train = data_train.append({'beat_pitch': new_beat_pitch_lower}, ignore_index=True)

data_train.shape

(1089, 4)

In [433]:
data_train.tail()

Unnamed: 0,piece,beats,pitches,beat_pitch
1084,,,,"[[(1, 57), (2, 59), (3, 61), (4, 62), (5, 64),..."
1085,,,,"[[(1, 89), (3, 88), (5, 86), (7, 89), (9, 88),..."
1086,,,,"[[(1, 65), (3, 64), (5, 62), (7, 65), (9, 64),..."
1087,,,,"[[(1, 88), (4, 91), (5, 84), (7, 93)], [(1, 86..."
1088,,,,"[[(1, 64), (4, 67), (5, 60), (7, 69)], [(1, 62..."


In [434]:
# define the dictionary
pitch_number_dict = {i:i for i in range(128)}
pitch_number_dict['<SOS>'] = 129
pitch_number_dict['<EOS>'] = 130
pitch_number_dict['<UNK>'] = 131
pitch_number_dict['<MASK>'] = 132
pitch_number_dict['<SEP>'] = 133
pitch_number_dict['<PAD>'] = 134
pitch_number_dict['<BLANK>'] = 135

In [435]:
# change the (position,pitch) to a sequence of numbers
def change_it_to_bar_vectors(song):
    song_1 = []
    BLANK = pitch_number_dict['<BLANK>']
    for bar in song:
        vec = [BLANK] * 12
        for tup in bar:
            vec[tup[0]-1] = tup[1]
        song_1.append(vec)
    return song_1

In [436]:
# change the original data to x,y
def prepare_data(data_train,max_len):
    # use the start, end, and middle pitch to create the pitch sequence as x
    # use the beat_pitch to create the pitch sequence as y (use 'SEP' to separate the bars)
    x_train = []
    y_train = []

    # get the number of special tokens
    SOS = pitch_number_dict['<SOS>']
    EOS = pitch_number_dict['<EOS>']
    SEP = pitch_number_dict['<SEP>']
    PAD = pitch_number_dict['<PAD>']

    for i in range(len(data_train)):
        piece = data_train.loc[i,'beat_pitch']
        piece = change_it_to_bar_vectors(piece)
        piece_x = []
        piece_y = []

        for bar in piece:
            piece_x.extend(bar + [SEP])
            piece_y.extend(bar + [SEP])
        
        # remove the last <SEP>
        piece_x = piece_x[:-1]
        piece_y = piece_y[1:]

        # add <SOS>
        piece_x = [SOS] + piece_x
        piece_y = [SOS] + piece_y

        # add <PAD>
        piece_x.extend([PAD] * (max_len - len(piece_x)))
        piece_y.extend([PAD] * (max_len - len(piece_y)))
        
        # add <EOS>
        piece_x.append(EOS)
        piece_y.append(EOS)

        x_train.append(piece_x)
        y_train.append(piece_y)
    return x_train,y_train


In [437]:
x_train,y_train = prepare_data(data_train,max_len=399)

In [438]:
print(len(x_train[0]),len(y_train[0]))

400 400


In [439]:
class Dataset(torch.utils.data.Dataset):
    def __init__(self, x, y):
        self.x = x
        self.y = y
    
    def __len__(self):
        return len(self.x)
    
    def __getitem__(self, index):
        x = torch.tensor(self.x[index])
        y = torch.tensor(self.y[index])
        return x,y

In [440]:
loader = torch.utils.data.DataLoader(Dataset(x_train,y_train),
                                     batch_size=8,
                                     shuffle=True,
                                     collate_fn=None,
                                     drop_last=True)

## Utils

In [441]:
def attention(Q, K, V, mask):
    # Q, K, V: (batch_size, len_of_sequence, head_number=4, embedding_size_per_head=8)
    # Q, K, V: (len_of_training_set, head_number=4, len_of_sequence, embedding_size_per_head=8)
    # Q*K: get the attention score between each word in the sequence
    # Q*K: (len_of_training_set, head_number=4, len_of_sequence, len_of_sequence)
    score = torch.matmul(Q,K.permute(0,1,3,2)) / np.sqrt(8)
    
    # mask the score
    # mask: (len_of_training_set, 1, len_of_sequence, len_of_sequence)
    score = score.masked_fill(mask, -float('inf'))
    
    # softmax the score
    score = torch.softmax(score, dim=-1)

    # score*V: get the weighted sum of the value
    # score*V: (len_of_training_set, head_number=4, len_of_sequence, embedding_size_per_head=8)
    score = torch.matmul(score, V)

    # concat the heads
    # [batch_size, head_number=4, len_of_sequence, embedding_size_per_head=8] -> [batch_size, len_of_sequence, embedding_size=4*8=32]
    score = score.permute(0,2,1,3).reshape(score.shape[0], score.shape[2], -1)

    return score

In [442]:
class MultiHead(torch.nn.Module):
    def __init__(self):
        super().__init__()
        
        self.head_number = 4
        self.embedding_size_per_head = 8

        self.fc_Q = torch.nn.Linear(32,32)
        self.fc_K = torch.nn.Linear(32,32)
        self.fc_V = torch.nn.Linear(32,32)

        self.fc_out = torch.nn.Linear(32,32)

        self.norm = torch.nn.LayerNorm(normalized_shape=32, elementwise_affine=True)
        self.dropout = torch.nn.Dropout(0.1)
    
    def forward(self, Q, K, V, mask):
        # Q, K, V: (batch_size, len_of_sequence, embedding_size=32)
        batch_size = Q.shape[0]
        len_of_sequence = Q.shape[1]

        # keep the original Q
        Q_original = Q

        # normalize
        Q = self.norm(Q)
        K = self.norm(K)
        V = self.norm(V)
        
        # linear projection, the dimension will not change
        Q = self.fc_Q(Q)
        K = self.fc_K(K)
        V = self.fc_V(V)
        
        # split the heads   
        # Q, K, V: (batch_size, len_of_sequence, head_number=4, embedding_size_per_head=8)
        Q = Q.reshape(batch_size, len_of_sequence, self.head_number, self.embedding_size_per_head).permute(0,2,1,3)
        K = K.reshape(batch_size, len_of_sequence, self.head_number, self.embedding_size_per_head).permute(0,2,1,3)
        V = V.reshape(batch_size, len_of_sequence, self.head_number, self.embedding_size_per_head).permute(0,2,1,3)

        # get the attention score
        # score: (batch_size, len_of_sequence, embedding_size=4*8=32)
        score = attention(Q, K, V, mask)
        
        # get the output
        # score: (batch_size, len_of_sequence, embedding_size=32)
        score = self.dropout(self.fc_out(score))

        # residual connection
        score = score + Q_original

        return score

In [443]:
class PositionEmbedding(torch.nn.Module):
    def __init__(self):
        super().__init__()

        # pos:index of the position, i: index of the embedding, d_model: embedding size
        def get_pe(pos, i, d_model):
            denominator = 1e4 ** (i / d_model)
            pe = pos / denominator

            if i % 2 == 0:
                return math.sin(pe)
            return math.cos(pe)
        
        # initialize the position embedding
        pe = torch.empty(400,32)
        for i in range(400):
            for j in range(32):
                pe[i,j] = get_pe(i,j,32)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

        # word embedding
        self.embed = torch.nn.Embedding(136,32)
        # initialize the word embedding
        self.embed.weight.data.normal_(0, 0.1)

    def forward(self, x):
        # x: [8, 400] -> [8, 400, 32]
        embed = self.embed(x)
        
        # add the position embedding
        # embed: [8, 400, 32] + [1, 400, 32] -> [8, 400, 32]
        embed = embed + self.pe
        return embed

In [444]:
class FullyConnectedOutput(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = torch.nn.Sequential(
            torch.nn.Linear(in_features=32, out_features=64),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=64, out_features=32),
            torch.nn.Dropout(0.1)
        )
        self.norm = torch.nn.LayerNorm(normalized_shape=32)
    def forward(self, x):
        # get the original x
        x_original = x.clone()
        # normalize
        x = self.norm(x)
        # linear projection
        x = self.fc(x)
        # residual connection
        x = x + x_original
        return x


In [445]:
def mask_pad(data):
    # data: (len_of_training_set, len_of_sequence)
    mask = (data == pitch_number_dict['<PAD>'])

    # mask: (len_of_training_set, 1, 1, len_of_sequence)
    mask = mask.reshape(-1,1,1,mask.shape[1])
    # mask: (len_of_training_set, 1, len_of_sequence, len_of_sequence)
    mask = mask.expand(-1,1,mask.shape[3],mask.shape[3])
    return mask

In [446]:
def mask_tril(data):
    # data: (len_of_training_set, len_of_sequence)
    tril = 1 - torch.tril(torch.ones(1,data.shape[1],data.shape[1]))
    mask = (data == pitch_number_dict['<PAD>'])
    mask = mask.unsqueeze(1).long()
    mask = mask + tril
    mask = mask > 0
    mask = (mask == 1).unsqueeze(dim=1)
    return mask

## Model

In [447]:
class EncoderLayer(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.multihead = MultiHead()
        self.fc = FullyConnectedOutput()
    
    def forward(self, x, mask):
        # x: (len_of_training_set, len_of_sequence, embedding_size=32)
        # mask: (len_of_training_set, 1, len_of_sequence, len_of_sequence)
        score = self.multihead(x, x, x, mask)
        out = self.fc(score)
        return out

In [448]:
class Encoder(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer_1 = EncoderLayer()
        self.layer_2 = EncoderLayer()
        self.layer_3 = EncoderLayer()
    
    def forward(self, x, mask):
        x = self.layer_1(x, mask)
        x = self.layer_2(x, mask)
        x = self.layer_3(x, mask)
        return x

In [449]:
class DecoderLayer(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.multihead_1 = MultiHead()
        self.multihead_2 = MultiHead()
        self.fc = FullyConnectedOutput()
    
    def forward(self,x,y,mask_pad_x,mask_tril_y):
        # x: (len_of_training_set, len_of_sequence, embedding_size=32)
        # y: (len_of_training_set, len_of_sequence, embedding_size=32)
        # mask_pad_x: (len_of_training_set, 1, len_of_sequence, len_of_sequence)
        # mask_tril_y: (len_of_training_set, 1, len_of_sequence, len_of_sequence)
        y = self.multihead_1(y, y, y, mask_tril_y)
        y = self.multihead_2(y, x, x, mask_pad_x)
        out = self.fc(y)
        return out

In [450]:
class Decoder(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer_1 = DecoderLayer()
        self.layer_2 = DecoderLayer()
        self.layer_3 = DecoderLayer()
    
    def forward(self,x,y,mask_pad_x,mask_tril_y):
        y = self.layer_1(x,y,mask_pad_x,mask_tril_y)
        y = self.layer_2(x,y,mask_pad_x,mask_tril_y)
        y = self.layer_3(x,y,mask_pad_x,mask_tril_y)
        return y

In [451]:
class Transformer(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.embed = PositionEmbedding()
        self.encoder = Encoder()
        self.decoder = Decoder()
        self.fc_out = torch.nn.Linear(32,136)
    
    def forward(self,x,y):
        mask_pad_x = mask_pad(x)
        mask_tril_y = mask_tril(y)
        # x: (len_of_training_set, len_of_sequence) -> (len_of_training_set, len_of_sequence, embedding_size=32)
        # y: (len_of_training_set, len_of_sequence) -> (len_of_training_set, len_of_sequence, embedding_size=32)
        x,y = self.embed(x),self.embed(y)

        # encoder layer
        # x: (len_of_training_set, len_of_sequence, embedding_size=32) -> (len_of_training_set, len_of_sequence, embedding_size=32)
        x = self.encoder(x, mask_pad_x)

        # decoder layer
        # y: (len_of_training_set, len_of_sequence, embedding_size=32) -> (len_of_training_set, len_of_sequence, embedding_size=32)
        y = self.decoder(x, y, mask_pad_x, mask_tril_y)

        # fully connected layer
        # y: (len_of_training_set, len_of_sequence, embedding_size=32) -> (len_of_training_set, len_of_sequence, embedding_size=136)
        y = self.fc_out(y)
        
        return y

## Train

In [452]:
model = Transformer()
loss_func = torch.nn.CrossEntropyLoss()
optim = torch.optim.Adam(model.parameters(), lr=2e-3)
sched = torch.optim.lr_scheduler.StepLR(optim, step_size=3, gamma=0.5)

In [465]:
best_acc = 0
best_acc_loss = 0

# load the pretrained model
# model.load_state_dict(torch.load('transformer.pth'))

for epoch in range(30):
    print('epoch: ',epoch)
    for i,(x,y) in enumerate(loader):
        # x: [8,400], y: [8,400]
        pred = model(x,y)
        pred = pred.reshape(-1,136)
        y = y.reshape(-1)

        select = (y != pitch_number_dict['<PAD>'])
        pred = pred[select]
        y = y[select]

        loss = loss_func(pred,y)
        optim.zero_grad()
        loss.backward()
        optim.step()

        if i % 10 == 0:
            pred = pred.argmax(dim=1)
            correct = (pred == y).sum().item()
            acc = correct / len(pred)
            if acc > best_acc:
                best_acc = acc
                best_acc_loss = loss.item()
                torch.save(model.state_dict(), 'transformer.pth')
    print('best accuracy: ',best_acc, ' best accuracy loss: ',best_acc_loss)
    sched.step()

epoch:  0
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  1
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  2
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  3
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  4
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  5
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  6
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  7
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  8
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  9
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  10
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  11
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  12
best accuracy:  1.0  best accuracy loss:  0.001516359276138246
epoch:  13
best accuracy:  1.0  best accuracy lo

KeyboardInterrupt: 

## Predict

In [466]:
def predict(x):
    x = torch.tensor(x).unsqueeze(0)
    print(x)
    # x: [1,400]
    model.eval()
    
    # initialize the mask
    mask_pad_x = mask_pad(x)

    # initialize the output
    target = [pitch_number_dict['<SOS>']] + [pitch_number_dict['<PAD>']] * 399
    target = torch.tensor(target).unsqueeze(0)

    # embed the input, [1, 400] -> [1, 400, 32]
    x = model.embed(x)

    # encoder layer
    # x: [1, 400, 32] -> [1, 400, 32]
    x = model.encoder(x, mask_pad_x)

    # generate the output
    # 1 + 8*12 + 7 + 1 = 105
    for i in range(105):
        y = target

        # initialize the mask
        mask_tril_y = mask_tril(y)

        # embed the output, [1, 400] -> [1, 400, 32]
        y = model.embed(y)

        # decoder layer
        # y: [1, 400, 32] -> [1, 400, 32]
        y = model.decoder(x, y, mask_pad_x, mask_tril_y)

        # fully connected layer
        # y: [1, 400, 32] -> [1, 400, 136]
        out = model.fc_out(y)

        # get the output for the current position
        # out: [1, 400, 136] -> [1, 136]
        out = out[:,i,:]

        # get the index of the maximum value
        # out: [1, 136] -> [1]
        out = out.argmax(dim=1).detach()

        # update the target
        target[:,i+1] = out

    return target

In [None]:
def generate_x(triple_counter):
    pass

In [467]:
for i in range(1):
    # x = generate_x(triple_counter)
    # pred = predict(x)
    # test triple [[(1, 83), (5, 79), (9, 76)]]
    # test_triple = [[(1, 83), (5, 79), (9, 76)]]
    # test_triple = change_it_to_bar_vectors(test_triple)
    # test_x = [pitch_number_dict['<SOS>']] + test_triple[0] + [pitch_number_dict['<PAD>']] * (400-2-len(test_triple[0])) + [pitch_number_dict['<EOS>']]

    test_x = x_train[i]

    pred = predict(test_x)
    print(pred)
    print(y_train[i])

tensor([[129,  74, 135, 135, 135,  69, 135,  67, 135,  66, 135,  64, 135, 133,
          62, 135,  79, 135,  78, 135,  76, 135,  78, 135,  76, 135, 133,  74,
         135, 135, 135,  69, 135,  67, 135,  66, 135,  64, 135, 133,  62, 135,
          74,  73,  74, 135, 135, 135, 135, 135, 135, 135, 133,  78, 135,  76,
         135,  74, 135,  76, 135,  78, 135,  74, 135, 133,  79, 135,  78, 135,
          76, 135,  78, 135,  79, 135,  76, 135, 133,  78, 135,  76, 135,  74,
         135,  81, 135,  79, 135,  78, 135, 133,  76, 135, 135, 135,  69, 135,
         135, 135, 135, 135, 135, 135, 133,  78, 135,  76, 135,  74, 135,  73,
         135,  71, 135, 135, 135, 133,  79, 135,  78, 135,  76, 135,  74, 135,
          73, 135, 135, 135, 133,  74, 135, 135, 135,  69, 135,  67, 135,  66,
         135,  64, 135, 133,  62, 135,  74,  73,  74, 135, 135, 135, 135, 135,
         135, 135, 134, 134, 134, 134, 134, 134, 134, 134, 134, 134, 134, 134,
         134, 134, 134, 134, 134, 134, 134, 134, 134