In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import easydict
import os
import sys
from PIL import Image
import tqdm
import shutil
import cv2

import torch
import numpy as np
import random
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision.transforms as transforms
import torch.utils.data as data
import torch.backends.cudnn as cudnn
from torchvision import transforms
from sklearn.metrics import f1_score

In [None]:
seed = 719
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
cudnn.benchmark = True

args = easydict.EasyDict({
    "gpu": 0,

    "root": "/content/drive/My Drive/deepfake",
    "valid_list": "/content/drive/My Drive/deepfake/Lv6_impulse_test_list.txt",
    "save_fn": "/content/drive/My Drive/deepfake/model/xception_new_model.pth.tar",
})

assert os.path.isfile(args.valid_list), 'wrong path'

In [None]:
"""
Author: Andreas Rössler,
Implemented in https://github.com/ondyari/FaceForensics under MIT license
"""


class SeparableConv2d(nn.Module):
    def __init__(self,in_channels,out_channels,kernel_size=1,stride=1,padding=0,dilation=1,bias=False):
        super(SeparableConv2d,self).__init__()

        self.conv1 = nn.Conv2d(in_channels,in_channels,kernel_size,stride,padding,dilation,groups=in_channels,bias=bias)
        self.pointwise = nn.Conv2d(in_channels,out_channels,1,1,0,1,1,bias=bias)

    def forward(self,x):
        x = self.conv1(x)
        x = self.pointwise(x)
        return x


class Block(nn.Module):
    def __init__(self,in_filters,out_filters,reps,strides=1,start_with_relu=True,grow_first=True):
        super(Block, self).__init__()

        if out_filters != in_filters or strides!=1:
            self.skip = nn.Conv2d(in_filters,out_filters,1,stride=strides, bias=False)
            self.skipbn = nn.BatchNorm2d(out_filters)
        else:
            self.skip=None

        self.relu = nn.ReLU(inplace=True)
        rep=[]

        filters=in_filters
        if grow_first:
            rep.append(self.relu)
            rep.append(SeparableConv2d(in_filters,out_filters,3,stride=1,padding=1,bias=False))
            rep.append(nn.BatchNorm2d(out_filters))
            filters = out_filters

        for i in range(reps-1):
            rep.append(self.relu)
            rep.append(SeparableConv2d(filters,filters,3,stride=1,padding=1,bias=False))
            rep.append(nn.BatchNorm2d(filters))

        if not grow_first:
            rep.append(self.relu)
            rep.append(SeparableConv2d(in_filters,out_filters,3,stride=1,padding=1,bias=False))
            rep.append(nn.BatchNorm2d(out_filters))

        if not start_with_relu:
            rep = rep[1:]
        else:
            rep[0] = nn.ReLU(inplace=False)

        if strides != 1:
            rep.append(nn.MaxPool2d(3,strides,1))
        self.rep = nn.Sequential(*rep)

    def forward(self,inp):
        x = self.rep(inp)

        if self.skip is not None:
            skip = self.skip(inp)
            skip = self.skipbn(skip)
        else:
            skip = inp

        x+=skip
        return x


