In [None]:
import sys
import os

# Put the parent folder on sys.path so that modules from myutils can be imported.
main_path = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if main_path not in sys.path:
    sys.path.append(main_path)

# Quiet TensorFlow's startup messages and select the Keras backend.
# These must be set before TensorFlow is imported.
os.environ.update({
    'TF_ENABLE_ONEDNN_OPTS': '0',
    'TF_CPP_MIN_VLOG_LEVEL': '0',
    "TF_CPP_MIN_LOG_LEVEL": "2",
    'CUDA_DEVICE_ORDER': 'PCI_BUS_ID',
    "KERAS_BACKEND": "tensorflow",
})
# os.environ['CUDA_VISIBLE_DEVICES'] = ''

import tensorflow as tf
from tensorflow import keras as K
import numpy as np
import pandas as pd
import glob
import cv2
import albumentations as A
import matplotlib.pyplot as plt
import imtools
from sklearn.metrics import roc_auc_score, average_precision_score, roc_curve
# import data_wrangling as dw
import importlib
from absl import logging

# Raise the logging threshold so that TF and absl only report errors.
tf.get_logger().setLevel('ERROR')
logging.set_verbosity(logging.ERROR)

# Let TensorFlow allocate GPU memory on demand instead of reserving it all.
# - Works on CPU-only machines (no IndexError when no GPU is visible).
# - Memory growth is set on every GPU, since TF requires the setting to be
#   the same across all of them.
# - Re-running this cell after the GPUs were initialised raises RuntimeError,
#   which is reported instead of aborting the notebook.
gpus = tf.config.list_physical_devices('GPU')
device = gpus[0] if gpus else None
for gpu in gpus:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        print(f"Could not enable memory growth on {gpu.name}: {err}")

# CelebA dataset

A partir dos metadados de identidade, extraímos um DataFrame com as colunas [path, id], onde cada id pertence a uma única pessoa.

In [None]:
# Identity annotation file, read as two space-separated columns: path and id.
anno_dir = "../data/celebA/identity_CelebA.txt"

df = pd.read_csv(anno_dir, sep=' ', header=None, names=['path', 'id'])
df = df.sort_values('id')
df = df.reset_index(drop=True)
df

In [None]:
# Show every photo of one randomly chosen identity.
# The id is drawn from the ids actually present in `df` (instead of a
# hard-coded range) and stored in `person_id` so the built-in `id` is not shadowed.
person_id = np.random.choice(df['id'].unique())
random_person = df[df['id'] == person_id]
imgs = [
    imtools.load_image(os.path.join("../data/celebA/images/", rel_path))
    for rel_path in random_person['path']
]

print(person_id)
imtools.plot_grid(imgs, scale=3)

## Split data

In [None]:
from sklearn.model_selection import train_test_split


# Split on identities (not on images), so the id sets used to build
# train_df and test_df are disjoint.
unique_ids = df['id'].unique()
train_ids, test_ids = train_test_split(unique_ids, random_state=42, test_size=0.001)

in_train = df['id'].isin(train_ids)
in_test = df['id'].isin(test_ids)
train_df = df[in_train].reset_index(drop=True)
test_df = df[in_test].reset_index(drop=True)

# Augmentation

