In [1]:
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
import cv2
import re
import random
import itertools
from tqdm import tqdm

In [2]:
# Switch the working directory so the relative paths used later in the notebook
# ('lfw/' and 'facenet_models/') resolve.
# NOTE(review): hard-coded, machine-specific absolute path — adjust before running elsewhere.
os.chdir('D://face recog')

In [3]:
# Fraction of the dataset entries (one entry per identity folder) that goes to the
# training split; the remaining entries form the test split.
TRAIN_TEST_SPLIT = 0.8

In [4]:
class image_class():
    """One identity of the dataset: its label, its image files and its embeddings.

    Attributes:
        name: label of the identity.
        image_paths: list of image file paths belonging to the identity.
        embeddings: list that starts empty and is filled in later by other code.
    """

    def __init__(self, name, image_paths):
        self.name, self.image_paths = name, image_paths
        self.embeddings = list()

    def get_len(self):
        """Return the number of image paths stored for this identity."""
        count = len(self.image_paths)
        return count

In [5]:
def get_dataset(path):
    """Build the dataset from a directory that holds one sub-directory per identity.

    Args:
        path: root directory; ``~`` is expanded.

    Returns:
        A list of ``image_class`` objects ordered by directory name, one per
        sub-directory of ``path``. Each holds the paths of all entries inside
        that sub-directory, sorted by name.

    Entries of ``path`` that are not directories are skipped. Previously they
    were added as classes with no images, which later breaks code that divides
    by per-class comparison counts.
    """
    root = os.path.expanduser(path)
    dataset = []
    for name in sorted(os.listdir(root)):
        facedir = os.path.join(root, name)
        if not os.path.isdir(facedir):
            # Stray file next to the identity folders: not a class.
            continue
        # Sorted so that the image order does not depend on the filesystem.
        image_paths = [os.path.join(facedir, img) for img in sorted(os.listdir(facedir))]
        dataset.append(image_class(name, image_paths))
    return dataset

In [6]:
def sample_dataset(dataset, people_per_batch, images_per_person):
    """Randomly sample image paths, visiting classes in a random order.

    Classes are visited in shuffled order and each contributes up to
    ``images_per_person`` randomly chosen images, until
    ``people_per_batch * images_per_person`` paths are collected or every class
    has been visited.

    Args:
        dataset: sequence of objects exposing an ``image_paths`` list.
        people_per_batch: with ``images_per_person``, defines the target number
            of images.
        images_per_person: maximum number of images taken from one class.

    Returns:
        ``(image_paths, num_per_class)``: the sampled paths grouped class by
        class, and the number of paths taken from each visited class (may
        contain zeros for classes without images).

    If the dataset holds fewer images than requested, all reachable images are
    returned instead of raising ``IndexError`` as before.
    """
    num_of_images = people_per_batch * images_per_person
    class_indices = np.arange(len(dataset))
    np.random.shuffle(class_indices)

    image_paths = []
    num_per_class = []
    for class_index in class_indices:
        remaining = num_of_images - len(image_paths)
        if remaining <= 0:
            break
        class_paths = dataset[class_index].image_paths
        image_indices = np.arange(len(class_paths))
        np.random.shuffle(image_indices)

        # Never take more than the class has or than the batch still needs.
        num_images_from_class = min(images_per_person, len(class_paths), remaining)
        image_paths += [class_paths[j] for j in image_indices[:num_images_from_class]]
        num_per_class.append(num_images_from_class)

    return image_paths, num_per_class

