In [0]:
import librosa
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import random
import copy

In [0]:
#Loading training and testing files
#Computing STFT on all the files
N_FFT = 1024       # STFT frame length in samples (librosa yields 1 + N_FFT // 2 = 513 frequency bins)
HOP_LENGTH = 512   # samples between the starts of successive STFT frames


def load_with_stft(path):
    """Load an audio file and compute its STFT.

    Args:
        path: audio file to read; sr=None asks librosa to keep the file's own
            sampling rate instead of resampling.

    Returns:
        Tuple (signal, sampling_rate, complex_spectrogram).
    """
    signal, rate = librosa.load(path, sr=None)
    spectrum = librosa.stft(signal, n_fft=N_FFT, hop_length=HOP_LENGTH)
    return signal, rate, spectrum


# sr is overwritten by every call; the value left in scope afterwards is the one
# returned for the last file loaded (test_x_02.wav).
s, sr, S = load_with_stft('train_clean_male.wav')
sn, sr, X = load_with_stft('train_dirty_male.wav')
x_test, sr, X_test = load_with_stft('test_x_01.wav')
x_test2, sr, X_test2 = load_with_stft('test_x_02.wav')

In [0]:
#Magnitude spectrograms (absolute value of each complex STFT) for all four files
mag_S, mag_X, mag_X_test, mag_X_test2 = (
    np.abs(spectrum) for spectrum in (S, X, X_test, X_test2)
)

#Model / training hyper-parameters
learning_rate = 0.0002   # step size handed to the Adam optimizer
num_epochs = 2000        # epoch counter value at which training stops
batch_size = 64          # training windows per mini-batch
window_size = 20         # consecutive STFT frames that form one network input

In [0]:
# Graph inputs. `input` is fed the context windows flattened to rows of 513
# magnitude bins (getModel regroups the rows into windows); `labels` is fed one
# 513-bin clean target frame per window.
# NOTE: the name `input` shadows Python's built-in input() from here on.
input = tf.placeholder(dtype=tf.float32, shape=[None, 513])
labels = tf.placeholder(dtype=tf.float32, shape=[None, 513])

In [0]:
def getModel(x, window_size=20, n_freq_bins=513):
  """Build the CNN that maps each window of frames to a single magnitude frame.

  Args:
    x: tensor whose elements can be regrouped into windows of
      `window_size` x `n_freq_bins` values (e.g. rows of flattened windows).
    window_size: frames per window, i.e. the height of the single-channel
      "image" given to the first convolution. Defaults to 20.
    n_freq_bins: frequency bins per frame; also the width of the output layer.
      Defaults to 513.

  Returns:
    Tensor of shape [num_windows, n_freq_bins]. Values are non-negative because
    the final dense layer uses a ReLU activation.
  """
  # Input Layer: regroup the incoming values into one single-channel image per window
  input_layer = tf.reshape(x, [-1, window_size, n_freq_bins, 1])

  # Convolutional Layer #1
  conv1 = tf.layers.conv2d(
      inputs=input_layer,
      filters=16,
      kernel_size=[4,4],
      padding="same",
      activation=tf.nn.relu)

  # Pooling Layer #1 (halves both spatial dimensions)
  pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2,2], strides=[2,2])

  # Convolutional Layer #2 and Pooling Layer #2
  conv2 = tf.layers.conv2d(
      inputs=pool1,
      filters=32,
      kernel_size=[2,2],
      padding="same",
      activation=tf.nn.relu)

  pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2,2], strides=[2,2])

  # Dense Layer: flatten the feature maps and regress one value per frequency bin
  pool2_flat = tf.layers.flatten(pool2)

  logits = tf.layers.dense(inputs=pool2_flat, units=n_freq_bins, activation=tf.nn.relu)

  return logits

