<a href="https://colab.research.google.com/github/selmakurtovic4/ZavrsniRad/blob/main/ZavrsniRad.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
# Choose the TensorFlow device used by every later `tf.device(device_name)`
# block: the GPU reported by TensorFlow if there is one, otherwise the CPU.
device_name = tf.test.gpu_device_name()
if device_name:
    print("Found GPU at: {}".format(device_name))
else:
    # An empty string means no GPU was reported; fall back to the CPU.
    device_name = "/device:CPU:0"
    print("No GPU, using {}.".format(device_name))

Found GPU at: /device:GPU:0


In [None]:
import os
# Fetch the project repository (IPython shell escape) and make it the working
# directory, so the relative paths used below ('./Datasets', './Data',
# './Models') resolve inside the clone.
# NOTE(review): the '/content/...' path assumes the clone landed under /content — confirm for non-Colab runs.
!git clone https://github.com/selmakurtovic4/ZavrsniRad.git
os.chdir('/content/ZavrsniRad')

In [None]:
import sys
import re
import numpy as np
import pandas as pd
import music21
from keras.utils import to_categorical
from glob import glob
import IPython
from tqdm import tqdm
import pickle
import os
from music21 import converter, instrument, note, chord, stream,duration
from keras.models import Sequential
from keras.layers import Activation, Dense, LSTM, Dropout, Flatten, concatenate
from keras.callbacks import ModelCheckpoint
from keras import*
import json
import matplotlib.pyplot as plt
from tensorflow.keras.utils import plot_model
from IPython.display import Image
from tensorflow.keras.layers import Input, LSTM, Dropout, Flatten, Dense, Activation, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Accuracy
from sklearn.preprocessing import MinMaxScaler

In [None]:
# Paths of all MIDI files in the dataset folder. glob() returns matches in
# arbitrary (filesystem-dependent) order, so sort them to make the order in
# which songs are parsed reproducible between runs and machines.
songs = sorted(glob('./Datasets/TheBeatles/*.mid'))

In [None]:
def get_notes(songs):
    """Extract note, chord and rest events from MIDI files and save them as JSON.

    Each file is parsed with music21. When partitioning by instrument yields
    more than one part, the events of the part at index 1 are used (recursively);
    otherwise ``midi.flat.notes`` of the whole file is used.

    Args:
        songs: iterable of MIDI file paths.

    Returns:
        A list of dicts with keys ``"pitch"`` (pitch name, dot-joined chord
        normal order, or ``"r"`` for a rest), ``"duration"`` (the quarter
        length as a string) and ``"played"`` (always 1 here).

    Side effects:
        Writes the resulting list to ``Data/notes.json``. Files that fail to
        parse are reported on stdout and skipped.
    """
    notes = []
    for file in songs:
        try:
            midi = converter.parse(file)

            # Reset for every file: otherwise a failed partition would silently
            # reuse the previous file's score (or hit a NameError on the first file).
            score = None
            try:
                score = instrument.partitionByInstrument(midi)
            except Exception as e:
                # Best effort: fall back to the flattened stream below.
                print(f"Could not partition {file} by instrument: {e}")

            if score is not None and len(score) > 1:
                # More than one instrument part: take the part at index 1.
                notes_to_parse = score.parts[1].recurse()
            else:
                notes_to_parse = midi.flat.notes

            for element in notes_to_parse:
                if isinstance(element, note.Note):
                    pitch = str(element.pitch)
                elif isinstance(element, chord.Chord):
                    pitch = '.'.join(str(n) for n in element.normalOrder)
                elif isinstance(element, note.Rest):
                    pitch = "r"
                else:
                    # Not a note, chord or rest: nothing to record.
                    continue
                notes.append({
                    "pitch": pitch,
                    "duration": str(element.duration.quarterLength),
                    "played": 1,
                })
        except Exception as e:
            print(f"Error parsing MIDI file {file}: {e}")
            continue

    # Save the notes to a file
    with open('Data/notes.json', 'w') as filepath:
        json.dump(notes, filepath)

    return notes

