In [None]:
import os
import json
from itertools import chain
from hashlib import md5

import pandas as pd
import numpy as np
from sklearn.semi_supervised import LabelSpreading
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import loguniform

## Dataset preparation

In [None]:
# Load the LFW dataset face embeddings, caching a local copy after the first download.

LFW_EMBEDDINGS_PATH = "../data/lfw_facenet_embeddings.parquet"
LFW_EMBEDDINGS_URL = "hf://datasets/lajota13/lfw_facenet_embeddings/lfw_facenet_embeddings.parquet"

if os.path.exists(LFW_EMBEDDINGS_PATH):
    # a local copy is already available
    embedding_df = pd.read_parquet(LFW_EMBEDDINGS_PATH)
else:
    # download from Hugging Face
    embedding_df = pd.read_parquet(LFW_EMBEDDINGS_URL)
    # make sure the target directory exists, otherwise writing the cache
    # fails when the notebook is run from a fresh checkout
    os.makedirs(os.path.dirname(LFW_EMBEDDINGS_PATH), exist_ok=True)
    embedding_df.to_parquet(LFW_EMBEDDINGS_PATH)

embedding_df.head()

In [None]:
# Derive a human-readable "name" column from the label (underscores become spaces)
# and discard the raw label column.

embedding_df = (
    embedding_df
    .assign(name=embedding_df["label"].str.replace("_", " "))
    .drop(columns=["label"])
)

embedding_df.head()

In [None]:
# Load the seasonal color analysis annotations: a JSON mapping of season -> list of names.

# JSON is UTF-8 by specification; being explicit keeps non-ASCII names from being
# decoded with the platform's default encoding.
with open("../data/celebrities.json", encoding="utf-8") as fid:
    annotations = json.load(fid)

# flatten the mapping into one (name, season) record per listed celebrity
annotations_df = pd.DataFrame(
    [
        {"name": celeb, "season": season}
        for season, celebs in annotations.items()
        for celeb in celebs
    ],
    # fixed columns so that an empty annotation file still yields a usable (empty) frame
    columns=["name", "season"],
)
# the macroseason is the middle token of the hyphen-separated season identifier
annotations_df["macroseason"] = annotations_df["season"].str.extract("[a-z]+-([a-z]+)-[a-z]+")

annotations_df.head()

In [None]:
# Attach the annotations to the embeddings and encode the macroseason as an integer label.

# lookup table between integer macrolabels and macroseason names
macrolabel_map_df = pd.DataFrame(
    {
        "macrolabel": [0, 1, 2, 3],
        "macroseason": ["winter", "summer", "spring", "autumn"],
    }
)

# left joins keep every embedding, annotated or not
embedding_annotated_df = (
    embedding_df
    .merge(annotations_df, on="name", how="left")
    .merge(macrolabel_map_df, on="macroseason", how="left")
)
# rows that received no macrolabel from the joins are marked with -1
embedding_annotated_df["macrolabel"] = (
    embedding_annotated_df["macrolabel"].fillna(-1).astype(int)
)

embedding_annotated_df.head()

In [None]:
# number of records per macrolabel
embedding_annotated_df.loc[:, "macrolabel"].value_counts()

In [None]:
# Separate labeled from unlabeled records, so that the test set can later be
# sampled from the labeled portion only.

is_unlabeled = embedding_annotated_df["macrolabel"] == -1
labeled_df = embedding_annotated_df[~is_unlabeled].copy()
unlabeled_df = embedding_annotated_df[is_unlabeled].copy()

In [None]:
# To sample the test set reproducibly, each name is mapped to a bucket in [0, 255]
# taken from the last byte of its md5 digest. The bucket depends only on the name,
# so all records sharing a name fall into the same split.

TRAIN_BUCKET_THRESHOLD = 195  # buckets strictly below this value go to the training split


def name_hash_bucket(name):
    """Return the last byte (0-255) of the md5 digest of the UTF-8 encoded name."""
    return int(md5(name.encode("utf-8")).hexdigest()[-2:], 16)


# compute the buckets once and reuse them for both the histogram and the split
name_buckets = labeled_df["name"].apply(name_hash_bucket)
name_buckets.hist()

