In [2]:
import os
import librosa
import numpy as np
import matplotlib.pyplot as plt
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

In [3]:
#Loading training and testing files
#Computing STFT on all the files
#Every recording goes through the same STFT configuration
stft_args = dict(n_fft=1024, hop_length=512)

#Training pair: the clean recording and its noisy ("dirty") counterpart
s, sr = librosa.load('input/train_clean_male.wav', sr=None)
S = librosa.stft(s, **stft_args)

sn, sr = librosa.load('input/train_dirty_male.wav', sr=None)
X = librosa.stft(sn, **stft_args)

#Test recordings; note that sr is rebound by every load, so it ends up
#holding the sampling rate of the last file read
x_test, sr = librosa.load('input/test_x_01.wav', sr=None)
X_test = librosa.stft(x_test, **stft_args)

x_test2, sr = librosa.load('input/test_x_02.wav', sr=None)
X_test2 = librosa.stft(x_test2, **stft_args)

In [4]:
#Magnitude of every STFT computed above
mag_S, mag_X = np.abs(S), np.abs(X)
mag_X_test, mag_X_test2 = np.abs(X_test), np.abs(X_test2)

#Model specification: one list entry per dense layer
learning_rate = 0.001
neurons = [513] * 4
act_layers = [tf.nn.relu] * len(neurons)
num_layers = len(act_layers)

In [5]:
#Generating a deep network of n layers with specific activation functions
#and specified number of neurons in each layer
def getModel(x , act_layers , neurons):
    """Stack one dense layer per entry of ``act_layers`` on top of ``x``.

    Args:
        x: Tensor fed to the first dense layer.
        act_layers: Sequence of activation callables, one per layer.
        neurons: Sequence of unit counts; ``neurons[i]`` sizes layer ``i``.

    Returns:
        List holding the output tensor of every layer, in order; the last
        element is the network output. Empty when ``act_layers`` is empty.
    """
    layers = []
    previous = x

    for i, activation in enumerate(act_layers):
        # Every layer is built the same way; only the tensor it reads differs
        previous = tf.layers.dense(previous , units=neurons[i] , activation=activation)
        layers.append(previous)

    return layers

#Creating placeholders for input and output
#Both take 513-wide rows; the leading None leaves the number of rows open.
#The name `input` is referenced as a global by feedforward(), so it is kept.
frame_shape = [None, 513]
input = tf.placeholder(dtype=tf.float32, shape=frame_shape)
labels = tf.placeholder(dtype=tf.float32, shape=frame_shape)

#List with the output tensor of every layer
output = getModel(input, act_layers, neurons)

Instructions for updating:
Use keras.layers.Dense instead.
Instructions for updating:
Please use `layer.__call__` method instead.


In [6]:
#Defining the loss function along with its optimizer
#Mean squared error between the last layer's output and the labels
loss = tf.reduce_mean(tf.square(output[num_layers - 1]-labels))
train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss)

#train_step.run() and loss.eval() below use the default session, which
#InteractiveSession installs on construction
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

batch_size = 100
#Number of passes over the training data
num_epochs = 1000

#Frames are the columns of the magnitude spectrograms; take their count from
#the data instead of hard-coding it
assert mag_X.shape == mag_S.shape, "noisy and clean spectrograms must have the same shape"
num_frames = mag_X.shape[1]

for count in range(num_epochs):
    #Mini batching with the given batch size; the last batch may be shorter
    for start in range(0 , num_frames, batch_size):
        stop = min(start + batch_size, num_frames)
        #Transposed so that every row fed to the network is one frame
        feed_dict = {input: mag_X[:, start : stop].T, labels: mag_S[:, start : stop].T}
        train_step.run(feed_dict=feed_dict)

    #Progress report every 10 epochs, measured on the last mini-batch only
    if count%10 == 0:
        loss_calc = loss.eval(feed_dict=feed_dict)
        print("Epoch %d, loss %g"%(count, loss_calc))