class Xception(nn.Module):
    def __init__(self, num_classes=1000):
        super(Xception, self).__init__()
        self.num_classes = num_classes

        self.conv1 = nn.Conv2d(3,32,3,2,0,bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(32,64,3,bias=False)
        self.bn2 = nn.BatchNorm2d(64)

        self.block1=Block(64,128,2,2,start_with_relu=False,grow_first=True)
        self.block2=Block(128,256,2,2,start_with_relu=True,grow_first=True)
        self.block3=Block(256,728,2,2,start_with_relu=True,grow_first=True)

        self.block4=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block5=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block6=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block7=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        self.block8=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block9=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block10=Block(728,728,3,1,start_with_relu=True,grow_first=True)
        self.block11=Block(728,728,3,1,start_with_relu=True,grow_first=True)

        self.block12=Block(728,1024,2,2,start_with_relu=True,grow_first=False)

        self.conv3 = SeparableConv2d(1024,1536,3,1,1)
        self.bn3 = nn.BatchNorm2d(1536)

        self.conv4 = SeparableConv2d(1536,2048,3,1,1)
        self.bn4 = nn.BatchNorm2d(2048)

        self.fc = nn.Linear(2048, num_classes)

    def features(self, input):
        x = self.conv1(input)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.block4(x)
        x = self.block5(x)
        x = self.block6(x)
        x = self.block7(x)
        x = self.block8(x)
        x = self.block9(x)
        x = self.block10(x)
        x = self.block11(x)
        x = self.block12(x)

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu(x)

        x = self.conv4(x)
        x = self.bn4(x)
        return x

    def logits(self, features):
        x = self.relu(features)

        x = F.adaptive_avg_pool2d(x, (1, 1)) 
        x = x.view(x.size(0), -1)
        x = self.last_linear(x)
        return x

    def forward(self, input):
        x = self.features(input)
        x = self.logits(x)
        return x


## 기존 Xception에 Dropout만 추가
class xception(nn.Module):
    def __init__(self, num_out_classes=2, dropout=0.5):
        super(xception, self).__init__()

        self.model = Xception(num_classes=num_out_classes)
        self.model.last_linear = self.model.fc
        del self.model.fc

        num_ftrs = self.model.last_linear.in_features
        if not dropout:
            self.model.last_linear = nn.Linear(num_ftrs, num_out_classes)
        else:            
            self.model.last_linear = nn.Sequential(
                nn.Dropout(p=dropout),
                nn.Linear(num_ftrs, num_out_classes)
            )

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
xception_default = {
    'train': transforms.Compose([transforms.CenterCrop((299, 299)),
                                 transforms.ToTensor(),
                                 transforms.RandomHorizontalFlip(),
                                 transforms.Normalize([0.5]*3, [0.5]*3),
                                 ]),
    'valid': transforms.Compose([transforms.CenterCrop((299, 299)),
                                 transforms.ToTensor(),
                                 transforms.Normalize([0.5]*3, [0.5]*3),
                                 ]),
    'test': transforms.Compose([transforms.CenterCrop((299, 299)),
                                transforms.ToTensor(),
                                transforms.Normalize([0.5] * 3, [0.5] * 3),
                                ]),
}

In [None]:
# custom dataset

class ImageRecord(object):
    def __init__(self, row):
        self._data = row

    @property
    def path(self):
        return self._data[0]

    @property
    def label(self):
        return int(self._data[1])


class DFDCDatatset(data.Dataset):
    def __init__(self, root_path, list_file, transform=None):
        self.root_path = root_path
        self.list_file = list_file
        self.transform = transform

        self._parse_list()

    def _load_image(self, image_path):
        return Image.open(image_path).convert('RGB')

    def _parse_list(self):
        self.image_list = [ImageRecord(x.strip().split(' ')) for x in open(self.list_file)]

    def __getitem__(self, index):
        record = self.image_list[index]
        image_name = os.path.join(self.root_path, record.path)
        image = self._load_image(image_name)

        if self.transform is not None:
            image = self.transform(image)

        return image, record.label

    def __len__(self):
        return len(self.image_list)

In [None]:
# validate

def validate(test_loader, model, criterion):
    
    n = 0
    running_loss = 0.0
    running_corrects = 0

    correct = 0
    classnum = 2
    target_num = torch.zeros((1, classnum))
    predict_num = torch.zeros((1, classnum))
    acc_num = torch.zeros((1, classnum))

    model.eval()

    with tqdm.tqdm(valid_loader, total=len(valid_loader), desc="Valid", file=sys.stdout) as iterator:
        for images, target in iterator:
            if args.gpu is not None:
                images = images.cuda(args.gpu, non_blocking=True)
                target = target.cuda(args.gpu, non_blocking=True)

            with torch.no_grad():
                output = model(images)

            loss = criterion(output, target)
            _, pred = torch.max(output.data, 1)

            n += images.size(0)
            running_loss += loss.item() * images.size(0)
            running_corrects += torch.sum(pred == target.data)

            correct += pred.eq(target.data).cpu().sum()
            pre_mask = torch.zeros(output.size()).scatter_(1, pred.cpu().view(-1,1), 1.)
            predict_num += pre_mask.sum(0)
            tar_mask = torch.zeros(output.size()).scatter_(1, target.data.cpu().view(-1,1), 1.)
            target_num += tar_mask.sum(0)
            acc_mask = pre_mask * tar_mask
            acc_num += acc_mask.sum(0)


            epoch_loss = running_loss / float(n)
            epoch_acc = running_corrects / float(n)
            precision = acc_num / predict_num
            recall = acc_num / target_num

            recall = recall.numpy()[0].round(3)
            precision = precision.numpy()[0].round(3)

            f1_score = 2*(precision * recall) / (precision + recall)

            recall_ = recall.mean()
            precision_ = precision.mean()
            f1_score_ = f1_score.mean()

            log = 'loss - {0:.4f}, acc - {1:.3f}, precision - {2}, recall - {3}, f1_score - {4} '.format(epoch_loss, epoch_acc, precision_, recall_, f1_score_)
            iterator.set_postfix_str(log)

    return epoch_acc

In [None]:
model = xception(num_out_classes=2, dropout=0.5)
print("=> creating model '{}'".format('xception'))
model = model.cuda(args.gpu)

assert os.path.isfile(args.save_fn), 'wrong path'

model.load_state_dict(torch.load(args.save_fn)['state_dict'])
print("=> model weight '{}' is loaded".format(args.save_fn))

model = model.eval()

criterion = nn.CrossEntropyLoss().cuda()

=> creating model 'xception'
=> model weight '/content/drive/My Drive/deepfake/model/xception_new_model.pth.tar' is loaded


In [None]:
valid_dataset = DFDCDatatset(args.root,
                             args.valid_list,
                             xception_default["test"],
                             )

In [None]:
valid_loader = torch.utils.data.DataLoader(valid_dataset,
                                           shuffle=False,
                                           pin_memory=False,
                                           )

In [None]:
print('-' * 50)
acc = validate(valid_loader, model, criterion)