In [1]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

import librosa
import math

%matplotlib inline

### Helper functions

In [2]:
# Calculate SNR 
def calculateSNR(st, st_h):
    st_sum = np.sum(np.abs(st))
    diff_sum = np.sum(np.abs(st-st_h))
    return 10*math.log(st_sum**2/diff_sum**2)

In [3]:
# Read training data
s, sr_s=librosa.load('data/train_clean_male.wav', sr=None)
S=librosa.stft(s, n_fft=1024, hop_length=512)
sn, sr_x=librosa.load('data/train_dirty_male.wav', sr=None)
X=librosa.stft(sn, n_fft=1024, hop_length=512)
print(S.shape)
print(X.shape)
S_abs = np.abs(S)
X_abs = np.abs(X)

(513, 2459)
(513, 2459)


In [25]:
def get_cnn1_model():
    x = tf.placeholder(tf.float32, [None, 513, 1])
    y_ = tf.placeholder(tf.float32, [None, 513])
    LEARNING_RATE = 10e-4
    BATCH_SIZE = 32

    is_training = True

    conv1 = tf.layers.conv1d(
        x,            
        filters=64,
        kernel_size=3,
        activation=tf.nn.relu)

    conv2 = tf.layers.conv1d(
        conv1,            
        filters=32,
        kernel_size=3,
        activation=tf.nn.relu)

    conv3 = tf.layers.conv1d(
        conv2,            
        filters=16,
        kernel_size=3,
        activation=tf.nn.relu)

    flat = tf.reshape(conv3, [-1, 16*507])
    l1 = tf.layers.dense(flat, units=512,  activation=tf.nn.relu)
    l2 = tf.layers.dense(l1, units=512,  activation=tf.nn.relu)
    dropout1 = tf.layers.dropout(l2, rate=0.2, training=is_training)
    l3 = tf.layers.dense(dropout1, units=512,  activation=tf.nn.relu)
    y = tf.layers.dense(l3, units=513, activation=tf.nn.relu)

    loss = tf.losses.mean_squared_error(labels=y_, predictions=y)
    train_step = tf.train.AdamOptimizer(LEARNING_RATE).minimize(loss)

    sess=tf.Session()
    tf.global_variables_initializer().run(session=sess)

    train_data = tf.data.Dataset.from_tensor_slices(tf.constant(X_abs.T.reshape(-1, 513, 1)))
    label_data = tf.data.Dataset.from_tensor_slices(tf.constant(S_abs.T))
    batch_data = tf.data.Dataset.zip((train_data, label_data)).repeat().batch(BATCH_SIZE)

    iterator = batch_data.make_one_shot_iterator()
    next_batch = iterator.get_next()
    return (x, y_, y, sess, next_batch, train_step, loss)



In [42]:
(x1, y_1, y1, sess_1, next_batch_1, train_step_1, loss_1) = get_cnn1_model()

saver = tf.train.Saver()

CHECK_POINT_FILE_NAME = "./hw2_1.ckpt"
TRAIN_STEPS = 6000

# We try to read the model we train last time.
# try:
#     saver.restore(sess_1, CHECK_POINT_FILE_NAME)
# except:
#     pass

for i in range(TRAIN_STEPS+1):    
    batch = sess_1.run(next_batch_1)
    _, loss_value = sess_1.run((train_step_1, loss_1), feed_dict={x1: batch[0], y_1: batch[1]})
    if i% 500 == 0:
        print('Training Step:' + str(i) + '  Loss =  ' + str(loss_value))
        
# save model
save_path = saver.save(sess_1, CHECK_POINT_FILE_NAME)


Training Step:0  Loss =  0.07360355
Training Step:500  Loss =  0.013019245
Training Step:1000  Loss =  0.03624888
Training Step:1500  Loss =  0.0005374244
Training Step:2000  Loss =  0.018250668
Training Step:2500  Loss =  0.016017966
Training Step:3000  Loss =  0.010411807
Training Step:3500  Loss =  0.009119164
Training Step:4000  Loss =  0.0074304985
Training Step:4500  Loss =  0.008981921
Training Step:5000  Loss =  0.0019682865
Training Step:5500  Loss =  0.11883151
Training Step:6000  Loss =  0.013231622


