In [None]:
import os
import json
from tensorflow.keras.utils import to_categorical
import numpy as np
import keras

import os
import music21 as m21
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

In [None]:
# Note/rest lengths, in quarter lengths, that the duration check accepts.
ACCEPTABLE_DURATIONS = [0.25,0.5,0.75,1,2,3,4]
# Directory that receives one encoded text file per song.
SAVING_DIRECTORY = "dataset"
# File that holds all encoded songs joined into one string.
SINGLE_FILE_DATASET = "all_dataset"
# JSON file that stores the symbol -> integer vocabulary.
MAPPING = "mapping.json"
# Length of each training window; also the number of "/" delimiter symbols
# written after every song and prepended to a generation seed.
SEQUENCES = 64
# Root folder searched recursively for .krn files.
dataset_path = "/content/drive/MyDrive/deutschl/erk"
# Size of the one-hot input vectors and of the softmax output layer.
# NOTE(review): this has to equal the number of symbols in MAPPING, which is
# derived from the data — confirm after regenerating the dataset.
OUTPUT_UNITS = 38
# LSTM layer sizes; build_model only reads the first entry.
NUM_UNITS = [256]
# Targets are integer class ids (not one-hot), hence the sparse loss.
LOSS = "sparse_categorical_crossentropy"
LEARNING_RATE = 0.001
EPOCHS = 50
BATCH_SIZE = 64
# Where the trained model is written and later loaded from.
SAVE_MODEL_PATH = "model.h5"

In [None]:
def load_song(file_path):
    """Parse one score file with music21, reporting progress on stdout.

    Args:
        file_path: Path of the file handed to ``m21.converter.parse``.

    Returns:
        The parsed score, or ``None`` when any exception is raised while
        processing the file (the error is printed, not propagated).
    """
    parsed = None
    try:
        print(f"Processing file: {file_path}")
        parsed = m21.converter.parse(file_path)
        print(f"Song converted: {file_path}")
    except Exception as error:
        # Best effort: one unreadable file must not abort the whole batch.
        print(f"Error processing {file_path}: {error}")
        parsed = None
    return parsed


def load_songs(dataset_path):
    """Parse every ``.krn`` file below ``dataset_path`` using a process pool.

    Files are visited in sorted order so that the position of a song in the
    returned list is the same on every run.

    Args:
        dataset_path: Directory that is walked recursively.

    Returns:
        List of parsed songs in path order. Results that are falsy
        (``load_song`` returns ``None`` on failure) are left out.
    """
    num_cores = multiprocessing.cpu_count()
    print(f"Number of CPU cores available: {num_cores}")

    krn_paths = []
    for path, subdir, files in os.walk(dataset_path):
        # Sorting in place also fixes the order in which os.walk descends.
        subdir.sort()
        for file in sorted(files):
            if file.endswith(".krn"):
                krn_paths.append(os.path.join(path, file))

    print(f"Total .krn files submitted for processing: {len(krn_paths)}")

    if not krn_paths:
        # Nothing to parse: do not start worker processes for an empty batch.
        print("Total .krn files successfully processed: 0")
        return []

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        # map() yields results in submission order.
        results = list(executor.map(load_song, krn_paths))

    all_songs = [song for song in results if song]
    print(f"Total .krn files successfully processed: {len(all_songs)}")

    return all_songs

Number of CPU cores available: 2


  self.pid = os.fork()


Processing file: /content/drive/MyDrive/deutschl/erk/deut0595.krn
Processing file: /content/drive/MyDrive/deutschl/erk/deut2046.krn
Total MIDI files submitted for processing: 1700
Song converted: /content/drive/MyDrive/deutschl/erk/deut0595.krn
Processing file: /content/drive/MyDrive/deutschl/erk/deut1053.krn
Song converted: /content/drive/MyDrive/deutschl/erk/deut2046.krn
Processing file: /content/drive/MyDrive/deutschl/erk/deut1124.krn
Song converted: /content/drive/MyDrive/deutschl/erk/deut1124.krn
Processing file: /content/drive/MyDrive/deutschl/erk/deut1211.krn
Song converted: /content/drive/MyDrive/deutschl/erk/deut1053.krn
Processing file: /content/drive/MyDrive/deutschl/erk/deut1401.krn
Song converted: /content/drive/MyDrive/deutschl/erk/deut1211.krn
Processing file: /content/drive/MyDrive/deutschl/erk/deut0950.krn
Song converted: /content/drive/MyDrive/deutschl/erk/deut1401.krn
Processing file: /content/drive/MyDrive/deutschl/erk/deut1574.krn
Song converted: /content/drive/MyD

