# SV Live Demo

In [None]:
%load_ext autoreload
%autoreload 2
%pylab inline

## Speaker Verification

### Model Load

In [None]:
from ResNet34 import ResNet34, ResNet34_v1
from ResNet34_1 import ResNet34_v4
from tdnnModel import tdnn_xvector, tdnn_xvector_v1

# Settings passed to the model constructor in the next cell; `no_cuda` is read
# again after loading to decide whether the model is moved to the GPU.
config = dict(
    loss="softmax",
    gpu_no=[0], no_cuda=True,
    input_dim=40  # NOTE(review): sv_system is later created with n_dims=64 — confirm the two are meant to differ
)

In [None]:
# model = ResNet34(config, inplanes=16, n_labels=1759)
# model = ResNet34_v4(config, layers=[3,4,6,3], n_labels=1260)
# model = tdnn_xvector_v1(config, n_labels=1759)
model = ResNet34_v1(config, n_labels=1759)

In [None]:
# model.load_extractor("../models/gcommand_equal30_wav/norm/ResNet34_v1_softmax/fbank_30f_100f_v01/model_best.pth.tar")
# model.load_extractor("../models/gcommand_equal30_wav/norm/tdnn_xvector_softmax/fbank_80f_80f_v00/model_best.pth.tar")
# model.load_extractor("./tdnn_model3/model_best.pth.tar")
# Load the checkpoint through the project-defined `load` method, then call
# `eval()` (presumably the usual torch inference-mode switch — confirm).
model.load("./ResNet34_v1_1/model_best.pth.tar")
model.eval()

# Move the model to the GPU only when CUDA is not disabled in `config`.
# NOTE(review): sv_system hands feature tensors to `model.embed` without moving
# them to a device — confirm that still works before setting no_cuda=False.
if not config['no_cuda']:
    model.cuda()

###  Speaker Verification System

In [None]:
import librosa
from pydub  import AudioSegment
from manage_audio import strip_audio
from manage_audio import preprocess_audio
import numpy as np
import torch
import torch.nn.functional as F
import pickle

def seg2wav(seg):
    """Return the samples of ``seg`` as a float32 waveform.

    The values from ``seg.get_array_of_samples()`` are divided by 32768.0
    and cast to ``np.float32``.
    """
    samples = np.array(seg.get_array_of_samples())
    scaled = samples / 32768.0
    return scaled.astype(np.float32)

def zero_padding(data, in_len):
    """Zero-pad ``data`` on both sides up to ``in_len`` samples.

    The missing samples are split between the two ends; when the amount is
    odd the extra zero goes to the right.  Input that is already at least
    ``in_len`` long is returned unpadded (still passed through ``np.pad``).
    """
    missing = in_len - len(data)
    if missing < 0:
        missing = 0
    left = missing // 2
    right = missing - left
    return np.pad(data, (left, right), "constant")

