In [1]:
import os
import glob
import numpy as np
from music21 import converter, instrument, note, chord, stream, pitch
import tensorflow as tf
from tqdm import tqdm
from keras import Model
from keras.layers import Dense, Reshape, InputLayer, Embedding, GlobalAveragePooling1D,\
                            Layer, MultiHeadAttention, LayerNormalization, LSTM, Dropout,\
                            Conv1D, RepeatVector, GRU, TimeDistributed
from keras.optimizers import Adam
from keras import Input
from keras.losses import CategoricalCrossentropy, binary_crossentropy, categorical_crossentropy
from keras.models import Sequential, load_model
from keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint

In [2]:
with open("jazz.txt", 'r') as f:
    notes = f.read().split('\n')

pitchnames = sorted(set(item for item in notes))
pitchnames

['0',
 '0.1',
 '0.1.2',
 '0.1.2.3',
 '0.1.2.3.6.9',
 '0.1.2.5',
 '0.1.2.7',
 '0.1.3.4.6.9',
 '0.1.3.5.7.8',
 '0.1.3.7.8',
 '0.1.4.5',
 '0.1.4.7',
 '0.1.5',
 '0.1.5.6.7',
 '0.1.5.7',
 '0.1.5.8',
 '0.1.6',
 '0.2',
 '0.2.3',
 '0.2.3.6',
 '0.2.3.7',
 '0.2.4',
 '0.2.4.5',
 '0.2.4.5.7',
 '0.2.4.5.7.9',
 '0.2.4.6',
 '0.2.4.6.7',
 '0.2.4.6.9',
 '0.2.4.7',
 '0.2.4.7.8',
 '0.2.4.7.9',
 '0.2.4.8',
 '0.2.5',
 '0.2.5.6',
 '0.2.5.7',
 '0.2.5.7.8',
 '0.2.5.8',
 '0.2.6',
 '0.2.6.7',
 '0.2.6.8',
 '0.2.7',
 '0.3',
 '0.3.4',
 '0.3.5',
 '0.3.5.7',
 '0.3.5.8',
 '0.3.6',
 '0.3.6.8',
 '0.3.6.9',
 '0.3.7',
 '0.4',
 '0.4.5',
 '0.4.5.6',
 '0.4.5.7',
 '0.4.6',
 '0.4.7',
 '0.4.8',
 '0.5',
 '0.5.6',
 '0.6',
 '1',
 '1.2',
 '1.2.3',
 '1.2.4',
 '1.2.4.5.6',
 '1.2.4.5.6.9',
 '1.2.4.6',
 '1.2.4.6.8',
 '1.2.4.6.9',
 '1.2.4.7',
 '1.2.4.8',
 '1.2.4.8.9',
 '1.2.5',
 '1.2.5.8',
 '1.2.5.9',
 '1.2.6',
 '1.2.6.7',
 '1.2.6.8',
 '1.2.6.9',
 '1.2.7',
 '1.2.7.8',
 '1.3',
 '1.3.4',
 '1.3.4.6',
 '1.3.4.8',
 '1.3.5.6.8',
 '1.3.5.7.10

In [3]:
def prepare_sequences(notes, sequence_length, num_notes):
    # dictionary used to map notes to numbers
    pitchnames = sorted(set(item for item in notes))
    note_to_int = dict((note, number) for number, note in enumerate(pitchnames))
    
    network_input = []
    network_output = []

    # generate training data: sequence of notes => next note
    for i in range(len(notes) - sequence_length):
        sequence_in = notes[i: i + sequence_length]
        sequence_out = notes[i + sequence_length]
        network_input.append([note_to_int[char] for char in sequence_in])
        network_output.append(note_to_int[sequence_out])

    # one-hot encode
    network_input = to_categorical(network_input, num_notes)
    network_input = np.reshape(network_input, (-1, sequence_length, num_notes))
    
    network_output = to_categorical(network_output, num_notes)

    return network_input, network_output

In [4]:
latent_dim = 32
sequence_length = 50
num_notes = len(set(notes))

### Positional encoding 

- it gives the model information about the relative position of the elements in a sequence (unlike RNNs, it processes all inputs simultaneously rather than sequentially);

In [5]:
class PositionalEncoding(Layer):
    def __init__(self, sequence_length, num_notes, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.sequence_length = sequence_length
        self.num_notes = num_notes
        self.pos_encoding = self.positional_encoding(sequence_length, num_notes)

    def get_angles(self, position, i, d_model):
        angles = 1 / np.power(10000., (2 * (i // 2)) / np.float32(d_model))
        return position * angles

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            np.arange(position)[:, np.newaxis],
            np.arange(d_model)[np.newaxis, :],
            d_model
        )
        # Apply sin to even indices in the array; 2i
        angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
        # Apply cos to odd indices in the array; 2i+1
        angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
        pos_encoding = angle_rads[np.newaxis, ...]
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

In [6]:
class VAE(Model):
    def __init__(self, sequence_length, num_notes, latent_dim=64, num_heads=4, ff_dim=128, **kwargs):
        super(VAE, self).__init__(**kwargs)
        
        self.sequence_length = sequence_length
        self.num_notes = num_notes
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim

        self.encoder = self.build_encoder()
        self.decoder = self.build_decoder()

    def build_encoder(self):
        inputs = Input(shape=(self.sequence_length, self.num_notes))
        x = self.transformer_encoder(inputs, self.num_notes, self.num_heads, self.ff_dim)
        z_mean = Dense(self.latent_dim, name="z_mean")(x)
        z_log_var = Dense(self.latent_dim, name="z_log_var")(x)
        return Model(inputs, [z_mean, z_log_var], name="encoder")

    def build_decoder(self):
        latent_inputs = Input(shape=(self.sequence_length, self.latent_dim,))
        x = self.transformer_encoder(latent_inputs, self.latent_dim, self.num_heads, self.ff_dim)
        outputs = TimeDistributed(Dense(self.num_notes, activation="softmax"))(x)
        return Model(latent_inputs, outputs, name="decoder")

    def transformer_encoder(self, inputs, dim, num_heads, ff_dim, dropout=0):
        inputs = self.add_positional_encoding(inputs, dim)
        x = LayerNormalization(epsilon=1e-6)(inputs)
        x = MultiHeadAttention(
            key_dim=dim, num_heads=num_heads, dropout=dropout
        )(x, x)
        x = Dropout(dropout)(x)
        res = x + inputs
        out1 = LayerNormalization(epsilon=1e-6)(res)
        
        x = Dense(ff_dim, activation="relu")(out1)
        x = Dense(dim)(x)
        out2 = LayerNormalization(epsilon=1e-6)(out1 + x)

        return out2

    def add_positional_encoding(self, x, dim):
        pos_enc = PositionalEncoding(self.sequence_length, dim)
        return pos_enc(x)

    def call(self, inputs):
        z_mean, z_log_var = self.encoder(inputs)
        z = self.reparameterize(z_mean, z_log_var)
        decoded = self.decoder(z)
        return decoded

    def reparameterize(self, mean, log_var):
        eps = tf.random.normal(shape=tf.shape(mean))
        return eps * tf.exp(log_var * 0.5 + 1e-7) + mean

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var = self.encoder(data)
            z = self.reparameterize(z_mean, z_log_var)
            reconstruction = self.decoder(z)  
            reconstruction_loss = tf.reduce_mean(
                categorical_crossentropy(data, reconstruction)
            )
            kl_loss = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)  # Removed the clipping operation
            kl_loss = tf.reduce_mean(kl_loss)
            kl_loss *= -0.5
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }


vae = VAE(sequence_length, num_notes)
vae.compile(optimizer=Adam(learning_rate=1e-4))

In [11]:
vae.encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 50, 787)]    0           []                               
                                                                                                  
 positional_encoding (Positiona  (None, 50, 787)     0           ['input_1[0][0]']                
 lEncoding)                                                                                       
                                                                                                  
 layer_normalization (LayerNorm  (None, 50, 787)     1574        ['positional_encoding[0][0]']    
 alization)                                                                                       
                                                                                            

