In [1]:
#!pip install bcolz
import os
import cv2
import tqdm
import bcolz
import numpy as np
import tensorflow as tf 
from tensorflow.keras import Model
from sklearn.model_selection import KFold
from tensorflow.keras.applications import (MobileNetV2,ResNet50)
from tensorflow.keras.layers import (Dense,Dropout,Flatten,Input)
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard

In [12]:
img_path = ''#"/home/oluwaseun/Documents/Seamfix_projects/project/face_dataset/random_faces/facedataset/badboy1.jpg"#"/path/to/an/to/generate/embeddings"
test_dataset = "/home/oluwaseun/Documents/Seamfix_projects/project/face_dataset/arcface_dataset"

In [13]:
#parameters
batch_size = 128
input_size  = 112
embd_shape = 512
sub_name = 'arc_mobv2' #arc_rec50 or arc_mobv2
backbone_type = 'MobileNetV2' # 'ResNet50', 'MobileNetV2'
head_type = "ArcHead" # 'ArcHead', 'NormHead'
is_ccrop = False # central-cropping or not
num_classes = 85743 #33432
num_samples = 5822653 #2399999
w_decay = float(5e-4)

In [14]:
def Backbone(backbone_type='ResNet50', use_pretrain
             =True):
    """Backbone Model"""
    weights = None
    if use_pretrain:
        weights = 'imagenet'

    def backbone(x_in):
        if backbone_type == 'ResNet50':
            return ResNet50(input_shape=x_in.shape[1:], include_top=False,
                            weights=weights)(x_in)
        elif backbone_type == 'MobileNetV2':
            return MobileNetV2(input_shape=x_in.shape[1:], include_top=False,
                               weights=weights)(x_in)
        else:
            raise TypeError('backbone_type error!')
    return backbone


def OutputLayer(embd_shape, w_decay=5e-4, name='OutputLayer'):
    """Output Later"""
    def output_layer(x_in):
        x = inputs = Input(x_in.shape[1:])
        x = BatchNormalization()(x)
        x = Dropout(rate=0.5)(x)
        x = Flatten()(x)
        x = Dense(embd_shape, kernel_regularizer=tf.keras.regularizers.l2(w_decay) )(x)
        x = BatchNormalization()(x)
        return Model(inputs, x, name=name)(x_in)
    return output_layer


def ArcHead(num_classes, margin=0.5, logist_scale=64, name='ArcHead'):
    """Arc Head"""
    def arc_head(x_in, y_in):
        x = inputs1 = Input(x_in.shape[1:])
        y = Input(y_in.shape[1:])
        x = ArcMarginPenaltyLogists(num_classes=num_classes,
                                    margin=margin,
                                    logist_scale=logist_scale)(x, y)
        return Model((inputs1, y), x, name=name)((x_in, y_in))
    return arc_head

def ArcFaceModel(size, channels, num_classes, name, margin, logist_scale, embd_shape,head_type, backbone_type, w_decay, use_pretrain, training):
    """Arc Face Model"""
    x = inputs = Input([size, size, channels], name='input_image')

    x = Backbone(backbone_type=backbone_type, use_pretrain=use_pretrain)(x)

    embds = OutputLayer(embd_shape, w_decay=w_decay)(x)
    if training:
        assert num_classes is not None
        labels = Input([], name='label')
        if head_type == 'ArcHead':
            logist = ArcHead(num_classes=num_classes, margin=margin,
                             logist_scale=logist_scale)(embds, labels)
        else:
            logist = NormHead(num_classes=num_classes, w_decay=w_decay)(embds)
        return Model((inputs, labels), logist, name=name)
    else:
        return Model(inputs, embds, name=name)
    
    #return Model(inputs, embds, name=name)
 

In [15]:
class BatchNormalization(tf.keras.layers.BatchNormalization):
    """Make trainable=False freeze BN for real (the og version is sad).
       ref: https://github.com/zzh8829/yolov3-tf2
    """
    def call(self, x, training=False):
        if training is None:
            training = tf.constant(False)
        training = tf.logical_and(training, self.trainable)
        return super().call(x, training)


