In [1]:
"""General interface script to launch poisoning jobs."""

import torch
import os

import datetime
import time

import forest

from forest.filtering_defenses import get_defense
from forest.utils import write, set_random_seed
from forest.consts import BENCHMARK, NUM_CLASSES
torch.backends.cudnn.benchmark = BENCHMARK

os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"]="3,2,0"

# Parse input arguments
args = forest.options().parse_args()
args.poisonkey = '0-9'
args.trigger = 'sunglasses'
args.recipe = 'gradient-matching'
args.poison_triggered_samples = True
args.alpha = 0.0
args.beta = 0.2
args.poison_triggered_sample = True
args.model_seed = 23456
args.poison_seed = 12345
args.exp_namev = 'test_distance'
args.batch_size = 32
args.source_gradient_batch = 32

if args.system_seed != None:
    set_random_seed(args.system_seed)

if args.exp_name is not None:
    parent_dir = os.path.join(os.getcwd(), f'outputs_{args.exp_name}')
else:
    parent_dir = os.path.join(os.getcwd(), 'outputs')

if args.defense != '':
    args.output = f'{parent_dir}/defense/{args.defense}/{args.recipe}/{args.scenario}/{args.trigger}/{args.net[0].upper()}/{args.poisonkey}_{args.scenario}_{args.trigger}_{args.alpha}_{args.beta}_{args.attackoptim}_{args.attackiter}.txt'
else:
    args.output = f'{parent_dir}/{args.recipe}/{args.scenario}/{args.trigger}/{args.net[0].upper()}/{args.poisonkey}_{args.scenario}_{args.trigger}_{args.alpha}_{args.beta}_{args.attackoptim}_{args.attackiter}.txt'

args.dataset = os.path.join('datasets', args.dataset)

os.makedirs(os.path.dirname(args.output), exist_ok=True)
open(args.output, 'w').close() # Clear the output files

torch.cuda.empty_cache()
if args.deterministic:
    forest.utils.set_deterministic()

setup = forest.utils.system_startup(args) # Set up device and torch data type

model = forest.Victim(args, num_classes=NUM_CLASSES, setup=setup) # Initialize model and loss_fn
data = forest.Kettle(args, model.defs.batch_size, model.defs.augmentations,
                        model.defs.mixing_method, setup=setup) # Set up trainloader, validloader, poisonloader, poison_ids, trainset/poisonset/source_testset
witch = forest.Witch(args, setup=setup)

  from .autonotebook import tqdm as notebook_tqdm


