# Face verification

### Goals
- train a network for face similarity using siamese networks
- train a network for face similarity using triplet loss

the architecture is as follows:

_image_

### Dataset

- We will be using Labeled Faces in the Wild (LFW) dataset available openly at _url_
- For computing purposes, we'll only restrict ourselves to a subpart of the dataset. You're welcome to train on the whole dataset on GPU
- We will also load pretrained weights

### References

- paper1
- paper2

In [None]:
import tensorflow as tf
from keras.backend.tensorflow_backend import set_session
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.5
set_session(tf.Session(config=config))

In [None]:
import keras
import os
import keras.backend as K
from keras.models import Model
from keras.layers import Dense, Input, Concatenate, merge, Lambda, Dot
from keras.layers import Conv2D, MaxPool2D, GlobalAveragePooling2D, Flatten, Dropout
import numpy as np
import random
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

### todos

- ask hedi for his code
- see what can be done in keras
- load and split the dataset

## Processing the dataset

The dataset consists of folders corresponding to each identity. The folder name is the name of the person.
We map each class (identity) to an integer id, and build mappings as dictionaries `name_to_classid` and `classid_to_name`

In [None]:
#PATH = "lfw-a/lfw/"
PATH = "lfw/lfw-deepfunneled/"

In [None]:
dirs = sorted(os.listdir(PATH))
name_to_classid = {d:i for i,d in enumerate(dirs)}
classid_to_name = {v:k for k,v in name_to_classid.items()}
num_classes = len(name_to_classid)
print("number of classes: "+str(num_classes))

In each directory, there is one or more images corresponding to the identity. We map each image path with an integer id, then build a few dictionaries:
- mappings from imagepath and image id: `path_to_id` and `id_to_path`
- mappings from class id to image ids: `classid_to_ids` and `id_to_classid`

In [None]:
# read all directories
img_paths = {c:[directory + "/" + img for img in sorted(os.listdir(PATH+directory))] 
             for directory,c in name_to_classid.items()}

# retrieve all images
all_images_path = []
for img_list in img_paths.values():
    all_images_path += img_list

# map to integers
path_to_id = {v:k for k,v in enumerate(all_images_path)}
id_to_path = {v:k for k,v in path_to_id.items()}

In [None]:
# build mappings between images and class
classid_to_ids = {k:[path_to_id[path] for path in v] for k,v in img_paths.items()}
id_to_classid = {v:c for c,imgs in classid_to_ids.items() for v in imgs}
dict(list(id_to_classid.items())[0:13])

The following histogram shows the number of images per class: there are many classes with only one image. 
These classes are useful as negatives, only as we can't make a positive pair with them.

In [None]:
[(classid_to_name[x], len(classid_to_ids[x])) for x in np.argsort([len(v) for k,v in classid_to_ids.items()])[::-1][:10]]

In [None]:
plt.hist([len(v) for k,v in classid_to_ids.items()], bins=range(1,10))
plt.show()

In [None]:
[(classid_to_name[x], len(classid_to_ids[x])) for x in np.argsort([len(v) for k,v in classid_to_ids.items()])[::-1][:10]]

### siamese nets

A siamese net takes as input two images $x_1$ and $x_2$ and outputs a single value which corresponds to the similarity between $x_1$ and $x_2$.

In order to train such a system, one has to build positive and negative pairs for the training. 

In [None]:
def build_pos_pairs_for_id(classid, max_num=50):
    imgs = classid_to_ids[classid]
    if len(imgs) == 1:
        return []
    pos_pairs = [(imgs[i], imgs[j]) for i in range(len(imgs)) for j in range(i+1,len(imgs))]
    random.shuffle(pos_pairs)
    return pos_pairs[:max_num]

def build_neg_pairs_for_id(classid, classes, max_num=20):
    imgs = classid_to_ids[classid]
    neg_classes_ids = random.sample(classes, max_num+1)
    if classid in neg_classes_ids:
        neg_classes_ids.remove(classid)
    neg_pairs = []
    for id2 in range(max_num):
        img1 = imgs[random.randint(0,len(imgs)-1)]
        imgs2 = classid_to_ids[neg_classes_ids[id2]]
        img2 = imgs2[random.randint(0,len(imgs2)-1)]
        neg_pairs += [(img1, img2)]
    return neg_pairs

Let's build positive and a negative pairs for class 5

In [None]:
build_pos_pairs_for_id(5, 20)

In [None]:
build_neg_pairs_for_id(5, list(range(num_classes)), 6)

Now that we have a way to compute the pairs, let's open all the possible images. It will expand all the images into RAM memory. There are more than 1000 images, so 250Mo of RAM will be used, which will not cause any issue.

_Note: if you plan on opening more images, you should not open them all at once, and rather build a generator_

