In [None]:
!pip install tensorflow tensorflow-datasets librosa

In [2]:
import tensorflow as tf
from tensorflow import keras
import tensorflow_datasets as tfds
import librosa
import numpy as np

In [68]:
def nsynth_parser(data_split,
                  batch_size = 64,
                  num_batches = None,
                  instrument_label = 4,
                  source_type = None):
  """Build a batched tf.data pipeline of (audio, pitch label) pairs from NSynth.

  Samples are kept only if their instrument family equals `instrument_label`,
  their instrument source equals `source_type` (when one is given) and their
  pitch is at least 21. Kept samples are trimmed to the first 3 seconds and
  their pitch is shifted down by 21 so labels start at 0.

  Args:
    data_split: split name forwarded to `tfds.load` (e.g. 'train', 'valid', 'test').
    batch_size: number of samples per batch.
    num_batches: if truthy, keep only the first `num_batches` batches;
      `None` (or 0) keeps every batch.
    instrument_label: NSynth instrument-family id to keep (default 4).
    source_type: NSynth instrument-source id to keep, or `None` for any source.

  Returns:
    A `tf.data.Dataset` yielding `(audio, pitch)` batches, where `audio` holds
    at most `TRIM_LENGTH` samples per example and `pitch` is the original
    pitch minus 21.
  """
  # Constants
  INSTRUMENT_LABEL = instrument_label # According to NSynth dataset family label
  SOURCE_TYPE = source_type
  SAMPLE_RATE = 16000
  TRIM_LENGTH = 3 * SAMPLE_RATE  # Trim to the first 3 seconds
  MIN_PITCH = 21  # Lowest pitch kept; it becomes label 0 after the shift
  # NOTE(review): there is no upper pitch bound here. The model in this notebook
  # has 88 outputs, so labels above 87 (pitch > 108) would be out of range --
  # confirm the selected data never contains such pitches.

  def is_wanted_sample(example):
      # Combine all selection criteria with an explicit logical AND so the
      # predicate is correct regardless of AutoGraph's handling of Python `and`.
      keep = tf.equal(example['instrument']['family'], INSTRUMENT_LABEL)
      if SOURCE_TYPE is not None:
          keep = tf.logical_and(
              keep, tf.equal(example['instrument']['source'], SOURCE_TYPE))
      return tf.logical_and(
          keep, tf.greater_equal(example['pitch'], MIN_PITCH))

  def to_audio_and_label(example):
      # Every element reaching this point already passed the filter, so no
      # "invalid sample" marker or tf.cond with mismatched branch shapes is needed.
      audio = example['audio'][:TRIM_LENGTH]
      pitch = example['pitch'] - MIN_PITCH
      return audio, pitch

  ds = tfds.load('nsynth', split=data_split, as_supervised=False)
  ds = ds.filter(is_wanted_sample)
  ds = ds.map(to_audio_and_label, num_parallel_calls=tf.data.AUTOTUNE)
  ds = ds.batch(batch_size)
  if num_batches:
      ds = ds.take(num_batches)
  ds = ds.prefetch(tf.data.AUTOTUNE)
  return ds


In [57]:
# Data-processing parameters for the keyboard experiment
batch_size = 32
instrument_label = 4 #keyboard (default so doesn't have to be passed through)
# Build one loader per split; the parser falls back to its default family label
train_loader = nsynth_parser(data_split='train', batch_size=batch_size, num_batches=80)
val_loader = nsynth_parser(data_split='valid', batch_size=batch_size, num_batches=10)
test_loader = nsynth_parser(data_split='test', batch_size=batch_size, num_batches=10)
# One class index per output of the 88-way classifier
classes = [*range(88)]

In [None]:
# Inspect the first sample of one training batch: amplitude span and pitch label
for audio, pitch in train_loader.take(1):
    first_sample_audio, first_sample_pitch = audio[0], pitch[0]

    # Amplitude extremes of the waveform
    min_value = tf.reduce_min(first_sample_audio)
    max_value = tf.reduce_max(first_sample_audio)

    # Spread between the extremes (max - min)
    range_value = max_value - min_value

    print(f"First sample audio max value: {max_value.numpy()}")
    print(f"First sample audio min value: {min_value.numpy()}")
    print(f"First sample audio range: {range_value.numpy()}")
    print(f"First sample pitch: {first_sample_pitch.numpy()}")

