## Preparations

### Install required packages

In [None]:
# Install the third-party MIDI / music libraries (versions are not pinned here).
!pip install music21
!pip install mido
!pip install pretty_midi



### Import modules and define paths

In [None]:
import pretty_midi
from music21 import *
import numpy as np
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import time
import glob
from itertools import groupby
import math
import pickle
import gc

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
import numpy as np
import os
import re
import string
import random

from sklearn.preprocessing import MultiLabelBinarizer
# Binarizer used to turn a collection of pitch numbers into a 128-wide multi-hot row.
# It is fitted on the full 0-127 range, so column j corresponds to pitch j.
mlb = MultiLabelBinarizer()
mlb.fit([np.arange(128).tolist()])

# Base folders (Colab working directory).
encoded_data_path = '/content/'
output_path = '/content/'

# NOTE(review): the training cell defines its own `batchsize`; `batch_size` is not read by the cells shown here.
batch_size = 32
# Number of tokens per training / inference window.
sequence_length = 600
# The sample-generation callback prints a sample every this many epochs.
generate_sample_every_ep = 100

maxlen = sequence_length  # Max sequence size
embed_dim = 128  # Embedding size for each token
num_heads = 4  # Number of attention heads
feed_forward_dim = 128  # Hidden layer size in feed forward network inside transformer

# File names for the tokeniser lookup tables.
combi_to_int_pickle = 'combi_to_int.pickle'
int_to_combi_pickle = 'int_to_combi.pickle'
vocab_pickle = 'vocab.pickle'

# Must match the vocabulary the weights were trained with.
vocab_size = 7184  # classical = 7184; jazz = 40000
# Special tokens: index 0 is the unknown tag, index 1 is the padding tag.
unk_tag_str = '<UNK>'
unk_tag_idx = 0
pad_tag_str = ''
pad_tag_idx = 1

# Uncomment to unpack an uploaded dataset archive.
# !unzip /content/classical_data.zip
# !unzip /content/jazz_data.zip

### Import variables from pickle
If you did not process and tokenise the data yourself, you must load the pre-computed variables from the pickle files below.

In [None]:
# Load the pre-tokenised dataset and its lookup tables.
# SECURITY NOTE: pickle.load executes arbitrary code embedded in the file;
# only load pickles that come from a source you trust.

# Lookup table from note combination to token id (presumably; not read by the cells shown here).
with open('/content/longer_classical_combi_to_int.pickle', 'rb') as f:
    combi_to_int = pickle.load(f)

# Collection of songs; each song is a sliceable sequence of token ids.
with open('/content/longer_classical_all_song_tokenised.pickle', 'rb') as f:
    all_song_tokenised = pickle.load(f)

# Lookup table from token id back to its note combination (used when building the piano roll).
with open('/content/longer_classical_int_to_combi.pickle', 'rb') as f:
    int_to_combi = pickle.load(f)

# Vocabulary object produced during tokenisation (not read by the cells shown here).
with open('/content/longer_classical_vocab.pickle', 'rb') as f:
    vocab = pickle.load(f)

## Transformer


### Embedding Layers

In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    '''Learned token embedding plus a fixed sinusoidal positional encoding.

    # Arguments
        maxlen: Integer, nominal sequence length (stored for the config only).
        vocab_size: Integer, number of distinct token ids.
        embed_dim: Integer, size of each embedding vector.
        maximum_position_encoding: Integer, number of positions for which the
            sinusoid table is pre-computed; inputs must not be longer than this.
        **kwargs: Standard Keras layer arguments (`name`, `dtype`, ...). They
            must be accepted so that `from_config(get_config())` works.
    '''

    def __init__(self, maxlen, vocab_size, embed_dim,
                 maximum_position_encoding=10000, **kwargs):
        super(TokenAndPositionEmbedding, self).__init__(**kwargs)
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.maxlen = maxlen
        self.maximum_position_encoding = maximum_position_encoding
        # The table is constant, so build it once here instead of on every call.
        self.pos_encoding = self.positional_encoding(
            self.maximum_position_encoding, self.embed_dim)

    def get_config(self):
        '''Return the constructor arguments needed to re-create this layer.'''
        config = super().get_config().copy()
        config.update({
            'vocab_size': self.vocab_size,
            'embed_dim': self.embed_dim,
            'maxlen': self.maxlen,
            'maximum_position_encoding': self.maximum_position_encoding,
        })
        return config

    def get_angles(self, pos, i, d_model):
        '''Return pos / 10000^(2*(i//2)/d_model), broadcast over `pos` and `i`.'''
        angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
        return pos * angle_rates

    def positional_encoding(self, position, d_model):
        '''Build a (1, position, d_model) float32 tensor of sinusoidal encodings.'''
        angle_rads = self.get_angles(np.arange(position)[:, np.newaxis],
                                     np.arange(d_model)[np.newaxis, :],
                                     d_model)

        # apply sin to even indices in the array; 2i
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])

        # apply cos to odd indices in the array; 2i+1
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

        # add a leading axis so the table broadcasts over the batch dimension
        pos_encoding = angle_rads[np.newaxis, ...]

        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, x):
        '''Embed token ids `x` and add the encoding of their positions.'''
        seq_len = tf.shape(x)[-1]
        x = self.token_emb(x)
        # keep only the first `seq_len` rows of the pre-computed table
        return x + self.pos_encoding[:, :seq_len, :]


