# Detector Cleanse

In [1]:
import random
import glob
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch

import ultralytics
from ultralytics import YOLO

from skimage import transform as sktsf

In [2]:
# Constants
N = 100
M = 0.51
DELTA = 0.25
ALPHA = 0.5
IOU_THRESH = 0.5

In [3]:
# Paths
CLEAN_FEATURE_PATH = './clean_feature_images'
CLEAN_MODEL_PATH = './experiments/clean/model.pt'
IMAGE_SAVE_PATH = './test/detector_cleanse'
CLEAN_IMAGES_PATH = 'data/clean_images'

In [4]:
# Load Clean Features
def load_clean_features(n, clean_feature_path):
    clean_feature_files = glob.glob(f'{clean_feature_path}/*.jpg')
    selected_features = random.sample(clean_feature_files, n)
    return [Image.open(feature_path).convert('RGB') for feature_path in selected_features]

clean_features = load_clean_features(N, CLEAN_FEATURE_PATH)
print('Clean Features Loaded.')

Clean Features Loaded.


In [5]:
# Preprocessing
def preprocess(img, min_size=224, max_size=224):
    img = img / 255.0
    img = sktsf.resize(img, (img.shape[0], min_size, min_size), mode='reflect', anti_aliasing=False)
    return img

# Perturb Image
def perturb_image(image, bbox, feature, alpha=ALPHA):
    ymin, xmin, ymax, xmax = map(int, bbox)
    feature_resized = sktsf.resize(feature, (3, ymax-ymin, xmax-xmin))
    perturbed_image = image.copy()
    perturbed_region = perturbed_image[:, ymin:ymax, xmin:xmax]
    blended_region = alpha * feature_resized + (1 - alpha) * perturbed_region
    perturbed_image[:, ymin:ymax, xmin:xmax] = blended_region
    return perturbed_image

# Save Image
def save_numpy_array_as_jpg(array, file_name):
    array = array.transpose((1, 2, 0))
    array = np.clip(array * 255.0, 0, 255).astype(np.uint8)
    image = Image.fromarray(array)
    image.save(file_name + '.jpg')

In [6]:
# Compute IOU
def compute_iou(bbox1, bbox2):
    def _compute_iou(xmin1, ymin1, xmax1, ymax1, xmin2, ymin2, xmax2, ymax2):
        xi1, yi1 = max(xmin1, xmin2), max(ymin1, ymin2)
        xi2, yi2 = min(xmax1, xmax2), min(ymax1, ymax2)
        inter_area = max(xi2 - xi1, 0) * max(yi2 - yi1, 0)
        bbox1_area = (xmax1 - xmin1) * (ymax1 - ymin1)
        bbox2_area = (xmax2 - xmin2) * (ymax2 - ymin2)
        union_area = bbox1_area + bbox2_area - inter_area
        return inter_area / union_area if union_area > 0 else 0.0

    cx1, cy1, w1, h1 = bbox1
    cx2, cy2, w2, h2 = bbox2
    xmin1, ymin1, xmax1, ymax1 = cx1 - w1 / 2, cy1 - h1 / 2, cx1 + w1 / 2, cy1 + h1 / 2
    xmin2, ymin2, xmax2, ymax2 = cx2 - w2 / 2, cy2 - h2 / 2, cx2 + w2 / 2, cy2 + h2 / 2
    return _compute_iou(xmin1, ymin1, xmax1, ymax1, xmin2, ymin2, xmax2, ymax2)

In [7]:
def calculate_entropy(scores):
    return -torch.sum(scores * torch.log2(scores), dim=0).mean()

In [17]:
"""
Patch Ultralytics
"""
import ultralytics.engine.results
import ultralytics.utils.ops

