## Along with this file, please include the following files to avoid wasting time


#### `rnn_model_optimum_wts`: the trained model weights


## The pickle-loading code is commented out; to save time, please uncomment it and use it instead of loading the raw data and training the model

In [1]:
from __future__ import print_function
from time import time
import librosa
import numpy as np
import librosa.display
import IPython.display as ipd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from torch.autograd import Variable
import torch.autograd as autograd
from torch.nn import Parameter
import argparse
import os
import pickle
import math

In [2]:

# Notebook-style configuration: ``parse_args(args=[])`` parses an empty
# argument list, so the defaults below are always the values in effect here.
parser = argparse.ArgumentParser(description='PyTorch Example')
parser.add_argument('--disable-cuda', action='store_true',
                    help='Disable CUDA')
# type=int: without it a user-supplied value stays a string and the
# ``batch_idx % args.interval`` check in the training loop breaks.
parser.add_argument('--interval', metavar='N', type=int, default=100,
                    help='Print the training loss every N batches')
args = parser.parse_args(args=[])
# CUDA is used only when it is both available and not explicitly disabled.
args.cuda = not args.disable_cuda and torch.cuda.is_available()

print(args.cuda)

print(torch.cuda.is_available())

True
True


### To save time, please load all the required training data from part2_data.p instead of running the cells below

In [4]:
# Training directory. File-name prefixes select the signal type:
#   trx = noisy mixture (X), trs = clean speech (S), trn = noise (N).
# Sorting keeps the three lists aligned by index.
ip_path = '/opt/e533/timit-homework/tr'
audio_files_dirty = sorted(
    name for name in os.listdir(ip_path) if name.startswith('trx'))   # X files
audio_files_clean = sorted(
    name for name in os.listdir(ip_path) if name.startswith('trs'))   # S files
audio_files_noise = sorted(
    name for name in os.listdir(ip_path) if name.startswith('trn'))   # N files

In [4]:
# Complex STFT (n_fft=1024, hop 512) of the first 1200 noisy training files.
# Only the waveform returned by librosa.load is needed here.
X = [
    librosa.stft(
        librosa.load(ip_path + '/' + audio_files_dirty[idx], sr=None)[0],
        n_fft=1024, hop_length=512)
    for idx in range(1200)
]


In [5]:
# Magnitude spectrogram of every noisy training utterance.
X_mag = [np.abs(spectrogram) for spectrogram in X]

In [6]:
# Complex STFT (n_fft=1024, hop 512) of the first 1200 clean-speech files.
# Only the waveform returned by librosa.load is needed here.
S = [
    librosa.stft(
        librosa.load(ip_path + '/' + audio_files_clean[idx], sr=None)[0],
        n_fft=1024, hop_length=512)
    for idx in range(1200)
]

In [7]:
# Magnitude spectrogram of every clean training utterance.
S_mag = [np.abs(spectrogram) for spectrogram in S]

In [8]:
# Complex STFT (n_fft=1024, hop 512) of the first 1200 noise-only files.
# Only the waveform returned by librosa.load is needed here.
N = [
    librosa.stft(
        librosa.load(ip_path + '/' + audio_files_noise[idx], sr=None)[0],
        n_fft=1024, hop_length=512)
    for idx in range(1200)
]

In [9]:
# Magnitude spectrogram of every training noise signal.
N_mag = [np.abs(spectrogram) for spectrogram in N]

In [12]:
# Constructing the ideal binary mask (IBM) for every training utterance:
# a time-frequency bin is 1.0 where the clean-speech magnitude is strictly
# greater than the noise magnitude and 0.0 otherwise.
# The comparison is vectorised with NumPy instead of looping over every bin
# in Python, and the list length follows the data instead of a fixed 1200.
M = [
    (speech_mag > noise_mag).astype(np.float64)
    for speech_mag, noise_mag in zip(S_mag, N_mag)
]



### Please uncomment and run the cells below to load all the training data from part2_data.p

In [28]:
# data_dict = pickle.load(open( "part2_data.p", "rb" ) )

In [29]:
# X,X_mag,S,S_mag,N,N_mag,M = data_dict["X"],data_dict["X_mag"],data_dict["S"],data_dict["S_mag"],data_dict["N"],data_dict["N_mag"],data_dict["IBM"]

### A two-layer GRU RNN is implemented with a hidden dimension of 1024, an input dimension of 513, dropout of 0.2, and batch_first set to true

##### Since batch_first is true, the input and output have shape (batch size (10), sequence length (spectrogram columns), input dimension (513))

In [11]:
# Features per frame fed to the GRU: the STFTs above use n_fft=1024,
# i.e. 1024 // 2 + 1 = 513 frequency bins.
INPUT_DIM = 513
# Size of the GRU hidden state.
HIDDEN_DIM =1024


