In [1]:
import numpy as np 
import pandas as pd 
from io import open
import tensorflow as tf
import glob
import pickle
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import time
import copy

In [2]:
!pip install music21



In [3]:
from music21 import converter, instrument, note, chord, stream

In [4]:
def preprocess_input(filename, folder=False):
    # master list of notes
    notes = []
    
    # converting folders with multiple MIDI files
    if folder == True:
        assert os.path.exists('../input/classical-music-midi/'+filename)
        for file in glob.glob('../input/classical-music-midi/'+filename+'/*.mid'):
            notes_per_piece = []
            # read the MIDI file
            midi = converter.parse(file)            
            
            try: # file has instrument parts
                s2 = instrument.partitionByInstrument(midi)
                notes_to_parse = s2.parts[1].recurse() 
            except: # file has notes in a flat structure
                notes_to_parse = midi.flat.notes
            
#             print(notes_to_parse)
            for element in notes_to_parse:
                if isinstance(element, note.Note):
                    notes_per_piece.append(str(element.pitch))
                elif isinstance(element, chord.Chord):
                    notes_per_piece.append('.'.join(str(n) for n in element.normalOrder))
            notes.append(notes_per_piece)
    else:
        assert os.path.exists(filename)
        midi = converter.parse(filename)
        try: # file has instrument parts
            s2 = instrument.partitionByInstrument(midi)
            notes_to_parse = s2.parts[1].recurse() 
        except: # file has notes in a flat structure
            notes_to_parse = midi.flat.notes
#         print(notes_to_parse)
        for element in notes_to_parse:
            if isinstance(element, note.Note):
                notes.append(str(element.pitch))
            elif isinstance(element, chord.Chord):
                notes.append('.'.join(str(n) for n in element.normalOrder))
        

    with open('./notes', 'wb') as filepath:
        pickle.dump(notes, filepath)
#     print(notes)
    return notes

In [6]:
notes = preprocess_input('beeth', folder=True)

In [5]:
notes = []
with (open("./notes", "rb")) as openfile:
    while True:
        try:
            notes = (pickle.load(openfile))
        except EOFError:
            break
# notes = 

In [6]:
len(notes[-5:])

5

In [7]:
class Dictionary(object):
    def __init__(self):
        self.word2idx = {}
        self.idx2word = []

    def add_word(self, word):
        if word not in self.word2idx:
            self.idx2word.append(word)
            self.word2idx[word] = len(self.idx2word) - 1
        return self.word2idx[word]

    def __len__(self):
        return len(self.idx2word)
    
    def words(self):
        return self.idx2word


class Corpus(object):
    def __init__(self, path):
        self.dictionary = Dictionary()
        self.train = self.tokenize(notes[:-12])
        self.valid = self.tokenize(notes[-12:-6])
        self.test = self.tokenize(notes[-6:])

    def tokenize(self, notes):
        """Tokenizes a note sequence"""
        assert len(notes) > 0
        
        # Add notes to the dictionary
        for song in notes:
            for note in song:
                self.dictionary.add_word(note)
#         # Add words to the dictionary
#         with open(path, 'r', encoding="utf8") as f:
#             for line in f:
#                 words = line.split() + ['<eos>']
#                 for word in words:
#                     self.dictionary.add_word(word)

        # Tokenize file content
        idss = []
        
        for song in notes:
            ids = []
            for note in song:
#                 print(note)
                ids.append(self.dictionary.word2idx[note])
        idss.append(torch.tensor(ids).type(torch.int64))
        ids = torch.cat(idss)
            
#         with open(path, 'r', encoding="utf8") as f:
#             idss = []
#             for line in f:
#                 words = line.split() + ['<eos>']
#                 ids = []
#                 for word in words:
#                     ids.append(self.dictionary.word2idx[word])
#                 idss.append(torch.tensor(ids).type(torch.int64))
#             ids = torch.cat(idss)

        return ids

In [8]:
corpus = Corpus('../input/classical-music-midi/beeth')