### Self-attention with causal masking

In [None]:
class MultiHeadSelfAttention(layers.Layer):
    '''Multi-head scaled dot-product self-attention with a causal mask.

    # Arguments
        embed_dim: Integer, width of the input and output vectors; must be
            divisible by `num_heads`.
        num_heads: Integer, number of attention heads.
        **kwargs: Standard Keras layer arguments (`name`, `dtype`, ...). They
            must be accepted so that `from_config(get_config())` works.

    # Raises
        ValueError: if `embed_dim` is not divisible by `num_heads`.
    '''

    def __init__(self, embed_dim, num_heads=8, **kwargs):
        super(MultiHeadSelfAttention, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f'embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}'
            )
        # width of each individual head
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def get_config(self):
        '''Return the constructor arguments needed to re-create this layer.'''
        config = super().get_config().copy()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
        })
        return config

    @staticmethod
    def causal_attention_mask(n_dest, n_src, dtype):
        """
        1's in the lower triangle, counting from the lower right corner.
        """
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        return tf.cast(m, dtype)

    def attention(self, query, key, value):
        '''Return (attended values, attention weights) for already-split heads.'''
        score = tf.matmul(query, key, transpose_b=True)
        # cast to the score dtype so the division is valid for any compute dtype
        dim_key = tf.cast(tf.shape(key)[-1], score.dtype)
        scaled_score = score / tf.math.sqrt(dim_key)

        # prevent information flow from future tokens
        shape = tf.shape(scaled_score)
        dim_dest, dim_src = shape[2], shape[3]
        attention_mask = self.causal_attention_mask(
            dim_dest, dim_src, scaled_score.dtype
        )
        attention_mask = tf.reshape(attention_mask, [1, 1, dim_dest, dim_src])
        # masked positions get a large negative score so softmax gives them ~0 weight
        scaled_score = scaled_score * attention_mask - 1e4 * (1 - attention_mask)

        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        '''Reshape (batch, seq, embed_dim) to (batch, num_heads, seq, projection_dim).'''
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        '''Apply causal self-attention to `inputs` of shape (batch, seq, embed_dim).'''
        batch_size = tf.shape(inputs)[0]

        # each of size (batch_size, seq_len, embed_dim)
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # each of size (batch_size, num_heads, seq_len, projection_dim)
        query = self.separate_heads(query, batch_size)
        key = self.separate_heads(key, batch_size)
        value = self.separate_heads(value, batch_size)

        attention, _ = self.attention(query, key, value)
        # attention: (batch_size, seq_len, num_heads, projection_dim)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        # concat_attention: (batch_size, seq_len, embed_dim)
        concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))
        output = self.combine_heads(concat_attention)  # (batch_size, seq_len, embed_dim)
        return output

### Transformer block

