In [None]:
import pandas as pd
import torch

# Load the training metadata CSV and derive each row's video id, taken as the
# second-to-last "/"-separated component of its "File" path.
df = pd.read_csv("../data/voxceleb2_train_ps.csv")
df["Video"] = df["File"].map(lambda path: path.split("/")[-2])
df

In [None]:
# Average number of distinct videos per speaker
videos_by_speaker = df.groupby("Speaker")["Video"]
unique_videos_per_speaker = videos_by_speaker.nunique().reset_index()
unique_videos_per_speaker["Video"].mean()

In [None]:
# Load the saved object on CPU, keep its first element as the centroid matrix,
# and build the matrix of pairwise dot products between centroids.
# NOTE(review): whether the centroids are L2-normalised (which would make
# `sim` a cosine similarity) is not visible here; confirm.
centroids_path = "../our_centroids.pt"
centroids = torch.load(centroids_path, map_location="cpu")[0]
sim = torch.matmul(centroids, centroids.T)

sim.shape

In [None]:
# Videos in which more than two distinct speakers appear
speakers_per_video = df.groupby("Video")["Speaker"].nunique()
tmp = speakers_per_video.reset_index()
tmp.loc[tmp["Speaker"] > 2]

In [None]:
# Map every cluster id (`Speaker_ps`) to the most frequent `Speaker` value among
# its rows. One groupby pass replaces a full-frame boolean scan per cluster.
# sort=False keeps the clusters in order of first appearance, as `unique()` did.
cluster_to_spk = (
    df.groupby("Speaker_ps", sort=False)["Speaker"]
    .agg(lambda speakers: speakers.value_counts().idxmax())
    .to_dict()
)

len(cluster_to_spk)

In [None]:
# Collect centroid similarities between anchor clusters and every other cluster,
# split by whether both clusters map to the same majority speaker (positives)
# or to different ones (negatives).
N_ANCHOR_CLUSTERS = 100  # only the first clusters act as anchors, which bounds the cost

pos_scores = []
neg_scores = []
clusters = df["Speaker_ps"].unique().tolist()

sim_np = sim.cpu().numpy()

for cluster in clusters[:N_ANCHOR_CLUSTERS]:
    anchor_speaker = cluster_to_spk[cluster]  # constant for the whole inner loop
    for c in clusters:
        if c == cluster:
            # A cluster compared with itself is trivially a "positive" and would
            # add a spurious spike at maximal similarity; skip the self-pair.
            continue
        if cluster_to_spk[c] == anchor_speaker:
            pos_scores.append(sim_np[cluster, c])
        else:
            neg_scores.append(sim_np[cluster, c])


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde

# Convert similarities to distances (1 - similarity) under NEW names. Rebinding
# `pos_scores` / `neg_scores` in place made this cell non-idempotent: running it
# a second time turned the distances back into similarities.
pos_dists = 1 - np.asarray(pos_scores)
neg_dists = 1 - np.asarray(neg_scores)

# Shared evaluation grid spanning both distributions (NumPy reductions instead
# of the element-by-element builtin min/max).
grid_lo = min(pos_dists.min(), neg_dists.min())
grid_hi = max(pos_dists.max(), neg_dists.max())
x = np.linspace(grid_lo, grid_hi, 1000)

# Gaussian kernel density estimates evaluated on the grid
y_pos = gaussian_kde(pos_dists)(x)
y_neg = gaussian_kde(neg_dists)(x)


In [None]:
# Overlay both estimated densities on a single axis
fig, ax = plt.subplots(figsize=(12, 6))

for curve, label, colour in ((y_pos, 'Positives', 'green'), (y_neg, 'Negatives', 'red')):
    ax.plot(x, curve, label=label, color=colour)

ax.set_title('Distributions of distances')
ax.set_xlabel('Distance')
ax.set_ylabel('Density')
ax.legend()
ax.grid(True)
plt.show()

## Simulation

In [None]:
import torch.nn.functional as F