def init(self, boxes, orig_shape) -> None:
    """
    Initialize the Boxes class with detection box data and the original image shape.

    Args:
        boxes (torch.Tensor | np.ndarray): A tensor or numpy array with detection boxes.
            Shape can be (num_boxes, 6), (num_boxes, 7), or (num_boxes, 6 + num_classes).
            Columns should contain [x1, y1, x2, y2, confidence, class, (optional) track_id, (optional) class_conf_1, class_conf_2, ...].
        orig_shape (tuple): The original image shape as (height, width). Used for normalization.

    Returns:
        (None)
    """

    if boxes.ndim == 1:
        boxes = boxes[None, :]
    n = boxes.shape[-1]
    super(ultralytics.engine.results.Boxes, self).__init__(boxes, orig_shape)
    self.orig_shape = orig_shape
    self.is_track = False
    self.num_classes = 0

    if n == 6:
        self.format = 'xyxy_conf_cls'
    elif n == 7:
        self.format = 'xyxy_conf_cls_track'
        self.is_track = True
    else:
        self.format = 'xyxy_conf_cls_classconf'
        self.num_classes = n - 6

ultralytics.engine.results.Boxes.__init__ = init

from ultralytics.utils.ops import xywh2xyxy, LOGGER, nms_rotated
import torch
import time