In [7]:
def select_triplets(embeddings, nrof_images_per_class, image_paths, people_per_batch, alpha):
    """Select (anchor, positive, negative) index triplets for training.

    VGG Face: Choosing good triplets is crucial and should strike a balance
    between selecting informative (i.e. challenging) examples and swamping
    training with examples that are too hard. Each pair (a, p) is extended to a
    triplet (a, p, n) by sampling n at random, but only among the images that
    violate the triplet-loss margin. This is a form of hard-negative mining that
    is less aggressive (and much cheaper) than picking the maximally violating
    example.

    Args:
        embeddings: one embedding per sampled image, grouped class by class in
            the order of ``nrof_images_per_class``. May be an ``(N, D)`` array
            or a sequence of per-image embeddings with extra singleton axes
            (e.g. ``(1, D)``); each embedding is flattened to one row.
        nrof_images_per_class: number of consecutive embeddings per class.
        image_paths: unused; kept for interface compatibility.
        people_per_batch: number of leading classes whose images act as
            anchors (clamped to ``len(nrof_images_per_class)``).
        alpha: margin; a negative qualifies when
            ``neg_dist_sqr - pos_dist_sqr < alpha``.

    Returns:
        ``(triplets, len(triplets))`` where ``triplets`` is a shuffled list of
        ``[anchor_idx, positive_idx, negative_idx]`` indices into ``embeddings``.
    """
    if len(embeddings) == 0:
        return [], 0

    # Normalise the input to a 2-D (N, D) matrix. With (1, D)-shaped embeddings the
    # old axis-1 reduction returned per-dimension values instead of distances.
    emb = np.stack([np.asarray(e, dtype=np.float64) for e in embeddings])
    emb = emb.reshape(emb.shape[0], -1)

    triplets = []
    emb_start_idx = 0
    nrof_classes = min(people_per_batch, len(nrof_images_per_class))

    for i in range(nrof_classes):
        nrof_images = int(nrof_images_per_class[i])
        own_class = slice(emb_start_idx, emb_start_idx + nrof_images)
        for j in range(1, nrof_images):
            a_idx = emb_start_idx + j - 1
            neg_dists_sqr = np.sum(np.square(emb[a_idx] - emb), axis=1)
            # Images of the anchor's own class can never be negatives.
            neg_dists_sqr[own_class] = np.nan
            for pair in range(j, nrof_images):  # For every possible positive pair.
                p_idx = emb_start_idx + pair
                pos_dist_sqr = np.sum(np.square(emb[a_idx] - emb[p_idx]))
                # NaN entries compare as False, which excludes the masked class.
                with np.errstate(invalid='ignore'):
                    # VGG Face selection. FaceNet would additionally require
                    # pos_dist_sqr < neg_dists_sqr.
                    all_neg = np.where(neg_dists_sqr - pos_dist_sqr < alpha)[0]
                if all_neg.size > 0:
                    n_idx = int(all_neg[np.random.randint(all_neg.size)])
                    triplets.append([a_idx, p_idx, n_idx])

        emb_start_idx += nrof_images

    np.random.shuffle(triplets)
    return triplets, len(triplets)