In [None]:
# Training-time augmentation pipeline: a horizontal flip and an affine warp,
# followed by a RandomOrder group (n=4) of photometric and geometric distortions.
# NOTE(review): border_mode=1 presumably corresponds to cv2.BORDER_REPLICATE - confirm.
# NOTE(review): RandomOrder(..., n=4) is assumed to apply 4 of the listed
# transforms in random order - confirm against the installed albumentations version.
transform_train = A.Compose([    
    A.HorizontalFlip(p=0.5),
    A.Affine(scale=(0.75, 1.2), translate_percent=(-0.1, 0.1), rotate=(-15, 15), shear=(-10, 10), border_mode=1, p=0.8),
    A.RandomOrder([        
        A.CLAHE(),
        A.RandomBrightnessContrast(),
        A.GaussianBlur(3),
        A.GaussNoise(std_range=(0.005, 0.05)),
        # Exactly one of the two monochrome-style conversions.
        A.OneOf([
            A.ToGray(),
            A.ToSepia(),            
        ]),
        # Exactly one of the two colour perturbations.
        A.OneOf([
            A.RGBShift((-20, 20), (-20, 20), (-20, 20)),
            A.ColorJitter(brightness=(0.95, 1.05), contrast=(0.95, 1.05), saturation=(0.95, 1.05), hue=(-0.05, 0.05))
        ]),
        A.GridDistortion(
            num_steps=3,
            distort_limit=[-0.1, 0.1],
            interpolation=cv2.INTER_LINEAR,
            normalized=True,
            mask_interpolation=cv2.INTER_NEAREST,
        ),
        A.OpticalDistortion(distort_limit=(-0.2, 0.2)),
        A.Perspective(scale=(0.01, 0.08)),
        A.Posterize(num_bits=(4, 7)),
        A.Defocus(radius=(1, 3)) 
    ], n=4),
    
])

# Evaluation-time augmentation pipeline: a RandomOrder group (n=4) with
# narrower parameter ranges than the training pipeline.
transform_test = A.Compose([
    A.RandomOrder([
        A.HorizontalFlip(p=0.5),
        A.Affine(scale=(0.85, 1.1), translate_percent=(-0.1, 0.1), rotate=(-15, 15), shear=(-5, 5), border_mode=1, p=0.8),
        A.RGBShift((-10, 10), (-10, 10), (-10, 10)),
        A.RandomBrightnessContrast(brightness_limit=(-0.1, 0.1), contrast_limit=(-0.1, 0.1), p=0.75),
        A.OpticalDistortion(distort_limit=(-0.1, 0.1), p=0.75),
        A.Perspective(scale=(0.01, 0.05), p=0.75),        
    ], n=4)
])

def augment_train(img):
    """Run one image through `transform_train` and return the augmented image."""
    return transform_train(image=img)['image']


def augment_test(img):
    """Run one image through `transform_test` and return the augmented image."""
    return transform_test(image=img)['image']


######################
# Visual check of the test-time augmentation on one random identity.
# The id is drawn from the ids present in `df` and kept in `person_id`
# so the built-in `id` is not shadowed.
person_id = np.random.choice(df['id'].unique())
random_person = df[df['id'] == person_id]
imgs = []
imgs_auged = []
for rel_path in random_person['path']:
    img = imtools.load_image(os.path.join("../data/celebA/images/", rel_path))
    imgs.append(img)
    imgs_auged.append(augment_test(img))

print(person_id)
# Show at most 5 originals and their augmented versions. Slicing with [:n]
# (not [:n-1]) keeps the last image and still shows something when the
# identity has a single photo.
n = min(len(imgs), 5)
imtools.plot_images(imgs[:n], scale=3)
imtools.plot_images(imgs_auged[:n], scale=3)

