In [1]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training
import torch
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from torch import optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
import math

import numpy as np
import os

from sklearn.neighbors import NearestNeighbors

In [2]:
# Use the first CUDA GPU when torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print('Running on device: {}'.format(device))

Running on device: cuda:0


In [3]:
def collate_fn(x):
    """Collate a batch by returning only its first element.

    Passed as ``collate_fn`` to the DataLoaders built in this cell, so each
    loader step yields ``x[0]`` instead of a stacked batch.
    """
    first_sample = x[0]
    return first_sample
# Loader worker processes: none on Windows (os.name == 'nt'), four elsewhere.
if os.name == 'nt':
    workers = 0
else:
    workers = 4

# Small image folder with its reverse (index -> class name) lookup.
toy_dataset = datasets.ImageFolder('../data/test_images')
toy_dataset.idx_to_class = {idx: name for name, idx in toy_dataset.class_to_idx.items()}
toy_loader = DataLoader(toy_dataset, collate_fn=collate_fn, num_workers=workers)

# VGGFace2 image folder, annotated with two extra lookups used by later cells.
dataset = datasets.ImageFolder('../data/VGGFace2/train_cropped_split')
dataset.idx_to_class = {idx: name for name, idx in dataset.class_to_idx.items()}
# Class index of every sample, in sample order.
labels = np.array([target for _path, target in dataset.imgs])
# class index -> array of the sample indices that carry that label.
dataset.class_to_instances = {
    class_idx: np.flatnonzero(labels == class_idx)
    for class_idx in dataset.idx_to_class.keys()
}
loader = DataLoader(dataset, collate_fn=collate_fn, num_workers=workers)

In [4]:
# Number of classes enrolled in the gallery; every other class acts as an impostor.
num_imp_classes = 2000

# Enrolment database: class index -> index (in `dataset`) of that class's first sample.
# `dataset.class_to_instances` already lists every class's sample indices in
# ascending order, so its first entry is the same value that scanning the full
# label array with np.where(...)[0][0] would return, without a pass per class.
initial_database = {}
for _class in dataset.classes[:num_imp_classes]:
    class_idx = dataset.class_to_idx[_class]
    initial_database[class_idx] = dataset.class_to_instances[class_idx][0]

all_classes = np.array(list(dataset.idx_to_class.keys()))
imp_classes = np.array(list(initial_database.keys()))
# Impostor classes are exactly the classes that are not enrolled. A set
# difference gives the same result as slicing off the first k ids when the
# enrolled ids are 0..k-1, and stays correct if they are not.
fraud_classes = np.setdiff1d(all_classes, imp_classes)

In [5]:
# Face-embedding network with the pretrained 'vggface2' weights, switched to
# eval mode and moved to the selected device.
resnet = InceptionResnetV1(pretrained='vggface2')
resnet = resnet.eval().to(device)