In [None]:
class TransformerBlock(layers.Layer):
    '''One decoder-style block: causal self-attention followed by a feed-forward net.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation.

    # Arguments
        embed_dim: Integer, width of the input and output vectors.
        num_heads: Integer, number of attention heads.
        ff_dim: Integer, hidden width of the feed-forward network.
        dropout_rate: Float, dropout rate applied after each sub-layer.
        **kwargs: Standard Keras layer arguments (`name`, `dtype`, ...). They
            must be accepted so that `from_config(get_config())` works.
    '''

    def __init__(self, embed_dim, num_heads, ff_dim, dropout_rate=0.1, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.dropout_rate = dropout_rate

        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = keras.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(embed_dim)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout_rate)
        self.dropout2 = layers.Dropout(dropout_rate)

    def get_config(self):
        '''Return the constructor arguments needed to re-create this layer.'''
        config = super().get_config().copy()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'dropout_rate': self.dropout_rate,
        })
        return config

    def call(self, inputs, training=None):
        '''Run the block on `inputs`.

        `training` is forwarded explicitly to the dropout layers; when it is
        None, Keras falls back to the mode of the enclosing call.
        '''
        attention_output = self.att(inputs)
        attention_output = self.dropout1(attention_output, training=training)
        # residual connection around the attention sub-layer
        out1 = self.layernorm1(inputs + attention_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        # residual connection around the feed-forward sub-layer
        return self.layernorm2(out1 + ffn_output)

### Create a model

In [None]:
# Loss histories; the training cell appends each run's per-epoch losses to these lists.
train_loss = []
val_loss = []

def create_model():
    '''Build and compile the three-block causal transformer.

    Reads the module-level hyper-parameters `maxlen`, `vocab_size`,
    `embed_dim`, `num_heads` and `feed_forward_dim`.

    # Returns
        A compiled `keras.Model` with two outputs: the next-token logits of
        shape (batch, maxlen, vocab_size) and the final hidden states of shape
        (batch, maxlen, embed_dim). Only the logits contribute to the loss.
    '''
    inputs = layers.Input(shape=(maxlen,), dtype=tf.int32)
    x = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(inputs)
    # Three identical blocks, created in order so the automatic layer names are unchanged.
    for _ in range(3):
        x = TransformerBlock(embed_dim, num_heads, feed_forward_dim, dropout_rate=0.25)(x)
    outputs = layers.Dense(vocab_size)(x)
    model = keras.Model(inputs=inputs, outputs=[outputs, x])
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    opt = keras.optimizers.Adam(learning_rate=0.001)
    # No loss and optimization based on word embeddings from transformer block.
    # Pass the configured optimizer object; previously it was created but the
    # string 'adam' was used, so editing the learning rate above had no effect.
    model.compile(optimizer=opt, loss=[loss_fn, None])
    return model

In [None]:
# Instantiate the network once; later cells load weights into it or train it.
model = create_model()
# Print the layer-by-layer overview.
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 600)]             0         
                                                                 
 token_and_position_embeddin  (None, 600, 128)         919552    
 g (TokenAndPositionEmbeddin                                     
 g)                                                              
                                                                 
 transformer_block (Transfor  (None, 600, 128)         99584     
 merBlock)                                                       
                                                                 
 transformer_block_1 (Transf  (None, 600, 128)         99584     
 ormerBlock)                                                     
                                                                 
 transformer_block_2 (Transf  (None, 600, 128)         99584 

## Generator
A custom batch generator that feeds the model one randomly positioned sequence from each song per epoch. (This replaces the older approach, taken from MusicTransformer, of loading every sliding-window sequence into memory at once.)


In [None]:
class Generator(keras.utils.Sequence):
    '''Batch generator that draws one random window from each song.

    # Arguments
        all_song_tokenised: Sequence of songs, each a sequence of token ids.
        batch_size: Integer, number of songs per batch.
        sequence_length: Integer, length of every input / target row.
        val_split: Float. When non-zero, the generator works on a random
            sample (drawn with replacement) of that fraction of the songs and
            returns them as a single batch.
        shuffle: Boolean, reshuffle the song order after every epoch.

    Each item is a pair `(batch_x, batch_y)` of int32 arrays with shape
    (songs_in_batch, sequence_length); `batch_y` is `batch_x` shifted one
    token to the left. Short windows are right-padded with the pad tag.
    '''

    def __init__(self, all_song_tokenised, batch_size, sequence_length,
                 val_split=0, shuffle=True):
        super().__init__()
        # Shallow copy: shuffling must not reorder the caller's own list.
        self.all_song_tokenised = list(all_song_tokenised)
        self.pad_tag_idx = 1
        self.sequence_length = sequence_length
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.val_split = val_split
        if self.val_split != 0:
            # Keep at least one song so the batch size (and __len__) never hits zero.
            n_val = max(1, int(self.val_split * len(self.all_song_tokenised)))
            # NOTE(review): sampled with replacement from the same songs used for
            # training, so this is a monitoring subset rather than held-out data.
            self.all_song_tokenised = random.choices(self.all_song_tokenised, k=n_val)
            self.batch_size = len(self.all_song_tokenised)
        self.on_epoch_end()

    def __len__(self):
        '''Number of batches per epoch.'''
        return int(np.ceil(len(self.all_song_tokenised) / self.batch_size))

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        if self.shuffle:
            np.random.shuffle(self.all_song_tokenised)

    def __getitem__(self, idx):
        '''Return batch number `idx`; the final batch may hold fewer songs.'''
        first = idx * self.batch_size
        # Slicing clamps at the end of the list, so every song (including the
        # last one) is used and no batch is ever empty for a valid `idx`.
        songs = self.all_song_tokenised[first:first + self.batch_size]

        # Pre-fill with the pad tag; rows are overwritten up to the window length.
        batch_x = np.full((len(songs), self.sequence_length), self.pad_tag_idx, dtype=np.int32)
        batch_y = np.full((len(songs), self.sequence_length), self.pad_tag_idx, dtype=np.int32)

        half_window = self.sequence_length // 2
        for row, song in enumerate(songs):
            # Integer bound (random.randint rejects floats on Python 3.12+),
            # clamped at 0 for songs shorter than half a window.
            start_idx = random.randint(0, max(0, len(song) - half_window))
            seq = song[start_idx: start_idx + self.sequence_length + 1]
            n_tokens = len(seq) - 1
            if n_tokens > 0:
                batch_x[row, :n_tokens] = seq[:-1]
                batch_y[row, :n_tokens] = seq[1:]

        return batch_x, batch_y

## Sequence Generator Callback
This callback prints a sample of what the model generates once every specified number of epochs, always starting from the same fixed seed sequence.

In [None]:
class GeneratorCallback(keras.callbacks.Callback):
    '''Callback to generate tokens from the model during training.
    1. Feed some starting prompt to the model
    2. Predict probabilities for next token
    3. Sample next token and add it to the next input

    # Arguments
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: Sequence of integers, the token indices for the starting prompt.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.

    Reads the module-level `sequence_length` and `maxlen`.
    '''

    def __init__(self, max_tokens, start_tokens, top_k=10, print_every=5):
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        '''Sample one token id from the `top_k` highest logits.'''
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype('int32')
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype('float32')
        return np.random.choice(indices, p=preds)

    def on_epoch_end(self, epoch, logs=None):
        '''Every `print_every` epochs, generate and print `max_tokens` tokens.'''
        if (epoch + 1) % self.print_every != 0:
            return
        # Work on a copy so the stored prompt stays the same for every sample.
        start_tokens = list(self.start_tokens)
        tokens_generated = []
        # `<` (not `<=`) so exactly `max_tokens` tokens are produced.
        while len(tokens_generated) < self.max_tokens:
            x = start_tokens[-sequence_length:]
            pad_len = maxlen - len(start_tokens)
            sample_index = -1
            if pad_len > 0:
                # Right-pad to the model's input length and read the prediction
                # at the last real token.
                x = start_tokens + [0] * pad_len
                sample_index = len(start_tokens) - 1

            x = np.array([x])
            # verbose=0: avoid one progress bar per generated token
            y, _ = self.model.predict(x, verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)

        print(f'Last 50 tokens of starting tokens:\n{self.start_tokens[-50:]}\n')
        print(f'Generated token:\n{tokens_generated}\n')

# Fixed prompt for the periodic samples: the first (sequence_length - 200) tokens of the song at index 1.
start_tokens = all_song_tokenised[1][:sequence_length - 200]
# Number of tokens the callback is asked to generate for each sample.
num_tokens_generated = 80
gen_callback = GeneratorCallback(num_tokens_generated, start_tokens, print_every=generate_sample_every_ep)

## Train the model (load saved weights or train from scratch)
Note: Even with GPU, training can take as long as 2.5 hours.

In [None]:
# Selects the branch of the next cell: 'load' restores saved weights, 'train' fits from scratch.
method = 'load'  # 'load' or 'train': training can take as long as 2.5 hours!

In [None]:
if method == 'load':
    # Restore previously trained weights into the freshly built model.
    model.load_weights('/content/classic_music-gen-weight.hdf5')
elif method == 'train':
    epochs = 1500
    batchsize = 64
    # Every run gets its own timestamped folder; later cells write their MIDI files there too.
    output_path = f'/content/output/classic_MuGenTransformer_v3_{epochs}{batchsize}{int(time.time())}_16v2f/'

    training_batch_generator = Generator(all_song_tokenised, batchsize, sequence_length)
    validation_batch_generator = Generator(all_song_tokenised, batchsize, sequence_length, val_split=0.1)

    # makedirs (not mkdir): the parent '/content/output' may not exist yet.
    os.makedirs(output_path, exist_ok=True)

    # Keep only the weights of the epoch with the lowest training loss.
    weight_path = output_path + 'music-gen-weight.hdf5'
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        weight_path,
        monitor='loss',
        verbose=0,
        save_best_only=True,
        mode='min'
    )
    callbacks_list = [checkpoint, gen_callback]

    history = model.fit(x=training_batch_generator, callbacks=callbacks_list,
                        epochs=epochs, verbose=1,
                        validation_data=validation_batch_generator)

    # Append, so that re-running this cell continues the curves.
    train_loss += history.history['loss']
    val_loss += history.history['val_loss']

    plt.plot(train_loss)
    plt.plot(val_loss)
    plt.title('model train vs validation loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train_loss', 'validation_loss'], loc='upper right')
    plt.savefig(output_path + 'loss.png')
    plt.show()
    print('Result stored in {}'.format(output_path))