In [None]:
# triplet dataset
def get_dataset(anno_df, batch_size, kshots=1, img_size=224, transform=None, drop=False, shuffle=False,
                img_dir="../data/celebA/images/"):
    """Build an endless `tf.data.Dataset` of K-shot triplet episodes.

    Each element is a tuple `(positives, negatives, queries)` of float32 arrays
    with shape `(kshots, img_size, img_size, 3)`:

    * positives: `kshots` images of a randomly chosen identity;
    * negatives: `kshots` images of a *different* identity;
    * queries: one further image of the positive identity, repeated `kshots`
      times (each copy augmented independently when `transform` is given).

    Identities are drawn uniformly from the distinct values of `anno_df['id']`,
    which guarantees the negative identity differs from the positive one.
    Images are drawn without replacement whenever the identity has enough of
    them, so the query differs from the supports when possible.

    Args:
        anno_df: DataFrame with columns `path` (image file name) and `id`.
        batch_size: Number of episodes per batch.
        kshots: Number of support images per class in each episode.
        img_size: Side length passed to `imtools.load_image` as `size`.
            # assumes load_image returns an (img_size, img_size, 3) array,
            # as required by the output signature - TODO confirm
        transform: Optional callable `img -> img` applied to every image.
        drop: Forwarded to `Dataset.batch(drop_remainder=...)`.
        shuffle: If True, shuffle episodes with a buffer of `batch_size * 8`.
        img_dir: Directory that the `path` column is relative to.

    Returns:
        A batched, prefetched dataset; the underlying generator never ends.

    Raises:
        ValueError: If `anno_df` holds fewer than two distinct ids.
    """
    # Group the image paths per identity once, so the generator does not
    # rescan the whole frame for every episode.
    paths_by_id = {
        identity: group['path'].to_numpy()
        for identity, group in anno_df.groupby('id')
    }
    identities = np.array(list(paths_by_id))
    if len(identities) < 2:
        raise ValueError("get_dataset needs at least two distinct ids in anno_df to build triplets")

    def sample_paths(paths, count):
        # Fall back to sampling with replacement only when the identity has too few images.
        return np.random.choice(paths, count, replace=len(paths) < count)

    def load(rel_path):
        return imtools.load_image(os.path.join(img_dir, rel_path), size=(img_size, img_size))

    def gen():
        while True:
            positive_id, negative_id = np.random.choice(identities, 2, replace=False)

            # Draw kshots supports plus one extra positive image, which becomes the query.
            positive_paths = sample_paths(paths_by_id[positive_id], kshots + 1)
            negative_paths = sample_paths(paths_by_id[negative_id], kshots)
            query_path = positive_paths[-1]
            positive_paths = positive_paths[:-1]

            positive_imgs = [load(p) for p in positive_paths]
            negative_imgs = [load(p) for p in negative_paths]
            query_img = load(query_path)

            if transform is not None:
                positive_imgs = [transform(img) for img in positive_imgs]
                negative_imgs = [transform(img) for img in negative_imgs]
                # The single query is expanded into kshots augmented views.
                query_imgs = [transform(query_img) for _ in range(kshots)]
            else:
                # Repeat the query so the output still matches output_signature.
                query_imgs = [query_img for _ in range(kshots)]

            p_inputs = np.array(positive_imgs).astype('float32')
            n_inputs = np.array(negative_imgs).astype('float32')
            q_inputs = np.array(query_imgs).astype('float32')

            yield p_inputs, n_inputs, q_inputs

    episode_spec = tf.TensorSpec(shape=(kshots, img_size, img_size, 3), dtype=tf.float32)
    output_signature = (episode_spec, episode_spec, episode_spec)

    ds = tf.data.Dataset.from_generator(gen, output_signature=output_signature)
    ds = ds.shuffle(batch_size*8) if shuffle else ds
    ds = ds.batch(batch_size, drop_remainder=drop)
    ds = ds.prefetch(20)
    return ds

In [None]:
BATCH_SIZE = 16
KSHOTS = 5

# Training episodes, augmented with the training pipeline.
train_ds = get_dataset(
    anno_df=train_df,
    batch_size=BATCH_SIZE,
    kshots=KSHOTS,
    img_size=224,
    transform=augment_train,
    drop=True,
    shuffle=False,
)

# Held-out episodes, augmented with the test pipeline.
test_ds = get_dataset(
    anno_df=test_df,
    batch_size=BATCH_SIZE,
    kshots=KSHOTS,
    img_size=224,
    transform=augment_test,
    drop=True,
    shuffle=False,
)

In [None]:
# Pull a single batch from the test dataset and look at its first episode.
P, N, Q = next(iter(test_ds))

print(P.shape, N.shape, Q.shape)

# Positives, negatives and queries of episode 0, cast to uint8 for display.
imtools.plot_images(P[0].numpy().astype('uint8'), scale=3)
imtools.plot_images(N[0].numpy().astype('uint8'), scale=3)
imtools.plot_images(Q[0].numpy().astype('uint8'), scale=3)

## FaceModel

In [None]:
@K.utils.register_keras_serializable(package='Custom')
def l2norm(x):
    """Return `x` scaled to unit L2 norm along axis 1."""
    unit_x = tf.nn.l2_normalize(x, axis=1)
    return unit_x


