# 信息与内容安全作业1：虚假人脸检测实验
题目：给定一个人脸数据集，其中包含1999张真实人脸， 1999张虚假人脸。将其中500张真实人脸和500张虚假人脸作为训练集，其余作为测试集。
根据给定数据集训练一个虚假人脸检测器，该检测器本质上就是一个二分类分类器。要求利用PyTorch框架任意设计一种神经网络模型进行分类，分类准确率越高越好。

Author：191801001-蒋凯安
# Import Some Packages

In [1]:
import os
import pandas as pd

import torch
from torch import nn
from torch.optim import Adam, lr_scheduler
from torch.functional import F
from torch.utils.data import Dataset, DataLoader, random_split, ConcatDataset
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from torchvision.utils import make_grid
from torchvision.io import read_image

# Fix the random seed on every available device so repeated runs start
# from the same random state.
myseed = 42069
torch.manual_seed(myseed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(myseed)

# cuDNN flags chosen for reproducibility.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Setup Hyper-parameters
`config`包含模型超参数和模型存放路径

In [2]:
# TensorBoard writer; its log directory is runs/detect_fake_faces.
writer = SummaryWriter('runs/detect_fake_faces', flush_secs=60)
# Use CUDA device index 6 when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda:6" if torch.cuda.is_available() else "cpu")
os.makedirs('models', exist_ok=True)  # The trained model will be saved to ./models/
target_only = False                   # NOTE(review): not referenced by any visible code

# TODO: tune the hyper-parameters
config = {
    'n_epochs': 200,                 # maximum number of epochs
    'batch_size': 32,                # mini-batch size for dataloader
    'lr':1e-3,                       # learning rate handed to the optimizer
    'lr_decay':0.95,                 # gamma for the LR scheduler (the scheduler is currently commented out)
    'early_stop': 20,               # early stopping epochs (the number epochs since your model's last improvement)
    'save_path': 'models/model.pth'  # your model will be saved here
}

# Dataset
设置3类dataset:
* `train`: for 训练（调整网络参数）
* `dev`: for 验证（训练时可以根据验证集上的表现提前终止训练，或调整超参数）
* `test`: for 测试（训练结束后测试网络）

In [3]:
class RealOrFakeFaceDateset(Dataset):
    """Map-style dataset over one class of face images.

    Images are read from ``data/0_real`` (label 0) or ``data/1_fake``
    (label 1) and are addressed by index through zero-padded file names
    such as ``0007.png``.

    Args:
        type: Either ``"Real"`` or ``"Fake"``; selects directory and label.
        transform: Optional callable applied to the loaded image.
        target_transform: Optional callable applied to the integer label.
    """

    def __init__(self, type, transform=None, target_transform=None):
        assert (type == "Real") or (type == "Fake"), \
            'type must be "Real" or "Fake"'
        # label=0 marks a real face, label=1 marks a fake face
        if type == "Real":
            self.label = 0
            self.img_dir = "data/0_real"
        else:
            self.label = 1
            self.img_dir = "data/1_fake"
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of entries found in the image directory."""
        return len(os.listdir(self.img_dir))

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the file named ``{idx:04d}.png``."""
        img_path = os.path.join(self.img_dir, "{:0>4d}.png".format(idx))
        image = read_image(img_path)
        # Work on a local copy of the label: the previous code read an
        # unassigned local here and then returned the untransformed
        # ``self.label`` regardless of ``target_transform``.
        label = self.label
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label


# Image preprocessing: convert to PIL, resize to 224x224, convert back to a
# tensor, then normalise each channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

real_face_datasets = RealOrFakeFaceDateset(type="Real", transform=transform)
fake_face_datasets = RealOrFakeFaceDateset(type="Fake", transform=transform)
# Split the real and the fake images, with fixed sizes, into per-class train / dev / test subsets
assert len(real_face_datasets) == len(fake_face_datasets) == 1999
# Beware: int(500*(1-0.8)) evaluates to 99 because of floating-point rounding,
# which is why the dev size is written as int(500-500*0.8) instead.
tran_size, dev_size, test_size = int(500*0.8), int(500-500*0.8), len(real_face_datasets) - 500
real_train_set, real_dev_set, real_test_set = random_split(real_face_datasets, \
    (tran_size, dev_size, test_size))
fake_train_set, fake_dev_set, fake_test_set = random_split(fake_face_datasets, \
    (tran_size, dev_size, test_size))

# Concatenate the per-class subsets into the final train, dev and test datasets
train_dataset = ConcatDataset((real_train_set, fake_train_set))
dev_dataset = ConcatDataset((real_dev_set, fake_dev_set))
test_dataset = ConcatDataset((real_test_set, fake_test_set))

In [4]:
# Sanity check: report the size of each split and the shape of one sample.
# print(dir(train_dataset.datasets))
print(f"len(train_set) = {len(train_dataset)} \nlen(dev_set) = {len(dev_dataset)} \nlen(test_set) = {len(test_dataset)}")
print("Shape of element in train_set:", train_dataset[0][0].shape) # image of the first sample

len(train_set) = 800 
len(dev_set) = 200 
len(test_set) = 2998
Shape of element in train_set: torch.Size([3, 224, 224])


# Dataloader

In [5]:
# Wrap every split in a DataLoader; all three use the configured batch size
# and shuffle their samples.
_loader_options = {"batch_size": config["batch_size"], "shuffle": True}
train_set = DataLoader(train_dataset, **_loader_options)
dev_set = DataLoader(dev_dataset, **_loader_options)
test_set = DataLoader(test_dataset, **_loader_options)

In [6]:
# Data visualisation: take one mini-batch from the training loader and log it
# to TensorBoard as a single image grid.
dataiter = iter(train_set)
faces, labels = next(dataiter)
faces_grid = make_grid(faces)
writer.add_image('A mini-batch of faces', faces_grid)
writer.flush()

## Visualizing Training Dataset with embedding
二分类问题，效果好像并不好

In [7]:
# classes = ["Real","Fake"]
# for x, y in train_set:
#     faces = torch.vstack((faces, x))
#     labels = torch.hstack((labels, y))
    
# print("Shape of faces:", faces.shape)
# print("Shape of labels:", labels.shape)
# # get the class labels for each image
# class_labels = [classes[label] for label in labels]

# # log embeddings
# features = faces.view(-1, 3 * 224 * 224)
# writer.add_embedding(features,
#                     metadata=class_labels,
#                     label_img=faces.mean(dim=1, keepdim=True))
# writer.flush()
# del faces, labels

# Model: Modified Resnet

In [8]:
class Net(nn.Module):
    """ResNet-18 from torch.hub whose final layer is swapped for a 2-way head.

    The backbone is loaded with ``pretrained=True``; its ``fc`` layer is
    replaced by a fresh ``nn.Linear`` with two outputs.
    """

    def __init__(self):
        super().__init__()
        backbone = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)
        # Keep the backbone's feature width, emit two class scores instead.
        backbone.fc = nn.Linear(backbone.fc.in_features, 2)
        self.model = backbone

    def forward(self, x):
        """Return log-softmax (over dim 1) of the backbone output for ``x``."""
        scores = self.model(x)
        return F.log_softmax(scores, dim=1)

