In [None]:
import cv2
!pip install mtcnn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mtcnn
  Downloading mtcnn-0.1.1-py3-none-any.whl (2.3 MB)
[K     |████████████████████████████████| 2.3 MB 5.6 MB/s 
Installing collected packages: mtcnn
Successfully installed mtcnn-0.1.1


In [None]:
#FACE EXTRACTION
import os
import numpy as np
import torch

from time import time
from tqdm import tqdm
from PIL import Image
from mtcnn.mtcnn import MTCNN, extract_face
#from pkg.mtcnn import MTCNN, extract_face

######################################################################

# Resolve the data folders relative to this file when it runs as a script.
# A notebook kernel defines no `__file__`, so fall back to the current working
# directory there instead of failing with NameError.
try:
    _BASE_DIR = os.path.dirname(__file__)
except NameError:
    _BASE_DIR = os.getcwd()

ALBUM_PATH = os.path.join(_BASE_DIR, '..', 'data', 'album')  # input photos
FACES_PATH = os.path.join(_BASE_DIR, '..', 'data', 'faces')  # extracted face crops

######################################################################

def extract_album_faces():
    """Extract every confidently detected face from the album into `data/faces`.

    Walks ``ALBUM_PATH`` recursively, runs MTCNN detection on each file and, for
    every detection with probability > 99%, saves the crop under ``FACES_PATH``
    as ``<random 6-digit key>.png`` (keys are re-drawn until the name is unused).

    NOTE(review): the ``MTCNN(select_largest=..., keep_all=..., device=...)``
    constructor, ``detect(..., landmarks=...)`` and ``extract_face`` look like the
    facenet-pytorch API rather than the PyPI ``mtcnn`` package -- confirm which
    package the import at the top of this cell is meant to resolve to.
    """
    # Fixed seed: the sequence of candidate file names is the same on every run.
    np.random.seed(0)
    os.makedirs(os.path.abspath(FACES_PATH), exist_ok=True)
    mtcnn = MTCNN(select_largest=False, keep_all=True, device='cpu').eval()
    for root, _dirs, files in os.walk(ALBUM_PATH):
        for fname in tqdm(files, ascii=True):
            # Join with `root`, not ALBUM_PATH: os.walk also yields the files of
            # sub-folders, which would otherwise resolve to non-existent paths.
            fpath = os.path.join(root, fname)
            # Context manager so the underlying file handle is closed per image.
            with Image.open(fpath) as img:
                with torch.no_grad():
                    boxes, probs = mtcnn.detect(img, landmarks=False)
                if boxes is None:
                    # No detection reported for this image.
                    continue
                for box, prob in zip(boxes, probs):
                    if prob > 0.99:
                        # Draw random keys until one does not clash with an
                        # already existing file.
                        isfile = True
                        while isfile:
                            rand_key = np.random.randint(10**5, 10**6)
                            save_path = os.path.join(FACES_PATH, '{}.png'.format(rand_key))
                            isfile = os.path.isfile(save_path)
                        _ = extract_face(img, box, save_path=save_path)


######################################################################

# Entry point: run the album extraction once and report the elapsed wall-clock
# time. In a notebook kernel `__name__` is "__main__", so this runs when the
# cell is executed.
if __name__ == "__main__":
    print("Extracting faces from photo album")
    t0 = time()  # start timestamp in seconds
    extract_album_faces()
    print("Done ({:.2f}s)".format(time() - t0))

In [None]:
# VIDEO PROCESSING

# video_facenet/video.py
class VideoProcessor:
    """Frame-by-frame reader built on ``cv2.VideoCapture``.

    Any extra keyword arguments passed to the constructor are stored in
    ``self.data`` and forwarded to the callback on every ``iterate`` step.
    """

    def __init__(self, video_path, **kwargs):
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        self.id = 0          # index of the frame currently being yielded by images()
        self.data = kwargs   # forwarded verbatim to the `process` callback

    @property
    def frame_count(self):
        """Number of frames reported by the capture (``CAP_PROP_FRAME_COUNT``)."""
        return int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

    @property
    def duration(self):
        """``frame_count / fps``; raises ZeroDivisionError if the reported fps is 0."""
        fps = self.cap.get(cv2.CAP_PROP_FPS)
        return self.frame_count / fps

    @property
    def pos(self):
        """Current frame position of the capture (``CAP_PROP_POS_FRAMES``)."""
        return int(self.cap.get(cv2.CAP_PROP_POS_FRAMES))

    @pos.setter
    def pos(self, pos):
        # Keep our own frame index in sync with the position we seek to.
        self.id = pos
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, pos)

    def close(self):
        """Release the underlying capture."""
        self.cap.release()

    def images(self, start=0, end=None):
        """Yield frames starting at ``start``.

        Stops after the frame whose index equals ``end`` (inclusive), or when the
        capture fails to deliver another frame. ``self.id`` holds the index of
        the frame being yielded.
        """
        self.pos = start
        success, image = self.cap.read()
        while success:
            if end is not None and self.id == end:
                # `end` is inclusive: emit this frame, then stop.
                yield image
                return

            yield image
            self.id += 1
            success, image = self.cap.read()

    def iterate(self, process, start=0, end=None):
        """Call ``process`` for each frame until it returns a truthy value.

        ``process`` receives ``image``, ``pos``, ``video`` (this object), ``last``
        and the keyword arguments given to the constructor. ``last`` is ``end``
        when given, otherwise ``frame_count - 1``.
        """
        last = self.frame_count - 1 if end is None else end
        for image in self.images(start=start, end=end):
            if process(image=image, pos=self.id, video=self, last=last, **self.data):
                break