In [17]:
class BiometricSystem():
    """Face-identification gate backed by one reference image per enrolled class.

    Queries and gallery images are embedded with ``self.model``, L2-normalised
    and matched by nearest neighbour; a query whose closest gallery embedding
    is farther than a distance threshold is rejected.
    """

    # Number of images pushed through the model per forward pass.
    batch_size = 32

    def __init__(self, database, vgg_dataset, model=None, mtcnn=None, threshold=0.5):
        ''' Database format:
            Dictionary from class_idx to the sample index in vgg_dataset

        Args:
            database: dict mapping class_idx -> sample index in ``vgg_dataset``.
            vgg_dataset: indexable dataset whose items are ``(image, label)``.
            model: embedding network; when None, a pretrained InceptionResnetV1
                is created on the module-level ``device``.
            mtcnn: accepted for interface compatibility; not used by this class.
            threshold: stored on the instance as ``self.threshold``. Note that
                ``checkfaces`` uses its own ``thresh`` argument instead.
        '''
        self.database = database
        self.classes = database.keys()
        self.vgg_dataset = vgg_dataset
        # Identity check rather than truthiness: container modules define
        # __len__, so an empty one would wrongly count as "no model given".
        if model is not None:
            self.model = model
        else:
            self.model = InceptionResnetV1(pretrained='vggface2').eval().to(device)
        self.trans = transforms.Compose([
            np.float32,
            transforms.ToTensor(),
            fixed_image_standardization
        ])
        self.threshold = threshold

    def _model_device(self):
        """Device holding the model's weights (CPU when it has no parameters)."""
        first_param = next(self.model.parameters(), None)
        if first_param is None:
            return torch.device('cpu')
        return first_param.device

    def checkfaces(self, query_refs, thresh=0.8):
        ''' Identify every query of one day against the enrolled gallery.

        Args:
            query_refs: sample indices (into ``vgg_dataset``) of the queries.
            thresh: largest accepted distance between a query embedding and
                its nearest gallery embedding.

        Returns:
            1-D integer array with one entry per query: the class_idx of the
            closest enrolled image, or -1 when that distance exceeds ``thresh``.
        '''
        query_embeddings, support_embeddings = self.get_embeddings(query_refs)
        if len(query_embeddings) == 0 or len(support_embeddings) == 0:
            # Nothing to match against (or nothing to match): reject everything.
            return np.full(len(query_embeddings), -1, dtype=np.int64)

        # Gallery rows follow the iteration order of self.database, so this
        # array translates a neighbour's row index back into its class_idx.
        class_ids = np.array(list(self.database.keys()))

        # scikit-learn's constructor arguments are keyword-only.
        neigh = NearestNeighbors(n_neighbors=1, radius=1)
        neigh.fit(support_embeddings)
        dists, neighs = neigh.kneighbors(query_embeddings, 1)
        dists, neighs = dists.ravel(), neighs.ravel()

        pred = class_ids[neighs]
        pred[dists > thresh] = -1
        return pred

    def get_embeddings(self, query_refs):
        ''' Embed the queries of one day together with the enrolled gallery.

        Args:
            query_refs: sample indices (into ``vgg_dataset``) of the queries.

        Returns:
            ``(query_embeddings, support_embeddings)``: two float64 arrays of
            L2-normalised embeddings. Query rows follow ``query_refs``; support
            rows follow the iteration order of ``self.database``.
        '''
        query_refs = list(query_refs)
        n = len(query_refs)
        sample_refs = query_refs + list(self.database.values())

        target_device = self._model_device()
        chunks = []
        # Inference only: skip autograd bookkeeping, and load/transfer one batch
        # at a time so the whole image set never has to sit on the device.
        with torch.no_grad():
            for start in range(0, len(sample_refs), self.batch_size):
                batch_refs = sample_refs[start:start + self.batch_size]
                batch = torch.stack(
                    [self.trans(self.vgg_dataset[ref][0]) for ref in batch_refs]
                ).to(target_device)
                chunks.append(self.model(batch).cpu().numpy())

        if not chunks:
            # No queries and an empty database.
            return np.zeros((0, 512)), np.zeros((0, 512))

        embeddings = np.concatenate(chunks).astype(np.float64)
        embeddings = embeddings / np.linalg.norm(embeddings, axis=-1)[:, np.newaxis]
        query_embeddings = embeddings[:n]
        support_embeddings = embeddings[n:]
        return query_embeddings, support_embeddings

In [18]:
# Build the identification system on the enrolment database, reusing the
# already-loaded embedding network.
biometricSystem = BiometricSystem(
    database=initial_database,
    vgg_dataset=dataset,
    model=resnet,
)

In [19]:
# biometricSystem.checkfaces([0,1,2], [0,1,2])

In [20]:
# Sanity check: push three dataset samples through the same preprocessing the
# BiometricSystem uses and display the shape of the resulting embeddings.
query_ids = [0, 1, 2]
n = len(query_ids)
trans = transforms.Compose([np.float32, transforms.ToTensor(), fixed_image_standardization])
aligned = torch.stack([trans(dataset[query_id][0]) for query_id in query_ids]).to(device)
resnet(aligned).detach().cpu().shape

torch.Size([3, 512])

In [49]:
# Simulate one day of queries: every query gets an enrolled class as its true
# label, then a random subset (expected share: fraud_ratio) is relabelled with
# a non-enrolled "fraud" class.
num_query = 2000
fraud_ratio = 0.1