In [12]:
#Defining GRU model for speech denoising

class GRU_RNN(nn.Module):
    """Multi-layer GRU followed by a per-frame linear layer and a sigmoid.

    The linear layer maps every GRU output frame back to ``input_dim``
    values and the sigmoid squashes them into the range 0..1, so the output
    has the same shape as the input.

    Args:
        input_dim: number of features per frame.
        hidden_dim: size of the GRU hidden state.
        layer: number of stacked GRU layers.
        batch_first: forwarded to ``nn.GRU``; when true the input layout is
            (batch, sequence, feature), otherwise (sequence, batch, feature).
        dropout: dropout applied between GRU layers (forwarded to ``nn.GRU``).
    """

    def __init__(self, input_dim, hidden_dim, layer, batch_first, dropout=0.20):
        super(GRU_RNN, self).__init__()
        # Keep the dimensions around; init_hidden() needs them.
        self.hidden_dim = hidden_dim
        self.input_dim = input_dim
        self.layer = layer
        self.gru = nn.GRU(input_dim, hidden_dim, layer,
                          batch_first=batch_first, dropout=dropout)
        self.fc = nn.Linear(hidden_dim, input_dim)

    def init_hidden(self, batch_size):
        """Return an all-zero hidden state of shape (layer, batch_size, hidden_dim).

        The tensor is created on the CPU; ``forward`` moves it next to its
        input.
        """
        return torch.zeros(self.layer, batch_size, self.hidden_dim)

    def forward(self, x):
        """Run the GRU from a fresh zero hidden state and return the sigmoid output.

        The final hidden state is stored in ``self.hidden``.
        """
        # The batch axis depends on the layout the GRU was built with.
        batch_axis = 0 if self.gru.batch_first else 1
        # Put the hidden state on the same device (and dtype) as the input
        # rather than on CUDA whenever CUDA merely exists: a model kept on
        # the CPU of a CUDA-capable machine would otherwise fail.
        self.hidden = self.init_hidden(x.shape[batch_axis]).to(
            device=x.device, dtype=x.dtype)
        output, self.hidden = self.gru(x, self.hidden)

        # torch.sigmoid replaces the deprecated F.sigmoid.
        return torch.sigmoid(self.fc(output))
    

In [13]:
# Two stacked GRU layers operating on (batch, sequence, feature) input;
# the network is moved to the GPU whenever one is available.
model = GRU_RNN(input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM, layer=2,
                batch_first=True)
if torch.cuda.is_available():
    model = model.cuda()

In [14]:
# Mean-squared error; train() applies it between the network output and the
# binary-mask target.
criterion = nn.MSELoss()

In [15]:
# Adam over all model parameters with a learning rate of 1e-4.
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [35]:
def train():
    """Train the global ``model`` to predict the binary masks in ``M``.

    Runs 200 epochs over ``X_mag`` (inputs) and ``M`` (targets) in
    consecutive mini-batches of 10 utterances, minimising ``criterion`` with
    ``optimizer``. The loss is printed every ``args.interval`` batches.

    Each mini-batch is stacked with ``np.array``, so all utterances inside
    one batch must have the same number of STFT frames.

    Returns:
        list: the loss values that were printed, in order.
    """
    torch.manual_seed(42)
    # val_data()/test_data() switch the model to eval mode; switch back so a
    # re-run after evaluation still trains with dropout enabled.
    model.train()

    batch_size = 10
    m = len(X_mag)
    # Integer ceiling division, computed once. Unlike ceil(m / batch_size)
    # it does not depend on true division being in effect.
    n_batch = (m + batch_size - 1) // batch_size
    # Feed the data to wherever the model's parameters live.
    device = next(model.parameters()).device

    costs = []
    for epoch in range(200):
        for batch_idx in range(n_batch):
            start, end = batch_idx * batch_size, (batch_idx + 1) * batch_size

            # (batch, freq, time) -> (batch, time, freq) for the
            # batch_first GRU.
            data = np.rollaxis(np.array(X_mag[start:end]), -1, 1)
            target = np.rollaxis(np.array(M[start:end]), -1, 1)

            data = torch.from_numpy(data).to(device)
            target = torch.from_numpy(target).float().to(device)

            model.zero_grad()

            # No explicit hidden-state setup: forward() always starts from a
            # fresh zero state.
            y_pred = model(data)
            loss = criterion(y_pred, target)

            loss.backward()
            optimizer.step()

            if batch_idx % args.interval == 0:
                # .item() extracts the Python float; indexing a 0-dim loss
                # with loss.data[0] raises on current PyTorch.
                loss_value = loss.item()
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.
                      format(epoch, batch_idx * len(data), m,
                             100. * batch_idx * len(data) / m, loss_value))
                costs.append(loss_value)

    return costs
            

