<a href="https://colab.research.google.com/github/saurabhIU/Deep-Learning/blob/master/Speech_Denoising_Using_1D_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Data Preparation

In [1]:

import librosa
import librosa.display as disp
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import time;
print(tf.__version__)

1.13.1


## Load Data (Training Clean and Training Noisy data)

In [0]:
# Clean speech: its magnitude spectrogram is the regression target.
# sr=None asks librosa to keep the file's native sampling rate.
s_clean, sr_clean = librosa.load('train_clean_male.wav', sr=None)
S_clean = librosa.stft(s_clean, hop_length=512, n_fft=1024)
S_clean_abs = np.transpose(np.abs(S_clean))  # transposed so each row is one STFT frame
Y_train = S_clean_abs

# Noisy speech: its magnitude spectrogram is the network input. The complex
# spectrogram S_dirty is kept as well because its phase is reused later.
s_dirty, sr_noisy = librosa.load('train_dirty_male.wav', sr=None)
S_dirty = librosa.stft(s_dirty, hop_length=512, n_fft=1024)
S_dirty_abs = np.transpose(np.abs(S_dirty))

# Shape each 513-bin frame as (1, 513, 1) to match the 4-D input placeholder.
X_train = np.reshape(S_dirty_abs, (-1, 1, 513, 1))


In [3]:
# Report the shapes of the target (clean) and input (noisy) arrays built above.
print(f'Shape of clean signal is {Y_train.shape} and shape of noisy signal is {X_train.shape}')

Shape of clean signal is (2459, 513) and shape of noisy signal is (2459, 1, 513, 1)


## Helper Functions

In [0]:
def get_input_batch(batch_size, data, labels):
    '''
    Return a total of `batch_size` random samples and their matching labels.

    Row indices are drawn with `np.random.choice` (uniform, with replacement)
    from the first axis of `data`; the same indices select rows of `labels`,
    so the returned samples and labels stay aligned.

    Args:
        batch_size: number of rows to draw.
        data: array whose first axis indexes samples.
        labels: array with at least as many rows as `data`.

    Returns:
        Tuple `(x, y)` of NumPy arrays holding the selected rows of `data`
        and `labels` respectively.
    '''
    index = np.random.choice(data.shape[0], batch_size)
    # Fancy indexing gathers every selected row in one vectorised step instead
    # of building Python lists row by row.
    return np.asarray(data)[index], np.asarray(labels)[index]
  
def flatten_layer(layer):
    """Reshape `layer` into a 2-D tensor with one row of features per example.

    Args:
        layer: tensor exposing `get_shape()`. Dimensions 1 through 3 of its
            static shape are multiplied together to obtain the feature count;
            dimension 0 is left free.

    Returns:
        Tuple `(layer_flat, feature_num)`: the tensor reshaped to
        `[-1, feature_num]`, and the number of features per row.
    """
    # Skip dimension 0 so only the per-example dimensions enter the product.
    per_example_dims = layer.get_shape()[1:4]
    feature_num = per_example_dims.num_elements()
    return tf.reshape(layer, [-1, feature_num]), feature_num

## Neural Network Configuration

In [5]:
# Number of optimizer steps; the training loop draws one random mini-batch per step.
EPOCHS = 1000

# Kernel widths of the three convolutional layers (the kernel height is always 1).
kernel_size1 = 8
kernel_size2 = 16
kernel_size3 = 32

Batch_Size = 200

# Output channel counts of the three convolutional layers.
kernel_num1 = 16
kernel_num2 = 32
kernel_num3 = 64

# Widths of the two fully connected layers; fc2 emits one value per frequency bin.
fc1_size = 2000
fc2_size = 513