else:
    raise ValueError('method must be one of "load" or "train"')

## Inference
We take a random sequence from a random song as the input. 
Then we will pass this input and predict the next notes. 

In [None]:
seed_len = 100
num_note_to_gen = 1000


def _draw_seed(songs, length):
    '''Pick a random song and a random window of up to `length` tokens from it.

    Returns `(song_idx, seq_start_at, tokens)` where `tokens` is a plain list.
    '''
    idx = random.randint(0, len(songs) - 1)
    song = songs[idx]
    # Clamp at 0 so a song shorter than `length` starts at its beginning
    # instead of producing an out-of-range start or a ValueError.
    start = random.randint(0, max(0, len(song) - length))
    return idx, start, np.asarray(song[start:start + length]).tolist()


def _is_silent_seed(tokens):
    '''True when the seed is empty or every token maps to an empty combination.

    Assumes an empty combination denotes a silent time step, as the original
    comparison against `()` suggests -- TODO confirm against the tokeniser.
    '''
    return all(len(int_to_combi[token]) == 0 for token in tokens)


song_idx, seq_start_at, start_tokens = _draw_seed(all_song_tokenised, seed_len)
# Bounded retry so a dataset of only silent songs cannot loop forever.
for _ in range(100):
    if not _is_silent_seed(start_tokens):
        break
    print('Got all zeros, rerolling')
    song_idx, seq_start_at, start_tokens = _draw_seed(all_song_tokenised, seed_len)

