# Speaker verification
Implementation of a basic speaker classification system. It uses GMMS to model the speakers voice from MFCCs.

In [3]:
#IMPORTS
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
plt.style.use('ggplot')
rcParams['figure.figsize'] = 16, 8

from sklearn.mixture import GaussianMixture as GMM
from sklearn.decomposition import PCA
from sklearn import metrics

import sys
import os
import glob
import random

import librosa
import librosa.display

In [28]:
#DEFINICIO VARIABLES 
base_path = '/home/jc/speech_processing_notebooks'    #Carpeta practica 4
speecon_path = os.path.join(base_path,'audios','speecon')    #Carpeta Speecon
temp_path = os.path.join(base_path,'exports')    #Carpeta exports

### Generic function

In [16]:
def mfcc(files, n_coefs=16):
    ''' Función genérica MFCC
        Función genérica para calcular los coeficientes MFCC dada
        una lista con los paths a los audios.
        Utiliza la libreria librosa para leer el
        audio y calcular los coeficientes.
        Parametros:
        - files: lista con los ficheros a computar
        - n_coefs: int numero de coeficientes para el MFCC
        Devuelve:
        - base: np.array de tamaño Nxn_coefs con los coeficientes para cada trama'''
    
    #Inicializamos un array a ceros
    base = np.zeros((1,n_coefs))
    
    for file_audio in files:
        #Lectura del audio, remuestreamos a 8000Hz
        audio, fs = librosa.core.load(file_audio, sr=8000)
        #Calculo de los coefs
        mfcc_raw = librosa.feature.mfcc(audio, sr=fs, n_mfcc=n_coefs).T
        #Stack de la base de datos con los mfcc calculados
        base = np.vstack((base,mfcc_raw))
    
    return base[1:] #El primero no lo devolvemos porqué son los ceros de inializacion

In [17]:
def read_mfcc(path):
    ''' Reads mfcc file from person
        Parametros:
        - path: path donde leer el mfcc
    '''
    mfcc = np.loadtxt(path, delimiter=',')
    
    return mfcc

