In [1]:
import torch
import os
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from collections import OrderedDict
import argparse
import torch.nn.init as init
import torch.utils.model_zoo as model_zoo
from tqdm import tqdm
import math
import pickle
from functools import partial
from collections import defaultdict
from PIL import Image
from glob import glob
import cv2
from torch.autograd import Variable

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive so the
# following cells can reach files stored there.
from google.colab import drive

drive.mount('/content/drive')

**현재 경로 변경**

In [None]:
# Switch the working directory; relative paths and the `model.*` imports in
# later cells are resolved from here.
# NOTE(review): the target is a relative path, so this presumably expects the
# kernel to start in /content and will not succeed a second time without a
# restart - confirm.
%cd drive/MyDrive/mypaper

**전처리된 데이터셋 로드**

In [None]:
# Read one saved object per category with torch.load. The next cell slices
# each result along its first dimension and moves the slices to the GPU.
# The first four are later paired with label 1 and `x_real` with label 0.
x_deepFakes = torch.load('../shared_data(original)/DeepFakes.pt')
x_face2Face = torch.load('../shared_data(original)/Face2Face.pt')
x_faceShifter = torch.load('../shared_data(original)/FaceShifter.pt')
x_neuralTextures = torch.load('../shared_data(original)/NeuralTextures.pt')
x_real = torch.load('../shared_data(original)/Real.pt')

**train, val, test 데이터 셋으로 split**

In [None]:
# Split boundaries along dim 0: [0, 760) train, [760, 849) validation,
# [849, end) test.
_TRAIN_END = 760
_VAL_END = 849


def _split_to_cuda(samples):
    """Slice `samples` along dim 0 into (train, val, test) and move each slice to the GPU."""
    return (
        samples[:_TRAIN_END].cuda(),
        samples[_TRAIN_END:_VAL_END].cuda(),
        samples[_VAL_END:].cuda(),
    )


x_deepFakes_train, x_deepFakes_val, x_deepFakes_test = _split_to_cuda(x_deepFakes)
x_neuralTextures_train, x_neuralTextures_val, x_neuralTextures_test = _split_to_cuda(x_neuralTextures)
x_face2Face_train, x_face2Face_val, x_face2Face_test = _split_to_cuda(x_face2Face)
x_faceShifter_train, x_faceShifter_val, x_faceShifter_test = _split_to_cuda(x_faceShifter)
x_real_train, x_real_val, x_real_test = _split_to_cuda(x_real)

**정답값(label) 만들기**

In [None]:
def make_label(label, size):
    """Return a float32 column tensor of shape (size, 1) filled with `label`.

    Args:
        label: value repeated in every row (e.g. 0 or 1).
        size: number of rows to produce.

    Returns:
        torch.Tensor of dtype float32 and shape (size, 1).
    """
    repeated = torch.tensor([label] * size, dtype=torch.float32)
    # Add the trailing dimension so each label is its own row.
    return repeated.unsqueeze(1)

# Label tensors on the GPU: 1 for manipulated samples, 0 for real ones.
# Row counts are taken from the split tensors themselves instead of repeating
# the split sizes as literals, so labels always match the data they are paired
# with even if the dataset length changes.
# The fake labels are sized from the DeepFakes splits and are reused for the
# other manipulation categories when the datasets are built, so those
# categories must have the same split sizes.
y_deepfake_train = make_label(1, x_deepFakes_train.shape[0]).cuda()
y_deepfake_val = make_label(1, x_deepFakes_val.shape[0]).cuda()
y_deepfake_test = make_label(1, x_deepFakes_test.shape[0]).cuda()

y_real_train = make_label(0, x_real_train.shape[0]).cuda()
y_real_val = make_label(0, x_real_val.shape[0]).cuda()
y_real_test = make_label(0, x_real_test.shape[0]).cuda()

**실제 모델에 입력할 데이터셋으로 만들기**

In [7]:
from torch.utils.data import ConcatDataset, DataLoader, TensorDataset, random_split

