In [0]:
!pip install soundfile
import librosa, librosa.display
import numpy as np
import soundfile as sf
import os
import matplotlib.pyplot as plt
import math, random
import torch
from IPython.display import Audio
from sklearn.model_selection import train_test_split




In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Using audio dataset from http://www.openslr.org/12/ in .flac format

In [0]:
def flac_to_wav(path='.'):
    """Convert .flac to .wav files from given directory
    :param path: path to directory with .flac audio files
    """
    cnt = 0
    for folder, subs, files in os.walk(path):
      for file in files:
        if (file.endswith(".flac")):
          file_flac = os.path.join(folder, file)
          file_wav = "Audio/{0}.wav".format(cnt)
          os.system("ffmpeg -i {0} {1}".format(file_flac, file_wav))
          cnt += 1

Methods to load and construct dataset 

In [0]:
def read_audio(path, limit=300):
    """Read .wav files from given directory
    :param path: path to directory with audio
    :return audio_files: array of arrays with .wav files content  
    """
    audio_files = []
    for folder, subs, files in os.walk(path):
      for file in files:
        if (file.endswith(".wav") or file.endswith(".flac")):
          filename = os.path.join(folder, file)
          print('load ' + file)
          audio_files.append(read_signal(filename)[0])
        if len(audio_files) == limit:
          return audio_files
    return audio_files


def read_signal(filename, normalization='max'):
    """Imports a .wav file as a scaled float numpy array
    :param filename: audio's filename
    :return signal: imported signal from .wav
    :return sr: sampling rate
    :return norm: normalization coefficient
    """
    signal, sr = sf.read(filename)
    if normalization == 'max':
      norm = max(signal)
    else:
      norm = 1
    signal /= norm
    return signal, sr, norm


def decompose(fft):
    """Decomposes fft of signal to magnitude spectrum nd phase spectrum
    :param fft: fft of signal
    :return magnitude: magnitude spectrum
    :return phase: phase spectrum
    """
    return np.abs(fft), np.angle(fft)

In [0]:
def split_signal(signal, len=100):
    """Split signal array into parts of given length
    :param signal: signal array
    :param len: slice length
    :return signal: imported signal from .wav
    :return sr: sampling rate
    :return norm: normalization coefficient
    """
    n = signal.shape[0]
    k = n // len
    sliced_signal = []
    for i in range(k):
      sliced_signal.append(signal[i * len : (i + 1) * len])
    if (k * len < n):
      sliced_signal.append(np.concatenate((signal[k * len : n], np.zeros(len - (n - k * len)))))
    return sliced_signal


def chunks(dataset, batch_size=100):
    """Split dataset into batches of given size
    :param dataset: dataset
    :param batch_size: batch size
    :return batches: batches of dataset
    """
    batches = []
    for i in range(0, len(dataset), batch_size):
      if (i + batch_size <= len(dataset)):
        batches.append(dataset[i:i+batch_size])
    return np.array(batches)


def fit_size(signal, size):
    """Generate signal of given size by concatenating given signal
    :param signal: input signal
    :param size: desired signal size
    :return pumped_signal: pumped signal of given size
    """
    pumped_signal = signal
    while len(pumped_signal) < size:
        pumped_signal = np.concatenate((pumped_signal, pumped_signal))
    return pumped_signal[:size]


def add_noise(signal, noise, alpha=0.02):
    """Add noise to signal
    :param signal: input signal
    :param noise: noise signal
    :return noisy_signal: input signal with noise
    """
    noise = fit_size(noise, len(signal))
    return signal + alpha * noise


def get_dataset(signals, noise, input_size=100):
    """Generate dataset of noised and clean signals
    :param signal_dir: directory with signal .wav files
    :param noise_dir: directory with noise .wav files
    :return: dataset of noised signals and clear signals
    """
    X = []
    y = []
    for signal in signals:
      noised_signal = add_noise(signal, noise)
      X += split_signal(noised_signal, input_size)
      y += split_signal(signal, input_size)
    return np.array(X), np.array(y)

Load .wav files from goolge drive(login: audio.denoiser.2020@gmail.com password: audiodenoiser2020)

In [0]:
signals = read_audio('/content/drive/My Drive/Dataset/Audio')
noises = read_audio('/content/drive/My Drive/Dataset/Noise')

load 3247.flac
load 5378.flac
load 5080.flac
load 6586.flac
load 4857.flac
load 1977.flac
load 6396.flac
load 6175.flac
load 1939.flac
load 3372.flac
load 6044.flac
load 4911.flac
load 2500.flac
load 3938.flac
load 5909.flac
load 20357.flac
load 3091.flac
load 5222.flac
load 4940.flac
load 3902.flac
load 32601.flac
load 6106.flac
load 3793.flac
load 1864.flac
load 3824.flac
load 6783.flac
load 8160.flac
load 5406.flac
load 4270.flac
load 6413.flac
load 4575.flac
load 2119.flac
load 1793.flac
load 6576.flac
load 6464.flac
load 3186.flac
load 2753.flac
load 1685.flac
load 4749.flac
load 3484.flac
load 3392.flac
load 3479.flac
load 20353.flac
load 4347.flac
load 2052.flac
load 4323.flac
load 2999.flac
load 3031.flac
load 5245.flac
load 6781.flac
load 6714.flac
load 2308.flac
load 1628.flac
load 6516.flac
load 6625.flac
load 6240.flac
load 1565.flac
load 1551.flac
load 3905.flac
load 2921.flac
load 5380.flac
load 4229.flac
load 2612.flac
load 4309.flac
load 3105.flac
load 6150.flac
load 20