In [None]:
#HDBSCAN CLUSTERING
from datetime import datetime
import umap
import joblib


import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)    
    import hdbscan

from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator

from .files import file_name


def cluster_faces_pipeline(reduce):
    """Build the scikit-learn pipeline used to cluster faces.

    The pipeline always ends with an ``"hdbscan"`` step
    (``min_cluster_size=15``). When ``reduce`` is truthy, a ``"umap"`` step
    (30 neighbours, 30 components, ``min_dist=0.0``, fixed ``random_state``)
    is placed in front of it.
    """
    steps = []
    if reduce:
        umap_options = dict(
            n_neighbors=30,
            min_dist=0.0,
            n_components=30,
            random_state=42,
        )
        reducer = umap.UMAP(**umap_options)
        steps.append(("umap", reducer))
    density_clusterer = hdbscan.HDBSCAN(min_cluster_size=15)
    steps.append(("hdbscan", density_clusterer))
    return Pipeline(steps)


class FaceCluster(BaseEstimator):
    """Wrapper around the pipeline built by ``cluster_faces_pipeline``.

    Exposes the pipeline's ``"hdbscan"`` / ``"umap"`` steps, the HDBSCAN
    prediction helpers, and joblib-based persistence.

    NOTE(review): the constructor argument ``umap`` is not stored under its own
    name (``umap`` on the instance is a read-only property returning the
    pipeline step), so ``BaseEstimator.get_params()`` / ``clone()`` will
    presumably not round-trip -- confirm whether that matters to callers.
    """

    def __init__(self, suffix, umap):
        # A bare `super()` only creates a proxy object; actually run the parent init.
        super().__init__()
        self.suffix = suffix
        # `umap` here is the boolean-like "reduce" switch, not the umap module.
        self.clusterer = cluster_faces_pipeline(reduce=umap)

    @property
    def _file_name(self):
        """Path of the persisted pipeline, derived from ``self.suffix``."""
        return file_name("cluster{}.npy", self.suffix)

    def save(self):
        """Dump the pipeline to ``_file_name`` with joblib."""
        joblib.dump(self.clusterer, self._file_name)

    def load(self):
        """Replace the pipeline with the one stored at ``_file_name``.

        Returns ``False`` (leaving the current pipeline untouched) when the file
        does not exist, ``True`` after a successful load.
        """
        fn = self._file_name
        if not os.path.isfile(fn):
            return False
        # SECURITY: joblib.load unpickles the file, which can execute arbitrary
        # code -- only load files this project wrote itself.
        self.clusterer = joblib.load(fn)
        return True

    @property
    def hdbscan(self) -> "hdbscan.HDBSCAN":
        """The pipeline's ``"hdbscan"`` step."""
        return self.clusterer["hdbscan"]

    @property
    def umap(self):
        """The pipeline's ``"umap"`` step (only present when built with reduction)."""
        return self.clusterer["umap"]

    @property
    def labels_(self):
        """``labels_`` of the HDBSCAN step."""
        return self.hdbscan.labels_

    def fit(self, X, y=None):
        """Fit the pipeline; returns whatever the pipeline's ``fit`` returns."""
        return self.clusterer.fit(X, y)

    def fit_predict(self, X, y=None):
        """Run the pipeline's ``fit_predict`` and print how long it took."""
        start_time = datetime.now()
        rt = self.clusterer.fit_predict(X, y)
        end_time = datetime.now()
        print("Clustering took: {}".format(end_time - start_time))
        return rt

    def approximate_predict(self, X):
        """Call ``hdbscan.approximate_predict`` on the HDBSCAN step.

        NOTE(review): ``X`` is passed straight to HDBSCAN without going through
        the ``"umap"`` step -- presumably callers must reduce it first when the
        pipeline was built with reduction; confirm.
        """
        return hdbscan.approximate_predict(self.hdbscan, X)

    def membership_vector(self, X):
        """Call ``hdbscan.membership_vector`` on the HDBSCAN step (``X`` is not reduced here)."""
        return hdbscan.membership_vector(self.hdbscan, X)

    @property
    def probabilities_(self):
        """``probabilities_`` of the HDBSCAN step."""
        return self.hdbscan.probabilities_

    def all_points_membership_vectors(self):
        """Call ``hdbscan.all_points_membership_vectors`` on the HDBSCAN step."""
        return hdbscan.all_points_membership_vectors(self.hdbscan)

    def generate_prediction_data(self):
        """Ask the HDBSCAN step to build its prediction data."""
        self.hdbscan.generate_prediction_data()

    def set_params(self, **kwargs):
        """Forward parameters to the wrapped pipeline and return its result."""
        return self.clusterer.set_params(**kwargs)

