In [1]:
import random
import matplotlib.pyplot as plt
import numpy as np
import umap.umap_ as umap
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Model
import tensorflow.keras.layers as K_layers
import tensorflow.keras.models as K_models
import tensorflow as tf
from PIL import Image
from skimage import io
import os

# Root directory of the training data. get_random_triple() lists this directory,
# treats every entry as one person's sub-directory, and np.load()s files from it.
# The path is relative, so it is resolved against the kernel's working directory.
data_path = "data_numpy_trim_4_sil_01_1percent_v1.2"

In [2]:
# Number of units in the final Dense layer of the base network, i.e. the
# width of one embedding vector.
OUT_DIM: int = 100
# Number of triples per batch; passed to data_generator() when training.
BATCH_SIZE: int = 8

In [3]:
def get_random_triple(path):
    """Draw one random (anchor, positive, negative) triple from ``path``.

    Two distinct entries of ``path`` are picked at random. Two distinct files
    are loaded from the first entry (anchor and positive, i.e. same person)
    and a single file is loaded from the second entry (negative).

    Args:
        path: Directory whose entries are sub-directories (one per person)
            containing files readable by ``np.load``.

    Returns:
        list: ``[anchor, positive, negative]`` exactly as returned by
        ``np.load``.

    Raises:
        ValueError: From ``random.sample`` if ``path`` has fewer than two
            entries or the anchor directory has fewer than two files.
    """
    anchor_person, negative_person = random.sample(os.listdir(path), 2)

    # Anchor and positive: two different recordings of the same person.
    anchor_dir = path + "/" + anchor_person
    anchor_files = random.sample(os.listdir(anchor_dir), 2)
    triple = [np.load("{}/{}".format(anchor_dir, name)) for name in anchor_files]

    # Negative: one recording of a different person.
    negative_dir = path + "/" + negative_person
    negative_files = random.sample(os.listdir(negative_dir), 1)
    triple.extend(
        np.load("{}/{}".format(negative_dir, name)) for name in negative_files
    )

    return triple

In [4]:
def data_generator(batch_size=32):
    """Endlessly yield training batches of random triples.

    Each batch is assembled from ``batch_size`` independent calls to
    ``get_random_triple(data_path)``.

    Args:
        batch_size: Number of triples per yielded batch.

    Yields:
        tuple: ``([anchors, positives, negatives], targets)`` where the three
        inputs are arrays stacked along a new leading batch axis and
        ``targets`` is a float32 array of zeros with shape
        ``(batch_size, 1)``. The targets are placeholders only:
        ``triplet_loss`` never reads ``y_true``, but Keras still requires a
        target to be supplied.
    """
    while True:
        anchors, positives, negatives = [], [], []

        for _ in range(batch_size):
            anchor, positive, negative = get_random_triple(data_path)
            anchors.append(anchor)
            positives.append(positive)
            negatives.append(negative)

        # The yield must sit inside the loop: one batch per iteration.
        # Placed after the infinite loop it would never be reached and the
        # consumer (e.g. Model.fit) would block forever on the first batch.
        yield (
            [np.array(anchors), np.array(positives), np.array(negatives)],
            np.zeros((batch_size, 1)).astype("float32"),
        )

In [5]:
def calc_cos_tensor(tensor1, tensor2):
    """Return the cosine similarity between two tensors.

    The numerator is ``tf.tensordot(tensor1, tensor2, axes=1)`` and the
    denominator is the product of ``tf.norm`` of each argument (no axis is
    given, so each norm is taken over the whole tensor). For two 1-D vectors
    of equal length this is the usual cosine of the angle between them.

    Args:
        tensor1: First tensor.
        tensor2: Second tensor.

    Returns:
        The dot product divided by the product of the norms.
    """
    dot_product = tf.tensordot(tensor1, tensor2, axes=1)
    # No epsilon is applied: a zero-norm argument makes this a division by zero.
    norm_product = tf.norm(tensor1) * tf.norm(tensor2)
    return dot_product / norm_product

