In [97]:
import librosa
import numpy as np
import tensorflow as tf
import soundfile as sf

In [98]:
# Load clean and noisy audio files
s, sr_s = librosa.load('train_clean_male.wav', sr=None)
sn, sr_sn = librosa.load('train_dirty_male.wav', sr=None)
t, sr_t = librosa.load('test_s_01.wav', sr=None)
tn, sr_tn = librosa.load('test_x_01.wav', sr=None)
i, sr_i = librosa.load('test_x_02.wav', sr=None)

# Set the STFT parameters
n_fft = 1024
hop_length = 512

# Compute the STFT and magnitude spectrograms
X = librosa.stft(sn, n_fft=n_fft, hop_length=hop_length)
S = librosa.stft(s, n_fft=n_fft, hop_length=hop_length)

TN = librosa.stft(tn, n_fft=n_fft, hop_length=hop_length)
T = librosa.stft(t, n_fft=n_fft, hop_length=hop_length)

I = librosa.stft(i, n_fft=n_fft, hop_length=hop_length)

# print the min, max of the arrays and the dtype
print(np.min(X), np.max(X), X.dtype, X.shape)
print(np.min(S), np.max(S), S.dtype, S.shape)

print(np.min(TN), np.max(TN), TN.dtype, TN.shape)
print(np.min(T), np.max(T), T.dtype, T.shape)

print(np.min(I), np.max(I), I.dtype, I.shape)


# Transpose the magnitude spectrograms
X_train = X.T
Y_train = S.T
X_test = TN.T
Y_test = T.T
X_input = I.T


# print the shape of the magnitude spectrograms
print('X_train shape:', X_train.shape)
print('Y_train shape:', Y_train.shape)
print('X_test shape:', X_test.shape)
print('Y_test shape:', Y_test.shape)
print('X_input shape:', X_input.shape)

print('X Shape:', X.shape)
print('S Shape:', S.shape)
print('TN Shape:', TN.shape)
print('T Shape:', T.shape)
print('I Shape:', I.shape)

(-30.21952+9.251362j) (21.528316-6.439366j) complex64 (513, 2459)
(-30.195765+9.228826j) (21.51682-6.52095j) complex64 (513, 2459)
(-21.403784-3.3156915j) (22.47935+4.4695153j) complex64 (513, 142)
(-21.564178-3.344126j) (22.465538+4.424191j) complex64 (513, 142)
(-10.3424015+1.1153554j) (10.666173+1.1317672j) complex64 (513, 380)
X_train shape: (2459, 513)
Y_train shape: (2459, 513)
X_test shape: (142, 513)
Y_test shape: (142, 513)
X_input shape: (380, 513)
X Shape: (513, 2459)
S Shape: (513, 2459)
TN Shape: (513, 142)
T Shape: (513, 142)
I Shape: (513, 380)


In [99]:
def snr_metric(y_true, y_pred):
    # Compute the signal power (sum of squares of the true signal)
    signal_power = tf.reduce_sum(tf.square(y_true), axis=-1)

    # Compute the noise power (sum of squares of the difference between true and predicted signal)
    noise_power = tf.reduce_sum(tf.square(y_true - y_pred), axis=-1)

    # Add a small epsilon to avoid division by zero
    epsilon = 1e-20

    # Calculate the SNR
    snr = 10 * tf.math.log(signal_power / (noise_power + epsilon)) / tf.math.log(10.0)

    # Define a reasonable SNR range to map to a 0-100% scale
    # SNR in practice could be in a range like -20 dB to +40 dB
    snr_min = -20.0  # Poor signal
    snr_max = 40.0   # Excellent signal

    # Clip SNR to this range to avoid extreme values skewing the result
    snr_clipped = tf.clip_by_value(snr, snr_min, snr_max)

    # Normalize SNR to a 0-100% scale
    snr_percentage = (snr_clipped - snr_min) / (snr_max - snr_min) * 100.0

    # Return the average percentage score across all samples
    return tf.reduce_mean(snr_percentage)