In [8]:
def read_image(path, img_size = (128,128)):
    """Load an image file as a normalised RGB batch of one.

    Args:
        path: image file path.
        img_size: target ``(height, width)``. ``cv2.resize`` expects
            ``(width, height)``, so the pair is swapped for that call. For
            square sizes this is identical to the previous behaviour.

    Returns:
        Float array of shape ``(1, img_size[0], img_size[1], 3)`` with values
        scaled by 1/255.

    Raises:
        OSError: if OpenCV cannot read the file (``cv2.imread`` returned None).
    """
    image = cv2.imread(path)
    if image is None:
        # imread signals failure by returning None rather than raising.
        raise OSError(f"cv2.imread could not read image: {path!r}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    height, width = img_size
    image = cv2.resize(image, (width, height))
    return np.expand_dims(image, 0) / 255

In [9]:
def get_all_embeddings(model, dataset, img_size=(128,128)):
    """Run ``model`` on every image of every dataset entry and store the results.

    For each entry, its ``embeddings`` attribute is replaced by a list holding
    ``model(read_image(path, img_size=img_size))`` for each of its
    ``image_paths``, in order. Returns nothing; prints a message when done.
    """
    for entry in dataset:
        entry.embeddings = [
            model(read_image(image_path, img_size=img_size))
            for image_path in entry.image_paths
        ]
    print('all embeddings calculated')

In [10]:
def evaluate(model, dataset, alpha = 0.01, img_size = (128,128)):
    """Compute per-class false-accept and validation rates from cached embeddings.

    Still a brute-force comparison of every anchor against every other
    embedding; only the inner loops are vectorised with NumPy.

    Two embeddings are "accepted" as the same identity when their squared
    Euclidean distance is below ``alpha``.

    * Validation rate of a class: accepted pairs / all distinct pairs of images
      within the class. An image is never compared with itself.
    * False-accept rate of a class: accepted comparisons / all comparisons
      between the class's images and the images of every *other* class.

    Args:
        model: unused; kept for interface compatibility.
        dataset: sequence of objects with ``name`` and ``embeddings``; each
            embedding is flattened to a vector before comparison.
        alpha: squared-distance acceptance threshold.
        img_size: unused; kept for interface compatibility.

    Returns:
        ``(mean_false_accept_rate, mean_validation_rate)``, in that order. Each
        mean is taken over the classes that had at least one comparison of the
        respective kind; ``0.0`` when there was none.
    """
    def _as_matrix(embeddings):
        # One row per image; tolerates embeddings stored with shape (1, D).
        if len(embeddings) == 0:
            return np.zeros((0, 0))
        stacked = np.stack([np.asarray(e, dtype=np.float64) for e in embeddings])
        return stacked.reshape(stacked.shape[0], -1)

    matrices = [_as_matrix(entry.embeddings) for entry in dataset]

    false_accept_rates = []
    validation_rates = []

    for i, entry in enumerate(dataset):
        print("evaluating "+entry.name+" class...")
        own = matrices[i]
        class_TP = 0
        class_FP = 0
        class_same_comparisons = 0
        class_diff_comparisons = 0

        for a in range(len(own)):
            anchor_embedding = own[a]

            # Same-identity comparisons: every later image of the class, so each
            # unordered pair is counted once and self-comparisons are excluded.
            positives = own[a + 1:]
            if len(positives) > 0:
                pos_dists = np.sum(np.square(positives - anchor_embedding), axis=1)
                class_TP += int(np.count_nonzero(pos_dists < alpha))
                class_same_comparisons += len(positives)

            # Impostor comparisons: every image of every other class.
            for j, negatives in enumerate(matrices):
                if j == i or len(negatives) == 0:
                    continue
                neg_dists = np.sum(np.square(negatives - anchor_embedding), axis=1)
                class_FP += int(np.count_nonzero(neg_dists < alpha))
                class_diff_comparisons += len(negatives)

        if class_same_comparisons > 0:
            validation_rate = class_TP / class_same_comparisons
            print('true accept = '+str(class_TP), ' validation rate = ' +str(validation_rate))
            validation_rates.append(validation_rate)
        if class_diff_comparisons > 0:
            false_accept_rate = class_FP / class_diff_comparisons
            print('false accept = '+str(class_FP), ' false accept rate = ' +str(false_accept_rate))
            false_accept_rates.append(false_accept_rate)

    mean_far = sum(false_accept_rates) / len(false_accept_rates) if false_accept_rates else 0.0
    mean_val = sum(validation_rates) / len(validation_rates) if validation_rates else 0.0
    return mean_far, mean_val


In [11]:
def augment_data(image, img_size, min_crop_fraction=0.5):
    """Randomly crop, flip and rotate an image, then resize it to ``img_size``.

    Each of the three augmentations is applied independently with probability
    one half:

    * crop: a random square window whose side is between
      ``min_crop_fraction`` and 100 % of the image's shorter side;
    * flip: along a randomly chosen spatial axis (0 or 1);
    * rotate: by 90, 180 or 270 degrees.

    The crop window was previously drawn so that it could never satisfy its own
    size check, so cropping never happened. All randomness now comes from
    ``np.random``.

    Args:
        image: array of shape ``(height, width, channels)``.
        img_size: output ``(height, width)``; swapped to ``(width, height)``
            for ``cv2.resize``. Identical to the old behaviour for square sizes.
        min_crop_fraction: smallest allowed crop side as a fraction of the
            shorter image side.

    Returns:
        The augmented image resized to ``img_size``.
    """
    flip = np.random.choice([True, False])
    crop = np.random.choice([True, False])
    rotate = np.random.choice([True, False])

    if crop:
        height, width = image.shape[:2]
        max_side = min(height, width)
        min_side = min(max_side, max(1, int(round(max_side * min_crop_fraction))))
        side = np.random.randint(min_side, max_side + 1)
        top = np.random.randint(0, height - side + 1)
        left = np.random.randint(0, width - side + 1)
        image = image[top:top + side, left:left + side]

    if flip:
        image = np.flip(image, np.random.randint(0, 2))

    if rotate:
        image = np.rot90(image, np.random.randint(1, 4))

    # flip/rot90 return strided views; hand OpenCV a contiguous buffer.
    image = cv2.resize(np.ascontiguousarray(image), (img_size[1], img_size[0]))

    return image

In [12]:
def choose_random_file():
    """Pick a random image from the module-level ``dataset``.

    Only entries that have at least one image path are considered.

    Returns:
        ``(name, path)`` of the chosen entry and one of its image paths.

    Raises:
        IndexError: if no entry of ``dataset`` has any image.
    """
    # random.randint(0, len(dataset)) included len(dataset) itself and could
    # index one past the end; random.choice cannot.
    candidates = [entry for entry in dataset if entry.image_paths]
    entry = random.choice(candidates)
    return entry.name, random.choice(entry.image_paths)

In [13]:
class Distance_Layer(tf.keras.layers.Layer):
    """Layer returning the squared distances anchor-positive and anchor-negative."""

    def __init__(self):
        super().__init__()

    @staticmethod
    def _squared_distance(first, second):
        # Sum of squared element-wise differences over the last axis.
        return tf.reduce_sum(tf.square(first - second), -1)

    def call(self, anchor, pos, neg):
        """Return ``(pos_distance, neg_distance)`` for the three inputs."""
        anchor_to_pos = self._squared_distance(anchor, pos)
        anchor_to_neg = self._squared_distance(anchor, neg)
        return (anchor_to_pos, anchor_to_neg)

In [14]:
class FaceNet_Network(tf.keras.Model):
    """Triplet-loss face-embedding network built on a ResNet50 backbone.

    The backbone is created with ``weights=None`` (no pre-trained weights are
    loaded) and all of its layers are left trainable. Other backbones
    (MobileNet, InceptionResNetV2) and freezing the layers before
    ``conv5_block1_out`` were tried earlier in this notebook and can be swapped
    in here.

    Attributes:
        alpha: triplet-loss margin.
        resnet_backbone: the convolutional feature extractor.
        embedding_model: maps an image batch to ``latent_dims``-dimensional
            embeddings (backbone -> Flatten -> Dense(256) -> BN -> Dense(128)
            -> BN -> Dense(latent_dims)). Its output is NOT L2-normalised.
        siamese_model: takes ``[anchor, pos, neg]`` image batches and returns
            the squared distances ``(anchor-pos, anchor-neg)`` between the
            L2-normalised embeddings.
    """
    def __init__(self, input_shape, latent_dims = 128, alpha = 0.2):
        super().__init__()
        self.alpha = alpha
        self.resnet_backbone = tf.keras.applications.resnet50.ResNet50(
            include_top=False,
            weights=None,
            input_tensor=None,
            input_shape=input_shape,
            pooling=None,
            classes=1000,
        )

        x_input = tf.keras.Input(input_shape)
        x = self.resnet_backbone(x_input)
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(256, activation="relu")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Dense(128, activation="relu")(x)
        x = tf.keras.layers.BatchNormalization()(x)
        output = tf.keras.layers.Dense(latent_dims)(x)

        self.embedding_model = tf.keras.Model(inputs = x_input, outputs = output)

        anchor = tf.keras.layers.Input(shape = input_shape)
        pos = tf.keras.layers.Input(shape = input_shape)
        neg = tf.keras.layers.Input(shape = input_shape)

        # The three branches share the same embedding model (and weights).
        anchor_embedding = self.embedding_model(anchor)
        pos_embedding = self.embedding_model(pos)
        neg_embedding = self.embedding_model(neg)

        # Normalise every branch. Previously the anchor was normalised three
        # times and the positive/negative embeddings not at all, so distances
        # compared a unit vector against unnormalised vectors.
        # NOTE(review): checkpoints trained with the old asymmetric loss may
        # need fine-tuning after this fix.
        anchor_embedding = tf.nn.l2_normalize(anchor_embedding, 1, 1e-10, name='anchor')
        pos_embedding = tf.nn.l2_normalize(pos_embedding, 1, 1e-10, name='positive')
        neg_embedding = tf.nn.l2_normalize(neg_embedding, 1, 1e-10, name='negative')

        distances = Distance_Layer()(
            anchor_embedding,
            pos_embedding,
            neg_embedding,
        )

        self.siamese_model = tf.keras.Model(inputs=[anchor, pos, neg], outputs = distances)

    def compute_loss(self, data):
        """Triplet loss ``max(pos_distance - neg_distance + alpha, 0)`` for one triplet.

        Args:
            data: unpacks into exactly three images ``(anchor, pos, neg)``; a
                leading batch axis of size one is added to each before the
                forward pass.

        Returns:
            The loss tensor produced for that single triplet.
        """
        anchor, pos, neg = data
        anchor = tf.expand_dims(anchor, 0)
        pos = tf.expand_dims(pos, 0)
        neg = tf.expand_dims(neg, 0)
        pos_distance, neg_distance = self.siamese_model([anchor, pos, neg])
        loss = pos_distance - neg_distance
        loss = tf.maximum(loss + self.alpha, 0.0)
        return loss

In [15]:
# Index the image folders under 'lfw/' (relative to the working directory set above).
dataset = get_dataset('lfw/')

In [16]:
# Network for 128x128 inputs with 3 channels.
test_net = FaceNet_Network((128, 128,3))
# Split by dataset entry (identity), not by image: the first TRAIN_TEST_SPLIT share of the
# name-sorted list is used for training, the rest for testing.
train_dataset = dataset[:int(len(dataset)*TRAIN_TEST_SPLIT)]
test_dataset = dataset[int(len(dataset)*TRAIN_TEST_SPLIT):]
# Read as a global by train().
optimizer = tf.keras.optimizers.Adam(learning_rate = 0.0001)

In [17]:
# Print the layer table of the backbone (output below is truncated).
test_net.resnet_backbone.summary()

Model: "resnet50"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 134, 134, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 64, 64, 64)   9472        ['conv1_pad[0][0]']              
                                                                                                  
 conv1_bn (BatchNormalization)  (None, 64, 64, 64)   256         ['conv1_conv[0][0]']      

 conv2_block3_2_conv (Conv2D)   (None, 32, 32, 64)   36928       ['conv2_block3_1_relu[0][0]']    
                                                                                                  
 conv2_block3_2_bn (BatchNormal  (None, 32, 32, 64)  256         ['conv2_block3_2_conv[0][0]']    
 ization)                                                                                         
                                                                                                  
 conv2_block3_2_relu (Activatio  (None, 32, 32, 64)  0           ['conv2_block3_2_bn[0][0]']      
 n)                                                                                               
                                                                                                  
 conv2_block3_3_conv (Conv2D)   (None, 32, 32, 256)  16640       ['conv2_block3_2_relu[0][0]']    
                                                                                                  
 conv2_blo

 conv3_block3_2_conv (Conv2D)   (None, 16, 16, 128)  147584      ['conv3_block3_1_relu[0][0]']    
                                                                                                  
 conv3_block3_2_bn (BatchNormal  (None, 16, 16, 128)  512        ['conv3_block3_2_conv[0][0]']    
 ization)                                                                                         
                                                                                                  
 conv3_block3_2_relu (Activatio  (None, 16, 16, 128)  0          ['conv3_block3_2_bn[0][0]']      
 n)                                                                                               
                                                                                                  
 conv3_block3_3_conv (Conv2D)   (None, 16, 16, 512)  66048       ['conv3_block3_2_relu[0][0]']    
                                                                                                  
 conv3_blo

 conv4_block2_2_conv (Conv2D)   (None, 8, 8, 256)    590080      ['conv4_block2_1_relu[0][0]']    
                                                                                                  
 conv4_block2_2_bn (BatchNormal  (None, 8, 8, 256)   1024        ['conv4_block2_2_conv[0][0]']    
 ization)                                                                                         
                                                                                                  
 conv4_block2_2_relu (Activatio  (None, 8, 8, 256)   0           ['conv4_block2_2_bn[0][0]']      
 n)                                                                                               
                                                                                                  
 conv4_block2_3_conv (Conv2D)   (None, 8, 8, 1024)   263168      ['conv4_block2_2_relu[0][0]']    
                                                                                                  
 conv4_blo

 conv4_block5_2_relu (Activatio  (None, 8, 8, 256)   0           ['conv4_block5_2_bn[0][0]']      
 n)                                                                                               
                                                                                                  
 conv4_block5_3_conv (Conv2D)   (None, 8, 8, 1024)   263168      ['conv4_block5_2_relu[0][0]']    
                                                                                                  
 conv4_block5_3_bn (BatchNormal  (None, 8, 8, 1024)  4096        ['conv4_block5_3_conv[0][0]']    
 ization)                                                                                         
                                                                                                  
 conv4_block5_add (Add)         (None, 8, 8, 1024)   0           ['conv4_block4_out[0][0]',       
                                                                  'conv4_block5_3_bn[0][0]']      
          

 conv5_block2_2_relu (Activatio  (None, 4, 4, 512)   0           ['conv5_block2_2_bn[0][0]']      
 n)                                                                                               
                                                                                                  
 conv5_block2_3_conv (Conv2D)   (None, 4, 4, 2048)   1050624     ['conv5_block2_2_relu[0][0]']    
                                                                                                  
 conv5_block2_3_bn (BatchNormal  (None, 4, 4, 2048)  8192        ['conv5_block2_3_conv[0][0]']    
 ization)                                                                                         
                                                                                                  
 conv5_block2_add (Add)         (None, 4, 4, 2048)   0           ['conv5_block1_out[0][0]',       
                                                                  'conv5_block2_3_bn[0][0]']      
          

