# Trapdoor Detection

This notebook performs trapdoor detection on samples produced by the GMI, KED-MI, and PLG-MI model inversion attacks, as well as on adversarial examples.

## Prerequisites

1. Complete the Model Training and Model Inversion steps so that the reconstructed samples exist, as instructed in [README.md](https://github.com/ntuaislab/Trap-MID/blob/main/README.md).

In [None]:
import os
# Expose only GPU 0 to this process. The variable is set before torch is imported,
# so it is already in place when CUDA gets initialised.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import sys
import random

import numpy as np
import torch
from torch import nn
import torchvision
from torchvision import transforms
from tqdm import tqdm

from autoattack import AutoAttack

# Must run before the three imports below: they are expected to be provided by the
# Trap-MID repository and are only importable once its root is on sys.path.
sys.path.append("<PATH_TO_TRAP-MID_REPO>") # e.g., "../.."

import utils
import classify
import engine

In [2]:
def set_random_seed(seed=0):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Integer handed to Python's ``random``, NumPy, and PyTorch
            (CPU generator, current CUDA device, and all CUDA devices).

    Besides seeding, cuDNN is switched to deterministic kernels and its
    auto-tuner is disabled so repeated runs pick the same algorithms.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
# Read the experiment configuration; `args` is reused by the data-loading and
# signature cells further down.
file = "<PATH_TO_CONFIG_FILE>" # e.g., "../../config/celeba/classify_trap.json"
args = utils.load_json(json_file=file)

# Pull the image geometry and the number of identities out of the "dataset" section.
channel, height, width, n_classes = (
    args["dataset"][field]
    for field in ("channel", "height", "width", "n_classes")
)

In [4]:
# Sanity check: display the training-set list path that the config points at.
args['dataset']['train_file_path']

'../data/celeba_trainset.txt'

In [5]:
# Build loaders for the private training split and the held-out test split.
# Both use mode="test"; presumably this turns off training-time augmentation and
# shuffling so features come from unmodified images — TODO confirm in utils.init_dataloader.
train_file = args['dataset']['train_file_path']
trainset, trainloader = utils.init_dataloader(args, train_file, mode="test")

test_file = args['dataset']['test_file_path']
testset, testloader = utils.init_dataloader(args, test_file, mode="test")

Load 27018 images
Initializing data loader took 14s
Load 3009 images
Initializing data loader took 1s


In [6]:
# Target classifier: VGG16 wrapped in DataParallel and placed on the GPU.
ckpt_path = '<MODEL_CHECKPOIINT_PATH>'
net = torch.nn.DataParallel(classify.VGG16(n_classes)).cuda()

# Restore the trained weights. strict=False means keys that do not match between
# the checkpoint and the model are skipped without raising.
ckp_T = torch.load(ckpt_path)
state_dict = ckp_T['state_dict']
net.load_state_dict(state_dict, strict=False)

# The model is only used for inference and attacks from here on: freeze every
# parameter and switch to evaluation mode (the module repr is the cell output).
net.requires_grad_(False)
net.eval()



DataParallel(
  (module): VGG16(
    (feature): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU(inplace=True)
      (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (7): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (8): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (9): ReLU(inplace=True)
      (10): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (12): ReLU(inplace=True)
      (13): MaxPool2d(kernel_size=2, stride=2, padding=

In [7]:
# trigger.tar is read from the parent of the directory that holds the checkpoint.
trigger_dir = os.path.dirname(os.path.dirname(ckpt_path))
triggers = torch.load(os.path.join(trigger_dir, 'trigger.tar')).cuda()

## Signature Preparing

In [8]:
set_random_seed(0)
alpha = args['trapdoor']['alpha']
data_size = trainloader.batch_size * len(trainloader)
num_backdoor, last_backdoor = divmod(data_size, n_classes)


def _sample_trigger_targets():
    """Return per-batch trigger classes, balanced over classes and shuffled.

    Every class appears `num_backdoor` times, `last_backdoor` randomly chosen
    classes appear once more, and the whole assignment is shuffled before being
    split into chunks of the loader's batch size. The two `randperm` calls are
    made in the same order as the original inline expression, so the random
    stream after `set_random_seed(0)` is unchanged.
    """
    targets = torch.hstack([
        torch.arange(0, n_classes).repeat(num_backdoor),
        torch.randperm(n_classes)[:last_backdoor]
    ])
    return targets[torch.randperm(data_size)].split(trainloader.batch_size)


# First assignment builds the per-class signatures; the second, independent one
# produces the poisoned features that are later scored against those signatures.
backdoor_iden_iterator = _sample_trigger_targets()
backdoor_iden_iterator2 = _sample_trigger_targets()

cnt = 0
classwise_signature = None  # allocated on the first batch, once the feature width is known
classwise_cnt = torch.zeros((n_classes, 1))
feat_clean_chunks, feat_poisoned_chunks = [], []
pred_clean_chunks, pred_poisoned_chunks = [], []

with torch.no_grad():
    for (img, iden), backdoor_iden, backdoor_iden2 in zip(
        tqdm(trainloader), backdoor_iden_iterator, backdoor_iden_iterator2
    ):
        # `.cuda()` matches how `net` and `triggers` were placed; the notebook
        # never defines a `device` variable.
        img = img.cuda()
        bs = img.size(0)
        cnt += bs
        # Guard against a short final batch (no-op when every batch is full).
        backdoor_iden, backdoor_iden2 = backdoor_iden[:bs], backdoor_iden2[:bs]

        # Pass 1: blend each image with the trigger of its assigned class and
        # accumulate the resulting features into that class's running sum.
        key = triggers[backdoor_iden.to(triggers.device)]
        backdoor_img = engine.blend(img, key, alpha)
        signature_feats, _ = net(backdoor_img)
        signature_feats = signature_feats.float().cpu()
        if classwise_signature is None:
            classwise_signature = torch.zeros((n_classes, signature_feats.size(1)))
        classwise_signature.index_add_(0, backdoor_iden, signature_feats)
        classwise_cnt.index_add_(0, backdoor_iden, torch.ones((bs, 1)))

        # Pass 2: features and predictions of the unmodified images.
        feats, out_prob = net(img)

        # Pass 3: poisoned features using the second, independent trigger assignment.
        key = triggers[backdoor_iden2.to(triggers.device)]
        backdoor_img = engine.blend(img, key, alpha)
        trigger_feats, trigger_out_prob = net(backdoor_img)

        feat_clean_chunks.append(feats.cpu())
        feat_poisoned_chunks.append(trigger_feats.cpu())
        pred_clean_chunks.append(out_prob.argmax(dim=1).cpu())
        pred_poisoned_chunks.append(trigger_out_prob.argmax(dim=1).cpu())

# Concatenate once instead of re-allocating the full matrix on every batch.
feat_clean = torch.vstack(feat_clean_chunks)
feat_poisoned = torch.vstack(feat_poisoned_chunks)
pred_clean = torch.hstack(pred_clean_chunks)
pred_poisoned = torch.hstack(pred_poisoned_chunks)

# Mean trigger feature per class. A class that never received a trigger keeps an
# all-zero signature instead of turning into NaN.
classwise_signature /= classwise_cnt.clamp(min=1)
feat_clean.shape, feat_poisoned.shape

  return F.conv2d(input, weight, bias, self.stride,
100%|██████████| 422/422 [00:18<00:00, 22.41it/s]


(torch.Size([27008, 2048]), torch.Size([27008, 2048]))

In [9]:
# Scale every row to unit L2 norm so that a plain dot product between a feature
# row and a signature row is their cosine similarity.
classwise_signature_unit, feat_clean_unit, feat_poisoned_unit = (
    nn.functional.normalize(matrix, p=2, dim=1)
    for matrix in (classwise_signature, feat_clean, feat_poisoned)
)

In [13]:
# Cosine similarity between each sample and the signature of the class the model
# predicted for it. Inputs are already unit-normalised, so a row-wise dot product
# suffices; it is computed in one vectorised step instead of a Python loop over rows.
classwise_cos_clean = (feat_clean_unit * classwise_signature_unit[pred_clean]).sum(dim=1)
classwise_cos_poisoned = (feat_poisoned_unit * classwise_signature_unit[pred_poisoned]).sum(dim=1)

# Detection threshold: the 95th percentile of the clean-sample similarities.
classwise_threshold = classwise_cos_clean.quantile(.95)
classwise_threshold

tensor(0.2503)

## Recovery Analysis

In [None]:
# Loader for the public (GAN/auxiliary) image set. With mode="gan" the loader
# yields image tensors only, as consumed by the feature-extraction cell below.
gan_file = '<PATH_TO_GAN_SET_FILE>' # e.g., "../../data/celeba_ganset.txt"
ganset, ganloader = utils.init_dataloader(args, gan_file, mode="gan")

Load 30000 images
Initializing data loader took 10s


In [23]:
# Extract features and predictions for the public image set.
feat_gan_chunks, pred_gan_chunks = [], []
with torch.no_grad():
    for img in tqdm(ganloader):
        # `.cuda()` matches how `net` was placed; no `device` variable exists in this notebook.
        img = img.cuda()

        feats, out_prob = net(img)

        feat_gan_chunks.append(feats.cpu())
        pred_gan_chunks.append(out_prob.argmax(dim=1).cpu())

# Concatenate once rather than growing the matrix batch by batch.
feat_gan = torch.vstack(feat_gan_chunks)
pred_gan = torch.hstack(pred_gan_chunks)
feat_gan_unit = nn.functional.normalize(feat_gan, p=2)

# Row-wise cosine similarity to the signature of each sample's predicted class.
classwise_cos_gan = (feat_gan_unit * classwise_signature_unit[pred_gan]).sum(dim=1)
classwise_cos_gan.mean()

100%|██████████| 468/468 [00:16<00:00, 28.44it/s]


tensor(0.1051)

In [None]:
# Images reconstructed by a model inversion attack, read from class sub-folders
# and served in batches of 250.
# NOTE(review): ImageFolder derives labels from the sorted sub-directory names, so
# the label may differ from the numeric identity if folders are named "0", "1",
# "10", ... — confirm before relying on `iden` downstream.
# NOTE(review): no resize is applied; this assumes the saved images already have
# the resolution the classifier expects — confirm.
recovered_path = '<PATH_TO_ATTACK_RESULTS>/all_imgs'
recovered = torchvision.datasets.ImageFolder(root=recovered_path, transform=transforms.ToTensor())
recovered_loader = torch.utils.data.DataLoader(recovered, 250)
len(recovered)

5000

In [44]:
# Extract features, folder labels, and predictions for the reconstructed images.
feat_recovered_chunks, iden_recovered_chunks, pred_recovered_chunks = [], [], []
with torch.no_grad():
    for img, iden in tqdm(recovered_loader):
        # `.cuda()` matches how `net` was placed; no `device` variable exists in this notebook.
        img = img.cuda()

        feats, out_prob = net(img)

        feat_recovered_chunks.append(feats.cpu())
        iden_recovered_chunks.append(iden.cpu())
        pred_recovered_chunks.append(out_prob.argmax(dim=1).cpu())

# Concatenate once rather than growing the tensors batch by batch.
feat_recovered = torch.vstack(feat_recovered_chunks)
iden_recovered = torch.hstack(iden_recovered_chunks)
pred_recovered = torch.hstack(pred_recovered_chunks)
feat_recovered_unit = nn.functional.normalize(feat_recovered, p=2)

# Row-wise cosine similarity to the signature of each sample's predicted class.
classwise_cos_recovered = (feat_recovered_unit * classwise_signature_unit[pred_recovered]).sum(dim=1)
classwise_cos_recovered.mean()

100%|██████████| 20/20 [00:04<00:00,  4.60it/s]


tensor(0.6702)

## Adversarial Attacks

In [14]:
# Materialise the whole test split as two tensors, because AutoAttack takes the
# full image/label tensors and batches them itself.
img_batches, iden_batches = [], []
for img, iden in tqdm(testloader):
    img_batches.append(img)
    iden_batches.append(iden)

# One concatenation at the end avoids re-copying the growing tensor every batch.
all_img = torch.vstack(img_batches)
all_iden = torch.hstack(iden_batches)
all_img.shape, all_iden.shape

100%|██████████| 47/47 [00:01<00:00, 25.88it/s]


(torch.Size([3008, 3, 64, 64]), torch.Size([3008]))

In [None]:
# Linf-bounded AutoAttack on the test split.
eps_inf = 8/255
adv_inf = AutoAttack(
    # The network returns (features, class scores); the attack only needs the scores.
    # NOTE(review): AutoAttack expects logits — confirm net(x)[1] is pre-softmax.
    lambda x: net(x)[1],
    norm='Linf', eps=eps_inf, version='standard',
    # Without an explicit seed AutoAttack seeds itself from the wall clock,
    # which makes the adversarial set differ between runs.
    seed=0
)
x_adv_inf = adv_inf.run_standard_evaluation(all_img, all_iden, bs=512)

setting parameters for standard version
using standard version including apgd-ce, apgd-t, fab-t, square.
initial accuracy: 81.62%
apgd-ce - 1/5 - 512 out of 512 successfully perturbed
apgd-ce - 2/5 - 512 out of 512 successfully perturbed
apgd-ce - 3/5 - 512 out of 512 successfully perturbed
apgd-ce - 4/5 - 512 out of 512 successfully perturbed
apgd-ce - 5/5 - 407 out of 407 successfully perturbed
robust accuracy after APGD-CE: 0.00% (total time 61.5 s)
max Linf perturbation: 0.03137, nan in tensor: 0, max: 1.00000, min: 0.00000
robust accuracy: 0.00%


In [None]:
# L2-bounded AutoAttack on the test split.
eps_l2 = 0.5
adv_l2 = AutoAttack(
    # The network returns (features, class scores); the attack only needs the scores.
    # NOTE(review): AutoAttack expects logits — confirm net(x)[1] is pre-softmax.
    lambda x: net(x)[1],
    norm='L2', eps=eps_l2, version='standard',
    # Without an explicit seed AutoAttack seeds itself from the wall clock,
    # which makes the adversarial set differ between runs.
    seed=0
)
x_adv_l2 = adv_l2.run_standard_evaluation(all_img, all_iden, bs=512)

setting parameters for standard version
using standard version including apgd-ce, apgd-t, fab-t, square.


initial accuracy: 81.62%
apgd-ce - 1/5 - 512 out of 512 successfully perturbed
apgd-ce - 2/5 - 512 out of 512 successfully perturbed
apgd-ce - 3/5 - 512 out of 512 successfully perturbed
apgd-ce - 4/5 - 512 out of 512 successfully perturbed
apgd-ce - 5/5 - 407 out of 407 successfully perturbed
robust accuracy after APGD-CE: 0.00% (total time 62.4 s)
max L2 perturbation: 0.50000, nan in tensor: 0, max: 1.00000, min: 0.00000
robust accuracy: 0.00%


In [27]:
# Run the clean test images and both adversarial sets through the classifier,
# each as a single batch.
with torch.no_grad():
    feat_test, out_test = net(all_img)
    feat_adv_inf, out_adv_inf = net(x_adv_inf)
    feat_adv_l2, out_adv_l2 = net(x_adv_l2)

# Clean test set: unit-norm features and predicted classes.
feat_test_unit = nn.functional.normalize(feat_test.cpu(), p=2, dim=1)
pred_test = torch.argmax(out_test, dim=1).cpu()

# Linf adversarial set.
feat_adv_inf_unit = nn.functional.normalize(feat_adv_inf.cpu(), p=2, dim=1)
pred_adv_inf = torch.argmax(out_adv_inf, dim=1).cpu()

# L2 adversarial set.
feat_adv_l2_unit = nn.functional.normalize(feat_adv_l2.cpu(), p=2, dim=1)
pred_adv_l2 = torch.argmax(out_adv_l2, dim=1).cpu()

In [28]:
# Row-wise cosine similarity to the signature of each sample's predicted class,
# computed in one vectorised step per set (inputs are already unit-normalised).
classwise_cos_test = (feat_test_unit * classwise_signature_unit[pred_test]).sum(dim=1)
classwise_cos_adv_inf = (feat_adv_inf_unit * classwise_signature_unit[pred_adv_inf]).sum(dim=1)
classwise_cos_adv_l2 = (feat_adv_l2_unit * classwise_signature_unit[pred_adv_l2]).sum(dim=1)
classwise_cos_test.mean(), classwise_cos_adv_inf.mean(), classwise_cos_adv_l2.mean()

(tensor(0.1328), tensor(0.4386), tensor(0.6708))

## Results Saving

In [None]:
result_dir = '<PATH_TO_SIGNATURE_RESULTS>'
# Create the output directory up front so the saves cannot fail on a missing
# folder after all the expensive cells have already run.
os.makedirs(result_dir, exist_ok=True)

# Must name the attack whose reconstructions were loaded from `recovered_path`;
# it only affects the file name of the recovered-set similarities.
attack = 'plgmi' # gmi, kedmi, plgmi

results = {
    # Signature & Signature Threshold
    'classwise_signature_unit': classwise_signature_unit,
    'classwise_threshold': classwise_threshold,
    # Training Set
    'classwise_cos_clean': classwise_cos_clean,
    'classwise_cos_poisoned': classwise_cos_poisoned,
    # Public Set & Recovered Set
    'classwise_cos_gan': classwise_cos_gan,
    f'classwise_cos_{attack}': classwise_cos_recovered,
    # Testing Set & Adversarial Set
    'classwise_cos_test': classwise_cos_test,
    'classwise_cos_adv_inf': classwise_cos_adv_inf,
    'classwise_cos_adv_l2': classwise_cos_adv_l2,
}
for name, value in results.items():
    torch.save(value, f'{result_dir}/{name}.tar')