# PART 2 - Speech Denoising Using 2D CNN 

In [1]:
import os

import librosa
import numpy as np

def _load_with_stft(path):
    """Load one audio file and compute its STFT.

    The file is read with ``sr=None`` and transformed with a 1024-point FFT
    and a hop of 512 samples.

    Returns:
        (samples, sampling_rate, complex_stft)
    """
    samples, rate = librosa.load(path, sr=None)
    spectrum = librosa.stft(samples, n_fft=1024, hop_length=512)
    return samples, rate, spectrum


# Training pair: clean target first, then the noisy input.
# Both loads assign `sr`, so it ends up holding the noisy file's rate.
s, sr, S_input = _load_with_stft('data/train_clean_male.wav')
sn, sr, X_input = _load_with_stft('data/train_dirty_male.wav')

# Noisy test clips; each keeps its own sampling rate for writing the output.
st_01, sr_01, X_test_01 = _load_with_stft('data/test_x_01.wav')
st_02, sr_02, X_test_02 = _load_with_stft('data/test_x_02.wav')

In [2]:
# Magnitude of every STFT, transposed so that the first axis indexes frames
# and the second indexes frequency bins.
S_mag, X_mag, X_test_01_mag, X_test_02_mag = [
    np.abs(spectrum).T
    for spectrum in (S_input, X_input, X_test_01, X_test_02)
]

In [3]:
# Report the shape of each magnitude spectrogram, one line per array.
for label, spectrogram in (
    ("Clean training audio shape (S_mag): ", S_mag),
    ("Noisy training audio shape (X_mag): ", X_mag),
    ("First test audio shape (X_test_01_mag): ", X_test_01_mag),
    ("Second test audio shape (X_test_02_mag): ", X_test_02_mag),
):
    print(label, spectrogram.shape)

Clean training audio shape (S_mag):  (2459, 513)
Noisy training audio shape (X_mag):  (2459, 513)
First test audio shape (X_test_01_mag):  (142, 513)
Second test audio shape (X_test_02_mag):  (380, 513)


In [4]:
def snr(dirty, clean):
    """Signal-to-noise ratio, in decibels, of ``dirty`` measured against ``clean``.

    Evaluates ``10 * log10(sum(clean**2) / sum((clean - dirty)**2))`` and rounds
    the result to five decimal places.

    Args:
        dirty: Array being scored (the estimate or noisy signal).
        clean: Reference array; it is the numerator energy and the value the
            error is measured from. Must broadcast against ``dirty``.

    Returns:
        The ratio in dB, rounded to 5 decimals.
    """
    reference_energy = np.sum(np.square(clean))
    error_energy = np.sum(np.square(clean - dirty))
    ratio_db = 10 * np.log10(reference_energy / error_energy)
    return round(ratio_db, 5)

In [5]:
from numpy import newaxis

# Number of consecutive STFT frames that make up one 2D-CNN input example.
FRAMES_PER_EXAMPLE = 20


def make_frame_windows(mag, window=FRAMES_PER_EXAMPLE):
    """Stack every run of ``window`` consecutive rows of ``mag`` into one example.

    Args:
        mag: 2D array whose rows are frames (frames x frequency bins).
        window: Number of consecutive frames per example.

    Returns:
        Array of shape ``(len(mag) - window + 1, window, mag.shape[1], 1)``;
        the trailing axis of size 1 is the channel dimension.

    Raises:
        ValueError: If ``mag`` has fewer than ``window`` frames, in which case
            not a single example can be built.
    """
    num_windows = len(mag) - window + 1
    if num_windows < 1:
        raise ValueError(
            "need at least %d frames to build one example, got %d"
            % (window, len(mag))
        )
    windows = np.stack([mag[start:start + window, :] for start in range(num_windows)])
    return windows[..., newaxis]


# Create data for the 2D CNN by taking windows of 20x513 from each spectrogram
X_mag_train = make_frame_windows(X_mag)
X_test1 = make_frame_windows(X_test_01_mag)
X_test2 = make_frame_windows(X_test_02_mag)

# Window k spans frames k..k+19 and is paired with clean frame k+19, so the
# first 19 clean frames have no matching input and are clipped off.
S_mag_train = S_mag[FRAMES_PER_EXAMPLE - 1:]
assert len(X_mag_train) == len(S_mag_train), "inputs and targets are misaligned"

print("Noisy training audio shape for 2d CNN (X_mag_train): ",X_mag_train.shape)
print("First test audio shape for 2d CNN(X_test1): ",X_test1.shape)
print("Second test audio shape for 2d CNN(X_test2): ",X_test2.shape)

Noisy training audio shape for 2d CNN (X_mag_train):  (2440, 20, 513, 1)
First test audio shape for 2d CNN(X_test1):  (123, 20, 513, 1)
Second test audio shape for 2d CNN(X_test2):  (361, 20, 513, 1)


In [6]:
import tensorflow as tf
import numpy as np

# Start from an empty default graph so this cell can be re-run safely.
# Without the reset, a second run fails because tf.get_variable refuses to
# create "W1"/"W2" again, and nodes from earlier runs stay in the graph.
tf.reset_default_graph()

