### Imports

In [2]:
import pickle
import tensorflow as tf
import random
from itertools import combinations
import numpy as np
from tensorflow.keras.layers import Input, Flatten, Dense, Dropout, Lambda, Conv2D, MaxPool2D
from tensorflow.keras.models import Sequential
from tensorflow.keras import Model
import tensorflow.keras.backend as K
import librosa
from tensorflow.keras.utils import Sequence
from tensorflow.keras.activations import sigmoid
from tensorflow.keras.metrics import BinaryAccuracy

### Load Training and Testing Data

In [3]:
with open('hw4_trs.pkl', 'rb') as pickle_file:
    train_data = pickle.load(pickle_file)
print(train_data.shape)

(500, 16180)


In [4]:
with open('hw4_tes.pkl', 'rb') as pickle_file:
    test_data = pickle.load(pickle_file)
print(test_data.shape)

(200, 22631)


### Create Positive Pairs
Generates all combinations of pairs and randomly selects L pairs from it.

In [5]:
def create_pos_pairs(speaker, L=45):
    batch = []
    all_pairs = list(combinations(range(10), 2))
    l_pairs = random.sample(all_pairs, L)
    for a,b in l_pairs:
        stft_1 = np.abs(librosa.stft(speaker[a], n_fft=1024, hop_length=512)).T
        stft_2 = np.abs(librosa.stft(speaker[b], n_fft=1024, hop_length=512)).T
        batch.append([stft_1, stft_2])
    return batch

### Create Negative Pairs
The Main Speaker's utternaces lie between start and end. The other 49 speakers' utterances like before start and after end.  
Randomly sample L utterances from the main speaker's utterances with replacement as L is greater than the number of utterances by the main speaker, i.e., L>10.  
Whereas, we randomly sample L utterances from the other 49 speakers' utterances without replacement as L is less than their total utterances L<490.
Using the samples from the above 2 steps we create Negative Pairs

In [6]:
def create_neg_pairs(pos_sp_num, train_data, L=45):
    batch = []
    
    start = pos_sp_num*10
    end = start + 10     
    pos_spk = train_data[start: end]
    neg_spk = train_data[:start] + train_data[end:]
    neg_sample = random.sample(neg_spk, L)
        
    for l in range(L):
        pos = random.choice(pos_spk)
        
        stft_pos = np.abs(librosa.stft(pos, n_fft=1024, hop_length=512)).T
        stft_neg = np.abs(librosa.stft(neg_sample[l], n_fft=1024, hop_length=512)).T
        batch.append([stft_pos, stft_neg])

    return batch

### Create Training Pairs
Creates 2250 pairs of negative and positive examples each, i.e, 4500 total pairs.
Final Dimensions - Pairs (4500) x Number of Inputs (2) x Shape of Spectrogram (45x513) x number of channels (1)

In [7]:
mini_batches = []
train_data = list(train_data)
for i in range(50):
    pos_batch = create_pos_pairs(train_data[i:i+10])
    neg_batch = create_neg_pairs(i, train_data)
    mini_batches += pos_batch + neg_batch
#     mini_batches.append(mini_batch)

In [8]:
mini_np = np.stack(mini_batches)

In [9]:
mini_np.shape

(4500, 2, 32, 513)

In [10]:
mini_np = np.pad(mini_np, ((0,0), (0,0), (0, 13), (0,0)), mode = 'constant', constant_values = 0) 
print(mini_np.shape)

(4500, 2, 45, 513)


In [11]:
mini_np_exp = np.expand_dims(mini_np, axis = 4)

In [12]:
mini_np_exp.shape

(4500, 2, 45, 513, 1)

### Create True Predictions based on the negative/positive pairs

In [13]:
Y = [] 
for i in range(50):
    y = np.zeros(90, dtype = int)
    y[:45] += 1
    Y.append(y)
Y = np.hstack(Y)