In [0]:
def transform_data(x , size , window_size):
  """Stack every sliding window of `window_size` consecutive rows of `x`.

  Args:
    x: 2-D array with one frame per row.
    size: number of rows of `x` to slide over.
    window_size: number of rows in each window.

  Returns:
    2-D array in which window i (rows i .. i + window_size - 1 of `x`) occupies
    rows i * window_size .. (i + 1) * window_size - 1. There are
    size - window_size + 1 windows; if that count is below 1, the single
    (possibly shorter) window x[0:window_size] is returned.
  """
  # Always emit at least the first window, even when size < window_size
  num_windows = max(size - window_size + 1, 1)
  windows = [x[start : start + window_size, :] for start in range(num_windows)]
  # One concatenation at the end: stacking inside the loop would re-copy the
  # growing result on every iteration (quadratic in the number of frames).
  return np.concatenate(windows, axis=0)

In [0]:
#Build the network input from the noisy training spectrogram: every run of
#window_size (20) consecutive frames becomes one window, windows stacked row-wise
transformed_x = transform_data(mag_X.T, mag_X.shape[1], window_size)

In [0]:
#Keep an independent copy of the flat (row-stacked) windows; it is fed to the
#trained network later on, when the SNR on the training file is calculated
transformed_x1 = np.copy(transformed_x)

In [0]:
#Grouping the stacked rows into a 3D array: (number of windows, frames per window, frequency bins)
#The number of windows is inferred (-1) so the cell works for any file length and window size
transformed_x = np.reshape(transformed_x , (-1 , window_size , transformed_x.shape[-1]))

In [0]:
#Targets: drop the first window_size - 1 (19) clean frames so that the clean frame
#at index i + window_size - 1 lines up with input window i
transformed_y = mag_S.T[window_size - 1:]

In [11]:
output = getModel(input)
#Defining the loss function (mean squared error) along with its optimizer
loss = tf.reduce_mean(tf.square(output - labels))
train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss)

sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

#Number of training windows, taken from the data rather than hard-coded
num_windows = np.shape(transformed_x)[0]
if num_windows != np.shape(transformed_y)[0]:
    raise ValueError("transformed_x holds %d windows but transformed_y holds %d frames"
                     % (num_windows, np.shape(transformed_y)[0]))

#count runs from 0 to num_epochs inclusive, so the loss at count == num_epochs is logged as well
for count in range(num_epochs + 1):
    #Mini batching with the given batch size. Slices are clamped at the end of the
    #arrays, so the last batch simply holds whatever windows remain.
    for start in range(0, num_windows, batch_size):
        batch_x = transformed_x[start : start + batch_size]
        batch_y = transformed_y[start : start + batch_size]

        #The input placeholder is 2D: flatten (windows, frames, bins) to (windows * frames, bins)
        batch_x = batch_x.reshape((-1, np.shape(batch_x)[-1]))
        feed_dict = {input: batch_x, labels: batch_y}
        train_step.run(feed_dict=feed_dict)

    if count % 200 == 0:
        #feed_dict still refers to the last mini-batch of this epoch, so the printed
        #loss is measured on that batch only, not on the whole training set
        loss_calc = loss.eval(feed_dict=feed_dict)
        print("Epoch %d, loss %g"%(count, loss_calc))

Instructions for updating:
Use keras.layers.conv2d instead.
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Use keras.layers.max_pooling2d instead.
Instructions for updating:
Use keras.layers.flatten instead.
Instructions for updating:
Use keras.layers.dense instead.
Epoch 0, loss 0.00676835
Epoch 200, loss 0.000581396
Epoch 400, loss 0.00049217
Epoch 600, loss 0.000423091
Epoch 800, loss 0.000321786
Epoch 1000, loss 0.000305332
Epoch 1200, loss 0.000297931
Epoch 1400, loss 0.000282398
Epoch 1600, loss 0.000283276
Epoch 1800, loss 0.0002636
Epoch 2000, loss 0.000257399


In [0]:
#Running the network on the given input and returning the evaluated result
def feedforward(input_data, dnn_output):
    """Evaluate `dnn_output` with `input_data` fed to the `input` placeholder.

    Args:
        input_data: value supplied for the notebook-level `input` placeholder.
        dnn_output: object exposing `eval(feed_dict=...)` (the model's output tensor).

    Returns:
        Whatever `dnn_output.eval` returns for that feed.
    """
    return dnn_output.eval(feed_dict={input: input_data})