# Placeholders - convention used - [batch_size, height, width, num_channels]
X = tf.placeholder(tf.float32, [None, 20, 513, 1])  # 20 frames x 513 bins, 1 channel
Y = tf.placeholder(tf.float32, [None, 513])         # one clean magnitude frame per example

# Convolution filters. tf.nn.conv2d expects the layout
# [filter_height, filter_width, in_channels, out_channels].
W1 = tf.get_variable("W1", [2, 2, 1, 1], initializer=tf.keras.initializers.he_normal(seed=0))
W2 = tf.get_variable("W2", [4, 4, 1, 1], initializer=tf.keras.initializers.he_normal(seed=0))

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [20]:
# Learning rate for the Adam optimizer
alpha = 0.0005

# NOTE(review): both tf.layers.batch_normalization calls below omit
# `training=`. Per the TF 1.x docs that argument defaults to False, so the
# layers normalise with their moving statistics, which nothing in this notebook
# updates. They therefore appear to act as a learned scale/shift only.
# Confirm this is intended before relying on them as real batch norm.

# First convolution stage: stride 1 with 'SAME' padding
Z1 = tf.nn.conv2d(X, W1, strides=[1, 1, 1, 1], padding='SAME')
Z1 = tf.layers.batch_normalization(Z1)
A1 = tf.nn.relu(Z1)
# Max-pooling with a 2x2 kernel and stride 1x1
P1 = tf.nn.max_pool(A1, ksize=[1, 2, 2, 1], strides=[1, 1, 1, 1], padding='SAME')

# Second convolution stage: stride 1 with 'SAME' padding
Z2 = tf.nn.conv2d(P1, W2, strides=[1, 1, 1, 1], padding='SAME')
Z2 = tf.layers.batch_normalization(Z2)
A2 = tf.nn.relu(Z2)
# Max-pooling with a 2x2 kernel and stride 1x1
P2 = tf.nn.max_pool(A2, ksize=[1, 2, 2, 1], strides=[1, 1, 1, 1], padding='SAME')

# Flatten each example to a vector before the dense layers
P3 = tf.contrib.layers.flatten(P2)

# Fully-connected layers. The initialisers are seeded like W1/W2 so that a
# fresh run starts from the same weights every time.
Z3 = tf.contrib.layers.fully_connected(
    P3, 1024, activation_fn=tf.nn.relu,
    weights_initializer=tf.keras.initializers.he_normal(seed=0))
# The output layer has 513 units to match Y; it keeps a ReLU activation.
Z4 = tf.contrib.layers.fully_connected(
    Z3, 513, activation_fn=tf.nn.relu,
    weights_initializer=tf.keras.initializers.he_normal(seed=0))

# Loss function - mean squared error between predicted and clean frames
cost = tf.losses.mean_squared_error(predictions=Z4, labels=Y)

# Adam optimizer
optimizer = tf.train.AdamOptimizer(learning_rate=alpha).minimize(cost)

# Must be created after the optimizer so that Adam's own variables are
# covered by the initialiser as well.
init = tf.global_variables_initializer()



In [29]:
import tensorflow.contrib.slim as slim

# Bold heading (ANSI escape codes), then a table of every trainable variable
# currently in the default graph. The analyser call stays the last expression
# of the cell so that the notebook also displays its return value.
print("".join(['\033[1m', "Model Summary:\n", '\033[0m']))
model_vars = tf.trainable_variables()
slim.model_analyzer.analyze_vars(model_vars, print_info=True)