In [18]:
def verification_list(speecon):
    ''' Crea listas para calcular los parámetros posteriormente
        Estructura listas:
        - Hablantes entrenamiento (37.5%) : Son los audios que usamos para 
        entrenar el modelo GMM
        - Hablantes testing (12.5%) : Son unos audios de las personas de entrenamiento
        reservados para testing
        - Impostores (50%): Son las personas que no pertenecen al sistema
        Funcionamiento:
        - Partimos el dataset en dos partes, personas correctas y personas incorrectas(impostores)
        - Las personas correctas las partimos en 3/4 para entrenamiento y 1/4 parte testing
        - Guardamos el path hacia los ficheros en listas
    '''
    #Diccionario para contener los datos
    training = list()
    testing_good = list()
    impostor = list()
    #count = 0
    
    # Iteramos la base de datos de speecon primero bloque a bloque 
    # y luego persona a persona
    for block in os.listdir(speecon):
        block_path = os.path.join(speecon,block)
        
        # Listamos todos los ficheros en el Block
        ses_block = os.listdir(block_path)
        # Los desordenamos
        random.shuffle(ses_block)
        
        s = len(ses_block)
        
        # Y escojemos la mitad como usuarios buenos y la mitad como impostores
        # (Usamos floor division para obtener numeros pares)
        legit_users = ses_block[:s//2]
        impostors = ses_block[s//2:(s//2)*2]
        
        
        for user in legit_users:
            ses_path = os.path.join(block_path,user)
            
            # Obtenemos los ficheros de la persona
            all_files = glob.glob(f"{ses_path}/*.wav")
            
            # Para los users escojemos los 15 primeros ficheros (3/4) para training
            # y los ultimos 5 (1/4) para testing.
            train_files = all_files[:15]
            test_files = all_files[-5:]
            
            # Añadimos a la base de datos
            training.append(train_files)   #En los training es imortante tener una lista para cada hablante
            testing_good.extend(test_files)
        
        for imp in impostors:
            
            # Path
            ses_path = os.path.join(block_path,imp)
            
            # Para los impostores cojemos todos los ficheros para
            # testing
            all_files = glob.glob(f"{ses_path}/*.wav")
            
            # Extendemos la lista
            impostor.extend(all_files)
            
    # Devolvemos una lista con las tres listas         
    return [training, testing_good, impostor]

In [19]:
def compute_mfccs(lists, n_coefs, train_path, test_good_path, test_bad_path):
    ''' MFCCs Calculation
        Calcula los MFCCs dadas unas listas con los paths a los ficheros de cada tipo
        Parametros:
        - lists: Lista con las tres listas de ficheros
        - n_coefs: numero de coeficientes para calcular los mfccs
        - train_path: Path donde guardar los ficheros de entrenamiento
        - test_good_path: Path donde guardar los ficheros de testeo buenos
        - test_bad_path: Path donde guardar los ficheros de testeo malos (impostores)
    '''  
    
    # Comprovamos que las carpetas existan y si no las creamos
    os.makedirs(train_path, exist_ok=True)
    os.makedirs(test_bad_path, exist_ok=True)
    os.makedirs(test_bad_path, exist_ok=True)
    
    training = lists[0]
    testing_good = lists[1]
    testing = lists[2]
    
    for path_list in training:
        # Para calcular el modelo de la persona usamos todos los
        # audios de train
        train_mfcc = mfcc(path_list,n_coefs)
        
        # Extraemos el nombre de la persona a partir de la path
        person = path_list[0][-10:-7]
        
        # Exportamos a csv
        save_path = os.path.join(train_path, person+'.mfcc')
        np.savetxt(save_path, train_mfcc, delimiter=",")
    
    print("Training MFCC calculated")
    
    i = 0
    for path in testing_good:
        # Extraemos el nombre de la persona a partir de la path
        person = path[-10:-7]
        
        # Creamos una carpeta para cada persona con sus audios,
        # lo hacemos así para despues facilitar su validacion
        os.makedirs(os.path.join(test_good_path, person), exist_ok=True)
        save_path = os.path.join(test_good_path, person, f'audio{i}.mfcc')
        
        # Counter
        i = i+1

        # Calculo de los mfcc
        test_mfcc = mfcc([path],n_coefs)
        
        # Exportamos a csv
        np.savetxt(save_path, test_mfcc, delimiter=",")
        
    print("Good testing MFCC calculated")
       
    i = 0
    for path in testing:
        # Extraemos el nombre de la persona a partir de la path
        person = path[-10:-7]
        
        # Creamos una carpeta para cada persona con sus audios
        os.makedirs(os.path.join(test_bad_path, person), exist_ok=True)
        save_path = os.path.join(test_bad_path, person, f'audio{i}.mfcc')

        # Counter
        i = i+1
        
        # Calculo de los mfcc
        test_mfcc = mfcc([path],n_coefs)
        
        #Exportamos a csv
        np.savetxt(save_path, test_mfcc, delimiter=",")
        
    print("Impostor MFCC calculated")
        

In [20]:
def train_gmm(n_gmms,train_path):
    ''' Entrenamiento de las GMM
        Parametros:
        - n_gmms: Numero de coeficientes a usar para calcular el modelo GMM
        - train_path: Path para leer los ficheros .csv con las MFCCs
        Devuelve:
        - trained_gmm: diccionario con los nombres y las GMMs
    '''
    # Lectura de los ficheros en la train_path
    files_train = glob.glob(f"{train_path}/*.mfcc")
    
    trained_gmm = dict()
    
    for mfcc_path in files_train:
        # Leemos los MFCCs para esa persona
        mfcc = read_mfcc(mfcc_path)
        
        # Calculo de la GMM
        gmm=GMM(n_gmms, n_init=2).fit(mfcc) 
        
        # Extracción del nombre a partir del path
        person = mfcc_path[-8:-5]
        
        # Guardamos gmm en el diccionario
        trained_gmm[person] = gmm
        
    return trained_gmm

In [21]:
def train_world_gmm(files_train, n_mfccs, n_gmms):
    base = np.zeros((1,n_mfccs))

    for mfcc_path in files_train:
        mfcc = read_mfcc(mfcc_path)
        #Stack de la base de datos con los mfcc calculados
        base = np.vstack((base,mfcc))

    gmm=GMM(n_gmms, n_init=2).fit(base)
    
    return gmm

In [22]:
def verification(trained_gmm, test_bad_path, test_good_path, world_gmm):
    ''' Verification del locutor
        Para cada hablante (una gmm por hablante) calcula la probabilidad de que 
        un conjunto de audios pertenezca a la persona. Para ese conjunto de audios
        se escojen 5 audios que pertenecen a la persona y 5 que no.
        Parametros:
        - trained_gmm: diccionario con la gmm de cada persona
        - test_bad_path: path donde encontrar los ficheros de testing malos (impostores)
        - test_good_path: path donde encontrar los ficheros de testing buenos
        Devuelve:
        - assigned: diccionario con una lista de las scores de cada audio
        las 5 primeras scores corresponden a audios de la gmm y los 5 siguientes no
    '''
    
    # Contenedor de datos
    assigned = dict()
    
    # Todos los ficheros impostores
    all_test =  glob.glob(f"{test_bad_path}/*/*.mfcc")
    # Shuffle para desordenar los ficheros y que salgan variados
    random.shuffle(all_test)
        
    # Iteramos el diccionario. key = 'persona' gmm = gmm de la persona
    for key, gmm in trained_gmm.items():
        # Obtenemos los ficheros que corresponden
        good_files = glob.glob(f"{test_good_path}/{key}/*.mfcc")
        # Cojemos 5 ficheros de testing malos (impostores),
        # como estan desordenados nos saldran ficheros de
        # personas distintas
        test_files = [all_test.pop() for i in range(0,5)]
        
        # Sanity check para comprobar que la carpeta no esta vacía
        if len(good_files)==0: continue
        
        # Lista para guardar los datos
        current_gmm = list()
        
        for path in good_files:
            # Leemos mfcc
            read = read_mfcc(path)
            # Extraemos nombre de la persona a partir del path
            person = path[42:45]
            # Calculamos la score (logscore)
            score = gmm.score(read)- world_gmm.score(read)
            # Guardamos en el contenedor
            current_gmm.append(score)
        
        for path in test_files:
            # Leemos mfcc
            read = read_mfcc(path)
            # Extraemos nombre de la persona a partir del path
            person = path[41:44]
            # Calculamos la score (logscore)
            score = gmm.score(read) - world_gmm.score(read)
            # Guardamos en el contenedor
            current_gmm.append(score)
                
        # Guardamos en el contenedor de datos
        assigned[key] = current_gmm

    return assigned
    

In [23]:
def compute_threshold(threshold, verification_dict):
    ''' Calculo de las métricas para un determinado threshold
        Parametros:
        - threshold: valor del threshold
        - verificatoin_dict: contenedor de resultados de la funcion
        verification
    '''
    # Variables
    missed = 0
    false_positive = 0
    count = 0
    
    # Iteramos todas las personas
    for key,value in verification_dict.items():
        i = 0
        # Y por todos las scores
        for l in value:
            # Como los 5 primeros son buenos, miramos si es inferior
            # al threshold. En caso afirmativo se comptabiliza como missed
            if l<threshold and i<5: missed += 1
                
            # En los 5 segundos son impostores, miramos si superan el 
            # threshold. En caso afirmativo se comptabiliza como falso positivo
            if l>threshold and i>5: false_positive += 1   
            
            # Counter de operaciones    
            i = i + 1
        
        # Cuenta total de operaciones
        count = count + i
    
    # Calculo del coste
    cost = missed + false_positive*99
    
    # Presentación de resultados
    print(f'TH: {threshold:.2f} | Missed: {missed:3}/{count} | False Positive: {false_positive:3}/{count} | Cost: {cost}')
    
    return cost

In [24]:
def find_threshold(verification_dict, starting_thr=-80, ending_thr=-50, step_thr=0.2):
    ''' Busqueda de threshold óptimo
        Calcula el threshold con el que conseguimos mejor coste.
        Definimos coste como = missed + 99*false_positive
        Parametros:
        - verification_dict: diccionario devuelto por la funcion verification()
        - starting_thr: Valor inicial de los thresholds evaluados
        - ending_thr: Valor final de los threshold evaluados
        - step_thr: Valor del step de los thresholds
        Devuelve:
        - thr: el mejor threshold para el modelo
    '''
    
    # Constantes de análisis
    cost_min = None
    thr = None
    
    # Abanico de thresholds que vamos a probar
    thr_array = np.arange(starting_thr, ending_thr, step_thr)
    
    # Iteramos todos los thresholds de thr_array
    for threshold in thr_array:
        # Usamos compute_threshold para calcular el coste del threshold
        cost = compute_threshold(threshold, verification_dict)
        
        # Miramos si el coste baja con este thr, en caso afirmativo
        # guardamos los datos
        if cost_min == None or cost_min>cost:
            cost_min = cost
            thr = threshold
        
    # Resultados finales
    print('\nFinal results:')
    compute_threshold(thr, v)
            
    return thr

In [29]:
# Definimos paths donde exportar los ficheros de las MFCCs
train_path = os.path.join(temp_path,'ver','train')
test_good_path = os.path.join(temp_path,'ver','test_good')
test_bad_path = os.path.join(temp_path,'ver','test_bad')

In [30]:
lists = verification_list(speecon_path)

In [31]:
compute_mfccs(lists, 17, train_path, test_good_path, test_bad_path)

Training MFCC calculated
Good testing MFCC calculated
Impostor MFCC calculated


In [32]:
world_gmm = train_world_gmm(files_train, n_mfccs=17, n_gmms=12)

NameError: name 'files_train' is not defined

In [None]:
trained_gmm = train_gmm(8,train_path)

In [None]:
# Si lanza error se tienen que recalcular los MFCCs
v = verification(trained_gmm, test_bad_path, test_good_path, world_gmm)

In [None]:
thr = find_threshold(v,-50,20,0.2)