In [1]:
!pip install pretty_midi



In [1]:
import os
import matplotlib
import collections
import pretty_midi
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from IPython import display
from typing import Optional
import matplotlib.pyplot as plt

In [2]:
try:
  resolver = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
  tf.config.experimental_connect_to_cluster(resolver)
  tf.tpu.experimental.initialize_tpu_system(resolver)
  strategy = tf.distribute.TPUStrategy(resolver)
  print("Running on TPU ", resolver.master())
except ValueError:
  strategy = tf.distribute.get_strategy()  # Default strategy that works on CPU and single GPU
  print("Running on CPU/GPU")

print("Number of accelerators: ", strategy.num_replicas_in_sync)


Running on CPU/GPU
Number of accelerators:  1


In [3]:
BATCH_SIZE = 128 * strategy.num_replicas_in_sync

In [4]:
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

# Sampling rate for audio playback
_SAMPLING_RATE = 16000

In [5]:
# dir = '/midis/'
# local_dir = 'fuzzy_proj/EMOPIA_2.2/midis/'
dir = 'content/midis/'
files = os.listdir(dir)


In [6]:
def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
  waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
  # Take a sample of the generated waveform to mitigate kernel resets
  waveform_short = waveform[:seconds*_SAMPLING_RATE]
  return display.Audio(waveform_short, rate=_SAMPLING_RATE)

In [7]:
def midi_to_notes(midi_file: str) -> pd.DataFrame:
  pm = pretty_midi.PrettyMIDI(midi_file)
  instrument = pm.instruments[0]
  notes = collections.defaultdict(list)

  # Sort the notes by start time
  sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
  prev_start = sorted_notes[0].start

  for note in sorted_notes:
    start = note.start
    end = note.end
    notes['pitch'].append(note.pitch)
    notes['velocity'].append(note.velocity)
    notes['start'].append(start)
    notes['end'].append(end)
    notes['step'].append(start - prev_start)
    notes['duration'].append(end - start)
    prev_start = start

  return pd.DataFrame({name: np.array(value) for name, value in notes.items()})

In [8]:
df = pd.read_csv('content/EMOPIA_2.2_normalized_metadata_by_song.csv')

In [9]:
def plot_piano_roll(notes: pd.DataFrame, count: Optional[int] = None):
    if count is None:
        count = len(notes)
    title = f'First {count} notes' if count else 'Whole track'

    plt.figure(figsize=(5, 15))  # Change the figure size as needed
    for index, note in notes.iterrows():
        if index < count:
            color=matplotlib.colors.to_hex((min(0.1 * note['velocity'], 1), 0.3, 0.5))
            plt.plot([note['pitch'], note['pitch']], [note['start'], note['end']], color=color)

    plt.gca().invert_yaxis()  # Invert the y-axis so notes "fall" from top to bottom
    plt.title(title)
    plt.xlabel('Pitch')
    plt.ylabel('Time [s]')
    plt.grid(True)  # Optional: adds a grid to the plot
    plt.show()


In [10]:
def plot_distributions(notes: pd.DataFrame, drop_percentile=2.5):
  plt.figure(figsize=[15, 5])
  plt.subplot(1, 3, 1)
  sns.histplot(notes, x="pitch", bins=20)

  plt.subplot(1, 3, 2)
  max_step = np.percentile(notes['step'], 100 - drop_percentile)
  sns.histplot(notes, x="step", bins=np.linspace(0, max_step, 21))

  plt.subplot(1, 3, 3)
  max_duration = np.percentile(notes['duration'], 100 - drop_percentile)
  sns.histplot(notes, x="duration", bins=np.linspace(0, max_duration, 21))

In [12]:
import random

# 250 represents number of files. This can be changed to len(files) to get all files
random_files = random.sample(files, 250)


In [15]:
# Create dataset
#################################### CHANGE ####################################
# num_files = 15 # Change this to len(files) to process all files.
num_files = len(random_files)
#################################### CHANGE ####################################

all_notes = []
local_dir = '/midis/'
for f in files[:num_files]:
  # four quarters, q1, q2, q3, q4, one hot encoded
  moods = [0, 0, 0, 0]

  print(f, end='\r')

  mood = f.split('_')[0]

  # read file fuzzy_proj/EMOPIA_2.2_normalized_metadata_by_song.csv
  df = pd.read_csv('content/EMOPIA_2.2_normalized_metadata_by_song.csv')
  # get Q1 of the row when column songID==id
  id = f.split('_')[1].split('.mid')[0]
  df = df.loc[df['songID'] == id]

  if df.empty:
    continue

  Q1 = df['num_Q1'].values[0]
  Q2 = df['num_Q2'].values[0]
  Q3 = df['num_Q3'].values[0]
  Q4 = df['num_Q4'].values[0]

  f = 'content/midis/' + f
  data = midi_to_notes(f)

  data['Q1'] = Q1; data['Q2'] = Q2; data['Q3'] = Q3; data['Q4'] = Q4; data['id'] = id

  all_notes.append(data)

