In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/Attack_detection/

In [None]:
import torch
import numpy as np
import torch.utils.data as data
import torch.nn.functional as F
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

from torch.autograd import Variable
from torchvision import transforms
from matplotlib import colors

%cd /content/drive/MyDrive/Attack_detection/Fast-SCNN-pytorch/Fast-SCNN-pytorch/
from data_loader import CitySegmentation
from models.fast_scnn import get_fast_scnn
from utils.visualize import get_color_pallete

from PIL import Image
import tqdm

%cd /content/drive/MyDrive/Attack_detection/

In [None]:
LABEL_NAMES = np.asarray([
    'Road', 'Sidewalk', 'Building', 'Wall', 'Fence', 'Pole', 'TrafficLight',
    'TrafficSign', 'Vegetation', 'Terrain', 'Sky', 'Person', 'Rider',
    'Car', 'Truck', 'Bus', 'Train', 'Motorcycle', 'Bicycle', 'Unlabeled'
])

LABEL_COLORS = [
    (128, 64, 128), (244, 35, 232), (70, 70, 70), (102, 102, 156),
    (190, 153, 153), (153, 153, 153), (250, 170, 30), (220, 220, 0),
    (107, 142, 35), (152, 251, 152), (70, 130, 180), (220, 20, 60),
    (255, 0, 0), (0, 142, 142), (0, 0, 70), (0, 60, 100), (0, 80, 100),
    (0, 0, 230), (119, 11, 32), (0, 0, 142)]

VALID_CLASSES = [7, 8, 11, 12, 13, 17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 31, 32, 33, -1]


In [None]:
def create_target(device,model,img, target_class):
    model.eval() #prepres model for evaluation, PYTORCH ONLY!
    pred_img = model(img)[0]
    y_target = pred_img.argmax(1).type(torch.LongTensor).to(device) #prediction with max over the pixel, AXIS 1
    o_mask = (y_target == target_class).squeeze() #MASK WITH 1 WHERE THE TARGET CLASS
    bg_mask = ~o_mask #MASK WITH 1 WHERE THE TARGET CLASS
    o_none_zero = torch.nonzero(o_mask) #CREATE NONZERO
    bg_none_zero = torch.nonzero(bg_mask)#CREATE NONZERO 
    count = 0
    for i, j  in o_none_zero: #CREATE LONG LIST OF DUPLICATE, VECTOR TO SUB FROM BG NONZERO vectorized style
        i_j_tensor = torch.tensor([[i, j]]).repeat(bg_none_zero.size(0), 1).to(device) #vectorized, repeat by background size.
        idx = ((i_j_tensor - bg_none_zero)**2).sum(1).argmin(0) #idx of neighbor
        i_new, j_new = bg_none_zero[idx][0], bg_none_zero[idx][1]
        y_target[0, i, j] = y_target[0, i_new, j_new]
        if count % 5000 == 0 and count > 0: #optimization, avoid buffer overflows
            print(count)
        count+=1
        del i_j_tensor
    return y_target #picture without the target class

In [None]:
def image_dependent_attack_pgd(device,model,img, eps, alpha, y_target, target_class, max_iter, plot):
    model.eval() #prepres model for evaluation, PYTORCH ONLY!
    pertubation = torch.zeros_like(img) 
    y_target=y_target.to(device)
    for i in range(max_iter):
        pertubation = Variable(pertubation, requires_grad=True)
        selected_model_outputs = model(img+pertubation)[0]
        outputs = F.softmax(selected_model_outputs, dim=1) #max over each pixel, dim = 1
        loss = F.cross_entropy(outputs, y_target) #ce is a measure between two histograms, not 2 scalars. (be carefull when everything besides 1 scalars is nonzero). in tf requires one hot.
        print(f'Iter {i} loss is: {loss.item()}')
        loss.backward()
        gradient =  pertubation.grad.data.sign()
        pertubation = torch.clamp(pertubation - alpha*gradient, -eps, eps) #pgd style clapping.

        predicted_image = model(img+pertubation)[0].argmax(1).squeeze(0)#in the depth (dim 3 ) the size is the number of classes of segementations. two dim in the end by argmax.
        predicted_image = predicted_image.cpu().detach()
        if plot and i%10 == 0:
            plt.imshow(predicted_image)
            plt.title('Iteration number {}'.format(i))
            plt.show()

        pixels_left = (predicted_image==target_class).sum().numpy() #how much pixels is removed from the output. the sum 0 is the best. sort of accuracy.
        print(f'Class pixel left {pixels_left}')
    return (img+pertubation).detach()


