# ITI110 Project - Speaker Recognition

Team 9

## Part 3 - Predictions

In [0]:
import sys
import numpy as np
import pandas as pd
import librosa
import pickle
import os
from shutil import copyfile
import matplotlib.pyplot as plt
import imageio
%matplotlib inline

import cv2
import time

import multiprocessing

import tensorflow as tf
import tensorflow.keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate
from tensorflow.keras.models import Model

from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.layers import Lambda, Flatten, Dense
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.utils import CustomObjectScope

from tensorflow.keras.layers import Layer
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
from tensorflow.keras import initializers

import numpy.random as rng

### Folder Paths

In [0]:
# Root of the TIMIT working tree. It already ends with "/", so sub-paths are
# appended without a leading slash (the old form produced ".../TIMIT//data").
base_folder = "/Users/MacBookPro/AIandMLNYP/AIProject/SpeakerRecognition/TIMIT/"
data_folder = base_folder + "data"
predict_audio_folder = base_folder + "Audio/Prediction/"
# Pickled tensors are kept beside, not inside, the TIMIT tree.
pickle_path = '/Users/MacBookPro/AIandMLNYP/AIProject/SpeakerRecognition/data/'
output_folder = base_folder + "npydata/"
model_path = base_folder + 'Model'

In [0]:
# Folder holding the recordings to identify. Derived from base_folder (which
# ends with "/") so the root path is written only once; the value is unchanged.
input_file_path = base_folder + "Audio/Input/"

### Converting to MFCC

In [0]:
# Each sample is 16 kHz audio lasting roughly 3 - 5 seconds. Clips are cut to a
# fixed length of 3 seconds, i.e. 16000 * 3 samples.
#
audio_sample_rate = 16000   # nominal samples per second of the recordings
clip_seconds = 3            # fixed clip length, in seconds

mfcc_hop_length = 256
mfcc_max_frames = int(audio_sample_rate * clip_seconds / mfcc_hop_length) + 1

print ("MFCC Frames (for 3 sec audio):     %d" % (mfcc_max_frames))


num_classes = 10
max_samples = audio_sample_rate * clip_seconds  # 3 seconds of audio
max_mfcc_features = 40

# Scale the values to lie between -1 and 1.
def scale(arr):
    """Return ``arr`` divided by its largest absolute value.

    When every element is zero the divisor falls back to 1, so the input
    values come back unchanged and no division by zero happens.
    No mean-centering is applied.
    """
    peak = np.max(np.abs(arr))
    divisor = 1 if peak == 0 else peak
    return arr / divisor


# Load a file and convert its audio signal into a series of MFCC
# This will return a 2D numpy array.
#
def convert_mfcc(file_name):
    '''
    Convert one audio file into a fixed-size, scaled MFCC matrix.

    file_name => Path of the audio file to load

    Returns a 2D array of shape (mfcc_max_frames, max_mfcc_features): one row
    per frame, one column per MFCC coefficient. Column 0 is zeroed and the
    matrix is divided by its largest absolute value (see scale).
    '''
    # NOTE(review): librosa.load is called without `sr`, so librosa resamples
    # to its own default rate (22050 Hz per the librosa docs) rather than the
    # 16 kHz that max_samples assumes; likewise no hop_length is passed to
    # mfcc, so mfcc_hop_length is not used here. Left untouched because the
    # saved weights and baseline tensors were presumably produced with exactly
    # this preprocessing - confirm before changing.
    signal, sample_rate = librosa.load(file_name)
    signal = librosa.util.normalize(signal)
    signal_trimmed, _ = librosa.effects.trim(signal, top_db=60)
    # `size` is keyword-only from librosa 0.10 on; the keyword form also works
    # on older releases, whereas the positional form fails on newer ones.
    signal_trimmed = librosa.util.fix_length(signal_trimmed, size=max_samples)

    feature = librosa.feature.mfcc(y=signal_trimmed, sr=sample_rate, n_mfcc=max_mfcc_features).T

    # Force exactly mfcc_max_frames rows: cut extra frames or zero-pad the end.
    n_frames = feature.shape[0]
    if n_frames > mfcc_max_frames:
        feature = feature[:mfcc_max_frames, :]
    elif n_frames < mfcc_max_frames:
        feature = np.pad(feature, pad_width=((0, mfcc_max_frames - n_frames), (0, 0)), mode='constant')

    # This removes the average component from the MFCC as it may not be meaningful.
    #
    feature[:, 0] = 0

    return scale(feature)


MFCC Frames (for 3 sec audio):     188