# One (inputs, labels) dataset per category and split. Every manipulated
# category shares the same label-1 tensors; real samples use the label-0 ones.
deepfake_train_dataset = TensorDataset(x_deepFakes_train, y_deepfake_train)
deepfake_val_dataset = TensorDataset(x_deepFakes_val, y_deepfake_val)

neaural_train_dataset = TensorDataset(x_neuralTextures_train, y_deepfake_train)
neaural_val_dataset = TensorDataset(x_neuralTextures_val, y_deepfake_val)

face2face_train_dataset = TensorDataset(x_face2Face_train, y_deepfake_train)
face2face_val_dataset = TensorDataset(x_face2Face_val, y_deepfake_val)

faceshifter_train_dataset = TensorDataset(x_faceShifter_train, y_deepfake_train)
faceshifter_val_dataset = TensorDataset(x_faceShifter_val, y_deepfake_val)

real_train_dataset = TensorDataset(x_real_train, y_real_train)
real_val_dataset = TensorDataset(x_real_val, y_real_val)

# Concatenate the four manipulated categories and the real category per split.
train_dataset = ConcatDataset([
    deepfake_train_dataset,
    face2face_train_dataset,
    faceshifter_train_dataset,
    neaural_train_dataset,
    real_train_dataset,
])
val_dataset = ConcatDataset([
    deepfake_val_dataset,
    face2face_val_dataset,
    faceshifter_val_dataset,
    neaural_val_dataset,
    real_val_dataset,
])

# Wrap both splits in shuffled loaders with 16 samples per batch.
loader_options = dict(batch_size=16, shuffle=True)
train_loader = DataLoader(train_dataset, **loader_options)
val_loader = DataLoader(val_dataset, **loader_options)

**전역 스트림 정의(사전학습된 가중치 load)**

In [6]:
from model.EfficientNetb0_GCN import EfficientNetb0

# Backbone passed to the two-stream model as its `global_net`.
efficientnet0 = EfficientNetb0.from_name('efficientnet-b0')

# NOTE(review): this path is relative to the current working directory, which
# an earlier cell changes with `%cd drive/MyDrive/mypaper` - confirm it
# resolves when the notebook is run top to bottom.
state_dict = torch.load('./drive/MyDrive/제안모델2(Efficientnet)/efficientnetb0(gcn)best_val_loss_model(train760_val89_test100)')

# Remove every occurrence of 'local_net.' from the checkpoint keys before loading.
new_state_dict = {key.replace('local_net.', ''): value for key, value in state_dict.items()}

# strict=False tolerates keys that do not line up; the returned report of
# missing/unexpected keys is this cell's displayed output.
efficientnet0.load_state_dict(new_state_dict, strict=False)

**지역 스트림 정의(사전학습된 가중치 load)**

In [7]:
from model.EfficientNet import EfficientNet

# Backbone passed to the two-stream model as its `local_net`.
efficientnet1 = EfficientNet.from_name('efficientnet-b1')

# NOTE(review): this path is relative to the current working directory, which
# an earlier cell changes with `%cd drive/MyDrive/mypaper` - confirm it
# resolves when the notebook is run top to bottom.
state_dict = torch.load('./drive/MyDrive/제안모델2(Efficientnet)/efficientnetb1_best_val_loss_model(train760_val89_test100)')

# Remove every occurrence of 'local_net.' from the checkpoint keys before loading.
new_state_dict = {key.replace('local_net.', ''): value for key, value in state_dict.items()}

# strict=False tolerates keys that do not line up; the returned report of
# missing/unexpected keys is this cell's displayed output.
efficientnet1.load_state_dict(new_state_dict, strict=False)

**TimeTransformer 정의**

In [10]:
from model.TimeTransformer import TimeTransformer

# Settings common to both transformers; they differ only in `num_patches`.
_shared_transformer_args = dict(
    num_classes=1,
    dim=1280,
    depth=1,
    heads=16,
    mlp_dim=2048,
    dropout=0.3,
    emb_dropout=0.3,
)