def non_max_suppression(
    prediction,
    conf_thres=0.25,
    iou_thres=0.45,
    classes=None,
    agnostic=False,
    multi_label=False,
    labels=(),
    max_det=300,
    nc=0,  # number of classes (optional)
    max_time_img=0.05,
    max_nms=30000,
    max_wh=7680,
    in_place=True,
    rotated=False,
):
    """
    Perform non-maximum suppression (NMS) on a set of boxes, with support for masks and multiple labels per box.
    This version returns confidences for all classes.

    Args:
        (... same as before ...)

    Returns:
        (List[torch.Tensor]): A list of length batch_size, where each element is a tensor of
            shape (num_boxes, 6 + num_classes + num_masks) containing the kept boxes, with columns
            (x1, y1, x2, y2, confidence, class, class_conf_1, class_conf_2, ..., mask1, mask2, ...).
    """
    import torchvision

    # Checks and initialization (same as before)
    assert 0 <= conf_thres <= 1, f"Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0"
    assert 0 <= iou_thres <= 1, f"Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0"
    if isinstance(prediction, (list, tuple)):
        prediction = prediction[0]
    if classes is not None:
        classes = torch.tensor(classes, device=prediction.device)

    bs = prediction.shape[0]  # batch size
    nc = nc or (prediction.shape[1] - 4)  # number of classes
    nm = prediction.shape[1] - nc - 4  # number of masks
    mi = 4 + nc  # mask start index
    xc = prediction[:, 4:mi].amax(1) > conf_thres  # candidates

    # Settings
    time_limit = 2.0 + max_time_img * bs  # seconds to quit after
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)

    prediction = prediction.transpose(-1, -2)  # shape(1,84,6300) to shape(1,6300,84)
    if not rotated:
        if in_place:
            prediction[..., :4] = xywh2xyxy(prediction[..., :4])  # xywh to xyxy
        else:
            prediction = torch.cat((xywh2xyxy(prediction[..., :4]), prediction[..., 4:]), dim=-1)  # xywh to xyxy

    t = time.time()
    output = [torch.zeros((0, 6 + nc + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        x = x[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]) and not rotated:
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 4), device=x.device)
            v[:, :4] = xywh2xyxy(lb[:, 1:5])  # box
            v[range(len(lb)), lb[:, 0].long() + 4] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Detections matrix nx(4 + nc + nm) (xyxy, class_conf, cls, masks)
        box, cls_conf, mask = x.split((4, nc, nm), 1)

        # Confidence thresholding
        conf, j = cls_conf.max(1, keepdim=True)
        x = torch.cat((box, conf, j.float(), cls_conf, mask), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == classes).any(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        if n > max_nms:  # excess boxes
            x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence and remove excess boxes

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        scores = x[:, 4]  # scores
        if rotated:
            boxes = torch.cat((x[:, :2] + c, x[:, 2:4], x[:, -1:]), dim=-1)  # xywhr
            i = nms_rotated(boxes, scores, iou_thres)
        else:
            boxes = x[:, :4] + c  # boxes (offset by class)
            i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        i = i[:max_det]  # limit detections

        output[xi] = x[i]
        if (time.time() - t) > time_limit:
            LOGGER.warning(f"WARNING ⚠️ NMS time limit {time_limit:.3f}s exceeded")
            break  # time limit exceeded

    return output

ultralytics.utils.ops.non_max_suppression = non_max_suppression

In [18]:
img = Image.open('experiments/clean/dataset/images/train/2008_000008.jpg').convert('RGB')
img = np.array(img)
img = preprocess(img)


model = YOLO('experiments/clean/model.pt')
results = model('experiments/clean/dataset/images/train/2008_000008.jpg')



image 1/1 /home/sn3006/Documents/backdoor-toolbox/experiments/clean/dataset/images/train/2008_000008.jpg: 576x640 1 person, 1 person, 1 person, 4.4ms
Speed: 1.2ms preprocess, 4.4ms inference, 0.9ms postprocess per image at shape (1, 3, 576, 640)


In [23]:
# Detector Cleanse
def detector_cleanse(img, model, clean_features, m=M, delta=DELTA, alpha=ALPHA, iou_threshold=IOU_THRESH):

    model = model.cuda()

    # make image tensor and 224, 224
    img = np.array(img)
    img = preprocess(img)
    
    results = model(img)
    results = results[0]

    # extract class labels for all boxes
    _bboxes = results.boxes.xywh
    _labels = results.boxes.data[:, 5]
    _scores = results.boxes.data[:, 4]
    _probs = results.boxes.data[:, 6:]

    poisoned_flag = False
    coordinates = []

    for bbox in _bboxes:
        H_sum = 0.0
        num_tested = 0
        for feature in clean_features:

            perturbed_img = perturb_image(img, bbox, feature, alpha)
            perturbed_results = model(perturbed_img)

            perturbed_results = perturbed_results[0]
            perturbed_bboxes = perturbed_results.boxes.xywh

            if len(perturbed_bboxes) == 0:
                continue

            save_numpy_array_as_jpg(perturbed_img, "detectorcleanse/"+str(0))

            ious = list()

            for perturbed_bbox in perturbed_bboxes:
                ious.append(compute_iou(bbox, perturbed_bbox))

            max_iou, max_index = max(ious), np.argmax(ious)
            
            if max_iou < iou_threshold:
                continue

            H_sum += calculate_entropy(_probs[0][max_index].clone().detach())
            num_tested += 1
        
        if num_tested == 0:
            continue

        H_avg = H_sum / num_tested
        if H_avg <= m - delta or H_avg >= m + delta:
            poisoned_flag = True
            coordinates.append(bbox)

    return poisoned_flag, coordinates

In [30]:
# Main Execution
def run_detection(clean_model, clean_features, image_path):

    img = Image.open(image_path).convert('RGB')
    ori_img = np.array(img)
    ori_img = ori_img.transpose((2, 0, 1))
    img = preprocess(ori_img)

    return detector_cleanse(img, clean_model, clean_features, M, DELTA, ALPHA, IOU_THRESH)

In [31]:
clean_model = YOLO(CLEAN_MODEL_PATH)
poisoned, _ = run_detection(clean_model, clean_features, './experiments/clean/dataset/images/train/2008_000008.jpg')

if poisoned:
    print()
    print("Image is poisoned")
    # print(f"Coordinate: {coordinates}")
else:
    print()
    print("Image is clean")





error: OpenCV(4.10.0) /io/opencv/modules/core/src/copy.cpp:1074: error: (-215:Assertion failed) value[0] == value[1] && value[0] == value[2] && value[0] == value[3] in function 'copyMakeBorder'