labeled_df["split"] = name_buckets.apply(
    lambda bucket: "train" if bucket < TRAIN_BUCKET_THRESHOLD else "test"
)

# display the split percentages
# (a named variable is used instead of `_`, which IPython reserves for the last cell output)
split_percentage = (labeled_df["split"].value_counts() / len(labeled_df)).rename("percentage")
print(split_percentage)


In [None]:
# Assemble one frame per split: the training set is the labeled "train" records
# plus every unlabeled record; the test set holds labeled "test" records only.

labeled_train_df = labeled_df[labeled_df["split"] == "train"].drop(columns="split")
train_df = pd.concat([labeled_train_df, unlabeled_df])
test_df = labeled_df[labeled_df["split"] == "test"].drop(columns="split")

In [None]:
# label counts in the training split
train_label_counts = train_df["macrolabel"].value_counts()
print(train_label_counts)

In [None]:
# label counts in the test split
test_label_counts = test_df["macrolabel"].value_counts()
print(test_label_counts)

In [None]:
# Convert each split into a feature matrix and an int8 label vector.

def _to_arrays(split_df):
    """Stack the "embedding" column row-wise and return it with the int8 "macrolabel" vector."""
    features = np.vstack(split_df["embedding"].tolist())
    labels = split_df["macrolabel"].values.astype(np.int8)
    return features, labels


X_train, y_train = _to_arrays(train_df)
X_test, y_test = _to_arrays(test_df)

## Label propagation

In [None]:
N = 20


def score_on_labeled(estimator, X, y):
    """Score the estimator using only the samples whose label is known (label > -1)."""
    has_label = y > -1
    return estimator.score(X[has_label], y[has_label])


# hyper-parameter search space sampled by the randomized search
search_space = {
    "n_neighbors": range(5, 20),
    "alpha": loguniform(0.2, 0.9),
}

rs = RandomizedSearchCV(
    estimator=LabelSpreading(kernel="knn"),
    param_distributions=search_space,
    n_iter=N,
    scoring=score_on_labeled,
    random_state=42,
)

rs.fit(X_train, y_train)


In [None]:
# report the best cross-validated score found by the search
print(f"Best score: {rs.best_score_}")

# evaluate the best estimator on the held-out test split
ls = rs.best_estimator_
y_pred = ls.predict(X_test)
ConfusionMatrixDisplay.from_predictions(y_true=y_test, y_pred=y_pred)
print(classification_report(y_true=y_test, y_pred=y_pred))

In [None]:
# Dump the propagated labels.

# predict a macrolabel for every record that had no annotation
unlabeled_embeddings = np.vstack(unlabeled_df["embedding"].tolist())
propagated_labels = ls.predict(unlabeled_embeddings).astype(np.int8)

# Build the propagated frame without mutating unlabeled_df in place: set the predicted
# macrolabel, then replace the old macroseason column with the one matching that label.
unlabeled_df = (
    unlabeled_df
    .assign(macrolabel=propagated_labels)
    .drop(columns="macroseason")
    .merge(macrolabel_map_df, on="macrolabel", how="left")
)

# The merge above gives unlabeled_df a fresh 0..n-1 index while labeled_df keeps its
# original row labels, so a plain concat would contain duplicate index values;
# ignore_index renumbers the rows so the saved frame has a unique index.
label_propagated_df = pd.concat([labeled_df, unlabeled_df], ignore_index=True)
label_propagated_df.to_parquet("../data/lfw_facenet_embeddings_label_propagated.parquet")

In [None]:
# Save embeddings and labels for visualization in the TF embedding projector (https://projector.tensorflow.org/)
LOGDIR = "logs"
# create the output directory if needed; exist_ok removes the separate existence check
os.makedirs(LOGDIR, exist_ok=True)

# one embedding vector per line, tab-separated
np.savetxt(
    os.path.join(LOGDIR, "embeddings.tsv"),
    np.vstack(label_propagated_df["embedding"].tolist()),
    delimiter="\t"
)

# Per-row metadata, in the same row order as the embeddings above.
# index=False keeps the pandas index out of the file, which would otherwise
# show up as an extra unnamed first column.
label_propagated_df[["macroseason", "name"]].to_csv(
    os.path.join(LOGDIR, "metadata.tsv"),
    sep="\t",
    index=False
)