In [112]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os

import librosa # audio processing
import math

from tqdm import tqdm

In [113]:
SAMPLE_RATE = 22050
# NUM_SEGMENTS = 6
SEGMENT_SIZE = 5000
N_FFT = 2048
HOP_LENGTH = 512 
N_MFCC = 60

In [114]:
# 1. get clean segment MFCCs
# 2. for every other directory get noise segment MFCCs and append them to training set associated with target clean segment MFCC extracted earlier

clean_segment_mfccs = np.array([])
noise_segment_mfccs = np.array([])
audios_segments = [] #2d array with each row mentioning what segments are in single audio file
X = np.array([])
y = np.array([])
mfcc_shape = None
for dirname, _, filenames in os.walk('/kaggle/input/noizeus/noizeus_corpora-master/NOIZEUS/clean_noizeus/wav'):
    for filename in sorted(filenames):
        file_path = str(os.path.join(dirname, filename))
        signal, sr = librosa.load(file_path, sr = SAMPLE_RATE)
        
        for index in range(0, len(signal), SEGMENT_SIZE):
            start_sample = index
            end_sample = index + SEGMENT_SIZE
            if(end_sample > len(signal) + 1 ): # Making sure the last unfilled segment is truncated
                continue
            mfcc = librosa.feature.mfcc(y=signal[start_sample:end_sample],
                                                sr=sr,
                                                n_fft=N_FFT,
                                                n_mfcc=N_MFCC,
                                                hop_length=HOP_LENGTH)

            mfcc = mfcc.T
            mfcc = np.reshape(mfcc, (1, np.prod(mfcc.shape)))
            mfcc = mfcc.squeeze()
            if clean_segment_mfccs.size == 0:
                clean_segment_mfccs = mfcc
            else:
                clean_segment_mfccs = np.vstack((clean_segment_mfccs, mfcc))
        

X_index = 0
for noise_dir in tqdm(os.listdir('/kaggle/input/noizeus/noizeus_corpora-master/NOIZEUS/')):
    if(noise_dir == 'clean_noizeus'): 
        continue
    data = np.array([])
    noise_segment_mfccs = np.array([])
    for dirname, _, filenames in os.walk(f'/kaggle/input/noizeus/noizeus_corpora-master/NOIZEUS/{noise_dir}/wav'):
        for filename in sorted(filenames):
            audio_segments = []
            file_path = str(os.path.join(dirname, filename))
            signal, sr = librosa.load(file_path, sr = SAMPLE_RATE)

            for segment in range(0, len(signal), SEGMENT_SIZE):
                start_sample = segment
                end_sample = segment + SEGMENT_SIZE
                if(end_sample > len(signal) + 1 ): # Making sure the last unfilled segment is truncated
                    continue
                mfcc = librosa.feature.mfcc(y=signal[start_sample:end_sample],
                                                    sr=sr,
                                                    n_fft=N_FFT,
                                                    n_mfcc=N_MFCC,
                                                    hop_length=HOP_LENGTH)
                mfcc = mfcc.T
                mfcc_shape = mfcc.shape
                mfcc = np.reshape(mfcc, (1, np.prod(mfcc.shape)))
                mfcc = mfcc.squeeze()
                if noise_segment_mfccs.size == 0:
                    noise_segment_mfccs = mfcc
                else:
                    noise_segment_mfccs = np.vstack((noise_segment_mfccs, mfcc))
                audio_segments.append(X_index)
                X_index +=1
            audios_segments.append(audio_segments)
    if X.size == 0:
        X = noise_segment_mfccs
    else:
        X = np.concatenate((X, noise_segment_mfccs), axis = 0)
    if y.size == 0:
        y = clean_segment_mfccs
    else:
        y = np.concatenate((y, clean_segment_mfccs), axis = 0)


100%|██████████| 33/33 [02:06<00:00,  3.83s/it]


In [129]:
#Adjust the data shape to reverse the product and traspose shape to get back the MFCC vector
def adjust_shape(y):
    y = y.reshape(mfcc_shape)
    return y.T

In [116]:
import tensorflow as tf
from tensorflow.keras import backend as K
import sys

#to make numpy soorta methods work on tf symbolic tensor
tf.experimental.numpy.experimental_enable_numpy_behavior()

In [117]:
#Loss function todo: test this later 

