## **Problem 4: Speech Denoising using RNN (LSTM)**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import torchvision
from torch import nn, optim
import torch.nn.functional as F
import librosa
import torch.utils.data as utils
from torch.utils.data import DataLoader, Dataset
import glob
import pickle
from collections import defaultdict
import IPython.display as ipd

Computing the STFT of every audio file on each run was too time-consuming. Instead, I compute them once per split (train, validation, test), store the source, noise and mixture data in a dictionary, and dump that dictionary to a pickle file, so that on later runs I only have to load it from the corresponding pickle file.

In [None]:
def getFilenames(directory, file_type, base_dir='/content/drive/My Drive/Colab Notebooks/timit-homework/'):
    """Collect the sorted ``.wav`` paths of one dataset split.

    Parameters
    ----------
    directory : str
        Sub-directory of ``base_dir`` to scan. ``'tr'`` and ``'v'`` are
        handled as paired splits; any other value is handled as a split
        that only contains one kind of file.
    file_type : sequence of str
        File-name prefixes in the order ``[source, noise, mix]``. Only read
        for the ``'tr'`` / ``'v'`` splits.
    base_dir : str, optional
        Folder that contains the split sub-directories. Defaults to the
        Drive location this notebook was written against.

    Returns
    -------
    tuple of (list, list, list)
        ``(source_name, noise_name, mix_name)`` for ``'tr'`` / ``'v'``,
        each list sorted by path.
    list
        Every ``.wav`` path in the split, sorted, for any other directory.
    """
    import os  # local import so this cell does not depend on another cell's imports

    print("getting file from "+directory+" directory")

    # Escape the directory part so characters such as '[' in a folder name are
    # not interpreted as glob wildcards.
    split_dir = os.path.join(base_dir, directory)
    wav_paths = sorted(glob.glob(os.path.join(glob.escape(split_dir), '*.wav')))

    if directory != 'tr' and directory != 'v':
        return wav_paths

    source_name, noise_name, mix_name = [], [], []
    for filepath in wav_paths:
        # Match on the file name itself rather than on a fixed-width slice of
        # the full path, so names of any length are classified correctly.
        basename = os.path.basename(filepath)
        if basename.startswith(file_type[2]):
            mix_name.append(filepath)
        if basename.startswith(file_type[0]):
            source_name.append(filepath)
        if basename.startswith(file_type[1]):
            noise_name.append(filepath)
    # wav_paths is already sorted, so each filtered list is sorted as well.
    return source_name, noise_name, mix_name

In [None]:
def makeFixLength(arr, fix_length=175):
    """Force a 2-D array to exactly ``fix_length`` columns.

    Parameters
    ----------
    arr : numpy.ndarray
        2-D array; only the second axis is resized.
    fix_length : int, optional
        Target number of columns (default 175).

    Returns
    -------
    numpy.ndarray
        ``arr`` truncated to its first ``fix_length`` columns when it is
        longer, otherwise ``arr`` right-padded with zeros. The dtype of the
        input is kept in both cases.
    """
    n_cols = arr.shape[1]
    if n_cols > fix_length:
        # if length greater than fixed length then truncate
        return arr[:, :fix_length]
    # if length less than (or equal to) fixed length then pad with zero on the
    # right of the second axis only
    return np.pad(arr, ((0, 0), (0, fix_length - n_cols)), mode='constant', constant_values=0)

In [None]:
def _loadMagnitudeSTFT(path, n_fft=1024, hop_length=512):
    """Load one audio file; return ``(waveform, fixed-length |STFT|, sample_rate)``."""
    # sr=None keeps the sampling rate stored in the file instead of resampling.
    signal, sr = librosa.load(path, sr=None)
    magnitude = makeFixLength(np.abs(librosa.stft(signal, n_fft=n_fft, hop_length=hop_length)))
    return signal, magnitude, sr


