In [1]:
from IPython.display import Audio

In [17]:
import tensorflow as tf
import tensorflow_datasets as tfds

# Load two slices of NSynth's 'train' split: the first 1% is used for training,
# the last 20% serves as held-out data for the listening test further down.
dataset_train, dataset_test = tfds.load(
    name='nsynth',
    split=['train[:1%]', 'train[80%:]'],
)

In [2]:
# NOTE: `dataset_test` is deliberately NOT deleted here. The inference cell
# further down iterates over `dataset_test.take(5)`, so removing the name at
# this point raises a NameError on a fresh Restart & Run All. The object is a
# lazily evaluated tf.data pipeline, so keeping the handle around is cheap.

In [18]:
# Number of examples in the 'train[:1%]' slice bound to dataset_train.
len(dataset_train)

2892

In [4]:
import random
import numpy as np

# Instrument-family name -> integer class id, in the order NSynth defines its
# families. 'vocal' (id 10) completes the table so that every family id that
# can occur in the dataset has a name.
label_maps = {
    'bass':0,
    'brass':1,
    'flute':2,
    'guitar':3,
    'keyboard':4,
    'mallet':5,
    'organ':6,
    'reed':7,
    'string':8,
    'synth_lead':9,
    'vocal':10
}

def get_other_instrument_audio(dataset, current_instrument_family, num_samples=1):
    """
    Draw random audio clips whose instrument family differs from the given one.

    :param dataset: Iterable of samples. Each sample must expose
        ``sample['instrument']['family']`` and ``sample['audio']``.
    :param current_instrument_family: Family value to exclude; a sample is kept
        when its family compares unequal (``!=``) to this value.
    :param num_samples: How many clips to draw, without replacement.
    :return: A list with ``num_samples`` entries, each the result of calling
        ``.numpy()`` on the chosen sample's ``'audio'`` entry.
    :raises ValueError: If fewer than ``num_samples`` samples remain after the
        current family has been excluded.
    """
    # Walk the whole dataset once and keep every sample from another family.
    candidates = []
    for entry in dataset:
        if entry['instrument']['family'] != current_instrument_family:
            candidates.append(entry)

    # Refuse to continue when the pool is too small for the request.
    if num_samples > len(candidates):
        raise ValueError("Not enough samples in the dataset for the requested number of samples")

    # Pick distinct samples at random, then unwrap their audio payloads.
    picked = random.sample(candidates, num_samples)

    audio_clips = []
    for entry in picked:
        audio_clips.append(entry['audio'].numpy())
    return audio_clips


In [32]:
import librosa
import numpy as np
from sklearn.decomposition import NMF
import tensorflow as tf

# Build the (input, target) pairs used for training.
# x_train / y_train map str(family id) -> list of flattened STFT magnitude
# arrays; entry k of x_train is a degraded version of the clip whose clean
# magnitude is entry k of y_train.
x_train = dict()
y_train = dict()