Epoch 0, loss 0.00663616
Epoch 10, loss 0.00213117
Epoch 20, loss 0.00187513
Epoch 30, loss 0.00148363
Epoch 40, loss 0.00115989
Epoch 50, loss 0.00135642
Epoch 60, loss 0.000946548
Epoch 70, loss 0.000907108
Epoch 80, loss 0.000894378
Epoch 90, loss 0.000893587
Epoch 100, loss 0.000799187
Epoch 110, loss 0.000725083
Epoch 120, loss 0.000850075
Epoch 130, loss 0.000652957
Epoch 140, loss 0.00120002
Epoch 150, loss 0.00083351
Epoch 160, loss 0.000772839
Epoch 170, loss 0.000676201
Epoch 180, loss 0.000788015
Epoch 190, loss 0.00119815
Epoch 200, loss 0.000910781
Epoch 210, loss 0.00130056
Epoch 220, loss 0.000547659
Epoch 230, loss 0.000595519
Epoch 240, loss 0.000504148
Epoch 250, loss 0.000666248
Epoch 260, loss 0.000681327
Epoch 270, loss 0.000452497
Epoch 280, loss 0.000536667
Epoch 290, loss 0.000576298
Epoch 300, loss 0.000761346
Epoch 310, loss 0.000502072
Epoch 320, loss 0.000416536
Epoch 330, loss 0.000359763
Epoch 340, loss 0.000474319
Epoch 350, loss 0.000431665
Epoch 360, lo

In [7]:
#Calculating the output from the given input, trained model and layer number
def feedforward(input_data, dnn_output , layer_num):
    """Evaluate one layer of the network on ``input_data``.

    ``layer_num`` counts from 1: ``dnn_output[layer_num - 1]`` is evaluated
    with ``input_data`` bound to the module-level ``input`` placeholder.
    """
    selected_layer = dnn_output[layer_num - 1]
    return selected_layer.eval(feed_dict={input: input_data})

#Recovering the complex values of the file from the output of the model
def recover_sound(X , mag_X , mag_output):
  """Re-attach the phase of ``X`` to the magnitudes in ``mag_output``.

  Args:
    X: Complex spectrogram that supplies the phase.
    mag_X: Magnitude of ``X`` (same shape).
    mag_output: Magnitudes to combine with that phase.

  Returns:
    ``(X / mag_X) * mag_output``. Bins where ``mag_X`` is zero yield 0
    instead of NaN.
  """
  X = np.asarray(X)
  mag_X = np.asarray(mag_X)

  # A silent bin has X == 0 and mag_X == 0; the resulting 0/0 would put NaN
  # into the spectrogram, so those bins are forced to zero afterwards
  with np.errstate(divide='ignore', invalid='ignore'):
    unit_phase = X / mag_X
  unit_phase = np.where(mag_X == 0, 0, unit_phase)

  return unit_phase * mag_output

In [8]:
#Computing the output from the model for both the test files
#The final layer is addressed through num_layers (as the loss is) instead of a
#hard-coded index, so this cell follows the layer specification
s_hat_test1 = feedforward(mag_X_test.T , output , num_layers)
s_hat_test2 = feedforward(mag_X_test2.T , output , num_layers)

#Recovering the complex values of both the test files
#The model was fed transposed magnitudes, so its output is transposed back
s_hat1 = recover_sound(X_test , mag_X_test , s_hat_test1.T)
s_hat2 = recover_sound(X_test2 , mag_X_test2 , s_hat_test2.T)

In [9]:
#Reconstructing the test files after removing noise
import soundfile as sf

#Both inversions share these settings (same hop length as the forward STFT)
istft_args = dict(hop_length=512, win_length=1024)

recon_sound = librosa.istft(s_hat1, **istft_args)
sf.write('output/test_DNN_s_01_recons.wav', recon_sound, sr)

recon_sound2 = librosa.istft(s_hat2, **istft_args)
sf.write('output/test_DNN_s_02_recons.wav', recon_sound2, sr)

