In [7]:
# Misc
import pickle

# Data manipulation
import numpy as np
import pandas as pd

# Music
import music21 as m21
from music21 import converter, instrument, note, chord, stream

# Data Visualiation
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
import seaborn as sns

# System
import os
import random
import shutil

from tensorflow.keras.utils import to_categorical

# Misc
import pickle

# Data manipulation
import numpy as np
import pandas as pd

# Music
import music21 as m21
from music21 import converter, instrument, note, chord, stream

# Data Visualiation
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
import seaborn as sns

# System
import os
import random
import shutil

# Performance metrics
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Tensorflow
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import BatchNormalization as BatchNorm
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping

In [3]:
def create_train_test_folders(source_dir = "../../data_raw/", train_percentage = 90):

    """
    Create train / test folders
    """

    # Define the percentage of data to use to training the model


    train_dir = "../../data_split/train/"
    test_dir = "../../data_split/test/"


    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)


    all_files = os.listdir(source_dir)

    num_train_files = int(len(all_files) * (train_percentage / 100))


    random.shuffle(all_files)

    # Split the files into training and testing sets
    train_files = all_files[:num_train_files]
    test_files = all_files[num_train_files:]


    for file in train_files:
        source_file = os.path.join(source_dir, file)
        destination_file = os.path.join(train_dir, file)
        shutil.copy(source_file, destination_file)


    for file in test_files:
        source_file = os.path.join(source_dir, file)
        destination_file = os.path.join(test_dir, file)
        shutil.copy(source_file, destination_file)

    print("✅ train & test folders created")


In [4]:
create_train_test_folders(source_dir = "../../data_raw/", train_percentage = 90)

✅ train & test folders created


In [5]:
def load_midi_into_notes():
    """Convert train files & test files into notes"""

    # Extracted notes for each file
    notes_train = []
    notes_test = []

    # Save the indexes of notes in which a new composition started to create consistent sequences
    new_composition_indexes_train = []
    new_composition_indexes_test = []


    # Load all train MIDI files into notes
    input_path_train = "../../data_split/train"
    for file in os.listdir(input_path_train):
        file_path = os.path.join(input_path_train, file)

        # Add new index to alert that it's a new composition
        if new_composition_indexes_train and new_composition_indexes_train[-1] != len(notes_train):
            new_composition_indexes_train.append(len(notes_train))

        try:
            # Convert the music into a score object
            score = converter.parse(file_path)

            print("Parsing %s" % file)

            elements_in_part = None

            try:  # File has instrument parts
                # Given a score that represents the MIDI, partition it into parts for each unique instrument found
                partitions_by_instrument = instrument.partitionByInstrument(score)
                # Visit all the elements (notes, chords, rests, and more) of each of its internal "measures."
                elements_in_part = partitions_by_instrument.parts[0].recurse()

            except:  # File has notes in a flat structure
                elements_in_part = score.flat.notes

            # Scroll through all the elements (notes or chords) picked up
            for element in elements_in_part:
                # If the element is a note...
                if isinstance(element, note.Note):
                    # Add note to array
                    notes_train.append(str(element.pitch))
                # If the element is a chord (a set of notes --> e.g., C4 F4)
                elif isinstance(element, chord.Chord):
                    # Extract each note from the chord and insert it into the array in the format Note1.Note2.Note3
                    notes_train.append('.'.join(str(n) for n in element.normalOrder))

        except Exception as e:
            print(f"Error parsing {file}: {str(e)}")
            continue


    # Load all test MIDI files into notes
    input_path_test = "../../data_split/test"
    for file in os.listdir(input_path_test):
        file_path = os.path.join(input_path_test, file)

        # Add new index to alert that it's a new composition
        if new_composition_indexes_test and new_composition_indexes_test[-1] != len(notes_test):
            new_composition_indexes_test.append(len(notes_test))

        try:
            # Convert the music into a score object
            score = converter.parse(file_path)

            print("Parsing %s" % file)

            elements_in_part = None

            try:  # File has instrument parts
                # Given a score that represents the MIDI, partition it into parts for each unique instrument found
                partitions_by_instrument = instrument.partitionByInstrument(score)
                # Visit all the elements (notes, chords, rests, and more) of each of its internal "measures."
                elements_in_part = partitions_by_instrument.parts[0].recurse()

            except:  # File has notes in a flat structure
                elements_in_part = score.flat.notes

            # Scroll through all the elements (notes or chords) picked up
            for element in elements_in_part:
                # If the element is a note...
                if isinstance(element, note.Note):
                    # Add note to array
                    notes_test.append(str(element.pitch))
                # If the element is a chord (a set of notes --> e.g., C4 F4)
                elif isinstance(element, chord.Chord):
                    # Extract each note from the chord and insert it into the array in the format Note1.Note2.Note3
                    notes_test.append('.'.join(str(n) for n in element.normalOrder))

        except Exception as e:
            print(f"Error parsing {file}: {str(e)}")
            continue


    print("✅Loading notes done for train & test files")

    # Save the 'notes_train' and 'notes_test' list to a pickle file
    os.makedirs("../../data_vocab/")
    with open("../../data_vocab/notes_train.pkl", "wb") as filepath:
        pickle.dump(notes_train, filepath)
    with open("../../data_vocab/notes_test.pkl", "wb") as filepath:
        pickle.dump(notes_test, filepath)

    # Return notes and new composition indexes
    return notes_train, notes_test, new_composition_indexes_train, new_composition_indexes_test