fraud = np.random.rand(num_query) < fraud_ratio
labels = np.random.choice(imp_classes, size=num_query)
num_fraud = np.count_nonzero(fraud)
labels[fraud] = np.random.choice(fraud_classes, size=num_fraud)
# for i in np.where(fraud)[0]:
#     label = labels[i]
#     newlabel = np.random.choice(all_classes)
#     while(newlabel == label):
#         newlabel = np.random.choice(all_classes)
#     labels[i] = np.random.choice(fraud_classes)


In [50]:
# For every query label, draw one random sample index of that class.
query_ids = np.array(
    [np.random.choice(dataset.class_to_instances[label]) for label in labels]
)


In [51]:
# One prediction per query; entries are set to -1 where the nearest gallery
# match is farther than the distance threshold (query rejected).
pred = biometricSystem.checkfaces(query_ids)

In [52]:
# Confusion tally for the simulated queries. Negative entries of `pred` are
# rejections; non-negative entries are compared against the true label.
accepted = pred >= 0
# Genuine queries are those whose true class is enrolled. Testing membership
# (rather than comparing the label against len(imp_classes)) keeps the tally
# correct even when the enrolled ids are not exactly 0..k-1.
enrolled = np.isin(labels, imp_classes)

_tp = np.logical_and(accepted, pred == labels)   # accepted with the right identity
_fp = np.logical_and(accepted, pred != labels)   # accepted with a wrong identity
_tn = np.logical_and(~accepted, ~enrolled)       # impostor correctly rejected
_fn = np.logical_and(~accepted, enrolled)        # enrolled user wrongly rejected
tp = np.count_nonzero(_tp)
fp = np.count_nonzero(_fp)
tn = np.count_nonzero(_tn)
fn = np.count_nonzero(_fn)

# The four outcomes partition the queries, so they must add up.
assert tp + fp + tn + fn == num_query, "confusion counts do not cover every query"

print(f"tp = {100*tp/num_query}%")
print(f"fp = {100*fp/num_query}%")
print(f"tn = {100*tn/num_query}%")
print(f"fn = {100*fn/num_query}%")


tp = 59.8%
fp = 0.85%
tn = 9.3%
fn = 30.05%


In [40]:
# Multi-day simulation: each day draws a fresh set of queries and the outcome
# counts are accumulated over all days.
# Genuine vs. impostor queries are told apart by membership in imp_classes, so
# the tally does not depend on how the class ids are ordered.

num_query = 2000
fraud_ratio = 0.1
num_day = 2

num_query_total = num_query*num_day
tp = fp = tn = fn = 0

biometricSystem = BiometricSystem(database=initial_database, model=resnet, vgg_dataset=dataset)

for _day in range(num_day):
    # Enrolled label for every query, then relabel a random subset as fraud.
    fraud = np.random.rand(num_query) < fraud_ratio
    labels = np.random.choice(imp_classes, num_query)
    labels[fraud] = np.random.choice(fraud_classes, np.count_nonzero(fraud))

    # One random sample of the labelled class per query.
    query_ids = [np.random.choice(dataset.class_to_instances[label]) for label in labels]
    query_ids = np.array(query_ids)
    pred = biometricSystem.checkfaces(query_ids)

    accepted = pred >= 0
    enrolled = np.isin(labels, imp_classes)
    _tp = np.logical_and(accepted, pred == labels)   # accepted with the right identity
    _fp = np.logical_and(accepted, pred != labels)   # accepted with a wrong identity
    _tn = np.logical_and(~accepted, ~enrolled)       # impostor correctly rejected
    _fn = np.logical_and(~accepted, enrolled)        # enrolled user wrongly rejected
    tp += np.count_nonzero(_tp)
    fp += np.count_nonzero(_fp)
    tn += np.count_nonzero(_tn)
    fn += np.count_nonzero(_fn)

# The four outcomes partition the queries of every day.
assert tp + fp + tn + fn == num_query_total, "confusion counts do not cover every query"
print(f"tp = {100*tp/num_query_total}%")
print(f"fp = {100*fp/num_query_total}%")
print(f"tn = {100*tn/num_query_total}%")
print(f"fn = {100*fn/num_query_total}%")

tp = 60.225%
fp = 0.75%
tn = 9.325%
fn = 29.7%