In [43]:
def denoise_sound_conv1(sess, x, y, input_file_name, output_file_name):
    sn, sr=librosa.load(input_file_name, sr=None)
    testX=librosa.stft(sn, n_fft=1024, hop_length=512)
    testX_abs = np.abs(testX)
    is_training = False
    S_test_abs = sess.run(y, feed_dict={x: testX_abs.T.reshape(-1, 513, 1) }).T
    is_training = True
    print(S_test_abs.shape)
    ratio = (testX / testX_abs)
    Sh = np.multiply(ratio, S_test_abs)
    librosa.output.write_wav(output_file_name, librosa.istft(Sh, hop_length=512), sr)
    

In [44]:
denoise_sound_conv1(sess_1, x1, y1, 'data/test_x_01.wav', 'recover_01_d1x.wav')
denoise_sound_conv1(sess_1, x1, y1, 'data/test_x_02.wav', 'recover_02_d1x.wav')

(513, 142)
(513, 380)


In [45]:
# Calculate SNR by training data
S_test_abs = sess_1.run(y1, feed_dict={x1: X_abs.T.reshape(-1, 513, 1) }).T
ratio = (X / X_abs)
s_t = np.multiply(ratio, S_test_abs)
print(calculateSNR(S, s_t))

11.263725845982792


### CNN 2D


In [46]:
# Transform a spectra to image like data form with dim 20x513
def imageize_spec(X):
    raw = []
    for t in range(len(X)-19):
        raw.append(X[t: t+20])
    X_abs_2d = np.array(raw).reshape(-1, 20, 513, 1)
    return X_abs_2d    

In [47]:
# Padding 
def padding_frames(X):
    testX_pad = X
    # padding
    for i in range(19):
        testX_pad = np.hstack((testX_pad, np.zeros((513, 1))))
    return testX_pad


In [48]:
# "imageize" data for 2D CNN, before that we will pad 19 empty frame at the end
S_abs_T = S_abs.T
X_abs_T = np.abs(padding_frames(X)).T
S_abs_2d = S_abs_T
X_abs_2d = imageize_spec(X_abs_T)

print(X_abs_2d.shape)
print(S_abs_T.shape)

(2459, 20, 513, 1)
(2459, 513)


In [49]:
def get_cnn2_model():
    x = tf.placeholder(tf.float32, [None, 20, 513, 1])
    y_ = tf.placeholder(tf.float32, [None, 513])
    LEARNING_RATE = 10e-4
    BATCH_SIZE = 32

    is_training = True

    #  18 * 511 * 16 
    conv1 = tf.layers.conv2d(
        x,            
        filters=16,
        kernel_size=[3, 3],
        activation=tf.nn.relu)

    # 9 * 255 * 16
    pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)

    # 7 * 253 * 32
    conv2 = tf.layers.conv2d(
        pool1,            
        filters=32,
        kernel_size=[3, 3],
        activation=tf.nn.relu)

    # 3 * 126 * 32
    pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)

    # 1 * 124 * 64
    conv3 = tf.layers.conv2d(
        pool2,            
        filters=64,
        kernel_size=[3, 3],
        activation=tf.nn.relu)

    flat = tf.reshape(conv3, [-1, 124*64])

    l1 = tf.layers.dense(flat, units=512,  activation=tf.nn.relu)
    l2 = tf.layers.dense(l1, units=512,  activation=tf.nn.relu)
    dropout1 = tf.layers.dropout(l2, rate=0.2, training=is_training)
    y = tf.layers.dense(dropout1, units=513, activation=tf.nn.relu)

    loss = tf.losses.mean_squared_error(labels=y_, predictions=y)
    train_step = tf.train.AdamOptimizer(LEARNING_RATE).minimize(loss)

    sess=tf.Session()
    tf.global_variables_initializer().run(session=sess)
    
    # Modify data for training
    train_data = tf.data.Dataset.from_tensor_slices(tf.constant(X_abs_2d))
    label_data = tf.data.Dataset.from_tensor_slices(tf.constant(S_abs.T))
    batch_data = tf.data.Dataset.zip((train_data, label_data)).repeat().batch(BATCH_SIZE)

    iterator = batch_data.make_one_shot_iterator()
    next_batch = iterator.get_next()

    return (x, y_, y, sess, next_batch, train_step, loss)