In [18]:
# Resume from a previously saved embedding-model checkpoint (same file-name pattern that
# train() writes). NOTE(review): the file must already exist; train() with its defaults
# writes to this same file name after its first epoch.
test_net.embedding_model.load_weights('facenet_models/facenet_model_3_171.h5')

In [19]:
# NOTE(review): identical to the split already computed in the cell that creates test_net;
# re-running it is harmless but redundant.
train_dataset = dataset[:int(len(dataset)*TRAIN_TEST_SPLIT)]
test_dataset = dataset[int(len(dataset)*TRAIN_TEST_SPLIT):]

In [20]:
def non_training_forward_pass(model, sample_paths, img_size, num_per_class, people_per_batch, embed_batch_size=64):
    """Embed the sampled images and turn the selected triplets into image triplets.

    This forward pass is only used to mine triplets; nothing is trained here.

    Args:
        model: object exposing ``embedding_model`` (callable on an image batch)
            and ``alpha`` (the triplet margin, used for triplet selection).
        sample_paths: image paths grouped class by class.
        img_size: size passed to ``read_image`` / ``augment_data``.
        num_per_class: number of consecutive paths per class.
        people_per_batch: forwarded to ``select_triplets``.
        embed_batch_size: number of images embedded per forward pass.

    Returns:
        Array of shape ``(num_triplets, 3, H, W, 3)`` holding the (possibly
        augmented) anchor, positive and negative images of every selected
        triplet; an empty array when there is nothing to return.
    """
    images = []
    for path in sample_paths:
        # Draw the augmentation flag first, then load through the shared helper.
        augment = np.random.choice([True, False])
        current_image = read_image(path, img_size=img_size)[0]
        if augment:
            current_image = augment_data(current_image, img_size)
        images.append(current_image)

    if not images:
        return np.array([])

    # Embed in batches instead of one image per call; the result is a plain
    # (N, latent_dims) matrix, which is the layout select_triplets needs.
    embedding_chunks = []
    for start in range(0, len(images), embed_batch_size):
        batch = np.stack(images[start:start + embed_batch_size])
        embedding_chunks.append(np.asarray(model.embedding_model(batch)))
    first_embeddings = np.concatenate(embedding_chunks, axis=0)

    # NOTE(review): selection uses the raw embedding_model output, whereas the
    # siamese loss works on L2-normalised embeddings — confirm this is intended.
    triplets, num_of_triplets = select_triplets(
        first_embeddings, num_per_class, sample_paths, people_per_batch, alpha = model.alpha)

    img_triplets = [[images[a_idx], images[p_idx], images[n_idx]]
                    for a_idx, p_idx, n_idx in triplets]

    return np.array(img_triplets)
    

