In [1]:
import librosa as librosa
import numpy as np
import pandas as pd
from scipy import signal
import random
import time
import torch.nn as nn

from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm
import sounddevice as sd
import torch.optim as optim

import plotly.express as px

from torch.utils.data import DataLoader
import torch.utils.data as data
from torchvision import transforms, datasets
import torch
import googledrivedownloader as gdd

In [2]:
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print("Current device: {}".format(device))

Current device: cuda


# Data

In [3]:
filenames_pd = pd.read_csv('./../data/cv-corpus-13.0-2023-03-09/uk/train.tsv', sep='\t')
filenames_pd = filenames_pd[['path', 'sentence']]
filenames_pd = filenames_pd.where(filenames_pd['sentence'].str.len() > 40).dropna()
filenames_pd

Unnamed: 0,path,sentence
0,common_voice_uk_23934033.mp3,Комонник ускочив до лісу й напинив коня біля них.
3,common_voice_uk_23934080.mp3,— Про Сікура речеш? — перепитав жупан Сватоплу...
8,common_voice_uk_23934090.mp3,"— Видів єси, що на середині? — гукнув Великий ..."
9,common_voice_uk_23934091.mp3,— Ти — князь? Та Брунгільда тебе ратищем із сі...
10,common_voice_uk_23934093.mp3,"— Хай відають, коли пирували в князя нашого! —..."
...,...,...
16855,common_voice_uk_36922749.mp3,"По очах твоїх читаю твою долю, — люто засичала..."
16856,common_voice_uk_36922750.mp3,Тому я підніс його одною рукою і відкинув від ...
16857,common_voice_uk_36922751.mp3,Того ж він такий сумний та задуманий завжди.
16858,common_voice_uk_36922754.mp3,Там була робота по душі та були приятелі!


In [4]:
noise_names = pd.read_csv('./../data/noise/labels.csv')
noise_names = noise_names[['fname', 'label']]
noise_names['label'].unique()

array(['Walk_or_footsteps', 'Coin_(dropping)', 'Dishes_and_pots_and_pans',
       'Bass_guitar', 'Crash_cymbal', 'Slam', 'Clapping', 'Rain', 'Wind',
       'Piano', 'Engine', 'Glass', 'Fire', 'Acoustic_guitar', 'Fart',
       'Fireworks', 'Hi-hat', 'Squeak', 'Tearing', 'Writing'],
      dtype=object)

In [5]:
noise_names = noise_names.where(noise_names['label'].isin(['Walk_or_footsteps', 'Coin_(dropping)', 'Slam', 'Clapping', 'Wind', 'Glass', 'Fart', 'Squeak', 'Tearing'])).dropna()
noise_names

Unnamed: 0,fname,label
0,274679.wav,Walk_or_footsteps
1,365220.wav,Walk_or_footsteps
2,233458.wav,Walk_or_footsteps
3,370931.wav,Walk_or_footsteps
4,137172.wav,Walk_or_footsteps
...,...,...
941,208946.wav,Squeak
942,235535.wav,Squeak
944,62469.wav,Tearing
945,273450.wav,Tearing


In [6]:
def get_custom_sinusoid(freq, amplitude, length):
    t = np.linspace(0, length, length * 10 ** 3, endpoint=False)
    x = amplitude * np.sin(2 * np.pi * freq * t) + amplitude * np.sin(2 * np.pi * (freq * 2) * t) * np.exp(-0.1 * t)

    x = MinMaxScaler(feature_range=(-1, 1)).fit_transform(x.reshape(-1, 1)).flatten()
    return t, x

