# PART 1 - Speech Denoising Using 1D CNN

In [1]:
import librosa 
import numpy as np

# Reading the training file
s, sr=librosa.load('data/train_clean_male.wav', sr=None) 
S_input=librosa.stft(s, n_fft=1024, hop_length=512)

sn, sr=librosa.load('data/train_dirty_male.wav', sr=None) 
X_input=librosa.stft(sn, n_fft=1024, hop_length=512)

# Reading test 01 file
st_01, sr_01=librosa.load('data/test_x_01.wav', sr=None) 
X_test_01=librosa.stft(st_01, n_fft=1024, hop_length=512)

# Reading test 02 file
st_02, sr_02=librosa.load('data/test_x_02.wav', sr=None) 
X_test_02=librosa.stft(st_02, n_fft=1024, hop_length=512)

In [2]:
# Get the magnitudes of training set, and test sets
S_mag = np.abs(S_input).T
X_mag = np.abs(X_input).T
X_test_01_mag = np.abs(X_test_01).T
X_test_02_mag = np.abs(X_test_02).T

In [3]:
print("Clean training audio shape (S_mag): ",S_mag.shape)
print("Noisy training audio shape (X_mag): ",X_mag.shape)
print("First test audio shape (X_test_01_mag): ",X_test_01_mag.shape)
print("Second test audio shape (X_test_02_mag): ",X_test_02_mag.shape)

Clean training audio shape (S_mag):  (2459, 513)
Noisy training audio shape (X_mag):  (2459, 513)
First test audio shape (X_test_01_mag):  (142, 513)
Second test audio shape (X_test_02_mag):  (380, 513)


In [4]:
from numpy import newaxis

X_mag_ = X_mag[..., newaxis]
X_test_01_mag_ = X_test_01_mag[..., newaxis]
X_test_02_mag_ = X_test_02_mag[..., newaxis]

print("Clean training audio shape (S_mag): ",S_mag.shape)
print("Noisy training audio shape (X_mag): ",X_mag_.shape)
print("First test audio shape (X_test_01_mag): ",X_test_01_mag_.shape)
print("Second test audio shape (X_test_02_mag): ",X_test_02_mag_.shape)

Clean training audio shape (S_mag):  (2459, 513)
Noisy training audio shape (X_mag):  (2459, 513, 1)
First test audio shape (X_test_01_mag):  (142, 513, 1)
Second test audio shape (X_test_02_mag):  (380, 513, 1)


In [5]:
def snr(dirty, clean):
    return round(10 * np.log10(np.sum(np.square(clean))/np.sum(np.square(clean - dirty))),5)

In [6]:
import tensorflow as tf
import numpy as np

# Create placeholders - convention used - [batch_size, width, num_channels]

X = tf.placeholder(tf.float32, [None, 513, 1])
Y = tf.placeholder(tf.float32, [None, 513])

# Create filters - [width, num_channels_in, num_channels_out]
W1 = tf.get_variable("W1", [3,1,32], initializer=tf.keras.initializers.he_normal(seed=0))
W2 = tf.get_variable("W2", [3,32,32], initializer=tf.keras.initializers.he_normal(seed=0))

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [7]:
alpha = 0.00006

# First Convolution Network using stride 1 and padding as VALID (this was done to reduce size as it was taking a long time and gave better result)
Z1 = tf.nn.conv1d(X, W1, stride = 1, padding = 'VALID')
A1 = tf.nn.relu(Z1)
# Maxpooling layer with pool_size 4 and stride 1, padding is valid
P1 = tf.layers.max_pooling1d(A1, pool_size=4, strides=1, padding='VALID')

# Second Convolution Network with stride 1 and padding as VALID
Z2 = tf.nn.conv1d(P1, W2, stride = 1, padding = 'VALID')
A2 = tf.nn.relu(Z2)
# Maxpooling layer with pool_size 2 and stride 2, padding is VALID
P2 = tf.layers.max_pooling1d(A2, pool_size=2, strides=2, padding='VALID')

# Flatten the layers 
P3 = tf.contrib.layers.flatten(P2)

# Fully-connected layers with RELU activation and He initialization
Z3 = tf.contrib.layers.fully_connected(P3, 1024, activation_fn=tf.nn.relu, weights_initializer=tf.keras.initializers.he_normal())
Z4 = tf.contrib.layers.fully_connected(Z3, 513, activation_fn=tf.nn.relu, weights_initializer=tf.keras.initializers.he_normal())

# Loss function - mean squared error
cost = tf.losses.mean_squared_error(predictions = Z4, labels = Y)