# Load the saved embeddings on CPU and rescale them to unit L2 norm along
# dim 1 (the defaults of F.normalize, spelled out here).
embeddings = F.normalize(
    torch.load("../embeddings_100_full_vox2.pt", map_location='cpu'),
    p=2.0,
    dim=1,
)
embeddings.shape

In [None]:
from tqdm import tqdm

def _sample_by_rank(ranked, prob_decay):
    """Draw one element of ``ranked`` with probability proportional to ``prob_decay ** rank``.

    ``ranked`` is a non-empty 1-D tensor ordered best-first; the result is a
    1-element tensor.
    """
    weights = prob_decay ** torch.arange(len(ranked)).float()
    weights = weights / weights.sum()
    return ranked[torch.multinomial(weights, 1)]


def simulation(
    video_threshold = 0.5,
    prob_decay = 0.2,
    speaker_threshold = 0.9,
    count = 5000,
    verbose = False,
    seed = 0
):
    """Simulate choosing a partner utterance for randomly drawn rows of ``df``.

    For each of ``count`` row positions drawn uniformly with replacement:

    1. Clusters other than the row's own whose centroid similarity to it
       exceeds ``video_threshold`` are ranked by decreasing similarity, and
       one is drawn with probability proportional to ``prob_decay ** rank``.
    2. Among the rows of that cluster, those whose embedding/centroid dot
       product exceeds ``speaker_threshold`` are ranked the same way and one
       is drawn with the same decay rule.
    3. The drawn row is compared with the original row on ``Speaker`` and
       ``Video``.

    A draw for which step 1 or step 2 has no candidate is skipped.

    Reads the module-level ``df``, ``sim``, ``centroids`` and ``embeddings``,
    and re-seeds the global NumPy and PyTorch RNGs with ``seed``.

    Args:
        video_threshold: lower bound (exclusive) on centroid similarity.
        prob_decay: base of the geometric rank weighting.
        speaker_threshold: lower bound (exclusive) on sample/centroid similarity.
        count: number of rows to draw.
        verbose: print per-draw diagnostics.
        seed: seed for both RNGs.

    Returns:
        ``(speaker_acc, video_acc, coverage)``: the fraction of successful
        draws whose partner has the same ``Speaker`` / the same ``Video``, and
        the fraction of draws that found a partner. All three are 0 when
        there is nothing to average over (including ``count == 0``).
    """
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Partner file per draw (None when no partner qualified). Not returned yet;
    # kept for the planned `df["File2"] = res` export.
    res = []

    count_total = 0
    speaker_acc_total = 0
    video_acc_total = 0

    # for i in tqdm(range(len(df))):
    for i in tqdm(np.random.randint(0, len(df), size=(count,))):
        row = df.iloc[i]  # materialise the row once instead of once per field
        speaker = row["Speaker"]
        video = row["Video"]
        cluster = int(row["Speaker_ps"])
        if verbose:
            print(f"Current sample: {i}, Speaker: {speaker}, Video: {video}")

        # Determine nearby clusters. The row's own cluster is excluded by
        # identity: dropping "the first after sorting" removes the wrong
        # cluster whenever self is not ranked first (ties, unnormalised
        # centroids, self-similarity below the threshold).
        similarities = sim[cluster]
        nearby_clusters = torch.nonzero(similarities > video_threshold).view(-1)
        nearby_clusters = nearby_clusters[nearby_clusters != cluster]
        if len(nearby_clusters) == 0:
            res.append(None)
            continue
        order = torch.sort(similarities[nearby_clusters], descending=True).indices
        nearby_clusters = nearby_clusters[order]
        if verbose:
            print("Nearby clusters sim:", sim[cluster, nearby_clusters])
            print("Nearby clusters idx:", nearby_clusters)

        # Sample one random cluster, favouring the most similar ones
        random_cluster = _sample_by_rank(nearby_clusters, prob_decay)
        if verbose:
            print(f"Selected cluster sim: {sim[cluster, random_cluster]}")
            print(f"Selected cluster idx: {random_cluster}")

        # Get all samples from cluster
        samples = df[df["Speaker_ps"] == random_cluster.item()]

        # NOTE(review): the index labels of `df` are used as row positions in
        # `embeddings`; this assumes both are aligned row by row. Confirm.
        sample_rows = torch.tensor(samples.index.to_numpy(), dtype=torch.long)
        samples_dist = (embeddings[sample_rows] @ centroids[random_cluster].T).view(-1)

        nearby_samples = torch.nonzero(samples_dist > speaker_threshold).view(-1)
        if len(nearby_samples) == 0:
            res.append(None)
            continue
        order = torch.sort(samples_dist[nearby_samples], descending=True).indices
        nearby_samples = nearby_samples[order]

        random_sample = _sample_by_rank(nearby_samples, prob_decay)
        sample = samples.iloc[random_sample.item()]

        res.append(sample["File"])

        speaker_acc = int(sample["Speaker"] == speaker)
        video_acc = int(sample["Video"] == video)

        speaker_acc_total += speaker_acc
        video_acc_total += video_acc
        if verbose:
            print(f"Speaker accuracy: {speaker_acc}")
            print(f"Video accuracy: {video_acc}")

        count_total += 1

    if count_total != 0:
        speaker_acc_total /= count_total
        video_acc_total /= count_total

    # df["File2"] = res

    # Guard the coverage ratio: count == 0 used to raise ZeroDivisionError.
    coverage = count_total / count if count else 0.0

    return speaker_acc_total, video_acc_total, coverage