def getSTFT(dir, source_n=None, noise_n=None, mix_n=None):
    """Load the audio files of one split and compute their magnitude spectrograms.

    Parameters
    ----------
    dir : str
        Split name. ``'tr'`` and ``'v'`` are processed as paired
        source/noise/mix data; any other value is processed as a split
        containing only the files listed in ``source_n``.
    source_n : list of str
        Paths of the source files (or of all files for a non-paired split).
    noise_n, mix_n : list of str, optional
        Paths of the noise and mixture files, index-aligned with
        ``source_n``. Only read for ``'tr'`` / ``'v'``.

    Returns
    -------
    collections.defaultdict
        For ``'tr'`` / ``'v'``: keys ``"source"``, ``"noise"``, ``"mix"``
        (each a ``(waveforms, spectrograms)`` pair of lists), ``"ibm"``
        (list of 0/1 arrays, 1 where the source magnitude exceeds the noise
        magnitude) and ``"sr"`` (sampling rate of the last file loaded,
        0 when no file was loaded).
        Otherwise: keys ``"test"`` (``(waveforms, spectrograms)``) and ``"sr"``.

    Raises
    ------
    ValueError
        If the three path lists of a paired split differ in length.
    """
    dicty = defaultdict(tuple)
    print("getting stft from "+dir+" directory")

    if dir == 'tr' or dir == 'v':
        # The mask is computed per index, so the three lists must line up.
        if not (len(source_n) == len(noise_n) == len(mix_n)):
            raise ValueError("source, noise and mix file lists must have the same length")

        dicty["source"] = ([], [])
        dicty["noise"] = ([], [])
        dicty["mix"] = ([], [])
        dicty["ibm"] = []
        dicty["sr"] = 0
        for s_path, n_path, mx_path in zip(source_n, noise_n, mix_n):
            #-----Source
            s, s_stft, sr = _loadMagnitudeSTFT(s_path)
            dicty["source"][0].append(s)
            dicty["source"][1].append(s_stft)

            #-----Noise
            n, n_stft, sr = _loadMagnitudeSTFT(n_path)
            dicty["noise"][0].append(n)
            dicty["noise"][1].append(n_stft)

            #-----Mix
            mx, mx_stft, sr = _loadMagnitudeSTFT(mx_path)
            dicty["mix"][0].append(mx)
            dicty["mix"][1].append(mx_stft)

            #-----IBM: 1 where the source is louder than the noise, else 0
            dicty["ibm"].append(1 * (s_stft > n_stft))

            dicty["sr"] = sr
        return dicty

    dicty["test"] = ([], [])
    dicty["sr"] = 0
    for path in source_n:
        s, s_stft, sr = _loadMagnitudeSTFT(path)
        dicty["test"][0].append(s)
        dicty["test"][1].append(s_stft)

        dicty["sr"] = sr
    return dicty

In [None]:
# train data
source_name, noise_name, mix_name = getFilenames('tr', ['trs', 'trn', 'trx'])
print(len(source_name), len(noise_name), len(mix_name), "\n")

# dumping the dictionary into pickle file for future use; the `with` block
# guarantees the file is flushed and closed even if pickling fails
dict_train = getSTFT('tr', source_name, noise_name, mix_name)
with open("/content/drive/My Drive/Colab Notebooks/HW3/dict_train.pkl", "wb") as pkl_file:
    pickle.dump(dict_train, pkl_file)
print("...train data saved!")

In [None]:
# validation data
val_source_name, val_noise_name, val_mix_name = getFilenames('v', ['vs', 'vn', 'vx'])
print(len(val_source_name), len(val_noise_name), len(val_mix_name), "\n")

# calculating stft for each data sample, then dumping the dictionary into a
# pickle file; the `with` block guarantees the file is flushed and closed
dict_val = getSTFT('v', val_source_name, val_noise_name, val_mix_name)
with open("/content/drive/My Drive/Colab Notebooks/HW3/dict_val.pkl", "wb") as pkl_file:
    pickle.dump(dict_val, pkl_file)
print("...val data saved!")

In [None]:
# test data
test_name = getFilenames('te', ['tex'])
print(len(test_name), "\n")

# dumping the dictionary into pickle file for future use; the `with` block
# guarantees the file is flushed and closed even if pickling fails
dict_test = getSTFT('te', test_name)
with open("/content/drive/My Drive/Colab Notebooks/HW3/dict_test.pkl", "wb") as pkl_file:
    pickle.dump(dict_test, pkl_file)
print("...test data saved!")

In [None]:
# extracting train data from the pickle
# NOTE: pickle can execute arbitrary code on load; only open files this
# notebook produced itself.
with open("/content/drive/My Drive/Colab Notebooks/HW3/dict_train.pkl", "rb") as pkl_file:
    dict_train = pickle.load(pkl_file)

