In [None]:
# https://github.com/chaofengc/IQA-PyTorch/tree/main
!pip install pyiqa

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import os
import cv2
import numpy as np
from PIL import Image
import pyiqa
import pandas as pd
import matplotlib.pyplot as plt
import torch.nn.functional as F

In [None]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

#create metric with default settin
iqa_metric = pyiqa.create_metric('paq2piq', device=device)
# gradient propagation is disabled by default. set as_loss=True to enable it as a loss function.
iqa_loss = pyiqa.create_metric('paq2piq', device=device, as_loss=True)

nps_triplets_path = 'utils/npstriplets.txt'

In [None]:
# nps and tv implementation
my_file = open(nps_triplets_path, 'r')
data = my_file.readlines()
printability_list = [list(map(float, s.split(','))) for s in data]

def get_printability_array(printability_list, side):

    printability_array = []
    for printability_triplet in printability_list:
        printability_imgs = []
        red, green, blue = printability_triplet

        printability_imgs.append(np.full((side[0], side[1]), blue))
        printability_imgs.append(np.full((side[0], side[1]), green))
        printability_imgs.append(np.full((side[0], side[1]), red))

        printability_array.append(printability_imgs)

    printability_array = np.asarray(printability_array)
    printability_array = np.float32(printability_array)
    pa = torch.from_numpy(printability_array)
    return pa

def get_nps(printability_array, adv_patch):

    color_dist = adv_patch - printability_array + 0.000001
    color_dist = color_dist**2
    color_dist = torch.sum(color_dist, 1) + 0.000001
    color_dist = torch.sqrt(color_dist)
    color_dist_prod = torch.min(color_dist, 0)[0]
    nps_score = torch.sum(color_dist_prod, 0)
    nps_score = torch.sum(nps_score, 0)
    return nps_score / torch.numel(adv_patch)

def get_tv(adv_patch):
    tvcomp1 = adv_patch[:, :, 1:] - adv_patch[:, :, :-1]
    tvcomp1 = (tvcomp1 * tvcomp1).sum()
    tvcomp2 = adv_patch[:, 1:, :] - adv_patch[:, :-1, :]
    tvcomp2 = (tvcomp2 * tvcomp2).sum()
    tv = tvcomp1 + tvcomp2
    return tv / torch.numel(adv_patch)

In [None]:
# Initialize the patch
def patch_initialization(patch_size, bw_flag):
    if bw_flag:
      bw = np.random.rand(patch_size, patch_size)
      patch = np.stack((bw, bw, bw))
    else:
      patch = np.random.rand(3, patch_size, patch_size)
    np.clip(patch, 0.0, 1.0)
    return patch

def mask_generation(patch, image_size):
    applied_patch = np.zeros(image_size)
    (_, width, height) = patch.shape

    # patch rotation
    rotation_angle = np.random.choice(4)
    for i in range(patch.shape[0]):
        patch[i] = np.rot90(patch[i], rotation_angle)

    # patch location
    x_location, y_location = np.random.randint(low=0, high=image_size[1]-patch.shape[1]), np.random.randint(low=0, high=image_size[2]-patch.shape[2])

    for i in range(3):
        applied_patch[i, x_location:x_location + patch.shape[1], y_location:y_location + patch.shape[2]] = patch[i,:,:]

    mask = applied_patch.copy()
    mask[mask != 0] = 1.0
    return patch, applied_patch, mask, x_location, y_location

In [None]:
def center_crop_256(image):
  center = image.shape[0]/2, image.shape[1]/2
  if center[1] < 128 or center[0] < 128:
    return cv2.resize(image, (256, 256))
  x = center[1] - 128
  y = center[0] - 128

  return image[int(y):int(y+256), int(x):int(x+256)]