In [None]:
def prepare_duration_dictionary(notes):
    """Build the duration vocabulary of a note list.

    Args:
        notes: list of dicts, each with a ``"duration"`` entry.

    Returns:
        ``(count, mapping)`` where ``count`` is the number of distinct
        durations and ``mapping`` assigns each distinct duration its
        position (starting at 0) in ascending sorted order.
    """
    distinct = set()
    for event in notes:
        distinct.add(event["duration"])

    ordered = sorted(distinct)
    mapping = dict(zip(ordered, range(len(ordered))))
    return len(ordered), mapping


In [None]:
def prepare_pitch_dictionary(notes):
    """Build the pitch vocabulary of a note list.

    Args:
        notes: list of dicts, each with a ``"pitch"`` entry.

    Returns:
        ``(count, mapping)`` where ``count`` is the number of distinct pitch
        labels and ``mapping`` assigns each label its position (starting at 0)
        in ascending sorted order.
    """
    ordered = sorted({event["pitch"] for event in notes})

    mapping = {}
    for position, label in enumerate(ordered):
        mapping[label] = position

    return len(ordered), mapping

In [None]:
def normalize_data(data):
    """Scale ``data`` with a freshly fitted scikit-learn ``MinMaxScaler``.

    The scaler is fitted on ``data`` itself and then discarded, so the same
    scaling cannot be re-applied to other data afterwards.

    Args:
        data: 2-D array-like accepted by ``MinMaxScaler.fit_transform``.

    Returns:
        The transformed array returned by ``fit_transform``.
    """
    return MinMaxScaler().fit_transform(data)

In [None]:
def transform_duration_value(duration):
    """Snap a duration to a multiple of 1/16, never below 1/16.

    1/16 is subtracted repeatedly while the value is still larger than 1/16.
    If at least one subtraction happened and what is left is smaller than
    1/32, the result is rounded down by one step; otherwise it is rounded up.

    Args:
        duration: non-negative number.

    Returns:
        ``k / 16`` for an integer ``k >= 1``.
    """
    step = 1 / 16
    half_step = 1 / 32

    subtractions = 0
    while duration > step:
        duration -= step
        subtractions += 1

    sixteenths = subtractions + 1
    # Only a leftover produced by the loop can round down; an input that was
    # already <= 1/16 always maps to exactly one sixteenth.
    if subtractions > 0 and duration < half_step:
        sixteenths -= 1

    return sixteenths / 16

In [None]:
def prepare_duration_array(notes):
    """Quantise durations and split long events into one-beat pieces, in place.

    Every duration is converted to a number rounded to two decimals, then:

    * durations below 1 are snapped with ``transform_duration_value``;
    * durations of 1 or more become one event of duration 1 (keeping its
      ``"played"`` flag) followed by continuation events with ``"played": 0``:
      one per additional whole beat, plus one for the fractional remainder
      (snapped with ``transform_duration_value``) when there is one.

    Fixes compared with the previous implementation:

    * whole-number durations >= 2 used to lose one beat (2.0 became a single
      1-beat event) because the fractional-case loop bound was applied to them;
    * the list is no longer modified while it is being iterated;
    * durations are parsed with ``Fraction`` instead of ``eval``, which also
      accepts values that are already numeric.

    Args:
        notes: list of dicts with ``"pitch"``, ``"duration"`` (a string such as
            ``"0.5"`` or ``"1/3"``, or a number) and ``"played"``. The list and
            its dicts are modified in place.

    Returns:
        None.
    """
    from fractions import Fraction

    expanded = []
    for event in notes:
        length = round(float(Fraction(event["duration"])), 2)

        if length < 1:
            event["duration"] = transform_duration_value(length)
            expanded.append(event)
            continue

        whole_beats = int(length)
        remainder = length - whole_beats

        # The original event keeps the first beat ...
        event["duration"] = 1
        expanded.append(event)
        # ... every further whole beat becomes a continuation event ...
        for _ in range(whole_beats - 1):
            expanded.append({"pitch": event["pitch"], "duration": 1, "played": 0})
        # ... and so does the fractional tail, if any.
        if remainder > 0:
            expanded.append({
                "pitch": event["pitch"],
                "duration": transform_duration_value(remainder),
                "played": 0,
            })

    # Callers rely on the in-place update of the list they passed in.
    notes[:] = expanded