In [None]:
#PIPELINE
from typing import List, cast
from functools import partial

from .facenet_types import Face
from .video import VideoProcessor
from .files import (
    bounding_box_file_name,
    landmarks_file_name,
    embeddings_file_name    
)

def generate_embeddings(encoder, faces:List[Face], **kwargs):
    """Compute an embedding for every face and attach it in place.

    Args:
        encoder: object exposing ``generate_embeddings(images)``.
        faces: faces whose ``image`` is encoded; each gets ``embedding`` set and
            ``image`` reset to ``None`` so the pixel data can be freed.
        **kwargs: ignored; accepted so the caller can pass its full context.
    """
    if not faces:
        # Nothing to encode: avoid handing the encoder an empty batch.
        return
    images = [face.image for face in faces]
    embeddings = encoder.generate_embeddings(images)
    for face, embedding in zip(faces, embeddings):
        face.embedding = embedding
        face.image = None


def find_faces(pos, detector, image, faces: List[Face], **kwargs):
    """Detect the faces in ``image`` and append them to ``faces``.

    Each detected face is tagged with the frame position ``pos`` and with its
    index among the detections of this frame (``id``). Extra keyword arguments
    are accepted and ignored.
    """
    detections = cast(List[Face], detector.find_faces(image, detect_multiple_faces=True))
    for index, detection in enumerate(detections):
        detection.pos, detection.id = pos, index
        faces.append(detection)
    
class CsvWriter(object):
    """Minimal comma-separated line writer.

    Values are converted with ``str`` and joined with ``","``; no quoting or
    escaping is performed. Usable as a context manager so the file is closed
    even when an error occurs.
    """

    def __init__(self, file_name, append=False):
        """Open ``file_name`` for appending when ``append`` is true, else truncate it."""
        mode = "a" if append else "w"
        self.f = open(file_name, mode)
        self.new_line = "\n"

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always close; returning None lets any exception propagate.
        self.close()

    def writeArray(self, *arr):
        """Write the given values as one comma-separated line."""
        line = ",".join([str(x) for x in arr])
        self.f.write(line)
        self.f.write(self.new_line)

    def flush(self):
        """Flush buffered output to the file."""
        self.f.flush()

    def close(self):
        """Close the underlying file."""
        self.f.close()

class Saver(object):
    """Callable pairing a save function with the writers it writes through."""

    def __init__(self, save, writers):
        self.__writers = writers
        self.__save = save

    def __call__(self, **kwargs):
        """Forward all keyword arguments to the wrapped save function."""
        persist = self.__save
        persist(**kwargs)

    def flush(self):
        """Flush every writer, in the order they were given."""
        for sink in self.__writers:
            sink.flush()

    def close(self):
        """Close every writer, in the order they were given."""
        for sink in self.__writers:
            sink.close()


def write_bounding_boxes(writer, pos, id, face):
    """Write one row: frame position, face id, then the face's bounding-box values."""
    row = (pos, id) + tuple(face.bounding_box)
    writer.writeArray(*row)

