In [None]:
import os
import cv2
import sys
import math
import random
import time
import numbers
import pickle
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF
import torch.optim as optim
import torchvision.transforms as transforms
from torch.autograd import Variable
import torchvision.models as models
from torchvision.datasets import ImageFolder
from torchvision import datasets

In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)
np.random.seed(42)
random.seed(42)

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
# Setting device for Mac
# if torch.backends.mps.is_available():
#         device = torch.device("mps")
#         print(device)

# Setting device for Windows
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
transform = transforms.Compose([
    transforms.ToTensor(),
#     transforms.Resize((224)),
])

In [None]:
transform2 = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.Resize((224)),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.4),
    transforms.RandomRotation(5, expand=False, center=None),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [None]:
number_of_classes = 

In [None]:
encoder = models.convnext_base(pretrained=True)
encoder.classifier[2]=nn.Linear(1024,number_of_classes)

#path to the model trained in stage 1
encoder.load_state_dict(torch.load('.pt'))
encoder.classifier[2] = nn.Linear(1024, 1024)
encoder = encoder.to(device)

In [None]:
def extract_features(image_tensor, model = encoder):
    model.eval()
    features = model(image_tensor)
    normalized_features = torch.nn.functional.normalize(features, p=2, dim=1)
    return normalized_features