In [None]:
# extracting validation data from the pickle when needed
# NOTE: pickle can execute arbitrary code on load; only open files this
# notebook produced itself.
with open("/content/drive/My Drive/Colab Notebooks/HW3/dict_val.pkl", "rb") as pkl_file:
    dict_val = pickle.load(pkl_file)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Convert each list of spectrograms into ONE float32 ndarray first and wrap it
# with torch.from_numpy: building a tensor straight from a Python list of
# ndarrays goes element by element and is very slow for a dataset this size.
train_mix = torch.from_numpy(np.asarray(dict_train["mix"][1], dtype=np.float32))
train_ibm = torch.from_numpy(np.asarray(dict_train["ibm"], dtype=np.float32))

# Swap the last two axes so each sample is (frames, bins), matching the
# batch_first LSTM input; inputs are mixture magnitudes, targets are the masks.
train_data = utils.TensorDataset(train_mix.permute((0, 2, 1)).to(device),
                                 train_ibm.permute((0, 2, 1)).to(device))
train_loader = DataLoader(train_data, batch_size=10, shuffle=True)

# the raw dictionary is no longer needed once the tensors exist
del dict_train, train_mix, train_ibm

# checking the dimension of the data
a, b = next(iter(train_loader))
print(a.shape, b.shape)

torch.Size([10, 175, 513]) torch.Size([10, 175, 513])


In [None]:
# Convert each list of spectrograms into ONE float32 ndarray first and wrap it
# with torch.from_numpy: building a tensor straight from a Python list of
# ndarrays goes element by element and is very slow.
val_mix = torch.from_numpy(np.asarray(dict_val["mix"][1], dtype=np.float32))
val_ibm = torch.from_numpy(np.asarray(dict_val["ibm"], dtype=np.float32))

# Swap the last two axes so each sample is (frames, bins).
val_data = utils.TensorDataset(val_mix.permute((0, 2, 1)).to(device),
                               val_ibm.permute((0, 2, 1)).to(device))
# batch_size=1 and shuffle=False keep batch i aligned with entry i of dict_val,
# which the validation code relies on to fetch the matching waveform.
val_loader = DataLoader(val_data, batch_size=1, shuffle=False)

del val_mix, val_ibm

# checking the dimension of the data
a, b = next(iter(val_loader))
print(a.shape, b.shape)

torch.Size([1, 175, 513]) torch.Size([1, 175, 513])


### **Training the Model**

In [None]:
# defined the network
class Denoiser(nn.Module):
    """Recurrent mask estimator.

    A two-layer LSTM followed by a linear layer and a sigmoid. Input and
    output both have 513 features per time step, and the output values lie
    in (0, 1). The LSTM is batch-first, so inputs are
    ``(batch, steps, 513)``.
    """

    def __init__(self):
        super().__init__()
        n_features = 513
        # Attribute names are part of the saved state_dict keys; keep them.
        self.lstm1 = nn.LSTM(
            input_size=n_features,
            hidden_size=n_features,
            num_layers=2,
            batch_first=True,
        )
        self.fcn = nn.Linear(in_features=n_features, out_features=n_features)

    def forward(self, x):
        """Return the sigmoid-activated mask for every time step of ``x``."""
        recurrent_out, _state = self.lstm1(x)
        return torch.sigmoid(self.fcn(recurrent_out))

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# nn.Module.to() returns the module itself, so construction and placement chain.
net = Denoiser().to(device)

# Mean-squared error loss and Adam optimiser over all network parameters.
err_func = nn.MSELoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.001)

print(net)

Denoiser(
  (lstm1): LSTM(513, 513, num_layers=2, batch_first=True)
  (fcn): Linear(in_features=513, out_features=513, bias=True)
)


In [None]:
# utils
def calculateSNR(s_org, s_bar):
    """Signal-to-noise ratio, in dB, of an estimate against its reference.

    Computes ``10 * log10(sum(s_org**2) / sum((s_org - s_bar)**2))``.

    Parameters
    ----------
    s_org : numpy.ndarray
        Reference signal.
    s_bar : numpy.ndarray
        Estimated signal; must be subtractable from ``s_org``.
    """
    error_energy = np.sum(np.square(s_org - s_bar))
    signal_energy = np.sum(np.square(s_org))
    ratio = signal_energy/error_energy
    return 10 * (np.log10(ratio))

