In [None]:
import os
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from nemo.collections.asr.modules import AudioToMelSpectrogramPreprocessor
from nemo.collections.asr.parts.preprocessing.features import WaveformFeaturizer
import numpy as np
import librosa
import torch
import pickle
from matplotlib.pyplot import imshow
from matplotlib import pyplot as plt
import IPython.display as ipd
%matplotlib inline

In [None]:
# Directory containing real and synthesized samples from various methods for different speakers.
# The cell that builds `wav_paths` parses every "*.wav" name in this directory as
# "<wav_type>_<method_info>_<speaker>_<val_no>.wav".
# Alternative sample set:
# samples_dir = "/home/pneekhara/synthesized_samples_FINAL/"
samples_dir = "/home/pneekhara/synthesized_samples_universalvoc/"

In [None]:
# Group the wav files by "<wav_type>_<method_info>_<speaker>"; each group ends up as a
# list of file paths ordered by the numeric utterance index taken from the file name.
wav_paths = {}
for fname in os.listdir(samples_dir):
    if not fname.endswith(".wav"):
        continue
    wav_type, method_info, speaker, val_no = fname.split("_")
    # Keep only what precedes the first "." of the last field (drops the extension).
    val_no = val_no.partition(".")[0]
    key = "{}_{}_{}".format(wav_type, method_info, speaker)
    wav_paths.setdefault(key, []).append((int(val_no), os.path.join(samples_dir, fname)))

# Order every group by (index, path) and then drop the index.
for key in wav_paths:
    wav_paths[key] = [path for _, path in sorted(wav_paths[key])]

In [None]:
# Show the "<wav_type>_<method_info>_<speaker>" group keys found in samples_dir.
wav_paths.keys()

In [None]:
# speaker = "92"

# for vidx in range(0,5):
#     for wav_type in ["real", "baseline", "synthesizedForceAlignment", "synthesized"]:
#         if wav_type == "real":
#             method_info = "actual"
#         elif wav_type == "baseline":
#             method_info = "originalModel"
#         else:
#             method_info = "1-nomix"

#         key = "{}_{}_{}".format(wav_type, method_info, speaker)
#         wav_path = wav_paths[key][vidx]
#         print (vidx, key)
#         ipd.display(ipd.Audio(wav_path))
    

In [None]:
# Waveform loader and mel-spectrogram extractor, both configured for 44.1 kHz audio.
wav_featurizer = WaveformFeaturizer(sample_rate=44100, int_values=False, augmentor=None)
mel_processor = AudioToMelSpectrogramPreprocessor(
        # Window and stride are given in samples (n_window_size / n_window_stride below);
        # window_size / window_stride are presumably the seconds-based alternatives — TODO confirm.
        window_size = None,
        window_stride = None,
        sample_rate=44100,
        n_window_size=2048,
        n_window_stride=512,
        window="hann",
        normalize=None,
        n_fft=None,
        preemph=None,
        # 80 mel features per frame.
        features=80,
        lowfreq=0,
        highfreq=None,
        # Log-compressed output; 1e-05 is added as the log zero guard.
        log=True,
        log_zero_guard_type="add",
        log_zero_guard_value=1e-05,
        dither=0.0,
        pad_to=1,
        frame_splicing=1,
        exact_pad=False,
        stft_exact_pad=False,
        stft_conv=False,
        pad_value=0,
        mag_power=1.0
)

# Pretrained speaker-verification model used to extract speaker embeddings.
# NOTE(review): from_pretrained presumably fetches the checkpoint by name — confirm network/cache needs.
speaker_verification_model = EncDecSpeakerLabelModel.from_pretrained("speakerverification_speakernet")
# speaker_verification_model = EncDecSpeakerLabelModel.restore_from("/home/pneekhara/PreTrainedModels/ecapa_tdnn_mask.nemo")
# Inference mode on the GPU; .cuda() means this cell needs a CUDA device.
speaker_verification_model.eval().cuda()

