In [None]:
import librosa as lr
import math
import numpy as np
import os
import pandas as pd
import random
import soundfile as sf
from tqdm.autonotebook import tqdm
import torchaudio

In [None]:
# Clean-speech corpora: three folder names, given as relative paths.
speech_dataset_1, speech_dataset_2, speech_dataset_3 = (
    'clean_trainset_56spk_wav',
    'SPEECH_DATASET',
    'lombardgrid_audio',
)

# Noise corpora: each audio folder is paired with the CSV that labels its files.
noise_dataset_1, noise_dataset_1_labels = (
    'FSDnoisy18k.audio_train',
    'FSDnoisy18k.audio_train//train.csv',
)
noise_dataset_2, noise_dataset_2_labels = (
    'ESC-50-master',
    'ESC-50-master//ESC-50-master//meta//esc50.csv',
)

# Destination folder for the corrupted recordings.
# NOTE(review): 'speach' looks like a typo, but renaming it changes the output
# directory on disk - confirm before fixing.
corrupted_speech = 'Corrupted_speach'

In [None]:
class Corupt_Signal:
    """Mix clean speech recordings with synthetic and recorded noise at a random SNR.

    Typical use: register speech files with ``_read_speech_folder_``, register
    labelled noise files with ``_read_noise_folder_``, then call
    ``_corrupt_signal_`` to write one noisy WAV per speech file plus a
    ``labels.csv`` describing what was mixed in.

    Args:
        sample_rate: sampling rate passed to ``librosa.load`` (as ``sr``) and
            used when writing the corrupted files.
        snr_from: lowest SNR in dB that may be drawn (inclusive).
        snr_to: upper SNR bound in dB (exclusive).
        add_noises_from: smallest number of recorded noises per file (inclusive).
        add_noises_to: upper bound on recorded noises per file (exclusive).
    """

    # Spectral shapes available for the synthetic stationary noise.
    _NOISE_COLORS = ('white', 'blue', 'violet', 'brown', 'pink')

    def __init__(self, sample_rate, snr_from, snr_to, add_noises_from, add_noises_to) -> None:
        self.sample_rate = sample_rate
        self.snr_from = snr_from
        self.snr_to = snr_to
        self.add_noises_from = add_noises_from
        self.add_noises_to = add_noises_to

        # paths of all registered clean-speech files
        self.clean_speech = []
        # one row per registered noise file
        self.noises = pd.DataFrame(columns=['File Name','Label'])
        # one row per corrupted file written by _corrupt_signal_
        self.labels_used = pd.DataFrame(columns=['File Name','Labels','SNR_db','Number of Noises','Corrupted File Name'])

    @staticmethod
    def _mean_power_(signal):
        """Return the mean squared amplitude of ``signal`` (0.0 for an empty signal)."""
        samples = np.asarray(signal, dtype=np.float64)
        if samples.size == 0:
            return 0.0
        return float(np.mean(samples * samples))

    @staticmethod
    def _noise_gain_(p_speech, p_noise, snr_db):
        """Return the amplitude gain that puts noise ``snr_db`` dB below the speech.

        The dB value is converted to a linear power ratio with ``10 ** (dB / 10)``.
        Silent noise (``p_noise == 0``) gets a gain of 0.0 instead of a
        division by zero.
        """
        if p_noise == 0:
            return 0.0
        snr_linear = 10 ** (snr_db / 10)
        return math.sqrt(p_speech / (p_noise * snr_linear))

    def _label_weights_(self):
        """Return ``(labels, weights)`` for sampling a noise label by its file share.

        Both lists come from the same ``value_counts`` result, so each weight
        is guaranteed to belong to the label at the same position.
        """
        shares = self.noises['Label'].value_counts(normalize=True)
        return shares.index.tolist(), shares.values.tolist()

    def _read_speech_folder_(self, folder, lombard=False):
        """Recursively register every ``.wav`` file under ``folder`` as clean speech.

        Args:
            folder: directory to scan.
            lombard: when true, keep only files whose name contains ``'_l_'``.

        Directories and files are visited in sorted order so repeated runs
        register the files in the same order.
        """
        for (root, dirs, files) in os.walk(folder):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for f in sorted(files):
                # match the extension only, so names like 'x.wav.txt' are skipped
                if not f.endswith('.wav'):
                    continue
                if lombard and '_l_' not in f:
                    continue
                self.clean_speech.append(root+'//'+f)
        print('Speech dataset loaded.')

    def _read_noise_folder_(self, folder, label_path):
        """Recursively register labelled ``.wav`` noise files found under ``folder``.

        Args:
            folder: directory to scan; its name selects the label-CSV layout
                (``'FSDnoisy18k'`` -> columns ``fname``/``label``,
                ``'ESC-50'`` -> columns ``filename``/``category``).
            label_path: path of the CSV mapping file names to labels.

        Files that have no entry in the CSV are skipped and counted in the
        status message. A folder matching neither layout loads nothing.
        """
        noise_labels = pd.read_csv(label_path)

        if 'FSDnoisy18k' in folder:
            name_column, label_column = 'fname', 'label'
        elif 'ESC-50' in folder:
            name_column, label_column = 'filename', 'category'
        else:
            print(f'Unrecognised noise dataset layout for {folder!r}; no files loaded.')
            return

        # one lookup table instead of a full CSV scan per file; first entry wins
        label_by_file = (noise_labels.drop_duplicates(name_column)
                                     .set_index(name_column)[label_column]
                                     .to_dict())

        rows = []
        unlabelled = 0
        for (root, dirs, files) in os.walk(folder):
            dirs.sort()
            for f in sorted(files):
                if not f.endswith('.wav'):
                    continue
                if f not in label_by_file:
                    unlabelled += 1
                    continue
                rows.append([root+'/'+f, label_by_file[f]])

        if rows:
            # build the new rows in one go rather than growing the frame row by row
            new_rows = pd.DataFrame(rows, columns=['File Name','Label'])
            if self.noises.empty:
                self.noises = new_rows
            else:
                self.noises = pd.concat([self.noises, new_rows], ignore_index=True)

        if unlabelled:
            print(f'Skipped {unlabelled} noise file(s) without a label entry.')
        print('Noise dataset loaded.')

    def _load_record_(self, path_to_file):
        """Load an audio file with librosa at ``self.sample_rate`` and return its samples."""
        # librosa's keyword for the target sampling rate is `sr`
        signal, _ = lr.load(path_to_file,
                            sr=self.sample_rate)

        return signal

    def _generate_noise_(self, length_of_signal, color):
        """Generate ``length_of_signal`` samples of spectrally shaped Gaussian noise.

        Args:
            length_of_signal: number of samples to produce.
            color: one of ``'white'``, ``'blue'``, ``'violet'``, ``'brown'``, ``'pink'``.

        Returns:
            numpy array with exactly ``length_of_signal`` samples.

        Raises:
            ValueError: if ``color`` is not a supported noise colour.
        """
        # amplitude weighting applied to each frequency bin of white noise
        spectrum_shapes = {
            'white': lambda f: np.ones_like(f),
            'blue': lambda f: np.sqrt(f),
            'violet': lambda f: f,
            # the DC bin (f == 0) is mapped to inf so that its gain becomes 0
            'brown': lambda f: 1/np.where(f == 0, float('inf'), f),
            'pink': lambda f: 1/np.where(f == 0, float('inf'), np.sqrt(f)),
        }
        if color not in spectrum_shapes:
            raise ValueError(f'Unknown noise color {color!r}; expected one of {self._NOISE_COLORS}')

        X_white = np.fft.rfft(np.random.randn(length_of_signal))
        S = spectrum_shapes[color](np.fft.rfftfreq(length_of_signal))
        # Normalize S to unit RMS so the shaping keeps the overall level comparable
        rms = np.sqrt(np.mean(S**2))
        if rms > 0:
            S = S / rms
        X_shaped = X_white * S
        # n= is required: without it irfft returns one sample too few for odd lengths
        return np.fft.irfft(X_shaped, n=length_of_signal)

    def _normalize_noise_power_and_length(self, clean_speech, noise_signal):
        """Fit a noise recording to the speech signal's length and mean power.

        The noise is looped, a window of ``len(clean_speech)`` samples is cut
        from a random start offset, and the window is scaled so that its mean
        power equals that of ``clean_speech``.

        Raises:
            ValueError: if ``noise_signal`` is empty.
        """
        if len(noise_signal) == 0:
            raise ValueError('noise_signal is empty; cannot fit it to the speech length')

        target_length = len(clean_speech)
        random_start_location = random.randrange(len(noise_signal))
        # +2 repetitions guarantee enough samples after the random offset
        repetitions = (target_length//len(noise_signal))+2
        noise_signal = np.tile(noise_signal, repetitions)
        noise_signal = noise_signal[random_start_location:random_start_location+target_length]

        p_speech = self._mean_power_(clean_speech)
        p_noise = self._mean_power_(noise_signal)

        # an all-zero window cannot be scaled; return it as is rather than NaN
        if p_noise == 0:
            return noise_signal

        alpha = math.sqrt(p_speech/p_noise)
        return noise_signal*alpha

    def _corrupt_signal_(self, results_directory):
        """Write a noisy version of every registered speech file plus ``labels.csv``.

        Each file gets one synthetic coloured noise and a random number of
        recorded noises (label drawn in proportion to its share of noise files),
        mixed at a random integer SNR from ``range(snr_from, snr_to)``.

        Args:
            results_directory: output folder; created if it does not exist.

        Raises:
            ValueError: if recorded noises are requested but none were loaded.
        """
        os.makedirs(results_directory, exist_ok=True)

        # loop-invariant lookups: label sampling weights and files per label
        labels, weights = self._label_weights_()
        files_by_label = {label: group.tolist()
                          for label, group in self.noises.groupby('Label')['File Name']}

        for path_to_signal in tqdm(self.clean_speech):
            # load clean_speech signal
            clean_speech = self._load_record_(path_to_signal)

            # take 1 stationary noise from color noises, normalize its power and length
            color_of_noise = random.choice(self._NOISE_COLORS)
            noise_stationary = self._generate_noise_(len(clean_speech), color_of_noise)
            noise_combined = self._normalize_noise_power_and_length(clean_speech, noise_stationary)

            # take X non-stationary noises
            count_of_noises = random.randrange(self.add_noises_from, self.add_noises_to)
            if count_of_noises > 0 and not labels:
                raise ValueError('No noise files loaded; call _read_noise_folder_ first.')

            # select X classes from noise list
            selected_labels = (random.choices(labels, weights=weights, k=count_of_noises)
                               if count_of_noises > 0 else [])
            for label in selected_labels:
                selected_file_path = random.choice(files_by_label[label])

                # load noise
                noise_non_stationary = self._load_record_(selected_file_path)
                noise_non_stationary = self._normalize_noise_power_and_length(clean_speech, noise_non_stationary)

                # add noise to combined noises (new array, inputs stay untouched)
                noise_combined = noise_combined + noise_non_stationary

            # select SNR level and the gain that realises it
            SNR_db = random.randrange(self.snr_from, self.snr_to)
            alpha = self._noise_gain_(self._mean_power_(clean_speech),
                                      self._mean_power_(noise_combined),
                                      SNR_db)

            # compute the output name once so the CSV row and the file on disk agree
            corrupted_file_name = f'{len(self.labels_used.index)}.wav'

            # the stationary noise colour is recorded with the labels and counted too
            selected_labels.append(color_of_noise)
            self.labels_used.loc[len(self.labels_used.index)]=[path_to_signal,
                                                               selected_labels,
                                                               SNR_db,
                                                               len(selected_labels),
                                                               corrupted_file_name]

            corrupted_signal = clean_speech + alpha*noise_combined

            sf.write(f'{results_directory}//{corrupted_file_name}',
                     corrupted_signal,
                     self.sample_rate)

        # save labels file also
        self.labels_used.to_csv(f'{results_directory}//labels.csv', index=False)

In [None]:
# Build the corruption pipeline. SNR and noise-count upper bounds are exclusive,
# so this draws SNRs of 0..9 dB and 2..4 recorded noises per file.
corrupt_speech = Corupt_Signal(sample_rate=48000,
                               snr_from=0,
                               snr_to=10,
                               add_noises_from=2,
                               add_noises_to=5)

# Register the clean-speech corpora; the third one is filtered to file names
# containing '_l_'.
corrupt_speech._read_speech_folder_(folder=speech_dataset_1)
corrupt_speech._read_speech_folder_(folder=speech_dataset_2)
corrupt_speech._read_speech_folder_(folder=speech_dataset_3, lombard=True)

# Register the noise corpus together with its label CSV.
# NOTE(review): only noise_dataset_1 is loaded in this cell; noise_dataset_2 is
# declared above but not read here - confirm that is intended.
corrupt_speech._read_noise_folder_(folder=noise_dataset_1,
                                   label_path=noise_dataset_1_labels)

In [None]:
# Run the corruption over every registered speech file; the noisy WAVs and
# labels.csv are written into the `corrupted_speech` folder.
corrupt_speech._corrupt_signal_(results_directory=corrupted_speech)