In [3]:
import sys 
import os
sys.path.append(os.path.abspath("/home/menglu/123/Deepfake")) #need to change the path
from extract_enf import extract_enf

import warnings
warnings.filterwarnings('ignore')

from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio
import numpy as np
import glob
import collections
from joblib import Parallel, delayed
import collections
from sklearn.metrics import roc_curve

import librosa, librosa.display

import torch
from torch import Tensor
from torchvision import transforms
from torch import nn
import torch.nn.functional as F

from tqdm import tqdm
import seaborn as sns
import matplotlib.pyplot as plt

import math
import yaml
import copy
from scipy import signal
import wave
import cv2

In [None]:
# Toggle for the optional ENF (electrical network frequency) post-check.
# When True, a one-class SVM is fitted on ENF features of real audio and handed
# to run_testing(); when False, both of those steps are skipped.
need_ENF = False

In [None]:
if need_ENF:
    # Extract the ENF signal of the real training clips and fit a one-class SVM on it.
    # `svm` was referenced here without ever being imported, so bring it into scope.
    from sklearn import svm

    # NOTE(review): `pyENF` is not imported anywhere in the visible notebook (only
    # `extract_enf` is) -- confirm which module provides it and import it up top.
    ENFVec = []
    # Only real data is used to train the SVM. The glob is sorted so the order of
    # the training rows does not depend on the filesystem.
    for file in sorted(glob.glob("/home/menglu/123/Dataset/ADD2022/ADD_train_dev/real/*.wav")):
        sig, sample_rate = librosa.load(file, sr=16000)
        # NOTE(review): audio is loaded at 16 kHz while pyENF is given fs=1000 --
        # confirm that this is the intended meaning of `fs`.
        mysignal = pyENF(signal0=sig, fs=1000, nominal=50,
                         harmonic_multiples=1,
                         duration=0.05, strip_index=0)
        spectro_strip, frequency_support = mysignal.compute_spectrogam_strips()
        weights = mysignal.compute_combining_weights_from_harmonics()
        OurStripCell, initial_frequency = mysignal.compute_combined_spectrum(spectro_strip, weights, frequency_support)
        ENF = mysignal.compute_ENF_from_combined_strip(spectro_strip, initial_frequency)
        # Only clips whose ENF vector has at least 200 entries are used, truncated
        # to exactly 200 so every training row has the same length.
        if len(ENF) >= 200:
            ENF = ENF[:200]
            enf_of_one_sample = [subitem for item in ENF for subitem in item]
            ENFVec.append(enf_of_one_sample)

    if not ENFVec:
        # Fail with an explicit message instead of an opaque error from fit([]).
        raise RuntimeError("No real training clip produced an ENF vector of length >= 200; "
                           "cannot train the one-class SVM.")

    # Train the ONE-CLASS SVM using real data only.
    clf = svm.OneClassSVM(nu=0.5, kernel="sigmoid", gamma='scale')
    clf.fit(ENFVec)

In [None]:
######### Loading Testing dataset ############

# One protocol entry: file name as listed, resolved path, textual label, and the
# numeric target (1 for 'genuine', 0 otherwise).
AudioFile = collections.namedtuple('AudioFile',
    ['file_name', 'path', 'label', 'key'])