In [11]:
notes_train, notes_test, new_composition_indexes_train, new_composition_indexes_test = load_midi_into_notes()

Parsing mazrka35.mid
Parsing dgffc49.mid
Parsing rondo73.mid
Parsing contreda.mid
Parsing mk_chim2.mid
Parsing chonoc14.mid
Parsing choval12.mid
Parsing chonoc17.mid
Parsing mazrka11.mid
Parsing mazrka04.mid
Parsing mazrka06.mid
Parsing chpnimpu.mid
Parsing chopineb.mid
Parsing waltz_am.mid
Parsing chetude1.mid
Parsing mazrka32.mid
Parsing mazrka22.mid
Parsing chopol12.mid
Parsing mazrka27.mid
Parsing mazrka09.mid
Parsing mazrka44.mid
Parsing mazrka41.mid
Parsing mazrka43.mid
Error parsing lecpsc3.mid: badly formatted midi bytes, got: b'RIFF\x0eN\x00\x00RMIDdata\x01N\x00\x00'
Parsing chlargo.mid
Parsing mazrka12.mid
Parsing choval13.mid
Parsing lecpsb3.mid
Parsing choval05.mid
Parsing chopol05.mid
Parsing pologm.mid
Parsing fugaam.mid
Parsing chopol08.mid
Parsing prelop45.mid
Parsing nocturne.mid
Parsing mazrka28.mid
Parsing op72.mid
Parsing chpolfnt.mid
Parsing mazrka03.mid
Parsing varigerm.mid
Parsing chonoc08.mid
Parsing souvpaga.mid
Parsing chonoc12.mid
Parsing mazrka38.mid
Parsing

In [12]:
def create_vocabulary(notes_train):

    """define notes vocabulary based on train set"""

    # get all pitch names
    pitchnames_train = sorted(set(item for item in notes_train))

    # create a dictionary to map pitches to integers
    note_to_int_train = dict((note, number) for number, note in enumerate(pitchnames_train))

    n_vocab_train = len(set(notes_train))

    return note_to_int_train, n_vocab_train, pitchnames_train

In [14]:
note_to_int_train, n_vocab_train, pitchnames_train =  create_vocabulary(notes_train)

In [16]:
n_vocab_train

897

