In [None]:
import sys

from tensorflow.python.keras.backend import clear_session
from tensorflow.python.keras.callbacks import ModelCheckpoint, CSVLogger
from tensorflow.python.keras.utils.layer_utils import print_summary
from tensorflow.python.keras.optimizers import Adam
from tensorflow.python.keras.models import load_model
import numpy as np
# from resnet_convlstm import ResNet
from Utility_functions import FreezeBatchNormalization
from datetime import datetime as dt
from DFVDSequence import DFVDSequence
import matplotlib.pyplot as plt

from sklearn.metrics import classification_report

from tensorflow.python.keras import layers


dataset_dir='../datasets/'
# from src.cl_basic import cl_basic
import ipykernel
import os
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras import backend as K
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID";
 
# Choose GPU NUMBERS [0, 1, 2, 3]
os.environ["CUDA_VISIBLE_DEVICES"]="1"
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)
K.set_session(sess)
sess.run(tf.global_variables_initializer())

In [2]:
tf.__version__

'1.15.0'

In [4]:

from sklearn.metrics import roc_curve, auc
from sklearn import metrics
from tqdm import tqdm
import numpy as np
import pandas as pd

def find_best_threshold(scores, labels):
        fpr, tpr, thresholds = roc_curve(labels, scores)
        roc_auc = auc(fpr, tpr)

        best_acc = 0
        best_thresh = None
        for i, thresh in enumerate(thresholds):
            # compute accuracy for this threshold
            pred_labels = [1 if s >= thresh else 0 for s in scores]
            acc = sum([1 if pred_labels[j] == labels[j] else 0 for j in range(len(labels))]) / len(labels)

            if acc > best_acc:
                best_acc = acc
                best_thresh = thresh
        return best_thresh, roc_auc


def eval_state(probs, labels, thr):
    predict = probs >= thr
    labels = np.array(labels)
    TN = np.sum((labels == 0) & (predict == False))
    FN = np.sum((labels == 1) & (predict == False))
    FP = np.sum((labels == 0) & (predict == True))
    TP = np.sum((labels == 1) & (predict == True))
    return TN, FN, FP, TP

def evaluate(pretrained_model, data_iter):
    # predictions = pretrained_model.predict_generator(data_iter, verbose=1)
    # prob_list = predictions[:, 1]
    # label_list = data_iter.classes
    
    shape_pred_count = 0
    shape_input_count = 0
    prob_list = []
    label_list = []
    for batch_x, batch_y in tqdm(data_iter):
        # Process the batch inputs and labels
        # ...
        predictions = pretrained_model.predict(batch_x)
        shape_pred_count += len(predictions)
        shape_input_count += len(batch_y)
        
        prob_list.append(predictions)
        label_list.append(batch_y)
        
    prob_list = np.concatenate(prob_list)[:, 1]
    label_list = np.concatenate(label_list)[:, 1].astype(np.int32)

    print("At threshold = 0.5")
    best_thresh = 0.5
    TN, FN, FP, TP = eval_state(probs=prob_list, labels=label_list, thr=best_thresh)
    if (FN + TP == 0):
        FRR = 1.0
        FAR = FP / float(FP + TN)
        TPR = 0
    elif(FP + TN == 0):
        FAR = 1.0
        FRR = FN / float(FN + TP)
        TPR = TP / float(TP + FN)
    else:
        FAR = FP / float(FP + TN)
        FRR = FN / float(FN + TP)
        TPR = TP / float(TP + FN)

    ACC = (TN + TP) / (TN + FN + FP + TP)
    HTER = (FAR + FRR) / 2.0
    print(f"HTER: {HTER*100:.2f}")
    print(f"FAR: {FAR*100:.2f}")
    print(f"TPR: {TPR*100:.2f}")



    np.save('pred_tmp.npy', prob_list)
    np.save('lb_tmp.npy', label_list)
    print("Max values", len(prob_list), np.max(prob_list), np.min(prob_list), np.max(label_list), np.min(label_list))


    best_thresh, AUC = find_best_threshold(scores=prob_list, labels=label_list)
    print(f"At best threshold = {best_thresh:.4f}")
    TN, FN, FP, TP = eval_state(probs=prob_list, labels=label_list, thr=best_thresh)
    if (FN + TP == 0):
        FRR = 1.0
        FAR = FP / float(FP + TN)
        TPR = 0
    elif(FP + TN == 0):
        FAR = 1.0
        FRR = FN / float(FN + TP)
        TPR = TP / float(TP + FN)
    else:
        FAR = FP / float(FP + TN)
        FRR = FN / float(FN + TP)
        TPR = TP / float(TP + FN)


    HTER = (FAR + FRR) / 2.0
    ACC_best = (TN + TP) / (TN + FN + FP + TP)
    print(f"HTER: {HTER*100:.2f}")
    print(f"FAR: {FAR*100:.2f}")
    print(f"TPR: {TPR*100:.2f}")
    print(f"AUC: {AUC*100:.2f}")
    return (ACC*100, ACC_best*100, AUC*100)

In [None]:
# define the model
model = load_model('../pretrained-weight/clrnet/CLR_ALL_Ebest.hdf5')
is_training = False
top_k_layers=120
model,df=FreezeBatchNormalization(is_training,top_k_layers,model)
print_summary(model, line_length=150, positions=None, print_fn=None)
adam_fine = Adam(lr=0.00005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False)
model.compile(loss='binary_crossentropy',
                  optimizer=adam_fine,
                  metrics=['accuracy'])

### generated dataset test

In [None]:
frames_per_video_per_batch=5
frames_per_video=10
image_size=128
test_video_per_batch=10
data_augmentation=False
training=False


print("TEST UPON SETTING")
out_results = pd.DataFrame({
    "Dataset":[], "Acc": [], "Acc_best": [], "AUC": []
})
for data_name in ("DeepFaceLab", "Dfaker", "Faceswap", "FOM_Animation", "FOM_Faceswap", "FSGAN", "LightWeight"):

    X_val,y_val,class_weights_val=create_sequence([f'../datasets/Stablelized/{data_name}/',
                                               '../datasets/Stablelized/real/'],frames_per_video_per_batch,frames_per_video)

    val_it=DFVDSequence(X_val,y_val,test_video_per_batch,frames_per_video_per_batch,image_size,data_augmentation,training)
 
    ACC, ACC_best, AUC = evaluate(pretrained_model=model, data_iter=val_it)
    out_results = out_results.append({"Dataset":data_name, 
                                        "Acc": np.round(ACC,2), 
                                        "Acc_best": np.round(ACC_best,2), 
                                        "AUC": np.round(AUC,2)}, ignore_index=True)
    
out_results = out_results.append({"Dataset":"Avg", 
                                    "Acc": f"{out_results.Acc.mean():.2f} ({out_results.Acc.std():.2f})", 
                                    "Acc_best": f"{out_results.Acc_best.mean():.2f} ({out_results.Acc_best.std():.2f})", 
                                    "AUC": f"{out_results.AUC.mean():.2f} ({out_results.AUC.std():.2f})"}, 
                                    ignore_index=True)
out_results.to_csv(f"predictions/clrnet.csv", index=False)


