In [1]:
import os
from pystoi import stoi
from loss import *
from scipy.io import wavfile
import whisper
from whisper.audio import log_mel_spectrogram
import json

In [2]:
# Filepaths, constants
data_dir = '/home/thegoods/'
text_path = '/home/thegoods/clean/labels.csv'
WHISPER_PGD = 'whisper_pgd_200'
WHISPER_GAN = 'whisper_gan'
DF_AUDIO = 'DF_audio'
DF_TEXT = 'DF_text'
def getPath(mode,attack,id,split='test'):
    if mode=="clean":
        return os.path.join(data_dir,mode,split,f'{id}.wav')
    elif mode=='gen':
        return os.path.join(data_dir,mode,attack,split,f'{id}.npy')
    else:
        return os.path.join(data_dir,mode,attack,split,f'{id}.wav')
inputs = [WHISPER_PGD]
N_SAMPLES = 100
# inputs = [WHISPER_PGD, WHISPER_GAN, DF_AUDIO, DF_TEXT]

In [3]:
texts = {}
for l in open(text_path,"r").readlines():
    parts = l.split(',')
    texts[parts[0]] = parts[-1]

# DEEPFAKE GENERATION

In [4]:
torch.cuda.is_available()

True

In [5]:
import sys
from pathlib import Path

import torch
from torch import nn
import numpy as np

sys.path.append('./encoder/')
from encoder import inference as encoder
from vocoder import inference as vocoder
from synthesizer.inference import Synthesizer

In [6]:
encoder.load_model(Path("saved_models/default/encoder.pt"))
synthesizer = Synthesizer(Path('saved_models/default/synthesizer.pt'))
vocoder.load_model(Path('saved_models/default/vocoder.pt'))

Loaded encoder "encoder.pt" trained to step 1564501
Synthesizer using device: cuda
Building Wave-RNN
Trainable Parameters: 4.481M
Loading model weights at saved_models/default/vocoder.pt


In [7]:
def AudioSynthesisRun(preprocessed_wav_torch,text):       
    embed = encoder.embed_utterance(preprocessed_wav_torch,using_partials=False)
    texts = [text]
    embeds = [embed]

    specs = synthesizer.synthesize_spectrograms(texts, embeds)
    spec = specs[0]
    return spec

In [8]:
def genDFs(id):
    c_text = texts[id]
    for inp in inputs:
        noise_fname = getPath('noise',inp,id)
        sample_rate, noise_samples = wavfile.read(noise_fname)
        noise_orig = np.float32(noise_samples)
        gen_fname = getPath('gen',inp,id)
        print(gen_fname)
        np.save(gen_fname, AudioSynthesisRun(torch.from_numpy(noise_orig), c_text).detach().numpy())

In [9]:
with torch.no_grad():
    i = 0
    for fname in os.listdir(os.path.join(data_dir,'clean','test')):
        id = fname.strip('.wav')
        genDFs(id)
        i += 1
        if i > N_SAMPLES:
            break

/home/thegoods/gen/whisper_pgd_200/test/61-70968-0032.npy
nmels 40
Before transpose shape torch.Size([40, 429])


Trainable Parameters: 30.870M
Loaded synthesizer "synthesizer.pt" trained to step 295000

| Generating 1/1


Done.

/home/thegoods/gen/whisper_pgd_200/test/4446-2271-0023.npy
nmels 40
Before transpose shape torch.Size([40, 227])

| Generating 1/1


Done.

/home/thegoods/gen/whisper_pgd_200/test/237-134500-0037.npy
nmels 40
Before transpose shape torch.Size([40, 711])

| Generating 1/1


Done.

/home/thegoods/gen/whisper_pgd_200/test/1284-1180-0023.npy
nmels 40
Before transpose shape torch.Size([40, 562])

| Generating 1/1


Done.

/home/thegoods/gen/whisper_pgd_200/test/2300-131720-0038.npy
nmels 40
Before transpose shape torch.Size([40, 562])

| Generating 1/1


Done.

/home/thegoods/gen/whisper_pgd_200/test/4507-16021-0007.npy
nmels 40
Before transpose shape torch.Size([40, 264])

| Generating 1/1


Done.

/home/thegoods/gen/whisper_pgd_200/test/5142-33396-0009.npy
nmels 40
Before transpose shape torch.Size([40, 338])

| Generating 1/1


Done.

/home/thegoods/gen/whisper_pgd_200/test

In [10]:
del vocoder
del encoder
del synthesizer

# Metrics

## STOI

In [11]:
def STOI(orig,noised,sample_rate):
    return stoi(orig,noised,sample_rate)

## Text Prob Comparison

In [12]:
model = whisper.load_model("base")

In [13]:
model.device

device(type='cuda', index=0)

In [14]:
def getProb(mel_spec, real_text, model=model):
    return get_loss_single_segment(model, spec=mel_spec, label=real_text).to('cpu')