class sv_system():
    """Speaker enrolment and identification built on an embedding model.

    Every enrolled recording is converted to a feature tensor, embedded with
    ``model.embed`` and stored per speaker in ``speaker_models``.  ``verify``
    scores a test recording against every enrolled speaker with cosine
    similarity and reports the best match.

    Attributes:
        speaker_models: dict, speaker name -> list of d-vectors (ndarrays).
        lda_model: optional object whose ``transform`` is applied to d-vectors.
        enrolled_feats: dict, speaker name -> list of features added by ``enrol``.
        enrolled_wavs: set of file names (basename only) given to
            ``enrol`` / ``enrols``.
        concat_feats: per-speaker averaged d-vectors, set by
            ``compute_spk_model``.
    """

    def __init__(self, model, spk_models=None, lda_model=None,
                 n_dims=40, feat_format='fbank'):
        """
            model: network exposing ``embed(feat)``
            spk_models: optional path to a pickled ``speaker_models`` dict
            lda_model: optional path to a pickled transform applied to d-vectors
            n_dims: feature dimension, forwarded as ``n_mels`` to preprocess_audio
            feat_format: forwarded as ``in_feature`` to preprocess_audio
        """
        # SECURITY: unpickling runs arbitrary code from the file; only pass
        # paths to files you created yourself.
        self.speaker_models = self._load_pickle(spk_models) if spk_models \
                                else dict()
        self.lda_model = self._load_pickle(lda_model) if lda_model else None
        # NOTE(review): librosa.filters.dct appears to be missing from recent
        # librosa releases — confirm the installed version provides it.
        self.dct_filters = librosa.filters.dct(n_filters=n_dims, n_input=n_dims)
        self.model = model
        self.n_dims = n_dims
        self.feat_format = feat_format
        self.enrolled_feats = dict()
        self.enrolled_wavs = set()

    @staticmethod
    def _load_pickle(path):
        """Unpickle ``path`` and close the file afterwards."""
        with open(path, "rb") as handle:
            return pickle.load(handle)

    def _remember_wav(self, wav):
        """Record the base file name of an enrolled recording."""
        import os

        # Only path-like arguments have a name worth remembering.
        if isinstance(wav, (str, os.PathLike)):
            self.enrolled_wavs.add(os.path.basename(os.fspath(wav)))

    def enrol(self, wav, spk_name):
        """
            enrol the whole recording ``wav`` as one utterance of ``spk_name``
        """
        feat = self._wav2feat(wav)
        dvector = self._extract_dvector(feat).squeeze()
        # setdefault keeps both dicts usable even when the speaker is known
        # to only one of them (e.g. added via enrols() or a loaded pickle).
        self.enrolled_feats.setdefault(spk_name, []).append(feat)
        self.speaker_models.setdefault(spk_name, []).append(dvector)
        self._remember_wav(wav)

    def enrols(self, wav, spk_name):
        """
            split ``wav`` on silence and enrol every voiced segment as a
            separate utterance of ``spk_name`` (features are not kept in
            ``enrolled_feats``)
        """
        for feat in self._wav2feats(wav):
            dvector = self._extract_dvector(feat).squeeze()
            self.speaker_models.setdefault(spk_name, []).append(dvector)
        self._remember_wav(wav)

    def _extract_dvector(self, feat):
        """
            dvector: ndarray
        """
        # 2-D input gets two leading singleton axes.  3-D input gets one new
        # axis: at position 1 for ResNet34_v1 models, at position 0 otherwise.
        if feat.dim() == 2:
            feat = feat.unsqueeze(0).unsqueeze(0)
        elif feat.dim() == 3:
            if isinstance(self.model, ResNet34_v1):
                feat = feat.unsqueeze(1)
            else:
                feat = feat.unsqueeze(0)

        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            dvector = self.model.embed(feat).detach().cpu().numpy()
        if self.lda_model is not None:
            dvector = self.lda_model.transform(dvector).astype(np.float32)

        return dvector

    def _samples2feat(self, wav_data):
        """Run ``preprocess_audio`` on a waveform with this system's settings."""
        return preprocess_audio(wav_data, n_mels=self.n_dims,
                    dct_filters=self.dct_filters, in_feature=self.feat_format)

    def _wav2feats(self, wav):
        """
            extracting one input feature per voiced segment of wav;
            the segments are also stored in ``self.voice_segs``
        """
        # Only AudioSegment is imported from pydub by the surrounding code,
        # so fetch the splitter explicitly.
        from pydub.silence import split_on_silence

        wav_seg = AudioSegment.from_wav(wav)
        voice_segs = split_on_silence(wav_seg, min_silence_len=100,
                keep_silence=500, silence_thresh=-32)
        self.voice_segs = voice_segs
        print([len(seg) for seg in voice_segs])

        return [self._samples2feat(seg2wav(seg)) for seg in voice_segs]

    def _wav2feat(self, wav):
        """
            extracting input feature from wav (mfcc, fbank)
        """
        wav_data = seg2wav(AudioSegment.from_wav(wav))

        return self._samples2feat(wav_data)

    def init_speaker_model(self):
        """
            forget every enrolled speaker, including models loaded from
            ``spk_models`` at construction time
        """
        self.speaker_models = dict()
        self.enrolled_feats = dict()
        self.enrolled_wavs = set()
        # Older code stored the reset dict under this (misspelt) name; keep
        # it as an alias so reads of ``speaker_model`` keep working.
        self.speaker_model = self.speaker_models

    def compute_spk_model(self):
        """
            average the d-vectors of each speaker and stack the averages
            row-wise into ``self.concat_feats`` (speaker_models order)
        """
        if not self.speaker_models:
            raise ValueError("no speakers enrolled")
        self.concat_feats = np.stack(
            [np.mean(v, axis=0).squeeze() for v in self.speaker_models.values()],
            axis=0)

    def _score_speakers(self, test_dvector):
        """Median cosine similarity of ``test_dvector`` to each speaker's d-vectors."""
        probe = torch.from_numpy(test_dvector).float()
        score = []
        for enrolled in self.speaker_models.values():
            refs = torch.from_numpy(np.array(enrolled)).float()
            sims = F.cosine_similarity(refs, probe, dim=1).numpy()
            score.append(np.median(sims))

        return score

    def verify(self, wav):
        """
            verify a input wav and output a verification result
            and rank-1 identification

            returns (pred_speaker, score); ``score`` lists one value per
            enrolled speaker, in ``speaker_models`` order
            raises ValueError when no speaker is enrolled
        """
        if not self.speaker_models:
            raise ValueError("no speakers enrolled")

        feat = self._wav2feat(wav)
        test_dvector = self._extract_dvector(feat)
        score = self._score_speakers(test_dvector)
        pred_speaker = list(self.speaker_models.keys())[np.argmax(score)]

        return pred_speaker, score