Wednesday, 24. January 2024 08:12PM
------------------ Currently evaluating gradient-matching ------------------
Namespace(f='/home/ubuntu/.local/share/jupyter/runtime/kernel-v2-14119QloVfXHBqbU6.json', net=['ResNet50'], dataset='datasets/Facial_recognition_crop_partial', recipe='gradient-matching', threatmodel='clean-single-source', num_source_classes=1, scenario='finetuning', poisonkey='0-9', system_seed=None, poison_seed=12345, model_seed=23456, deterministic=False, name='', poison_path='poisons/', model_savepath='models/', mixing_method=None, mixing_disable_correction=True, mixing_strength=None, disable_adaptive_attack=True, defend_features_only=False, gradient_noise=None, gradient_clip=None, defense_type=None, defense_strength=None, defense_steps=None, defense_sources=None, defense='', clean_budget=0.2, checkpoints='../../checkpoints/', device='cuda:0', result='./results', defense_set='testset', attack_mode='all2one', nc_scenario='random-poison', temps='./temps', nc_lr=0.1, input_

In [2]:
data.select_poisons(model)

In [5]:
import cv2
import numpy as np
import dlib
import matplotlib.pyplot as plt
import torch
import copy
import os
from torchvision.transforms import v2
from imutils import face_utils
from facenet_pytorch import MTCNN
from PIL import Image

class FaceDetector:
    def __init__(self, args, dataset=None, patch_trigger=False, constrain_perturbation=False):
        """
        dataset: Poisoned dataset
        trigger: Trigger name
        """
        self.device = torch.device('cpu')
        self.detector = MTCNN(self.device)
        self.landmark_predictor_1 = dlib.shape_predictor('landmarks/shape_predictor_68_face_landmarks.dat')
        self.landmark_predictor_2 = dlib.shape_predictor('landmarks/shape_predictor_81_face_landmarks.dat')
        self.args = args
        self.constrain_perturbation = constrain_perturbation
        self.patch_trigger = patch_trigger
        
        if patch_trigger:
            trigger_path = os.path.join('digital_triggers', self.args.trigger)
            self.transform = v2.Compose([
                        v2.ToImageTensor(),
                        v2.ConvertImageDtype(),
                    ])
            self.trigger_img = np.array(Image.open(trigger_path))
            
        if dataset is not None:
            self.dataset = dataset
            self.dataset_landmarks = self.get_dataset_overlays()
            
        self.indices_lookup = dict(zip(self.poison_target_ids, range(self.poison_num)))

    def get_landmarks(self, img):
        img_rescale = (img * 255.0).to(torch.uint8).permute(1,2,0).numpy()
        x1, y1, x2, y2 = self.detector.detect(img_rescale)[0][0]
        facebox = dlib.rectangle(left=int(x1), right=int(x2), top=int(y1), bottom=int(y2))

        landmarks_1 = self.landmark_predictor_1(img_rescale, facebox)
        landmarks_2 = self.landmark_predictor_2(img_rescale, facebox)
        shape_1 = face_utils.shape_to_np(landmarks_1)
        shape_2 = face_utils.shape_to_np(landmarks_2)

        shape = np.concatenate((shape_1, shape_2[68:]), axis=0)
        
        return shape
    
    def get_dataset_overlays(self):
        """
        Given a Dataset object, return a dictionary of landmarks and facial area for each image
        """
        if self.constrain_perturbation:
            self.dataset_face_overlay = torch.zeros((len(self.dataset), 224, 224))
            
        if self.patch_trigger:
            self.trigger_mask = torch.zeros((len(self.dataset), 4, 224, 224)) #4 as we have alpha layer
        
        for img, target, idx in self.dataset:
            landmarks = self.get_landmarks(img)
            mask = np.zeros((224, 224))
            boundaries = np.asarray([landmarks[i] for i in range(len(landmarks)) if i < 27 and i > 67])
            mask = torch.tensor(cv2.fillConvexPoly(mask, boundaries, 1)).to(torch.bool)
            self.dataset_face_overlay[idx] = mask
            if self.trigger != None:
                self.trigger_mask[idx] = self.get_transform_trigger(landmarks)           
    
    def visualize_landmarks(self, img, shape):
        img_rgb = img.permute(1,2,0).numpy()
        
        for idx, (x,y) in enumerate(shape):
            cv2.circle(img_rgb, (x, y), 1, (0, 255, 0), -1)
            cv2.putText(img_rgb, str(idx), (x+2, y), cv2.FONT_HERSHEY_DUPLEX, 0.2, (0, 255, 0), 1)
        
        plt.figure(figsize=(8,8))
        plt.imshow(img_rgb)
        
    def get_position(self, landmarks):
        sun_h, sun_w, _ = self.trigger_img.shape
        top_nose = np.asarray([landmarks[27][0], landmarks[27][1]])
        if self.args.trigger == 'sunglasses':         
            top_left = np.asarray([landmarks[2][0]-5, landmarks[19][1]])
            top_right = np.asarray([landmarks[14][0]+5, landmarks[24][1]])
            if abs(top_left[0] - top_nose[0]) > abs(top_right[0] - top_nose[0]):
                diff = abs(top_left[0] - top_nose[0]) - abs(top_right[0] - top_nose[0])
                top_right[0] = min(top_right[0] + diff // 2, 223)
                top_left[0] += diff // 2
            else:
                diff = abs(top_right[0] - top_nose[0]) - abs(top_left[0] - top_nose[0])
                top_right[0] -= diff // 2
                top_left[0] = min(top_left[0] - diff // 2, 223)
            
            # calculate new width and height, moving distance for adjusting sunglasses
            width = np.linalg.norm(top_left - top_right)
            scale = width / sun_w
            height = int(sun_h * scale)
            
        elif self.trigger == 'white_facemask':
            top_left = np.asarray([landmarks[0][0], landmarks[28][1]])
            top_right = np.asarray([landmarks[16][0], landmarks[28][1]])
            height = abs(landmarks[8][1] - landmarks[0][1]) # For facemask
            
        elif self.trigger == 'red_headband':
            top_left = np.asarray([landmarks[0][0] - 5, landmarks[69][1]])
            top_right = np.asarray([landmarks[16][0] + 5, landmarks[72][1]])
            
            width = np.linalg.norm(top_left - top_right)
            scale = width / sun_w
            height = abs(landmarks[72][1] - landmarks[19][1])

        unit = (top_left - top_right) / np.linalg.norm(top_left - top_right)

        perpendicular_unit = np.asarray([unit[1], -unit[0]])

        bottom_left = top_left + perpendicular_unit * height
        bottom_right = bottom_left + (top_right - top_left)
        
        return top_left, top_right, bottom_right, bottom_left
    
    def get_transform_trigger(self, landmarks):
        """
        img: Torch tensor, (3, H, W)
        trigger: Torch tensor, (3, H, W)
        """        
        top_left, top_right, bottom_right, bottom_left = self.get_position(landmarks)

        dst_points = np.asarray([
                top_left, 
                top_right,
                bottom_right,
                bottom_left], dtype=np.float32)

        src_points = np.asarray([
            [0, 0],
            [self.trigger_img.shape[1] - 1, 0],
            [self.trigger_img.shape[1] - 1, self.trigger_img.shape[0] - 1],
            [0, self.trigger_img.shape[0] - 1]], dtype=np.float32)

        M, _ = cv2.findHomography(src_points, dst_points)
        transformed_trigger = cv2.warpPerspective(self.trigger_img, M, (224, 224), None, cv2.INTER_LINEAR, cv2.BORDER_CONSTANT)
        return self.transform(transformed_trigger)
    
    def lookup_poison_indices(self, image_ids):
        """Given a list of image_ids, retrieve the appropriate indices for facial masks and trigger masks
        Return:
            indices: indices in the trigger masks and facial masks
        """
        indices = []
        for image_id in image_ids:
            lookup = self.indices_lookup.get(image_id)
            indices.append(lookup)

        return torch.tensor(indices, dtype=torch.long)
    
    def patch_inputs(self, inputs, poison_indices):
        """Patch the inputs
        Args:
            inputs: Torch.tensor[batch, 3, 224, 224]: Batch of inputs to be patched
            indices: Torch.tensor Image ids from training data.
        """
        alpha_trigger_masks = self.trigger_mask[poison_indices, 3, ...].bool() * self.args.opacity # [N, 224, 224] mask
        alpha_inputs_masks = 1.0 - alpha_trigger_masks
        for depth in range(0, 3):  
            inputs[poison_indices, depth, ...] =  (
                inputs[poison_indices, depth, ...] * alpha_inputs_masks + 
                self.trigger_mask[poison_indices, depth, ...]  * alpha_trigger_masks
            )
    
    def get_face_overlays(self, poison_indices):
        """Given a list of image_ids, retrieve the appropriate indices for facial masks and trigger masks
        Return:
            indices: indices in the trigger masks and facial masks
        """
        return self.dataset_face_overlay[poison_indices]
        

In [6]:
face_detector = FaceDetector(args, dataset=data.poisonset, constrain_perturbation=True)

AttributeError: 'FaceDetector' object has no attribute 'trigger'

In [None]:
start_time = time.time()
if args.skip_clean_training:
    write('Skipping clean training...', args.output)
else:
    model.train(data, max_epoch=args.train_max_epoch)
train_time = time.time()

print("Train time: ", str(datetime.timedelta(seconds=train_time - start_time)))

# Select poisons based on maximum gradient norm
data.select_poisons(model)

# Print data status
data.print_status()

poison_delta = witch.brew(model, data)