In [None]:
def prepare_sequences(notes, sequence_length=50):
    """Turn a note list into sliding-window inputs and one-hot targets.

    ``prepare_duration_array(notes)`` is called first, so ``notes`` is
    rewritten in place. Pitch and duration values are then replaced by their
    indices in the dictionaries built from the rewritten list.

    Args:
        notes: list of note dicts (``"pitch"``, ``"duration"``, ``"played"``).
        sequence_length: number of consecutive events in each input window.
            Defaults to 50, the value that used to be hard-coded.

    Returns:
        ``(network_input, network_output_pitch, network_output_duration,
        network_output_played)``. ``network_input`` is a list of three lists
        (pitch indices, duration indices, played flags), each holding one
        window per sequence. The outputs are NumPy arrays with one one-hot
        row per sequence, describing the event that follows each window.
        With ``len(notes) <= sequence_length`` everything is empty.
    """
    prepare_duration_array(notes)
    unique_pitch_num, pitch_dictionary = prepare_pitch_dictionary(notes)
    unique_duration_num, duration_dictionary = prepare_duration_dictionary(notes)

    print(pitch_dictionary)

    # Encode each event once instead of once per window it appears in.
    pitch_ids = [pitch_dictionary[event["pitch"]] for event in notes]
    duration_ids = [duration_dictionary[event["duration"]] for event in notes]
    played_flags = [event["played"] for event in notes]

    pitch_network_input = []
    duration_network_input = []
    played_network_input = []
    network_output_pitch = []
    network_output_duration = []
    network_output_played = []

    for start in range(len(notes) - sequence_length):
        end = start + sequence_length

        pitch_network_input.append(pitch_ids[start:end])
        duration_network_input.append(duration_ids[start:end])
        played_network_input.append(played_flags[start:end])

        # The target is the event right after the window, one-hot encoded.
        network_output_pitch.append(
            to_categorical(pitch_ids[end], num_classes=unique_pitch_num))
        network_output_duration.append(
            to_categorical(duration_ids[end], num_classes=unique_duration_num))
        network_output_played.append(
            to_categorical(played_flags[end], num_classes=2))

    network_input = [pitch_network_input, duration_network_input, played_network_input]

    network_output_pitch = np.array(network_output_pitch)
    network_output_duration = np.array(network_output_duration)
    network_output_played = np.array(network_output_played)

    # Show the first window of each branch; skipped when there are no
    # sequences, where indexing [0] used to raise IndexError.
    if pitch_network_input:
        print(network_input[0][0])
        print(network_input[1][0])
        print(network_input[2][0])

    return (network_input, network_output_pitch, network_output_duration, network_output_played)

In [None]:
def create_network(unique_pitch_num, unique_duration_num, num_of_sequences, sequence_length=50):
    """Build and compile the three-branch LSTM model.

    Each of the three inputs (pitch, duration, played) has shape
    ``(sequence_length, 1)`` and goes through its own LSTM. The branch outputs
    are concatenated and passed through two further LSTM layers, a Flatten and
    a Dense layer, and finally three softmax heads.

    Args:
        unique_pitch_num: size of the pitch softmax head.
        unique_duration_num: size of the duration softmax head.
        num_of_sequences: unused; kept so existing callers keep working.
        sequence_length: number of time steps per input. Defaults to 50, the
            value that used to be hard-coded.

    Returns:
        A compiled Keras ``Model`` with inputs ``[pitch, duration, played]``
        and outputs ``[pitch, duration, played]``.
    """
    pitchInput = Input(shape=(sequence_length, 1))
    durationInput = Input(shape=(sequence_length, 1))
    playedInput = Input(shape=(sequence_length, 1))

    x = LSTM(128, return_sequences=True)(pitchInput)
    y = LSTM(64, return_sequences=True)(durationInput)
    z = LSTM(64, return_sequences=True)(playedInput)

    combined = concatenate([x, y, z])
    w = LSTM(128, return_sequences=True, recurrent_dropout=0, activation="tanh", recurrent_activation="sigmoid")(combined)
    w = Dropout(0.2)(w)
    w = LSTM(128, return_sequences=True, recurrent_dropout=0, activation="tanh", recurrent_activation="sigmoid")(w)
    w = Flatten()(w)
    w = Dense(256, activation='relu')(w)
    w = Dropout(0.3)(w)

    pitch_output = Dense(unique_pitch_num, activation='softmax')(w)
    duration_output = Dense(unique_duration_num, activation='softmax')(w)
    played_output = Dense(2, activation='softmax')(w)

    model = Model(inputs=[pitchInput, durationInput, playedInput],
                  outputs=[pitch_output, duration_output, played_output])

    optimizer = Adam(learning_rate=0.001)
    # The targets are one-hot vectors and the outputs are softmax probabilities.
    # The `Accuracy` metric class checks y_true == y_pred element-wise, which is
    # essentially never true for probabilities, so it reported a meaningless
    # value. Categorical accuracy compares the arg-max of both instead.
    # One metric list per output is given explicitly.
    model.compile(optimizer=optimizer,
                  loss=['categorical_crossentropy', 'categorical_crossentropy', 'binary_crossentropy'],
                  metrics=[['categorical_accuracy'],
                           ['categorical_accuracy'],
                           ['categorical_accuracy']])

    return model