In [None]:
normalize = transforms.Compose([
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [None]:
def gaussian_smoothing(input_tensor, channels, sigma, kernel_size, dim):
    if isinstance(kernel_size, numbers.Number):
        kernel_size = [kernel_size] * dim
    if isinstance(sigma, numbers.Number):
        sigma = [sigma] * dim

    kernel = 1
    meshgrids = torch.meshgrid(
        [torch.arange(size, dtype=torch.float32, device=input_tensor.device) for size in kernel_size]
    )
    for size, std, mgrid in zip(kernel_size, sigma, meshgrids):
        mean = (size - 1) / 2
        kernel *= 1 / (std * math.sqrt(2 * math.pi)) * torch.exp(-((mgrid - mean) / std) ** 2 / 2)

    kernel = kernel / torch.sum(kernel)

    kernel = kernel.view(1, 1, *kernel.size())
    kernel = kernel.repeat(channels, *[1] * (kernel.dim() - 1))

    return F.conv2d(input_tensor, weight=kernel, groups=channels, padding='same')

In [None]:
def rgb2yuv_batch(images):
    transformation_matrix = torch.tensor(
        [[0.299, -0.14713, 0.615],
         [0.587, -0.288862, -0.51499],
         [0.114, 0.436, -0.10001]]
    ).to(images.device)
    
    yuv_images = torch.tensordot(images, transformation_matrix, dims=([3], [0]))
    return yuv_images

def psnr_y_channel(img1, img2):
    mse = torch.mean((img1 - img2) ** 2, dim=[1, 2])
    max_pixel = 1.0
    psnr = 20 * torch.log10(max_pixel / torch.sqrt(mse + 1e-10))
    return psnr

def psnr_hvsm_loss(img1, img2):
    img1_yuv = rgb2yuv_batch(img1.permute(0, 2, 3, 1))
    img2_yuv = rgb2yuv_batch(img2.permute(0, 2, 3, 1))

    # Extract Y channel
    img1_y = img1_yuv[:, :, :, 0]
    img2_y = img2_yuv[:, :, :, 0]

    psnr_values = psnr_y_channel(img1_y, img2_y)

    return -torch.mean(psnr_values).to(img1.device)

In [None]:
def mifgsm(images, labels, eps, alpha, steps, decay, sigma, kernel_size, dim, target_features_dict,
           min_psnr_hvsm = 40.5):
    # Static variable to accumulate total steps
    if not hasattr(mifgsm, "total_steps"):
        mifgsm.total_steps = 0

    images = images.clone().detach().to(device)
    momentum = torch.zeros_like(images).detach().to(device)
    adv_images = images.clone().detach()
    
    target_features = torch.stack([target_features_dict[label] for label in labels]).to(device)
    
    initial_psnr = -psnr_hvsm_loss(adv_images, images) #for psnr hvsm loss

    for step in range(steps):
        adv_images.requires_grad = True
        
        transformed_images = torch.stack([transform2(img) for img in adv_images])
        smoothed_images = gaussian_smoothing(transformed_images, transformed_images.shape[1], sigma, kernel_size, dim)
        
        adv_features = extract_features(smoothed_images)
        
        similarity_loss = -torch.cosine_similarity(adv_features, target_features, dim=1).mean()
        current_psnr = -psnr_hvsm_loss(adv_images, images)

        psnr_loss = torch.abs(current_psnr - initial_psnr).mean().to(device)
        #fractioned down psnr_loss to match similarity_loss in magnitude
        total_loss = similarity_loss + (psnr_loss * 0.000001)
        
        grad = torch.autograd.grad(total_loss, adv_images, retain_graph=False, create_graph=False)[0]
        
        grad = grad / torch.norm(grad, p=2, dim=(1, 2, 3), keepdim=True)
        grad = grad + decay * momentum
        momentum = grad
        
        adv_images = adv_images.detach() + alpha * grad
        delta = torch.clamp(adv_images - images, min=-eps, max=eps)
        adv_images = torch.clamp(images + delta, min=0, max=1).detach()
        
        # Early stopping condition based on min_psnr_hvsm
        if current_psnr <= min_psnr_hvsm:
            mifgsm.total_steps += step + 1
            break
    
    return adv_images, mifgsm.total_steps

In [None]:
def process(
    source_dir, dest_dir,
    target_features_dict,
    alpha, epsilon, num_iter,
    batch_size, kernel_size,
    sigma, dim, decay):
    
    imgs = []
    paths = []
    labels = []  # To store labels for each image
    
    # Collect images, paths, and labels
    for folder_name in os.listdir(source_dir):
        folder_path = os.path.join(source_dir, folder_name)
        if not os.path.isdir(folder_path):
            continue
        for img_name in os.listdir(folder_path):
            img_path = os.path.join(folder_path, img_name)
            image = cv2.imread(img_path)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            imgs.append(image)
            paths.append(img_path)
            labels.append(folder_name)  # Use folder name as label
    
    idx = 0
    with tqdm(total=len(imgs), desc="Processing Images") as pbar:
        while idx < len(imgs):
            batch_imgs = imgs[idx:idx + batch_size]
            batch_paths = paths[idx:idx + batch_size]
            batch_labels = labels[idx:idx + batch_size]
            idx += batch_size
            tensor_imgs = torch.stack([transform(i) for i in batch_imgs]).to(device)
           
            adversarial_image_tensors, steps_taken = mifgsm(
                tensor_imgs,
                batch_labels,
                epsilon,
                alpha,
                num_iter,
                decay,
                sigma,
                kernel_size,
                dim,
                target_features_dict
            )
            
            for adv_img, img_path in zip(adversarial_image_tensors, batch_paths):
                adv_img = adv_img.detach().cpu().numpy().transpose(1, 2, 0)
                adv_img = np.clip(adv_img * 255, 0, 255).astype(np.uint8)
                
                relative_path = os.path.relpath(os.path.dirname(img_path), source_dir)
                dest_subfolder = os.path.join(dest_dir, relative_path)
                if not os.path.exists(dest_subfolder):
                    os.makedirs(dest_subfolder)
                
                filename = os.path.basename(img_path)[:-4]
                dest_path = os.path.join(dest_subfolder, filename +'.png')
                
                cv2.imwrite(dest_path, cv2.cvtColor(adv_img, cv2.COLOR_RGB2BGR))
            
            pbar.update(len(batch_imgs))
    return steps_taken

In [None]:
#paramters used with recommended values
alpha = 2/255
eps = 8/255
max_steps = 1000
batch_size = 16
kernel_size = 5
sigma = 1.5
dim = 2
decay = 1.0
steps_taken = 0

#path to target_features computed in stage 1
target_features_dict = torch.load('.pth')


#path to source directory
source_dir = ''

#path to distination directory
dest_dir = ''

In [None]:
st = time.time()
steps_taken = process(source_dir, dest_dir, target_features_dict, alpha, eps, max_steps, batch_size, kernel_size, sigma, dim, decay)
ed = time.time()
print(f"Total time taken is: {ed - st} seconds, steps taken {steps_taken}")