In [None]:
def is_having_acceptable_durations(song, acceptable_durations):
  """Tell whether every note and rest of ``song`` has an allowed length.

  Args:
    song: Object exposing ``flat.notesAndRests``.
    acceptable_durations: Container of allowed quarter-length values.

  Returns:
    ``True`` when each element's ``duration.quarterLength`` is contained in
    ``acceptable_durations`` (also for a song without elements), else ``False``.
  """
  return all(
      event.duration.quarterLength in acceptable_durations
      for event in song.flat.notesAndRests
  )

In [None]:
def transpose_songs(song):
  """Transpose ``song`` so that its key becomes C major or A minor.

  The key is first looked up at index 4 of the first measure of the first
  part. When that lookup is out of range, does not yield a ``m21.key.Key``,
  or yields a key whose mode is neither major nor minor, the key is estimated
  with ``song.analyze("key")`` instead.

  Args:
    song: music21 score to transpose.

  Returns:
    The transposed score returned by ``song.transpose``.

  Raises:
    ValueError: If even the estimated key is neither major nor minor.
  """
  target_tonics = {"major": "C", "minor": "A"}

  key = None
  try:
    parts = song.getElementsByClass(m21.stream.Part)
    measures_parts_zero = parts[0].getElementsByClass(m21.stream.Measure)
    key = measures_parts_zero[0][4]
  except IndexError:
    # No part, no measure, or fewer than five elements in the first measure:
    # fall through to key estimation below.
    key = None

  if not isinstance(key, m21.key.Key) or key.mode not in target_tonics:
    key = song.analyze("key")

  target = target_tonics.get(key.mode)
  if target is None:
    raise ValueError(f"Cannot transpose a song in mode {key.mode!r}")

  interval = m21.interval.Interval(key.tonic, m21.pitch.Pitch(target))
  return song.transpose(interval)

In [None]:
def encoding_song(song, time_steps = 0.25):
  """Encode a song as a space-separated time-series string.

  Every note or rest occupies ``int(quarterLength / time_steps)`` slots. The
  first slot holds the MIDI pitch number (notes) or ``"r"`` (rests); each
  further slot holds ``"_"`` to mark that the event is still sounding.

  Elements that are neither ``m21.note.Note`` nor ``m21.note.Rest`` are
  skipped, so they can never inherit the symbol of the preceding element.

  Args:
    song: Object exposing ``flat.notesAndRests``.
    time_steps: Length of one slot in quarter lengths.

  Returns:
    The encoded song, e.g. ``"60 _ _ _ r _ 62"``.
  """
  song_encoded = []
  for node in song.flat.notesAndRests:
    if isinstance(node, m21.note.Note):
      sym = node.pitch.midi
    elif isinstance(node, m21.note.Rest):
      sym = "r"
    else:
      # Unsupported element type: there is no symbol for it.
      continue

    # Convert the duration into a whole number of time-series slots.
    steps = int(node.duration.quarterLength / time_steps)
    if steps > 0:
      song_encoded.append(sym)
      song_encoded.extend(["_"] * (steps - 1))

  return " ".join(map(str, song_encoded))