In [6]:
def triplet_loss(y_true, y_pred, margin=0.5):
    """Cosine-distance triplet loss with a hinge, averaged over the batch.

    ``y_pred`` holds three equally wide embeddings concatenated along the
    feature axis, in the order anchor | positive | negative. For every row

        loss = max(0, (1 - cos(a, p)) - (1 - cos(a, n)) + margin)

    and the mean over all rows is returned. The hinge is what makes ``margin``
    meaningful: without it the margin is a constant offset that does not
    influence the gradients, and triplets that are already separated keep
    being pushed apart.

    The computation is vectorised, so it works for any batch size and any
    embedding width.

    Args:
        y_true: Unused; present only because Keras calls losses as
            ``loss(y_true, y_pred)``.
        y_pred: Tensor of shape ``(batch, 3 * dim)``. The last dimension must
            be statically known and divisible by three.
        margin: Minimum amount by which the anchor-negative cosine distance
            should exceed the anchor-positive cosine distance.

    Returns:
        Scalar tensor: the mean hinge loss over the batch.
    """
    anchor_out, positive_out, negative_out = tf.split(y_pred, 3, axis=1)

    # l2_normalize clamps the squared norm from below with a small epsilon,
    # so an all-zero embedding (possible after a ReLU output layer) gives a
    # cosine of 0 instead of a 0/0 NaN that would poison the whole batch.
    anchor_unit = tf.math.l2_normalize(anchor_out, axis=1)
    positive_unit = tf.math.l2_normalize(positive_out, axis=1)
    negative_unit = tf.math.l2_normalize(negative_out, axis=1)

    # Row-wise dot products of unit vectors == row-wise cosine similarities.
    cos_positive = tf.reduce_sum(anchor_unit * positive_unit, axis=1)
    cos_negative = tf.reduce_sum(anchor_unit * negative_unit, axis=1)

    per_triplet = tf.maximum(
        (1.0 - cos_positive) - (1.0 - cos_negative) + margin, 0.0
    )
    return tf.reduce_mean(per_triplet)

In [7]:
# Shape of a single input sample, without the batch axis. It is passed to
# K_layers.Input for the base network and for each of the three triplet inputs;
# the last axis is the channel axis consumed by Conv2D.
# NOTE(review): the arrays loaded by get_random_triple are assumed to already
# have this shape — confirm against the data on disk.
input_size = (90, 173, 1)

In [8]:
# Base embedding network: maps one input of shape `input_size` to an
# OUT_DIM-wide vector. Two conv/conv/pool stages, one more convolution, then
# a dense projection of the flattened feature map.
input_layer = K_layers.Input(input_size)

# Layers are declared in application order so Keras assigns the same
# automatic layer names as building them one call at a time.
embedding_stack = [
    K_layers.Conv2D(32, 3, activation="relu"),
    K_layers.Conv2D(32, 3, activation="relu"),
    K_layers.MaxPool2D(2),
    K_layers.Conv2D(64, 3, activation="relu"),
    K_layers.Conv2D(64, 3, activation="relu"),
    K_layers.MaxPool2D(2),
    K_layers.Conv2D(128, 3, activation="relu"),
    K_layers.Flatten(),
    K_layers.Dense(OUT_DIM, activation="relu"),
]

# Thread the tensor through the stack; `x` ends up as the embedding output.
x = input_layer
for layer in embedding_stack:
    x = layer(x)

model = Model(input_layer, x)
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 90, 173, 1)]      0         
                                                                 
 conv2d (Conv2D)             (None, 88, 171, 32)       320       
                                                                 
 conv2d_1 (Conv2D)           (None, 86, 169, 32)       9248      
                                                                 
 max_pooling2d (MaxPooling2D  (None, 43, 84, 32)       0         
 )                                                               
                                                                 
 conv2d_2 (Conv2D)           (None, 41, 82, 64)        18496     
                                                                 
 conv2d_3 (Conv2D)           (None, 39, 80, 64)        36928     
                                                             

In [9]:
# Triplet wrapper: three inputs (anchor, positive, negative) are each passed
# through the SAME `model` instance, so all three branches share weights.
triplet_model_a = K_layers.Input(input_size)
triplet_model_p = K_layers.Input(input_size)
triplet_model_n = K_layers.Input(input_size)
triplet_inputs = [triplet_model_a, triplet_model_p, triplet_model_n]

# The three embeddings are joined along the feature axis in the order
# anchor | positive | negative, which is the layout triplet_loss reads.
triplet_embeddings = [model(branch_input) for branch_input in triplet_inputs]
triplet_model_out = K_layers.Concatenate()(triplet_embeddings)

triplet_model = Model(triplet_inputs, triplet_model_out)
triplet_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 90, 173, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 input_3 (InputLayer)           [(None, 90, 173, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 input_4 (InputLayer)           [(None, 90, 173, 1)  0           []                               
                                ]                                                           

In [10]:
# Compile the three-input model with the custom triplet loss, optimised by
# Adam at a learning rate of 1e-5.
triplet_model.compile(loss=triplet_loss, optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5))

In [11]:
# Train from the endless triple generator: 3 epochs of 150 batches, each batch
# holding BATCH_SIZE triples. The generator loops forever, so the length of an
# epoch has to be given explicitly through steps_per_epoch.
triplet_model.fit(data_generator(batch_size=BATCH_SIZE), steps_per_epoch=150, epochs=3)

KeyboardInterrupt: 

In [None]:
# Inspect the trainable weights of the triplet model. As the last expression
# of the cell this displays every variable, so the output can be very large.
triplet_model.trainable_variables

In [None]:
# Recompile with no loss and a default Adam optimizer, replacing the training
# configuration set up earlier.
# NOTE(review): presumably done so that the saved model does not reference the
# custom triplet_loss function and can be loaded without it — confirm.
triplet_model.compile(loss=None, optimizer="adam")

In [None]:
# Save the triplet model to "triplet_voice.h5" in the current working directory.
triplet_model.save("triplet_voice.h5")