In [17]:
def split_train_test(notes_train, notes_test, new_composition_indexes_train, new_composition_indexes_test, note_to_int_train,
                     n_vocab_train, pitchnames_train, sequence_length = 100):
    """
    Prepare the test and train sequences to be used by the Neural Network
    """

    # !!TRAIN!! split creation

    X_train = []
    y_train = []

    wait = 0
    # create input sequences and the corresponding outputs
    for i in range(0, len(notes_train) - sequence_length, 1):

        # if the ground truth index is a note/chord that belongs to a new composition
        if (i + sequence_length) in new_composition_indexes_train:
            wait = sequence_length - 1
            continue
        if wait != 0:
            wait = wait - 1
            continue

        sequence_in = notes_train[i:i + sequence_length]
        sequence_out = notes_train[i + sequence_length]

        X_train.append([note_to_int_train[char] for char in sequence_in])
        y_train.append(note_to_int_train[sequence_out])

    n_patterns = len(X_train)

    # reshape the input into a format compatible with LSTM layers
    X_train = np.reshape(X_train, (n_patterns, sequence_length, 1))

    # normalize input
    X_train = X_train / float(n_vocab_train)

    # one-hot encoding of the output
    y_train = to_categorical(y_train, num_classes=n_vocab_train)


    ####################
    # !!TEST!! split creation
    ####################

    X_test = []
    y_test = []

    wait = 0
    # create input sequences and the corresponding outputs
    for i in range(0, len(notes_test) - sequence_length, 1):

        # if the ground truth index is a note/chord that belongs to a new composition
        if (i + sequence_length) in new_composition_indexes_test:
            wait = sequence_length - 1
            continue
        if wait != 0:
            wait = wait - 1
            continue

        sequence_in = notes_test[i:i + sequence_length]
        sequence_out = notes_test[i + sequence_length]

        X_test.append([note_to_int_train.get(char, -1) for char in sequence_in])
        if sequence_out in note_to_int_train.keys():
            y_test.append(note_to_int_train[sequence_out])
        else:
            pass

    n_patterns = len(X_test)

    # reshape the input into a format compatible with LSTM layers
    X_test = np.reshape(X_test, (n_patterns, sequence_length, 1))

    # normalize input
    X_test = X_test / float(n_vocab_train)

    # one-hot encoding of the output
    y_test = to_categorical(y_test, num_classes=n_vocab_train)


    return X_train, y_train, X_test, y_test

In [18]:
X_train, y_train, X_test, y_test = split_train_test(notes_train, notes_test, new_composition_indexes_train, new_composition_indexes_test, note_to_int_train,
                     n_vocab_train, pitchnames_train, sequence_length = 100)

In [50]:
def initialize_and_compile_LSTM_model(X_train, n_vocab_train):
    """Define model architecture"""
    model = Sequential()
    model.add(LSTM(256,
        input_shape=(X_train.shape[1], X_train.shape[2]),
        return_sequences=True,
    ))
    model.add(LSTM(256))
    model.add(Dense(n_vocab_train))
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    return model

In [51]:
y_train.shape

(126391, 897)

In [52]:
model = initialize_and_compile_LSTM_model(X_train, n_vocab_train)

In [53]:
model.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_9 (LSTM)               (None, 100, 256)          264192    
                                                                 
 lstm_10 (LSTM)              (None, 256)               525312    
                                                                 
 dense_5 (Dense)             (None, 897)               230529    
                                                                 
 activation_5 (Activation)   (None, 897)               0         
                                                                 
Total params: 1020033 (3.89 MB)
Trainable params: 1020033 (3.89 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [54]:
def train_model(model, X_train, y_train):

    #checkpoints & callbacks:

    os.makedirs("../../checkpoint_lstm", exist_ok=True)
    file_path = "../../checkpoint_lstm/best_weights.h5"

    checkpoint = ModelCheckpoint(
        file_path,
        monitor='accuracy',
        verbose=0,
        save_best_only=True,
        mode='max'
    )

    es = EarlyStopping(patience=5, restore_best_weights=True)

    callbacks_list = [checkpoint,es]


    # train model :

    history = model.fit(X_train, y_train, epochs=200, batch_size=64 ,callbacks=callbacks_list, validation_split=0.2, verbose=1)

    return history

In [55]:
X_train.shape

(126391, 100, 1)

In [57]:
history = train_model(model, X_train, y_train)
history

Epoch 1/200
 250/1580 [===>..........................] - ETA: 9:35 - loss: 5.2585 - accuracy: 0.0185

KeyboardInterrupt: 