[1mModel Summary:
[0m
---------
Variables: name (type shape) [size]
---------
W1:0 (float32_ref 2x2x1x1) [4, bytes: 16]
W2:0 (float32_ref 4x4x1x1) [16, bytes: 64]
batch_normalization/gamma:0 (float32_ref 1) [1, bytes: 4]
batch_normalization/beta:0 (float32_ref 1) [1, bytes: 4]
batch_normalization_1/gamma:0 (float32_ref 1) [1, bytes: 4]
batch_normalization_1/beta:0 (float32_ref 1) [1, bytes: 4]
fully_connected/weights:0 (float32_ref 10260x1024) [10506240, bytes: 42024960]
fully_connected/biases:0 (float32_ref 1024) [1024, bytes: 4096]
fully_connected_1/weights:0 (float32_ref 1024x513) [525312, bytes: 2101248]
fully_connected_1/biases:0 (float32_ref 513) [513, bytes: 2052]
batch_normalization_2/gamma:0 (float32_ref 1) [1, bytes: 4]
batch_normalization_2/beta:0 (float32_ref 1) [1, bytes: 4]
batch_normalization_3/gamma:0 (float32_ref 1) [1, bytes: 4]
batch_normalization_3/beta:0 (float32_ref 1) [1, bytes: 4]
fully_connected_2/weights:0 (float32_ref 10260x1024) [10506240, bytes: 42024960]

(33099299, 132397196)

In [25]:
batch_size = 488
epochs = 500

with tf.Session() as sess:

    # Run the initialization
    sess.run(init)

    m = len(X_mag_train)
    assert m == len(S_mag_train), "inputs and targets must have the same length"
    # Ceiling division: a final, smaller batch is still trained on.
    num_batches = (m + batch_size - 1) // batch_size

    print("Number of training samples : ", m)
    print("Number of Epochs : ", epochs)
    print("Number of batches : ", num_batches)

    epoch_loss_ = []   # mean training cost of every epoch

    snr_list = []      # per-epoch SNR of the prediction against the clean training target
    snr_list2 = []     # per-epoch score for test clip 02 (see note below)

    # iterate through epochs
    for epoch in range(epochs):
        cost_ = 0.0

        # iterate through consecutive, non-overlapping mini-batches
        for start in range(0, m, batch_size):
            stop = start + batch_size
            X_batch = X_mag_train[start:stop]
            Y_batch = S_mag_train[start:stop]

            _, curr_cost = sess.run([optimizer, cost], feed_dict={X: X_batch, Y: Y_batch})

            # Weight by batch length so a short last batch does not skew the mean.
            cost_ = cost_ + curr_cost * len(X_batch) / m

        epoch_loss_.append(cost_)

        # Evaluate once per epoch, after all of the epoch's updates.
        y_hat_train = sess.run(Z4, feed_dict={X: X_mag_train})
        y_hat_test_02 = sess.run(Z4, feed_dict={X: X_test2})

        # Prediction scored against the clean target, as snr(dirty, clean) expects.
        snr_list.append(snr(y_hat_train, S_mag_train))
        # No clean reference for the test clip is loaded in this notebook, so
        # this compares the noisy input with the prediction. It is a proxy for
        # how much the network changes the signal, not a true SNR.
        snr_list2.append(snr(X_test_02_mag[19:,:], y_hat_test_02))

        if epoch % 100 == 0:
            print("Epoch : ", epoch, "\tCost : ", cost_)
    print("Epoch : ", epoch, "\tCost : ", cost_)

    # Only the final prediction for test clip 01 is used, so compute it once.
    y_hat_test_01 = sess.run(Z4, feed_dict={X: X_test1})

Number of training samples :  2440
Number of Epochs :  500
Number of batches :  5
Epoch :  0 	Cost :  0.08732897266745568
Epoch :  100 	Cost :  0.005619087023660541
Epoch :  200 	Cost :  0.004199303640052676
Epoch :  300 	Cost :  0.0033429136499762532
Epoch :  400 	Cost :  0.002752184332348406
Epoch :  499 	Cost :  0.0024205617606639866


In [26]:
# Bold heading (ANSI escape codes) followed by the final training SNR:
# the network output is scored against the clean training magnitudes.
print("".join(['\033[1m', "Signal to Noise Ratios:\n", '\033[0m']))
print("SNR for the original cleaned audio sample (training) and the output of DNN for the training sample. ")
print(
    "SNR of train_clean_male and y_hat :",
    snr(y_hat_train, S_mag_train),
)

[1mSignal to Noise Ratios:
[0m
SNR for the original cleaned audio sample (training) and the output of DNN for the training sample. 
SNR of train_clean_male and y_hat : 16.23022


In [27]:
# Write the outputs to files

def _apply_noisy_phase(noisy_stft, predicted_mag, skipped_frames=19):
    """Attach the phase of the noisy STFT to the predicted magnitudes.

    Args:
        noisy_stft: Complex STFT of the noisy clip, shaped (bins, frames).
        predicted_mag: Network output, shaped (frames - skipped_frames, bins).
        skipped_frames: Leading frames that have no prediction and are dropped.

    Returns:
        Complex spectrogram shaped (bins, frames - skipped_frames).
    """
    noisy = noisy_stft[:, skipped_frames:]
    magnitude = np.abs(noisy)
    # noisy / |noisy| is the unit-modulus phase term. Bins whose magnitude is
    # exactly zero would give 0/0 = NaN and poison the inverse STFT, so those
    # keep the neutral phase 1 instead.
    unit_phase = np.ones_like(noisy)
    np.divide(noisy, magnitude, out=unit_phase, where=magnitude > 0)
    return np.multiply(unit_phase, predicted_mag.T)


# Recover the (complex-valued) speech spectrogram of the test signals
s_01 = _apply_noisy_phase(X_test_01, y_hat_test_01)
s_02 = _apply_noisy_phase(X_test_02, y_hat_test_02)

# Take inverse-STFT (same hop length as the forward transform)
out_01 = librosa.istft(s_01, hop_length=512)
out_02 = librosa.istft(s_02, hop_length=512)

# Write the output; create the target directory on a fresh checkout.
# NOTE(review): librosa.output was removed in librosa 0.8. On newer versions
# these calls need replacing (e.g. with soundfile.write); confirm the pinned
# librosa version.
os.makedirs('output2', exist_ok=True)
librosa.output.write_wav('output2/test_s_01_recons.wav', out_01, sr_01)
librosa.output.write_wav('output2/test_s_02_recons.wav', out_02, sr_02)