In [108]:
# Define the speech enhancement model
def train_speech_enhancement_model(X_train_raw, Y_train_raw, X_test_raw, Y_test_raw, X_input_raw,
                                   layers=[512, 256, 128], activations=['relu', 'relu', 'relu'],
                                   output_activation='relu',
                                   epochs=50, optimizer='adam'):

    # Take the absolute value of the magnitude spectrograms
    Y_train_magnitude = np.abs(Y_train_raw)
    X_train_magnitude = np.abs(X_train_raw)
    Y_test_magnitude = np.abs(Y_test_raw)
    X_test_magnitude = np.abs(X_test_raw)
    X_input_magnitude = np.abs(X_input_raw)

    print('X_train :', X_train.shape, X_train.dtype)
    print('Y_train :', Y_train.shape, Y_train.dtype)
    print('X_test :', X_test.shape, X_test.dtype)
    print('Y_test :', Y_test.shape, Y_test.dtype)
    print('X_input :', X_input.shape, X_input.dtype)

    # normalize the magnitude spectrograms using standard scaler
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train_magnitude)
    Y_train_scaled = scaler.transform(Y_train_magnitude)
    X_test_scaled = scaler.transform(X_test_magnitude)
    Y_test_scaled = scaler.transform(Y_test_magnitude)
    X_input_scaled = scaler.transform(X_input_magnitude)

    # Define the SNR loss function
    def snr_loss_fn(y_true, y_pred):
        # Compute the signal power (sum of squares of the true signal)
        signal_power = tf.reduce_sum(tf.square(y_true), axis=-1)
        noise_power = tf.reduce_sum(tf.square(y_true - y_pred), axis=-1)
        epsilon = 1e-20
        snr = 10 * tf.math.log(signal_power / (noise_power + epsilon)) / tf.math.log(10.0)
        return -tf.reduce_mean(snr)


    def snr_metric(y_true, y_pred):
        signal_power = tf.reduce_mean(tf.square(y_true))
        noise_power = tf.reduce_mean(tf.square(y_true - y_pred))
        snr = 10.0 * tf.math.log(signal_power / noise_power) / tf.math.log(10.0)
        return snr


    # Ensure input shape for the model
    input_shape = 513
    print(X_train.shape[1])
    # Build the model
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.InputLayer(shape=(input_shape,)))
    for units, activation in zip(layers, activations):
        model.add(tf.keras.layers.Dense(units, activation=activation))
    model.add(tf.keras.layers.Dense(input_shape, activation=output_activation))

    # Compile the model
    model.compile(optimizer=SGD(learning_rate=0.1), loss=snr_loss_fn, metrics=[snr_metric])

    # Train the model
    history = model.fit(X_train_scaled, Y_train_scaled, epochs=epochs, validation_data=(X_test_scaled, Y_test_scaled))

    # Test the model on the test dataset
    X_output = model.predict(X_test_scaled)

    # Inverse transform the scaled magnitude spectrograms
    Y_output_magnitude = scaler.inverse_transform(X_output)

    # Compute the SNR on the test dataset
    signal_power = np.sum(np.square(Y_test_magnitude), axis=-1)
    noise_power = np.sum(np.square(Y_test_magnitude - Y_output_magnitude), axis=-1)
    snr = np.mean(10 * np.log10(signal_power / noise_power))

    # recapture the original phase information
    Y_output = Y_output_magnitude * Y_test_raw / Y_test_magnitude

    # Inverse STFT to obtain the enhanced audio
    Y_output = librosa.istft(Y_output.T, hop_length=hop_length)

    # Return the model and prediction results
    return model, Y_output, history, snr


In [109]:
    # def plot_spectrograms(original, noisy, enhanced, sr, hop_length):
    #     fig, axes = plt.subplots(3, 1, figsize=(12, 10))

    #     # Plot original clean audio spectrogram
    #     ax = axes[0]
    #     S_dB = librosa.amplitude_to_db(np.abs(original), ref=np.max)
    #     img = librosa.display.specshow(S_dB, sr=sr, hop_length=hop_length, x_axis='time', y_axis='log', ax=ax)
    #     ax.set_title('Original Clean Spectrogram')
    #     fig.colorbar(img, ax=ax, format="%+2.0f dB")

    #     # Plot noisy audio spectrogram
    #     ax = axes[1]
    #     X_dB = librosa.amplitude_to_db(np.abs(noisy), ref=np.max)
    #     img = librosa.display.specshow(X_dB, sr=sr, hop_length=hop_length, x_axis='time', y_axis='log', ax=ax)
    #     ax.set_title('Noisy Spectrogram')
    #     fig.colorbar(img, ax=ax, format="%+2.0f dB")

    #     # Plot enhanced audio spectrogram
    #     ax = axes[2]
    #     Y_dB = librosa.amplitude_to_db(np.abs(enhanced), ref=np.max)
    #     img = librosa.display.specshow(Y_dB, sr=sr, hop_length=hop_length, x_axis='time', y_axis='log', ax=ax)
    #     ax.set_title('Enhanced Spectrogram')
    #     fig.colorbar(img, ax=ax, format="%+2.0f dB")

    #     plt.tight_layout()
    #     plt.show()
    #     # Plot original clean audio spectrogram
    # plot_spectrograms(T, TN, Y_output, sr_t, hop_length)


In [110]:
import itertools
import matplotlib.pyplot as plt