class MyCustomDataset(Dataset):
    def __init__(self,
                 path_gt,
                 mode
                ):

        self._items = []
        self._index = 0
        self.mode = mode
        if mode == 'train':
          dir_img = sorted(os.listdir(path_gt))
        else:
          dir_img = sorted(os.listdir(path_gt))
        img_pathes = dir_img

        for img_path in img_pathes:
          self._items.append((
            os.path.join(path_gt, img_path)
          ))
        random.shuffle(self._items)

    def __len__(self):
      return len(self._items)

    def next_data(self):
      gt_path = self._items[self._index]
      self._index += 1
      if self._index == len(self._items):
        self._index = 0
        random.shuffle(self._items)

      image = Image.open(gt_path).convert('RGB')
      image = np.array(image).astype(np.float32)
      image = center_crop_256(image)

      image = image / 255.
      image = transforms.ToTensor()(image)
      y = image.to(device)
      return y

    def __getitem__(self, index):
      gt_path = self._items[index]
      image = Image.open(gt_path).convert('RGB')
      image = np.array(image).astype(np.float32)

      image = center_crop_256(image)

      image = Image.fromarray(image.astype('uint8'), 'RGB')
      image = transforms.RandomHorizontalFlip()(image)
      image = transforms.RandomVerticalFlip()(image)
      image = transforms.ToTensor()(image)
      y = image.to(device)
      return y

In [None]:
def random_brightness(patch, start_br):
    br = np.mean(patch)

    if br >= start_br * 1.2 :
      brightness_factor = np.random.uniform(0.9, 1.0)
    elif br <= start_br * 0.8 :
      brightness_factor = np.random.uniform(1.0, 1.1)
    else:
      brightness_factor = np.random.uniform(0.9, 1.1)

    modified_patch = np.clip(patch * brightness_factor, 0, 1)

    return modified_patch

In [None]:
def patch_attack(patch_resize, last_resize, flag_br, bw_flag, patch_size, start_br, x_location, y_location, applied_patch, mask, model, lr, max_iteration):

    patch = np.zeros((3, patch_size, patch_size))
    patch = applied_patch[:, x_location:x_location + patch_size, y_location:y_location + patch_size]
    if flag_br:
      patch = random_brightness(patch, start_br)
    for i in range(3):
      applied_patch[i, x_location:x_location + patch_size, y_location:y_location + patch_size] = patch[i,:,:]
    applied_patch1 = torch.from_numpy(applied_patch)
    mask = torch.from_numpy(mask)

    if patch_resize:
      patch = torch.from_numpy(patch).unsqueeze(0)
      patch, patch_size = resize_patch(patch, last_resize)
      patch = patch.squeeze(0)
      applied_patch1, mask, x_location, y_location = mask_applied_patch_after_resize(patch)

    count = 0
    perturbated_image = torch.mul(mask.type(torch.FloatTensor), applied_patch1.type(torch.FloatTensor)) + torch.mul((1 - mask.type(torch.FloatTensor)), image.type(torch.FloatTensor))

    while count < max_iteration:
        perturbated_image = Variable(perturbated_image.data, requires_grad=True)
        per_image = perturbated_image
        per_image = per_image.to(device)
        score = model(per_image)
        loss = (1 - score/100)

        loss.backward()
        patch_grad = perturbated_image.grad.clone().cpu()
        if bw_flag:
          patch_grad_mean = torch.mean(patch_grad[0], axis = 0)
          perturbated_image.grad.data.zero_()
          applied_patch1 = applied_patch1.cpu().numpy()
          for i in range(3):
            applied_patch1[i] = - lr * torch.sign(patch_grad_mean) + applied_patch1[i]
          applied_patch1 = np.clip(applied_patch1, 0, 1)
        else:
          perturbated_image.grad.data.zero_()
          applied_patch1 = - lr * torch.sign(patch_grad) + applied_patch1.type(torch.FloatTensor)
          applied_patch1 = torch.clamp(applied_patch1, min=0, max=1)
          applied_patch1 = applied_patch1.cpu().numpy().squeeze()

        patch = applied_patch1[:, x_location:x_location + patch_size, y_location:y_location + patch_size]
        applied_patch1 = torch.from_numpy(applied_patch1)

        perturbated_image = torch.mul(mask.type(torch.FloatTensor), applied_patch1.type(torch.FloatTensor)) + torch.mul((1-mask.type(torch.FloatTensor)), image.type(torch.FloatTensor))
        perturbated_image = torch.clamp(perturbated_image, min=0, max=1)
        perturbated_image = perturbated_image.to(device)
        count += 1

    patch = torch.tensor(patch, requires_grad=True, device=device, dtype=torch.float32)
    patch = patch.to(device)
    tv = get_tv(patch)
    loss = tv
    loss.backward()
    patch_grad = patch.grad.clone().cpu()
    if bw_flag:
      patch_grad_mean = torch.mean(patch_grad[0], axis = 0)
      patch.grad.data.zero_()
      patch = patch.cpu().detach().numpy()
      for i in range(3):
        patch[i] = - lr * torch.sign(patch_grad_mean) + patch[i]
      patch = np.clip(patch, 0, 1)
    else:
      patch.grad.data.zero_()
      patch = - lr * torch.sign(patch_grad) + patch.type(torch.FloatTensor)
      patch = torch.clamp(patch, min=0, max=1)
      patch = patch.cpu().detach().numpy()
    applied_patch1 = applied_patch1.cpu().numpy()
    for i in range(3):
      applied_patch1[i, x_location:x_location + patch_size, y_location:y_location + patch_size] = patch[i,:,:]

    if bw_flag is False:
      patch = torch.tensor(patch, requires_grad=True, device=device, dtype=torch.float32)
      patch = patch.to(device)
      pa = get_printability_array(printability_list, (patch_size, patch_size))
      pa = pa.to(device)
      nps = get_nps(pa, patch)
      loss = nps
      loss.backward()
      patch_grad = patch.grad.clone().cpu()
      patch.grad.data.zero_()
      patch = - lr * torch.sign(patch_grad) + patch.type(torch.FloatTensor)
      patch = torch.clamp(patch, min=0, max=1)
      patch = patch.to(device)
      patch = patch.cpu().detach().numpy()
      for i in range(3):
        applied_patch1[i, x_location:x_location + patch_size, y_location:y_location + patch_size] = patch[i,:,:]
    perturbated_image = perturbated_image.cpu().numpy()
    return perturbated_image, applied_patch1, patch_size, x_location, y_location