# Untouched copies of the seed; `start_tokens` itself grows during generation.
ori = start_tokens.copy()
backup = ori.copy()


def softmax(x):
    '''Return the softmax of the scores in x, normalised along axis 0.'''
    # subtracting the maximum keeps the exponentials from overflowing
    exponentials = np.exp(np.subtract(x, np.max(x)))
    normaliser = exponentials.sum(axis=0)
    return exponentials / normaliser


def sample_from(logits, k):
    '''Sample a token id from the k highest logits, never returning the unknown tag.'''
    top_logits, top_indices = tf.math.top_k(logits, k=k, sorted=True)
    candidates = np.asarray(top_indices).astype('int32')
    scores = np.asarray(top_logits).astype('float32')
    # top_k indices are unique, so the mask removes at most one entry
    keep = candidates != unk_tag_idx
    candidates = candidates[keep]
    scores = scores[keep]
    # renormalise over the remaining candidates before drawing
    return np.random.choice(candidates, p=softmax(scores))

def convertToRoll(seq_list):
    '''Translate token ids to note combinations and binarise them into a piano roll.

    Prints the decoded combinations and returns the array produced by
    `mlb.transform` (one row per token).
    '''
    combinations = [int_to_combi[token] for token in seq_list]
    piano_roll_rows = mlb.transform(combinations)
    print(combinations)
    return piano_roll_rows


# Sample each new token from the k most likely candidates.
k = 10
tokens_generated = []
num_tokens_generated = 0