In [None]:
# Evaluate a single fixed configuration of the simulation
simulation(video_threshold=0.835, prob_decay=0.5, speaker_threshold=0.94)

(0.9093381686310064, 0.2470534904805077, 0.4412)

In [None]:
# Write the frame back to the CSV it was loaded from.
# index=False keeps the row index out of the file: the loader reads the CSV
# without `index_col`, so a written index would come back as an extra
# "Unnamed: 0" column and accumulate on every load/save round trip.
# NOTE: this overwrites the input file in place, including the derived "Video" column.
df.to_csv("../data/voxceleb2_train_ps.csv", index=False)

In [None]:
import optuna

def objective(trial):
    """Optuna objective: speaker accuracy of ``simulation`` for a sampled threshold.

    Only ``speaker_threshold`` is searched (in [0.85, 0.99]); the video
    threshold and the decay factor are held fixed. The video accuracy and the
    coverage returned by ``simulation`` do not contribute to the score.
    """
    speaker_acc, _video_acc, _coverage = simulation(
        video_threshold=0.835,
        prob_decay=0.5,
        speaker_threshold=trial.suggest_float('speaker_threshold', 0.85, 0.99),
    )
    return speaker_acc

# Seed the sampler so the search proposes the same trials on every run; without
# a seed, `study.best_params` changes between executions of the notebook.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=0),
)
study.optimize(objective, n_trials=25)

study.best_params

In [None]:
# Parameter sets kept for reference (presumably results of earlier searches).
'''
{'video_threshold': 0.6101654597940125,
 'prob_decay': 0.03608832601140681,
 'speaker_threshold': 0.8391936107587641}

{'video_threshold': 0.8251956372897714,
 'prob_decay': 0.7986839078081102,
 'speaker_threshold': 0.9086315021218292}

 {'video_threshold': 0.8365454665594554}
 '''

# Re-run the simulation with fixed values rather than `study.best_params`
simulation(
    video_threshold=0.835,
    prob_decay=0.5,
    speaker_threshold=0.94,
)

In [None]:
import matplotlib.pyplot as plt
import torch

# Illustrate the rank-based sampling weights for two decay factors:
# 0.5 gives geometrically shrinking weights, 1 gives a uniform distribution.
n = 10

fig, ax = plt.subplots(figsize=(10, 6))

ranks = torch.arange(n).float()
for decay_factor in (0.5, 1):
    weights = decay_factor ** ranks
    probabilities = (weights / weights.sum()).numpy()
    ax.plot(probabilities, marker='o', label=f'{decay_factor}')
    print(probabilities)

ax.set_xlabel('Index')
ax.set_ylabel('Probability')
ax.set_title('Exponentially Decreasing Probability Distribution')
ax.grid(True, which='both', linestyle='--', linewidth=0.5)
ax.legend()
plt.show()