In [None]:
def preprocess_dataset(songs=None):
    """Filter, transpose and encode songs, writing one text file per song.

    Args:
        songs: Already parsed songs. When ``None``, the songs are loaded from
            ``dataset_path`` with ``load_songs``.

    Side effects:
        Creates ``SAVING_DIRECTORY`` if needed and writes each kept song to a
        file named after its index in ``songs``. Songs rejected by the
        duration check are skipped, so their indices do not appear on disk.
    """
    if songs is None:
        print("Loading songs .........")
        songs = load_songs(dataset_path)
        print(f"Loaded {len(songs)} songs")

    os.makedirs(SAVING_DIRECTORY, exist_ok=True)

    for i, song in enumerate(songs):
        # Filter out songs that contain non-acceptable durations.
        if not is_having_acceptable_durations(song, ACCEPTABLE_DURATIONS):
            continue

        # Transpose to C major / A minor.
        song = transpose_songs(song)

        # Encode with the music time-series representation.
        encoded_song = encoding_song(song)

        # Save the encoded song to its own text file.
        save_path = os.path.join(SAVING_DIRECTORY, str(i))
        with open(save_path, "w") as fp:
            fp.write(encoded_song)

In [None]:
def load_file(file_path):
  """Return the complete text content of ``file_path``."""
  with open(file_path, "r") as handle:
    return handle.read()

In [None]:
def converge_data_to_single_file(dataset_path, store_all_dataset, sequences):
  """Join all encoded songs into one delimiter-separated string and save it.

  Each song is followed by a space and ``sequences`` copies of ``"/ "``; the
  final trailing space is removed. Files are read in sorted order so the
  resulting string is the same on every run.

  Args:
    dataset_path: Directory walked recursively for encoded song files.
    store_all_dataset: Path of the file the combined string is written to.
    sequences: Number of ``"/"`` delimiter symbols placed after each song.

  Returns:
    The combined string that was written to ``store_all_dataset``.
  """
  deliminator = "/ " * sequences
  pieces = []

  for path, subdir, files in os.walk(dataset_path):
    # Sorting in place also fixes the order in which os.walk descends.
    subdir.sort()
    for file in sorted(files):
      song = load_file(os.path.join(path, file))
      pieces.append(song + " " + deliminator)

  # Single join instead of repeated concatenation; drop the trailing space.
  songs = "".join(pieces)[:-1]

  with open(store_all_dataset, "w") as fp:
    fp.write(songs)

  return songs

In [None]:
def generate_mapping(songs, mappings):
  """Build the symbol -> integer vocabulary and store it as JSON.

  Symbols are numbered in sorted order, so the same dataset always produces
  the same mapping and a previously trained model stays usable after the
  mapping file is regenerated.

  Args:
    songs: Whitespace-separated string of symbols.
    mappings: Path of the JSON file to write.
  """
  vocab = sorted(set(songs.split()))
  mapping = {symbol: index for index, symbol in enumerate(vocab)}

  with open(mappings, "w") as fp:
    json.dump(mapping, fp, indent = 4)


In [None]:
def converting_songs_to_int(songs):
  """Translate a symbol string into integer ids using the ``MAPPING`` file.

  Args:
    songs: Whitespace-separated string of symbols.

  Returns:
    List with the integer id of every symbol, in order.

  Raises:
    KeyError: If a symbol is missing from the mapping.
  """
  with open(MAPPING, "r") as fp:
    symbol_to_id = json.load(fp)

  return [symbol_to_id[symbol] for symbol in songs.split()]

In [None]:
def generate_training_data(seq_length):
  """Build sliding-window training samples from ``SINGLE_FILE_DATASET``.

  Every window of ``seq_length`` consecutive symbol ids becomes one input and
  the id that follows the window becomes its target.

  Args:
    seq_length: Number of symbols per input window.

  Returns:
    Tuple ``(inputs, targets)``: ``inputs`` is the one-hot encoding of the
    windows with as many classes as there are distinct ids in the dataset;
    ``targets`` is an array of the following ids.
  """
  encoded = converting_songs_to_int(load_file(SINGLE_FILE_DATASET))
  window_count = len(encoded) - seq_length

  windows = [encoded[start:start + seq_length] for start in range(window_count)]
  followers = [encoded[start + seq_length] for start in range(window_count)]

  class_count = len(set(encoded))
  inputs = to_categorical(windows, num_classes=class_count)
  targets = np.array(followers)

  return inputs,targets