### Parse dataset

In [None]:
import pandas as pd
import os

In [None]:
kor_dataset = os.listdir("kor_voices/wav")

In [None]:
# Build one (speaker, sentence, path) record per file.  File names are split on
# '_' and the first two fields are taken as speaker and sentence.
records = []
for wav_file in kor_dataset:
    # splitext removes only the extension; str.rstrip('.wav') strips any run of
    # trailing '.', 'w', 'a', 'v' characters and so can eat part of the stem.
    filen = os.path.splitext(wav_file)[0]
    name_parts = filen.split('_')
    spk, sent = name_parts[0], name_parts[1]
    records.append((spk, sent, "kor_voices/wav/"+wav_file))

In [None]:
kor_dataset_df = pd.DataFrame.from_records(records, columns=['spk', 'sent', 'wav'])

### speaker model similarity

In [None]:
# NOTE: this cell reads `test_sv_system`, which is only created in the
# "sv_system" section further down; run that section and the enrollment first.
spk_model = test_sv_system.speaker_models
spk_model_idx = list(spk_model.keys())
# One averaged d-vector per speaker, stacked row-wise in the order of `spk_model_idx`.
avg_speaker_models = torch.from_numpy(np.stack([np.mean(v, axis=0).squeeze() for v in spk_model.values()], 
                                       axis=0))
# Cosine similarity between every pair of averaged speaker models.
spk_model_scores = F.cosine_similarity(avg_speaker_models.unsqueeze(1), avg_speaker_models.unsqueeze(0), dim=2)

In [None]:
import itertools

# Every ordered pair of speaker models, including each speaker with itself.
n_models = len(spk_model_idx)
trials = [
    (spk_model_idx[row], spk_model_idx[col], row, col)
    for row, col in itertools.product(range(n_models), repeat=2)
]
spk_model_trial_df = pd.DataFrame.from_records(
    trials, columns=['spk_a', 'spk_b', 'spk_a_id', 'spk_b_id'])

In [None]:
# Look the pairwise scores up through NumPy so the new column holds plain
# floats instead of the result of indexing a torch tensor with pandas Series.
spk_model_trial_df['score'] = np.asarray(spk_model_scores)[
    spk_model_trial_df.spk_a_id.to_numpy(), spk_model_trial_df.spk_b_id.to_numpy()]

In [None]:
spk_model_trial_df[spk_model_trial_df.score > 0.8].spk_a.value_counts()

In [None]:
blacklist = spk_model_trial_df[spk_model_trial_df.score > 0.8].spk_a.value_counts()[:7].index.tolist()

In [None]:
blacklist = ['be', 'ms', 'cy', 'dw', 'dg']

In [None]:
clean_kor_dataset_df = kor_dataset_df[~kor_dataset_df.spk.isin(blacklist)]

### sv_system

In [None]:
# test_sv_system = sv_system(model, spk_models=None, lda_model=None)
# test_sv_system = sv_system(model, spk_models=None, lda_model="tdnn_model3/lda_model.pkl")
test_sv_system = sv_system(model, spk_models=None, lda_model=None, n_dims=64)

### Enrollment

In [None]:
blacklist = ['hm', 'hs', 'be', 'ms', 'ds', 'cy', 'dw', 'dg', 'ej']

In [None]:
enroll_spks = ['je', 'ip', 'hm', 'sv']

In [None]:
# white_df = kor_dataset_df[~kor_dataset_df.spk.isin(blacklist)]
# The blacklist filter above is disabled: every recording is kept and
# `blacklist` currently has no effect on `white_df`.
white_df = kor_dataset_df

In [None]:
# Draw one recording per (speaker, sentence) group of the enrolled speakers.
# `sample(n=1)` is called without `random_state`, so the selection (and every
# number derived from it) can differ between runs.
enrollment_df = white_df[white_df.spk.isin(enroll_spks)].groupby(['spk','sent']).apply(lambda x: x.sample(n=1))
# enrollment_df = kor_dataset_df[kor_dataset_df.spk.isin(enroll_spks)].groupby(['spk']).apply(lambda x: x.sample(n=1))