In [21]:
def train(model = test_net, batch_size = 16, batches_per_epoch = 100, epochs = 500, people_per_batch = 45, images_per_person = 40, img_size = (128,128), checkpoint_offset = 171, checkpoint_dir = 'facenet_models'):
    """Train the embedding model with online triplet mining.

    For every batch, images are sampled from the global ``train_dataset`` and
    triplets are mined with ``non_training_forward_pass`` until at least
    ``batch_size`` triplets are available. The mean triplet loss of the first
    ``batch_size`` triplets is then minimised with the global ``optimizer``.
    Weights are saved after every epoch.

    Args:
        model: network exposing ``compute_loss`` and ``embedding_model``.
        batch_size: number of triplets per optimisation step.
        batches_per_epoch: optimisation steps per epoch.
        epochs: the loop runs for epoch indices ``0..epochs`` inclusive.
        people_per_batch, images_per_person: sampling parameters used for
            triplet mining.
        img_size: image size used when loading images.
        checkpoint_offset: added to the epoch index in the checkpoint file
            name (previously hard-coded to 171).
        checkpoint_dir: directory for checkpoints; created if missing.

    NOTE(review): with the defaults, epoch 0 writes
    ``facenet_models/facenet_model_3_171.h5``, the same file the notebook loads
    before training.
    """
    os.makedirs(checkpoint_dir, exist_ok=True)

    for i in range(epochs+1):
        running_loss = 0
        pbar = tqdm(range(0, batches_per_epoch))
        for j in pbar:
            # Keep mining until one full batch of triplets is available.
            images = []
            while len(images) < batch_size:
                sample_paths, num_per_class = sample_dataset(train_dataset, people_per_batch, images_per_person)
                new_triplets = non_training_forward_pass(model, sample_paths, img_size, num_per_class, people_per_batch)
                images.extend(new_triplets)

            images = tf.convert_to_tensor(images[:batch_size], dtype=tf.float32)

            with tf.GradientTape() as tape:
                loss = [model.compute_loss(triplet) for triplet in images]
                loss = tf.convert_to_tensor(loss)
                loss = tf.reduce_mean(loss)

            # Exponential moving average, restarted from 0 every epoch.
            running_loss = running_loss * 0.99 + loss * 0.01
            gradients = tape.gradient(loss, model.embedding_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, model.embedding_model.trainable_weights))

            pbar.set_description(
                        f'Epoch={i}, Train_Loss={running_loss}')

        model.embedding_model.save_weights(
            f'{checkpoint_dir}/facenet_model_3_{i+checkpoint_offset}.h5')



                    