@K.utils.register_keras_serializable(package='Custom')
def build_feature_extractor(input_shape, backbone, dim=512):
    """Wrap a frozen backbone with a gated projection head that emits L2-normalised vectors.

    Args:
        input_shape: Shape of one input image, passed to `K.layers.Input`.
        backbone: Keras model producing feature maps. It is frozen in place
            (`trainable = False`) and always called with `training=False`.
        dim: Width of the output embedding.

    Returns:
        A `K.Model` mapping images to `dim`-wide vectors normalised by `l2norm`.
    """
    def gated_projection(features):
        # Gate branch: dropout -> bottleneck (dim // 2, GELU) -> sigmoid mask of width `dim`.
        gate_hidden = K.layers.Dropout(0.5)(features)
        gate_hidden = K.layers.Dense(dim // 2, activation='gelu')(gate_hidden)
        gate = K.layers.Dense(dim, activation='sigmoid')(gate_hidden)

        # Value branch: dropout -> linear projection to `dim`.
        projected = K.layers.Dropout(0.5)(features)
        projected = K.layers.Dense(dim)(projected)

        # Element-wise gating of the projection by the mask.
        return projected * gate

    # Only the head is trained; the backbone weights stay fixed.
    backbone.trainable = False

    image_input = K.layers.Input(shape=input_shape)

    # Feature maps -> pooled vector -> gated projection -> unit-norm embedding.
    feature_maps = backbone(image_input, training=False)
    pooled = K.layers.GlobalAveragePooling2D()(feature_maps)
    gated = gated_projection(pooled)
    unit_embedding = K.layers.Lambda(l2norm)(gated)

    return K.Model(image_input, unit_embedding)


@K.utils.register_keras_serializable(package='Custom')
class FaceModel(K.Model):
    """Trains a feature extractor on K-shot triplet episodes with a soft-margin loss.

    Each training batch is a tuple `(positives, negatives, queries)` of tensors
    shaped `(batch, kshots, H, W, 3)`. For every episode the three groups are
    embedded, averaged over the K shots, and the class means are compared with
    the query mean.

    Args:
        feature_extractor: Keras model mapping images to embedding vectors.
        dim: Embedding width; stored and serialised, not used in computations here.
        margin: Stored and serialised; the soft-margin loss below does not use it.
        **kwargs: Forwarded to `K.Model`.
    """

    def __init__(self, feature_extractor, dim=512, margin=0.1, **kwargs):
        super().__init__(**kwargs)
        self.feature_extractor=feature_extractor
        self.margin=margin
        self.dim=dim
        self.triplet_loss_tracker=K.metrics.Mean(name='loss')

    @property
    def metrics(self):
        """Trackers reported by `fit`."""
        return [
            self.triplet_loss_tracker,
        ]

    def compute_loss(self, p, n, q):
        """Soft-margin triplet loss: mean of `softplus(d(p, q) - d(n, q))`.

        NOTE(review): this shares its name with Keras' own `Model.compute_loss`
        but takes different arguments; the name is kept for backward compatibility.

        Args:
            p, n, q: `(batch, dim)` positive, negative and query vectors.

        Returns:
            Scalar loss tensor.
        """
        p_dist = tf.reduce_sum(tf.square(p - q), axis=1) # (batch, dim) -> (batch,)
        n_dist = tf.reduce_sum(tf.square(n - q), axis=1)

        # softplus(x) == log(1 + exp(x)), computed in a numerically stable way.
        return tf.reduce_mean(tf.nn.softplus(p_dist - n_dist))

    def _embed_episodes(self, p_episodes, n_episodes, q_episodes, training):
        """Embed every episode and return the per-episode class means, each `(batch, dim)`."""
        num_episodes = p_episodes.shape[0] # equals the batch size
        if num_episodes is None:
            raise ValueError(
                "FaceModel needs a static batch dimension; build the dataset with drop_remainder=True"
            )

        p_vectors, n_vectors, q_vectors = [], [], []
        for episode in range(num_episodes):
            # One forward pass per episode over (P + N + Q) -> (3*kshots, H, W, 3).
            inputs = tf.concat(
                [p_episodes[episode], n_episodes[episode], q_episodes[episode]], axis=0
            )
            embeddings = self.feature_extractor(inputs, training=training)

            # Split back into the three groups and average over the K shots.
            p_embeddings, n_embeddings, q_embeddings = tf.split(embeddings, 3, axis=0)
            p_vectors.append(tf.reduce_mean(p_embeddings, axis=0))
            n_vectors.append(tf.reduce_mean(n_embeddings, axis=0))
            q_vectors.append(tf.reduce_mean(q_embeddings, axis=0))

        return tf.stack(p_vectors), tf.stack(n_vectors), tf.stack(q_vectors)

    def train_step(self, data):
        """Run one optimisation step on a batch of episodes and return the metric values."""
        p_episodes, n_episodes, q_episodes = data

        with tf.name_scope('train'):
            with tf.GradientTape() as tape:
                # training=True is required for the Dropout layers of the head to be
                # active; called directly, the extractor would otherwise run in
                # inference mode. The backbone is pinned to training=False inside
                # the extractor, so it stays in inference mode.
                p_batch_pred, n_batch_pred, q_batch_pred = self._embed_episodes(
                    p_episodes, n_episodes, q_episodes, training=True
                )
                loss = self.compute_loss(p_batch_pred, n_batch_pred, q_batch_pred)

            trainable_weights = self.feature_extractor.trainable_weights
            grads = tape.gradient(loss, trainable_weights)
            self.optimizer.apply_gradients(zip(grads, trainable_weights))

            self.triplet_loss_tracker.update_state(loss)

        return {m.name: m.result() for m in self.metrics}

    def get_config(self):
        """Serialise the wrapped extractor together with `margin` and `dim`."""
        config = super().get_config()
        config.update({
            "feature_extractor": K.utils.serialize_keras_object(self.feature_extractor),
            "margin": self.margin,
            "dim": self.dim
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the model from `get_config` output."""
        # Work on a copy so the caller's dict is left untouched.
        config = dict(config)
        fe_ser = config.pop("feature_extractor")
        feature_extractor = K.utils.deserialize_keras_object(fe_ser)

        # Fallbacks mirror the defaults of __init__.
        margin = config.pop("margin", 0.1)
        dim = config.pop("dim", 512)

        return cls(feature_extractor=feature_extractor, margin=margin, dim=dim, **config)


# ----------------------------
# Build model
# ----------------------------
# ImageNet-pretrained EfficientNetB0 without its classification top.
backbone = K.applications.EfficientNetB0(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
)
feature_extractor = build_feature_extractor(
    input_shape=(224, 224, 3),
    backbone=backbone,
    dim=512,
)
model = FaceModel(feature_extractor=feature_extractor, margin=0.2, dim=512)
model.compile(optimizer=K.optimizers.Adam(learning_rate=1e-3))

## callbacks

In [None]:
def compute_metrics(p, n, q):
    """Verification metrics for a batch of (positive, negative, query) vectors.

    Args:
        p, n, q: `(batch, dim)` array-likes (eager tensors or NumPy arrays) with
            the positive, negative and query vectors.

    Returns:
        `(auroc, aupr, fpr, tpr, J_threshold)`, where `J_threshold` is the score
        (negated squared distance) that maximises Youden's J = TPR - FPR.
    """
    p, n, q = np.asarray(p), np.asarray(n), np.asarray(q)
    p_dist = np.sum(np.square(p - q), axis=1) # (batch, dim) -> (batch,)
    n_dist = np.sum(np.square(n - q), axis=1)

    # Negated distances act as similarity scores:
    # the larger -d is, the closer the sample is to the query.
    scores = -np.concatenate([p_dist, n_dist])
    labels = np.concatenate([np.ones_like(p_dist), np.zeros_like(n_dist)])

    auroc = roc_auc_score(labels, scores)           # ROC-AUC
    aupr  = average_precision_score(labels, scores) # PR-AUC

    # Youden's J threshold. thresholds[0] is a sentinel ("predict nothing
    # positive") that is not a real score and may be infinite, so it is
    # excluded from the search to keep the result finite.
    fpr, tpr, thresholds = roc_curve(labels, scores)
    J = tpr - fpr
    idx = 1 + np.argmax(J[1:])
    J_threshold = thresholds[idx]

    return auroc, aupr, fpr, tpr, J_threshold

class VerificationMetrics(K.callbacks.Callback):
    """Evaluates the model on a few validation batches at the end of every epoch.

    Adds `val_loss`, `val_auroc`, `val_aupr` and `val_J_threshold` to the epoch
    logs. Each value is the mean over the batches evaluated in that epoch.

    Args:
        val_ds: Iterable of `(positives, negatives, queries)` batches, each
            shaped `(batch, kshots, H, W, 3)`.
        compute_fn: Callable `(p, n, q) -> (auroc, aupr, fpr, tpr, threshold)`
            applied to the `(batch, dim)` class means.
        steps: Maximum number of batches to evaluate per epoch.
        margin: Stored only; the soft-margin loss below does not use it.
    """

    def __init__(self, val_ds, compute_fn, steps=10, margin=0.1):
        super().__init__()
        self.val_ds = val_ds
        self.compute_fn = compute_fn
        self.steps = steps
        self.margin = margin
        self.loss_tracker = K.metrics.Mean(name='loss')
        self.auroc_tracker = K.metrics.Mean(name='auroc')
        self.aupr_tracker = K.metrics.Mean(name='aupr')
        self.threshold_tracker = K.metrics.Mean(name='threshold')

    def compute_loss(self, p, n, q):
        """Soft-margin triplet loss: mean of `softplus(d(p, q) - d(n, q))`."""
        p_dist = tf.reduce_sum(tf.square(p - q), axis=1) # (batch, dim) -> (batch,)
        n_dist = tf.reduce_sum(tf.square(n - q), axis=1)

        # softplus(x) == log(1 + exp(x)), computed in a numerically stable way.
        return tf.reduce_mean(tf.nn.softplus(p_dist - n_dist))

    def _embed_episodes(self, p_episodes, n_episodes, q_episodes):
        """Embed every episode in inference mode; returns three `(batch, dim)` tensors."""
        p_vectors, n_vectors, q_vectors = [], [], []
        for episode in range(p_episodes.shape[0]):
            # One forward pass per episode over (P + N + Q) -> (3*kshots, H, W, 3).
            inputs = tf.concat(
                [p_episodes[episode], n_episodes[episode], q_episodes[episode]], axis=0
            )
            embeddings = self.model.feature_extractor(inputs, training=False)

            # Split back into the three groups and average over the K shots.
            p_embeddings, n_embeddings, q_embeddings = tf.split(embeddings, 3, axis=0)
            p_vectors.append(tf.reduce_mean(p_embeddings, axis=0))
            n_vectors.append(tf.reduce_mean(n_embeddings, axis=0))
            q_vectors.append(tf.reduce_mean(q_embeddings, axis=0))

        return tf.stack(p_vectors), tf.stack(n_vectors), tf.stack(q_vectors)

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate up to `steps` validation batches and write the means into `logs`."""
        # Without a reset the reported values would be running means over all
        # previous epochs instead of this epoch's results.
        for tracker in (self.loss_tracker, self.auroc_tracker,
                        self.aupr_tracker, self.threshold_tracker):
            tracker.reset_state()

        batches_done = 0
        # The guard also prevents an endless loop over an infinite dataset when steps <= 0.
        if self.steps > 0:
            for p_episodes, n_episodes, q_episodes in self.val_ds:
                p_batch_pred, n_batch_pred, q_batch_pred = self._embed_episodes(
                    p_episodes, n_episodes, q_episodes
                )

                loss = self.compute_loss(p_batch_pred, n_batch_pred, q_batch_pred)
                auroc, aupr, fpr, tpr, J_threshold = self.compute_fn(
                    p_batch_pred, n_batch_pred, q_batch_pred
                )

                self.loss_tracker.update_state(loss)
                self.auroc_tracker.update_state(auroc)
                self.aupr_tracker.update_state(aupr)
                self.threshold_tracker.update_state(J_threshold)

                batches_done += 1
                if batches_done >= self.steps:
                    break

        # Nothing evaluated: do not report misleading zeros.
        if batches_done == 0:
            return

        # Only replace a missing dict; an empty dict passed by the caller must
        # still be updated in place.
        if logs is None:
            logs = {}
        logs['val_loss'] = float(self.loss_tracker.result())
        logs['val_auroc'] = float(self.auroc_tracker.result())
        logs['val_aupr'] = float(self.aupr_tracker.result())
        logs['val_J_threshold'] = float(self.threshold_tracker.result())

# Validation callback: 2 batches of test_ds are evaluated after every epoch.
callbacks = [
    VerificationMetrics(
        val_ds=test_ds,
        compute_fn=compute_metrics,
        steps=2,
        margin=0.1,
    ),
]

In [None]:
# Serialisation round-trip: save the model and load it back.
# The target directory is created first so that saving does not fail on a fresh checkout.
os.makedirs('models', exist_ok=True)
model.save('models/model_v1.keras')
model = K.models.load_model('models/model_v1.keras')

In [None]:
# Train for 3 epochs of 100 steps each and keep the per-epoch logs as a DataFrame.
history = pd.DataFrame(
    model.fit(
        train_ds,
        steps_per_epoch=100,
        epochs=3,
        callbacks=callbacks,
    ).history
)

In [None]:
# Scratch check of the tf.TensorArray API.
# clear_after_read=False allows an element to be read more than once.
a = tf.TensorArray(size=0, dtype=tf.float32, dynamic_size=True, clear_after_read=False)

# write() returns the updated TensorArray; its result must be kept.
a = a.write(0, tf.zeros((4)))
a = a.write(1, tf.ones((4)))

a.stack()

# read() needs the index of the element to fetch.
a = a.read(0)
a - a

In [None]:
# Inspect the first episode of one training batch: embed its supports and
# queries, then print the squared distances from every query embedding to
# the positive and negative class means.
P, N, Q = next(iter(train_ds))
P, N, Q = P[0], N[0], Q[0]

p_vector, n_vector, q_vector = (
    model.feature_extractor.predict(group) for group in (P, N, Q)
)

# Class means over the K shots. q_vector keeps one row per query view, so the
# subtraction broadcasts and yields one distance per query view.
p_vector = tf.reduce_mean(p_vector, axis=0)
n_vector = tf.reduce_mean(n_vector, axis=0)

p_dist = tf.reduce_sum(tf.square(p_vector - q_vector), axis=1)
n_dist = tf.reduce_sum(tf.square(n_vector - q_vector), axis=1)

print("-"*20)
print(f"P dist: {p_dist} | N dist: {n_dist}")
imtools.plot_images(P.numpy().astype('uint8'), scale=3)
imtools.plot_images(N.numpy().astype('uint8'), scale=3)
imtools.plot_images(Q.numpy().astype('uint8'), scale=3)
print("-"*20)

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score, average_precision_score, roc_curve

def make_scores_and_labels(p_dists, n_dists, use_exp=True):
    """Turn positive/negative distances into similarity scores and binary labels.

    Args:
        p_dists: array-like of positive distances (Q vs same-ID P).
        n_dists: array-like of negative distances (Q vs different-ID N).
        use_exp: if True the score is exp(-dist), otherwise -dist.

    Returns:
        (scores, labels): two 1D arrays, positives first. Labels are 1 for
        positives and 0 for negatives.
    """
    positives = np.asarray(p_dists).ravel()
    negatives = np.asarray(n_dists).ravel()

    labels = np.concatenate([np.ones_like(positives), np.zeros_like(negatives)])

    # Smaller distance -> larger score, in both scoring modes.
    negated = -np.concatenate([positives, negatives])
    scores = np.exp(negated) if use_exp else negated
    return scores, labels

def compute_roc_pr_auc(p_dists, n_dists, use_exp=True):
    """Return ROC-AUC and PR-AUC (average precision) as `{'roc_auc', 'pr_auc'}`.

    Scores and labels are built by `make_scores_and_labels`.
    """
    scores, labels = make_scores_and_labels(p_dists, n_dists, use_exp)
    return {
        'roc_auc': roc_auc_score(labels, scores),
        'pr_auc': average_precision_score(labels, scores),  # PR AUC (average precision)
    }

def compute_eer_and_threshold(p_dists, n_dists, use_exp=True):
    """Estimate the equal error rate (EER) and the score threshold where it occurs.

    The EER point is the ROC point whose FPR and FNR are closest to each other;
    the reported EER is the average of the two rates at that point.

    Returns:
        Dict with keys 'eer', 'eer_threshold', 'fpr', 'tpr' and 'thresholds'.
    """
    scores, labels = make_scores_and_labels(p_dists, n_dists, use_exp)
    false_pos, true_pos, cutoffs = roc_curve(labels, scores)
    false_neg = 1 - true_pos

    # Index of the ROC point minimising |FPR - FNR|.
    crossing = np.nanargmin(np.abs(false_pos - false_neg))

    return {
        'eer': (false_pos[crossing] + false_neg[crossing]) / 2.0,
        'eer_threshold': cutoffs[crossing],
        'fpr': false_pos,
        'tpr': true_pos,
        'thresholds': cutoffs,
    }

def tpr_at_target_fpr(p_dists, n_dists, target_fpr=1e-3, use_exp=True):
    """Return the TPR at the last ROC point whose FPR does not exceed `target_fpr`.

    Args:
        p_dists: array-like of positive distances.
        n_dists: array-like of negative distances.
        target_fpr: upper bound on the false-positive rate.
        use_exp: forwarded to `make_scores_and_labels`.

    Returns:
        Dict with keys 'tpr', 'threshold' and 'fpr'. When no ROC point
        satisfies the bound, 'tpr' is 0.0 and the other two are None, so both
        outcomes expose the same keys.
    """
    scores, labels = make_scores_and_labels(p_dists, n_dists, use_exp)
    fpr, tpr, thresh = roc_curve(labels, scores)
    valid = np.flatnonzero(fpr <= target_fpr)
    if valid.size == 0:
        return {'tpr': 0.0, 'threshold': None, 'fpr': None}
    idx = valid[-1]  # max TPR with FPR <= target
    return {'tpr': tpr[idx], 'threshold': thresh[idx], 'fpr': fpr[idx]}

###############################################
def compute_metrics(p_dists, n_dists):
    """Verification metrics from precomputed positive and negative distances.

    NOTE(review): an earlier cell defines `compute_metrics(p, n, q)` with a
    different signature; running this cell rebinds the name.

    Args:
        p_dists: array-like of positive distances (query vs same identity).
        n_dists: array-like of negative distances (query vs other identity).

    Returns:
        `(auroc, aupr, fpr, tpr, J_threshold)`, where `J_threshold` is the score
        (negated distance) that maximises Youden's J = TPR - FPR.
    """
    p_dists = np.asarray(p_dists).ravel()
    n_dists = np.asarray(n_dists).ravel()

    # Negated distances act as similarity scores:
    # the larger -d is, the closer the sample is to the positive class.
    scores = -np.concatenate([p_dists, n_dists])
    labels = np.concatenate([np.ones_like(p_dists), np.zeros_like(n_dists)])

    auroc = roc_auc_score(labels, scores)           # ROC-AUC
    aupr  = average_precision_score(labels, scores) # PR-AUC

    # Youden's J threshold. thresholds[0] is a sentinel ("predict nothing
    # positive") that is not a real score and may be infinite, so it is
    # excluded from the search to keep the result finite.
    fpr, tpr, thresholds = roc_curve(labels, scores)
    J = tpr - fpr
    idx = 1 + np.argmax(J[1:])
    J_threshold = thresholds[idx]

    return auroc, aupr, fpr, tpr, J_threshold



In [None]:
# Simulated distances (positives generally smaller than negatives).
# A seeded generator keeps the printed numbers reproducible across runs.
rng = np.random.default_rng(42)
p_dists = rng.normal(0.6, 0.12, size=2000)
n_dists = rng.normal(1.2, 0.25, size=2000)

print(compute_roc_pr_auc(p_dists, n_dists, use_exp=False))
print(compute_eer_and_threshold(p_dists, n_dists, use_exp=False)['eer'])
print(tpr_at_target_fpr(p_dists, n_dists, target_fpr=1e-3, use_exp=False))