def image_dependent_attack_weighted_avg(device,model,img, eps, alpha, w, y_target, target_class, max_iter, plot):
    model.eval()#prepres model for evaluation, PYTORCH ONLY!
    pertubation = torch.zeros_like(img)
    y_target=y_target.to(device)
    for i in range(max_iter):
        pertubation = Variable(pertubation, requires_grad=True)
        selected_model_outputs = model(img+pertubation)[0]
        outputs = F.softmax(selected_model_outputs, dim=1) #max over each pixel, dim = 1

        preds = outputs.argmax(1).to(device)
        target_mask = (preds == target_class).to(device)
        bg_mask = (preds != target_class).to(device)
        mask = (((preds == y_target) & (outputs.max(1)[0].to(device) < 0.85).to(device)) & target_mask) | ((preds != y_target) & target_mask) #0.85 is the confidence score.    
        o_pixels = torch.masked_select(outputs, mask).view(-1,  19)
        o_preds = torch.masked_select(y_target, mask).view(-1)
        o_loss = w*F.cross_entropy(o_pixels, o_preds, reduction='sum')
        
        mask = (((preds == y_target) & (outputs.max(1)[0].to(device) < 0.85).to(device)) & bg_mask) | ((preds != y_target) & bg_mask)  #0.85 is the confidence score.    
        bg_pixels = torch.masked_select(outputs, mask).view(-1, 19)
        bg_preds = torch.masked_select(y_target, mask).view(-1)
        bg_loss = (1-w)*F.cross_entropy(bg_pixels, bg_preds, reduction='sum')
        
        loss = 1/(bg_preds.size(0)+o_preds.size(0))*(bg_loss + o_loss)
        loss.backward()

        gradient =  torch.ge(pertubation.grad.data, 0)
        gradient = (gradient.float() - 0.5) * 2

        # Normalizing the gradient to the same space of image
        gradient[0][0] = (gradient[0][0])/(transform_norm_x_std)
        gradient[0][1] = (gradient[0][1])/(transform_norm_y_std)
        gradient[0][2] = (gradient[0][2])/(transform_norm_z_std)
        
        pertubation = torch.clamp(pertubation - alpha*gradient, -eps, eps) #pgd style clapping.
        predicted_image = model(img+pertubation)[0].argmax(1).squeeze(0)#in the depth (dim 3 ) the size is the number of classes of segementations. two dim in the end by argmax.
        predicted_image = predicted_image.cpu().detach()

        if plot and i%5 == 0:
            plt.imshow(model(img+pertubation)[0].argmax(1).cpu().detach())
            plt.title('iteration number {}'.format(i))
            plt.show()
            
        pixels_left = (predicted_image==target_class).sum().numpy() #how much pixels is removed from the output. the sum 0 is the best. sort of accuracy.
        print(f'Class pixel left {pixels_left}')
    return (img+pertubation).detach()

In [None]:
def get_map():
    return np.array(list(zip(range(len(VALID_CLASSES)), VALID_CLASSES, LABEL_NAMES, LABEL_COLORS)))

MY_MAP = get_map()

def get_labels(lbls):
    ret = {'colors': [], 'names': []}
    for lbl in lbls:
        ret['colors'].append(MY_MAP[lbl][-1])
        ret['names'].append(MY_MAP[lbl][-2])
    ret['colors'] = np.asarray(ret['colors']).reshape(-1, 1, 3)
    return ret