In [12]:
vae.decoder.summary()

Model: "decoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 50, 64)]     0           []                               
                                                                                                  
 positional_encoding_1 (Positio  (None, 50, 64)      0           ['input_2[0][0]']                
 nalEncoding)                                                                                     
                                                                                                  
 layer_normalization_3 (LayerNo  (None, 50, 64)      128         ['positional_encoding_1[0][0]']  
 rmalization)                                                                                     
                                                                                            

In [13]:
# split the one-hot encoded dataset into batches with DataGenerator
def data_generator(notes, sequence_length, num_notes, batch_size):
    pitchnames = sorted(set(item for item in notes))
    note_to_int = dict((note, number) for number, note in enumerate(pitchnames))
    
    while True:
        for i in range(0, len(notes) - sequence_length, batch_size):
            network_input = []
            network_output = []
            
            # input-output sequences batches
            for j in range(i, min(i + batch_size, len(notes) - sequence_length)):
                sequence_in = notes[j: j + sequence_length]
                sequence_out = notes[j + sequence_length]
                network_input.append([note_to_int[char] for char in sequence_in])
                network_output.append(note_to_int[sequence_out])
            
            # one-hot encode
            network_input = to_categorical(network_input, num_notes)
            network_input = np.reshape(network_input, (-1, sequence_length, num_notes))
            
            network_output = to_categorical(network_output, num_notes)
            
            yield network_input, network_output