def hybrid_loss(y_true, y_pred):
    
    # Mean Squared Error in the MFCC domain
    
    #adjust the shape and transpose 
    y_true = adjust_shape(y_true)
    y_pred = adjust_shape(y_pred)
    
    mse_loss = tf.reduce_mean(tf.square(y_pred - y_true), axis=-1)
    
    # Compute spectrograms from MFCCs
    y_true_audio = librosa.feature.inverse.mfcc_to_audio(y_true, sr=SAMPLE_RATE, n_fft=N_FFT, hop_length=HOP_LENGTH)
    y_pred_audio = librosa.feature.inverse.mfcc_to_audio(y_pred, sr=SAMPLE_RATE, n_fft=N_FFT, hop_length=HOP_LENGTH)
    
    y_true_audio = tf.convert_to_tensor(y_true_audio, dtype=tf.float32)
    y_pred_audio = tf.convert_to_tensor(y_pred_audio, dtype=tf.float32)
    
    spectrogram_true = tf.signal.stft(y_true_audio, frame_length=N_FFT, frame_step=HOP_LENGTH, fft_length=N_FFT)
    spectrogram_pred = tf.signal.stft(y_pred_audio, frame_length=N_FFT, frame_step=HOP_LENGTH, fft_length=N_FFT)
    
    # Magnitude Spectrogram Loss
    mag_spec_loss = tf.reduce_mean(tf.square(tf.abs(spectrogram_pred) - tf.abs(spectrogram_true)), axis=-1)
    
    # Combine the losses
    total_loss = mse_loss + mag_spec_loss
    
    return total_loss


In [120]:
#Used from https://www.kaggle.com/code/residentmario/autoencoders
from keras.layers import Input, Dense
from keras.models import Model
from keras import regularizers
from keras.losses import MeanAbsoluteError

input_segment = Input(shape=(X.shape[1],))
encoded = Dense(256, activation='relu')(input_segment)  # encoding_dim = 256
encoded = Dense(128, activation='relu')(encoded)  # encoding_dim = 128
encoded = Dense(32, activation='relu')(encoded)  # encoding_dim = 32
decoded = Dense(128, activation='relu')(encoded)
decoded = Dense(256, activation='relu')(decoded)
decoded = Dense(X.shape[1], activation='sigmoid')(decoded)

# this model maps an input to its reconstruction
autoencoder = Model(input_segment, decoded)

# get the encoder and decoder as seperate models
# encoder
encoder = Model(input_segment, encoded)

# decoder
encoded_input = Input(shape=(32,))  # encoding_dim = 32
decoder_layer1 = autoencoder.layers[-1]
decoder_layer2 = autoencoder.layers[-2]
decoder_layer3 = autoencoder.layers[-3]
decoder = Model(encoded_input, decoder_layer1(decoder_layer2(decoder_layer3(encoded_input))))


autoencoder.compile(optimizer='adadelta', loss=MeanAbsoluteError())

In [121]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size= 0.3, random_state= 69)

In [122]:
autoencoder.fit(X_train, y_train,
                epochs=150,
                batch_size=256,
                validation_data=(X_test, y_test),
                verbose=1)

Epoch 1/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 21ms/step - loss: 20.8203 - val_loss: 20.8096
Epoch 2/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step - loss: 20.8259 - val_loss: 20.8076
Epoch 3/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step - loss: 20.8140 - val_loss: 20.8056
Epoch 4/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step - loss: 20.8259 - val_loss: 20.8038
Epoch 5/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step - loss: 20.8081 - val_loss: 20.8021
Epoch 6/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step - loss: 20.8179 - val_loss: 20.8004
Epoch 7/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 16ms/step - loss: 20.7992 - val_loss: 20.7987
Epoch 8/150
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 19ms/step - loss: 20.8057 - val_loss: 20.7970
Epoch 9/150
[1m30/30[0

<keras.src.callbacks.history.History at 0x789b519d2080>

In [124]:
encoded_segments = encoder.predict(X_test)
decoded_segments = decoder.predict(encoded_segments)

[1m102/102[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m102/102[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step


In [125]:
def reconstruct_signal(mfccs):
    return librosa.feature.inverse.mfcc_to_audio(mfccs.T, sr=SAMPLE_RATE, n_fft=N_FFT)

In [126]:
#generate audio for whole audio that is noisy, clean and predicted.
#for 69th audio
segment_indices =audios_segments[69]
X[segment_indices[0]: (segment_indices[-1] + 1)]


y_pred = decoded_segments[segment_indices[0]: (segment_indices[-1] + 1)]

### Disclaimer: only listen to this if you want your ears to blead. :-( 

In [127]:
original_signal = np.hstack([reconstruct_signal(segment.reshape(mfcc_shape)) for segment in X[segment_indices[0]: (segment_indices[-1] + 1)]])
predicted_signal = np.hstack([reconstruct_signal(segment.reshape(mfcc_shape)) for segment in y_pred])
ipd.Audio(predicted_signal, rate = SAMPLE_RATE)

In [128]:
#where as original noisy audio is this (reconstructed one)
ipd.Audio(original_signal, rate = SAMPLE_RATE)