all_notes = pd.concat(all_notes)
all_notes

Q2_cm6E860vDjY_0.midd

Unnamed: 0,pitch,velocity,start,end,step,duration,Q1,Q2,Q3,Q4,id
0,43,48,0.148438,0.729167,0.000000,0.580729,-0.469478,-0.519611,0.964776,0.161622,egYSmNuIFGk
1,55,39,0.164062,0.729167,0.015625,0.565104,-0.469478,-0.519611,0.964776,0.161622,egYSmNuIFGk
2,60,53,0.447917,1.529948,0.283854,1.082031,-0.469478,-0.519611,0.964776,0.161622,egYSmNuIFGk
3,52,50,0.483073,1.009115,0.035156,0.526042,-0.469478,-0.519611,0.964776,0.161622,egYSmNuIFGk
4,43,59,0.735677,1.269531,0.252604,0.533854,-0.469478,-0.519611,0.964776,0.161622,egYSmNuIFGk
...,...,...,...,...,...,...,...,...,...,...,...
392,65,69,31.200521,31.235677,0.003906,0.035156,-0.469478,1.774184,-0.468500,-0.640272,cm6E860vDjY
393,39,97,31.433594,31.753906,0.233073,0.320312,-0.469478,1.774184,-0.468500,-0.640272,cm6E860vDjY
394,27,94,31.434896,31.748698,0.001302,0.313802,-0.469478,1.774184,-0.468500,-0.640272,cm6E860vDjY
395,26,71,31.865885,31.959635,0.430990,0.093750,-0.469478,1.774184,-0.468500,-0.640272,cm6E860vDjY


In [16]:
def one_hot_encode(series, num_classes):
    return np.eye(num_classes)[series.astype(int)]

def create_sequences(df, sequence_length=100):
    X, y_pitch, y_velocity, y_step, y_duration = [], [], [], [], []
    unique_song_ids = df['id'].unique()

    for song_id in unique_song_ids:
        song_df = df[df['id'] == song_id]
        song_length = len(song_df)

        # Create sequences for each song
        for i in range(song_length - sequence_length):
            sequence_df = song_df.iloc[i:i+sequence_length]
            target_idx = i + sequence_length

            # Prepare the sequence for X
            sequence_x = np.hstack((one_hot_encode(sequence_df['pitch'], 128),
                                    one_hot_encode(sequence_df['velocity'], 128),
                                    sequence_df[['step', 'duration', 'Q1', 'Q2', 'Q3', 'Q4']].values))

            # Prepare the next values for y
            target_pitch = song_df.iloc[target_idx]['pitch']
            target_velocity = song_df.iloc[target_idx]['velocity']
            target_step = song_df.iloc[target_idx]['step']
            target_duration = song_df.iloc[target_idx]['duration']

            # Append to our X and y lists
            X.append(sequence_x)
            y_pitch.append(target_pitch)
            y_velocity.append(target_velocity)
            y_step.append(target_step)
            y_duration.append(target_duration)

    # Convert lists to numpy arrays
    X = np.array(X)
    y = {
        'pitch': np.array(y_pitch),
        'velocity': np.array(y_velocity),
        'step': np.array(y_step),
        'duration': np.array(y_duration)
    }

    return X, y


In [17]:
seq_length = 100
x, y = create_sequences(all_notes, seq_length)
print('x shape:')
print(x.shape)
print(y)
print(len(y['duration']))

x shape:
(50142, 100, 262)
{'pitch': array([44, 64, 36, ..., 62, 68, 71]), 'velocity': array([46, 51, 40, ..., 66, 65, 68]), 'step': array([0.35286458, 0.45182292, 0.00651042, ..., 0.14713542, 0.00260417,
       0.        ]), 'duration': array([0.53645833, 0.75911458, 0.453125  , ..., 0.0546875 , 0.05208333,
       0.05208333])}
50142


In [18]:
def mse_with_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
  mse = (y_true - y_pred) ** 2
  positive_pressure = 10 * tf.maximum(-y_pred, 0.0)
  return tf.reduce_mean(mse + positive_pressure)