In [36]:
# Run the training loop defined above.
train()







#### To avoid the training time, please load the trained model with the cell below

In [16]:
#Load the trained models
# model.load_state_dict(torch.load("rnn_model_optimum_wts"))

#### To avoid the time spent loading all the validation files, please load the validation data from part2_val_data.p

In [3]:
# Getting validation data.
# File-name prefixes select the signal type:
#   vx = noisy mixture (X), vs = clean speech (S), vn = noise (N).
# Sorting keeps the three lists aligned by index.
ip_path = '/opt/e533/timit-homework/v'

val_audio_files_dirty = sorted(
    name for name in os.listdir(ip_path) if name.startswith('vx'))   # X files
val_audio_files_clean = sorted(
    name for name in os.listdir(ip_path) if name.startswith('vs'))   # S files
val_audio_files_noise = sorted(
    name for name in os.listdir(ip_path) if name.startswith('vn'))   # N files


In [6]:
# Complex STFT (n_fft=1024, hop 512) of every noisy validation file.
val_X = [
    librosa.stft(librosa.load(ip_path + '/' + file_name, sr=None)[0],
                 n_fft=1024, hop_length=512)
    for file_name in val_audio_files_dirty
]

In [7]:
# Magnitude spectrogram of every noisy validation utterance.
val_X_mag = [np.abs(spectrogram) for spectrogram in val_X]

In [8]:
# Complex STFT (n_fft=1024, hop 512) of every clean validation file.
val_S = [
    librosa.stft(librosa.load(ip_path + '/' + file_name, sr=None)[0],
                 n_fft=1024, hop_length=512)
    for file_name in val_audio_files_clean
]

In [9]:
# Magnitude spectrogram of every clean validation utterance.
val_S_mag = [np.abs(spectrogram) for spectrogram in val_S]

In [10]:
# Complex STFT (n_fft=1024, hop 512) of every validation noise file.
val_N = [
    librosa.stft(librosa.load(ip_path + '/' + file_name, sr=None)[0],
                 n_fft=1024, hop_length=512)
    for file_name in val_audio_files_noise
]

In [11]:
# Magnitude spectrogram of every validation noise signal.
val_N_mag = [np.abs(spectrogram) for spectrogram in val_N]

In [12]:
# Ideal binary mask for every validation utterance: a time-frequency bin is
# 1.0 where the clean-speech magnitude is strictly greater than the noise
# magnitude and 0.0 otherwise. The comparison is vectorised with NumPy
# instead of looping over every bin in Python.
val_M = [
    (speech_mag > noise_mag).astype(np.float64)
    for speech_mag, noise_mag in zip(val_S_mag, val_N_mag)
]

In [39]:
# val_data_dict = pickle.load(open( "part2_val_data.p", "rb" ) )

In [40]:
# val_X,val_X_mag,val_S,val_S_mag,val_N,val_N_mag,val_M = val_data_dict["val_X"],val_data_dict["val_X_mag"],val_data_dict["val_S"],val_data_dict["val_S_mag"],val_data_dict["val_N"],val_data_dict["val_N_mag"],val_data_dict["val_IBM"]


In [41]:
#test the model with validation data

def val_data():
    """Apply the trained model to every validation utterance.

    For each utterance the model is run on the magnitude spectrogram from
    ``val_X_mag`` and its output is multiplied element-wise with the complex
    spectrogram from ``val_X``.

    Side effects: puts the global ``model`` into eval mode and reseeds the
    torch random number generator with 42.

    Returns:
        list: one masked complex spectrogram per utterance, laid out like
        the corresponding entry of ``val_X``.
    """
    model.eval()
    torch.manual_seed(42)
    # Feed the data to wherever the model's parameters live.
    device = next(model.parameters()).device

    signal_predictions = []
    # Inference only: no_grad avoids recording an autograd graph for every
    # utterance.
    with torch.no_grad():
        for noisy_spec, noisy_mag in zip(val_X, val_X_mag):
            # Transpose to (time, freq) and add a batch axis of size 1, the
            # layout the batch_first GRU expects.
            frames = torch.from_numpy(noisy_mag.T[np.newaxis]).to(device)
            # forward() always starts from a fresh zero hidden state, so no
            # explicit hidden-state setup is needed here.
            mask = model(frames)[0].cpu().numpy()
            # Transpose the mask back so it lines up with the spectrogram.
            signal_predictions.append(mask.T * noisy_spec)

    return signal_predictions

In [42]:
# Masked complex spectrograms, one per validation utterance.
val_s_hat = val_data()