# NOTE(review): num_random_ins is not used below; the mixing loop currently
# builds only 1-instrument mixes (range(1, 2)). Confirm whether it was meant to
# drive that range before wiring it in, since doing so changes the dataset size.
num_random_ins = 3
ins_family = 'flute' # single instrument for testing purposes
ins_class = label_maps[ins_family]
for i in range(ins_class, ins_class+1):

  # Clips belonging to the target instrument family
  filtered_dataset = dataset_train.filter(lambda x: x['instrument']['family'] == i)

  # Clips from every other family, read and materialised ONCE per family.
  # get_other_instrument_audio iterates over whatever it is given, so handing
  # it dataset_train directly would re-read the whole dataset for every single
  # target clip. The list holds the decoded samples in memory for the duration
  # of this loop.
  other_family_samples = list(
      dataset_train.filter(lambda x: x['instrument']['family'] != i))

  ins_dict_key = str(i)
  x_train[ins_dict_key] = []
  y_train[ins_dict_key] = []
  for sample in filtered_dataset:
    audio = sample['audio'].numpy()

    # The clean magnitude is the target for every input derived from this clip
    stft_clean = librosa.stft(audio)
    mag_clean, _ = librosa.magphase(stft_clean)

    # Input 1: the clip with additive white Gaussian noise (sigma = 0.01)
    noise = np.random.normal(0,0.01, audio.shape)
    noisy_audio = noise + audio
    stft_noise = librosa.stft(noisy_audio)
    mag_noisy, _ = librosa.magphase(stft_noise)
    x_train[ins_dict_key].append(mag_noisy.flatten())
    y_train[ins_dict_key].append(mag_clean.flatten())

    # Input 2: the clip mixed with j randomly chosen clips from other families
    for j in range(1, 2):
      other_instrument_audios = get_other_instrument_audio(other_family_samples, i, j)
      mixed_audio = audio

      # `+` allocates a new array each time, so `audio` itself is never modified
      for other_instrument_audio in other_instrument_audios:
        mixed_audio = mixed_audio + other_instrument_audio

      stft_mixed = librosa.stft(mixed_audio)
      mag_mixed, _ = librosa.magphase(stft_mixed)
      x_train[ins_dict_key].append(mag_mixed.flatten())

      # The target for a mix is still the clean target-instrument magnitude
      y_train[ins_dict_key].append(mag_clean.flatten())



In [24]:
# Show the integer class id that label_maps assigns to ins_family.
ins_class

2

In [33]:
# Sanity check: every input spectrogram must have exactly one target spectrogram.
assert len(x_train[str(ins_class)]) == len(y_train[str(ins_class)])
# Report how many (input, target) pairs were built for the selected family.
print("Num samples:",len(x_train[str(ins_class)]))

Num samples: 190


In [16]:
import json
import numpy as np

def save_dict_with_numpy_arrays(dict_to_save, filename):
    """
    Write a dictionary of NumPy-array lists to disk as JSON.

    :param dict_to_save: Mapping whose values are iterables of NumPy arrays.
    :param filename: Path of the JSON file to create (overwritten if present).
    """
    # JSON has no array type, so every array becomes a (nested) plain list.
    serializable = {}
    for key, arrays in dict_to_save.items():
        as_lists = []
        for arr in arrays:
            as_lists.append(arr.tolist())
        serializable[key] = as_lists

    # Dump the converted mapping in one go.
    with open(filename, 'w') as handle:
        json.dump(serializable, handle)

def load_dict_with_numpy_arrays(filename):
    """
    Read a JSON file and turn every stored list back into a NumPy array.

    :param filename: Path of the JSON file to read.
    :return: Dictionary with the same keys as the file; each value is a list
        of NumPy arrays built from the stored nested lists.
    """
    # Parse the file into plain Python containers first.
    with open(filename, 'r') as handle:
        raw = json.load(handle)

    # Rebuild each entry as a list of arrays, keeping the key order of the file.
    for key in raw:
        raw[key] = list(map(np.array, raw[key]))

    return raw


In [7]:
# Persist x_train and y_train as JSON files. The files get large (the original
# note mentions a 10% dataset slice), so run this cell only if you have enough
# free disk space.

save_dict_with_numpy_arrays(x_train, 'x_train.json')
save_dict_with_numpy_arrays(y_train, 'y_train.json')

In [None]:
# Load the training data back from the JSON files.
# load_dict_with_numpy_arrays takes a single argument (the filename) and
# returns the reconstructed dictionary; passing the existing dict as well
# raises a TypeError.

x_train = load_dict_with_numpy_arrays('x_train.json')
y_train = load_dict_with_numpy_arrays('y_train.json')

In [36]:
import tensorflow as tf

# Model definition: a two-layer fully connected network that maps a flattened
# magnitude spectrogram to another flattened magnitude spectrogram.