class ArcMarginPenaltyLogists(tf.keras.layers.Layer):
    """ArcMarginPenaltyLogists"""
    def __init__(self, num_classes, margin=0.5, logist_scale=64, **kwargs):
        super(ArcMarginPenaltyLogists, self).__init__(**kwargs)
        self.num_classes = num_classes
        self.margin = margin
        self.logist_scale = logist_scale
        
    def get_config(self):

        config = super().get_config().copy()
        config.update({
            
            'num_classes': self.num_classes,
            'margin': self.margin,
            'logist_scale': self.logist_scale
        })
        return config
    
    def build(self, input_shape):
        self.w = self.add_variable(
            "weights", shape=[int(input_shape[-1]), self.num_classes])
        self.cos_m = tf.identity(math.cos(self.margin), name='cos_m')
        self.sin_m = tf.identity(math.sin(self.margin), name='sin_m')
        self.th = tf.identity(math.cos(math.pi - self.margin), name='th')
        self.mm = tf.multiply(self.sin_m, self.margin, name='mm')

    def call(self, embds, labels):
        normed_embds = tf.nn.l2_normalize(embds, axis=1, name='normed_embd')
        normed_w = tf.nn.l2_normalize(self.w, axis=0, name='normed_weights')

        cos_t = tf.matmul(normed_embds, normed_w, name='cos_t')
        sin_t = tf.sqrt(1. - cos_t ** 2, name='sin_t')

        cos_mt = tf.subtract(
            cos_t * self.cos_m, sin_t * self.sin_m, name='cos_mt')

        cos_mt = tf.where(cos_t > self.th, cos_mt, cos_t - self.mm)

        mask = tf.one_hot(tf.cast(labels, tf.int32), depth=self.num_classes,
                          name='one_hot_mask')

        logists = tf.where(mask == 1., cos_mt, cos_t)
        logists = tf.multiply(logists, self.logist_scale, 'arcface_logist')

        return logists

In [16]:
def get_val_pair(path, name):
    carray = bcolz.carray(rootdir=os.path.join(path, name), mode='r')
    issame = np.load('{}/{}_list.npy'.format(path, name))

    return carray, issame

In [17]:
def get_val_data(data_path):
    """get validation data"""
    lfw, lfw_issame = get_val_pair(data_path, 'lfw_align_112/lfw')
    agedb_30, agedb_30_issame = get_val_pair(data_path,'agedb_align_112/agedb_30')

    return lfw, lfw_issame, agedb_30, agedb_30_issame

In [18]:
def calculate_accuracy(threshold, dist, actual_issame):
    predict_issame = np.less(dist, threshold)
    tp = np.sum(np.logical_and(predict_issame, actual_issame))
    fp = np.sum(np.logical_and(predict_issame, np.logical_not(actual_issame)))
    tn = np.sum(np.logical_and(np.logical_not(predict_issame),
                               np.logical_not(actual_issame)))
    fn = np.sum(np.logical_and(np.logical_not(predict_issame), actual_issame))

    tpr = 0 if (tp + fn == 0) else float(tp) / float(tp + fn)
    fpr = 0 if (fp + tn == 0) else float(fp) / float(fp + tn)
    acc = float(tp + tn) / dist.size
    return tpr, fpr, acc

In [19]:
def calculate_roc(thresholds, embeddings1, embeddings2, actual_issame,nrof_folds=10):
    assert (embeddings1.shape[0] == embeddings2.shape[0])
    assert (embeddings1.shape[1] == embeddings2.shape[1])
    nrof_pairs = min(len(actual_issame), embeddings1.shape[0])
    nrof_thresholds = len(thresholds)
    k_fold = KFold(n_splits=nrof_folds, shuffle=False)

    tprs = np.zeros((nrof_folds, nrof_thresholds))
    fprs = np.zeros((nrof_folds, nrof_thresholds))
    accuracy = np.zeros((nrof_folds))
    best_thresholds = np.zeros((nrof_folds))
    indices = np.arange(nrof_pairs)

    diff = np.subtract(embeddings1, embeddings2)
    dist = np.sum(np.square(diff), 1)

    for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)):
        # Find the best threshold for the fold
        acc_train = np.zeros((nrof_thresholds))
        for threshold_idx, threshold in enumerate(thresholds):
            _, _, acc_train[threshold_idx] = calculate_accuracy( threshold, dist[train_set], actual_issame[train_set])
        best_threshold_index = np.argmax(acc_train)

        best_thresholds[fold_idx] = thresholds[best_threshold_index]
        for threshold_idx, threshold in enumerate(thresholds):
            tprs[fold_idx, threshold_idx], fprs[fold_idx, threshold_idx], _ =  calculate_accuracy(threshold, dist[test_set], actual_issame[test_set])
        _, _, accuracy[fold_idx] = calculate_accuracy( thresholds[best_threshold_index], dist[test_set], actual_issame[test_set])

    tpr = np.mean(tprs, 0)
    fpr = np.mean(fprs, 0)
    return tpr, fpr, accuracy, best_thresholds