In [None]:
def build_model(output_units, num_units, loss, learning_rate):
    """Create and compile the single-layer LSTM next-symbol classifier.

    Args:
        output_units: Size of each one-hot input vector and of the softmax
            output layer.
        num_units: Sequence of layer sizes; only ``num_units[0]`` is used, as
            the width of the LSTM layer.
        loss: Loss passed to ``model.compile``.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        The compiled model; its summary is printed before returning.
    """
    layers = keras.layers

    # Variable-length sequences of one-hot vectors.
    model_input = layers.Input(shape=(None, output_units))
    hidden = layers.LSTM(num_units[0])(model_input)
    hidden = layers.Dropout(0.2)(hidden)
    probabilities = layers.Dense(output_units, activation="softmax")(hidden)

    model = keras.Model(model_input, probabilities)

    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(loss=loss, optimizer=optimizer, metrics=["accuracy"])

    model.summary()

    return model


def train_data(output_units=OUTPUT_UNITS, num_units=NUM_UNITS, loss=LOSS, learning_rate=LEARNING_RATE):
  """Train the model on the generated dataset and save it.

  Args:
    output_units: Forwarded to ``build_model``.
    num_units: Forwarded to ``build_model``.
    loss: Forwarded to ``build_model``.
    learning_rate: Forwarded to ``build_model``.

  Side effects:
    Fits for ``EPOCHS`` epochs with batches of ``BATCH_SIZE`` and writes the
    trained model to ``SAVE_MODEL_PATH``.
  """
  features, labels = generate_training_data(SEQUENCES)
  network = build_model(output_units, num_units, loss, learning_rate)

  # Train, then persist the fitted network.
  network.fit(features, labels, epochs=EPOCHS, batch_size=BATCH_SIZE)
  network.save(SAVE_MODEL_PATH)



In [None]:
class MelodyGeneration:
  """Generate melodies with a trained model and export them with music21.

  Melodies use the time-series encoding of the training data: a MIDI pitch
  number or ``"r"`` starts an event, ``"_"`` prolongs the current event by one
  step and ``"/"`` marks the end of a song.
  """

  def __init__(self, model_path = SAVE_MODEL_PATH, mapping_path = MAPPING):
    """Load the trained model and the symbol vocabulary.

    Args:
      model_path: Path passed to ``keras.models.load_model``.
      mapping_path: Path of the JSON symbol -> integer mapping.
    """
    self.model_path = model_path
    self.mapping_path = mapping_path
    self.model = keras.models.load_model(model_path)

    with open(mapping_path, "r") as fp:
      self._mapping = json.load(fp)

    # Reverse lookup built once instead of scanning the mapping on every step.
    self._symbols = {index: symbol for symbol, index in self._mapping.items()}

    self._start_symbols = ['/'] * SEQUENCES

  def melody_generate(self, song, num_steps, max_sequence_length, temp):
    """Extend a seed melody by sampling from the model one step at a time.

    Args:
      song: Seed melody as a whitespace-separated symbol string.
      num_steps: Maximum number of symbols to generate.
      max_sequence_length: Number of most recent symbols fed to the model.
      temp: Sampling temperature, must be greater than 0.

    Returns:
      List of symbols: the seed followed by the generated symbols. Generation
      stops early when the model emits ``"/"`` (which is not included).

    Raises:
      KeyError: If the seed contains a symbol that is not in the mapping.
    """
    mel = song.split()
    context = [self._mapping[sym] for sym in self._start_symbols + mel]
    vocab_size = len(self._mapping)

    for _ in range(num_steps):
      # Only the most recent symbols are shown to the model.
      context = context[-max_sequence_length:]

      onehot_song = keras.utils.to_categorical(context, num_classes=vocab_size)
      onehot_song = onehot_song[np.newaxis, ...]

      # verbose=0 keeps Keras from printing a progress bar per generated symbol.
      _prediction = self.model.predict(onehot_song, verbose=0)[0]
      output_integer = self._sample_with_temperature(_prediction, temp)
      context.append(output_integer)

      output_symbol = self._symbols[output_integer]
      if output_symbol == "/":
        break
      mel.append(output_symbol)

    return mel

  def _sample_with_temperature(self, _prediction, temp):
    """Sample an index from ``_prediction`` after temperature re-scaling.

    Args:
      _prediction: 1-D array of class probabilities.
      temp: Temperature; values below 1 sharpen the distribution, values
        above 1 flatten it.

    Returns:
      The sampled class index as a plain ``int``.

    Raises:
      ValueError: If ``temp`` is not greater than 0.
    """
    if temp <= 0:
      raise ValueError("temp must be greater than 0")

    # Work in float64: rounding error from lower-precision model outputs can
    # otherwise make the re-normalised probabilities fail
    # np.random.choice's sum-to-one check.
    probabilities = np.asarray(_prediction, dtype=np.float64)

    # log(0) is -inf, which correctly becomes probability 0 again below.
    with np.errstate(divide="ignore"):
      scaled = np.log(probabilities) / temp

    # Subtracting the maximum leaves the softmax unchanged and avoids overflow.
    weights = np.exp(scaled - np.max(scaled))
    weights /= np.sum(weights)

    return int(np.random.choice(len(weights), p=weights))

  def save_melody(self, mel, step_duration=0.25, format="midi", file_name="mel.mid"):
    """Convert an encoded melody to a music21 stream and write it to disk.

    Args:
      mel: List of symbols as returned by ``melody_generate``.
      step_duration: Length of one symbol in quarter lengths.
      format: Output format passed to ``stream.write``.
      file_name: Destination file.

    Notes:
      ``"_"`` symbols that appear before any note or rest have nothing to
      prolong and are ignored.
    """
    stream = m21.stream.Stream()

    # Collapse the symbol list into [symbol, step_count] events first, so the
    # last event is written too and every prolongation step is counted.
    events = []
    for sym in mel:
      if sym == "_":
        if events:
          events[-1][1] += 1
        continue
      events.append([sym, 1])

    for sym, step_count in events:
      quarter_length_time = step_duration * step_count
      if sym == "r":
        m21_event = m21.note.Rest(quarterLength=quarter_length_time)
      else:
        m21_event = m21.note.Note(int(sym), quarterLength=quarter_length_time)
      stream.append(m21_event)

    stream.write(format, file_name)