time_T_16 = TimeTransformer(num_patches=16, **_shared_transformer_args)
time_T_1 = TimeTransformer(num_patches=1, **_shared_transformer_args)

**제안 모델 구조로 정의**

In [11]:
class TwoStreamModel_global(nn.Module):
    """Two-stream classifier that fuses per-frame features from two backbones.

    Each backbone is applied frame by frame, the pooled feature maps of both
    streams are summed, and the sum is fed to two transformers (once as is,
    once averaged over dim 2). Their outputs are concatenated and mapped to
    sigmoid probabilities by a LayerNorm + Linear head.

    Args:
        local_net: module exposing `extract_features`, applied to `x1`.
        global_net: module exposing `extract_features`, applied to `x2`.
        transformer16: module applied to the fused features.
        transformer1: module applied to the fused features averaged over dim 2.
        dim: width of the concatenated transformer outputs (head input size).
        num_classes: number of outputs produced by the head.
    """

    def __init__(self, local_net, global_net, transformer16, transformer1, dim, num_classes):
        super().__init__()
        # Sub-modules are registered in the same order as before so that
        # parameters() and state_dict() keep their ordering and key names.
        self.local_net = local_net
        self.global_net = global_net
        self.avgpool_local = nn.AvgPool3d((1, 4, 4))
        self.avgpool_global = nn.AvgPool3d((1, 3, 3))
        self.transformer16 = transformer16
        self.transformer1 = transformer1
        self.to_latent = nn.Identity()
        self.mlp_head = nn.Sequential(nn.LayerNorm(dim), nn.Linear(dim, num_classes))
        self.sigmoid = nn.Sigmoid()

    def process_features(self, x, net, avgpool):
        """Run `net.extract_features` on every slice of `x` along dim 2, then pool.

        `x` is indexed as a 5-D tensor whose dim 2 enumerates the slices
        (presumably video frames - confirm against the data pipeline). The
        per-slice feature maps are stacked back on dim 2 before `avgpool`.
        """
        slice_count = len(x[0][0])
        per_slice = [net.extract_features(x[:, :, idx]) for idx in range(slice_count)]
        return avgpool(torch.stack(per_slice, dim=2))

    def forward(self, x1, x2):
        """Return sigmoid outputs for the local input `x1` and global input `x2`."""
        local_feats = self.process_features(x1, self.local_net, self.avgpool_local)
        global_feats = self.process_features(x2, self.global_net, self.avgpool_global)

        # Element-wise fusion of both streams, plus its average over dim 2.
        fused = local_feats + global_feats
        fused_mean = fused.mean(dim=2, keepdim=True)

        # transformer16 sees the full fused tensor, transformer1 the averaged one.
        encoded = torch.cat((self.transformer16(fused), self.transformer1(fused_mean)), dim=1)

        return self.sigmoid(self.mlp_head(self.to_latent(encoded)))


# Assemble the two-stream model from the two backbones and the two
# transformers, then move it to the GPU. dim=2560 is the input width of the
# LayerNorm/Linear head.
two_stream_model = TwoStreamModel_global(
    local_net=efficientnet1,
    global_net=efficientnet0,
    transformer16=time_T_16,
    transformer1=time_T_1,
    dim=2560,
    num_classes=1,
)
two_stream_model = two_stream_model.cuda()

**Loss Function, Optimizer 정의**

In [12]:
import torch.optim as optim

# Binary cross-entropy on probabilities; the model's forward already ends
# with a sigmoid.
criterion = nn.BCELoss()

# SGD with momentum and weight decay over all parameters of the model.
optimizer = optim.SGD(
    two_stream_model.parameters(),
    lr=1e-4,
    momentum=0.9,
    weight_decay=1e-5,
)
# Disabled alternative; note it refers to `two_stream_model_global`, a name
# that no visible cell defines.
#optimizer = optim.Adam(two_stream_model_global.parameters(), lr=0.01, betas=(0.9,0.999), weight_decay=0.01)

