In [17]:
from face_wrapper import FaceWrapper, FaceProcessor
from glob import glob
import torch
import numpy as np
from tqdm import tqdm
from queue import Queue
import random
from pydantic import BaseModel
import datetime

# Minimum cosine similarity (inclusive) at which two face embeddings are
# linked in the similarity graph built by FaceClassifier.cluster_faces.
FACES_THRESHOLD = 0.70

class SearchResult(BaseModel):
    """Pydantic model with the fields of one search result record.

    NOTE(review): this model is not referenced anywhere in the visible code,
    so the field meanings noted below are inferred from the names only --
    confirm against the code that constructs it.
    """

    # Identifier of the record (presumably of the source media -- confirm).
    id: str
    # Format label of the record (presumably media/file format -- confirm).
    format: str
    # Path to the image associated with the result.
    image_path: str
    # Numeric timestamp; unit and reference point are not visible here.
    timestamp: float
    # Match score; scale and direction are not visible here.
    score: float
    # Calendar date attached to the result.
    date: datetime.date
    # Free-form location string.
    location: str

class FaceClassifier(object):
    """Cluster face embeddings by similarity and sample representative faces.

    Two faces are linked when the cosine similarity of their embeddings is at
    least ``FACES_THRESHOLD``; every connected component of the resulting
    graph is one cluster.

    Attributes:
        face_processor: The ``FaceProcessor`` built from the input images.
        embeddings: ``face_processor.embeddings`` after filtering.
        clusters: Lists of embedding indices, largest cluster first.
    """

    def __init__(self, images, filter_height: int = 60, filter_width: int = 60):
        """Filter the images, compute embeddings and cluster them.

        Args:
            images: Passed straight to ``FaceProcessor``; the notebook passes
                a list of image file paths.
            filter_height: Forwarded to ``FaceProcessor.filter``
                (presumably a minimum face height in pixels -- confirm).
            filter_width: Forwarded to ``FaceProcessor.filter``
                (presumably a minimum face width in pixels -- confirm).
        """
        self.face_processor = FaceProcessor(images)
        self.face_processor.filter(filter_height, filter_width)
        self.face_processor.calculate_embeddings()

        self.embeddings = self.face_processor.embeddings
        self.clusters = self.cluster_faces()
        # Largest clusters first; equal-sized clusters are ordered by their
        # member lists so the ranking is deterministic.
        self.clusters = sorted(
            self.clusters,
            key=lambda cluster: (len(cluster), cluster),
            reverse=True,
        )

    def cosine(self, embedding1: np.ndarray, embedding2: np.ndarray):
        """Return the cosine similarity of two 1-D embeddings as a float."""
        # as_tensor avoids copying inputs that are already arrays/tensors;
        # this method runs once per pair of faces.
        return torch.cosine_similarity(
            torch.as_tensor(embedding1).unsqueeze(0),
            torch.as_tensor(embedding2).unsqueeze(0)
        ).item()

    @staticmethod
    def _collect_component(graph, start, visited):
        """Return every index reachable from ``start``, in breadth-first order.

        ``visited`` is updated in place. Indices are marked when they are
        queued, not when they are processed, so an index reachable through
        several neighbours is collected exactly once.
        """
        visited.add(start)
        component = [start]
        cursor = 0  # the component list doubles as the BFS queue
        while cursor < len(component):
            for neighbour in graph[component[cursor]]:
                if neighbour not in visited:
                    visited.add(neighbour)
                    component.append(neighbour)
            cursor += 1
        return component

    def cluster_faces(self):
        """Group embedding indices into clusters of mutually linked faces.

        Originally written by OJU.

        Returns:
            A list of clusters, each a list of indices into
            ``self.embeddings``. Every index appears in exactly one cluster;
            a face similar to no other face forms a cluster of its own.
        """
        total = len(self.embeddings)
        graph = {index: [] for index in range(total)}

        for emb in tqdm(range(total), desc='generating face graph'):
            for y in range(emb + 1, total):
                if self.cosine(self.embeddings[emb], self.embeddings[y]) >= FACES_THRESHOLD:
                    # Similarity is symmetric: record the edge in both
                    # directions so the traversal can reach lower indices
                    # and merge faces that only share a common neighbour.
                    graph[emb].append(y)
                    graph[y].append(emb)

        visited = set()
        list_of_clusters = []

        for emb in tqdm(range(total), desc='creating clusters'):
            if emb in visited:
                continue
            list_of_clusters.append(self._collect_component(graph, emb, visited))

        return list_of_clusters

    def fetch_top_faces(self, num_faces: int = 10):
        """Yield one file path from each of the largest clusters.

        Args:
            num_faces: Maximum number of clusters to sample from.

        Yields:
            The file path of one randomly chosen member of each cluster,
            largest cluster first.
        """
        clusters = self.clusters[:num_faces]

        for cluster in clusters:
            index = random.choice(cluster)
            # Assumes face_processor.filepaths is index-aligned with
            # face_processor.embeddings -- confirm in FaceProcessor.
            path = self.face_processor.filepaths[index]

            yield path

# Build the classifier over every file matching images/FACE*.jpeg.
# Construction does all the work: filtering, embedding and clustering.
face_classifier = FaceClassifier(glob("images/FACE*.jpeg"))

100%|██████████| 4518/4518 [00:00<00:00, 5427.94it/s]
4518it [00:00, 500934.88it/s]
100%|██████████| 1113/1113 [00:01<00:00, 654.41it/s]
100%|██████████| 1113/1113 [00:00<00:00, 1517.39it/s]




generating face graph: 100%|██████████| 1113/1113 [00:41<00:00, 27.03it/s] 
creating clusters: 100%|██████████| 1113/1113 [00:00<00:00, 63816.77it/s]


In [19]:
# Materialise the generator: one randomly chosen file path from each of the
# (default) 10 largest clusters, so re-running this cell may show other files.
list(face_classifier.fetch_top_faces())

['images\\FACE-IcrbM1l_BoI-21b44be0-8df5-4c65-b7b8-22e424fc6ad9-0.jpeg',
 'images\\FACE-IcrbM1l_BoI-f55a26f8-0094-425f-9ac6-5a25633549d9-0.jpeg',
 'images\\FACE-a7GITgqwDVg-eaac8895-8e88-4c86-af83-889bf9bfe50b-0.jpeg',
 'images\\FACE-_GgIt2EFHV8-becd620f-e470-4e97-8b45-e92be4eb1e0a-0.jpeg',
 'images\\FACE-_GgIt2EFHV8-2c5b4bfb-1833-4e0d-bad0-d0a2f8575686-0.jpeg',
 'images\\FACE-e-ORhEE9VVg-263bf4ba-9c88-4c42-8103-c30d64b9bcbc-0.jpeg',
 'images\\FACE-e-ORhEE9VVg-d1eefd45-7f14-4009-b0c2-7687947e444c-1.jpeg',
 'images\\FACE-e-ORhEE9VVg-e52db5cf-b528-4405-8132-a05b7d9fdd4f-0.jpeg',
 'images\\FACE-000000092091-5674ab4a-af19-4e50-94da-4341b49808e4-0.jpeg',
 'images\\FACE-a7GITgqwDVg-656485eb-cc2f-4117-9560-e0be2e313e5f-0.jpeg']