# Define the grid search function
def grid_search_snr(X_train, Y_train, X_test, Y_test, X_input, param_grid):

    param_combinations = list(itertools.product(*param_grid.values()))
    histories = []
    Y_outputs = []
    models = []
    snrs = []

    # Perform grid search
    for param_combination in param_combinations:
        params = {key: value for key, value in zip(param_grid.keys(), param_combination)}

        print(f"Training with parameters: {params}")  # Print the current parameter combination

        # Train the model with the current parameter combination
        model, Y_output, history, snr = train_speech_enhancement_model(
            X_train, Y_train, X_test, Y_test, X_input,
            layers=params['layers'],
            activations=params['activations'],
            output_activation=params['output_activation'],
            epochs=params['epochs'],
            optimizer=params['optimizer']
        )

        # Store the result
        histories.append({
            'params': params,
            'history': history
        })

        Y_outputs.append({
            'params': params,
            'Y_output': Y_output,
            'snr': snr
        })

        models.append({
            'params': params,
            'model': model
        })

        snrs.append({
            'params': params,
            'snr': snr
        })

    return models, Y_outputs, histories, snrs

In [111]:
from google.colab import drive
import soundfile as sf
import os

# Mount the Google Drive
drive.mount('/content/drive')

def save_y_outputs(Y_outputs, sr):
    # Define the path in Google Drive
    drive_path = '/content/drive/My Drive/Colab_Audio_Files'
    os.makedirs(drive_path, exist_ok=True)  # Ensure the directory exists

    # Loop through each output dict in Y_outputs
    for idx, output_dict in enumerate(Y_outputs):
        params = output_dict['params']  # Extract parameters
        Y_output = output_dict['Y_output']  # Extract audio data

        snr = output_dict['snr']  # Extract SNR

        # Construct the filename safely
        filename = f"{params['layers']}_{params['activations']}_{params['epochs']}_[{snr}].wav"

        # Define the full path for the file
        full_path = os.path.join(drive_path, filename)

        # Save the audio file
        sf.write(full_path, Y_output, sr)

        # Print confirmation
        print(f"Saved {full_path}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [112]:
import matplotlib.pyplot as plt

def plot_losses(histories, snrs):
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 15))  # Vertically stacked subplots

    for history_info, snr_info in zip(histories, snrs):
        params = history_info['params']
        snr = snr_info['snr']
        history = history_info['history']

        label = f"Params: {params}, SNR: {snr:.2f} dB"

        ax1.plot(history.history['val_loss'], label=label)
        ax2.plot(history.history['val_snr_metric'], label=label)

    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss')
    ax1.set_title('Validation Loss over Epochs')

    ax2.set_xlabel('Epochs')
    ax2.set_ylabel('SNR (dB)')
    ax2.set_title('Validation SNR over Epochs')

    # Position the legends beneath each plot
    ax1.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15))
    ax2.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15))

    # Adjust the layout for more space between subplots
    plt.tight_layout()
    plt.subplots_adjust(bottom=0.2, hspace=2)  # Increase hspace for more vertical space between subplots
    plt.show()


In [113]:
import pandas as pd
def snr_table(snrs):

    # print the SNR values in a dataframe
    snr_df = pd.DataFrame(snrs)
    snr_df = snr_df.sort_values(by='snr', ascending=False)
    print(snr_df)


Testing

In [114]:
from tensorflow.keras.optimizers import SGD

# Define the parameter grid
param_grid = {
    'layers': [[128, 64, 32, 64]],
    'activations': [['tanh', 'tanh', 'tanh', 'tanh'], ['sigmoid', 'sigmoid', 'sigmoid', 'sigmoid'], ['relu', 'relu', 'relu', 'relu'], ['leaky_relu', 'leaky_relu', 'leaky_relu', 'leaky_relu'], ['elu', 'elu', 'elu', 'elu']],
    'output_activation': ['relu', 'softplus', 'exponential', 'sigmoid'],
    'epochs': [100],  # Shorter epochs for grid search
    'optimizer': ['adam']
}

In [None]:
models, Y_outputs, histories, snrs = grid_search_snr(X_train, Y_train, X_test, Y_test, X_test, param_grid)

Training with parameters: {'layers': [128, 64, 32, 64], 'activations': ['tanh', 'tanh', 'tanh', 'tanh'], 'output_activation': 'relu', 'epochs': 100, 'optimizer': 'adam'}
X_train : (2459, 513) complex64
Y_train : (2459, 513) complex64
X_test : (142, 513) complex64
Y_test : (142, 513) complex64
X_input : (380, 513) complex64
513
Epoch 1/100
[1m77/77[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 33ms/step - loss: 0.3257 - snr_metric: -0.1709 - val_loss: 0.0083 - val_snr_metric: 0.0034
Epoch 2/100
[1m77/77[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step - loss: 0.0014 - snr_metric: 0.0273 - val_loss: -0.0145 - val_snr_metric: 0.0276
Epoch 3/100
[1m77/77[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: -0.0436 - snr_metric: 0.0957 - val_loss: -0.0393 - val_snr_metric: 0.0569
Epoch 4/100
[1m77/77[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: -0.0879 - snr_metric: 0.1717 - val_loss: -0.0639 - val_snr_metric: 0.0880
Epoc

In [None]:
save_y_outputs(Y_outputs, sr_tn)


In [None]:
plot_losses(histories, snrs)


In [None]:
snr_table(snrs)