**모델 훈련 및 검증**

In [None]:
# `train`, `validate` and `test` are presumably supplied by this star import -
# confirm against train_validate_test.py.
from train_validate_test import *
from copy import deepcopy
from sklearn.metrics import roc_auc_score

# Best validation metrics seen so far, and a snapshot of the model taken at
# the epoch where each one was reached.
best_val_loss = float('inf')
best_val_accuracy = 0.0
best_val_loss_model = None
best_val_accuracy_model = None

# Per-epoch histories, kept for plotting the training curves.
train_loss_history, validation_loss_history = [], []
train_acc_history, validation_acc_history = [], []

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    # One pass over the training data.
    train_loss, train_accuracy = train(train_loader, two_stream_model, criterion, optimizer)
    train_loss_history.append(train_loss)
    train_acc_history.append(train_accuracy)

    # One pass over the validation data.
    val_loss, val_accuracy = validate(val_loader, two_stream_model, criterion)
    validation_loss_history.append(val_loss)
    validation_acc_history.append(val_accuracy)

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # Snapshot on a strict improvement of validation accuracy ...
    if val_accuracy > best_val_accuracy:
        best_val_accuracy, best_val_accuracy_model = val_accuracy, deepcopy(two_stream_model)

    # ... and, independently, on a strict improvement of validation loss.
    if val_loss < best_val_loss:
        best_val_loss, best_val_loss_model = val_loss, deepcopy(two_stream_model)


**테스트 데이터 정의(Celeb-DF)**

In [None]:
# NOTE(review): this path is relative to the current working directory, which
# an earlier cell changes with `%cd drive/MyDrive/mypaper` - confirm it
# resolves when the notebook is run top to bottom.
x_test_celeb = torch.load('./drive/MyDrive/celeb400_600.pt').cuda()

# 400 rows labelled 0 followed by 599 rows labelled 1.
# NOTE(review): assumes the file stores its samples in exactly that order and
# count (the file name suggests 400/600) - TODO confirm.
y_train1 = make_label(0, 400)
y_train2 = make_label(1, 599)
y_test_celeb = torch.cat([y_train1, y_train2]).cuda()

celeb_test_dataset = TensorDataset(x_test_celeb, y_test_celeb)
celeb_test_loader = DataLoader(celeb_test_dataset, shuffle=True, batch_size=32)

**모델 테스트**

In [None]:
# Evaluate three model snapshots on the Celeb-DF loader and print loss and
# accuracy for each. `two_stream_model` is the live model as left by the last
# training epoch, so it is reported as the last-epoch model rather than as a
# "best" one.
celeb_candidates = [
    ("Best Val Loss Model", best_val_loss_model),
    ("Best Val Accuracy Model", best_val_accuracy_model),
    ("Last Epoch Model", two_stream_model),
]
for model_label, candidate in celeb_candidates:
    test_loss, test_accuracy = test(celeb_test_loader, candidate, criterion)
    print(f"Test Loss ({model_label}): {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

**테스트 데이터 정의(FaceForensics++)**

In [14]:
# Test set built from the DeepFakes and real test splits only; the other
# manipulation categories' test splits are not included here.
x_test = torch.cat([x_deepFakes_test, x_real_test])
y_test = torch.cat([y_deepfake_test, y_real_test])

test_dataset = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_dataset, shuffle=True, batch_size=16)

**모델 테스트**

In [None]:
# Evaluate three model snapshots on `test_loader` and print loss and accuracy
# for each. `two_stream_model` is the live model as left by the last training
# epoch, so it is reported as the last-epoch model rather than as a "best" one.
ffpp_candidates = [
    ("Best Val Loss Model", best_val_loss_model),
    ("Best Val Accuracy Model", best_val_accuracy_model),
    ("Last Epoch Model", two_stream_model),
]
for model_label, candidate in ffpp_candidates:
    test_loss, test_accuracy = test(test_loader, candidate, criterion)
    print(f"Test Loss ({model_label}): {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")