def findMask(mask, mixed):
    """Apply a predicted mask to the complex STFT of a mixture waveform.

    Parameters
    ----------
    mask : torch.Tensor
        Mask of shape ``(1, frames, bins)`` (a single-item batch) or
        ``(frames, bins)``.
    mixed : numpy.ndarray
        Mixture waveform; its STFT is computed here with ``n_fft=1024`` and
        ``hop_length=512``.

    Returns
    -------
    numpy.ndarray
        Element-wise product of the transposed mask and the mixture STFT,
        shape ``(bins, frames)``, limited to the frames both inputs share.

    Raises
    ------
    ValueError
        If ``mask`` is not 2-D after removing a leading batch axis of size 1.
    """
    mixed = librosa.stft(mixed, n_fft=1024, hop_length=512)

    # Remove only the batch axis. A bare squeeze() would also drop a time axis
    # of length 1 and silently change the rank of the mask.
    if mask.dim() == 3:
        mask = mask.squeeze(0)
    if mask.dim() != 2:
        raise ValueError("mask must have shape (1, frames, bins) or (frames, bins)")

    # (frames, bins) -> (bins, frames) to match librosa's STFT layout
    mask = mask.detach().cpu().numpy().T

    # Keep only the frames present in both, whichever of the two is longer.
    frames = min(mask.shape[1], mixed.shape[1])
    return np.multiply(mask[:, :frames], mixed[:, :frames])

In [None]:
!pip install soundfile
import soundfile



In [None]:
epochs = 100
avg_snr = 0
# val_loader yields one clip per batch (batch_size=1), so its length is the
# number of validation clips; use it instead of a hard-coded count.
n_val = len(val_loader)
for e in range(1, epochs+1):
    # ---------------- training pass ----------------
    net.train()
    train_loss_epoch = 0
    for row, target in train_loader:
        # set the gradients to zero as PyTorch automatically accumulates gradients
        optimizer.zero_grad()
        outs = net(row)

        # calculate loss
        loss = err_func(outs, target)

        # back-propagate, then let the optimizer update the weights
        loss.backward()
        optimizer.step()

        train_loss_epoch += loss.item()

    # ---------------- validation pass ----------------
    net.eval()
    val_loss = 0
    val_snr = 0
    with torch.no_grad():
        for idx, (data, target) in enumerate(val_loader):
            # val_loader is not shuffled, so batch idx matches entry idx of dict_val
            mixture = dict_val["mix"][0][idx]

            # The stored spectrograms were padded to a fixed length; cut the
            # input and target back to the clip's real number of STFT frames.
            r = librosa.stft(mixture, n_fft=1024, hop_length=512)
            if data.shape[1] > r.shape[1]:
                data = data[:, :r.shape[1], :]
                target = target[:, :r.shape[1], :]

            mask = net(data)
            val_loss += err_func(mask, target).item()

            # mask the mixture STFT and go back to the time domain at the
            # original clip length
            sv = findMask(mask, mixture)
            st = librosa.istft(sv, hop_length=512, length=mixture.shape[0])

            val_snr += calculateSNR(dict_val["source"][0][idx], st)

            # keep the first four denoised clips of the final epoch for listening
            if idx <= 3 and e == epochs:
                soundfile.write("val_result_"+str(idx)+".wav", st, dict_val["sr"])

    if e == epochs:
        avg_snr = val_snr/n_val
    if e%10 == 0:
        print("EPOCH:{eps}   |   train_loss:{tls}   |   val_loss:{vls}   |   SNR:{sn}".
              format(eps = e, tls=round(train_loss_epoch/len(train_loader),5), vls=round(val_loss/len(val_loader),5), sn=round(val_snr/n_val, 3)))

print("average SNR on dataset", avg_snr)

# Save the model
torch.save(net.state_dict(), 'denoiser.pt')
print("model saved..!")

EPOCH:10   |   train_loss:0.07786   |   val_loss:0.13524   |   SNR:8.349
EPOCH:20   |   train_loss:0.06748   |   val_loss:0.12606   |   SNR:9.13
EPOCH:30   |   train_loss:0.06041   |   val_loss:0.12251   |   SNR:10.336
EPOCH:40   |   train_loss:0.05523   |   val_loss:0.11986   |   SNR:11.048
EPOCH:50   |   train_loss:0.05165   |   val_loss:0.11919   |   SNR:11.414
EPOCH:60   |   train_loss:0.05038   |   val_loss:0.12049   |   SNR:11.612
EPOCH:70   |   train_loss:0.04644   |   val_loss:0.11908   |   SNR:11.867
EPOCH:80   |   train_loss:0.04516   |   val_loss:0.11949   |   SNR:11.918
EPOCH:90   |   train_loss:0.04406   |   val_loss:0.12042   |   SNR:11.923
EPOCH:100   |   train_loss:0.04332   |   val_loss:0.12189   |   SNR:11.98
average SNR on dataset 11.979796292756994
model saved..!