In [15]:
def WhisperComparison(g_spec, clean_text, clean_norm):
    return getProb(g_spec, clean_text) - clean_norm
# TODO - subtract or divide?

## Spec Comparison

In [16]:
from tslearn.metrics import dtw, dtw_path
l1loss = nn.L1Loss()


Install h5py to use hdf5 features: http://docs.h5py.org/
  warn(h5py_msg)


In [17]:
# Use scipy sparse maybe to save space
def getAlignmentMatrices(path):
    M = len(path)
    l1m = max(path, key=lambda x: x[0])[0]
    l2m = max(path, key=lambda x: x[1])[1]

    A1 = np.zeros((M,l1m+1),dtype=np.float32)
    A2 = np.zeros((M,l2m+1),dtype=np.float32)


    for i in range(len(path)):
        p1,p2 = path[i]
        A1[i,p1] = 1.0
        A2[i,p2] = 1.0

    return torch.from_numpy(A1).detach(),torch.from_numpy(A2).detach()

In [18]:
def l1Entropy(orig_spec, noised_spec):
    return l1loss(noised_spec,orig_spec)

In [19]:
def compute_objective_entropy(synth_speech_spec, target_speech_spec):
    np1 = (synth_speech_spec - synth_speech_spec.mean())/synth_speech_spec.std()
    np2 = (target_speech_spec - target_speech_spec.mean())/target_speech_spec.std()

    pathCalc1 = np1.clone().detach().numpy()
    pathCalc2 = np2.clone().detach().numpy()

    optimal_path, dtw_score = dtw_path(pathCalc1, pathCalc2, global_constraint="sakoe_chiba", sakoe_chiba_radius=5)
    A1,A2 = getAlignmentMatrices(optimal_path)

    stretched1 = torch.matmul(A1,np1)
    stretched2 = torch.matmul(A2,np2)

    return l1Entropy(stretched1,stretched2)

In [20]:
def shrinkSpec(spec):
    return spec
    wav_init_spec = torch.log(torch.clamp(spec, min=1e-10, max=1e5))

    np1_scale = wav_init_spec.clone().detach().numpy()
    np1_scale = (np1_scale - np1_scale.min())/np1_scale.std()
    mask = np.linalg.norm(np1_scale, axis=1) <= 17.5

    wav_init_spec = torch.tensor(wav_init_spec[~mask, :],requires_grad=True)
    return wav_init_spec
    # implement shrinking code - TODO?

In [21]:
def compareSpecs(clean_spec, gen_spec):
    # print('clean_spec.shape',clean_spec.shape)
    # print('gen_spec.shape',gen_spec.shape)
    return compute_objective_entropy(gen_spec, clean_spec)

## Put it all together

In [22]:
def getMetrics(id):
    output = {inp : {} for inp in inputs}
    # get clean spectrogram
    clean_fname = getPath('clean','',id)
    sample_rate, clean_samples = wavfile.read(clean_fname)
    clean_orig = np.float32(clean_samples)
    clean_spec = log_mel_spectrogram(clean_orig) # what sampling rate is used?
    clean_spec_small = shrinkSpec(clean_spec.T).T
    # print('clean_spec_small.shape',clean_spec_small.shape)

    try:
        clean_text = texts[id]
    except KeyError:
        return None
    whisper_baseline = getProb(clean_spec.to(model.device), clean_text)
    del clean_spec

    for inp in inputs:
        # get noised waveform, and get stoi results
        noise_path = getPath('noise',inp,id)
        inp_sr, inp_samples = wavfile.read(noise_path)
        assert inp_samples.shape == clean_samples.shape
        inp_orig = np.float32(inp_samples)
        stoi_res = STOI(clean_orig, inp_orig, inp_sr)
        output[inp]['stoi'] = stoi_res
        
        # get generated spectrogram, run our metrics on it
        gen_path = getPath('gen',inp,id)
        gen_spec = torch.from_numpy(np.load(gen_path))
        gen_spec_small = shrinkSpec(gen_spec.T).T
        # print('gen_spec_small.shape',gen_spec_small.shape)


        # transpose for compareSpecs, because whisper has the dimensions flipped
        output[inp]['waveformCompare'] = compareSpecs(clean_spec_small.T, gen_spec_small.T).item()
        output[inp]['textCompare'] = WhisperComparison(gen_spec,clean_text,whisper_baseline).item()
    return output



In [23]:
with torch.no_grad():
    all_res = {}
    i = 0
    for fname in os.listdir(os.path.join(data_dir,'clean','test')):
        id = fname.strip('.wav')
        print(id)
        res = getMetrics(id)
        all_res[id] = res
        torch.cuda.empty_cache()
        i += 1
        if i > N_SAMPLES:
            break