#Recovering the complex values of the file from the output of the model
def recover_sound(X , mag_X , mag_output):
  """Combine the phase of `X` with the magnitudes in `mag_output`.

  Args:
    X: complex spectrogram that supplies the phase.
    mag_X: magnitude of `X` (its element-wise absolute value).
    mag_output: magnitudes to apply; must broadcast against `X`.

  Returns:
    (X / mag_X) * mag_output. Where mag_X is exactly 0 the phase is undefined,
    so a unit phasor (1) is used there instead of letting 0 / 0 produce NaN.
  """
  X = np.asarray(X)
  mag_X = np.asarray(mag_X)
  # Start from unit phasors and overwrite only the bins whose phase is defined;
  # np.float32 in result_type keeps integer inputs from yielding an integer buffer.
  phase = np.ones(np.broadcast(X, mag_X).shape,
                  dtype=np.result_type(X.dtype, mag_X.dtype, np.float32))
  np.divide(X, mag_X, out=phase, where=(mag_X != 0))
  s_hat = phase * mag_output

  return s_hat

#Re-inserting the frames that were lost, as constant-valued rows
def recover_data(x , size , value):
  """Prepend a constant block to `x`.

  Args:
    x: array to extend.
    size: shape of the block placed on top of `x`.
    value: constant the block is filled with.

  Returns:
    Vertical stack of the filled block followed by `x`.
  """
  padding = np.full(size , value)
  return np.vstack([padding , x])

In [0]:
#Building the stacked windows of window_size frames for both test files, in the
#same row layout that is fed to the model
transformed_x_test = transform_data(mag_X_test.T, mag_X_test.shape[1], window_size)
transformed_x_test2 = transform_data(mag_X_test2.T, mag_X_test2.shape[1], window_size)

In [0]:
#Test file 1: run the model, put back the first window_size - 1 frames that were
#lost to windowing (filled with 1e-15), then re-attach the phase of the noisy STFT
s_hat_test1 = feedforward(transformed_x_test, output)
recovered_x_test1 = recover_data(s_hat_test1, (window_size - 1, s_hat_test1.shape[1]), 1e-15)
s_hat1 = recover_sound(X_test, mag_X_test, recovered_x_test1.T)

#Test file 2: identical pipeline
s_hat_test2 = feedforward(transformed_x_test2, output)
recovered_x_test2 = recover_data(s_hat_test2, (window_size - 1, s_hat_test2.shape[1]), 1e-15)
s_hat2 = recover_sound(X_test2, mag_X_test2, recovered_x_test2.T)

In [0]:
#Inverse STFT of each de-noised test spectrogram, written to disk as a wav file.
#sr is whatever the most recent librosa.load call returned.
#NOTE(review): librosa.output.write_wav may be missing in newer librosa releases - confirm the installed version.
recon_sound = librosa.istft(s_hat1, win_length=1024, hop_length=512)
librosa.output.write_wav('test_s_01_recons_q2.wav', recon_sound, sr)

recon_sound = librosa.istft(s_hat2, win_length=1024, hop_length=512)
librosa.output.write_wav('test_s_02_recons_q2.wav', recon_sound, sr)

In [0]:
#Sanity check on the training data: de-noise train_dirty_male with the trained
#model and rebuild its waveform, following the same steps as for the test files
s_hat_test3 = feedforward(transformed_x1, output)
recovered_x1 = recover_data(s_hat_test3, (window_size - 1, s_hat_test3.shape[1]), 1e-15)
s_hat3 = recover_sound(X, mag_X, recovered_x1.T)
recon_sound3 = librosa.istft(s_hat3, win_length=1024, hop_length=512)
#Length of the rebuilt waveform, used to trim the clean signal before computing the SNR
size_recon_sound3 = recon_sound3.shape[0]

In [17]:
#SNR (in dB) of the reconstruction against the clean training signal:
#10 * log10( sum(s^2) / sum((s - recon)^2) )
s = s[: size_recon_sound3]   # trim the clean signal to the reconstruction's length
residual = s - recon_sound3
num = np.dot(s.T , s)
den = np.dot(residual.T , residual)
SNR = 10 * np.log10(num/den)
print('Value of SNR : ' + str(SNR))

Value of SNR : 14.632517099380493