def to_rgba(rgb, one=True):
    return tuple([x * 1.0 / 255 for x in rgb]) + (1. if one else 0.,)

cmaplist = [to_rgba(x[3]) for x in MY_MAP] # [(x[0], to_rgba(x[3])) for x in MY_MAP]
cmap = colors.ListedColormap(cmaplist)

In [None]:
def fix_mask(mask):
    get_id = np.vectorize(lambda x: MY_MAP[x][1])
    return get_id(mask)

In [None]:
def save_tensor(device,model, img, mask,image_idx,pgd_attack=True,eps = 8.0 / 255, max_iter=100):
  print("saving tensor for img idx: "+str(image_idx))
  model.eval()
  img_in_device = img.to(device)
  mask_fixed = fix_mask(mask.squeeze())
  clean_seg_pred = model(img_in_device)[0].argmax(1).cpu()
  unique_labels = np.array(list(set(np.unique(mask_fixed)).union(set(np.unique(clean_seg_pred)))))
  target_class = 13 if 13 in unique_labels else unique_labels[-2] #Car target
  #labels = get_labels(unique_labels)
  print('Attacking target class: {} - {}'.format(target_class, LABEL_NAMES[target_class]))
  y_target = create_target(device,model,img_in_device, target_class=target_class) #in tf h,w,c , in pytorch c,h,w
  if pgd_attack == True:
    attack = image_dependent_attack_pgd(device,model,img=img_in_device, eps=eps, alpha=0.0001, y_target=y_target, target_class=target_class, max_iter=max_iter, plot=False)
  else:
    attack = image_dependent_attack_weighted_avg(device,model,img=img_in_device, eps=eps, alpha=0.0001, w = 0.1, y_target=y_target, target_class=target_class, max_iter=max_iter, plot=False)
  perturbed_img = attack.cpu().squeeze()                       
  perturbed_pred = model(attack.to(device))[0].argmax(1).cpu()
  dest_dir = "./SCNN_attack_output_alphas"
  dest = dest_dir+"/{0}_{1}.pt"
  img = img.squeeze()
  torch.save(img, dest.format("img", image_idx))
  torch.save(mask, dest.format("real_mask", image_idx))
  torch.save(clean_seg_pred, dest.format("mask", image_idx))
  torch.save(perturbed_img, dest.format("perturbed_img", image_idx))
  torch.save(perturbed_pred, dest.format("perturbed_mask", image_idx))

In [None]:
transform_norm_x_std = 0.229
transform_norm_y_std = 0.224
transform_norm_z_std = 0.225

transform_norm_x_avg = .485
transform_norm_y_avg = .456
transform_norm_z_avg = .406

input_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([transform_norm_x_avg, transform_norm_y_avg, transform_norm_z_avg], [transform_norm_x_std,transform_norm_y_std ,transform_norm_z_std ]),])

dataset_source = './dataset_Cityscapes/'
split_type = 'val'

val_dataset = CitySegmentation(root=dataset_source, 
                               split=split_type, 
                               mode='testval',
                               transform=input_transform)

val_loader = data.DataLoader(dataset=val_dataset,
                                  batch_size=1,
                                  shuffle=False)

required_path_for_images = dataset_source+split_type
if not os.path.isdir(required_path_for_images):
  os.mkdir(required_path_for_images)
  print("Directory '%s' created" %required_path_for_images)


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = get_fast_scnn('citys', pretrained=True, root='./Fast-SCNN-pytorch/Fast-SCNN-pytorch/weights', map_cpu=False).to(device)

In [None]:
pgd_attack = True # if True -> pgd, if False -> weighted.
eps = 8.0 / 255
max_iter_in_attack=100

run_tensor_save = True
if run_tensor_save == True:
  for i, obj in enumerate(val_loader):
    img, mask = obj[0], obj[1]
    save_tensor(device,model, img, mask,i,pgd_attack,eps,max_iter_in_attack)