def write_landmarks(writer, pos, id, landmarks):
    """Write one row per landmark: position, face id, landmark key, first and second coordinate."""
    for name, point in landmarks.items():
        first, second = point[0], point[1]
        writer.writeArray(pos, id, name, first, second)

def write_embeddings(writer, pos, id, embedding):
    """Write one row: position, face id, then each embedding value formatted
    with the writer's ``embedding_format`` (a ``%``-style format string)."""
    formatted = []
    for value in embedding:
        formatted.append(writer.embedding_format % value)
    writer.writeArray(pos, id, *formatted)

def save_faces(bounding_box_writer, landmarks_writer, embeddings_writer, faces:List[Face], **kwargs):
    """Persist every face through the three writers.

    For each face, in order: its bounding box, its landmarks and its embedding
    are written, each keyed by the face's ``pos`` and ``id``. Extra keyword
    arguments are accepted and ignored.
    """
    for current in faces:
        frame_pos, face_id = current.pos, current.id
        write_bounding_boxes(writer=bounding_box_writer, pos=frame_pos, id=face_id, face=current)
        write_landmarks(writer=landmarks_writer, pos=frame_pos, id=face_id, landmarks=current.landmarks)
        write_embeddings(writer=embeddings_writer, pos=frame_pos, id=face_id, embedding=current.embedding)

def create_faces_saver(suffix="", embedding_format="%.6f", append=False):
    """Open the bounding-box, landmark and embedding CSV files and wrap them in a ``Saver``.

    Args:
        suffix: passed to the ``*_file_name`` helpers to build the three paths.
        embedding_format: ``%``-style format attached to the embeddings writer.
        append: append to existing files instead of overwriting them.

    Returns:
        A ``Saver`` whose call runs ``save_faces`` bound to the three writers.
    """
    def open_writer(path):
        return CsvWriter(file_name=path, append=append)

    boxes_out = open_writer(bounding_box_file_name(suffix))
    landmarks_out = open_writer(landmarks_file_name(suffix))
    embeddings_out = open_writer(embeddings_file_name(suffix))

    # The format travels with the writer as a plain attribute.
    embeddings_out.embedding_format = embedding_format

    bound_save = partial(
        save_faces,
        bounding_box_writer=boxes_out,
        landmarks_writer=landmarks_out,
        embeddings_writer=embeddings_out,
    )
    return Saver(save=bound_save, writers=[boxes_out, landmarks_out, embeddings_out])

def process_video(video_path, model_path, start=0, suffix="", batch_size=64, end=None, **kwargs):
    """Detect faces in a video, embed them in batches and write the results to CSV.

    Args:
        video_path: video file to read.
        model_path: passed to ``Facenet`` as ``model_path``.
        start: first frame index to process.
        suffix: suffix for the output files (see ``create_faces_saver``).
        batch_size: number of pending faces that triggers embedding + saving;
            also passed to ``Facenet``.
        end: last frame index to process (inclusive); ``None`` reads to the end.
        **kwargs: accepted and ignored.
    """
    from contextlib import ExitStack
    from video_facenet.facenet import Detector, Facenet

    faces = []

    def flush_batch(save, faces, pos, **kwargs):
        # Embed everything pending, persist it, then reset the buffer.
        generate_embeddings(faces=faces, **kwargs)
        save(faces=faces, **kwargs)
        save.flush()
        faces.clear()
        print("frame #", pos)

    def process(save, pos, faces, batch_size, last, **kwargs):
        find_faces(faces=faces, pos=pos, **kwargs)
        if len(faces) >= batch_size or pos==last:
            flush_batch(save=save, faces=faces, pos=pos, **kwargs)

    # ExitStack closes every resource acquired so far (in reverse order), even
    # when a later constructor or the processing loop raises.
    with ExitStack() as stack:
        detector = Detector()
        stack.callback(detector.close)
        encoder = Facenet(
            model_path=model_path,
            batch_size=batch_size
        )
        stack.callback(encoder.close)
        save = create_faces_saver(suffix=suffix)
        stack.callback(save.close)

        video = VideoProcessor(video_path=video_path, detector=detector, encoder=encoder, save=save, faces=faces, batch_size=batch_size)
        stack.callback(video.cap.release)
        video.iterate(process, start=start, end=end)

        if faces:
            # Iteration stopped before reaching `last` (e.g. the capture
            # delivered fewer frames than expected): don't drop the pending faces.
            flush_batch(save=save, faces=faces, pos=faces[-1].pos, encoder=encoder)