In [55]:
(x2, y_2, y2, sess_2, next_batch_2, train_step_2, loss_2) = get_cnn2_model()

CHECK_POINT_FILE_NAME = "./hw2_2.ckpt"
TRAIN_STEPS = 6000

saver = tf.train.Saver()
# # We try to read the model we train last time.
# try:
#     saver.restore(sess_2, CHECK_POINT_FILE_NAME)
# except:
#     pass

for i in range(TRAIN_STEPS+1):    
    batch = sess_2.run(next_batch_2)
    _, loss_value = sess_2.run((train_step_2, loss_2), feed_dict={x2: batch[0], y_2: batch[1]})
    if i% 500 == 0:
        print('Training Step:' + str(i) + '  Loss =  ' + str(loss_value))
        
# save model
save_path = saver.save(sess_2, CHECK_POINT_FILE_NAME)


Training Step:0  Loss =  0.07327714
Training Step:500  Loss =  0.01871779
Training Step:1000  Loss =  0.053603042
Training Step:1500  Loss =  0.00070385553
Training Step:2000  Loss =  0.016006254
Training Step:2500  Loss =  0.024314485
Training Step:3000  Loss =  0.010634547
Training Step:3500  Loss =  0.011704107
Training Step:4000  Loss =  0.015239471
Training Step:4500  Loss =  0.009782101
Training Step:5000  Loss =  0.0027571036
Training Step:5500  Loss =  0.03306281
Training Step:6000  Loss =  0.007868359


In [56]:
def denoise_sound_conv2(sess, x, y, input_file_name, output_file_name):
    sn, sr=librosa.load(input_file_name, sr=None)
    testX=librosa.stft(sn, n_fft=1024, hop_length=512)
    testX_abs = np.abs(testX)
    testX_abs_pad = np.abs(padding_frames(testX))
    X_abs_2d = imageize_spec(testX_abs_pad.T)
    is_training = False
    S_test_abs = sess.run(y, feed_dict={x: X_abs_2d }).T
    is_training = True
    print(S_test_abs.shape)
    ratio = (testX / testX_abs)
    Sh = np.multiply(ratio, S_test_abs)
    librosa.output.write_wav(output_file_name, librosa.istft(Sh, hop_length=512), sr)
    

In [57]:
denoise_sound_conv2(sess_2, x2, y2, 'data/test_x_01.wav', 'recover_01_d2x.wav')
denoise_sound_conv2(sess_2, x2, y2, 'data/test_x_02.wav', 'recover_02_d2x.wav')

(513, 142)
(513, 380)


In [58]:
# Calculate SNR by training data

X_abs_T = np.abs(padding_frames(X)).T
S_abs_2d = S_abs_T
X_abs_2d = imageize_spec(X_abs_T)

is_training = False
S_test_abs = sess_2.run(y2, feed_dict={x2: X_abs_2d }).T
is_training = True

ratio = (X / X_abs)
s_t = np.multiply(ratio, S_test_abs)
print(calculateSNR(S, s_t))


12.582371208574601


## Result

I found the training time of 1D CNN is faster than 2D CNN, since I can fit all the data into one epoch to train the model. 

In addition, even the SNR of 2D CNN can be as good as the 1D CNN, I still feel the quality of 1D CNN is much better than 2D CNN based on my subjective listening test.