In [None]:
foreground_classes = [5, 6, 7, 11, 13, 14, 15, 17, 18]

def class_to_rgba(lbl, trans=False):
  return tuple([x / 255 for x in MY_MAP[lbl][3]]) + (1. if not trans else 0.,)

def vec_to_rgba(vec):
  ret = []
  for i in range(vec.shape[0]):
    ret.append([class_to_rgba(vec[i][j], vec[i][j] in foreground_classes) for j in range(vec.shape[1])])
  return np.array(ret)

def save_fig(path, img, mask=None, alpha=0.9):
  """
  Plots the image and overlay and saves it into path
  Arguments:
  path -- the path in which to save the image
  img -- the image tensor, need cast to HWC
  Keyword arguments:
  mask -- if not None, should be a tensor of the segmentation mask, and plots it on top of the image, need cast CHW to HWC
  alpha -- the transparency of the overlay

  """
  plt.axis("off")

  print("image shape:"+str(img.shape))
  if img.shape[0] <= 3: #  sometimes need to cast permute(1, 2, 0) from CHW to HWC. e.g. [3/1,1024,2048] - > [1024,2048,3/1]
    img = img.permute(1, 2, 0)
    print("image shape after permute:"+str(img.shape))

  if mask is not None:
      print("mask shape:"+str(mask.shape))
      if mask.shape[0] <= 3:  #  sometimes need to cast permute(1, 2, 0) from CHW to HWC. e.g. [3/1,1024,2048] - > [1024,2048,3/1]
        mask = mask.permute(1, 2, 0)
        print("mask shape after permute:"+str(mask.shape))


  mu = [transform_norm_x_avg, transform_norm_y_avg, transform_norm_z_avg]
  std = [transform_norm_x_std,transform_norm_y_std ,transform_norm_z_std]
  for dim in range(3):
    img[:, :, dim] = img[:, :, dim]*std[dim]+mu[dim]

  if mask is not None:
    mask = vec_to_rgba(mask)
    mask = mask[:,:,:3]
    img = img*alpha + mask*(1-alpha)
  im = np.array(img)
  im = Image.fromarray((im * 255).astype(np.uint8))
  im.save(path)

def save_img_from_tensor(i, alpha):
  """
  Reads tensors and saves as image given the alpha
  """
  src_dest_dir = "./SCNN_attack_output_alphas"
  dest = src_dest_dir+"/frame{0}{1}_{2};{3}.png"
  source = src_dest_dir+"/{0}_{1}.pt"
  img = torch.load(source.format("img", i))
  perturbed_img = torch.load(source.format("perturbed_img", i))
  mask = torch.load(source.format("mask", i))
  real_mask = torch.load(source.format("real_mask", i))
  perturbed_mask = torch.load(source.format("perturbed_mask", i))
  save_fig(dest.format('_clean', '_seg', i, alpha), img.clone(), mask, alpha=alpha) 
  save_fig(dest.format('_clean', '', i, alpha), img.clone())
  save_fig(dest.format('_clean_real', '', i, alpha), img.clone(), real_mask, alpha=alpha)
  save_fig(dest.format('_attacked', '_seg', i, alpha), perturbed_img.clone(), perturbed_mask, alpha=alpha) 
  save_fig(dest.format('_attacked', '', i, alpha), perturbed_img.clone()) 


def save_images_from_tensors(idxs=range(500), alpha=0.9):
  """
  Given an iterable of indexes and alpha, saves the image with the given alpha according to the tensor
  """
  for i in idxs:
    print("saving img idx:"+str(i)+", with alpha:"+str(alpha))
    save_img_from_tensor(i, alpha=alpha)


In [None]:
run_image_save = True
if run_image_save == True:
  required_alphas = [0.2,0.25,0.3,0.35,0.4,0.45,0.5,0.55, 0.6, 0.75, 0.9, 1]
  for alpha in required_alphas:
    save_images_from_tensors(range(len(val_loader)),alpha=alpha)