In [None]:
# Everything that was not drawn for enrollment becomes test data.  The explicit
# copy makes `test_df` an independent frame, so adding columns to it later does
# not raise pandas' SettingWithCopyWarning.
test_df = white_df[~white_df.wav.isin(enrollment_df.wav)].copy()
# test_df = kor_dataset_df[kor_dataset_df.spk.isin(test_spks)]

In [None]:
print("enroll: {}, test: {}".format(len(enrollment_df), len(test_df)))

In [None]:
test_sv_system.init_speaker_model()
for idx, row in enrollment_df.iterrows():
    test_sv_system.enrol(row.wav, row.spk)

In [None]:
test_sv_system.compute_spk_model()

-------------------------

In [None]:
tp_counts = 0 # true positive
fp_counts = 0 # false positive
threshold = 0.85

scores = []   # per trial: one similarity score per enrolled speaker
labels = []   # per trial: 1 when the true speaker is enrolled, else 0
preds = []    # per trial: rank-1 predicted speaker
for idx, row in test_df.iterrows():
    pred_speaker, score = test_sv_system.verify(row.wav)
    preds.append(pred_speaker)
    scores.append(score)

    # The label depends only on who is speaking, never on the score, so an
    # unusual score (e.g. NaN) cannot turn a target trial into a non-target.
    is_target = row.spk in enroll_spks
    labels.append(int(is_target))

    # A trial is accepted when its best score reaches the threshold.
    if max(score) >= threshold:
        if is_target:
            tp_counts += 1
        else:
            fp_counts += 1
scores = np.array(scores)
labels = np.array(labels)

In [None]:
# Normalise by the number of target / non-target trials rather than by the
# whole test set; otherwise 1 - tp_rate and 1 - fp_rate are not the
# false-negative and true-negative rates they are printed as.
n_target = int(np.sum(labels == 1))
n_nontarget = int(np.sum(labels == 0))
tp_rate = tp_counts / n_target if n_target else float('nan')
fp_rate = fp_counts / n_nontarget if n_nontarget else float('nan')

print("tp: {:.2f}, fn: {:.2f}, fp: {:.2f}, tn: {:.2f}".format(tp_rate, 1-tp_rate, fp_rate, 1-fp_rate))

In [None]:
from sklearn.metrics import roc_curve, auc

# Equal error rate: the ROC point where the false-positive rate and the
# false-negative rate (1 - TPR) are closest; the FPR there is reported.
best_scores = np.max(scores, axis=1)
fpr, tpr, thres = roc_curve(labels, best_scores, pos_label=1)
fnr = 1 - tpr
eer = fpr[np.nanargmin(np.abs(fpr - fnr))]
print("[TI] eer: {:.3f}%".format(eer*100))

In [None]:
thres[np.nanargmin(np.abs(fpr - (1 - tpr)))]

In [None]:
test_df['score'] = np.max(scores, axis=1)

In [None]:
test_df[test_df.spk.isin(enroll_spks)].hist()
plt.xlim([0.5, 1])
plt.show()

In [None]:
test_df[~test_df.spk.isin(enroll_spks)].hist()
plt.xlim([0.5, 1])
plt.show()