class ADDDataset(Dataset):
    """In-memory dataset of waveforms loaded at 16 kHz with a 0/1 'genuine' target.

    On first use the label file is parsed, every listed audio file is loaded,
    the optional transform is applied, and the result is saved to a cache file.
    Later constructions load that cache instead of re-reading the audio.

    Args:
        data_path: directory that the file names in the label file are relative to.
        label_path: text file with one "<file name> <label>" entry per line.
        transform: optional callable applied to every loaded waveform.
        is_train: selects the 'train' split name (else 'dev') when is_eval is False.
        is_eval: selects the 'eval' split name; takes precedence over is_train.
        feature: stored on the instance; not otherwise used by this class.
        track: track name; part of the cache file name for the 'eval' split.
        cache_dir: directory holding the cache file (defaults to the original
            hard-coded location, so existing callers are unaffected).
    """

    def __init__(self, data_path=None, label_path=None, transform=None,
                 is_train=True, is_eval=False, feature=None, track=None,
                 cache_dir="/home/menglu/123/Deepfake/built"):
        self.data_path = data_path
        self.label_path = label_path
        self.transform = transform
        self.track = track
        self.feature = feature

        self.dset_name = 'eval' if is_eval else 'train' if is_train else 'dev'
        # Only the eval cache is track-specific.
        if self.dset_name == 'eval':
            cache_fname = 'cache_ADD_{}_{}.npy'.format(self.dset_name, self.track)
        else:
            cache_fname = 'cache_ADD_{}.npy'.format(self.dset_name)
        self.cache_fname = os.path.join(cache_dir, cache_fname)

        if os.path.exists(self.cache_fname):
            # NOTE: torch.load unpickles arbitrary objects -- only load cache files
            # you created yourself.
            # NOTE(review): recent PyTorch releases default to weights_only=True,
            # which presumably rejects the AudioFile tuples stored here -- confirm,
            # and pass weights_only=False if the installed version requires it.
            self.data_x, self.data_y, self.files_meta = torch.load(self.cache_fname)
            print('Dataset loaded from cache', self.cache_fname)
        else:
            self.files_meta = self.parse_protocols_file(self.label_path)
            data = list(map(self.read_file, self.files_meta))
            if data:
                self.data_x, self.data_y = map(list, zip(*data))
            else:
                # zip(*[]) yields nothing to unpack; an empty protocol gives an empty dataset.
                self.data_x, self.data_y = [], []
            if self.transform:
                self.data_x = Parallel(n_jobs=5, prefer='threads')(delayed(self.transform)(x) for x in self.data_x)
            if cache_dir:
                # Avoid losing the (expensive) loading work to a missing directory.
                os.makedirs(cache_dir, exist_ok=True)
            torch.save((self.data_x, self.data_y, self.files_meta), self.cache_fname)

    def __len__(self):
        """Return the number of samples (also stored on `self.length`, as before)."""
        self.length = len(self.data_x)
        return self.length

    def __getitem__(self, idx):
        """Return the (waveform, target) pair at position `idx`."""
        return self.data_x[idx], self.data_y[idx]

    def read_file(self, meta):
        """Load the audio of one protocol entry at 16 kHz; return (samples, float target)."""
        data_x, sample_rate = librosa.load(meta.path, sr=16000)
        data_y = meta.key
        return data_x, float(data_y)

    def parse_line(self, line):
        """Turn one "<file name> <label>" protocol line into an AudioFile."""
        # split() without an argument tolerates tabs and repeated spaces, which
        # split(' ') turned into empty or merged tokens.
        tokens = line.split()
        audio_path = os.path.join(self.data_path, tokens[0]).replace('\\', '/')
        return AudioFile(file_name=tokens[0], path=audio_path,
                         label=tokens[1], key=int(tokens[1] == 'genuine'))

    def parse_protocols_file(self, label_path):
        """Parse every non-blank line of the label file into a list of AudioFile."""
        with open(label_path) as label_file:
            return [self.parse_line(line) for line in label_file if line.strip()]


# Which track to evaluate, and where its labels and audio live.
track = 'track1'
label_path = "/home/menglu/123/Dataset/ADD2022/label/"+track+"_label.txt"
database_path = "/home/menglu/123/Dataset/ADD2022/"+track+"adp_out"

# Per-waveform preprocessing: pad first, then wrap the result in a Tensor.
# NOTE(review): `pad` is not defined in the visible notebook -- confirm where it comes from.
_preprocess_steps = [
    lambda wav: pad(wav),
    lambda wav: Tensor(wav),
]
transform = transforms.Compose(_preprocess_steps)

# Build (or load from cache) the evaluation split for the selected track.
is_eval = True
evl_set = ADDDataset(
    data_path=database_path,
    label_path=label_path,
    transform=transform,
    is_train=False,
    is_eval=is_eval,
    track=track,
)
### test enf

In [None]:
def _enf_feature_vector(waveform, enf_len=200):
    """Return the flattened first `enf_len` ENF entries of `waveform`, or None if it has fewer."""
    # NOTE(review): `pyENF` is not imported in the visible notebook -- confirm its source module.
    mysignal = pyENF(signal0=waveform, fs=1000, nominal=50,
                     harmonic_multiples=1,
                     duration=0.05, strip_index=0)
    spectro_strip, frequency_support = mysignal.compute_spectrogam_strips()
    weights = mysignal.compute_combining_weights_from_harmonics()
    _, initial_frequency = mysignal.compute_combined_spectrum(spectro_strip, weights, frequency_support)
    enf = mysignal.compute_ENF_from_combined_strip(spectro_strip, initial_frequency)
    if len(enf) < enf_len:
        return None
    return [value for frame in enf[:enf_len] for value in frame]