In [None]:
# Start training with the default settings (long-running; the output below shows its progress bar).
train()

Epoch=0, Train_Loss=0.028184840455651283:   4%|█▌                                    | 4/100 [06:07<2:26:42, 91.69s/it]

In [None]:
# Report how many GPUs TensorFlow can see.
# NOTE(review): this check would be more useful before the training cell.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


In [None]:
# Compute the embedding of every test image and store it on the entries' `.embeddings` lists.
get_all_embeddings(test_net.embedding_model, test_dataset)

In [None]:
# evaluate() returns (false-accept rate, validation rate) in that order; the
# names were previously unpacked the other way round.
FP, TP = evaluate(test_net.embedding_model, test_dataset, alpha = 0.2, img_size=(128,128))

In [None]:
# Keras cosine-similarity metric object.
# NOTE(review): Keras metric objects keep state between calls, so calling this repeatedly
# appears to return a running mean rather than the similarity of the latest pair — confirm
# before relying on per-call values.
cosine_similarity = tf.keras.metrics.CosineSimilarity()

In [None]:
# Show the two averaged rates obtained from evaluate().
print(TP, FP)

In [None]:
def measure_similarity():
    """Cosine similarity of one random positive pair and one random negative pair.

    A random entry of the global ``test_dataset`` with at least two embeddings
    provides the anchor (its first embedding) and a random positive (any of its
    other embeddings). The negative is a random embedding of a different,
    non-empty entry.

    The similarities are computed directly for the two pairs. The previous
    version went through a stateful Keras metric, could index past the end of
    ``test_dataset``, and could pick an entry without embeddings.

    Returns:
        ``(positive_similarity, negative_similarity)`` as Python floats.

    Raises:
        ValueError: if no suitable anchor class or no suitable negative class
            exists in ``test_dataset``.
    """
    def _cosine(first, second):
        u = np.asarray(first, dtype=np.float64).ravel()
        v = np.asarray(second, dtype=np.float64).ravel()
        denom = np.linalg.norm(u) * np.linalg.norm(v)
        # A zero vector has no direction; report zero similarity for it.
        return float(np.dot(u, v) / denom) if denom > 0 else 0.0

    anchor_candidates = [k for k, entry in enumerate(test_dataset) if len(entry.embeddings) >= 2]
    if not anchor_candidates:
        raise ValueError("no test class has at least two embeddings")
    counter = random.choice(anchor_candidates)

    negative_candidates = [k for k, entry in enumerate(test_dataset)
                           if k != counter and len(entry.embeddings) > 0]
    if not negative_candidates:
        raise ValueError("no other test class has embeddings to use as negatives")
    neg_counter = random.choice(negative_candidates)

    anchor_embedding = test_dataset[counter].embeddings[0]
    pos_embedding = random.choice(test_dataset[counter].embeddings[1:])
    neg_embedding = random.choice(test_dataset[neg_counter].embeddings)

    return _cosine(anchor_embedding, pos_embedding), _cosine(anchor_embedding, neg_embedding)

In [None]:
def rough_evaluation(num_of_tests):
    """Average the positive and negative similarities over random trials.

    Each trial calls ``measure_similarity()`` once and uses both values of the
    returned pair. Previously two calls were made per trial and half of each
    result was thrown away.

    Args:
        num_of_tests: number of trials.

    Returns:
        ``(mean_positive_similarity, mean_negative_similarity)``.

    Raises:
        ZeroDivisionError: if ``num_of_tests`` is 0.
    """
    pos_similarity = 0
    neg_similarity = 0

    for _ in range(num_of_tests):
        trial_pos, trial_neg = measure_similarity()
        pos_similarity += trial_pos
        neg_similarity += trial_neg

    return pos_similarity/num_of_tests, neg_similarity/num_of_tests

In [None]:
# Mean positive-pair and negative-pair similarity over 1000 random trials.
print(rough_evaluation(1000))

In [None]:
# FaceNet_Network defines no `model` attribute; summarise the embedding model, which is
# the sub-model whose weights are trained and saved.
test_net.embedding_model.summary()

In [None]:
# NOTE(review): leftover debugging cell. In the visible cells `images` is only assigned inside
# function bodies, so at notebook level this presumably raises NameError on a fresh kernel.
print(images)