array([    0,     0,     0, ...,     0, -1553,     0])

In [None]:
class BiometricDataset(Dataset):
    """
    A customized dataset for the "Security System trainer".

    Exposes the enrolled reference images: item ``i`` is the entry of the
    underlying VGG dataset at the i-th stored sample index.
    """
    def __init__(self, database, vgg_dataset=None):
        """
        Args:
            database: dict whose values are sample indices into ``vgg_dataset``.
            vgg_dataset: indexable dataset to read from; when None, the
                '../data/VGGFace2/train_cropped' image folder is loaded.
        """
        if vgg_dataset is None:
            self.vgg_dataset = datasets.ImageFolder('../data/VGGFace2/train_cropped')
        else:
            self.vgg_dataset = vgg_dataset
        # Explicit integer dtype so an empty database still yields an index array.
        self.samples = np.array(list(database.values()), dtype=np.int64)

    def __getitem__(self, index):
        """ Get a sample from the dataset
        """
        return self.vgg_dataset[self.samples[index]]

    def add(self, item):
        """ Register additional sample(s) of the underlying dataset.

        ``item`` may be:
          * a ``(vgg_idx, class_idx)`` pair: ``vgg_idx`` is appended;
          * a dict ``{class_idx: vgg_idx}``: every value is appended;
          * a bare sample index.
        """
        if isinstance(item, dict):
            new_refs = list(item.values())
        elif isinstance(item, (tuple, list)):
            new_refs = [item[0]]
        else:
            new_refs = [item]
        # ndarrays have no in-place append; np.append returns an extended copy.
        self.samples = np.append(self.samples, np.asarray(new_refs, dtype=np.int64))

    def __len__(self):
        """ Number of registered samples
        """
        return len(self.samples)
    


# class BiometricDataset(Dataset):
#     """
#     A customized data loader for the "Security System trainer"
#     """
#     def __init__(self, database, vgg_dataset=None):
#         if vgg_dataset==None:
#             self.vgg_dataset = datasets.ImageFolder('../data/VGGFace2/train_cropped')
#         else:
#             self.vgg_dataset = vgg_dataset
#         self.class_to_idx = self.vgg_dataset.class_to_idx
#         self.idx_to_class = {i:c for c, i in self.class_to_idx.items()}
        
#         self.database = {label: database[label] for label in database.keys()}
#         self.samples = [(val, label) for label, val in initial_database.items()]
        
#     def __getitem__(self, index):
#         """ Get a sample from the dataset
#         """
#         return self.vgg_dataset.__getitem__(self.samples[index][0])
    
#     def add(self, item):
#         """ Add item of the form (vgg_idx, class_idx)
#         """
#         self.samples.append(item)
    
#     def __len__(self):
#         """
#         Total number of samples in the dataset
#         """
#         return len(self.idx_to_label)


In [None]:
# dataset.samples
# Print every (image, label) item of the biometric dataset.
# NOTE(review): `biometricDataset` is only created in a later cell, so this
# cell fails on a fresh top-to-bottom run. Confirm the intended cell order.
for i in range(len(biometricDataset)):
    print(biometricDataset[i])

In [None]:
# Wrap the enrolment database as a dataset reading from the VGGFace2 image folder.
biometricDataset = BiometricDataset(initial_database, dataset)

In [None]:
# Display the first enrolled item as returned by the underlying dataset.
biometricDataset[0]

In [None]:
# Show the class name of the k-th enrolled sample, then display its image.
k = 3
print(dataset.idx_to_class[biometricDataset[k][1]])
biometricDataset[k][0]

In [None]:
# Exercise BiometricDataset.add with a tuple argument.
# NOTE(review): presumably the tuple is (vgg_idx, class_idx). Confirm against
# the intended contract of `add`.
biometricDataset.add((1,0))

In [None]:
# Show the class name of the k-th enrolled sample, then display its image.
k = 300
print(dataset.idx_to_class[biometricDataset[k][1]])
biometricDataset[k][0]

In [None]:
# Display the full enrolment mapping (class_idx -> sample index). One entry per
# enrolled class, so this output is long.
initial_database

In [53]:
# Scratch check of ndarray.mean() on a small array.
np.array([1,2,3]).mean()

2.0