In [None]:
# Load the datasets
def dataloader():

    path_train = '/content/gdrive/MyDrive/trainn/'
    path_test = '/content/gdrive/MyDrive/testt/'

    train_dataset = MyCustomDataset(path_gt=path_train, mode="train")
    test_dataset =  MyCustomDataset(path_gt=path_test, mode="test")
    train_loader = DataLoader(train_dataset, batch_size=1, shuffle=False)
    test_loader =  DataLoader(test_dataset, batch_size=1, shuffle=False)

    return train_loader, test_loader


In [None]:
def resize_patch(patch, flag):
    original_size = patch.shape[2]
    if flag:
      new_size = 100
    else:
      new_size = np.random.randint(80, 161)
    resize_factor = new_size / original_size
    resized_patch = F.interpolate(patch, scale_factor=resize_factor, mode='bicubic', align_corners=False)
    return resized_patch, new_size

def mask_applied_patch_after_resize(patch):
  applied_patch = np.zeros((3, 256, 256))
  patch1 = patch.detach().cpu().numpy()
  x_location, y_location = np.random.randint(low=0, high=256-patch1.shape[1]), np.random.randint(low=0, high=256-patch1.shape[2])
  applied_patch[:, x_location:x_location + patch1.shape[1], y_location:y_location + patch1.shape[2]] = patch1[:,:,:]
  mask = applied_patch.copy()
  mask[mask != 0] = 1.0
  applied_patch = torch.from_numpy(applied_patch)
  mask = torch.from_numpy(mask)
  return applied_patch, mask, x_location, y_location

In [None]:
df1 = pd.DataFrame({'epoch':[],
                   'gain':[],
                    })
df2 = pd.DataFrame({'epoch':[],
                   'gain':[]
                    })

n_epochs = 10
lr = 0.005
max_iteration = 10
bw_flag = True                  # if you want to generate black-white patch
patch_size = 100                # the size of generated patch will be 3 х patch_size х patch_size
flag_resize = True              # if you want to add random resize of the patch
flag_br = False                 # if you want to add random brightness application to the patch
last_resize = False
saving_path = '...'

transform = transforms.ColorJitter(brightness=(0.8,1.2))

model = iqa_loss

train_loader, test_loader = dataloader()

# Initialize the patch
patch = patch_initialization(patch_size, bw_flag)
start_br = np.mean(patch)