def run_testing(dataset, model, device, save_path, svm=None):
    """Evaluate `model` on `dataset` one sample at a time and print the accuracy in percent.

    Args:
        dataset: indexable collection of (waveform, target) pairs.
        model: called as model(batch_x, batch_y, is_test=True); the index of the
            largest value along dim 1 of its output is taken as the prediction.
        device: device the inputs and targets are moved to.
        save_path: accepted for compatibility; currently unused.
        svm: optional fitted classifier with a predict() method. When given, every
            sample the model predicts as class 1 is re-checked on its ENF feature:
            an SVM output of -1 turns the prediction into 0, anything else into 1.
            Samples with fewer than 200 ENF entries keep the model's prediction.

    Returns:
        (true_y, y_pred, num_total): targets, final predictions, and sample count.
    """
    # batch_size must stay 1: the SVM re-check inspects a single prediction per batch.
    data_loader = DataLoader(dataset, batch_size=1, shuffle=False)
    num_correct = 0.0
    num_total = 0.0
    model.eval()
    true_y = []
    y_pred = []

    # Inference only, so no autograd graph is needed.
    with torch.no_grad():
        for batch_x, batch_y in data_loader:
            true_y.extend(batch_y.numpy())
            num_total += batch_x.size(0)
            batch_x = batch_x.to(device)
            batch_y = batch_y.view(-1).type(torch.int64).to(device)
            batch_out = model(batch_x, batch_y, is_test=True)
            _, batch_pred = batch_out.max(dim=1)

            if svm is not None and batch_pred.item() == 1:
                # Extract the ENF signal of this clip and classify it again.
                enf_vector = _enf_feature_vector(batch_x[0].cpu().numpy())
                if enf_vector is not None:
                    label = 0 if svm.predict([enf_vector])[0] == -1 else 1
                    # Keep the override on batch_pred's device and dtype; the old
                    # Tensor([..]).cuda() crashed whenever the run was on CPU.
                    batch_pred = torch.full_like(batch_pred, label)

            num_correct += (batch_pred == batch_y).sum(dim=0).item()
            y_pred.extend(batch_pred.cpu().numpy())

    # Guard against an empty dataset instead of dividing by zero.
    accuracy = 100 * (num_correct / num_total) if num_total else 0.0
    print(accuracy)

    return true_y, y_pred, num_total

In [None]:
######### Load trained model and run the testing ############

model_path = '/home/menglu/123/Deepfake/Top_path_32_512_128_12_8_0.001_epoch_23.pth'
eval_output = '/home/menglu/123/Deepfake/built/eval_scores.txt'

# Reproducibility settings.
np.random.seed(1234)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# GPU device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Parameter
with open('model_config.yaml') as config_file:
    config = yaml.safe_load(config_file)
lr = config['lr']
warmup = config['warmup']
num_epochs = config['epoch']

d_model = config['model']['patch_embed']
num_filter = config['model']['num_filter']
num_block = config['model']['num_block']
num_head = config['model']['num_head']

# Model Initialization
# NOTE(review): `Box` is not defined or imported in the visible notebook -- confirm its source.
model = Box(config['model'], device).to(device)
nb_params = sum(param.numel() for param in model.parameters())

# Load the trained weights only now that `device` and `model` exist. Doing this
# first raised NameError on a fresh kernel, and on a stale kernel left the freshly
# built model above with untrained weights.
model.load_state_dict(torch.load(model_path, map_location=device))

# Run the testing; the ENF SVM (`clf`) is only passed when the ENF check is enabled.
true_y, y_pred, num_total = run_testing(evl_set, model, device, eval_output,
                                        clf if need_ENF else None)

In [None]:
######### Calculate EER for Track1 ############

# ROC of the predictions against the targets; the EER is read at the operating
# point where the false-negative and false-positive rates are closest.
# NOTE(review): y_pred holds the 0/1 predictions returned by run_testing rather than
# continuous scores -- confirm this is the intended input for roc_curve.
fpr, tpr, threshold = roc_curve(true_y, y_pred, pos_label=1)
fnr = 1 - tpr
eer_idx = np.nanargmin(np.absolute((fnr - fpr)))
eer_threshold = threshold[eer_idx]

# Average the two rates at that point, since they need not coincide exactly.
eer_1, eer_2 = fpr[eer_idx], fnr[eer_idx]
eer = (eer_1 + eer_2) / 2
print(eer)

In [None]:
######### Calculate EER for Track2 only ############

# Number of samples whose target is 0 but whose prediction is 1,
# reported as a fraction of all evaluated samples.
num = sum(1 for i in range(len(true_y)) if true_y[i] == 0 and y_pred[i] == 1)

print(num/num_total)