Construct dataset. Devide audio file in parts of length input_size(padded
 with zerros)

In [0]:
input_size = 100
hidden_size = 200
output_size = input_size
num_layers=3
X, y = get_dataset(signals, noises[2], input_size)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

In [0]:
print(X_train[0])
Audio(data=add_noise(X_train[0], noises[2]), rate=16000)

[ 0.04380851  0.08724702  0.08823102  0.08813779  0.07725086  0.06365188
  0.04934047  0.05699407  0.0480747   0.04766438  0.0472423   0.03954584
  0.03519108  0.02956715  0.0024358   0.00258921 -0.01449381 -0.01009173
  0.0034893  -0.00640552 -0.02074039 -0.01497453  0.00604671  0.01713406
  0.04605592  0.09331486  0.11833436  0.11741598  0.14650509  0.16281883
  0.16335709  0.14858911  0.15903951  0.16631162  0.18763828  0.2095473
  0.20915731  0.22222662  0.20868728  0.20358382  0.20376971  0.19923813
  0.19831076  0.19743675  0.21146219  0.19935043  0.20863095  0.20328759
  0.17280412  0.16346785  0.14468996  0.1515388   0.13626836  0.15034251
  0.15783561  0.14897555  0.14915138  0.13508954  0.1212719   0.11172391
  0.09250867  0.07208503  0.0840268   0.08586326  0.06569793  0.07291703
  0.07589009  0.06791107  0.07655555  0.09486221  0.08440664  0.06386795
  0.05764054  0.030483    0.00071428 -0.02814261 -0.03999167 -0.05781292
 -0.07154075 -0.07275546 -0.07480051 -0.10810599 -0.

Devide dataset into batches and convert to tensor

In [0]:
batch_size=50
X_train_torch = torch.from_numpy(chunks(X_train, batch_size)).float()
print(X_train_torch.shape)
y_train_torch = torch.from_numpy(chunks(y_train, batch_size)).float()
X_test_torch = torch.from_numpy(chunks(X_test, batch_size)).float()
y_test_torch = torch.from_numpy(chunks(y_test, batch_size)).float()

torch.Size([8138, 50, 100])


# RNN

In [0]:
import torch
import torch.nn as nn
from torch.autograd import Variable

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(RNN, self).__init__()
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)
        self.act = nn.ReLU()
    def forward(self, x):
        pred, hidden = self.rnn(x)
        pred = self.act(self.linear(pred))
        return pred

model = RNN(input_size, hidden_size, output_size, num_layers)
print(model)

RNN(
  (rnn): RNN(100, 200, num_layers=3, batch_first=True)
  (linear): Linear(in_features=200, out_features=100, bias=True)
  (act): ReLU()
)


In [0]:
learning_rate=1e-3
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = nn.MSELoss()
epoch = 100
for t in range(epoch):
    y_pred = model(X_train_torch)
    optimizer.zero_grad()
    loss = loss_fn(y_pred, y_train_torch)
    print(t, loss.data)
    loss.backward()
    optimizer.step()

0 tensor(0.0187)
1 tensor(0.0202)
2 tensor(0.0336)
3 tensor(0.0821)


KeyboardInterrupt: ignored

Methods to visualise data

In [0]:
def compare_spectrogram(filter_type, original_signal, filtered_signal, sr=16000):
    """Plots the spectrogram of the audio and the filtered audio signals in a subplot
    :param filter_type: type of used filter
    :param original_signal: original audio signal
    :param filtered_signal: filtered audio signal
    :param sr: sampling rate
    """
    plt.subplot(1, 2, 1)
    plt.title('Original')
    plt.specgram(x=original_signal, Fs=sr)
    plt.axis(ymin=10, ymax=10000)
    plt.subplot(1, 2, 2)
    plt.title(filter_type)
    plt.specgram(x=filtered_signal, Fs=sr)
    plt.axis(ymin=10, ymax=10000)
    plt.show()


def compare_magnitude(filter_type, original_audio, filtered_audio, sr=16000):
    """Plots magnitude of the audio and the filtered audio signals in a subplot
    :param filter_type: type of used filter
    :param original_audio: original audio signal
    :param filtered_audio: filtered audio signal
    :param sr: sampling rate
    """
    plt.grid(True)
    plt.subplot(2, 1, 1)
    plt.title('Original')
    librosa.display.waveplot(original_audio, sr=sr)
    plt.subplot(2, 1, 2)
    plt.title('Filtered by ' + filter_type)
    librosa.display.waveplot(filtered_audio, sr=sr)
    plt.show()

    plt.plot(original_audio, label='Input', color='r')
    plt.plot(filtered_audio, label='Output', color='b')
    plt.show()


In [0]:
print(chunks(split_signal(add_noise(signals[1], noises[2]), input_size)))
X_test_torch = torch.from_numpy(np.array([split_signal(add_noise(signals[1], noises[2]), input_size)])).float()
y_test_torch = torch.from_numpy(np.array([split_signal(signals[1], input_size)])).float()
y_pred = model(X_test_torch)
compare_magnitude("RNN", X_test_torch.data.numpy().reshape(-1), y_pred.data.numpy().reshape(-1))

In [0]:
Audio(data=y_test_torch.reshape(1, -1)[:100], rate=16000)

In [0]:

Audio(data=y_pred.data.reshape(1, -1)[:100], rate=16000)

In [0]:
Audio(data=X_test_torch.reshape(1, -1)[:100], rate=16000)