In [None]:
from skimage.io import imread
from skimage.transform import resize

def resize100(img):
    return resize(img, (100, 100), preserve_range=True, mode='reflect')[20:80,20:80,:]

def open_all_images(id_to_path):
    all_imgs = []
    for path in id_to_path.values():
        all_imgs += [np.expand_dims(resize100(imread(PATH+path)),0)]
    return np.vstack(all_imgs)


In [None]:
all_imgs = open_all_images(id_to_path)
all_imgs.shape, str(all_imgs.nbytes / 1e6) + "Mo"

In [None]:
def build_train_test_data(split=0.8):
    listX1 = []
    listX2 = []
    listY = []
    split = int(num_classes * split)
    
    # train
    for id in range(split):
        pos = build_pos_pairs_for_id(id)
        neg = build_neg_pairs_for_id(id, list(range(split)))
        for pair in pos:
            listX1 += [pair[0]]
            listX2 += [pair[1]]
            listY += [1]
        for pair in neg:
            if sum(listY) > len(listY) / 2:
                listX1 += [pair[0]]
                listX2 += [pair[1]]
                listY += [0]
    perm = np.random.permutation(listX1)
    X1_ids_train, X2_ids_train, Y_ids_train = np.array(listX1)[perm], np.array(listX2)[perm], np.array(listY)[perm]
    
    listX1 = []
    listX2 = []
    listY = []
    #test
    for id in range(split,num_classes):
        pos = build_pos_pairs_for_id(id)
        neg = build_neg_pairs_for_id(id, list(range(split,num_classes)))
        for pair in pos:
            listX1 += [pair[0]]
            listX2 += [pair[1]]
            listY += [1]
        for pair in neg:
            if sum(listY) > len(listY) / 2:
                listX1 += [pair[0]]
                listX2 += [pair[1]]
                listY += [0]
    X1_ids_test, X2_ids_test, Y_ids_test = np.array(listX1), np.array(listX2), np.array(listY)
    return X1_ids_train, X2_ids_train, Y_ids_train, X1_ids_test, X2_ids_test, Y_ids_test

In [None]:
from imgaug import augmenters as iaa

seq = iaa.Sequential([
    #iaa.Crop(px=(0, 16)), # crop images from each side by 0 to 16px (randomly chosen)
    iaa.Fliplr(0.5), # horizontally flip 50% of the images
    iaa.GaussianBlur(sigma=(0, 0.5)), # blur images with a sigma of 0 to 3.0
    iaa.Multiply((0.5, 1.5))
])


In [None]:
X1_ids_train, X2_ids_train, train_Y, X1_ids_test, X2_ids_test, test_Y = build_train_test_data()

In [None]:
class Generator():
    def __init__(self, X1, X2, Y, batch_size, all_imgs):
        self.cur_train_index=0
        self.batch_size = batch_size
        self.X1 = X1
        self.X2 = X2
        self.Y = Y
        self.imgs = all_imgs
        self.num_samples = Y.shape[0]
        
    def next_train(self):
        while 1:
            self.cur_train_index += self.batch_size
            if self.cur_train_index >= self.num_samples:
                self.cur_train_index=0
            
            imgs1 = self.X1[self.cur_train_index:self.cur_train_index+self.batch_size]
            imgs2 = self.X2[self.cur_train_index:self.cur_train_index+self.batch_size]
    
       # deactivate augmentation
       #     yield ([self.imgs[imgs1], self.imgs[imgs2]],
       #             self.Y[self.cur_train_index:self.cur_train_index+self.batch_size])
        
            yield ([seq.augment_images(self.imgs[imgs1]), 
                    seq.augment_images(self.imgs[imgs2])
                    ],
                    self.Y[self.cur_train_index:self.cur_train_index+self.batch_size]
                )

In [None]:
gen = Generator(X1_ids_train, X2_ids_train, train_Y, 32, all_imgs)

In [None]:
[x1, x2], y = next(gen.next_train())

In [None]:
z = np.vstack([x1[0:6],x2[0:6]])
z.shape

In [None]:
plt.figure(figsize=(16, 6))
for i, X in enumerate(z):
    plt.subplot(2, 6, i + 1)
    plt.imshow(X / 255)
    if i>5 and y[i-6]==1.0:
        plt.title("similar")
    elif i>5 and y[i-6]==0.0:
        plt.title("different")
    plt.axis('off')
plt.show()

In [None]:
train_X1 = all_imgs[X1_ids_train]
train_X2 = all_imgs[X2_ids_train]

In [None]:

test_X1 = all_imgs[X1_ids_test]
test_X2 = all_imgs[X2_ids_test]

In [None]:
#train_X1.shape, train_X2.shape, train_Y.shape, 

In [None]:
test_X1.shape, test_X2.shape, test_Y.shape

## Simple convolutional model

