This is a notebook used to generate speaker embeddings for multi-speaker training, using the ResNetSE34L model trained with the Angular Prototypical loss.

Before running this script please DON'T FORGET:
- to set the right paths in the cell below.

Repositories:
- TTS: https://github.com/mozilla/TTS
- ResNetSE34L: https://github.com/clovaai/voxceleb_trainer

In [None]:
import os
import importlib
import random
import librosa
import torch

import numpy as np
from TTS.utils.generic_utils import load_config
from tqdm import tqdm
from TTS.utils.speakers import save_speaker_mapping, load_speaker_mapping

# you may need to change this depending on your system
os.environ['CUDA_VISIBLE_DEVICES']='0'

In [None]:
# Clone voxceleb_trainer (presumably the source of the model, loss and
# data-loader modules imported in the cells below).
!git clone https://github.com/clovaai/voxceleb_trainer
# Work from inside the clone: every relative path used after this cell is
# resolved against the `voxceleb_trainer` directory.
os.chdir('voxceleb_trainer')
# Check out a fixed commit so the repository code is the same on every run.
!git checkout de675ca3e3b27d21fb6f734558b47d6f4c81ac1c

In [None]:
# Install voxceleb_trainer requirements
!pip install -r requirements.txt

In [None]:
#Download Voxceleb_trainer Checkpoint
!wget http://www.robots.ox.ac.uk/~vgg/data/voxceleb/models/baseline_lite_ap.model


In [None]:
#!/usr/bin/python
#-*- coding: utf-8 -*-
import torch.nn as nn
import torch.nn.functional as F
import numpy, math, pdb, sys, random
import time, os, itertools, shutil, importlib
from tuneThreshold import tuneThresholdfromScore
from DatasetLoader import loadWAV
from loss.ge2e import GE2ELoss
from loss.angleproto import AngleProtoLoss
from loss.cosface import AMSoftmax
from loss.arcface import AAMSoftmax
from loss.softmax import SoftmaxLoss
from loss.protoloss import ProtoLoss
from loss.pairwise import PairwiseLoss