# Width of the flattened output of the last convolutional block, i.e. the input
# width of the first dense layer. Each block uses a 'SAME' convolution with
# stride 1 followed by a 'SAME' max-pool with stride 2, which halves the bin
# axis rounding up: 513 -> 257 -> 129 -> 65. Multiplying by the channel count
# of the last block gives 65 * 64 = 4160.
NUM_FREQ_BINS = 513
NUM_CONV_BLOCKS = 3
pooled_bins = -(-NUM_FREQ_BINS // 2 ** NUM_CONV_BLOCKS)  # ceiling division
flat_feature_num = pooled_bins * kernel_num3

# Network input (one magnitude frame shaped 1 x bins x 1) and regression target.
X = tf.placeholder("float", [None, 1, NUM_FREQ_BINS, 1])
Y = tf.placeholder("float", [None, NUM_FREQ_BINS])

# Weights: conv kernels are (height, width, in_channels, out_channels);
# dense weights are (in_features, out_features).
filters = {
    'wl1': tf.get_variable('W1', shape=(1, kernel_size1, 1, kernel_num1), initializer=tf.initializers.he_normal()),
    'wl2': tf.get_variable('W2', shape=(1, kernel_size2, kernel_num1, kernel_num2), initializer=tf.initializers.he_normal()),
    'wl3': tf.get_variable('W3', shape=(1, kernel_size3, kernel_num2, kernel_num3), initializer=tf.initializers.he_normal()),
    'wfc1': tf.get_variable('W4', shape=(flat_feature_num, fc1_size), initializer=tf.initializers.he_normal()),
    'wfc2': tf.get_variable('W5', shape=(fc1_size, fc2_size), initializer=tf.initializers.he_normal()),
}

# One bias vector per layer, with shapes written as explicit 1-tuples.
# NOTE(review): biases are drawn from he_normal rather than zeros - confirm this is intentional.
biases = {
    'bl1': tf.get_variable('B1', shape=(kernel_num1,), initializer=tf.initializers.he_normal()),
    'bl2': tf.get_variable('B2', shape=(kernel_num2,), initializer=tf.initializers.he_normal()),
    'bl3': tf.get_variable('B3', shape=(kernel_num3,), initializer=tf.initializers.he_normal()),
    'bl4': tf.get_variable('B4', shape=(fc1_size,), initializer=tf.initializers.he_normal()),
    'bl5': tf.get_variable('B5', shape=(fc2_size,), initializer=tf.initializers.he_normal()),
}


Instructions for updating:
Colocations handled automatically by placer.


## Function to build one convolutional layer with max pool

In [0]:
#Convolution Layer 


def build_convolutional_layer(input,filter_num,bias_num,kernel_num):
    """Build one block: 'SAME' convolution, bias add, 1x2 max-pool, then ReLU.

    Args:
        input: 4-D tensor passed to `tf.nn.conv2d`.
        filter_num: key into the module-level `filters` dict selecting the kernel.
        bias_num: key into the module-level `biases` dict selecting the bias.
        kernel_num: accepted for interface compatibility; not used in the body.

    Returns:
        The activated, pooled tensor. Pooling uses window and stride
        [1, 1, 2, 1], so only the third axis is downsampled.
    """
    # The pooling window and stride are the same: a factor of 2 along axis 2.
    pool_window = [1, 1, 2, 1]

    convolved = tf.nn.conv2d(input, filters[filter_num], strides=[1, 1, 1, 1], padding='SAME')
    biased = convolved + biases[bias_num]
    pooled = tf.nn.max_pool(value=biased, ksize=pool_window, strides=pool_window, padding='SAME')
    return tf.nn.relu(pooled)
      

## Build a CNN with three convolutional layers (each followed by max-pooling) and two fully connected layers

In [0]:
def conv_nn(input):
  """Assemble the network: three conv/pool blocks followed by two dense layers.

  Args:
    input: 4-D tensor fed to the first convolutional block.

  Returns:
    The output of the second fully connected layer, with no activation applied.
    The shape of each conv block and the flattened feature count are printed
    while the graph is being built.
  """
  layer1 = build_convolutional_layer(input,'wl1','bl1',kernel_num1)
  print(layer1)

  layer2 = build_convolutional_layer(layer1,'wl2','bl2',kernel_num2)
  print(layer2)

  # The third block uses the 'wl3' kernel, which has kernel_num3 output
  # channels, so that is the matching channel count to pass here.
  layer3 = build_convolutional_layer(layer2,'wl3','bl3',kernel_num3)
  print(layer3)

  # Fully connected layers operate on the flattened conv output.
  layer_flat, fc_feature_num = flatten_layer(layer3)
  print(fc_feature_num)

  fc1 = tf.matmul(layer_flat, filters['wfc1']) + biases['bl4']
  fc1 = tf.nn.relu(fc1)

  # The final layer is left linear.
  fc2 = tf.matmul(fc1, filters['wfc2']) + biases['bl5']
  return fc2
  

## Define Cost function and optimizer

In [8]:
# Build the forward graph; `logits` is the network's final linear output.
logits = conv_nn(X)
# Mean squared error between the target placeholder and the network output.
cost = tf.losses.mean_squared_error(labels=Y, predictions=logits)
# Adam with its default hyper-parameters.
optimizer = tf.train.AdamOptimizer().minimize(cost)

Tensor("Relu:0", shape=(?, 1, 257, 16), dtype=float32)
Tensor("Relu_1:0", shape=(?, 1, 129, 32), dtype=float32)
Tensor("Relu_2:0", shape=(?, 1, 65, 64), dtype=float32)
4160
Instructions for updating:
Use tf.cast instead.


## Train Convolutional Neural Network

In [9]:
# The session is created without a `with` block and left open because the
# inference cells further down reuse `sess`.
sess = tf.Session()
sess.run(tf.global_variables_initializer())

tic = time.time()
for i in range(EPOCHS):
    # One optimizer step on a freshly drawn random mini-batch.
    x_batch, y_batch = get_input_batch(Batch_Size, X_train, Y_train)
    c, _ = sess.run([cost, optimizer], feed_dict={X: x_batch, Y: y_batch})
    # Only every 100th step is logged.
    if i % 100:
        continue
    print(f'Epoch: {i},training loss:{c}')
toc = time.time()
print(f'Time taken for training is {toc-tic}')

Epoch: 0,training loss:1.0670050382614136
Epoch: 100,training loss:0.021850746124982834
Epoch: 200,training loss:0.007043549790978432
Epoch: 300,training loss:0.0040027000941336155
Epoch: 400,training loss:0.0032290476374328136
Epoch: 500,training loss:0.00238299323245883
Epoch: 600,training loss:0.0019764250610023737
Epoch: 700,training loss:0.001569435466080904
Epoch: 800,training loss:0.0015364603605121374
Epoch: 900,training loss:0.0015846877358853817
Time taken for training is 25.523308753967285


## Denoise train noisy signal by feeding it through trained network

In [10]:
# Only `logits` is fetched, and it is built from X alone, so the label
# placeholder does not need to be fed (the test-signal inference cells also
# feed just X).
prediction = sess.run(logits, feed_dict={X: X_train})
print(prediction.shape)

(2459, 513)


## Recover speech spectrogram

In [0]:

# Attach the noisy signal's phase to the predicted magnitudes.
# exp(1j * angle(S)) is a unit-magnitude phase term that stays finite for bins
# whose magnitude is exactly zero; dividing S by |S| yields NaN there, and the
# inverse STFT would then spread it through the reconstructed signal.
prediction_complex = np.multiply(np.exp(1j * np.angle(S_dirty)), prediction.T)

## Recover Time domain signal

In [0]:
# Invert the STFT with the same hop (512) and window length (1024) used for analysis.
prediction_timedomain = librosa.istft(prediction_complex, win_length=1024, hop_length=512)

In [13]:
# Compare the sample counts of the reconstructed signal and the clean reference.
print(f'Size of predicted signal is {prediction_timedomain.size} and ground truth signal is {s_clean.size}')


Size of predicted signal is 1258496 and ground truth signal is 1258899


## Trim down ground truth to match the size of denoised signal to calculate SNR

In [14]:
# Keep only as many leading samples as the reconstructed signal has, so the two
# arrays can be subtracted element-wise.
s_clean = s_clean[:prediction_timedomain.size]
s_clean.shape

(1258496,)

## Calculate SNR

In [15]:
# SNR in dB: 10 * log10(energy of the clean signal / energy of the residual).
signal_energy = np.dot(s_clean.T, s_clean)
residual = s_clean - prediction_timedomain
SNR = 10 * np.log10(signal_energy / np.dot(residual.T, residual))
SNR

17.903960943222046

## Load test noisy signal test_x_01.wav and feed its magnitude spectra to the trained network

In [0]:
# First noisy test signal, analysed with the same STFT settings as the training data.
test1, sr_test = librosa.load('test_x_01.wav', sr=None)
Test1 = librosa.stft(test1, hop_length=512, n_fft=1024, window="hann")
Test1_absolute = np.transpose(np.abs(Test1))  # transposed so each row is one STFT frame

# Shape each 513-bin frame as (1, 513, 1) to match the network input placeholder.
X_Test = np.reshape(Test1_absolute, (-1, 1, 513, 1))

In [17]:
# Run the trained network on the first test signal's magnitude frames.
test_prediction = sess.run(fetches=logits, feed_dict={X: X_Test})
test_prediction.shape

(142, 513)

## Recover complex valued speech spectrogram of cleaned test signal

In [0]:
# Attach the noisy test signal's phase to the predicted magnitudes.
# exp(1j * angle(S)) is a unit-magnitude phase term that stays finite for bins
# whose magnitude is exactly zero; dividing S by |S| yields NaN there, and the
# inverse STFT would then spread it through the reconstructed signal.
test1_complex = np.multiply(np.exp(1j * np.angle(Test1)), test_prediction.T)

## Recover time domain speech signal by applying inverse-STFT

In [0]:
# Invert the STFT with the same hop, window length and window type used for analysis.
test1_timedomain = librosa.istft(test1_complex, win_length=1024, hop_length=512, window="hann")

## Write out the cleaned speech signal

In [0]:
# Save the denoised signal at the sampling rate the test file was loaded with.
# NOTE(review): `librosa.output` was removed in librosa 0.8 - confirm the pinned librosa version.
librosa.output.write_wav('cleaned_test1_conv1D.wav', y=test1_timedomain, sr=sr_test)

## Load test noisy signal test_x_02.wav and feed its magnitude spectra to the trained network


In [0]:
# Second noisy test signal, analysed with the same STFT settings as the training data.
test2, sr_test = librosa.load('test_x_02.wav', sr=None)
Test2 = librosa.stft(test2, hop_length=512, n_fft=1024, window="hann")
Test2_absolute = np.transpose(np.abs(Test2))  # transposed so each row is one STFT frame

# Shape each 513-bin frame as (1, 513, 1) to match the network input placeholder.
X_Test_2 = np.reshape(Test2_absolute, (-1, 1, 513, 1))

In [0]:
# Run the trained network on the second test signal's magnitude frames.
test2_prediction = sess.run(fetches=logits, feed_dict={X: X_Test_2})

## Recover complex valued speech spectrogram of cleaned test signal

In [0]:
# Attach the noisy test signal's phase to the predicted magnitudes.
# exp(1j * angle(S)) is a unit-magnitude phase term that stays finite for bins
# whose magnitude is exactly zero; dividing S by |S| yields NaN there, and the
# inverse STFT would then spread it through the reconstructed signal.
test2_complex = np.multiply(np.exp(1j * np.angle(Test2)), test2_prediction.T)

## Recover time domain speech signal by applying inverse-STFT

In [0]:
# Invert the STFT with the same hop, window length and window type used for analysis.
test2_timedomain = librosa.istft(test2_complex, win_length=1024, hop_length=512, window="hann")

## Write out the cleaned speech signal

In [0]:
# Save the denoised signal at the sampling rate the test file was loaded with.
# NOTE(review): `librosa.output` was removed in librosa 0.8 - confirm the pinned librosa version.
librosa.output.write_wav('cleaned_test2_conv1D.wav', y=test2_timedomain, sr=sr_test)