In [None]:
b = test_df[~test_df.spk.isin(enroll_spks)]

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Print the confusion matrix `cm` and draw it as an annotated heat map.

    With `normalize=True` every row is first divided by its row sum.
    `classes` supplies the tick labels for both axes.
    """
    if not normalize:
        print('Confusion matrix, without normalization')
    else:
        as_float = cm.astype('float')
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        cm = as_float / row_totals
        print("Normalized confusion matrix")

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    positions = np.arange(len(classes))
    plt.xticks(positions, classes, rotation=45)
    plt.yticks(positions, classes)

    # Cells above half of the maximum get white text so it stays readable.
    cell_format = '.2f' if normalize else 'd'
    half_max = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            value = cm[i, j]
            plt.text(j, i, format(value, cell_format),
                     horizontalalignment="center",
                     color="white" if value > half_max else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
enroll_spks

In [None]:
from sklearn.metrics import confusion_matrix

# The distinct test speakers define both the matrix layout and the tick labels.
spk_labels = test_df.spk.unique().tolist()
true_spks = test_df.spk.tolist()
cnf_matrix = confusion_matrix(true_spks, preds, labels=spk_labels)
np.set_printoptions(precision=2)

# Plot non-normalized confusion matrix
plt.figure(figsize=(10,10))
plot_confusion_matrix(cnf_matrix, classes=spk_labels,
                      title='Confusion matrix, without normalization')

In [None]:
def cos_score(a, b):
    """Print and return the cosine similarity of vectors ``a`` and ``b``.

    The value is still printed, as before; it is now also returned so callers
    can use it in further computation.
    """
    score = np.dot(a, b)/np.linalg.norm(a)/np.linalg.norm(b)
    print(score)
    return score

In [None]:
result_df = pd.concat([test_df, pd.DataFrame.from_records(scores.round(4).tolist(), 
                                                          index=test_df.index, 
                                                          columns=test_sv_system.speaker_models.keys())], 
                      axis=1)

In [None]:
result_df['pred'] = preds

In [None]:
result_df.drop(columns='score').to_csv("./ResNet34_v1_1/result.csv", float_format='%.3f', index=False)

In [None]:
enrollment_df.to_csv("./ResNet34_v1_1/enrollment.csv", index=False)

-------------------------

In [None]:
# test_sv_system.enrols("test_43AT1YC.wav", "heesu")
test_sv_system.enrols("mini_dataset/heesu_en1.wav", "heesu")
# test_sv_system.enrols("mini_dataset/heesu_en2.wav", "heesu")

In [None]:
test_sv_system.enrol("./mini_dataset/inpyo.wav", 'inpyo')
test_sv_system.enrol("./mini_dataset/inpyo_6Pu01w1.wav", 'inpyo')

In [None]:
test_sv_system.enrol("./mini_dataset/jiwoong.wav", 'jiwoong')
test_sv_system.enrol("./mini_dataset/jiwoong_3bJIHKe.wav", 'jiwoong')

In [None]:
test_sv_system.enrol("./mini_dataset/younghyun.wav", 'younghyun')
test_sv_system.enrol("./mini_dataset/younghyun_y4SCszr.wav", 'younghyun')

### Verification & Identification

In [None]:
import os

# Recordings already used for enrollment are skipped.  `enrolled_wavs` is read
# defensively: fall back to "nothing enrolled" when the system object does not
# provide that attribute.
enrolled_wavs = getattr(test_sv_system, "enrolled_wavs", ())
for wav in sorted(os.listdir("mini_dataset/")):
    # Match the extension, not any file name that merely contains "wav".
    if not wav.endswith('.wav'): continue
    if wav in enrolled_wavs: continue
#     if 'heesu' not in wav: continue
    print(wav)
    # Show the result; inside a loop the return value is not displayed by itself.
    spk_guess, spk_scores = test_sv_system.verify("mini_dataset/"+wav)
    print(spk_guess, spk_scores)
    print("--------------")

In [None]:
# The import must be active here: `ipd` is used on the next line.
import IPython.display as ipd
ipd.Audio("mini_dataset/inpyo.wav")

### 연속문장 (Continuous sentences)

In [None]:
import IPython.display as ipd
ipd.Audio("test_43AT1YC.wav")

In [None]:
wav_seg = AudioSegment.from_wav("mini_dataset/heesu_1.wav")
# wav_seg = wav_seg.normalize()
# wav_seg = wav_seg.strip_silence(silence_len=20, silence_thresh=-16,
#         padding=20)    

In [None]:
# Only `AudioSegment` is imported from pydub earlier on, so the bare name
# `pydub` is undefined here; import the splitter explicitly.
from pydub.silence import split_on_silence
voice_segs = split_on_silence(wav_seg, min_silence_len=100, keep_silence=500, silence_thresh=-32)

In [None]:
voice_segs[0]

In [None]:
import soundfile as sd
sd.write("mini_dataset/heesu_en1_right.wav", seg2wav(voice_segs[5]), 16000)

In [None]:
voice_segs[4]

In [None]:
AudioSegment.from_wav("mini_dataset/inpyo_rtXndSE.wav")

In [None]:
AudioSegment.from_wav("mini_dataset/jiwoong.wav")

In [None]:
df = pd.read_pickle("kor_voices/kor_dataset.pkl")

In [None]:
spks = df.spk.unique().tolist()

In [None]:
# Map each speaker to its position in `spks` through a dict lookup instead of a
# linear `list.index` scan for every row.
spk_to_label = {spk: label for label, spk in enumerate(spks)}
df['label'] = df.spk.map(spk_to_label)

In [None]:
df.to_pickle("kor_voices/kor_dataset.pkl")