In [19]:
with strategy.scope():
  def create_model(input_shape, learning_rate):
    inputs = tf.keras.Input(shape=input_shape, name='Input_Layer')

    # Base LSTM layers with attention
    x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True, name='LSTM_Base'))(inputs)
    x = tf.keras.layers.Attention(name='Attention_Base')([x, x])

    # Additional LSTM + Attention layers for deep feature extraction
    # for i in range(3):
    #     lstm_layer = tf.keras.layers.LSTM(512, return_sequences=True, name=f'LSTM_{i}')
    #     x = tf.keras.layers.Bidirectional(lstm_layer)(x)
    #     x = tf.keras.layers.Attention(name=f'Attention_{i}')([x, x])

    # Final LSTM layer before branching
    final_lstm_layer = tf.keras.layers.LSTM(64, return_sequences=False, name='LSTM_Final')
    x = tf.keras.layers.Bidirectional(final_lstm_layer)(x)

    # Branch for categorical outputs
    x_cat = tf.keras.layers.Dense(128, activation='relu', name='Dense_Cat_1')(x)
    x_cat = tf.keras.layers.Dropout(0.3, name='Dropout_Cat')(x_cat)
    pitch = tf.keras.layers.Dense(128, activation='softmax', name='pitch')(x_cat)
    velocity = tf.keras.layers.Dense(128, activation='softmax', name='velocity')(x_cat)

    # Branch for continuous outputs
    x_cont = tf.keras.layers.Dense(512, activation='relu', name='Dense_Cont_1')(x)
    x_cont = tf.keras.layers.Dropout(0.3, name='Dropout_Cont')(x_cont)
    step = tf.keras.layers.Dense(1, activation=None, name='step')(x_cont)  # Linear activation for continuous
    duration = tf.keras.layers.Dense(1, activation=None, name='duration')(x_cont)  # Linear activation for continuous

    # Model setup
    model = tf.keras.Model(inputs=inputs, outputs={'pitch': pitch, 'velocity': velocity, 'step': step, 'duration': duration})
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss={
            'pitch': tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
            'velocity': tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
            'step': 'mse',   # Mean Squared Error for regression
            'duration': 'mse'
        }
    )

    return model


  # seq_length (time series), 8 features (pitch, velocity,...) pitch and velocity must be applied by 128 (vocabulary size)
  input_shape = (seq_length, 262)
  learning_rate = 0.01

In [20]:
with strategy.scope():
  callbacks = [
      tf.keras.callbacks.ModelCheckpoint(
          filepath='./training_checkpoints_large_model/ckpt_{epoch}.weights.h5',
          save_weights_only=True),
      tf.keras.callbacks.EarlyStopping(
          monitor='loss',
          patience=100,
          verbose=1,
          restore_best_weights=True),
  ]

In [21]:
def convert_types(feature, labels):
    labels['step'] = tf.cast(labels['step'], tf.float32)
    labels['duration'] -= tf.cast(labels['duration'], tf.float32)
    return feature, labels

In [26]:
# create model
with strategy.scope():
  # Create and summarize the model
  model = create_model(input_shape, learning_rate)

  # pitch	start	end	step	duration	velocity	Q1	Q2	Q3	Q4
  loss = {
        'pitch': tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        'velocity': tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        'step': mse_with_positive_pressure,
        'duration': mse_with_positive_pressure,
  }

  optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

  model.compile(loss=loss, optimizer=optimizer)

  model.summary()

  # prompt: define callbacks, ensure Adam optimizer, 0.005 training rate, and all the other basics. Use context from the entire notebook to write the callbacks

with strategy.scope():
  model.compile(
      loss=loss,
      loss_weights={
          'pitch': 1.0,
          'velocity': 0.005,
          'step': 0.0001,
          'duration':0.001,
      },
      optimizer=tf.keras.optimizers.Adam(learning_rate=0.005),
      # run_eagerly=True
  )

with strategy.scope():
  callbacks = [
      tf.keras.callbacks.ModelCheckpoint(
          filepath='training_medium_checkpoints/ckpt_{epoch}.weights.h5',
          save_weights_only=True),
      tf.keras.callbacks.EarlyStopping(
          monitor='loss',
          patience=500,
          verbose=1,
          restore_best_weights=True),
  ]


In [27]:
# sometimes this cell must be run 1-3 times before error disapeasrs
num_samples = x.shape[0]
steps_per_epoch = num_samples // BATCH_SIZE
print('steps_per:', steps_per_epoch)
print('num_samples:', num_samples)
print('BATCH_SIZE:', BATCH_SIZE)

epochs = 1_000 # for testing

history = model.fit(
    x, y,
    batch_size=BATCH_SIZE,
    initial_epoch=0,
    epochs=epochs,
    callbacks=callbacks,
    verbose=1,
)

steps_per: 391
num_samples: 50142
BATCH_SIZE: 128
Epoch 1/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m93s[0m 230ms/step - loss: 3.8533
Epoch 2/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 265ms/step - loss: 3.2933
Epoch 3/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m101s[0m 257ms/step - loss: 3.1396
Epoch 4/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m114s[0m 291ms/step - loss: 3.0363
Epoch 5/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m115s[0m 293ms/step - loss: 2.9739
Epoch 6/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m118s[0m 300ms/step - loss: 2.9102
Epoch 7/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m115s[0m 294ms/step - loss: 2.8499
Epoch 8/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m117s[0m 298ms/step - loss: 2.7963
Epoch 9/1000
[1m392/392[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m116s[0m 295ms/step - l

In [None]:
plt.plot(history.epoch, history.history['loss'], label='total loss')
plt.show()