In [10]:
#For testing purpose, feeding the model with train_dirty_male file
#From the output generated, reconstructing the audio file
#The final layer is addressed through num_layers instead of a hard-coded index
s_hat_test3 = feedforward(mag_X.T , output , num_layers)
s_hat3 = recover_sound(X, mag_X , s_hat_test3.T)
recon_sound3 = librosa.istft(s_hat3 , hop_length=512 , win_length=1024)
#Number of samples in the reconstruction; used to trim the reference signal
size_recon_sound3 = np.shape(recon_sound3)[0]

In [11]:
#Once the audio file is generated, calculating the SNR value
#Trim the clean signal to the length of the reconstruction before comparing
s = s[: size_recon_sound3]
residual = s - recon_sound3

#Energy of the clean signal over energy of the reconstruction error, in dB
num = np.dot(s.T , s)
den = np.dot(residual.T , residual)
SNR = 10 * np.log10(num/den)
print('Value of SNR : ' + str(SNR))

Value of SNR : 18.82308006286621


# CNN - 1D

In [12]:
import librosa
import numpy as np

In [13]:
#Loading training and testing files
#Computing STFT on all the files
#Every recording goes through the same STFT configuration
stft_args = dict(n_fft=1024, hop_length=512)

#Training pair: the clean recording and its noisy ("dirty") counterpart
s, sr = librosa.load('input/train_clean_male.wav', sr=None)
S = librosa.stft(s, **stft_args)

sn, sr = librosa.load('input/train_dirty_male.wav', sr=None)
X = librosa.stft(sn, **stft_args)

#Test recordings; sr is rebound by every load and ends up holding the
#sampling rate of the last file read
x_test, sr = librosa.load('input/test_x_01.wav', sr=None)
X_test = librosa.stft(x_test, **stft_args)

x_test2, sr = librosa.load('input/test_x_02.wav', sr=None)
X_test2 = librosa.stft(x_test2, **stft_args)

In [14]:
#Magnitude of every STFT computed above
mag_S, mag_X = np.abs(S), np.abs(X)
mag_X_test, mag_X_test2 = np.abs(X_test), np.abs(X_test2)

#Training hyper-parameters for the 1-D CNN
learning_rate = 0.0002
num_epochs = 1000

In [15]:
#Network input and target: 513-wide rows, any number of rows.
#The name `input` is referenced as a global by feedforward(), so it is kept.
frame_shape = [None, 513]
input = tf.placeholder(dtype=tf.float32, shape=frame_shape)
labels = tf.placeholder(dtype=tf.float32, shape=frame_shape)

In [16]:
def getModel(x):
  """Build the 1-D convolutional network on top of ``x``.

  ``x`` is reshaped to ``[-1, 513, 1]``, passed through two
  convolution + max-pooling stages, flattened, and mapped by a ReLU dense
  layer to 513 outputs. The dense layer's output tensor is returned.
  """
  # Settings shared by both convolution layers
  shared_conv = dict(padding="same", activation=tf.nn.relu)

  # Each 513-wide row becomes a single-channel 1-D signal
  net = tf.reshape(x, [-1, 513, 1])

  # Stage 1: 16 filters of width 16, then pooling with size 2 / stride 2
  net = tf.layers.conv1d(inputs=net, filters=16, kernel_size=16, **shared_conv)
  net = tf.layers.max_pooling1d(inputs=net, pool_size=2, strides=2)

  # Stage 2: 32 filters of width 8, then pooling with size 2 / stride 2
  net = tf.layers.conv1d(inputs=net, filters=32, kernel_size=8, **shared_conv)
  net = tf.layers.max_pooling1d(inputs=net, pool_size=2, strides=2)

  # Dense head producing one value per output bin
  flat = tf.layers.flatten(net)
  return tf.layers.dense(inputs=flat, units=513, activation=tf.nn.relu)

In [18]:
output = getModel(input)
#Defining the loss function along with its optimizer
#Mean squared error between the network output and the labels
loss = tf.reduce_mean(tf.square(output - labels))
train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss)

#train_step.run() and loss.eval() below use the default session, which
#InteractiveSession installs on construction
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

batch_size = 100

#Frames are the columns of the magnitude spectrograms; take their count from
#the data instead of hard-coding it
assert mag_X.shape == mag_S.shape, "noisy and clean spectrograms must have the same shape"
num_frames = mag_X.shape[1]