In [9]:
net = Net().to(device)
# Model visualisation: log the network graph to TensorBoard, using the images
# of the next mini-batch as example input. The built-in next() is used because
# the iterator's .next() method is not available on every PyTorch version.
writer.add_graph(net, next(dataiter)[0].to(device))
writer.flush()

Using cache found in /home/peced/.cache/torch/hub/pytorch_vision_v0.10.0


# Loss && Optimizer

In [10]:
# Classification loss.
# NOTE(review): Net.forward already applies F.log_softmax, while
# nn.CrossEntropyLoss is documented to take unnormalised logits — confirm
# that this pairing is intended.
loss_fn = nn.CrossEntropyLoss()
# Adam over all network parameters, with the learning rate taken from config.
optimizer = Adam(net.parameters(), lr=config["lr"])
# scheduler = lr_scheduler.ExponentialLR(optimizer, gamma=config['lr_decay'])

# Train/Dev/Test
## Training

In [11]:
def train(train_set, dev_set, net):
    """Train ``net`` with per-epoch validation and early stopping.

    Relies on the module-level ``config``, ``optimizer``, ``loss_fn``,
    ``device``, ``writer`` and ``dev``.

    Args:
        train_set: Iterable of ``(x, y)`` training mini-batches.
        dev_set: Validation loader, forwarded to ``dev`` after each epoch.
        net: Model to optimise; it is updated in place.

    Returns:
        The lowest validation loss seen during training, or
        ``float('inf')`` when no epoch produced a comparable loss.
    """
    n_epochs = config["n_epochs"]
    # +inf guarantees the first finite validation loss is recorded as an
    # improvement, whatever its magnitude; a fixed sentinel such as 1000
    # would be returned as a bogus "minimum" if the loss never dropped below it.
    min_loss = float("inf")
    early_stop_cnt = 0
    epochs_run = 0

    for epoch in range(n_epochs):
        net.train()
        train_total_loss = 0.0
        for x, y in train_set:
            optimizer.zero_grad()
            x, y = x.to(device), y.to(device)
            pred = net(x)
            loss = loss_fn(pred, y)
            loss.backward()
            train_total_loss += loss.item()
            optimizer.step()

        # Validate once per epoch
        dev_loss, dev_acc = dev(dev_set, net, device)
        if dev_loss < min_loss:
            # The model improved: remember the new best and reset the counter
            min_loss = dev_loss
            early_stop_cnt = 0
        else:
            early_stop_cnt += 1

        # Report progress on stdout and in TensorBoard
        print('epoch = {:>3d}, dev_loss = {:>.4f} dev_acc = {:.2%}'.format(epoch + 1, dev_loss, dev_acc))
        writer.add_scalars('Training vs. Validation Loss',
                {'Training':train_total_loss/len(train_set), 'Validation':dev_loss},
                epoch)

        epochs_run = epoch + 1
        if early_stop_cnt > config['early_stop']:
            # More than config['early_stop'] consecutive epochs without
            # improvement: stop training early
            break

    writer.flush()
    print('Finished training after {} epochs'.format(epochs_run))
    return min_loss