In [43]:
# Recover the speech audio: inverse STFT of every masked spectrogram,
# using the same hop length as the forward transform.
val_s_hat_recovered = [
    librosa.istft(masked_spec, hop_length=512) for masked_spec in val_s_hat
]

In [44]:
# Inverse STFT of the clean validation spectrograms, so the reference
# signals go through the same STFT/ISTFT round trip as the estimates.
val_s_clean = [
    librosa.istft(clean_spec, hop_length=512) for clean_spec in val_S
]
    

## Speech-to-noise ratio (i.e. signal-to-noise ratio, SNR) calculation for the validation data

In [45]:
# Ratio, in dB, of the total clean-signal energy to the total energy of the
# error (clean minus recovered), both summed over all validation utterances.
numerator = sum(np.sum(np.square(clean)) for clean in val_s_clean)
denominator = sum(
    np.sum(np.square(np.subtract(clean, val_s_hat_recovered[idx])))
    for idx, clean in enumerate(val_s_clean)
)

snr_db = 10 * np.log10(np.divide(numerator, denominator))
print("The Speech to Noise Ratio : {}".format(snr_db))

The Speech to Noise Ratio : 10.307045745067427


#### To avoid the time spent loading all the test files, load them from the part2_test_data.p file

In [4]:
# Getting test data: only noisy mixtures (prefix "tex") are listed here.
ip_path = '/opt/e533/timit-homework/te'

test_audio_files_dirty = sorted(
    name for name in os.listdir(ip_path) if name.startswith('tex'))   # X files


In [5]:
# Complex STFT (n_fft=1024, hop 512) of every noisy test file, plus the
# sampling rate of each file, which is needed when the result is written out.
test_X = []
sr_list = []
for file_name in test_audio_files_dirty:
    waveform, sample_rate = librosa.load(ip_path + '/' + file_name, sr=None)
    test_X.append(librosa.stft(waveform, n_fft=1024, hop_length=512))
    sr_list.append(sample_rate)

In [6]:
# Magnitude spectrogram of every noisy test utterance.
test_X_mag = [np.abs(spectrogram) for spectrogram in test_X]

In [9]:
# test_data_dict = pickle.load(open( "part2_test_data.p", "rb" ) )

In [10]:
# test_x = test_data_dict["test_X"]
# test_x_mag = test_data_dict["test_X_mag"]
# sr_list = test_data_dict["sr_list"]

In [17]:
def test_data():
    """Apply the trained model to every test utterance.

    For each utterance the model is run on the magnitude spectrogram and its
    output is multiplied element-wise with the complex spectrogram.

    Side effects: puts the global ``model`` into eval mode and reseeds the
    torch random number generator with 42.

    Returns:
        list: one masked complex spectrogram per utterance, laid out like
        the corresponding input spectrogram.
    """
    # The loading cells define ``test_X``/``test_X_mag`` while the optional
    # pickle cell defines ``test_x``/``test_x_mag``. Accept either spelling
    # so the function works whichever way the data was loaded.
    try:
        spectrograms, magnitudes = test_X, test_X_mag
    except NameError:
        spectrograms, magnitudes = test_x, test_x_mag

    model.eval()
    torch.manual_seed(42)
    # Feed the data to wherever the model's parameters live.
    device = next(model.parameters()).device

    signal_predictions = []
    # Inference only: no_grad avoids recording an autograd graph for every
    # utterance.
    with torch.no_grad():
        for noisy_spec, noisy_mag in zip(spectrograms, magnitudes):
            # Transpose to (time, freq) and add a batch axis of size 1, the
            # layout the batch_first GRU expects.
            frames = torch.from_numpy(noisy_mag.T[np.newaxis]).to(device)
            # forward() always starts from a fresh zero hidden state, so no
            # explicit hidden-state setup is needed here.
            mask = model(frames)[0].cpu().numpy()
            # Transpose the mask back so it lines up with the spectrogram.
            signal_predictions.append(mask.T * noisy_spec)

    return signal_predictions

In [18]:
# Masked complex spectrograms, one per test utterance.
predicted_test = test_data()

In [21]:
# Convert every masked test spectrogram back to audio and save it as
# test_audio/test_<index>.wav at the sampling rate of the matching input file.
# Create the output directory first so the export does not depend on it
# already existing.
os.makedirs('test_audio', exist_ok=True)
# NOTE(review): `librosa.output` is not available in newer librosa releases
# (removed in 0.8, per its changelog) - confirm the installed version.
for i, masked_spec in enumerate(predicted_test):
    audio = librosa.istft(masked_spec, hop_length=512)
    librosa.output.write_wav('test_audio/test_' + str(i) + '.wav', audio, sr_list[i])