# A Comparison of Metric Learning Loss Functions for End-to-End Speaker Verification

This notebook contains the code to reproduce the Equal Error Rate (EER) of the Additive Angular Margin (AAM) loss model from the paper.

Before you begin, make sure you have installed the `pyannote.db.voxceleb` plugin, available [here](https://github.com/pyannote/pyannote-db-voxceleb).

If you use this model, please cite our paper:

```BibTeX Entry```

## Preparation

Before beginning, we make sure all the needed libraries are available:
- pytorch
- numpy
- xarray
- feerci
- pyannote.audio
- pyannote.database
- pyannote.metrics
- tqdm

In [None]:
import torch
import numpy as np
from xarray import DataArray
from feerci import feerci
from pyannote.core.utils.distance import cdist
from pyannote.audio.features import Pretrained
from pyannote.audio.features.utils import get_audio_duration
from pyannote.audio.applications.speaker_embedding import SpeakerEmbedding
from pyannote.database import get_protocol, FileFinder
from pyannote.metrics.binary_classification import det_curve
from tqdm import tqdm

We also initialize the database with the preprocessors needed, and we define some useful functions.

In [None]:
# Preprocessors make sure to look for wav files and to calculate the audio duration
preprocessors = {'audio': FileFinder(), 'duration': get_audio_duration}

# We use the VoxCeleb1_X protocol, with a train and dev set resulting from splitting the original dev
protocol = get_protocol('VoxCeleb.SpeakerVerification.VoxCeleb1_X', preprocessors=preprocessors)


# A function to get embeddings from a file
def get_embedding(file, pretrained, mean=False):
    emb = []
    for f in file.files():
        if 'try_with' in f:
            segments = f['try_with']
        else:
            segments = f['annotation'].get_timeline()
        for segment in segments:
            emb.append(pretrained(f).crop(segment, mode='center'))
    emb = np.vstack(emb)
    if mean:
        emb = np.mean(emb, axis=0, keepdims=True)
    return emb


# A function to calculate the EER on a subset of VoxCeleb1_X
def run_experiment(distance, subset):
    y_pred, y_true = [], []
    for trial in tqdm(getattr(protocol, f'{subset}_trial')()):
        file1 = trial['file1']
        hash1 = get_hash(file1)
        file2 = trial['file2']
        hash2 = get_hash(file2)
        y_pred.append(distance.data[index1[hash1], index2[hash2]])
        y_true.append(trial['reference'])
    y_pred = np.array(y_pred)
    y_true = np.array(y_true)
    eer, ci_lower, ci_upper, _ = feerci(-y_pred[y_true == 0], -y_pred[y_true == 1], is_sorted=False)
    return {
        'eer': eer,
        'ci_lower': ci_lower,
        'ci_upper': ci_upper,
        'y_true': y_true,
        'y_pred': y_pred}

## Loading the Pre-trained Model

The first thing to do is to load the model. You can load either the model hosted in this repository or a variant trained on variable-duration segments, which is available as pyannote's default speaker embedding model.

If you choose pyannote's model, bear in mind that performance might differ from what's in the paper.

In [None]:
# Load the pretrained model to use with 3s segments extracted with a 100ms sliding window
# (`step` is expressed as a ratio of `duration`: 0.0333 * 3s is roughly 100ms)
# Make sure to change the device to `cpu` if your machine doesn't have a GPU
model = Pretrained(validate_dir='models/AAM/train/VoxCeleb.SpeakerVerification.VoxCeleb2.train'
                   '/validate_equal_error_rate/VoxCeleb.SpeakerVerification.VoxCeleb1_X.development',
                   duration=3., step=.0333, device='cuda')

# Alternatively, you can use a version of this model that we have trained for variable duration segments,
# which is available as pyannote's default speaker embedding model
# model = torch.hub.load('pyannote/pyannote-audio', 'emb', duration=3., step=.0333, device='cuda')

print(f'Embeddings have dimension {model.dimension:d}')

In the next sections we will evaluate this model on `VoxCeleb1.test` and compare results using raw cosine distances as well as after score normalization using adaptive s-norm.

## Evaluating with Raw Distances

Once the model is ready, we can use it for inference. In this section we evaluate it using the raw cosine distance between embeddings.

### Calculate embeddings

This might take a while depending on your machine, as embeddings are being calculated from raw audio using the pretrained neural network (~10min with an Nvidia GTX1080).

If you wish to precalculate embeddings for `VoxCeleb1_X` (e.g., to run this faster next time), you should use `pyannote-audio emb apply`. An example can be found [here](https://github.com/pyannote/pyannote-audio/tree/develop/tutorials/models/speaker_embedding#application).

If you have precalculated embeddings, make sure to use the `Precomputed` class from `pyannote.audio.features` instead of `Pretrained`.

In [None]:
get_hash = lambda file: SpeakerEmbedding.get_hash(file)

# hash to embedding mapping
cache1 = dict()
cache2 = dict()

# hash to index mapping
index1 = dict()
index2 = dict()

n_file1 = 0
n_file2 = 0

# Get embeddings for every trial in Test
for trial in tqdm(protocol.test_trial(), total=37720):
    
    file1 = trial['file1']
    hash1 = get_hash(file1)
    if hash1 not in cache1:
        cache1[hash1] = get_embedding(file1, model, mean=True)
        index1[hash1] = n_file1
        n_file1 += 1
    
    file2 = trial['file2']
    hash2 = get_hash(file2)
    if hash2 not in cache2:
        cache2[hash2] = get_embedding(file2, model, mean=True)
        index2[hash2] = n_file2
        n_file2 += 1

hashes1 = list(cache1.keys())
hashes2 = list(cache2.keys())
emb1 = np.vstack(list(cache1.values()))
emb2 = np.vstack(list(cache2.values()))

### Calculate cosine distances

In [None]:
distance = DataArray(
    cdist(emb1, emb2, metric='cosine'),
    dims=('file1', 'file2'),
    coords=(hashes1, hashes2))
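
As a rough illustration of what the pairwise cosine distance above computes, here is a minimal NumPy sketch on toy data. The helper `cosine_cdist` and the `toy_*` arrays are hypothetical names introduced for this example; the notebook itself relies on `cdist` from `pyannote.core.utils.distance`.

```python
import numpy as np


def cosine_cdist(a, b):
    """Pairwise cosine distance between the rows of a (n, d) and b (m, d)."""
    # Normalize every row to unit length so the dot product is the cosine similarity
    a_unit = a / np.linalg.norm(a, axis=1, keepdims=True)
    b_unit = b / np.linalg.norm(b, axis=1, keepdims=True)
    # Cosine distance = 1 - cosine similarity, in [0, 2]
    return 1.0 - a_unit @ b_unit.T


# Toy "embeddings": 2 enrollment vectors and 3 test vectors of dimension 2
toy_emb1 = np.array([[1.0, 0.0], [0.0, 2.0]])
toy_emb2 = np.array([[3.0, 0.0], [1.0, 1.0], [-1.0, 0.0]])

toy_distance = cosine_cdist(toy_emb1, toy_emb2)
print(toy_distance.shape)  # one row per file1 embedding, one column per file2 embedding
```

Vectors pointing in the same direction get a distance of 0 regardless of their norm, and opposite vectors get a distance of 2, which is why a lower value means "more likely the same speaker" in the rest of the notebook.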

### Calculate the EER on VoxCeleb1 Test

In [None]:
raw_results = run_experiment(distance, 'test')
print(f"EER with raw distances: {100 * raw_results['eer']:.2f} in "
      f"[{100 * raw_results['ci_lower']:.2f}, {100 * raw_results['ci_upper']:.2f}]")
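
To make the metric more concrete, the sketch below approximates the EER on a handful of toy trials by sweeping a threshold over the observed distances. It is a simplified stand-in, not the algorithm used by `feerci`, and it does not estimate a confidence interval; `simple_eer` and the toy values are assumptions made for illustration only.

```python
import numpy as np


def simple_eer(distances, labels):
    """Approximate the EER by sweeping a decision threshold over observed distances.

    distances: lower means "more likely the same speaker"
    labels: 1 for genuine (same speaker) trials, 0 for impostor trials
    """
    distances = np.asarray(distances, dtype=float)
    labels = np.asarray(labels, dtype=bool)
    best_gap, eer = np.inf, None
    for threshold in np.unique(distances):
        accept = distances <= threshold
        far = np.mean(accept[~labels])   # impostor trials wrongly accepted
        frr = np.mean(~accept[labels])   # genuine trials wrongly rejected
        # The EER is the operating point where both error rates are (closest to) equal
        if abs(far - frr) < best_gap:
            best_gap, eer = abs(far - frr), (far + frr) / 2
    return eer


toy_distances = [0.1, 0.2, 0.6, 0.3, 0.7, 0.9]
toy_labels = [1, 1, 1, 0, 0, 0]
toy_eer = simple_eer(toy_distances, toy_labels)
print(f"Toy EER: {100 * toy_eer:.2f}%")
```

Note that `run_experiment` negates the distances before calling `feerci`, because that function expects similarity scores (higher means more similar), whereas this sketch works on distances directly.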

## Evaluating with Adaptive S-Norm

Now we improve the above EER with a score normalization method called Adaptive S-Norm, which consists of:

1) Determining a cohort set of embeddings that does not overlap with the model's training set (`VoxCeleb2.dev` in our case); here the cohort is drawn from `VoxCeleb1_X.train`

In [None]:
# Get cohort embeddings from VoxCeleb1_X.train
cohort_embedding = dict()
for cohort_file in tqdm(protocol.train(), total=143506):
    speaker = cohort_file['annotation'].argmax()
    embedding = get_embedding(cohort_file, model, mean=False)
    cohort_embedding.setdefault(speaker, []).append(embedding)

# The cohort consists of the mean embedding for each speaker
cohort_speakers = list(cohort_embedding.keys())
cohort = np.vstack([np.mean(np.vstack(cohort_embedding[speaker]), axis=0, keepdims=True) 
                    for speaker in cohort_speakers])

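2) Calculating the raw score of the trials (already done in the previous section), as well as the distance between each trial embedding and the cohort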

In [None]:
# Calculate the distances between each trial embedding (file1 and file2) and the cohort
distance1 = DataArray(
    cdist(emb1, cohort, metric='cosine'),
    dims=('file1', 'cohort'),
    coords=(hashes1, cohort_speakers))

distance2 = DataArray(
    cdist(emb2, cohort, metric='cosine'),
    dims=('file2', 'cohort'),
    coords=(hashes2, cohort_speakers))

3) Calculating the mean and standard deviation of the N most similar cohort scores for each embedding in the trials (N=400 in our case)

In [None]:
# This is our N
COHORT_SIZE = 400

# Calculate mean and std of N most similar cohort embeddings for all file1
data1 = np.partition(distance1.data, COHORT_SIZE)[:, :COHORT_SIZE]
mz = np.mean(data1, axis=1) 
sz = np.std(data1, axis=1)
mz = DataArray(mz, dims=('file1',), coords=(hashes1,))
sz = DataArray(sz, dims=('file1',), coords=(hashes1,))

# Calculate mean and std of N most similar cohort embeddings for all file2
data2 = np.partition(distance2.data, COHORT_SIZE)[:, :COHORT_SIZE]
mt = np.mean(data2, axis=1) 
st = np.std(data2, axis=1)
mt = DataArray(mt, dims=('file2',), coords=(hashes2,))
st = DataArray(st, dims=('file2',), coords=(hashes2,))

# Normalize
distance_z = (distance - mz) / sz
distance_t = (distance - mt) / st
distance_s = 0.5 * (distance_z + distance_t)

4) Normalizing the trial's score (the `distance_s` computed above) and evaluating the result

In [None]:
# Calculate the EER (with its confidence interval) on test and print it
ada_snorm_results = run_experiment(distance_s, 'test')
print(f"EER with Adaptive S-Norm: {100 * ada_snorm_results['eer']:.2f} in "
      f"[{100 * ada_snorm_results['ci_lower']:.2f}, {100 * ada_snorm_results['ci_upper']:.2f}]")
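
The four steps above can be sketched end to end on random toy data. This is a minimal NumPy-only illustration under stated assumptions: `cosine_cdist`, `adaptive_snorm`, and the `toy_*` arrays are hypothetical names, and explicit broadcasting replaces the named `xarray` dimensions used in the notebook.

```python
import numpy as np


def cosine_cdist(a, b):
    """Pairwise cosine distance between the rows of a (n, d) and b (m, d)."""
    a_unit = a / np.linalg.norm(a, axis=1, keepdims=True)
    b_unit = b / np.linalg.norm(b, axis=1, keepdims=True)
    return 1.0 - a_unit @ b_unit.T


def adaptive_snorm(emb1, emb2, cohort, top_n):
    """Normalize trial distances with the top_n closest cohort entries per embedding."""
    raw = cosine_cdist(emb1, emb2)      # (n1, n2) raw trial distances
    d1 = cosine_cdist(emb1, cohort)     # (n1, n_cohort)
    d2 = cosine_cdist(emb2, cohort)     # (n2, n_cohort)
    # Keep the top_n smallest distances in each row (requires top_n < n_cohort)
    near1 = np.partition(d1, top_n)[:, :top_n]
    near2 = np.partition(d2, top_n)[:, :top_n]
    # Z-norm uses statistics of file1 (rows), T-norm those of file2 (columns)
    z = (raw - near1.mean(axis=1)[:, None]) / near1.std(axis=1)[:, None]
    t = (raw - near2.mean(axis=1)[None, :]) / near2.std(axis=1)[None, :]
    # S-norm is the average of both normalized distances
    return 0.5 * (z + t)


rng = np.random.default_rng(0)
toy_emb1 = rng.normal(size=(4, 8))     # 4 "file1" embeddings
toy_emb2 = rng.normal(size=(5, 8))     # 5 "file2" embeddings
toy_cohort = rng.normal(size=(20, 8))  # 20 cohort speakers

toy_scores = adaptive_snorm(toy_emb1, toy_emb2, toy_cohort, top_n=6)
print(toy_scores.shape)  # same shape as the raw distance matrix
```

`np.partition` only guarantees that the `top_n` smallest values end up in the first `top_n` positions, without sorting them. That is enough here because only their mean and standard deviation are needed.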

That's all! If you have any questions or suggestions, please feel free to open an issue in [pyannote-audio](https://github.com/pyannote/pyannote-audio).