#Exactly num_epochs passes over the data
for count in range(num_epochs):
    #Mini batching with the given batch size; the last batch may be shorter
    for start in range(0 , num_frames, batch_size):
        stop = min(start + batch_size, num_frames)
        #Transposed so that every row fed to the network is one frame
        feed_dict = {input: mag_X[:, start : stop].T, labels: mag_S[:, start : stop].T}
        train_step.run(feed_dict=feed_dict)

    #Progress report every 10 epochs, measured on the last mini-batch only
    if count%10 == 0:
        loss_calc = loss.eval(feed_dict=feed_dict)
        print("Epoch %d, loss %g"%(count, loss_calc))

Epoch 0, loss 0.0124186
Epoch 10, loss 0.00334485
Epoch 20, loss 0.00212749
Epoch 30, loss 0.00177173
Epoch 40, loss 0.00156587
Epoch 50, loss 0.00147931
Epoch 60, loss 0.00136968
Epoch 70, loss 0.0013112
Epoch 80, loss 0.00122777
Epoch 90, loss 0.00118949
Epoch 100, loss 0.00111326
Epoch 110, loss 0.00107464
Epoch 120, loss 0.00103025
Epoch 130, loss 0.00102498
Epoch 140, loss 0.000974766
Epoch 150, loss 0.000938474
Epoch 160, loss 0.000909572
Epoch 170, loss 0.000874571
Epoch 180, loss 0.00087669
Epoch 190, loss 0.00084919
Epoch 200, loss 0.000811207
Epoch 210, loss 0.000790511
Epoch 220, loss 0.000785732
Epoch 230, loss 0.000775171
Epoch 240, loss 0.000767041
Epoch 250, loss 0.00074851
Epoch 260, loss 0.000748232
Epoch 270, loss 0.000748358
Epoch 280, loss 0.000752482
Epoch 290, loss 0.000762173
Epoch 300, loss 0.000728872
Epoch 310, loss 0.000716027
Epoch 320, loss 0.000678239
Epoch 330, loss 0.000674885
Epoch 340, loss 0.000675382
Epoch 350, loss 0.000731009
Epoch 360, loss 0.0007

In [19]:
#Calculating the output from the given input and trained model
def feedforward(input_data, dnn_output):
    """Evaluate ``dnn_output`` with ``input_data`` bound to the module-level
    ``input`` placeholder and return the result."""
    feed = {input: input_data}
    return dnn_output.eval(feed_dict=feed)

#Recovering the complex values of the file from the output of the model
def recover_sound(X , mag_X , mag_output):
  """Re-attach the phase of ``X`` to the magnitudes in ``mag_output``.

  Args:
    X: Complex spectrogram that supplies the phase.
    mag_X: Magnitude of ``X`` (same shape).
    mag_output: Magnitudes to combine with that phase.

  Returns:
    ``(X / mag_X) * mag_output``. Bins where ``mag_X`` is zero yield 0
    instead of NaN.
  """
  X = np.asarray(X)
  mag_X = np.asarray(mag_X)

  # A silent bin has X == 0 and mag_X == 0; the resulting 0/0 would put NaN
  # into the spectrogram, so those bins are forced to zero afterwards
  with np.errstate(divide='ignore', invalid='ignore'):
    unit_phase = X / mag_X
  unit_phase = np.where(mag_X == 0, 0, unit_phase)

  return unit_phase * mag_output

In [20]:
#Computing the output from the model for both the test files
#Magnitudes are transposed so that every row fed to the network is one frame
s_hat_test1 = feedforward(input_data=mag_X_test.T, dnn_output=output)
s_hat_test2 = feedforward(input_data=mag_X_test2.T, dnn_output=output)

#Recovering the complex values of both the test files
#Predictions are transposed back to match the layout of the STFT
s_hat1 = recover_sound(X=X_test, mag_X=mag_X_test, mag_output=s_hat_test1.T)
s_hat2 = recover_sound(X=X_test2, mag_X=mag_X_test2, mag_output=s_hat_test2.T)

