In [None]:
from transforms import RandomAdjustBrightness, RandomAdjustContrast, RandomVerticalFlip, RandomHorizontalFlip, RandomAdjustColor
import datetime
import argparse

import yaml
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.tensorboard import SummaryWriter

from models import *
from build_utils.datasets import *
from build_utils.utils import *
from train_utils import train_eval_utils as train_util
from train_utils import get_coco_api_from_dataset
from matplotlib import pyplot as plt
from PIL import Image
import util
import transforms as T
from coco_eval import CocoEvaluator
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt
from torch.autograd import Variable
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import pickle
opt = pickle.load(open('opt.pkl', 'rb'))
hyp = pickle.load(open('hyp.pkl', 'rb'))


In [None]:
wdir = "weights" + os.sep
best = wdir + "best.pt"
results_file = "results{}.txt".format(datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

cfg = opt.cfg
data = opt.data
epochs = opt.epochs
batch_size = 1
accumulate = max(round(64 / batch_size), 1)
weights = opt.weights
imgsz_train = opt.img_size
imgsz_test = opt.img_size
multi_scale = opt.multi_scale

gs = 32
assert math.fmod(imgsz_test, gs) == 0, "--img-size %g must be a %g-multiple" % (imgsz_test, gs)
grid_min, grid_max = imgsz_test // gs, imgsz_test // gs
if multi_scale:
    imgsz_min = opt.img_size // 1.5
    imgsz_max = opt.img_size // 0.667

    grid_min, grid_max = imgsz_min // gs, imgsz_max // gs
    imgsz_min, imgsz_max = int(grid_min * gs), int(grid_max * gs)
    imgsz_train = imgsz_max

data_dict = parse_data_cfg(data)
train_path = data_dict["train"]
test_path = data_dict["valid"]
nc = 1 if opt.single_cls else int(data_dict["classes"])
hyp["cls"] *= nc / 80
hyp["obj"] *= imgsz_test / 320

for f in glob.glob(results_file):
    os.remove(f)

In [None]:
model = Darknet(cfg).to(device).eval()
model.load_state_dict(torch.load("weights/best.pt", map_location=device)['model'])
model.nc = nc
model.hyp = hyp
model.gr = 1.0
model.requires_grad_(False)
1

In [None]:
val_dataset = LoadImagesAndLabels(test_path, imgsz_test, batch_size,
                                      hyp=hyp,
                                      rect=True,
                                      cache_images=opt.cache_images,
                                      single_cls=opt.single_cls)

In [None]:
val_datasetloader = torch.utils.data.DataLoader(val_dataset,
                                                    batch_size=batch_size,
                                                    num_workers=0,
                                                    pin_memory=True,
                                                    collate_fn=val_dataset.collate_fn)

In [None]:
f = open("data/my_data_label.names", "r")

In [None]:
label_names = f.readlines()
label_map = {}
for i, label in enumerate(label_names):
    label_map[i] = label.strip()

In [None]:
label_map

In [None]:
f.close()

In [None]:
for i, (imgs, targets, paths, _, _) in enumerate(val_datasetloader):
    model.train()
    plt.figure(figsize=(10, 10))
    plt.imshow(imgs[0].permute(1, 2, 0))
    plt.show()
    imgs = imgs.to(device).float() / 255.0
    imgs.requires_grad = True
    targets = targets.to(device)
    pred = model(imgs)
    loss_dict = compute_loss(pred, targets, model)
    loss = sum(loss for loss in loss_dict.values())
    loss.backward()
    
    break

In [None]:
import copy
def ifgsm_attack(image, target, epsilon, model, alpha, num_iter, momentum=0.3):
    model = copy.deepcopy(model)
    model.train()
    rf_hori = RandomHorizontalFlip(0.5)
    rf_vert = RandomVerticalFlip(0.5)
    rf_hori_ = RandomHorizontalFlip(1)
    rf_vert_ = RandomVerticalFlip(1)
    rf_contrast = RandomAdjustContrast(0.4, 0.5)
    rf_brightness = RandomAdjustBrightness(0.4, 0.5)
    rf_color = RandomAdjustColor(0.3, 0.3)
    image.requires_grad = True
    last_grad = None
    for i in range(num_iter):
        image_hori, _, hori_trans = rf_hori(image, target)
        image_hori = image_hori.detach()
        image_hori.requires_grad = True
        image_vert, _, vert_trans = rf_vert(image, target)
        image_vert = image_vert.detach()
        image_vert.requires_grad = True
        image_contrast, _, contrast_trans = rf_contrast(image, target)
        image_contrast = image_contrast.detach()
        image_contrast.requires_grad = True
        image_brightness, _, brightness_trans = rf_brightness(image, target)
        image_brightness = image_brightness.detach()
        image_brightness.requires_grad = True
        image_color, _, color_trans = rf_color(image, target)
        image_color = image_color.detach()
        image_color.requires_grad = True
        pred = model(image)
        loss_dict = compute_loss(pred, target, model)
        pred_hori = model(image_hori) if hori_trans else None
        loss_hori_dict = compute_loss(pred_hori,target,model) if hori_trans else {"0": 0}
        pred_vert = model(image_vert) if vert_trans else None
        loss_vert_dict = compute_loss(pred_vert,target,model) if vert_trans else {"0": 0}
        pred_contrast = model(image_contrast) if contrast_trans else None
        loss_contrast_dict = compute_loss(pred_contrast,target,model) if contrast_trans else {"0": 0}
        pred_brightness = model(image_brightness) if brightness_trans else None
        loss_brightness_dict = compute_loss(pred_brightness,target,model) if brightness_trans else {"0": 0}
        pred_color = model(image_color) if color_trans else None
        loss_color_dict = compute_loss(pred_color,target,model) if color_trans else {"0": 0}
        loss = sum(loss for loss in loss_dict.values())
        loss_hori = sum(loss for loss in loss_hori_dict.values())
        loss_vert = sum(loss for loss in loss_vert_dict.values())
        loss_contrast = sum(loss for loss in loss_contrast_dict.values())
        loss_brightness = sum(loss for loss in loss_brightness_dict.values())
        loss_color = sum(loss for loss in loss_color_dict.values())
        loss = loss + loss_hori + loss_vert + \
            loss_contrast + loss_brightness + loss_color
        loss = loss / len([l for l in [loss, loss_hori, loss_vert,
                          loss_contrast, loss_brightness, loss_color] if l])
        loss.backward()
        image_grad = image.grad.data
        image_hori_grad = image_hori.grad.data if hori_trans else None
        image_vert_grad = image_vert.grad.data if vert_trans else None
        image_contrast_grad = image_contrast.grad.data if contrast_trans else None
        image_brightness_grad = image_brightness.grad.data if brightness_trans else None
        image_color_grad = image_color.grad.data if color_trans else None
        if hori_trans:
            image_hori_grad, _, hori_trans = rf_hori_(image_hori_grad, target)
        if vert_trans:
            image_vert_grad, _, vert_trans = rf_vert_(image_vert_grad, target)
        grad_list = [image_grad]
        if hori_trans:
            grad_list.append(image_hori_grad)
        if vert_trans:
            grad_list.append(image_vert_grad)
        if contrast_trans:
            grad_list.append(image_contrast_grad)
        if brightness_trans:
            grad_list.append(image_brightness_grad)
        if color_trans:
            grad_list.append(image_color_grad)
        image_grad = torch.cat(grad_list, dim=0).mean(dim=0)
        if last_grad is not None:
            image_grad = image_grad + momentum * last_grad
            last_grad = image_grad
        else:
            last_grad = image_grad
        perturbed_image = image + alpha * image_grad.sign()
        eta = torch.clamp(perturbed_image - image, min=-epsilon, max=epsilon)
        image = torch.clamp(image + eta, min=0, max=1).detach_()
        image.requires_grad = True
    model.eval()
    return image


In [None]:
def pgd_attack(image, target, epsilon, model, alpha, num_iter, norm="linf"):
    model = copy.deepcopy(model)
    model.train()
    if norm == "inf":
        random_noise = torch.FloatTensor(
            *image.shape).uniform_(-epsilon, epsilon).to(device)
    else:
        random_noise = random_noise = (
            torch.randn(*image.shape) * epsilon).to(device)
    image = image + random_noise
    image.requires_grad = True

    for i in range(num_iter):
        pred = model(image)
        loss_dict = compute_loss(pred, target, model)
        loss = sum(loss for loss in loss_dict.values())
        loss.backward()
        image_grad = image.grad.data
        if norm == "inf":
            perturbed_image = image + alpha * image_grad.sign()
            eta = torch.clamp(perturbed_image - image,
                              min=-epsilon, max=epsilon)
            image = torch.clamp(image + eta, min=0, max=1).detach_()
        else:
            perturbed_image = image + alpha * image_grad
            eta = torch.clamp(perturbed_image - image,
                              min=-epsilon, max=epsilon)
            eta = eta / eta.view(eta.shape[0], -
                                 1).norm(dim=1).view(-1, 1, 1, 1)
            image = torch.clamp(image + eta, min=0, max=1).detach_()
        image.requires_grad = True
    model.eval()
    return image

In [None]:
def _get_iou_types(model):
    model_without_ddp = model
    if isinstance(model, torch.nn.parallel.DistributedDataParallel):
        model_without_ddp = model.module
    iou_types = ["bbox"]
    if isinstance(model_without_ddp, torchvision.models.detection.MaskRCNN):
        iou_types.append("segm")
    if isinstance(model_without_ddp, torchvision.models.detection.KeypointRCNN):
        iou_types.append("keypoints")
    return iou_types


In [None]:
from tqdm.notebook import tqdm
import gc


def test_2000_samples(model, data_loader_test, device, num_iter=1, epsilon=0.1, alpha=0.1, momentum=0.3):
    model.eval()
    cpu_device = torch.device("cpu")
    iou_types = _get_iou_types(model)
    coco = get_coco_api_from_dataset(data_loader_test.dataset)
    coco_evaluator = CocoEvaluator(coco, iou_types)
    for idx, (imgs, targets, paths,shapes, img_index) in tqdm(enumerate(data_loader_test), total=2000):
        imgs = imgs.to(device) / 255.0
        targets = targets.to(device)
        # imgs = ifgsm_attack(imgs, targets, epsilon, model, alpha, num_iter, momentum=momentum)
        imgs = pgd_attack(imgs, targets, epsilon, model, alpha, num_iter)
        pred = model(imgs)[0]
        # 将attack后的图片保存下来
        # for i in range(len(imgs)):
        #     img = imgs[i].cpu().detach().numpy().transpose(1, 2, 0)
        #     img = (img*255).astype(np.uint8)
        #     img = Image.fromarray(img)
        #     file_name = os.path.basename(paths[0])
        #     img.save(f"./data/coco_adv/val2017/{file_name}")
        pred = non_max_suppression(pred, conf_thres=0.01, iou_thres=0.6, multi_label=True)

        outputs = []
        for index, p in enumerate(pred):
            if p is None:
                p = torch.empty((0, 6), device=cpu_device)
                boxes = torch.empty((0, 4), device=cpu_device)
            else:
                boxes = p[:, :4]
                boxes = scale_coords(imgs[index].shape[1:], boxes, shapes[index][0]).round()

            info = {"boxes": boxes.to(cpu_device),
                    "labels": p[:, 5].to(device=cpu_device, dtype=torch.int64),
                    "scores": p[:, 4].to(cpu_device)}
            outputs.append(info)

        res = {img_id: output for img_id, output in zip(img_index, outputs)}
        coco_evaluator.update(res)
        if idx == 2000:
            break
        del imgs, targets, outputs, res
        gc.collect()
        torch.cuda.empty_cache()
    coco_evaluator.synchronize_between_processes()
    coco_evaluator.accumulate()
    coco_evaluator.summarize()


In [None]:
test_2000_samples(model, val_datasetloader, device, num_iter=5,epsilon=0.1,alpha=0.005)


In [None]:
from matplotlib import patches
import cv2
model.eval()

cpu_device = torch.device("cpu")
for idx, (imgs, targets, paths,shapes, img_index) in tqdm(enumerate(val_datasetloader), total=10):
    imgs = imgs.to(device) / 255.0
    targets = targets.to(device)
    imgs_adv = ifgsm_attack(imgs, targets, 0.1, model, 0.005, 5, momentum=0.3)
    pred = model(imgs)[0]
    pred_adv = model(imgs_adv)[0]
    pred = non_max_suppression(pred, conf_thres=0.01, iou_thres=0.6, multi_label=True)
    pred_adv = non_max_suppression(pred_adv, conf_thres=0.01, iou_thres=0.6, multi_label=True)
    outputs = []
    for index, p in enumerate(pred):
        if p is None:
            p = torch.empty((0, 6), device=cpu_device)
            boxes = torch.empty((0, 4), device=cpu_device)
        else:
            boxes = p[:, :4]
            boxes = scale_coords(imgs[index].shape[1:], boxes, shapes[index][0]).round()

        info = {"boxes": boxes.to(cpu_device),
                "labels": p[:, 5].to(device=cpu_device, dtype=torch.int64),
                "scores": p[:, 4].to(cpu_device)}
        outputs.append(info)
    
    outputs_adv = []
    for index, p in enumerate(pred_adv):
        if p is None:
            p = torch.empty((0, 6), device=cpu_device)
            boxes = torch.empty((0, 4), device=cpu_device)
        else:
            boxes = p[:, :4]
            boxes = scale_coords(imgs_adv[index].shape[1:], boxes, shapes[index][0]).round()

        info = {"boxes": boxes.to(cpu_device),
                "labels": p[:, 5].to(device=cpu_device, dtype=torch.int64),
                "scores": p[:, 4].to(cpu_device)}
        outputs_adv.append(info)

    fig, axes = plt.subplots(1, 2, figsize=(16, 8))
    imgs = cv2.resize(imgs[0].cpu().detach().permute(1, 2, 0).numpy(), shapes[index][0][::-1])
    imgs_adv = cv2.resize(imgs_adv[0].cpu().detach().permute(1, 2, 0).numpy(), shapes[index][0][::-1])
    axes[0].imshow(imgs)
    for item in outputs:
        boxes = item['boxes']
        labels = item['labels']
        scores = item['scores']
        keep_thresh = 0.5
        keep = scores > keep_thresh
        boxes = boxes[keep].cpu().detach().numpy()
        labels = labels[keep].cpu().detach().numpy()
        for box, label in zip(boxes, labels):
            rect = patches.Rectangle(
                (box[0], box[1]), box[2]-box[0], box[3]-box[1], linewidth=1, edgecolor='r', facecolor='none')
            axes[0].text(box[0], box[1], label_map[label],
                         fontsize=12, color='r')
            axes[0].add_patch(rect)
    axes[1].imshow(imgs_adv)
    for item in outputs_adv:
        boxes = item['boxes']
        labels = item['labels']
        scores = item['scores']
        keep_thresh = 0.5
        keep = scores > keep_thresh
        boxes = boxes[keep].cpu().detach().numpy()
        labels = labels[keep].cpu().detach().numpy()
        for box, label in zip(boxes, labels):
            rect = patches.Rectangle(
                (box[0], box[1]), box[2]-box[0], box[3]-box[1], linewidth=1, edgecolor='r', facecolor='none')
            axes[1].text(box[0], box[1], label_map[label],
                         fontsize=12, color='r')
            axes[1].add_patch(rect)
    if idx == 10:
        break
    # break