In [0]:
def convertAudio(path,n = 0):
    '''
    Convert every speaker's audio files under path into MFCC tensors.

    path => Base folder; audio is read from <path>/Audio/Prediction/<speaker>/<file>
    n    => Unused; kept so callers that pass it keep working

    Returns (X_class, y_class): the stacked MFCC arrays (one per audio file)
    and a column array with the speaker folder name of each file, same order.

    Raises ValueError when no audio file is found.
    '''
    X_class = []
    y_class = []
    # os.path.join works whether or not path carries a trailing separator.
    file_path = os.path.join(path, "Audio", "Prediction")
    for speaker in os.listdir(file_path):
        speaker_path = os.path.join(file_path, speaker)
        # Only real folders are speakers; skip hidden entries and stray files
        # (e.g. ".DS_Store", ".ipynb_checkpoints") that would break the listing.
        if speaker.startswith(".") or not os.path.isdir(speaker_path):
            continue
        print("loading speaker: " + speaker)
        for filename in os.listdir(speaker_path):
            audio_file_path = os.path.join(speaker_path, filename)
            if filename.startswith(".") or not os.path.isfile(audio_file_path):
                continue
            # convert the audio file to mfcc
            X_class.append(convert_mfcc(audio_file_path))
            y_class.append(speaker)
    if not X_class:
        # np.vstack / np.stack would raise a far less helpful ValueError here.
        raise ValueError("no audio files found under " + file_path)
    y_class = np.vstack(y_class)
    X_class = np.stack(X_class)
    print(y_class)
    return X_class, y_class

# Tensor Operations

### Loading new speakers' audio files into tensors

In [0]:
# Build the baseline tensors: X holds the MFCC arrays, y the speaker labels.
X, y = convertAudio(base_folder)
print(X.shape)