61-70968-0032


  input_tokens: Tensor = torch.tensor(label).repeat(
  input_tokens: Tensor = torch.tensor(label).repeat(


4446-2271-0023
237-134500-0037
1284-1180-0023
2300-131720-0038
4507-16021-0007
5142-33396-0009
4077-13754-0014
6930-76324-0023
672-122797-0018
260-123288-0002
1320-122617-0003
7127-75947-0030
4446-2275-0028
2961-960-0015
7021-85628-0022
4992-41797-0014
2094-142345-0056
7729-102255-0029
4992-41797-0022
7176-88083-0019
2094-142345-0060
1995-1826-0011
237-134493-0003
2830-3980-0000
1284-1180-0007
6829-68769-0046
1284-134647-0004
5639-40744-0023
4446-2271-0001
2094-142345-0020
1188-133604-0039
6829-68771-0031
4970-29095-0005
5142-33396-0017
672-122797-0003
4446-2275-0018
260-123286-0014
7176-88083-0003
908-31957-0012
1089-134686-0011
6930-76324-0015
61-70968-0057
1284-1180-0009
7729-102255-0013
7021-79740-0000
61-70968-0010
908-31957-0009
6829-68771-0006
6930-81414-0019
121-127105-0023
5683-32866-0005
4446-2275-0020
237-126133-0016
1580-141083-0049
1284-1181-0013
2961-961-0003
237-134500-0019
1320-122617-0000
2830-3980-0022
8463-294828-0008
6829-68769-0042
5639-40744-0013
61-70968-0048
121

In [24]:
print(all_res)

{'61-70968-0032': {'whisper_pgd_200': {'stoi': 0.9994623530279937, 'waveformCompare': 0.782703161239624, 'textCompare': 3.170424222946167}}, '4446-2271-0023': {'whisper_pgd_200': {'stoi': 0.9989117188309299, 'waveformCompare': 0.524455726146698, 'textCompare': 1.939270257949829}}, '237-134500-0037': {'whisper_pgd_200': {'stoi': 0.9996103978563073, 'waveformCompare': 0.6536287069320679, 'textCompare': 0.9174906015396118}}, '1284-1180-0023': {'whisper_pgd_200': {'stoi': 0.9993792735518832, 'waveformCompare': 0.677111029624939, 'textCompare': 0.7021005153656006}}, '2300-131720-0038': {'whisper_pgd_200': {'stoi': 0.9990672877034045, 'waveformCompare': 0.5104990601539612, 'textCompare': 1.3016111850738525}}, '4507-16021-0007': {'whisper_pgd_200': {'stoi': 0.9993173818279284, 'waveformCompare': 0.5504000782966614, 'textCompare': 0.6158318519592285}}, '5142-33396-0009': {'whisper_pgd_200': {'stoi': 0.9997090144048754, 'waveformCompare': 0.6325445175170898, 'textCompare': 0.8855679035186768}},

In [25]:
outfile = os.path.join(data_dir,'results.json')
with open(outfile,"w") as f:
    json.dump(all_res,f)

## Metrics Test Code

In [26]:
# sample_rate, samples = wavfile.read("/home/amank/mlsp-speech-noiser-2/og.wav")
# orig = np.float32(samples)
# mel_spec = log_mel_spectrogram(orig) # what sampling rate is used?

In [27]:
# fake_text = "Chapter 16 I might have told you of the phenomenal beginning to the entire expedition but I wanted to you to see every step by which we came I too agree by what Margaret wished."
# real_text = "Chapter 16 I might have told you of the beginning of this liaison in a few lines but I wanted to you to see every step by which we came I too agree by what Margaret wished."
# real_loss = getProb(mel_spec.to(model.device), real_text)
# fake_loss = getProb(mel_spec.to(model.device), fake_text)

In [28]:
# real_loss, fake_loss

## DF Test Code

In [29]:
# def getPreProcessedInput(voice_sample_path):
#     in_fpath = Path(voice_sample_path.replace("\"", "").replace("\'", ""))
#     preprocessed_wav, sr = encoder.preprocess_wav(in_fpath,normalize=False)
#     toReturn = torch.tensor(preprocessed_wav,requires_grad=True)
#     return (toReturn, sr)

In [30]:
# # sample_rate, samples = wavfile.read("/home/amank/mlsp-speech-noiser-2/og.wav")
# cleaned, sr = getPreProcessedInput("/home/amank/mlsp-speech-noiser-2/og.wav")
# real_text = "Chapter 16 I might have told you of the beginning of this liaison in a few lines but I wanted to you to see every step by which we came I too agree by what Margaret wished."

In [31]:
# spec = AudioSynthesisRun(cleaned,"My favorite color is blue and my country of origin is the United States of America.")
# generated_wav = vocoder.infer_waveform(spec)
# generated_wav = nn.functional.pad(generated_wav, (0, synthesizer.sample_rate), mode="constant")
# generated_speech = np.array(generated_wav.clone().detach())
# wavfile.write('test.wav', synthesizer.sample_rate, generated_speech)