In [None]:
# End-to-end run: parse the corpus, build the dataset, train, then generate
# and save one melody.
if __name__ == "__main__":
    # Parse every .krn file found under dataset_path.
    songs = load_songs(dataset_path)
    # Write one encoded text file per song into SAVING_DIRECTORY.
    preprocess_dataset(songs)
    # Join the per-song files into one string, separated by SEQUENCES "/" symbols.
    songs = converge_data_to_single_file(SAVING_DIRECTORY, SINGLE_FILE_DATASET, SEQUENCES)
    # Store the symbol -> integer vocabulary.
    generate_mapping(songs, MAPPING)
    # Train with the default hyper-parameters and save the model to SAVE_MODEL_PATH.
    train_data()
    # Loads the model and the mapping that were just written.
    mg = MelodyGeneration()
    # Seeds use the dataset encoding: MIDI pitch numbers, "_" prolongs the previous symbol.
    seed = "67 _ 67 _ 65 67 _ _ 65 64 _ 64 _ 64 _ _"
    # Alternative seed; not used below.
    seed2 = "67 _ _ _ _ _ 65 _ 64 _ 62 _ 60 _ _ _"
    # Generate at most 500 symbols from a SEQUENCES-long context at temperature 0.8.
    melody = mg.melody_generate(seed,500, SEQUENCES, 0.8)
    print(melody)
    # Written with save_melody's defaults (MIDI, "mel.mid").
    mg.save_melody(melody)

['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71', '73']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71', '73', '71']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71', '73', '71', '71']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71', '73', '71', '71', '55']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71', '73', '71', '71', '55', '67']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71', '73', '71', '71', '55', '67', '71']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_', '64', '_', '_', '71', '73', '71', '71', '55', '67', '71', '51']
['67', '_', '67', '_', '65', '67', '_', '_', '65', '64', '_', '64', '_',