In [14]:
batch_size = 32 
generator = data_generator(notes, sequence_length, num_notes=num_notes, batch_size=batch_size)

steps_per_epoch = (len(notes) - sequence_length) // batch_size 

In [15]:
vae.fit(generator, epochs=2, steps_per_epoch=steps_per_epoch)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x25e0f294d30>

In [136]:
vae.save("TransformerVAE")



INFO:tensorflow:Assets written to: TransformerVAE\assets


INFO:tensorflow:Assets written to: TransformerVAE\assets


In [None]:
vae = tf.keras.models.load_model("TransformerVAE")

Temperature hyperparameter is used to control the randomness of the output: trade-off between following the model's predictions (low temperatures) and generating diverse output (high temperatures).

In [16]:
def generate_music(model, seed, sequence_length, num_notes, temperature=1.0):
    # starting from seed sequence generate new sequence
    sequence = list(seed)
    sequence = sequence[:sequence_length]
    
    for i in range(sequence_length): 
        seed_sequence = sequence[-sequence_length:]
        
        # one-hot encode input
        seed_sequence_one_hot = to_categorical(seed_sequence, num_notes)
        seed_sequence_one_hot = np.reshape(seed_sequence_one_hot, (1, sequence_length, num_notes))

        output_sequence = model(seed_sequence_one_hot)
        
        # last note from the output sequence
        last_note = output_sequence.numpy()[0, -1, :]
        
        # apply temperature
        prediction = np.log(last_note + 1e-7) / temperature
        prediction = np.exp(prediction) / np.sum(np.exp(prediction))
        
        # next note from the adjusted prediction
        if np.isnan(prediction).any():
            pass
        else:
            next_note = np.random.choice(range(num_notes), p=prediction)
            sequence.append(next_note)

    return sequence

In [118]:
note_to_int = dict((note, number) for number, note in enumerate(pitchnames))
int_to_note = {i: n for n, i in note_to_int.items()}

first_batch = next(generator)
seed_sequence_one_hot = first_batch[0][0]

# one-hot encoded sequence to integer format
seed_sequence = [int_to_note[np.argmax(note)] for note in seed_sequence_one_hot]

# note names to integers
seed_sequence = [note_to_int[note] for note in seed_sequence]

generated_sequence = generate_music(vae, seed_sequence, sequence_length, num_notes=num_notes, temperature=0.3)
generated_sequence