### **Validated Examples**

Below are the clean (ground-truth source) audio, the mixed audio, and the corresponding denoised audio obtained by running the model on the validation set after the final training epoch.

#### **clean audio**

In [7]:
ipd.Audio('examples/val_clean.wav')

#### **Mixed Audio**

In [3]:
ipd.Audio('examples/val_mix_0.wav')

In [4]:
ipd.Audio('examples/val_mix_1.wav')

In [5]:
ipd.Audio('examples/val_mix_2.wav')

#### **denoised audio**

In [8]:
ipd.Audio('examples/val_result_0.wav')

In [9]:
ipd.Audio('examples/val_result_1.wav')

In [10]:
ipd.Audio('examples/val_result_2.wav')

### **Testing the Model**

In [None]:
# load the trained model
# net = Denoiser()
# net.to(device)
# net.load_state_dict(torch.load('denoiser.pt'))

In [None]:
# free the training/validation objects that are no longer needed
del dict_val, train_loader, val_data, train_data

# extracting test data from the pickle
# NOTE: pickle can execute arbitrary code on load; only open files this
# notebook produced itself.
with open("/content/drive/My Drive/Colab Notebooks/HW3/dict_test.pkl", "rb") as pkl_file:
    dict_test = pickle.load(pkl_file)

In [None]:
# Convert the list of spectrograms into ONE float32 ndarray before wrapping it
# with torch.from_numpy: building a tensor straight from a Python list of
# ndarrays goes element by element and is very slow.
# Swap the last two axes so each sample is (frames, bins).
test_mix = torch.from_numpy(np.asarray(dict_test["test"][1], dtype=np.float32)).permute((0, 2, 1)).to(device)

# The test split has no separate target, so the same tensor is used for both
# dataset fields instead of converting and storing the data twice.
test_data = utils.TensorDataset(test_mix, test_mix)
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)

del test_mix

# checking the dimension of the data
a, b = next(iter(test_loader))
print(a.shape, b.shape)

torch.Size([1, 175, 513]) torch.Size([1, 175, 513])


In [None]:
net.eval()
test_loss = 0
denoised_audio = []
with torch.no_grad():
    for idx, (data, target) in enumerate(test_loader):
        # test_loader is unshuffled with batch_size=1, so batch idx lines up
        # with entry idx of dict_test
        waveform = dict_test["test"][0][idx]

        # cut the fixed-length spectrogram back to the clip's real frame count
        n_frames = librosa.stft(waveform, n_fft=1024, hop_length=512).shape[1]
        if data.shape[1] > n_frames:
            data = data[:, :n_frames, :]
            target = target[:, :n_frames, :]

        mask = net(data)
        # NOTE(review): the "target" of test_data is the mixture spectrogram
        # itself, so this value compares the mask with the input and is never
        # reported anywhere in this cell.
        test_loss += err_func(mask, target).item()

        # mask the mixture STFT, then invert back to the original clip length
        masked_spec = findMask(mask, waveform)
        denoised = librosa.core.istft(masked_spec, hop_length=512, length=waveform.shape[0])

        out_path = "/content/drive/My Drive/Colab Notebooks/HW3/results/result_"+str(idx)+".wav"
        denoised_audio.append((out_path, denoised))

print("finally done!")

finally done!


In [None]:
# write every denoised clip to its target path at the test-set sampling rate
for out_path, audio in denoised_audio:
    soundfile.write(out_path, audio, dict_test["sr"])

### **Test Examples**

Below are examples of the output obtained by running the trained model on the test audio.

#### **Original mixed audio**

In [11]:
ipd.Audio('examples/test_org_0.wav')

In [12]:
ipd.Audio('examples/test_org_200.wav')

In [13]:
ipd.Audio('examples/test_org_399.wav')

#### **Denoised Audio**

In [14]:
ipd.Audio('examples/result_0.wav')

In [15]:
ipd.Audio('examples/result_200.wav')

In [16]:
ipd.Audio('examples/result_399.wav')