In [None]:
def train(model, network_input, network_output_pitch, network_output_duration, network_output_played, epochs):
    """Fit ``model`` on the prepared sequences and return the Keras history.

    The three input branches (``network_input[0]``, ``[1]``, ``[2]``) are each
    passed through ``normalize_data`` before fitting. A ``ModelCheckpoint``
    monitoring ``loss`` writes the best model to ``weights.hdf5``.

    Args:
        model: compiled model with three inputs and three outputs.
        network_input: sequence holding the pitch, duration and played inputs.
        network_output_pitch: targets for the first output.
        network_output_duration: targets for the second output.
        network_output_played: targets for the third output.
        epochs: number of training epochs.

    Returns:
        The object returned by ``model.fit``.
    """
    best_weights_saver = ModelCheckpoint('weights.hdf5', monitor='loss', verbose=0, save_best_only=True)

    # Scale the branches in order: pitch, duration, played.
    scaled_branches = [normalize_data(network_input[branch]) for branch in range(3)]
    print(scaled_branches[1][0])

    targets = [network_output_pitch, network_output_duration, network_output_played]
    with tf.device(device_name):
        history = model.fit(
            scaled_branches,
            targets,
            epochs=epochs,
            batch_size=50,
            callbacks=[best_weights_saver],
        )
    return history

In [None]:
 # Load the extracted notes, build the training sequences (this rewrites
 # `notes` in place) and save the rewritten notes for inspection.
 # The input file is opened in a `with` block so its handle is closed
 # deterministically instead of being leaked.
 with open('./Data/notes.json') as notes_file:
     notes = json.load(notes_file)
 network_input, network_output_pitch, network_output_duration, network_output_played = prepare_sequences(notes)
 with open('Data/notes-uredjene.json', 'w') as filepath:
     json.dump(notes, filepath)