# Generate the patch
for epoch in range(n_epochs):
    # Train
    print(epoch)
    last_resize = False
    num_im = 0
    for image in train_loader:
        assert image.shape[0] == 1, 'Only one picture should be loaded each time.'
        image = image.to(device)
        patch, applied_patch, mask, x_location, y_location = mask_generation(patch, image_size=(3, 256, 256))
        if epoch == n_epochs-1 and num_im == len(train_loader.dataset)-1:
          last_resize = True
        perturbated_image, applied_patch, patch_size, x_location, y_location = patch_attack(flag_resize, last_resize, flag_br, bw_flag, patch_size, start_br, x_location, y_location, applied_patch, mask, model, lr, max_iteration)
        patch = applied_patch[:, x_location:x_location + patch_size, y_location:y_location + patch_size]
        num_im += 1


    # Save train gain
    (_, y_patch, x_patch) = patch.shape
    gain_list = []
    for image in train_loader:
      image = image.squeeze().cpu().numpy()
      x_location, y_location = np.random.randint(low=0, high=image.shape[1]-y_patch), np.random.randint(low=0, high=image.shape[2]-x_patch)
      perturbated_image = image.copy()
      perturbated_image[:, x_location:x_location + x_patch, y_location:y_location + y_patch] = patch

      image = torch.from_numpy(image).to(device)
      image = image.unsqueeze_(0)
      score_before = iqa_metric(image).squeeze().item()

      perturbated_image = torch.from_numpy(perturbated_image).to(device)
      perturbated_image = perturbated_image.unsqueeze_(0)
      score_after = iqa_metric(perturbated_image).squeeze().item()

      gain = score_after - score_before
      gain_list.append(gain)

    print(f'Train gain on {epoch} epoch is {np.array(gain_list).mean()}')
    df1.loc[len(df1.index)] = [epoch, np.array(gain_list).mean()]

    # save the generated patch
    res = patch.transpose(1, 2, 0)
    res = (res * 255).astype('uint8')
    res = cv2.cvtColor(res, cv2.COLOR_RGB2BGR)
    # cv2.imwrite(saving_path, res)

    # Validation
    with torch.no_grad():
        for image in test_loader:
          assert image.shape[0] == 1, 'Only one picture should be loaded each time.'
          image = image.to(device)
          output = model(image)
          applied_patch = torch.from_numpy(applied_patch)
          mask = torch.from_numpy(mask)
          count = 0
          perturbated_image = torch.mul(mask.type(torch.FloatTensor), applied_patch.type(torch.FloatTensor)) + torch.mul((1 - mask.type(torch.FloatTensor)), image.type(torch.FloatTensor))
          while count < max_iteration:
            count += 1
            perturbated_image = Variable(perturbated_image.data, requires_grad=True)
            per_image = perturbated_image
            per_image = per_image.to(device)
            score = model(per_image)
            loss = 1 - score / 100
            perturbated_image = torch.mul(mask.type(torch.FloatTensor), applied_patch.type(torch.FloatTensor)) + torch.mul((1-mask.type(torch.FloatTensor)), image.type(torch.FloatTensor))
            perturbated_image = torch.clamp(perturbated_image, min=0, max=1)
            perturbated_image = perturbated_image.to(device)
          perturbated_image = perturbated_image.cpu().numpy()
          applied_patch = applied_patch.cpu().numpy()
          mask = mask.cpu().numpy()
          perturbated_image = torch.from_numpy(perturbated_image).to(device)

    # Save valid gain
    (_, y_patch, x_patch) = patch.shape
    gain_list = []
    for image in test_loader:
      image = image.squeeze().cpu().numpy()
      x_location, y_location = np.random.randint(low=0, high=image.shape[1]-y_patch), np.random.randint(low=0, high=image.shape[2]-x_patch)
      perturbated_image = image.copy()
      perturbated_image[:, x_location:x_location + x_patch, y_location:y_location + y_patch] = patch

      image = torch.from_numpy(image).to(device)
      image = image.unsqueeze_(0)
      score_before = iqa_metric(image).squeeze().item()

      perturbated_image = torch.from_numpy(perturbated_image).to(device)
      perturbated_image = perturbated_image.unsqueeze_(0)
      score_after = iqa_metric(perturbated_image).squeeze().item()

      gain = score_after - score_before
      gain_list.append(gain)

    print(f'Valid gain on {epoch} epoch is  {np.array(gain_list).mean()}')
    df2.loc[len(df2.index)] = [epoch, np.array(gain_list).mean()]

df1["gain"].plot(xlabel = 'epoch', ylabel = 'gain', title = 'train')
fig1 = plt.gcf()
plt.show()
plt.draw()

df2["gain"].plot(xlabel = 'epoch', ylabel = 'gain', title = 'valid')
fig1 = plt.gcf()
plt.show()
plt.draw()