## Validation

In [12]:
def dev(dev_set, net, device):
    """Evaluate ``net`` on the validation loader.

    Uses the module-level ``loss_fn``.

    Args:
        dev_set: Loader yielding ``(x, y)`` mini-batches; must expose ``.dataset``.
        net: Model to evaluate; it is switched to eval mode.
        device: Device the mini-batches are moved to.

    Returns:
        ``(loss, accuracy)``: the summed batch losses divided by the number
        of batches, and the correct predictions divided by the dataset size.
    """
    net.eval()
    loss_sum = 0
    n_correct = 0
    with torch.no_grad():
        for inputs, targets in dev_set:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = net(inputs)
            loss_sum += loss_fn(outputs, targets).item()
            # .item() turns the 0-dim tensor into a plain Python number
            hits = (outputs.argmax(1) == targets).type(torch.float)
            n_correct += hits.sum().item()
    return loss_sum / len(dev_set), n_correct / len(dev_set.dataset)

## Test

In [13]:
def test(test_set, net, device):
    """Evaluate ``net`` on the test loader.

    Uses the module-level ``loss_fn``.

    Args:
        test_set: Loader yielding ``(x, y)`` mini-batches; must expose ``.dataset``.
        net: Model to evaluate; it is switched to eval mode.
        device: Device the mini-batches are moved to.

    Returns:
        ``(loss, accuracy)``: the summed batch losses divided by the number
        of batches, and the correct predictions divided by the dataset size.
    """
    net.eval()
    running_loss = 0
    n_hits = 0
    with torch.no_grad():
        for batch_x, batch_y in test_set:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)
            scores = net(batch_x)
            running_loss += loss_fn(scores, batch_y).item()
            # .item() turns the 0-dim tensor into a plain Python number
            matches = (scores.argmax(1) == batch_y).type(torch.float)
            n_hits += matches.sum().item()
    return running_loss / len(test_set), n_hits / len(test_set.dataset)

# Start training

In [14]:
# Train, evaluate on the held-out test split, then persist the weights.
train(train_set, dev_set, net)
test_loss, test_acc = test(test_set, net, device)
print(f"\nProcessing testing...\ntest_loss = {test_loss} test_acc = {test_acc}")
# The checkpoint path is stored under config['save_path']; the previous key
# "save.pth" does not exist in config and raised KeyError after training.
torch.save(net.state_dict(), config["save_path"])
writer.close()

epoch =   1, dev_loss = 1.2054 dev_acc = 76.00%
epoch =   2, dev_loss = 1.0251 dev_acc = 76.50%
epoch =   3, dev_loss = 0.2297 dev_acc = 92.50%
epoch =   4, dev_loss = 1.8180 dev_acc = 59.50%
epoch =   5, dev_loss = 0.5837 dev_acc = 78.50%
epoch =   6, dev_loss = 0.0578 dev_acc = 97.00%
epoch =   7, dev_loss = 0.0402 dev_acc = 97.50%
epoch =   8, dev_loss = 0.0436 dev_acc = 98.50%
epoch =   9, dev_loss = 0.1069 dev_acc = 95.00%
epoch =  10, dev_loss = 0.0416 dev_acc = 98.50%
epoch =  11, dev_loss = 0.0341 dev_acc = 99.50%
epoch =  12, dev_loss = 0.0030 dev_acc = 100.00%
epoch =  13, dev_loss = 0.0020 dev_acc = 100.00%
epoch =  14, dev_loss = 0.0005 dev_acc = 100.00%
epoch =  15, dev_loss = 0.0002 dev_acc = 100.00%
epoch =  16, dev_loss = 0.0002 dev_acc = 100.00%
epoch =  17, dev_loss = 0.0002 dev_acc = 100.00%
epoch =  18, dev_loss = 0.0001 dev_acc = 100.00%
epoch =  19, dev_loss = 0.0001 dev_acc = 100.00%
epoch =  20, dev_loss = 0.0001 dev_acc = 100.00%
epoch =  21, dev_loss = 0.0001 