{'0': 0, '0.1': 1, '0.1.4.6.7.9': 2, '0.1.4.6.9': 3, '0.2': 4, '0.2.4.5.7': 5, '0.2.4.6.7.9': 6, '0.2.4.6.9': 7, '0.2.4.7': 8, '0.2.4.7.9': 9, '0.2.5.7': 10, '0.2.6': 11, '0.2.7': 12, '0.3.7': 13, '0.4': 14, '0.4.5.7': 15, '0.4.6.7': 16, '0.4.7': 17, '0.5': 18, '0.6': 19, '1': 20, '1.2.4.6.9': 21, '1.2.6': 22, '1.2.6.8': 23, '1.2.6.9': 24, '1.2.7': 25, '1.3': 26, '1.4': 27, '1.4.6': 28, '1.4.6.7.9': 29, '1.4.6.9': 30, '1.4.7': 31, '1.4.7.9': 32, '1.4.8': 33, '1.5': 34, '1.5.8': 35, '1.6': 36, '1.7': 37, '10': 38, '10.0': 39, '10.0.2': 40, '10.0.2.4': 41, '10.0.4': 42, '10.0.5': 43, '10.1': 44, '10.1.4': 45, '10.1.4.6': 46, '10.11': 47, '10.2': 48, '10.2.5': 49, '11': 50, '11.0': 51, '11.0.1.2.4.6.7.9': 52, '11.0.2': 53, '11.0.2.4.7': 54, '11.0.2.5.7': 55, '11.0.3.4.7': 56, '11.0.4.7': 57, '11.1': 58, '11.1.2.6': 59, '11.1.4.6': 60, '11.2': 61, '11.2.3.4.6.7': 62, '11.2.3.6.7': 63, '11.2.4': 64, '11.2.4.5': 65, '11.2.4.5.7': 66, '11.2.4.6': 67, '11.2.4.7': 68, '11.2.5': 69, '11.2.5.7': 

In [None]:
# NOTE: despite its name, `durations` holds the "played" flag of every note
# (not its duration); the next cell looks for entries equal to 0.
durations = [element["played"] for element in notes]

In [None]:
# Positions whose value in `durations` is 0 (that list holds the "played"
# flags). Printing one line per position flooded the cell output with
# thousands of lines, so show a count and a short preview instead.
zero_indices = [index for index, x in enumerate(durations) if x == 0]
print(f"{len(zero_indices)} of {len(durations)} entries are 0")
print("first indices:", zero_indices[:20])

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
28208
28213
28214
28216
28217
28220
28225
28226
28228
28229
28232
28237
28238
28240
28241
28244
28249
28250
28252
28253
28256
28261
28262
28264
28265
28268
28273
28274
28276
28277
28280
28285
28286
28288
28289
28292
28297
28298
28300
28301
28304
28309
28310
28312
28313
28316
28321
28322
28324
28325
28328
28333
28334
28336
28337
28340
28345
28346
28348
28349
28352
28357
28358
28360
28361
28364
28369
28370
28372
28373
28376
28381
28382
28384
28385
28388
28393
28394
28396
28397
28400
28405
28406
28408
28409
28412
28417
28418
28420
28421
28424
28429
28430
28432
28433
28436
28441
28442
28444
28445
28448
28453
28454
28456
28457
28460
28465
28466
28468
28469
28472
28477
28478
28480
28481
28484
28489
28490
28492
28493
28496
28501
28502
28504
28505
28508
28513
28514
28516
28517
28520
28525
28526
28528
28529
28532
28538
28539
28542
28544
28546
28551
28552
28554
28558
28566
28569
28571
28572
28574
28576
28581
28582
28584
28588
28599

In [None]:
def train_model():
    """Load the notes, build the network, train it and plot the loss curve.

    Reads ``./Data/notes.json``, prepares the sequences, creates the model,
    saves and displays a diagram of it (``model.png``), trains for 200 epochs
    and plots ``history.history['loss']``.

    Returns:
        The trained Keras model.
    """
    # Local import: IPython's `display` renders rich output from inside a function.
    from IPython.display import display

    epochs = 200
    # `with` closes the file handle, which `json.load(open(...))` leaked.
    with open('./Data/notes.json') as notes_file:
        notes = json.load(notes_file)
    print('Notes processed')
    network_input, network_output_pitch, network_output_duration, network_output_played = prepare_sequences(notes)

    num_of_sequences = len(network_input[0])
    # Counted after prepare_sequences, i.e. on the rewritten note list.
    unique_pitch_num = len(set([item['pitch'] for item in notes]))
    unique_duration_num = len(set([item['duration'] for item in notes]))

    print('Input and Output processed')

    with tf.device(device_name):
        model = create_network(unique_pitch_num, unique_duration_num, num_of_sequences)

    # Visualize the model architecture
    plot_model(model, to_file='model.png', show_shapes=True, show_layer_names=True)

    # A bare `Image(...)` expression is only rendered when it is the last
    # expression of a cell; inside a function it must be displayed explicitly.
    display(Image('model.png'))
    print('Model created')

    print('Training in progress')
    # summary() prints the table itself and returns None, so wrapping it in
    # print() only added a stray "None" line.
    model.summary()
    history = train(model, network_input, network_output_pitch, network_output_duration, network_output_played, epochs)
    print('Training completed')

    # Visualize training loss
    plt.plot(history.history['loss'])
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.show()

    return model


In [None]:
import random
from numpy.random import choice

def generate_notes(model, network_input, pitch_dictionary, duration_dictionary, unique_pitch_num, unique_duration_num):
    """Generate 150 events by repeatedly predicting the next one.

    A random training window seeds the generation. At every step the model
    predicts from the current window, one of the (up to) three most probable
    pitches and durations is picked at random (the same rank for both), the
    result is appended to the window and the oldest element is dropped.

    The "played" flag is not taken from the model. It is 1 for the first
    event and whenever the pitch differs from the previous one; when the
    pitch repeats it is 1 with probability 0.15 and 0 with probability 0.85.

    Fixes compared with the previous implementation:

    * the seed index was drawn from ``len(network_input)`` (the number of
      branches, 3) instead of the number of sequences, so only the first two
      windows could ever be chosen;
    * inputs were min-max scaled per window, whereas ``train`` scales each
      column over the whole data set; scalers are now fitted once on
      ``network_input`` and re-used, so inference sees the training scaling;
    * index -> label maps are built by inverting the dictionaries instead of
      relying on key order;
    * the top-3 pick no longer fails when a head has fewer than 3 classes.

    Args:
        model: trained model with three inputs and three outputs.
        network_input: the three input branches, each ``(sequences, steps)``.
        pitch_dictionary: mapping pitch label -> index.
        duration_dictionary: mapping duration value -> index.
        unique_pitch_num: unused; kept for interface compatibility.
        unique_duration_num: unused; kept for interface compatibility.

    Returns:
        ``[pitches, durations, played]``: three NumPy arrays of length 150.

    Raises:
        ValueError: if ``network_input`` contains no sequences.
    """
    sequence_count = len(network_input[0])
    if sequence_count == 0:
        raise ValueError("network_input contains no sequences to seed generation")

    start = np.random.randint(0, sequence_count)
    int_to_pitch = {index: label for label, index in pitch_dictionary.items()}
    int_to_duration = {index: value for value, index in duration_dictionary.items()}

    # Fit the scalers once, on the same matrices train() normalises.
    pitch_scaler = MinMaxScaler().fit(network_input[0])
    duration_scaler = MinMaxScaler().fit(network_input[1])
    played_scaler = MinMaxScaler().fit(network_input[2])

    pitch_input = np.asarray(network_input[0][start], dtype=float).ravel()
    duration_input = np.asarray(network_input[1][start], dtype=float).ravel()
    played_input = np.asarray(network_input[2][start], dtype=float).ravel()
    time_steps = len(pitch_input)

    def as_model_input(scaler, window):
        # (steps,) -> scaled (1, steps) -> (batch=1, steps, features=1)
        return scaler.transform(window.reshape(1, -1)).reshape(1, time_steps, 1)

    pitch_prediction = []
    duration_prediction = []
    played_prediction = []
    previous_pitch = None

    for note_index in range(150):
        input_list = [
            as_model_input(pitch_scaler, pitch_input),
            as_model_input(duration_scaler, duration_input),
            as_model_input(played_scaler, played_input),
        ]
        prediction = model.predict(input_list, verbose=0)

        pitch_probabilities = prediction[0].ravel()
        duration_probabilities = prediction[1].ravel()

        # rank 1 = most probable class, 3 = third most probable; clamped so a
        # head with fewer than 3 classes cannot be indexed out of range.
        rank = random.randint(1, 3)
        pitch_result = int(np.argsort(pitch_probabilities)[-min(rank, pitch_probabilities.size)])
        duration_result = int(np.argsort(duration_probabilities)[-min(rank, duration_probabilities.size)])

        if note_index > 0 and pitch_result == previous_pitch:
            # A repeated pitch is mostly treated as a continuation.
            played = int(choice([1, 0], p=[0.15, 0.85]))
        else:
            played = 1
        previous_pitch = pitch_result

        # Map the predicted integers back to the corresponding labels.
        pitch_prediction.append(int_to_pitch[pitch_result])
        duration_prediction.append(int_to_duration[duration_result])
        played_prediction.append(played)

        # Slide the windows: drop the oldest element, append the new one.
        pitch_input = np.append(pitch_input[1:], pitch_result)
        duration_input = np.append(duration_input[1:], duration_result)
        played_input = np.append(played_input[1:], played)

    prediction_output = [
        np.array(pitch_prediction),
        np.array(duration_prediction, dtype=float),
        np.array(played_prediction, dtype=float),
    ]

    print('Notes Generated...')

    return prediction_output


In [None]:
def generate():
    """Load the saved weights, generate a note sequence and write it as MIDI.

    Reads ``./Data/notes-bezRest.json``, rebuilds the sequences and
    dictionaries, creates the network, loads ``Models/model1_weights.hdf5``
    into it, generates notes and passes them to ``create_midi``.

    Returns:
        None.
    """
    print("Current Directory:", os.getcwd())
    # `with` closes the file handle, which `json.load(open(...))` leaked.
    with open('./Data/notes-bezRest.json') as notes_file:
        notes = json.load(notes_file)
    print('Notes processed')
    network_input, network_output_pitch, network_output_duration, network_output_played = prepare_sequences(notes)
    unique_pitch_num, pitch_dictionary = prepare_pitch_dictionary(notes)
    unique_duration_num, duration_dictionary = prepare_duration_dictionary(notes)
    network_input = np.array(network_input)
    num_of_sequences = len(network_input[0])
    print('Input and Output processed')

    with tf.device(device_name):
        model = create_network(unique_pitch_num, unique_duration_num, num_of_sequences)
    print('Loading Model weights.....')

    # Address the weights by path instead of chdir-ing into ./Models and back:
    # a failing load used to leave the notebook in the wrong working
    # directory, breaking every later relative path.
    model.load_weights(os.path.join('Models', 'model1_weights.hdf5'))
    print('Model Loaded')

    prediction_output = generate_notes(model, network_input, pitch_dictionary, duration_dictionary, unique_pitch_num, unique_duration_num)
    # Show the generated durations (second entry of the prediction output).
    durations = prediction_output[1]
    print(durations)

    create_midi(prediction_output)

In [None]:
def create_midi(prediction_output):
    """Convert generated events to music21 objects and write a MIDI file.

    Events with ``played == 0`` are continuations: they extend the duration of
    the most recently emitted note, chord or rest instead of creating a new
    one. Other events become a rest (pattern ``'r'``), a chord (dot-separated
    or purely numeric pattern) or a single note.

    Fixes compared with the previous implementation:

    * a continuation used to set the duration to "previous prediction's
      duration + this duration", so several continuations in a row overwrote
      each other; the duration of the last emitted event is now extended;
    * a continuation arriving before any event had been emitted raised
      IndexError; it is now emitted as a regular event.

    Args:
        prediction_output: ``[pitches, durations, played]`` sequences of equal
            length, as returned by ``generate_notes``.

    Side effects:
        Writes ``test<N>.mid`` to the current working directory, where ``N``
        is the number of entries in ``/content/ZavrsniRad/GeneratedSongs``
        plus 10.
        NOTE(review): the file is not written into that folder, so ``N`` does
        not grow between calls — confirm whether that is intended.
    """
    offset = 0
    output_notes = []

    pitches = prediction_output[0]
    durations = prediction_output[1]
    played_flags = prediction_output[2]

    # create note and chord objects based on the values generated by the model
    for index, pattern in enumerate(pitches):
        note_duration = durations[index]
        played = played_flags[index]

        if played == 0 and output_notes:
            # Continuation: lengthen the last emitted event by this duration.
            previous = output_notes[-1]
            extended = float(previous.duration.quarterLength) + float(note_duration)
            previous.duration = duration.Duration(extended)
            offset += note_duration
            continue

        if pattern == 'r':
            # Handle rests
            new_rest = note.Rest()
            new_rest.duration = duration.Duration(note_duration)
            new_rest.offset = offset
            output_notes.append(new_rest)
        # pattern is a chord
        elif ('.' in pattern) or pattern.isdigit():
            notes_in_chord = pattern.split('.')
            chord_notes = []
            for current_note in notes_in_chord:
                new_note = note.Note(int(current_note))
                new_note.storedInstrument = instrument.ElectricGuitar()
                chord_notes.append(new_note)
            new_chord = chord.Chord(chord_notes)
            new_chord.duration = duration.Duration(note_duration)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.duration = duration.Duration(note_duration)
            new_note.storedInstrument = instrument.ElectricGuitar()
            output_notes.append(new_note)

        # increase offset each iteration so that notes do not stack
        offset += note_duration

    midi_stream = stream.Stream(output_notes)

    folder_path = '/content/ZavrsniRad/GeneratedSongs'
    number = len(os.listdir(folder_path)) + 10
    filename = f"test{number}.mid"
    midi_stream.write('midi', fp=filename)

In [None]:
# Run the whole pipeline: train a model on the selected device, then generate
# a MIDI file. generate() builds its own model and loads weights from
# ./Models, so it does not use the `model` returned by train_model().
with tf.device(device_name):
    model=train_model()
generate()