In [7]:
def add_noise_to_sinusoid(noise_amplitude, length):
    t = np.linspace(0, length, length * 10 ** 3, endpoint=False)
    noise = np.random.normal(0, noise_amplitude, len(t))
    for i in range(len(t)):
        noise[i] += (random.randint(1, 200) // 200) * (-1) ** (random.randint(1,2))
    return t, noise

In [8]:
def get_noisy_sinusoid_array(freq, amplitude, noise_amplitude, length, amount=10):
    clear_signals = []
    noisy_signals = []
    _, x = get_custom_sinusoid(freq, amplitude, length + 100)
    for i in tqdm(range(amount)):
        shift = random.randint(0, 99)
        clear_signals.append(x[shift:shift+length*10**3])
        noisy_signals.append(x[shift:shift+length*10**3] + add_noise_to_sinusoid(noise_amplitude, length)[1])

    return clear_signals, noisy_signals

In [9]:
class SinusoidDataset(data.Dataset):
    def __init__(self, freq, amplitude, noise_amplitude, length, amount):
        self.input, self.output = get_noisy_sinusoid_array(freq, amplitude, noise_amplitude, length, amount)

    def __getitem__(self, index):
        input = self.input[index]
        output = self.output[index]

        return input, output

    def __len__(self):
        return len(self.input)

In [27]:
class SpeechAndRealNoiseDataset(data.Dataset):
    def __init__(self, speech_pd, noise_pd, max_audio_length=1*10**5):
        self.speech_pd = speech_pd
        self.noise_pd = noise_pd
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.max_audio_length = max_audio_length

    def loadAudio(self, path):
        sound = librosa.load(path, sr=16000, mono=True)[0]
        sound = self.scaler.fit_transform(sound.reshape(-1, 1)).flatten()
        return sound

    @staticmethod
    def addNoise(origin, noise):
        if len(noise) > len(origin):
            return None
        else:
            random_coefficient = random.randint(20, 60) / 100
            max_shift = len(origin) - len(noise)
            for i in range(random.randint(1, max_shift//len(noise) + 1)):
                shift = random.randint(0, max_shift)
                origin[shift:shift + len(noise)] += noise * random_coefficient

            # add some white noise with 35% probability
            if random.randint(1, 100) <= 35:
                origin += np.random.normal(0, (random.randrange(1, 50) / 1000), len(origin))

            return origin

    def addPadding(self, sound):
        if len(sound) < self.max_audio_length:
            return np.pad(sound, (0, self.max_audio_length - len(sound)), 'constant', constant_values=(0, 0))
        else:
            return sound[:self.max_audio_length]

    def __getitem__(self, index):
        speech_path = self.speech_pd.iloc[index]['path']
        speech = self.loadAudio('./../data/cv-corpus-13.0-2023-03-09/uk/clips/{0}'.format(speech_path))
        noisy_speech = None
        while noisy_speech is None:
            noise_path = self.noise_pd.iloc[random.randint(0, len(self.noise_pd) - 1)]['fname']
            noise = self.loadAudio('./../data/noise/FSDnoisy18k/{0}'.format(noise_path))
            noisy_speech = self.addNoise(speech, noise)
            index += 1
        speech = self.addPadding(speech)
        noisy_speech = self.addPadding(noisy_speech)
        return speech, noisy_speech

    def __len__(self):
        return self.speech_pd.shape[0]

In [28]:
batch_size = 4
dataset = SinusoidDataset(10, 1, 0.1, 10**2, amount=100)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

100%|██████████| 100/100 [00:13<00:00,  7.32it/s]


In [29]:
dataset = SpeechAndRealNoiseDataset(filenames_pd, noise_names, max_audio_length=10**5)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=False)

In [30]:
class SoundDenoiser(nn.Module):
    def __init__(self,):
        super(SoundDenoiser, self).__init__()

        self.encoder = nn.Sequential(
            nn.Conv1d(1, 256, kernel_size=17, stride=4, padding=2),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Conv1d(256, 128, kernel_size=7, stride=3, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Conv1d(128, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Conv1d(64, 64, kernel_size=2, stride=2, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Conv1d(64, 64, kernel_size=2, stride=2, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Conv1d(64, 16, kernel_size=3, stride=2, padding=1),
            nn.ReLU()
        )

        self.linear1 = nn.Linear(522, 64)
        self.linear2 = nn.Linear(64, 522)
        self.ReLU = nn.ReLU()

        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(16, 64, kernel_size=3, stride=2, output_padding=0, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.ConvTranspose1d(64, 64, kernel_size=2, stride=2, output_padding=0, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.ConvTranspose1d(64, 64, kernel_size=2, stride=2, output_padding=0, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.ConvTranspose1d(64, 128, kernel_size=3, stride=2, output_padding=0, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.ConvTranspose1d(128, 256, kernel_size=7, stride=3, output_padding=1, padding=1),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.ConvTranspose1d(256, 1, kernel_size=17, stride=4, padding=0, output_padding=3),
            nn.Tanh()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        encoded = self.linear1(encoded)
        encoded = self.ReLU(encoded)
        encoded = self.linear2(encoded)
        encoded = self.ReLU(encoded)
        decoded = self.decoder(encoded)
        return decoded

In [31]:
input_size = 128
learning_rate = 0.01
num_epochs = 100

model = SoundDenoiser().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [32]:
"""dataset = SpeechAndRealNoiseDataset(filenames_pd, noise_names, max_audio_length=10**5)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=False)"""

'dataset = SpeechAndRealNoiseDataset(filenames_pd, noise_names, max_audio_length=10**5)\ndataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=False)'

In [33]:
clear, noisy = dataset[9]

t = np.linspace(0, len(clear), len(clear) * 10 ** 3, endpoint=False)
fig = px.line(x=t[:1000], y=noisy[:1000], title='Noisy signal')
fig.add_scatter(x=t[:1000], y=clear[:1000], mode='lines', name='clear signal')
fig.show()

In [34]:
def train_autoencoder(model, dataloader, criterion, optimizer, num_epochs):
    model.train()
    for epoch in tqdm(range(num_epochs+1)):
        running_loss = 0.0
        for i, signals in enumerate(dataloader):
            optimizer.zero_grad()
            
            # Розпакування звукових сигналів
            clean_signals, noisy_signals = signals
            
            # Перетворення на тензори PyTorch
            clean_signals = clean_signals.unsqueeze(1).float().to(device)
            noisy_signals = noisy_signals.unsqueeze(1).float().to(device)
            
            # Пропуск зашумлених сигналів через автоенкодер
            outputs = model(noisy_signals)
            
            # Обчислення функції втрати
            loss = criterion(outputs, clean_signals)
            
            # Зворотнє поширення помилки та оптимізація
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()

            if i > 100:
                break
        
        # Виведення середньої втрати на епоху
        epoch_loss = running_loss / len(dataloader)
        if epoch % 10 == 0:
            print('Epoch {}/{} Loss: {:.4f}'.format(epoch, num_epochs, epoch_loss))

In [35]:
torch.cuda.empty_cache()

In [36]:
train_autoencoder(model, dataloader, criterion, optimizer, 100)

  1%|          | 1/101 [00:09<16:00,  9.61s/it]

Epoch 0/100 Loss: 0.0045


 11%|█         | 11/101 [01:43<14:08,  9.43s/it]

Epoch 10/100 Loss: 0.0037


 21%|██        | 21/101 [03:18<12:53,  9.66s/it]

Epoch 20/100 Loss: 0.0037


 31%|███       | 31/101 [04:53<11:01,  9.45s/it]

Epoch 30/100 Loss: 0.0039


 41%|████      | 41/101 [06:33<09:54,  9.91s/it]

Epoch 40/100 Loss: 0.0039


 50%|█████     | 51/101 [08:11<08:18,  9.98s/it]

Epoch 50/100 Loss: 0.0037


 60%|██████    | 61/101 [09:49<06:24,  9.62s/it]

Epoch 60/100 Loss: 0.0037


 70%|███████   | 71/101 [11:25<04:46,  9.54s/it]

Epoch 70/100 Loss: 0.0037


 80%|████████  | 81/101 [13:01<03:10,  9.51s/it]

Epoch 80/100 Loss: 0.0037


 90%|█████████ | 91/101 [14:36<01:35,  9.55s/it]

Epoch 90/100 Loss: 0.0038


100%|██████████| 101/101 [16:12<00:00,  9.63s/it]

Epoch 100/100 Loss: 0.0037





In [37]:
clear_signal, noisy_signal = dataset[0]
noisy_signal_tensor = torch.from_numpy(noisy_signal).unsqueeze(0).unsqueeze(1).float().to(device)
filtered_signal = model(noisy_signal_tensor).squeeze().detach().cpu().numpy()

In [38]:
t = np.linspace(0, input_size, input_size * 10 ** 3, endpoint=False)
fig = px.line(x=t[:1000], y=noisy_signal[:1000], title='Noisy signal')
fig.add_scatter(x=t[:1000], y=filtered_signal[:1000], mode='lines', name='filtered signal')
fig.add_scatter(x=t[:1000], y=clear_signal[:1000], mode='lines', name='clear signal')
fig.show()

## Застосування до аудіо

In [39]:
def read_from_wav(filename):
    signal, sample_rate = librosa.load(filename, sr=16000)
    signal = signal.flatten()
    return signal, sample_rate

In [40]:
scaler = MinMaxScaler(feature_range=(-1, 1))
clear_sound, sample_rate = read_from_wav('../data/test-shrek.wav')
clear_sound = MinMaxScaler().fit_transform(clear_sound.reshape(-1, 1)).flatten()

noisy_sound = clear_sound + np.random.normal(0, 0.01, len(clear_sound))
t = np.linspace(0, len(noisy_sound) / sample_rate, len(noisy_sound), endpoint=False)


In [41]:
noisy_signal_tensor = torch.from_numpy(noisy_sound).unsqueeze(0).unsqueeze(1).float().to(device)
filtered_signal = model(noisy_signal_tensor).squeeze().detach().cpu().numpy()

RuntimeError: mat1 and mat2 shapes cannot be multiplied (16x8586 and 522x64)

In [None]:
t = np.linspace(0, input_size, input_size * 10 ** 3, endpoint=False)
fig = px.line(x=t[4000:5000], y=noisy_sound[4000:5000], title='Noisy signal')
fig.add_scatter(x=t[4000:5000], y=filtered_signal[4000:5000], mode='lines', name='filtered signal')
fig.add_scatter(x=t[4000:5000], y=clear_sound[4000:5000], mode='lines', name='clear signal')
fig.show()

In [None]:
sd.play(clear_sound, sample_rate)

In [None]:
sd.play(noisy_sound, sample_rate)

In [None]:
sd.play(filtered_signal, sample_rate)

In [None]:
sd.stop()