In [None]:
# get number of batches in each loader
def get_dataset_length(data_loader):
    """Return how many elements a full pass over `data_loader` yields.

    For a batched loader each element is one batch, so this is the batch count.
    The loader is iterated once from start to finish.
    """
    return sum(1 for _ in data_loader)

# Count the batches in each loader (every call makes one full pass over it)
test_loader_length, val_loader_length, train_loader_length = map(
    get_dataset_length, (test_loader, val_loader, train_loader)
)

print(f"Train loader length: {train_loader_length}")
print(f"Validation loader length: {val_loader_length}")
print(f"Test loader length: {test_loader_length}")

In [None]:
# get number of samples in each loader
def get_dataset_sample_count(data_loader):
    """Return the total number of samples across all batches of `data_loader`.

    Args:
        data_loader: iterable of `(audio, pitch)` batches; each `audio` must
            expose a `.shape` whose first entry is the batch size.

    Returns:
        The sample count as a plain Python `int` (0 for an empty loader), so it
        formats as a bare number instead of a tensor repr.
    """
    total_samples = 0
    for audio, _ in data_loader:
        # The leading dimension of each audio batch is its number of samples
        total_samples += int(audio.shape[0])
    return total_samples

# Count the samples in each loader (every call makes one full pass over it)
test_samples_count, val_samples_count, train_samples_count = map(
    get_dataset_sample_count, (test_loader, val_loader, train_loader)
)

print(f"Train loader samples: {train_samples_count}")
print(f"Validation loader samples: {val_samples_count}")
print(f"Test loader samples: {test_samples_count}")

In [None]:
################################################################################################################################################################
####################################################################### Model and Training #####################################################################
################################################################################################################################################################

In [62]:
from keras import layers, models

In [63]:
class PitchDetectionModel(tf.keras.Model):
    """1-D convolutional pitch classifier operating on raw audio.

    The input is reshaped to a single-channel sequence and passed through four
    conv -> max-pool -> dropout stages, then flattened and projected to
    `num_classes` outputs. The final dense layer has no activation, so the
    model returns logits.

    Args:
        input_length: number of audio samples per example (default 48000).
        num_classes: number of pitch classes to predict (default 88).
    """

    def __init__(self, input_length=48000, num_classes=88):
        super().__init__()
        # Take layers from tf.keras so they come from the same Keras
        # implementation as the tf.keras.Model base class.
        layers = tf.keras.layers
        self.reshape = layers.Reshape((input_length, 1))
        self.conv1 = layers.Conv1D(1024, kernel_size=4, strides=4, activation='relu')
        self.drop1 = layers.Dropout(0.2)  # Dropout layer after conv1
        self.conv2 = layers.Conv1D(128, kernel_size=4, strides=4, activation='relu')
        self.drop2 = layers.Dropout(0.2)  # Dropout layer after conv2
        self.conv3 = layers.Conv1D(128, kernel_size=4, strides=4, activation='relu')
        self.drop3 = layers.Dropout(0.5)  # Dropout layer after conv3
        self.conv4 = layers.Conv1D(256, kernel_size=2, strides=2, activation='relu')
        self.drop4 = layers.Dropout(0.5)  # Dropout layer after conv4
        # A single pooling layer is reused after every convolution
        self.pool = layers.MaxPooling1D(2)
        self.flatten = layers.Flatten()
        self.fc1 = layers.Dense(num_classes)

    def call(self, x, training=False):
        """Run the forward pass and return one logit per pitch class.

        Args:
            x: batch of audio examples that can be reshaped to
                `(input_length, 1)` per example.
            training: forwarded to the dropout layers so they are only active
                during training.
        """
        x = self.reshape(x)
        x = self.conv1(x)
        x = self.pool(x)
        x = self.drop1(x, training=training)  # Apply dropout only during training
        x = self.conv2(x)
        x = self.pool(x)
        x = self.drop2(x, training=training)
        x = self.conv3(x)
        x = self.pool(x)
        x = self.drop3(x, training=training)
        x = self.conv4(x)
        x = self.pool(x)
        x = self.drop4(x, training=training)
        x = self.flatten(x)
        return self.fc1(x)


In [None]:
# Instantiate the pitch classifier
model = PitchDetectionModel()