sample_rate = 16000      # sampling rate passed to sf.write when saving WAV files
n_components = 2         # multiplier for the hidden layer width
input_shape = x_train[str(ins_class)][0].shape[0]   # length of one flattened input vector
layer_size = 256         # base hidden width; the hidden layer has n_components * layer_size units

# Softplus after each Dense layer keeps every activation positive.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(n_components*layer_size))
model.add(tf.keras.layers.Activation(tf.keras.activations.softplus))
model.add(tf.keras.layers.Dense(input_shape))
model.add(tf.keras.layers.Activation(tf.keras.activations.softplus))


def custom_loss_function(y_true, y_pred):
    """
    Mean squared error between target and predicted values.

    :param y_true: Tensor of target values.
    :param y_pred: Tensor of predicted values, broadcast-compatible with y_true.
    :return: Scalar tensor, the mean of the element-wise squared differences.
    """
    # An earlier variant clipped y_pred and used a divergence of the form
    # sum(y_true * (log(y_true) - log(y_pred)) - y_true + y_pred); plain MSE is
    # what is actually computed here.
    squared_error = tf.square(y_true - y_pred)
    return tf.reduce_mean(squared_error)

# Configure training: Adam optimizer, minimising custom_loss_function.
model.compile(optimizer='Adam', loss = custom_loss_function)



In [37]:
# Shape of the stacked training inputs: (number of examples, flattened spectrogram length).
np.array(x_train[str(ins_class)]).shape


(190, 129150)

In [38]:
# Fit on the stacked (input, target) spectrogram pairs of the selected family.
model.fit(
    x=np.array(x_train[str(ins_class)]),
    y=np.array(y_train[str(ins_class)]),
    epochs=10,
    batch_size=4,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7cfbe09dbd60>

In [51]:
import soundfile as sf

# Run the trained model on a few held-out clips and write the result to disk
# for listening.
# NOTE: 'pred.wav' and 'true.wav' are overwritten on every iteration, so only
# the files for the last of the five clips remain after the loop.
# NOTE(review): the model's input width is fixed by the training data, so this
# assumes every test clip has the same length as the training clips - confirm.
for sample in dataset_test.take(5):
  audio = sample['audio'].numpy()

  # Add noise to test input
  noise = np.random.normal(0,0.01, audio.shape)
  noisy_audio = noise + audio
  stft = librosa.stft(noisy_audio)

  # Only the magnitude goes through the model; the noisy phase is reused as-is
  mag_noisy, phase = librosa.magphase(stft)

  x_test = mag_noisy.flatten()
  x_test_reshaped = x_test.reshape(1, -1)  # Add batch dimension

  # Predict magnitude spectrogram
  predicted_mag = model.predict(x_test_reshaped)
  predicted_mag_reshaped = predicted_mag.reshape(mag_noisy.shape)
  reconstructed_stft = predicted_mag_reshaped * phase

  # Pin the output length to the input so pred.wav and true.wav line up
  reconstructed_audio = librosa.istft(reconstructed_stft, length=len(audio))

  # Peak-normalize the reconstructed audio. An all-zero prediction has a peak
  # of 0; dividing by it would fill the signal with NaNs, so leave it untouched.
  reconstructed_audio = np.float32(reconstructed_audio)
  max_val = np.abs(reconstructed_audio).max()
  if max_val > 0:
    reconstructed_audio /= max_val

  # Save the prediction and the clean reference to WAV files
  sf.write('pred.wav', reconstructed_audio, samplerate=sample_rate)
  sf.write('true.wav', audio, samplerate=sample_rate)




In [52]:
from IPython.display import Audio

# Play the model's reconstruction saved as 'pred.wav'.
Audio('pred.wav')

In [53]:
from IPython.display import Audio

# Play the clean reference clip saved as 'true.wav' for comparison.
Audio('true.wav')

# Appendix

In [None]:
import tensorflow as tf

# Earlier model variant kept for reference.
# NOTE(review): the hidden layer here has n_components * input_shape units, so
# the first Dense kernel grows with the square of input_shape - confirm this
# fits in memory before running the cell. It also rebinds `model`.
sample_rate = 16000
n_components = 2

model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(n_components*input_shape))
model.add(tf.keras.layers.Activation(tf.keras.activations.softplus))
model.add(tf.keras.layers.Dense(input_shape))
model.add(tf.keras.layers.Activation(tf.keras.activations.softplus))

