In [None]:
import asyncio
import logging
import os
from pathlib import Path
import sys

import imageio.v3 as iio
from sklearn.cluster import HDBSCAN
from matplotlib import colormaps, colors
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
import umap

from mime_db import MimeDb


In [None]:
VIDEO_FILE = "" # Just the name of the video file, no path

# DeepFace and ArcFace resize/crop face images to 152x152 and 112x112 pixels,
# respectively, so an image upsized from smaller than 100xH px is not useful.
WIDTH_THRESHOLD = 100  # pixels of width

FACE_FEATURES = 4096 # 4096 for DeepFace, 512 for ArcFace

video_path = Path("videos", VIDEO_FILE)

# Connect to the database
db = await MimeDb.create()

# Get video metadata
video_name = video_path.name
video_id = await db.get_video_id(video_name)
video_id = video_id[0]["id"]

#video_poses = await db.get_pose_data_from_video(video_id)
video_poses = await db.get_poses_with_faces(video_id)

poses_df = pd.DataFrame.from_records(video_poses, columns=video_poses[0].keys())

# This averages the feature vectors of every frame of a given face/track.
# The resulting vector is of limited use, because a pose's face
# orientation can change a lot across a movement track, and pretty much
# all face feature extractors are sensitive to face orientation (pose).
# Also assumes that every track always follows the same person throughout
# its duration, which is correct in theory (but in practice ...)
def average_face_embeddings(embeddings, min_frames=5):
    """Return the mean embedding of a group, repeated once per input row.

    Parameters
    ----------
    embeddings : iterable of equal-length numeric vectors
        The embeddings of one group (in this notebook, one track), one
        vector per frame/pose.
    min_frames : int, default 5
        Minimum number of vectors required before the mean is considered a
        reliable face; smaller groups yield ``None`` instead of a vector.

    Returns
    -------
    list
        A list as long as the input in which every element is the same
        object: the element-wise mean vector, or ``None`` when the group
        has fewer than ``min_frames`` entries. An empty input gives ``[]``.
    """
    # Rebuild from the individual elements so the per-frame vectors become
    # one 2D (frames x features) array. Presumably needed because groupby
    # hands over an object-dtype Series -- TODO confirm.
    stacked = np.array(list(embeddings))
    n_frames = stacked.shape[0]
    avg_embed = np.mean(stacked, axis=0) if n_frames >= min_frames else None
    # transform() expects one value per row of the group.
    return [avg_embed] * n_frames

# Width of each detected face; poses without a face bbox get 0 so the width
# filter below drops them. The bbox is unpacked as (x, y, w, h) elsewhere in
# this notebook, so index 2 is the width and index 3 the height.
poses_df["face_width"] = poses_df["face_bbox"].map(
    lambda bbox: 0 if bbox is None else bbox[2]
)

# similarity_threshold = dst.findThreshold("DeepFace", "cosine")
# logging.info(
#     f"Suggested similarity threshold for DeepFace+cosine is {similarity_threshold}"
# )

print("Total poses:", len(poses_df))

# Keep only poses whose face was detected with positive confidence and is wide
# enough to be worth embedding. notna() is used rather than np.isnan so the
# test also works if the column holds None/object values.
has_usable_face = (
    poses_df["face_confidence"].notna()
    & (poses_df["face_confidence"] > 0)
    & (poses_df["face_width"] > WIDTH_THRESHOLD)
)
# reset_index() keeps the pre-filter row labels in an "index" column.
poses_df = poses_df[has_usable_face].reset_index()

print("Poses with usable faces:", len(poses_df))

# We can use the largest face image as a thumbnail/rep (if desired)
poses_df["face_area"] = poses_df["face_bbox"].map(
    lambda bbox: 0 if bbox is None else bbox[2] * bbox[3]
)

# Per-track mean embedding, broadcast back onto every pose of the track.
# NOTE(review): this is computed before the embeddings are cut down to
# FACE_FEATURES below, so face_avg may be longer than face_embedding -- confirm
# that this is intended.
poses_df["face_avg"] = poses_df.groupby(["track_id"])["face_embedding"].transform(
    average_face_embeddings
)

# Keep only the first FACE_FEATURES components of each embedding.
poses_df["face_embedding"] = poses_df["face_embedding"].apply(lambda p: p[:FACE_FEATURES])

# One representative pose per track: the one with the highest face confidence.
# idxmax() returns index labels, so select with .loc rather than .iloc.
rep_poses_df = poses_df.loc[
    (
        # poses_df.groupby(["track_id"])["face_area"].idxmax()
        # This is always pretty close to 1...
        poses_df.groupby(["track_id"])["face_confidence"].idxmax()
    )
]

print("Faces representing a track:", len(rep_poses_df))

print(
    rep_poses_df[["track_id", "frame", "face_bbox", "face_area", "face_confidence"]].head(5)
)

# Feature matrix for UMAP/HDBSCAN plus a "<track_id>_<frame>" label per face.
X = rep_poses_df["face_embedding"].to_list()
# astype(str) replaces DataFrame.applymap(str), which is deprecated in pandas >= 2.1.
im_labels = rep_poses_df[["track_id", "frame"]].astype(str).agg('_'.join, axis=1).to_list()