In [176]:
device = 'cuda'
def batchify(data, bsz):
    # Work out how cleanly we can divide the dataset into bsz parts.
    nbatch = data.size(0) // bsz
    # Trim off any extra elements that wouldn't cleanly fit (remainders).
    data = data.narrow(0, 0, nbatch * bsz)
    # Evenly divide the data across the bsz batches.
    data = data.view(bsz, -1).t().contiguous()
    return data.to(device)

eval_batch_size = 10
train_data = batchify(corpus.train, eval_batch_size)
val_data = batchify(corpus.valid, eval_batch_size)
test_data = batchify(corpus.test, eval_batch_size)

In [10]:
class PositionalEncoding(nn.Module):
    r"""Inject some information about the relative or absolute position of the tokens
        in the sequence. The positional encodings have the same dimension as
        the embeddings, so that the two can be summed. Here, we use sine and cosine
        functions of different frequencies.
    .. math::
        \text{PosEncoder}(pos, 2i) = sin(pos/10000^(2i/d_model))
        \text{PosEncoder}(pos, 2i+1) = cos(pos/10000^(2i/d_model))
        \text{where pos is the word position and i is the embed idx)
    Args:
        d_model: the embed dim (required).
        dropout: the dropout value (default=0.1).
        max_len: the max. length of the incoming sequence (default=5000).
    Examples:
        >>> pos_encoder = PositionalEncoding(d_model)
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.d_model = d_model

        # PE is the Positional Encoding matrix 
        # THIS STORES THE POSITIONS OF THE SEQUENCE
        pe = torch.zeros(max_len, d_model)
        
        # Arange - RETURNS A RANGE BETWEEN VALUES, HERE IT IS 0 - max_len
        # unsqueeze - adds a dimension, 1 means that each element in the first list is now in a list
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # division term, here it is (10000 ** ((2 * i)/d_model))
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # calculating the position encoding for the even and odd terms
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Unsqueeze 0 will put PE in one list
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        r"""Inputs of forward function
        Args:
            x: the sequence fed to the positional encoder model (required).
        Shape:
            x: [sequence length, batch size, embed dim]
            output: [sequence length, batch size, embed dim]
        Examples:
            >>> output = pos_encoder(x)
        """
        # make embeddings relatively larger
        # This is so we do not lose the importance of the embedding
        x = x * math.sqrt(self.d_model)
        # we add the embedding to the PE 
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [13]:
class MultiHeadAttention(nn.Module):
    def __init__(self, heads, d_model, dropout = 0.1):
        super().__init__()
        
        self.d_model = d_model
        self.d_k = d_model // heads
        self.h = heads
        self.max_length = 1024
        
        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(d_model, d_model)
    
    def forward(self, q, k, v, mask=None):
        
        bs = q.size(0)
        
        
        # perform linear operation and split into h heads
        
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
        
        # transpose to get dimensions bs * h * sl * d_model
       
        k = k.transpose(1,2)
        q = q.transpose(1,2)
        v = v.transpose(1,2)
        
        #Compute scaled dot-product self-attention
        #scale pre-matrix multiplication   
        q = q / (bs ** (1/4))
        k = k / (bs ** (1/4))


        # calculate attention using function we will define next
        scores = attention(q, k, v, self.d_k, mask, self.dropout)
        
        # concatenate heads and put through final linear layer
        concat = scores.transpose(1,2).contiguous().view(bs, -1, self.d_model)
        
        output = self.out(concat)
    
        return output

In [77]:
def attention(q, k, v, d_k, mask=None, dropout=None):
    
    scores = torch.matmul(q, k.transpose(-2, -1)) /  math.sqrt(d_k)

    if mask is not None:
        mask = mask.unsqueeze(1)
#         scores = scores.masked_fill(mask == 0, -1e9)
    scores = F.softmax(scores, dim=-1)
    
    if dropout is not None:
        scores = dropout(scores)
        
    output = torch.matmul(scores, v)
    return output

In [78]:
class FeedForward(nn.Module):
#     def __init__(self, d_model, d_ff=2048, dropout = 0.1):
#         super().__init__() 
#         # We set d_ff as a default to 2048
#         self.linear_1 = nn.Linear(d_model, d_ff)
#         self.dropout = nn.Dropout(dropout)
#         self.linear_2 = nn.Linear(d_ff, d_model)
#     def forward(self, x):
#         x = self.dropout(F.relu(self.linear_1(x)))
#         x = self.linear_2(x)
#         return x
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.linear_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        return self.linear_2(self.dropout(F.relu(self.linear_1(x))))


In [124]:
# build an encoder layer with one multi-head attention layer and one # feed-forward layer
class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout = 0.1):
        super().__init__()
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)
        self.attn = MultiHeadAttention(heads, d_model)
        self.ff = FeedForward(d_model)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn(x2,x2,x2,mask))
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.ff(x2))
        return x
    
# build a decoder layer with two multi-head attention layers and
# one feed-forward layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)
        self.norm_3 = nn.LayerNorm(d_model)
        
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        self.dropout_3 = nn.Dropout(dropout)
        
        self.attn_1 = MultiHeadAttention(heads, d_model)
        self.attn_2 = MultiHeadAttention(heads, d_model)
        self.ff = FeedForward(d_model)
    def forward(self, x, mask):
#         x2 = self.norm_1(x)
#         x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
#         x2 = self.norm_2(x)
#         x = x + self.dropout_2(self.attn_2(x2, x2, x2,
#         src_mask))
#         x2 = self.norm_3(x)
#         x = x + self.dropout_3(self.ff(x2))
        
        #perform masked attention on input
        #masked so queries cannot attend to subsequent keys
        #Pass through sublayers of attention and feedforward.
        #Apply dropout to sublayer output, add it to input, and norm.
        attn = self.attn_1(x, x, x, mask)
        x = x + self.dropout_1(attn)
        x = self.norm_1(x)

        ff = self.ff(x)
        x = x + self.dropout_2(ff)
        x = self.norm_2(x)

        return x
#         return x
# We can then build a convenient cloning function that can generate multiple layers:
def get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

In [125]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads):
        super().__init__()
        self.N = N
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model)
        self.layers = get_clones(EncoderLayer(d_model, heads), self.N)
        self.norm = nn.LayerNorm(d_model)
    def forward(self, src, mask):
#         x = self.embed(src)
#         x = self.pe(x)
        x = self.layers[0](src, mask)
        for i in range(1,self.N):
            x = self.layers[i](x, mask)
        return self.norm(x)
    
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, N, heads):
        super().__init__()
        self.N = N
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model)
        self.layers = get_clones(DecoderLayer(d_model, heads), N)
        self.norm = nn.LayerNorm(d_model)
    def forward(self, trg, src_mask):
#         x = self.embed(trg)
#         x = self.pe(x)
        x = self.layers[0](trg, src_mask)
        for i in range(1,self.N):
            x = self.layers[i](x, src_mask)
        return self.norm(x)

In [126]:
class TransformerModel(nn.Module):
    """Container module with an encoder, a recurrent or transformer module, and a decoder."""

    def __init__(self, ntoken, ninp, nhead, nhid, nlayers, dropout=0.5):
        super(TransformerModel, self).__init__()
        try:
            from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer
        except:
            raise ImportError('TransformerEncoder module does not exist in PyTorch 1.1 or lower.')
        self.model_type = 'Transformer'
        # original mask
        self.src_mask = None
        # positional encoding
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        # encoder
#         encoder_layers = TransformerEncoderLayer(ninp, nhead, nhid, dropout)
#         self.encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = Encoder(ntoken, ninp, nlayers, nhead)
        self.encoder.eval()
        # decoder
#         decoder_layers = TransformerDecoderLayer(ninp, nhead, nhid, dropout)
#         self.decoder = TransformerDecoder(decoder_layers, nlayers)
        self.decoder = Decoder(ntoken, ninp, nlayers, nhead)
        self.decoder.eval()
        # embedding encoding
        self.embedding = nn.Embedding(ntoken, ninp)
        self.ninp = ninp
#         self.decoder = nn.Linear(ninp, ntoken)

        # classification layer
        self.classification_layer = nn.Linear(ninp, ntoken)

        self.init_weights()

    def _generate_square_subsequent_mask(self, sz):
        
        mask = (torch.triu(torch.ones(1, sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def init_weights(self):
        initrange = 0.1
        nn.init.uniform_(self.embedding.weight, -initrange, initrange)
        nn.init.zeros_(self.classification_layer.weight)
        nn.init.uniform_(self.classification_layer.weight, -initrange, initrange)

    def forward(self, src, has_mask=True):
        if has_mask:
            device = src.device
            if self.src_mask is None or self.src_mask.size(0) != len(src):
                mask = self._generate_square_subsequent_mask(len(src)).to(device)
                self.src_mask = mask
        else:
            self.src_mask = None

#         src = self.embedding(src) * math.sqrt(self.ninp)
        src_embedding = self.embedding(src)
        src_embedding = self.pos_encoder(src_embedding)
        output = self.encoder(src_embedding, self.src_mask)
        output = self.decoder(output, self.embedding(src))
        
        output = self.classification_layer(output) # projection to vocab size

        return F.log_softmax(output, dim=-1)


In [136]:
ntokens = len(corpus.dictionary)
emsize = 200
nhead = 10
nhid = 200
nlayers = 4
dropout = 0.2
# Loop over epochs.
lr = 5
best_val_loss = None
epochs = 400
save = './model.pt'

In [128]:
model = TransformerModel(ntokens, emsize, nhead, nhid, nlayers, dropout).to(device)

In [129]:
criterion = nn.NLLLoss()

In [130]:
def repackage_hidden(h):
    """Wraps hidden states in new Tensors, to detach them from their history."""

    if isinstance(h, torch.Tensor):
        return h.detach()
    else:
        return tuple(repackage_hidden(v) for v in h)

In [131]:
# get_batch subdivides the source data into chunks of length args.bptt.
# If source is equal to the example output of the batchify function, with
# a bptt-limit of 2, we'd get the following two Variables for i = 0:
# ┌ a g m s ┐ ┌ b h n t ┐
# └ b h n t ┘ └ c i o u ┘
# Note that despite the name of the function, the subdivison of data is not
# done along the batch dimension (i.e. dimension 1), since that was handled
# by the batchify function. The chunks are along dimension 0, corresponding
# to the seq_len dimension in the LSTM.
seq_length = 40
def get_batch(source, i):
    seq_len = min(seq_length, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len].view(-1)
    return data, target

In [132]:
def evaluate(data_source):
    # Turn on evaluation mode which disables dropout.
    model.eval()
    total_loss = 0.
    ntokens = len(corpus.dictionary)

    with torch.no_grad():
        for i in range(0, data_source.size(0) - 1, seq_length):
            data, targets = get_batch(data_source, i)
            
            output = model(data)
            output = output.view(-1, ntokens)
            
            total_loss += len(data) * criterion(output, targets).item()
    return total_loss / (len(data_source) - 1)

In [177]:
def train():
    # Turn on training mode which enables dropout.
    model.train()
    total_loss = 0.
    start_time = time.time()
    ntokens = len(corpus.dictionary)
    optimizer = torch.optim.Adam(model.parameters())
    
    
    for batch, i in enumerate(range(0, train_data.size(0) - 1, seq_length)):
        data, targets = get_batch(train_data, i)
        # Starting each batch, we detach the hidden state from how it was previously produced.
        # If we didn't, the model would try backpropagating all the way to start of the dataset.
        model.zero_grad()
        
        output = model(data)
        output = output.view(-1, ntokens)
     
        loss = criterion(output, targets)
        loss.backward()
        
        optimizer.step()
        
        clip = 0.25
        log_interval = 100
        
    
        for p in model.parameters():
            print(p)
#             p.data.add_(p.grad, alpha=-lr)

        
        # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        total_loss += loss.item()

        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.2f} | ms/batch {:5.2f} | '
                    'loss {:5.2f} | ppl {:8.2f}'.format(
                epoch, batch, len(train_data) // seq_length, lr,
                elapsed * 1000 / log_interval, cur_loss, math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()
            
            evaluate(val_data)
            model.train()
#         if dry_run:
#             break

In [None]:
# At any point you can hit Ctrl + C to break out of training early.
try:
    for epoch in range(1, epochs+1):
        epoch_start_time = time.time()
        train()
        val_loss = evaluate(val_data)
        print('-' * 89)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
                'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                           val_loss, math.exp(val_loss)))
        print('-' * 89)
        # Save the model if the validation loss is the best we've seen so far.
        if not best_val_loss or val_loss < best_val_loss:
            with open(save, 'wb') as f:
                torch.save(model, f)
            best_val_loss = val_loss
        else:
            # Anneal the learning rate if no improvement has been seen in the validation dataset.
            lr /= 4.0
except KeyboardInterrupt:
    print('-' * 89)
    print('Exiting from training early')

Parameter containing:
tensor([[ 0.3811, -0.5035,  0.7977,  ..., -1.3944, -0.9503, -1.2206],
        [-0.5620,  0.1179, -1.0413,  ...,  0.5574,  1.3636, -0.7547],
        [ 0.8543,  0.6769, -0.6652,  ...,  0.6294, -1.1389,  0.9313],
        ...,
        [ 1.1222, -0.5374,  0.3128,  ...,  0.2706, -0.7053,  1.1442],
        [-0.1760,  1.6945, -0.2137,  ...,  1.5362,  1.4991, -0.1906],
        [-0.0424,  0.6481, -1.5634,  ..., -0.2551,  0.9475, -1.1640]],
       device='cuda:0', requires_grad=True)
Parameter containing:
tensor([1.0718, 1.0489, 1.0190, 1.1066, 0.9619, 1.0221, 1.0453, 0.9423, 0.9567,
        0.9296, 1.0160, 0.9612, 0.9863, 1.0215, 0.9430, 1.0657, 0.9548, 0.9667,
        1.0084, 0.9732, 1.0196, 0.9006, 1.0498, 0.9085, 0.8470, 0.9832, 0.9991,
        0.8574, 1.0495, 0.8511, 0.9618, 0.9996, 0.8824, 0.9466, 0.9002, 0.9295,
        1.0536, 0.8833, 0.9265, 0.9039, 0.9085, 0.9333, 0.9680, 1.0154, 0.8993,
        0.8964, 0.9666, 0.8081, 0.9150, 0.9471, 0.9363, 0.9834, 0.8399, 0.8710

In [None]:
n_generate = 500
temperature = 2.0
sequence = []
log_interval = 50 # interval between logs

In [None]:
model.eval()

input = torch.randint(ntokens, (1, 1), dtype=torch.long).to(device)

with open('./output', 'w') as outf:
    with torch.no_grad():  # no tracking history
        for i in range(n_generate):
            
            output = model(input, False)
            word_weights = output[-1].squeeze().div(temperature).exp().cpu()
            word_idx = torch.multinomial(word_weights, 1)[0]
            word_tensor = torch.Tensor([[word_idx]]).long().to(device)
            input = torch.cat([input, word_tensor], 0)


            word = corpus.dictionary.idx2word[word_idx]

            outf.write(word + ('\n' if i % 20 == 19 else ' '))
            
            sequence.append(word)

            if i % log_interval == 0:
                print('| Generated {}/{} notes'.format(i, n_generate))

In [None]:
# Creating a MIDI file from prediction
def create_MIDI(gen, name=""):
    
    # the offset is the time difference between notes, we assume its 0.5 here
    offset = 0
    music = []
    
    for seq in gen:
        # chords are seperated by .
        if ('.' in seq) or seq.isdigit():
            chordnotes = seq.split('.')
            n = []
            
            for cur_note in chordnotes:
                new_n = note.Note(int(cur_note))
                new_n.storedInstrument = instrument.Piano() # single piano instrument only
                n.append(new_n)
            
            new_c = chord.Chord(n)
            new_c.offset = offset
            music.append(new_c)
        
        else:            
            new_n = note.Note(seq)
            new_n.storedInstrument = instrument.Piano() # single piano instrument only
            new_n.offset = offset
            music.append(new_n)
        
        offset += 0.5
    
    print(notes)
    # producing a MIDI stream
    midi = stream.Stream(music)
    
    midi.write('midi', fp='new_music_'+name+'.mid')

In [None]:
create_MIDI(sequence, name='beeth_all_diverse')