def loss_function(y_true, y_pred):
    """
    NumPy implementation of the divergence
    ``sum(y_true * (log(y_true) - log(y_pred)) - y_true + y_pred)``.

    The sum runs over the first axis only: 1-D inputs give a scalar, 2-D
    inputs of shape (n, m) give an array of shape (m,).

    Entries where ``y_true`` is 0 contribute only ``y_pred`` (the usual
    ``0 * log(0) = 0`` convention) instead of turning the result into NaN.

    :param y_true: Array-like of non-negative target values.
    :param y_pred: Array-like of positive predicted values, same shape as y_true.
    :return: The divergence summed over axis 0.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # np.where evaluates both branches, so log(0) is still computed for the
    # masked-out entries; silence the resulting warnings, the values are discarded.
    with np.errstate(divide='ignore', invalid='ignore'):
        log_term = np.where(
            y_true > 0,
            y_true * (np.log(y_true) - np.log(y_pred)),
            0.0,
        )

    return np.sum(log_term - y_true + y_pred, axis=0)


def custom_loss_function(y_true, y_pred):
    """
    Generalised Kullback-Leibler divergence between target and prediction:
    ``sum(y_true * (log(y_true) - log(y_pred)) - y_true + y_pred)``.

    Both log arguments are clamped to a small epsilon first. Without the clamp
    a zero-valued target gives ``0 * log(0)`` and a zero prediction gives
    ``log(0)``, which turn the whole loss into NaN or infinity.

    Note: this name is also used by the MSE loss defined in the model cell;
    executing this cell rebinds it.

    :param y_true: Tensor of non-negative target values.
    :param y_pred: Tensor of non-negative predicted values.
    :return: Scalar tensor, the divergence summed over all elements.
    """
    eps = tf.keras.backend.epsilon()

    # Clamp only inside the logs; the linear terms keep the original values.
    log_true = tf.math.log(tf.maximum(y_true, eps))
    log_pred = tf.math.log(tf.maximum(y_pred, eps))

    loss = tf.reduce_sum(y_true * (log_true - log_pred) - y_true + y_pred)

    return loss


# Apply NMF to the magnitude of the STFT (since phase information is not non-negative)

# model = NMF(n_components=n_components, init='random', random_state=0)
# W = model.fit_transform(magnitude)
# H = model.components_


# # Reconstruct the magnitude from NMF components
# idx = 0
# reconstructed_magnitude = np.dot(W[:,idx], H[idx])

# # Combine with original phase
# reconstructed_stft = reconstructed_magnitude * phase

# Inverse STFT to get the audio signal
# reconstructed_audio = librosa.istft(reconstructed_stft)
# Rebuild one time-domain signal per NMF component.
# NOTE(review): W and H are only assigned in the commented-out NMF fit above,
# so this loop raises NameError as written. `phase` and `n_components` must
# also already exist in the kernel. Restore the fit (with a defined
# `magnitude`) before running this cell.
reconstructed_sources = []
for i in range(n_components):
    # Reconstruct the magnitude for each source: outer product of column i of W and row i of H
    source_magnitude = np.outer(W[:, i], H[i])

    # Combine with original phase information
    source_stft = source_magnitude * phase

    # Inverse STFT to get the time-domain signal of each source
    source_audio = librosa.istft(source_stft)
    reconstructed_sources.append(source_audio)

# reconstructed_sources now holds one reconstructed time-domain signal per NMF component