In [None]:
def accuracy_sim(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on similarity.
    '''
    return K.mean(K.equal(y_true, K.cast(y_pred > 0.5, y_true.dtype)))

In [None]:
inp = Input((60,60,3), dtype='float32')
x = Conv2D(16, 3, activation="relu", padding="same")(inp)
x = MaxPool2D((2,2))(x) # 30,30
x = Conv2D(32, 3, activation="relu", padding="same")(x)
x = MaxPool2D((2,2))(x) # 15,15
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = MaxPool2D((2,2))(x) # 8,8
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = MaxPool2D((2,2))(x) # 4,4
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = Flatten()(x)
x = Dropout(0.2)(x)
x = Dense(32)(x)
x = Dropout(0.2)(x)
x = Dense(32)(x)
shared_conv = Model(inputs=inp, outputs = x)

In [None]:
shared_conv.summary()

In [None]:
i1 = Input((60,60,3), dtype='float32')
i2 = Input((60,60,3), dtype='float32')

x1 = shared_conv(i1)
x2 = shared_conv(i2)

out = Dot(axes=-1, normalize=True)([x1,x2])

model = Model(inputs=[i1, i2], outputs=out)
predict_model = Model(inputs=i1, outputs=x1)
model.compile(loss="mse", optimizer="rmsprop", metrics=[accuracy_sim])

In [None]:
model.fit_generator(generator=gen.next_train(), 
                    steps_per_epoch=train_Y.shape[0] // 32, 
                    epochs=5,
                    validation_data=([test_X1, test_X2], test_Y))

In [None]:
model.save_weights("weights.h5")

In [None]:
emb = predict_model.predict(all_imgs)

In [None]:
norm_emb = emb / np.linalg.norm(emb, axis=-1, keepdims=True)

In [None]:
def most_sim(x, emb, topn=5):
    sims = np.dot(emb,x)
    ids = np.argsort(sims)[::-1]
    return [(id,sims[id]) for id in ids[:topn]]

In [None]:
def test_id(image, emb, topn=5):
    sims = np.dot(emb,x)
    ids = np.argsort(sims)[::-1]
    return [(id,sims[id]) for id in ids[:topn]]

In [None]:
def display(img):
    img = img.astype('uint8')
    plt.imshow(img)
    plt.show()

In [None]:
interesting_classes = list(filter(lambda x: len(x[1])>4, classid_to_ids.items()))
class_idx = random.choice(interesting_classes)[0]
print(class_idx)
img_idx = random.choice(classid_to_ids[class_idx])
for id, sim in most_sim(norm_emb[img_idx], norm_emb):
    display(all_imgs[id])
    print((classid_to_name[id_to_classid[id]], id, sim))

# Triplet loss

In the triplet loss model, we'll define 3 inputs $(a,+,-)$ for anchor, positive and negative.

#### usage and differences with siamese nets

In [None]:
def build_triplet_for_image(imageid, classes, max_num=20):
    classid = id_to_classid[imageid]
    imgs = set(classid_to_ids[classid])
    other_imgs = imgs - set([imageid])
    negatives = random.sample(classes, len(other_imgs))
    triplets = [(imageid, img, neg) for (img,neg) in zip(other_imgs, negatives)]
    random.shuffle(triplets)
    return triplets[:max_num]

In [None]:
build_triplet_for_image(21, list(range(200)), 20)

In [None]:
max_positive_interactions = sum([len(x) * (len(x) - 1) for x in classid_to_ids.values()])
max_positive_interactions

In [None]:
class TripletGenerator():
    def __init__(self, split, batch_size, all_imgs, max_positives=20):
        self.cur_img_index=0
        self.cur_img_pos_index=0
        self.batch_size = batch_size
        
        self.imgs = all_imgs
        # build anchor and positive paris, we'll add negatives during batch generation
        listX1 = []
        listX2 = []

        # build positive pairs
        for id in range(split):
            pos = build_pos_pairs_for_id(id, max_positives)
            
            for pair in pos:
                listX1 += [pair[0]]
                listX2 += [pair[1]]
        print(listX1)
        #shuffle positives
        perm = np.random.permutation(listX1)
        self.X1_ids_train = np.array(listX1)[perm]
        self.X2_ids_train = np.array(listX2)[perm]
        self.num_samples = len(listX1)
        
    def next_train(self):
        while 1:
            self.cur_train_index += self.batch_size
            if self.cur_train_index >= self.num_samples:
                self.cur_train_index=0
            
            # fill one batch
            imgs1 = self.X1[self.cur_train_index:self.cur_train_index+self.batch_size]
            imgs2 = self.X2[self.cur_train_index:self.cur_train_index+self.batch_size]
            imgs3 = random.sample(split,imgs1.shape[0])
    
       # deactivate augmentation
       #     yield ([self.imgs[imgs1], self.imgs[imgs2]],
       #             self.Y[self.cur_train_index:self.cur_train_index+self.batch_size])
        
            yield ([seq.augment_images(self.imgs[imgs1]), 
                    seq.augment_images(self.imgs[imgs2])
                    ],
                    self.Y[self.cur_train_index:self.cur_train_index+self.batch_size]
                )


    


In [None]:
split = int(num_classes * 0.9)
gen = TripletGenerator(split, 32, all_imgs)
next(gen)

In [None]:

def identity_loss(y_true, y_pred):
    return K.mean(y_pred - 0 * y_true)


def triplet_loss(X):
    _alpha = 0.2
    a, p, n = X

    positive_distances = K.mean(K.square(a - p),axis=-1)
    negative_distances = K.mean(K.square(a - n),axis=-1)
    
    # batch loss
    losses = K.maximum(0.0, positive_distances - negative_distances + _alpha)
    
    return K.mean(losses)

In [None]:
NormalizeLayer = Lambda(lambda x: K.l2_normalize(
                        K.sqrt(K.relu(x) + K.epsilon()) - K.sqrt(K.relu(-x)+ K.epsilon()),
                        axis=-1))

In [None]:
inp = Input((100,100,3), dtype='float32')
x = Conv2D(16, 3, activation="relu", padding="same")(inp)
x = MaxPool2D((2,2))(x) # 50,50
x = Conv2D(32, 3, activation="relu", padding="same")(x)
x = MaxPool2D((2,2))(x) # 25,25
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = MaxPool2D((2,2))(x) # 12,12
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = MaxPool2D((2,2))(x) # 6,6
x = Conv2D(64, 3, activation="relu", padding="same")(x)
x = Flatten()(x)
x = Dense(64)(x)
shared_conv2 = Model(inputs=inp, outputs = x)

### tech details 

In [None]:
loss = merge(
    [a, p, n],
    mode=triplet_loss,
    name='loss',
    output_shape=(1, ))

In [None]:
anchor = Input((100, 100, 3), name='anchor')
positive = Input((100, 100, 3), name='positive')
negative = Input((100, 100, 3), name='negative')

a = shared_conv2(anchor)
p = shared_conv2(positive)
n = shared_conv2(negative)

loss = Lambda(triplet_loss,
                      output_shape=(1,))(
                      [a,p,n])

model_triplet = Model(
    inputs=[anchor, positive, negative],
    outputs=loss)

predict_model_triplet = Model(inputs=anchor, outputs=a)
model_triplet.compile(loss=identity_loss, optimizer="adam")

In [None]:
x = np.random.uniform(size=(90,100,100,3))
print(x.shape)
triplet = [x[0:30,:], x[30:60,:], x[60:90,:]]
model_triplet.fit(x=triplet, 
          y=np.zeros(30),
          epochs=100, batch_size=10,)


In [None]:
out = predict_model_triplet.predict(x[0:90])
tsne_out = TSNE(perplexity=30).fit_transform(out)

In [None]:
colors = ["blue"]*30 + ["green"]*30 + ["red"]*30
plt.figure(figsize=(10, 10))
plt.scatter(tsne_out[0:90, 0], tsne_out[0:90, 1], c=colors);
plt.xticks(()); plt.yticks(());
plt.show()

In [None]:
np.linalg.norm(out[0]-out, axis=-1)

In [None]:
idx = random.randint(0,100)

for id, sim in most_sim(norm_emb[idx], norm_emb):
    display(all_imgs[id])
    print((id,sim))

Constrastive loss and euclidean model

In [None]:
def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    return K.mean( y_true * K.square(y_pred) +
                  (1 - y_true) * K.square(K.maximum(margin - y_pred, 0)))


In [None]:
def accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    return K.mean(K.equal(y_true, K.cast(y_pred < 0.5, y_true.dtype)))

In [None]:
def euclidean_distance(vects):
    x, y = vects
    return K.sqrt(K.maximum(K.sum(K.square(x - y), axis=1, keepdims=True), K.epsilon()))

In [None]:
i1 = Input((100,100,3), dtype='float32')
i2 = Input((100,100,3), dtype='float32')

x1 = shared_conv(i1)
x2 = shared_conv(i2)

#out = Dot(axes=-1, normalize=True)([x1,x2])
out = Lambda(euclidean_distance, output_shape=(1,))([x1, x2])


model = Model(inputs=[i1, i2], outputs=out)
predict_model = Model(inputs=i1, outputs=x1)
model.compile(loss=contrastive_loss, optimizer="rmsprop", metrics=[accuracy])

In [None]:
model.fit([train_X1, train_X2], train_Y,
          batch_size=32,
          epochs=10,
          validation_data=([test_X1, test_X2], test_Y))