In [None]:
# Project the representative face embeddings with UMAP, leaving every setting
# except the seed at its default, and show the result as a scatter plot.
standard_reducer = umap.UMAP(random_state=42)
standard_embedding = standard_reducer.fit_transform(X)

umap_x, umap_y = standard_embedding[:, 0], standard_embedding[:, 1]
plt.scatter(umap_x, umap_y, s=4)

In [None]:
# Second 2D projection, this time with a neighbourhood of 10 and min_dist of 0;
# this is the embedding that the later cells cluster and plot.
clusterable_params = dict(
    n_neighbors=10,
    min_dist=0.0,
    n_components=2,
    random_state=42,
)
clusterable_embedding = umap.UMAP(**clusterable_params).fit_transform(X)

# With two components, the transposed array unpacks into the x and y columns.
plt.scatter(*clusterable_embedding.T, s=4)

In [None]:
print("fitting clustering model")

# Cluster the face embeddings in X directly (no UMAP reduction first).
hdb = HDBSCAN(min_cluster_size=3, min_samples=4) # , max_cluster_size=15
labels = hdb.fit(X).labels_.tolist()

# Size of every label from -1 up to the highest cluster id, in that order.
cluster_sizes = {cid: labels.count(cid) for cid in range(-1, max(labels) + 1)}
for cluster_id, size in cluster_sizes.items():
    print("Faces in cluster", cluster_id, size)

# Label -1 is not counted as an assigned face.
assigned_faces = sum(size for cid, size in cluster_sizes.items() if cid != -1)

print("assigned", assigned_faces, "track faces out of", len(labels), round(assigned_faces/len(labels),4))

# Colour the 2D UMAP layout by the labels found above.
plt.scatter(clusterable_embedding[:, 0], clusterable_embedding[:, 1], c=labels, cmap='Spectral', s=4)

In [None]:
print("fitting UMAP preclustered model")
save_images = True
plot_images = True

# NOTE(review): this absolute location differs from the relative
# Path("videos", VIDEO_FILE) used when the video was looked up -- confirm which
# one is right for the environment the notebook runs in.
video_handle = f"/videos/{video_name}"


def _load_face_crop(pose, video_file):
    """Decode one video frame and return the face region of ``pose``.

    ``pose`` must provide ``"face_bbox"`` as ``(x, y, w, h)`` and a ``"frame"``
    number; the frame is read at index ``frame - 1`` (presumably because frame
    numbers are 1-based -- verify against the ingest code).
    """
    x, y, w, h = [round(coord) for coord in pose["face_bbox"]]
    frame_img = iio.imread(video_file, index=pose["frame"] - 1, plugin="pyav")
    # A bbox that overhangs the top or left edge has a negative origin; as a
    # slice start that would count from the opposite edge, so clamp it to 0.
    return frame_img[max(y, 0) : y + h, max(x, 0) : x + w]


# Cluster the 2D UMAP embedding instead of the raw feature vectors.
hdb = HDBSCAN(min_cluster_size=3, min_samples=4) # , max_cluster_size=15
hdb.fit(clusterable_embedding)
labels = hdb.labels_.tolist()

assigned_faces = 0

for cluster_id in range(-1, max(labels) + 1):
    if save_images:
        # One output directory per label (including -1), relative to the cwd.
        os.makedirs(str(cluster_id), exist_ok=True)
    print("Faces in cluster", cluster_id, labels.count(cluster_id))
    if cluster_id != -1:
        assigned_faces += labels.count(cluster_id)

print("assigned", assigned_faces, "track faces out of", len(labels), round(assigned_faces/len(labels),4))

# Decode every representative face once and reuse the crop for both saving and
# plotting; each decode opens the video and seeks to the frame.
face_crops = {}
if save_images or plot_images:
    for i in range(len(labels)):
        try:
            cluster_pose = rep_poses_df.iloc[i]
        except IndexError as e:
            # labels and rep_poses_df are out of step (e.g. stale kernel state).
            print("Error referencing face", i)
            print(e)
            continue
        face_crops[i] = _load_face_crop(cluster_pose, video_handle)

if save_images:
    # Written as "<label>/<face position>.jpg".
    for i, img_region in face_crops.items():
        iio.imwrite(f"{labels[i]}/{i}.jpg", img_region, extension=".jpeg")

fig = plt.figure(figsize=(12,8))
ax = fig.gca()

if plot_images:
    cm = colormaps["Spectral"]
    norm = colors.Normalize(vmin=-1, vmax=max(labels))

    ax.scatter(clusterable_embedding[:, 0], clusterable_embedding[:, 1])
    for i, img_region in face_crops.items():
        # Shrink the crop to fit within 20x20 px and pin it to the face's
        # position in the UMAP layout, framed in the colour of its label.
        img = Image.fromarray(img_region)
        img.thumbnail((20, 20), resample=Image.Resampling.LANCZOS)
        ab = AnnotationBbox(OffsetImage(np.asarray(img)), (clusterable_embedding[i, 0], clusterable_embedding[i, 1]))
        ab.patch.set_linewidth(1)
        ab.patch.set(color=cm(norm(labels[i])))

        ax.add_artist(ab)
else:
    ax.scatter(clusterable_embedding[:, 0], clusterable_embedding[:, 1], c=labels, cmap='Spectral', s=4)