### Create Testing Pairs
Creates 900 pairs of negative and positive examples each, i.e, 1800 total pairs.  
Final Dimensions - Pairs (1800) x Number of Inputs (2) x Shape of Spectrogram (45x513) x number of channels (1)

In [14]:
mini_batches_test = []
test_data = list(test_data)
for i in range(20):
    pos_batch = create_pos_pairs(test_data[i:i+10])
    neg_batch = create_neg_pairs(i, test_data)
    mini_batches_test += pos_batch + neg_batch

In [15]:
mini_np_test = np.stack(mini_batches_test)

In [16]:
mini_np_test.shape

(1800, 2, 45, 513)

In [17]:
mini_np_exp_test = np.expand_dims(mini_np_test, axis = 4)
mini_np_exp_test.shape

(1800, 2, 45, 513, 1)

### Loss Function
Binary Cross Entropy aka Sigmoid Cross Entropy is used

In [19]:
def loss(y_true, y_pred):
    return tf.reduce_sum(tf.losses.sigmoid_cross_entropy(y_true, y_pred))

In [20]:
# Function copied from [1]
def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    square_pred = K.square(y_pred)
    margin_square = K.square(K.maximum(margin - y_pred, 0))
    return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

In [21]:
def siamese_model(input_shape):
    left_input = Input(input_shape)
    right_input = Input(input_shape)
    
    # Base Network
    base_model = Sequential()
    base_model.add(Conv2D(32, kernel_size = (5, 5), input_shape = input_shape, activation = 'relu'))
    base_model.add(MaxPool2D())
    base_model.add(Conv2D(64, kernel_size = (3, 3), activation = 'relu'))
    base_model.add(MaxPool2D())
    base_model.add(Flatten())
    base_model.add(Dense(1000, activation = 'tanh'))
    
    left_output = base_model(left_input)
    right_output = base_model(right_input)
    
    print(K.shape(left_output))
    print(K.shape(right_output))
    
    prediction = tf.reduce_sum(tf.multiply(left_output, right_output), axis = 1)
    prediction = tf.reshape(prediction, [-1, 1])
    
#     prediction = sigmoid(K.dot(left_output, K.transpose(right_output)))
    print(K.shape(prediction))
    
    siamese_model = Model(inputs = [left_input, right_input], outputs = prediction)
    
    return siamese_model

In [25]:
model = siamese_model([45, 513, 1])
model.compile(loss=loss, optimizer = tf.keras.optimizers.Adam(), metrics=[BinaryAccuracy()])

Tensor("Shape_3:0", shape=(2,), dtype=int32)
Tensor("Shape_4:0", shape=(2,), dtype=int32)
Tensor("Shape_5:0", shape=(2,), dtype=int32)


In [26]:
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            [(None, 45, 513, 1)] 0                                            
__________________________________________________________________________________________________
input_4 (InputLayer)            [(None, 45, 513, 1)] 0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 1000)         72596328    input_3[0][0]                    
                                                                 input_4[0][0]                    
__________________________________________________________________________________________________
tf_op_layer_Mul_1 (TensorFlowOp [(None, 1000)]       0           sequential_1[1][0]         

In [None]:
model.fit([mini_np_exp[:, 0], mini_np_exp[:, 1]], Y, batch_size=32, epochs=50, validation_data=([mini_np_exp_test[:, 0], mini_np_exp_test[:, 1]] ,Y[:1800]))

Train on 4500 samples, validate on 1800 samples
Epoch 1/50
Epoch 2/50

In [None]:
from keras.models import load_model

## References
1. https://keras.io/examples/mnist_siamese/
2. https://medium.com/predict/face-recognition-from-scratch-using-siamese-networks-and-tensorflow-df03e32f8cd0
3. https://becominghuman.ai/siamese-networks-algorithm-applications-and-pytorch-implementation-4ffa3304c18
4. https://towardsdatascience.com/one-shot-learning-with-siamese-networks-using-keras-17f34e75bb3d

In [None]:
model.save_weights('siamese.cpkt')