In [None]:
# Cache of per-utterance speaker embeddings, mel spectrograms and pitch contours,
# each stored as a dict keyed like wav_paths ("<wav_type>_<method_info>_<speaker>").
# NOTE(review): this path names the "Final" data while samples_dir points at the
# "universalvoc" samples — confirm the cache really belongs to the samples being analysed.
pickle_path = "/home/pneekhara/synthesized_audio_meta_dataFinal.pkl"
# pickle_path = "/home/pneekhara/synthesized_audio_meta_data_universalvoc.pkl"
# Set to True to recompute everything and overwrite the cache file.
regenerate = False
if os.path.exists(pickle_path) and not regenerate:
    # NOTE: pickle.load can execute arbitrary code; only load cache files you created yourself.
    with open(pickle_path, "rb") as f:
        meta_data = pickle.load(f)
        embeddings = meta_data['embeddings']
        spectrograms = meta_data['spectrograms']
        pitch_contours = meta_data['pitch_contours']
else:
    embeddings = {}
    spectrograms = {}
    pitch_contours = {}
    # kidx only numbers the progress messages.
    kidx = 0
    for key in wav_paths:
        print ("Getting embeddings for:", kidx, key, len(wav_paths[key]), len(wav_paths))
        embeddings[key] = []
        spectrograms[key] = []
        pitch_contours[key] = []
        for path in wav_paths[key]:
            # Speaker embedding, flattened to a 1-D numpy vector.
            embedding = speaker_verification_model.get_embedding(path)
            embeddings[key].append(embedding.cpu().numpy().flatten())

            wav = wav_featurizer.process(path)
            # NOTE(review): the length argument is a 2-D tensor ([[n_samples]]) and the result is
            # indexed twice (mel[0][0]); these two choices look coupled — confirm before changing either.
            mel, _ = mel_processor.get_features(wav[None], torch.tensor([[wav.shape[0]]]).long() )
            mel = mel[0][0].cpu().numpy()

            # F0 contour searched between 80 and 640 Hz; fill_na=0.0 presumably makes unvoiced
            # frames come back as 0.0, which is what the pitch-error metrics treat as unvoiced.
            pitch, _, _ = librosa.pyin(
                wav.numpy(),
                fmin=80,
                fmax=640,
                frame_length=2048,
                sr=44100,
                fill_na=0.0,
            )

            spectrograms[key].append(mel)
            pitch_contours[key].append(pitch)
        
        kidx += 1

    # Persist the freshly computed features so later runs can take the load branch.
    with open(pickle_path, "wb") as f:
        meta_data = {
            'spectrograms' : spectrograms,
            'pitch_contours' : pitch_contours,
            'embeddings' : embeddings
        }
        pickle.dump(meta_data, f, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
from sklearn import metrics
import random
def caclulate_speaker_verification_metrics(embeddings, duration_mins, mixing, eval_mode="synthetic"):
    """Compute a speaker-verification equal error rate (EER) from cosine-similarity trials.

    Every probe embedding is scored against the real embeddings of the same speaker
    (positive trials, label 1) and against the real embeddings of every other speaker
    (negative trials, label 0). The real embedding that sits at the probe's own index is
    left out of the positive trials (in "real" mode that embedding is the probe itself).

    Args:
        embeddings: dict mapping "<wav_type>_<method_info>_<speaker>" keys to a sequence
            of 1-D embedding vectors.
        duration_mins: fine-tuning duration used in the synthesized keys (e.g. 1, 5, 'All').
        mixing: True selects the "mix" keys, False the "nomix" keys.
        eval_mode: "real" uses the real embeddings as probes; any other value uses the
            synthesized embeddings selected by ``duration_mins`` / ``mixing``.

    Returns:
        The false-positive rate at the ROC operating point where |FNR - FPR| is smallest.

    Raises:
        ValueError: if no trial could be built from ``embeddings``.
        AssertionError: if FPR and FNR differ by 0.1 or more at the selected point.
    """

    def get_sim_score(v1, v2):
        # Cosine similarity of the two vectors.
        return np.dot(v1, v2) / (np.sqrt(np.dot(v1, v1)) * np.sqrt(np.dot(v2, v2)))

    real_embeddings = {}
    synthetic_embeddings = {}

    mix_str = "mix" if mixing else "nomix"
    synthetic_tag = "synthesized_{}-{}".format(duration_mins, mix_str)
    for key in embeddings:
        speaker = key.split("_")[-1]
        if "real_actual" in key:
            real_embeddings[speaker] = embeddings[key]
        if synthetic_tag in key:
            synthetic_embeddings[speaker] = embeddings[key]

    if eval_mode == "real":
        # Real-vs-real evaluation: the probes are the real embeddings themselves.
        synthetic_embeddings = real_embeddings

    trial_pairs = []
    for speaker in sorted(synthetic_embeddings.keys()):
        for emb_idx, emb in enumerate(synthetic_embeddings[speaker]):
            pos_pairs = [
                (get_sim_score(emb, pos_emb), 1)
                for pos_emb_idx, pos_emb in enumerate(real_embeddings[speaker])
                if emb_idx != pos_emb_idx
            ]
            neg_pairs = [
                (get_sim_score(emb, neg_emb), 0)
                for neg_speaker in sorted(real_embeddings.keys())
                if neg_speaker != speaker
                for neg_emb in real_embeddings[neg_speaker]
            ]
            # Same deterministic order as seeding with 42, but through a private generator so
            # the process-wide `random` state is not reset on every probe.
            random.Random(42).shuffle(neg_pairs)
            # To balance the trials, truncate here: neg_pairs = neg_pairs[:len(pos_pairs)]

            trial_pairs += pos_pairs
            trial_pairs += neg_pairs

    print ("Num Trials", len(trial_pairs))
    if not trial_pairs:
        raise ValueError(
            "No verification trials could be built for duration_mins={!r}, mixing={!r}, eval_mode={!r}".format(
                duration_mins, mixing, eval_mode
            )
        )

    y_true = [pair[1] for pair in trial_pairs]
    y_score = [pair[0] for pair in trial_pairs]

    fpr, tpr, _ = metrics.roc_curve(y_true, y_score, pos_label=1)
    fnr = 1 - tpr
    # Operating point where the miss rate and the false-alarm rate are closest.
    eer_idx = np.nanargmin(np.absolute((fnr - fpr)))
    EER = fpr[eer_idx]
    EER_check = fnr[eer_idx]

    assert abs(EER - EER_check) < 0.1

    return EER
    
    
            
        

In [None]:
# Print the EER of the synthesized speech for every fine-tuning configuration, one value
# per line; the 'All' + mixing combination is skipped.
for mixing in (False, True):
    for duration_mins in (1, 5, 30, 60, 'All'):
        if not (duration_mins == "All" and mixing):
            eer = caclulate_speaker_verification_metrics(embeddings, duration_mins, mixing, eval_mode="synthetic")
            print (eer)

In [None]:
# Reference EER with real recordings as probes: in eval_mode="real" the probe set is the
# real embeddings, so the duration / mixing arguments do not change which probes are used.
caclulate_speaker_verification_metrics(embeddings, 60, False, eval_mode="real")

In [None]:
def calculate_pitch_contour_error_metrics(ref_pitch_contour_np, gen_pitch_contour_np):
    """Compare a generated F0 contour against a reference contour, frame by frame.

    Only the overlapping prefix of the two contours is compared. A frame value of 0 is
    treated as unvoiced.

    Args:
        ref_pitch_contour_np: sequence of per-frame reference pitch values.
        gen_pitch_contour_np: sequence of per-frame generated pitch values.

    Returns:
        dict with
            'FFE': fraction of compared frames that have a voicing mismatch or a gross
                pitch error,
            'GPE': fraction of frames that are positive in both contours whose generated
                pitch is more than 20% above or below the reference (1.0 when there is
                no such frame),
            'VDE': fraction of compared frames with a voicing mismatch.
        When there is no frame to compare, FFE and VDE are 0.0 and GPE is 1.0.
    """
    n_frames = min(len(ref_pitch_contour_np), len(gen_pitch_contour_np))
    if n_frames == 0:
        # Nothing to compare: no error can be counted, and GPE keeps its "no voiced
        # frame" convention instead of dividing by zero.
        return {'FFE': 0.0, "GPE": 1.0, "VDE": 0.0}

    ref = np.asarray(ref_pitch_contour_np[:n_frames], dtype=float)
    gen = np.asarray(gen_pitch_contour_np[:n_frames], dtype=float)

    # Voicing decision errors: exactly one of the two contours is unvoiced (== 0).
    voicing_errors = float(np.count_nonzero((ref != 0) != (gen != 0)))

    # Gross pitch errors are only defined where both contours carry a positive pitch.
    both_voiced = (ref > 0) & (gen > 0)
    n_both_voiced = int(np.count_nonzero(both_voiced))
    ratio = gen[both_voiced] / ref[both_voiced]
    gross_errors = float(np.count_nonzero((ratio > 1.2) | (ratio < 0.8)))

    FFE = (gross_errors + voicing_errors) / n_frames
    GPE = gross_errors / n_both_voiced if n_both_voiced else 1.0
    VDE = voicing_errors / n_frames

    metric_dict = {'FFE': FFE, "GPE" : GPE, "VDE" : VDE}

    return metric_dict

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.patches as mpatches
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import pylab as plt
import matplotlib.patches as mpatches
import json

In [None]:
# Mean pitch-contour errors (FFE / GPE / VDE) of the force-aligned synthesized speech
# against the real recordings, for every speaker and fine-tuning configuration.
for speaker in (92, 6097):
    for mixing in (False, True):
        for finetuning_duration in (1, 5, 30, 60, 'All'):
            if mixing and finetuning_duration == 'All':
                continue  # this combination is skipped

            mix_str = "mix" if mixing else "nomix"
            synthesized_key = "synthesizedForceAlignment_{}-{}_{}".format(finetuning_duration, mix_str, speaker)
            actual_key = "real_actual_{}".format(speaker)

            # Utterance idx of the real list is compared with utterance idx of the synthesized list.
            per_utterance_metrics = [
                calculate_pitch_contour_error_metrics(
                    pitch_contours[actual_key][idx],
                    pitch_contours[synthesized_key][idx],
                )
                for idx in range(len(pitch_contours[actual_key]))
            ]
            mean_metrics = {
                error_key: round(float(np.mean([m[error_key] for m in per_utterance_metrics])), 3)
                for error_key in ('FFE', 'GPE', 'VDE')
            }

            print (speaker, finetuning_duration, "mixing:", mixing, mean_metrics)

In [None]:
# Load the speaking-rate statistics and print every entry rounded to two decimals.
with open("/home/pneekhara/synthesized_samplesv3/speaking_rate.json") as f:
    speaking_rate_stats = json.load(f)

for key in speaking_rate_stats:
    stats_for_key = speaking_rate_stats[key]
    for metric_key in stats_for_key:
        print (key,metric_key, round(stats_for_key[metric_key], 2) )

In [None]:
# Pitch-contour error summary restricted to the models fine-tuned with mixing
# (the 'All' duration is skipped because mixing is always on here).
for speaker in (92, 6097):
    for finetuning_duration in (5, 30, 60, 'All'):
        for mixing in (True,):
            if mixing and finetuning_duration == 'All':
                continue

            mix_str = "mix" if mixing else "nomix"
            synthesized_key = "synthesizedForceAlignment_{}-{}_{}".format(finetuning_duration, mix_str, speaker)
            actual_key = "real_actual_{}".format(speaker)

            # Collect the per-utterance values of each metric, then average them.
            collected = {'FFE': [], 'GPE': [], 'VDE': []}
            for idx in range(len(pitch_contours[actual_key])):
                pitch_metrics = calculate_pitch_contour_error_metrics(
                    pitch_contours[actual_key][idx],
                    pitch_contours[synthesized_key][idx],
                )
                for error_key, value in pitch_metrics.items():
                    collected[error_key].append(value)

            mean_metrics = {
                error_key: round(float(np.mean(values)), 3)
                for error_key, values in collected.items()
            }

            print (speaker, finetuning_duration,  mean_metrics)

In [None]:
def mscatter(x,y, ax=None, m=None, **kw):
    """Scatter plot that allows a different marker for every point.

    Args:
        x, y: point coordinates.
        ax: Axes to draw on. When omitted the plot goes through ``plt.scatter``
            (the current pyplot axes).
        m: optional sequence with one marker (a marker spec or a ``MarkerStyle``) per
            point. It is only applied when its length equals ``len(x)``; otherwise the
            default marker is kept.
        **kw: forwarded to the underlying ``scatter`` call.

    Returns:
        The collection returned by ``scatter``.
    """
    import matplotlib.markers as mmarkers
    # Honour an explicitly supplied Axes; without one keep drawing through pyplot.
    if ax is None:
        sc = plt.scatter(x,y,**kw)
    else:
        sc = ax.scatter(x,y,**kw)
    if (m is not None) and (len(m)==len(x)):
        paths = []
        for marker in m:
            if isinstance(marker, mmarkers.MarkerStyle):
                marker_obj = marker
            else:
                marker_obj = mmarkers.MarkerStyle(marker)
            # Marker outline with the marker's own transform applied.
            path = marker_obj.get_path().transformed(
                        marker_obj.get_transform())
            paths.append(path)
        # One path per point replaces the single shared marker.
        sc.set_paths(paths)
    return sc

def visualize_embeddings(embedding_dict_np, title = "TSNE"):
    """Project speaker embeddings to 2-D with t-SNE and draw them as a scatter plot.

    Points are coloured per speaker (``tab20`` colormap) and every
    "<wav_type>_<method_info>_<speaker>" key gets its own marker shape, cycling through a
    fixed marker list. Keys whose method_info is "vocoded" are left out. For speakers
    "92" and "6097" at most 10 embeddings per key are plotted.

    Args:
        embedding_dict_np: dict mapping "<wav_type>_<method_info>_<speaker>" keys to a
            sequence of embedding vectors.
        title: title of the figure.
    """
    marker_list = ['<', '*', 'h', 'X', 's', 'H', 'D', 'd', 'P', 'v', '^', '>', '8', 'p']
    capped_speakers = ["92", "6097"]
    max_capped_samples = 10  # keeps these speakers balanced with the other speakers

    # Group the keys per speaker so that all keys of one speaker are handled back to back.
    speaker_keys = {}
    for key in embedding_dict_np:
        wav_type, method_info, speaker = key.split("_")
        speaker_keys.setdefault(speaker, []).append(key)

    _all_keys = []
    for spk in speaker_keys:
        _all_keys += speaker_keys[spk]

    universal_embed_list = []
    color = []
    marker_shape = []
    handle_list = []
    _unique_speakers = {}
    kidx = 0  # counts the plotted keys; selects the marker shape

    for key in _all_keys:
        wav_type, method_info, speaker = key.split("_")
        if method_info == "vocoded":
            continue
        if speaker not in _unique_speakers:
            _unique_speakers[speaker] = len(_unique_speakers)

        _num_samples = len(embedding_dict_np[key])
        if speaker in capped_speakers:
            # Never count more points than exist, otherwise the colour and marker lists
            # would be longer than the list of embeddings handed to the scatter plot.
            _num_samples = min(max_capped_samples, _num_samples)
        universal_embed_list += embedding_dict_np[key][:_num_samples]

        id_color = plt.cm.tab20(_unique_speakers[speaker])
        key_marker = marker_list[kidx % len(marker_list)]
        color += [id_color] * _num_samples
        marker_shape += [key_marker] * _num_samples
        _label = "{} : {}".format(key, key_marker)
        handle_list.append(mpatches.Patch(color = id_color, label=_label))

        kidx += 1

    speaker_embeddings = TSNE(n_components=2, random_state=0).fit_transform(universal_embed_list)

    mscatter(speaker_embeddings[:, 0], speaker_embeddings[:, 1], m = marker_shape,  c=color, s=60)

    plt.legend(handles=handle_list,title="Speaker_ID")
    plt.title(title)
    plt.show()

def plot_pitch(pitch_actual, pitch_synthetic):
    """Overlay the real (red) and synthetic (blue) F0 contours, plotted against frame index."""
    curves = (
        (pitch_actual, "r", "Real"),
        (pitch_synthetic, "b", "Synthetic"),
    )
    for contour, line_fmt, legend_name in curves:
        frame_index = range(len(contour))
        plt.plot(frame_index, contour, line_fmt, label = legend_name )
    plt.legend()
    plt.title("F0 Pitch contour")
    plt.show()

def visualize_synthesized_audio(speaker, duration, mixing, n=2):
    """Play the first ``n`` real utterances of a speaker next to their synthesized versions.

    Args:
        speaker: speaker id used in the wav_paths keys.
        duration: fine-tuning duration used in the synthesized key ("All" or minutes).
        mixing: True selects the "mix" model, False the "nomix" model.
        n: number of utterances to play.

    Nothing is shown for the duration == "All" with mixing combination.
    Spectrogram and pitch-contour plots are currently disabled; only audio is displayed.
    """
    if duration == "All" and mixing:
        return

    actual_key = "real_actual_{}".format(speaker)
    synthesized_key = "synthesized_{}-{}_{}".format(duration, "mix" if mixing else "nomix", speaker)

    # Human-readable description of the model that produced the synthetic audio.
    if duration != "All":
        synth_msg = "finetuned on {} mins audio of speaker {}".format(duration, speaker)
        if mixing:
            synth_msg += " mixed with 5000 samples of original speaker."
    else:
        synth_msg = "trained from scratch on all data audio of speaker {}".format(speaker)

    separator = "---"*30
    for sample_no in range(1, n + 1):
        print (separator)

        print(sample_no, "Actual audio")
        ipd.display(ipd.Audio(wav_paths[actual_key][sample_no - 1]))

        print(sample_no, "Synthetic audio by model", synth_msg)
        ipd.display(ipd.Audio(wav_paths[synthesized_key][sample_no - 1]))
        

In [None]:
# Wide, short figures for the listening comparison below.
plt.rcParams["figure.figsize"] = (9, 3)

# Play one real/synthetic pair (n=1) of speaker 6097 for each fine-tuning duration, with and
# without mixing; visualize_synthesized_audio itself returns early for "All" with mixing.
for speaker in [6097]:
    for finetuning_mins in [60, 5, "All"]:
        for mixing in [False, True]:
            visualize_synthesized_audio(speaker, finetuning_mins, mixing, n=1)

In [None]:
# Large square figure for the t-SNE plot of all speaker embeddings.
plt.rcParams["figure.figsize"] = (12, 12)
visualize_embeddings(embeddings)

In [None]:
# Sanity check: number of embeddings stored under the 'synthesized_5-nomix_6097' key.
len(embeddings['synthesized_5-nomix_6097'])

In [None]:
from nemo.collections.tts.models import Tacotron2Model

In [None]:
# Display what Tacotron2Model.list_available_models() reports
# (presumably the pretrained Tacotron2 checkpoints available through NeMo — confirm).
Tacotron2Model.list_available_models()