class SpeakerNet(nn.Module):
    """Speaker-embedding network plus the loss/optimizer selected at build time.

    The embedding model is looked up dynamically as ``models.<model>.<model>``
    and moved to the GPU. In this notebook the class is only used for
    inference (``loadParameters`` followed by ``ExtractEmbedding``).
    """

    def __init__(self, max_frames, lr = 0.0001, margin = 1, scale = 1, hard_rank = 0, hard_prob = 0, model="alexnet50", nOut = 512, nSpeakers = 1000, optimizer = 'adam', encoder_type = 'SAP', normalize = True, trainfunc='contrastive', **kwargs):
        """Build the embedding model, the loss module and the optimizer.

        Args:
            max_frames: Stored and later forwarded to ``loadWAV``.
            lr: Learning rate given to the optimizer.
            margin, scale: Forwarded to the margin-based softmax losses
                (``margin`` also to the pairwise losses).
            hard_rank, hard_prob: Forwarded to ``PairwiseLoss``.
            model: Name of both the module under ``models`` and the class
                inside it.
            nOut: Embedding size passed to the model and the softmax losses.
            nSpeakers: Number of classes for the softmax-style losses.
            optimizer: ``'adam'`` or ``'sgd'``.
            encoder_type: Passed through to the embedding model.
            normalize: Accepted for interface compatibility; not used here.
            trainfunc: Name of the loss; it also decides whether embeddings
                are L2-normalised at test time.
            **kwargs: Extra configuration keys; ignored.

        Raises:
            ValueError: If ``trainfunc`` or ``optimizer`` is not recognised.
        """
        super(SpeakerNet, self).__init__()

        argsdict = {'nOut': nOut, 'encoder_type': encoder_type}

        SpeakerNetModel = getattr(importlib.import_module('models.' + model), model)
        self.__S__ = SpeakerNetModel(**argsdict).cuda()

        if trainfunc == 'angleproto':
            self.__L__ = AngleProtoLoss().cuda()
            self.__train_normalize__    = True
            self.__test_normalize__     = True
        elif trainfunc == 'ge2e':
            self.__L__ = GE2ELoss().cuda()
            self.__train_normalize__    = True
            self.__test_normalize__     = True
        elif trainfunc == 'amsoftmax':
            self.__L__ = AMSoftmax(in_feats=nOut, n_classes=nSpeakers, m=margin, s=scale).cuda()
            self.__train_normalize__    = False
            self.__test_normalize__     = True
        elif trainfunc == 'aamsoftmax':
            self.__L__ = AAMSoftmax(in_feats=nOut, n_classes=nSpeakers, m=margin, s=scale).cuda()
            self.__train_normalize__    = False
            self.__test_normalize__     = True
        elif trainfunc == 'softmax':
            self.__L__ = SoftmaxLoss(in_feats=nOut, n_classes=nSpeakers).cuda()
            self.__train_normalize__    = False
            self.__test_normalize__     = True
        elif trainfunc == 'proto':
            self.__L__ = ProtoLoss().cuda()
            self.__train_normalize__    = False
            self.__test_normalize__     = False
        elif trainfunc == 'triplet':
            self.__L__ = PairwiseLoss(loss_func='triplet', hard_rank=hard_rank, hard_prob=hard_prob, margin=margin).cuda()
            self.__train_normalize__    = True
            self.__test_normalize__     = True
        elif trainfunc == 'contrastive':
            self.__L__ = PairwiseLoss(loss_func='contrastive', hard_rank=hard_rank, hard_prob=hard_prob, margin=margin).cuda()
            self.__train_normalize__    = True
            self.__test_normalize__     = True
        else:
            raise ValueError('Undefined loss.')

        if optimizer == 'adam':
            self.__optimizer__ = torch.optim.Adam(self.parameters(), lr = lr)
        elif optimizer == 'sgd':
            self.__optimizer__ = torch.optim.SGD(self.parameters(), lr = lr, momentum = 0.9, weight_decay=5e-5)
        else:
            raise ValueError('Undefined optimizer.')

        self.__max_frames__ = max_frames

    ## ===== ===== ===== ===== ===== ===== ===== =====
    ## extract Embedding from file
    ## ===== ===== ===== ===== ===== ===== ===== =====

    def ExtractEmbedding(self, file_path, num_eval=10):
        """Return the embedding(s) of one audio file as a NumPy array.

        The forward pass runs in eval mode and without gradient tracking, so
        that layers which behave differently while training (e.g. batch
        normalisation or dropout, if the model has any) cannot make the result
        depend on the other segments in the batch or on previously processed
        files. The module's previous train/eval mode is restored afterwards.

        Args:
            file_path: Path of the audio file, forwarded to ``loadWAV``.
            num_eval: Forwarded to ``loadWAV`` (presumably the number of
                segments sampled from the file -- verify against loadWAV).

        Returns:
            ``numpy.ndarray`` with the model output for the file,
            L2-normalised along dim 1 when the selected loss requests
            test-time normalisation.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                inp1 = loadWAV(file_path, self.__max_frames__, evalmode=True, num_eval=num_eval).cuda()
                emb = self.__S__.forward(inp1).detach().cpu()
        finally:
            # Leave the module in whatever mode the caller had set.
            self.train(was_training)

        # apply L2 norm on Embedding
        if self.__test_normalize__:
            emb = F.normalize(emb, p=2, dim=1)

        return emb.numpy()

    ## ===== ===== ===== ===== ===== ===== ===== =====
    ## Load parameters
    ## ===== ===== ===== ===== ===== ===== ===== =====

    def loadParameters(self, path):
        """Copy matching tensors from the checkpoint at ``path`` into this module.

        Checkpoint keys that are unknown, even after removing ``"module."``
        from the name, and tensors whose size differs from the model's are
        reported with ``print`` and skipped.

        Args:
            path: File readable by ``torch.load`` holding a name -> tensor
                mapping.
        """
        self_state = self.state_dict()
        # Load onto the CPU first so the checkpoint does not need the device
        # it was saved from; copy_ moves the data to the parameter's device.
        loaded_state = torch.load(path, map_location="cpu")
        for origname, param in loaded_state.items():
            name = origname
            if name not in self_state:
                name = name.replace("module.", "")

                if name not in self_state:
                    print("%s is not in the model."%origname)
                    continue

            if self_state[name].size() != param.size():
                print("Wrong parameter length: %s, model: %s, loaded: %s"%(origname, self_state[name].size(), param.size()))
                continue

            self_state[name].copy_(param)


In [None]:
# Configuration forwarded to SpeakerNet as keyword arguments. Keys that
# SpeakerNet does not name explicitly are absorbed by its **kwargs.
parms = dict(
    max_frames=300,
    batch_size=200,
    max_seg_per_spk=100,
    nDataLoaderThread=5,
    test_interval=10,
    max_epoch=500,
    trainfunc="angleproto",
    optimizer="adam",
    lr=0.001,
    lr_decay=0.95,
    hard_prob=0.5,
    hard_rank=10,
    margin=1,
    scale=15,
    nSpeakers=6200,
    save_path="data/test",
    train_list="",
    test_list="",
    train_path="voxceleb2",
    test_path="voxceleb1",
    eval=True,
    model="ResNetSE34L",
    encoder="SAP",
    nOut=512,
    initial_model="./baseline_lite_ap.model",
)

# Build the network and load the downloaded checkpoint into it.
SpNet = SpeakerNet(**parms)
SpNet.loadParameters(parms["initial_model"])
print("Model {} loaded!".format(parms["initial_model"]))

In [None]:
# Paths of the TTS project root and of the training config to read.
# They are relative, so they are resolved against the current working
# directory (changed to `voxceleb_trainer` by the clone cell).
ROOT_PATH = "../../../"
CONFIG_PATH = os.path.join(ROOT_PATH, "config-pt-ideal.json")

# Parsed TTS configuration; its `datasets` entry drives the preprocessing cell.
CONFIG = load_config(CONFIG_PATH)

In [None]:
# Preprocess dataset: gather the metadata entries of every dataset listed in
# the config. Each dataset names its own preprocessor function, looked up
# (lower-cased) in TTS.datasets.preprocess.
preprocess_module = importlib.import_module('TTS.datasets.preprocess')
meta_data = []
datasets = CONFIG.datasets
for dataset in datasets:
    preprocessor = getattr(preprocess_module, dataset['name'].lower())
    meta_data += preprocessor(dataset['path'], dataset['meta_file_train'])

# Compute one embedding per wav file: the model output for the file is
# averaged over axis 0 and stored, with the speaker id, under the file's
# base name.
embeddings_dict = {}
duplicated_names = []
for _, wave_file_path, speaker_id in tqdm(meta_data):
    wav_file_name = os.path.basename(wave_file_path)
    if wav_file_name in embeddings_dict:
        # Same base name seen before (e.g. in another directory/dataset):
        # the earlier entry is overwritten below, so remember it for the report.
        duplicated_names.append(wav_file_name)
    # Extract Embedding
    file_embeddings = SpNet.ExtractEmbedding(wave_file_path)
    embeddings_dict[wav_file_name] = [np.mean(np.array(file_embeddings), axis=0).reshape(-1).tolist(), speaker_id]

if duplicated_names:
    print("WARNING: %d wav file name(s) occur more than once; only the last embedding "
          "of each was kept, e.g. %s" % (len(duplicated_names), duplicated_names[:5]))

In [None]:
# Create and export speakers.json: one entry per wav file name, holding the
# speaker id ('name') and the embedding computed in the previous cell.
# The embeddings are stored as they are; no further normalisation is applied here.
speaker_mapping = {
    sample: {'name': speaker_id, 'embedding': embedding}
    for sample, (embedding, speaker_id) in embeddings_dict.items()
}
save_speaker_mapping(ROOT_PATH, speaker_mapping)


In [None]:
# Test load integrity: reload the exported mapping and check that it is
# identical to the one held in memory.
speaker_mapping_load = load_speaker_mapping(ROOT_PATH)
assert speaker_mapping == speaker_mapping_load, \
    "The reloaded speaker mapping differs from the one that was exported"

# The mapping has one entry per wav file, so report the number of embeddings
# and the number of distinct speaker ids separately.
num_speakers = len({entry['name'] for entry in speaker_mapping.values()})
print("The file speakers.json has been exported to ", ROOT_PATH, ' with ',
      len(embeddings_dict), ' embeddings from ', num_speakers, ' speakers')