In [21]:
#Reconstructing the test files after removing noise
#Both inversions share these settings (same hop length as the forward STFT)
istft_args = dict(hop_length=512, win_length=1024)

recon_sound = librosa.istft(s_hat1, **istft_args)
sf.write('output/test_CNN1_s_01_recons_q1.wav', recon_sound, sr)

recon_sound2 = librosa.istft(s_hat2, **istft_args)
sf.write('output/test_CNN1_s_02_recons_q1.wav', recon_sound2, sr)

In [22]:
#For testing purpose, feeding the model with train_dirty_male file
#From the output generated, reconstructing the audio file
s_hat_test3 = feedforward(input_data=mag_X.T, dnn_output=output)
s_hat3 = recover_sound(X=X, mag_X=mag_X, mag_output=s_hat_test3.T)
recon_sound3 = librosa.istft(s_hat3, hop_length=512, win_length=1024)
#Number of samples in the reconstruction; used to trim the reference signal
size_recon_sound3 = np.shape(recon_sound3)[0]

In [23]:
#Once the audio file is generated, calculating the SNR value
#Trim the clean signal to the length of the reconstruction before comparing
s = s[: size_recon_sound3]
residual = s - recon_sound3

#Energy of the clean signal over energy of the reconstruction error, in dB
num = np.dot(s.T , s)
den = np.dot(residual.T , residual)
SNR = 10 * np.log10(num/den)
print('Value of SNR : ' + str(SNR))

Value of SNR : 17.482469081878662


# CNN - 2D

In [24]:
import librosa
import numpy as np
import matplotlib.pyplot as plt
import random
import copy

In [25]:
#Loading training and testing files
#Computing STFT on all the files
#Every recording goes through the same STFT configuration
stft_args = dict(n_fft=1024, hop_length=512)

#Training pair: the clean recording and its noisy ("dirty") counterpart
s, sr = librosa.load('input/train_clean_male.wav', sr=None)
S = librosa.stft(s, **stft_args)

sn, sr = librosa.load('input/train_dirty_male.wav', sr=None)
X = librosa.stft(sn, **stft_args)

#Test recordings; sr is rebound by every load and ends up holding the
#sampling rate of the last file read
x_test, sr = librosa.load('input/test_x_01.wav', sr=None)
X_test = librosa.stft(x_test, **stft_args)

x_test2, sr = librosa.load('input/test_x_02.wav', sr=None)
X_test2 = librosa.stft(x_test2, **stft_args)

In [26]:
#Magnitude of every STFT computed above
mag_S, mag_X = np.abs(S), np.abs(X)
mag_X_test, mag_X_test2 = np.abs(X_test), np.abs(X_test2)

#Training hyper-parameters for the 2-D CNN
learning_rate = 0.0002
num_epochs = 2000
batch_size = 64
#Number of consecutive frames grouped into one network input
window_size = 20

In [27]:
#Network input and target, both as 513-wide rows with an open row count.
#The training cell feeds `input` with every window flattened into its
#individual frames, while `labels` receives one row per window.
frame_shape = [None, 513]
input = tf.placeholder(dtype=tf.float32, shape=frame_shape)
labels = tf.placeholder(dtype=tf.float32, shape=frame_shape)

In [28]:
def getModel(x, num_frames=20, num_bins=513):
  """Build the 2-D convolutional network on top of ``x``.

  Args:
    x: Tensor whose rows are individual frames; consecutive groups of
      ``num_frames`` rows form one input window.
    num_frames: Frames per window (second dimension of the reshaped input).
      Defaults to 20, the value previously hard-coded here.
    num_bins: Width of every frame and number of outputs per window.
      Defaults to 513.

  Returns:
    Output tensor of the final ReLU dense layer, ``num_bins`` values per
    window.
  """
  # Input Layer: one single-channel (num_frames x num_bins) image per window
  input_layer = tf.reshape(x, [-1, num_frames, num_bins, 1])

  # Convolutional Layer #1
  conv1 = tf.layers.conv2d(
      inputs=input_layer,
      filters=16,
      kernel_size=[4,4],
      padding="same",
      activation=tf.nn.relu)

  # Pooling Layer #1
  pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2,2], strides=[2,2])

  # Convolutional Layer #2 and Pooling Layer #2
  conv2 = tf.layers.conv2d(
      inputs=pool1,
      filters=32,
      kernel_size=[2,2],
      padding="same",
      activation=tf.nn.relu)

  pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2,2], strides=[2,2])

  # Dense Layer
  pool2_flat = tf.layers.flatten(pool2)

  logits = tf.layers.dense(inputs=pool2_flat, units=num_bins, activation=tf.nn.relu)

  return logits