# Configure training: Adam optimizer, cross-entropy on raw logits with integer
# labels, and accuracy as the monitored metric
model.compile(
    optimizer=keras.optimizers.Adam(0.001),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

print("Fit model on training data")
# Train on the training loader and validate on the validation loader each epoch
history = model.fit(
  train_loader,
  validation_data=val_loader,
  epochs=35,
  verbose=1,
)

In [65]:
import matplotlib.pyplot as plt

In [None]:
# Per-epoch accuracy curves recorded by model.fit
training_accuracy = history.history['sparse_categorical_accuracy']
validation_accuracy = history.history['val_sparse_categorical_accuracy']

# Epochs are numbered from 1 on the x-axis
epochs = range(1, len(training_accuracy) + 1)

# Draw both curves on a single set of axes
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(epochs, training_accuracy, label='Training Accuracy')
ax.plot(epochs, validation_accuracy, label='Validation Accuracy')
ax.set_title('Training and Validation Accuracy')
ax.set_xlabel('Epochs')
ax.set_ylabel('Accuracy')
ax.legend()
plt.show()

In [None]:
# Score the trained model on the test loader (returns loss, then accuracy)
test_loss, test_acc = model.evaluate(x=test_loader)
print(f'Test accuracy: {test_acc}, Test loss: {test_loss}')

In [None]:
################################################################################################################################################################
####################################################################### Evaluation on New Data #################################################################
################################################################################################################################################################

In [69]:
# Data-processing parameters for the out-of-family evaluation set
batch_size = 32
instrument_label = 6 #organ
source_type = 1 #electronic
# Build a test loader restricted to that instrument family and source
organ_test_loader = nsynth_parser(
    data_split='test',
    batch_size=batch_size,
    num_batches=10,
    instrument_label=instrument_label,
    source_type=source_type,
)

In [None]:
# Score the trained model on the organ loader (returns loss, then accuracy)
test_loss, test_acc = model.evaluate(x=organ_test_loader)
print(f'Test accuracy: {test_acc}, Test loss: {test_loss}')

In [77]:
################################################################################################################################################################
####################################################################### Baseline Model #########################################################################
################################################################################################################################################################

In [101]:
SAMPLE_RATE = 16000
def pYIN_pitch_estimate_accuracy(audio, true_pitch, tolerance=0.5, sample_rate=SAMPLE_RATE):
  """Fraction of pYIN pitch estimates that match `true_pitch`.

  pYIN is run on `audio` with a search range of C1 to C8. Only frames for which
  it returns an f0 estimate (non-NaN) are scored.

  Args:
    audio: 1-D waveform passed to `librosa.pyin`.
    true_pitch: reference pitch as a MIDI note number.
    tolerance: maximum absolute difference, in MIDI notes, for a frame to count
      as correct. MIDI estimates can be fractional, hence the tolerance.
    sample_rate: sampling rate of `audio` in Hz.

  Returns:
    Number of correct frames divided by the number of frames with an estimate,
    or 0.0 when no frame has an estimate.
  """
  f0, _, _ = librosa.pyin(audio,
                          fmin=librosa.note_to_hz('C1'),
                          fmax=librosa.note_to_hz('C8'),
                          sr=sample_rate)
  # Drop frames without an estimate up front so NaNs never reach the
  # MIDI conversion or the tolerance comparison.
  voiced_f0 = f0[~np.isnan(f0)]
  total_voiced_frames = voiced_f0.size
  if total_voiced_frames == 0:
    return 0.0
  estimated_midi_notes = librosa.hz_to_midi(voiced_f0)
  correct_estimations = np.sum(np.abs(estimated_midi_notes - true_pitch) <= tolerance)
  return correct_estimations / total_voiced_frames

In [102]:
# Score the pYIN baseline on every individual (unbatched) organ test sample
test_dataset_as_np_iterator = organ_test_loader.unbatch().as_numpy_iterator()
accuracy_accross_samples = [
    # Labels were shifted down by 21 in the parser; add it back to get the pitch
    pYIN_pitch_estimate_accuracy(audio, pitch + 21)
    for audio, pitch in test_dataset_as_np_iterator
]
average_accuracy = np.mean(accuracy_accross_samples)
print(f'Average accuracy: {average_accuracy}')

Average accuracy: 0.8179077547919589