# `<` (not `<=`) so exactly `num_note_to_gen` tokens are generated.
while num_tokens_generated < num_note_to_gen:
    # The model sees at most the last `sequence_length` tokens.
    x = start_tokens[-sequence_length:]
    pad_len = maxlen - len(start_tokens)
    sample_index = -1
    if pad_len > 0:
        # Right-pad to the fixed input length and read the prediction at the
        # last real token; the causal mask keeps the padding from influencing it.
        x = start_tokens + [0] * pad_len
        sample_index = len(start_tokens) - 1

    x = np.array([x])
    # verbose=0: avoid one progress bar per generated token
    y, _ = model.predict(x, verbose=0)
    sample_token = sample_from(y[0][sample_index], k)
    tokens_generated.append(sample_token)
    start_tokens.append(sample_token)
    num_tokens_generated = len(tokens_generated)
    if num_tokens_generated % 50 == 0:
        print(f'Generated {num_tokens_generated} notes...')

# Seed + generated tokens, and the seed alone, as piano rolls.
piano_roll = convertToRoll(start_tokens)
print('-------------------------------------------')
ori = convertToRoll(ori)


Generated 50 notes...
Generated 100 notes...
Generated 150 notes...
Generated 200 notes...
Generated 250 notes...
Generated 300 notes...
Generated 350 notes...


In [None]:
def piano_roll_to_pretty_midi(piano_roll_in, fs, program=0, velocity=64):
    '''Convert a binary Piano Roll array into a PrettyMidi object
     with a single instrument.
    Parameters
    ----------
    piano_roll_in : np.ndarray, shape=(128,frames), dtype=int
        Piano roll of one instrument; cells equal to 1 are sounding notes,
        every other value is treated as silence.
    fs : int or float
        Sampling frequency of the columns, i.e. each column is spaced apart
        by ``1./fs`` seconds.
    program : int
        The program number of the instrument.
    velocity : int
        MIDI velocity given to every note; must be greater than 0, otherwise
        no note is emitted.
    Returns
    -------
    midi_object : pretty_midi.PrettyMIDI
        A pretty_midi.PrettyMIDI class instance describing
        the piano roll.
    '''
    # Honour the `velocity` argument (it used to be ignored in favour of a literal 64).
    piano_roll = np.where(np.asarray(piano_roll_in) == 1, velocity, 0)
    notes, frames = piano_roll.shape
    pm = pretty_midi.PrettyMIDI(initial_tempo=100.0)
    instrument = pretty_midi.Instrument(program=program)

    # pad 1 column of zeros so we can acknowledge initial and ending events
    piano_roll = np.pad(piano_roll, [(0, 0), (1, 1)], 'constant')

    # use changes in velocities to find note on / note off events
    velocity_changes = np.nonzero(np.diff(piano_roll).T)

    # keep track on velocities and note on times
    prev_velocities = np.zeros(notes, dtype=int)
    note_on_time = np.zeros(notes)

    for frame, note in zip(*velocity_changes):
        # use frame + 1 because of padding above
        current_velocity = piano_roll[note, frame + 1]
        event_time = frame / fs
        if current_velocity > 0:
            # note on: remember when it started (ignore changes while it is held)
            if prev_velocities[note] == 0:
                note_on_time[note] = event_time
                prev_velocities[note] = current_velocity
        else:
            # note off: emit the completed note
            pm_note = pretty_midi.Note(
                velocity=int(prev_velocities[note]),
                pitch=int(note),
                start=note_on_time[note],
                end=event_time)
            instrument.notes.append(pm_note)
            prev_velocities[note] = 0
    pm.instruments.append(instrument)
    return pm

## Export as MIDI
Save the inference result to output folder.

In [None]:
# Timing: `fs` is the number of piano-roll columns per second, derived from the tempo.
bpm = 100
fs = 1 / ((60 / bpm) / 4)
name = 'random8_200'

# Build both MIDI objects first: generated continuation, then the seed alone.
mid_out = piano_roll_to_pretty_midi(piano_roll.T, fs=fs)
mid_ori = piano_roll_to_pretty_midi(ori.T, fs=fs)

# Write the generated piece.
midi_out_path = f'{output_path}gpt-v3-id-{name}.mid'
mid_out.write(midi_out_path)

# Write the seed for comparison.
midi_ori_path = f'{output_path}ori-gpt-v3-id-{name}.mid'
mid_ori.write(midi_ori_path)

Save full length of seed song for reference.

In [None]:
# Colab-only helper for sending files to the browser.
from google.colab import files
# if need zip
# !zip -r /content/output.zip /content/output
# Download the file that was actually written by the export cell; a hard-coded
# '/content/...' path is wrong whenever training changed `output_path`.
files.download(midi_out_path)
# files.download(midi_ori_path)