[695,
 760,
 695,
 695,
 695,
 760,
 734,
 726,
 726,
 734,
 726,
 734,
 708,
 760,
 760,
 760,
 750,
 708,
 734,
 760,
 760,
 694,
 760,
 760,
 750,
 734,
 750,
 734,
 750,
 734,
 734,
 708,
 694,
 708,
 734,
 734,
 734,
 708,
 734,
 708,
 694,
 708,
 734,
 760,
 760,
 695,
 695,
 760,
 760,
 760,
 55,
 305,
 707,
 305,
 716,
 55,
 305,
 782,
 540,
 305,
 399,
 782,
 55,
 749,
 781,
 540,
 305,
 693,
 692,
 55,
 305,
 55,
 734,
 305,
 707,
 759,
 55,
 55,
 55,
 708,
 694,
 708,
 749,
 305,
 717,
 55,
 55,
 693,
 55,
 55,
 749,
 708,
 749,
 725,
 708,
 55,
 707,
 55,
 748,
 749]

In [119]:
int_to_note = dict((number, note) for number, note in enumerate(sorted(set(item for item in notes))))
note_sequence = [int_to_note[i] for i in generated_sequence]
note_sequence

['A5',
 'F#5',
 'A5',
 'A5',
 'A5',
 'F#5',
 'D5',
 'C5',
 'C5',
 'D5',
 'C5',
 'D5',
 'B4',
 'F#5',
 'F#5',
 'F#5',
 'E5',
 'B4',
 'D5',
 'F#5',
 'F#5',
 'A4',
 'F#5',
 'F#5',
 'E5',
 'D5',
 'E5',
 'D5',
 'E5',
 'D5',
 'D5',
 'B4',
 'A4',
 'B4',
 'D5',
 'D5',
 'D5',
 'B4',
 'D5',
 'B4',
 'A4',
 'B4',
 'D5',
 'F#5',
 'F#5',
 'A5',
 'A5',
 'F#5',
 'F#5',
 'F#5',
 '0.4.7',
 '2.6.9',
 'B3',
 '2.6.9',
 'C#4',
 '0.4.7',
 '2.6.9',
 'G3',
 '7.11.2',
 '2.6.9',
 '4.7.11',
 'G3',
 '0.4.7',
 'E4',
 'G2',
 '7.11.2',
 '2.6.9',
 'A3',
 'A2',
 '0.4.7',
 '2.6.9',
 '0.4.7',
 'D5',
 '2.6.9',
 'B3',
 'F#4',
 '0.4.7',
 '0.4.7',
 '0.4.7',
 'B4',
 'A4',
 'B4',
 'E4',
 '2.6.9',
 'C#5',
 '0.4.7',
 '0.4.7',
 'A3',
 '0.4.7',
 '0.4.7',
 'E4',
 'B4',
 'E4',
 'C4',
 'B4',
 '0.4.7',
 'B3',
 '0.4.7',
 'E3',
 'E4']

In [120]:
# creates a midi file from text representation outputted by TransformerVAE
def create_midi(prediction_output, output_file):
    offset = 0
    output_notes = []

    for pattern in prediction_output:
        try:
            if ('.' in pattern) or pattern.isdigit():
                notes_in_chord = pattern.split('.')
                notes = []
                for current_note in notes_in_chord:
                    new_note = note.Note(int(current_note))
                    new_note.storedInstrument = instrument.Piano()
                    notes.append(new_note)
                new_chord = chord.Chord(notes)
                new_chord.offset = offset
                output_notes.append(new_chord)
            else:
                # print(pattern)
                new_note = note.Note()
                new_note.pitch = pitch.Pitch(pattern)
                new_note.offset = offset
                new_note.storedInstrument = instrument.Piano()
                output_notes.append(new_note)

            offset += 0.5
        except Exception as e:
            print(f'Error loading pattern: {pattern}: {e}')

    midi_stream = stream.Stream(output_notes)
    midi_stream.write('midi', fp=output_file)

output_file = "TransformerVAE_generated_music.mid"
create_midi(note_sequence, output_file)