In [20]:
def evaluate(embeddings, actual_issame, nrof_folds=10):
    # Calculate evaluation metrics
    thresholds = np.arange(0, 4, 0.01)
    embeddings1 = embeddings[0::2]
    embeddings2 = embeddings[1::2]
    tpr, fpr, accuracy, best_thresholds = calculate_roc(thresholds, embeddings1, embeddings2, np.asarray(actual_issame),nrof_folds=nrof_folds)

    return tpr, fpr, accuracy, best_thresholds


In [21]:
def l2_norm(x, axis=1):
    """l2 norm"""
    norm = np.linalg.norm(x, axis=axis, keepdims=True)
    output = x / norm

    return output

In [22]:
def hflip_batch(imgs):
    assert len(imgs.shape) == 4
    return imgs[:, :, ::-1, :]

In [23]:
def perform_val(embedding_size, batch_size, model, carray, issame, nrof_folds=10, is_ccrop=False, is_flip=True):
    """perform val"""
    embeddings = np.zeros([len(carray), embedding_size])

    for idx in tqdm.tqdm(range(0, len(carray), batch_size)):
        batch = carray[idx:idx + batch_size]
        batch = np.transpose(batch, [0, 2, 3, 1]) * 0.5 + 0.5
        if is_ccrop:
            batch = ccrop_batch(batch)
        if is_flip:
            fliped = hflip_batch(batch)
            emb_batch = model(batch) + model(fliped)
            embeddings[idx:idx + batch_size] = l2_norm(emb_batch)
        else:
            batch = ccrop_batch(batch)
            emb_batch = model(batch)
            embeddings[idx:idx + batch_size] = l2_norm(emb_batch)

    tpr, fpr, accuracy, best_thresholds = evaluate(embeddings, issame, nrof_folds)

    return accuracy.mean(), best_thresholds.mean()

In [None]:
model = ArcFaceModel(size=input_size,channels=3, num_classes=num_classes, name='arcface_model', margin=0.5, logist_scale=64, embd_shape=embd_shape, head_type=head_type, backbone_type = backbone_type,
                 w_decay=w_decay, use_pretrain = True, training=False)

model.summary(line_length=80)

In [None]:
ckpt_path = tf.train.latest_checkpoint('./ckpt/ckpt2')
print("[*] load ckpt from {}".format(ckpt_path))
model.load_weights(ckpt_path)


In [29]:
ckpt_path = tf.train.latest_checkpoint('./ckpt/ckpt2')
print("[*] load ckpt from {}".format(ckpt_path))
model.load_weights(ckpt_path)

[*] load ckpt from ./ckpt/ckpt2/e_5_b_18536.ckpt


<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f3b400d8a90>

In [31]:
if img_path:
    print("[*] Encode {} to ./output_embeds.npy".format(img_path))
    img = cv2.imread(img_path)
    img = cv2.resize(img, (input_size, input_size))
    img = img.astype(np.float32) / 255.
    if len(img.shape) == 3:
        img = np.expand_dims(img, 0)
        x = model(img)
    np.save('./output_embeds.npy', embeds)

else:
    print("[*] Loading LFW, AgeDB30 and CFP-FP...")
    lfw, lfw_issame, agedb_30, agedb_30_issame= get_val_data(test_dataset)
    

    print("[*] Perform Evaluation on LFW...")
    acc_lfw, best_th = perform_val(embd_shape, batch_size, model, lfw, lfw_issame,is_ccrop=False)
    print("    acc {:.4f}, th: {:.2f}".format(acc_lfw, best_th))
    
    print("[*] Perform Evaluation on AgeDB30...")
    acc_agedb30, best_th = perform_val(embd_shape, batch_size, model, agedb_30, agedb_30_issame, is_ccrop=False)
    print("    acc {:.4f}, th: {:.2f}".format(acc_agedb30, best_th))


[*] Encode ./data/badboy1.jpg to ./output_embeds.npy