In [29]:
def transform_data(x , size , window_size):
  """Stack every length-``window_size`` sliding window of ``x`` row-wise.

  Args:
    x: 2-D array whose rows are frames.
    size: Number of rows of ``x`` to slide over.
    window_size: Number of consecutive rows per window.

  Returns:
    2-D array made of the windows ``x[i : i + window_size]`` for
    ``i = 0 .. size - window_size``, stacked vertically. The first window is
    always included, even when ``size`` is smaller than ``window_size``.
  """
  # At least one window, matching the unconditional first slice
  num_windows = max(size - window_size + 1, 1)

  # Collect the windows first and stack once; stacking inside the loop
  # re-copies everything gathered so far on every iteration
  windows = [x[i : i + window_size,:] for i in range(num_windows)]

  return np.vstack(windows)

In [30]:
#Transforming the data in such a way that it takes window_size current and previous input frames
transformed_x = transform_data(mag_X.T , np.shape(mag_X.T)[0] , window_size)
#Keeping a copy of transformed x because we will require it later on to calculate the SNR
transformed_x1 = copy.deepcopy(transformed_x)
#Transforming the input data into 2D format: (windows, frames per window, bins).
#The sizes come from the data and window_size rather than fixed numbers.
transformed_x = np.reshape(transformed_x , (-1 , window_size , mag_X.shape[0]))
#Dropping the first window_size - 1 frames from y(clean wave) signal
transformed_y = (mag_S.T)[window_size - 1 : , :]

In [None]:
output = getModel(input)
#Defining the loss function along with its optimizer
#Mean squared error between the network output and the labels
loss = tf.reduce_mean(tf.square(output - labels))
train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss)

#train_step.run() and loss.eval() below use the default session, which
#InteractiveSession installs on construction
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

#One training example per window; take the count from the data instead of
#hard-coding it
num_windows = transformed_x.shape[0]
assert transformed_y.shape[0] == num_windows, "inputs and targets must have one row per window"

#Exactly num_epochs passes over the data
for count in range(num_epochs):
    #Mini batching with the given batch size; the last batch may be shorter
    for start in range(0 , num_windows, batch_size):
        stop = min(start + batch_size, num_windows)
        batch_x = transformed_x[start : stop, :]
        batch_y = transformed_y[start : stop, :]

        #Flatten (windows, frames, bins) into (windows * frames, bins) rows,
        #the layout the input placeholder expects
        batch_x = batch_x.reshape((-1 , batch_x.shape[2]))
        feed_dict = {input: batch_x, labels: batch_y}
        train_step.run(feed_dict=feed_dict)

    #Progress report every 200 epochs, measured on the last mini-batch only
    if count%200 == 0:
        loss_calc = loss.eval(feed_dict=feed_dict)
        print("Epoch %d, loss %g"%(count, loss_calc))

Epoch 0, loss 0.00608781


In [None]:
#Calculating the output from the given input and trained model
def feedforward(input_data, dnn_output):
    """Evaluate ``dnn_output`` with ``input_data`` bound to the module-level
    ``input`` placeholder and return the result."""
    feed = {input: input_data}
    return dnn_output.eval(feed_dict=feed)