loading speaker: FDAW0
loading speaker: MTPF0
loading speaker: MGRL0
loading speaker: MTJS0
loading speaker: MMRP0
loading speaker: FSJK1
loading speaker: MDPK0
loading speaker: Sekhar
loading speaker: MKLW0
loading speaker: MRCG0
loading speaker: MJEB1
loading speaker: Joy
loading speaker: Sindhu
[['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['FDAW0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MTPF0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MGRL0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MTJS0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['MMRP0']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['FSJK1']
 ['MDPK0']
 ['MDPK0']
 ['MDPK0']
 ['MDPK0

### Saving the baseline tensors on disk

In [0]:
# Persist the baseline tensors and their labels together as one (X, y) tuple.
baseline_pickle = os.path.join(pickle_path, "employeeaudio.pickle")
with open(baseline_pickle, "wb") as pickle_file:
    pickle.dump((X, y), pickle_file)

### Loading the saved baseline audio back into tensors

In [0]:
# Reload the baseline tensors (MFCC arrays and speaker labels) from disk.
# NOTE: pickle.load executes whatever the file contains, so only open pickles
# that this project wrote itself.
baseline_pickle = os.path.join(pickle_path, "employeeaudio.pickle")
with open(baseline_pickle, "rb") as pickle_file:
    X, y = pickle.load(pickle_file)

print(X.shape)
print(y.shape)

(131, 188, 40)
(131, 1)


# Rebuild Best Model

### Pull-in best model, along with Initializers and Optimizers

In [0]:
def initialize_weights(shape, dtype=None):
    """Draw initial weights from a normal distribution (mean 0, std 0.01).

    shape => shape of the array to create
    dtype => accepted but not used; the array is whatever NumPy returns
    """
    mean, std_dev = 0.0, 1e-2
    return np.random.normal(mean, std_dev, shape)

In [0]:
def initialize_bias(shape, dtype=None):
    """Draw initial biases from a normal distribution (mean 0.5, std 0.01).

    shape => shape of the array to create
    dtype => accepted but not used; the array is whatever NumPy returns
    """
    mean, std_dev = 0.5, 1e-2
    return np.random.normal(mean, std_dev, shape)

In [0]:
def get_base_conv_encoder(input_shape):
    """Build the convolutional encoder used for each input of the Siamese net.

    input_shape => shape of one input sample, given to the first Conv2D layer

    Returns a Sequential model: Conv2D(64) -> MaxPool -> Conv2D(128) ->
    BatchNorm -> MaxPool -> Flatten -> Dense(516, sigmoid).
    """
    def small_normal():
        # Each layer gets its own initializer instance, as in a per-layer setup.
        return initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None)

    encoder_layers = [
        # First convolution: no custom bias initializer is set on this layer.
        Conv2D(64, (3, 3), activation='relu', input_shape=input_shape,
               kernel_initializer=small_normal(), kernel_regularizer=l2(2e-4)),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(128, (3, 3), activation='relu',
               kernel_initializer=small_normal(),
               bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)),
        BatchNormalization(),
        MaxPooling2D(pool_size=(2, 2)),
        Flatten(),
        # Embedding layer: 516 sigmoid units.
        Dense(516, activation='sigmoid',
              kernel_regularizer=l2(1e-3),
              kernel_initializer=small_normal(), bias_initializer=initialize_bias),
    ]
    # Convolutional Neural Network
    return Sequential(encoder_layers)
    

In [0]:
def build_final_model(input_shape,  distance_metric='uniform_euclidean'):
    """Assemble the Siamese network that scores a pair of inputs.

    input_shape     => shape of a single input sample (both inputs share it)
    distance_metric => 'uniform_euclidean', 'weighted_l1' or 'cosine_distance';
                       selects how the two encodings are combined

    Both inputs pass through the same encoder from get_base_conv_encoder, the
    two encodings are merged by the selected Lambda layer, and a one-unit
    sigmoid Dense layer produces the output. Returns the uncompiled Model.
    """
    supported_metrics = ('uniform_euclidean', 'weighted_l1', 'cosine_distance')
    assert distance_metric in supported_metrics

    left_input = Input(input_shape)
    right_input = Input(input_shape)
    # A single encoder instance is called on both inputs, so weights are shared.
    encoder = get_base_conv_encoder(input_shape)
    encoded_l = encoder(left_input)
    encoded_r = encoder(right_input)

    if distance_metric == 'weighted_l1':
        print("using Weighted_l1")
        # Element-wise absolute difference; the Dense head then weights it.
        merge_layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
    elif distance_metric == 'uniform_euclidean':
        print("inside euclidian")
        # Euclidean distance reduced to one value per pair.
        merge_layer = Lambda(lambda tensors: K.sqrt(K.sum(K.square(K.abs(tensors[0] - tensors[1])), axis=-1, keepdims=True)))
    elif distance_metric == 'cosine_distance':
        print("using cosine similarity")
        # NOTE(review): cosine_similarity and cos_dist_output_shape are not
        # defined in the visible part of this file - confirm they exist
        # before selecting this metric.
        merge_layer = Lambda(cosine_similarity, output_shape=cos_dist_output_shape)

    distance = merge_layer([encoded_l, encoded_r])
    prediction = Dense(1, activation='sigmoid', bias_initializer=initialize_bias)(distance)

    # Connect the inputs with the outputs and return the model
    return Model(inputs=[left_input, right_input], outputs=prediction)

In [0]:
# Rebuild the architecture with the weighted-L1 head.
siamese_input_shape = (188, 40, 1)  # (frames, MFCC coefficients, channel)
model = build_final_model(siamese_input_shape, 'weighted_l1')
model.summary()

using Weighted_l1
Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 188, 40, 1)] 0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 188, 40, 1)] 0                                            
__________________________________________________________________________________________________
sequential (Sequential)         (None, 516)          23852804    input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
lambda (Lambda)                 (None, 516)          0           sequential[

### Load Best Weights and Compile

In [0]:
# Load the best saved weights into the rebuilt model.
best_weights_file = os.path.join(model_path, "seg_weights.best.hdf5")
model.load_weights(best_weights_file)

In [0]:
# `learning_rate` is the supported argument name; the old `lr` alias is
# deprecated in tf.keras and rejected by newer Keras releases.
optimizer = Adam(learning_rate=0.00006)
model.compile(loss="binary_crossentropy", metrics=['accuracy'], optimizer=optimizer)

# Predictions

In [0]:
def predict_speaker(inputaudiofile):
    '''
    Score one input recording against every baseline recording.

    inputaudiofile => File name of the recording, relative to input_file_path

    Returns a 1-D array with one model output per row of the global baseline
    tensor X, in the same order; the notebook takes np.argmax of it to index y.
    '''
    ### extract audio mfccs for the inputaudiofile.
    audio_file_path = os.path.join(input_file_path, inputaudiofile)
    mfcc = convert_mfcc(audio_file_path)

    n_baseline, frames, coeffs = X.shape
    if n_baseline == 0:
        # Nothing to compare against; keep the empty-result shape.
        return np.zeros(shape=(0,))

    # Left input: the query MFCC repeated once per baseline sample.
    left = np.repeat(mfcc.reshape(1, frames, coeffs, 1), n_baseline, axis=0)
    # Right input: every baseline MFCC with a trailing channel axis.
    right = X.reshape(n_baseline, frames, coeffs, 1)

    # One batched predict() call replaces one call per baseline sample.
    scores = model.predict([left, right])
    return np.asarray(scores, dtype=float).ravel()
    

In [0]:
# Score this input recording against every baseline sample in X.
y_pred = predict_speaker('MJEB1_SI837.WAV.wav')

In [0]:
# The baseline sample with the highest score supplies the predicted label.
best_index = y_pred.argmax()
prediction = y[best_index]
print(prediction)

['MJEB1']


In [0]:
# Score the recording, then report the label of the highest-scoring baseline.
y_pred = predict_speaker('MRC0_SX438.WAV.wav')
best_index = y_pred.argmax()
prediction = y[best_index]
print(prediction)

['MRCG0']


In [0]:
# Score the recording, then report the label of the highest-scoring baseline.
y_pred = predict_speaker('sx52-Sindhu.wav')
best_index = y_pred.argmax()
prediction = y[best_index]
print(prediction)

['Sindhu']


In [0]:
# Score the recording, then report the label of the highest-scoring baseline.
y_pred = predict_speaker('sx30.wav')
best_index = y_pred.argmax()
prediction = y[best_index]
print(prediction)

['Joy']


In [0]:
# Score the recording, then report the label of the highest-scoring baseline.
y_pred = predict_speaker('Sek10.wav')
best_index = y_pred.argmax()
prediction = y[best_index]
print(prediction)

['Sekhar']