# Adam optimizer
optimizer = tf.train.AdamOptimizer(learning_rate=alpha).minimize(cost)

init = tf.global_variables_initializer()

Instructions for updating:
Use keras.layers.MaxPooling1D instead.
The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.

Instructions for updating:
Use keras.layers.flatten instead.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


In [8]:
import tensorflow.contrib.slim as slim

print('\033[1m' + "Model Summary:\n" + '\033[0m')
model_vars = tf.trainable_variables()
slim.model_analyzer.analyze_vars(model_vars, print_info=True)

[1mModel Summary:
[0m
---------
Variables: name (type shape) [size]
---------
W1:0 (float32_ref 3x1x32) [96, bytes: 384]
W2:0 (float32_ref 3x32x32) [3072, bytes: 12288]
fully_connected/weights:0 (float32_ref 8096x1024) [8290304, bytes: 33161216]
fully_connected/biases:0 (float32_ref 1024) [1024, bytes: 4096]
fully_connected_1/weights:0 (float32_ref 1024x513) [525312, bytes: 2101248]
fully_connected_1/biases:0 (float32_ref 513) [513, bytes: 2052]
Total size of variables: 8820321
Total bytes of variables: 35281284


(8820321, 35281284)

In [9]:
batch_size = 100
epochs = 500

with tf.Session() as sess:
        
    # Run the initialization
    sess.run(init)
        
    m = len(X_mag_)
    num_batches = int(m/batch_size)
    
    print("Number of training samples : ", m)
    print("Number of Epochs : ", epochs)
    print("Number of batches : ", num_batches)
    
    epoch_loss_ = []
    
    snr_list = []
    snr_list2 = []
    
    # iterate through epochs
    for epoch in range(epochs):
        cost_ = 0.0
        
        # iterate through 
        i = 0
        j = batch_size + 1
        for batch in range(num_batches):
            X_batch = X_mag_[i:j]
            Y_batch = S_mag[i:j]
            
            i = j
            j = j + batch_size
            
            _,curr_cost = sess.run([optimizer, cost], feed_dict={X: X_batch, Y: Y_batch})
            
            cost_ = cost_ + curr_cost/num_batches
            
            y_hat_train = sess.run(Z4, feed_dict={X:X_mag_})
            y_hat_test_02 = sess.run(Z4, feed_dict={X:X_test_02_mag_})
            y_hat_test_01 = sess.run(Z4, feed_dict={X:X_test_01_mag_})
        
        snr_list.append(snr(X_mag,y_hat_train))       
        snr_list2.append(snr(X_test_02_mag,y_hat_test_02))
        

        if epoch % 50 == 0:
            print("Epoch : ", epoch, "\tCost : ", cost_)
    print("Epoch : ", epoch, "\tCost : ", cost_)

Number of training samples :  2459
Number of Epochs :  500
Number of batches :  24
Epoch :  0 	Cost :  0.15525713500877222
Epoch :  50 	Cost :  0.011535281897522509
Epoch :  100 	Cost :  0.008847963753699636
Epoch :  150 	Cost :  0.0064324035386865335
Epoch :  200 	Cost :  0.005451753987775494
Epoch :  250 	Cost :  0.004389559631817974
Epoch :  300 	Cost :  0.0031120403630969426
Epoch :  350 	Cost :  0.0027580466072928784
Epoch :  400 	Cost :  0.001949995409328646
Epoch :  450 	Cost :  0.0018564730368476983
Epoch :  499 	Cost :  0.0017147543937123069


In [10]:
print('\033[1m' + "Signal to Noise Ratios:\n" + '\033[0m')
print("SNR for the original cleaned audio sample (training) and the output of DNN for the training sample. ")
print("SNR of train_clean_male and y_hat :", snr(y_hat_train,S_mag))

[1mSignal to Noise Ratios:
[0m
SNR for the original cleaned audio sample (training) and the output of DNN for the training sample. 
SNR of train_clean_male and y_hat : 16.91668


In [13]:
# Write the outputs to files 

# Recover the (complex-valued) speech spectrogram of the test signal 
s_01 = np.multiply(np.divide(X_test_01,X_test_01_mag.T),y_hat_test_01.T)
s_02 = np.multiply(np.divide(X_test_02,X_test_02_mag.T),y_hat_test_02.T)

# Take inverse-STFT 
out_01 = librosa.istft(s_01, hop_length=512)
out_02 = librosa.istft(s_02, hop_length=512)

# Write the output
librosa.output.write_wav('output1/test_s_01_recons.wav', out_01, sr_01)
librosa.output.write_wav('output1/test_s_02_recons.wav', out_02, sr_02)