#Recovering the complex values of the file from the output of the model
def recover_sound(X , mag_X , mag_output):
  """Re-attach the phase of ``X`` to the magnitudes in ``mag_output``.

  Args:
    X: Complex spectrogram that supplies the phase.
    mag_X: Magnitude of ``X`` (same shape).
    mag_output: Magnitudes to combine with that phase.

  Returns:
    ``(X / mag_X) * mag_output``. Bins where ``mag_X`` is zero yield 0
    instead of NaN.
  """
  X = np.asarray(X)
  mag_X = np.asarray(mag_X)

  # A silent bin has X == 0 and mag_X == 0; the resulting 0/0 would put NaN
  # into the spectrogram, so those bins are forced to zero afterwards
  with np.errstate(divide='ignore', invalid='ignore'):
    unit_phase = X / mag_X
  unit_phase = np.where(mag_X == 0, 0, unit_phase)

  return unit_phase * mag_output

#Recovering the lost frames
def recover_data(x , size , value):
  """Prepend a constant block to ``x``.

  A block of shape ``size`` filled with ``value`` is stacked vertically on
  top of ``x`` and the combined array is returned.
  """
  filler = np.full(size , value)
  return np.vstack([filler , x])

In [None]:
#Transforming the data in such a way that it be given to the model for testing
#Magnitudes are transposed first so that rows are frames, then windowed
frames_test = mag_X_test.T
frames_test2 = mag_X_test2.T
transformed_x_test = transform_data(frames_test , frames_test.shape[0] , window_size)
transformed_x_test2 = transform_data(frames_test2 , frames_test2.shape[0] , window_size)

In [None]:
#Computing the output from the model for both the test files
s_hat_test1 = feedforward(input_data=transformed_x_test, dnn_output=output)
s_hat_test2 = feedforward(input_data=transformed_x_test2, dnn_output=output)

#Recovering the first window_size - 1 frames that were lost
#They are filled with a tiny constant (1e-15)
lost_frames = window_size - 1
recovered_x_test1 = recover_data(s_hat_test1 , (lost_frames , s_hat_test1.shape[1]) , 1e-15)
recovered_x_test2 = recover_data(s_hat_test2 , (lost_frames , s_hat_test2.shape[1]) , 1e-15)

#Recovering the complex values of both the test files
#Padded predictions are transposed back to match the layout of the STFT
s_hat1 = recover_sound(X=X_test, mag_X=mag_X_test, mag_output=recovered_x_test1.T)
s_hat2 = recover_sound(X=X_test2, mag_X=mag_X_test2, mag_output=recovered_x_test2.T)

In [None]:
#Reconstructing the test files after removing noise
#Both inversions share these settings (same hop length as the forward STFT)
istft_args = dict(hop_length=512, win_length=1024)

recon_sound = librosa.istft(s_hat1, **istft_args)
sf.write('output/test_CNN2_s_01_recons_q2.wav', recon_sound, sr)

#recon_sound is reused for the second file, as in the original cell
recon_sound = librosa.istft(s_hat2, **istft_args)
sf.write('output/test_CNN2_s_02_recons_q2.wav', recon_sound, sr)

In [None]:
#For testing purpose, feeding the model with train_dirty_male file
#From the output generated, reconstructing the audio file
s_hat_test3 = feedforward(input_data=transformed_x1, dnn_output=output)

#Pad the window_size - 1 frames that have no prediction with a tiny constant
pad_shape = (window_size - 1 , s_hat_test3.shape[1])
recovered_x1 = recover_data(s_hat_test3 , pad_shape , 1e-15)

s_hat3 = recover_sound(X=X, mag_X=mag_X, mag_output=recovered_x1.T)
recon_sound3 = librosa.istft(s_hat3, hop_length=512, win_length=1024)
#Number of samples in the reconstruction; used to trim the reference signal
size_recon_sound3 = np.shape(recon_sound3)[0]

In [None]:
#Once the audio file is generated, calculating the SNR value
#Trim the clean signal to the length of the reconstruction before comparing
s = s[: size_recon_sound3]
residual = s - recon_sound3

#Energy of the clean signal over energy of the reconstruction error, in dB
num = np.dot(s.T , s)
den = np.dot(residual.T , residual)
SNR = 10 * np.log10(num/den)
print('Value of SNR : ' + str(SNR))