<a href="https://colab.research.google.com/github/rymarinelli/Python/blob/master/Kvasir_Data_Poison_Radiology.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Cleaning

## Download Datasets

In [None]:
! wget https://datasets.simula.no/downloads/kvasir-seg.zip
! unzip kvasir-seg.zip -d ./kvasir

--2024-10-24 18:04:01--  https://datasets.simula.no/downloads/kvasir-seg.zip
Resolving datasets.simula.no (datasets.simula.no)... 128.39.36.14
Connecting to datasets.simula.no (datasets.simula.no)|128.39.36.14|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 46227172 (44M) [application/zip]
Saving to: ‘kvasir-seg.zip.2’


2024-10-24 18:04:05 (13.1 MB/s) - ‘kvasir-seg.zip.2’ saved [46227172/46227172]

Archive:  kvasir-seg.zip
replace ./kvasir/Kvasir-SEG/kavsir_bboxes.json? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [None]:
import os
import shutil
import random
import json
from PIL import Image
import numpy as np

def split_dataset(image_dir, mask_dir, output_dir, bbox_json_path, val_split=0.2, seed=42):
    """
    Split the dataset into training and validation directories, using only images that have bounding box annotations.

    Parameters:
        image_dir (str): Path to the images directory.
        mask_dir (str): Path to the masks directory.
        output_dir (str): Path to the output directory where train/val directories will be created.
        bbox_json_path (str): Path to the JSON file containing bounding box annotations.
        val_split (float): Fraction of the dataset to use as validation (e.g., 0.2 means 20% validation).
        seed (int): Random seed for reproducibility.
    """
    # Load bounding box data from JSON file
    with open(bbox_json_path, 'r') as f:
        bbox_data = json.load(f)

    # Filter images that have bounding box annotations
    annotated_images = set(bbox_data.keys())

    # training and validation sets
    train_image_dir = os.path.join(output_dir, 'train', 'images')
    val_image_dir = os.path.join(output_dir, 'val', 'images')
    train_mask_dir = os.path.join(output_dir, 'train', 'masks')
    val_mask_dir = os.path.join(output_dir, 'val', 'masks')

    os.makedirs(train_image_dir, exist_ok=True)
    os.makedirs(val_image_dir, exist_ok=True)
    os.makedirs(train_mask_dir, exist_ok=True)
    os.makedirs(val_mask_dir, exist_ok=True)

    #corresponding bounding box annotations
    image_filenames = [f for f in os.listdir(image_dir) if f.split('.')[0] in annotated_images]

    # Shuffle image filenames
    random.seed(seed)
    random.shuffle(image_filenames)

    # Split into training and validation sets
    val_size = int(len(image_filenames) * val_split)
    val_filenames = image_filenames[:val_size]
    train_filenames = image_filenames[val_size:]

    # create masks based on bounding boxes
    def copy_files(file_list, src_image_dir, dest_image_dir, dest_mask_dir, bbox_data):
        for filename in file_list:
            # Copy image
            shutil.copy(os.path.join(src_image_dir, filename), os.path.join(dest_image_dir, filename))

            # Create a corresponding mask based on bounding box data
            image_id = filename.split('.')[0]
            bbox_info = bbox_data.get(image_id, {})
            if 'bbox' in bbox_info:
                # Load the image to get its dimensions
                image_path = os.path.join(src_image_dir, filename)
                image = Image.open(image_path)
                width, height = image.size

                # Create an empty mask
                mask = np.zeros((height, width), dtype=np.uint8)


                for bbox in bbox_info['bbox']:
                    xmin, ymin, xmax, ymax = bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']
                    mask[ymin:ymax, xmin:xmax] = 255

                # Save the mask
                mask = Image.fromarray(mask)
                mask_filename = filename.replace('.jpg', '.png')  # Adjust extension if needed
                mask.save(os.path.join(dest_mask_dir, mask_filename))
            else:
                print(f"Warning: No bounding box found for image '{filename}'")

    # Copy training files and create masks
    copy_files(train_filenames, image_dir, train_image_dir, train_mask_dir, bbox_data)

    # Copy validation files and create masks
    copy_files(val_filenames, image_dir, val_image_dir, val_mask_dir, bbox_data)

    print(f"Training and validation sets created in '{output_dir}'")
    print(f"Training images: {len(train_filenames)}, Validation images: {len(val_filenames)}")


image_dir = '/content/kvasir/Kvasir-SEG/images'
mask_dir = '/content/kvasir/Kvasir-SEG/masks'
output_dir = 'dataset'
bbox_json_path = '/content/kvasir/Kvasir-SEG/kavsir_bboxes.json'

# Split dataset with 20% validation data
split_dataset(image_dir, mask_dir, output_dir, bbox_json_path, val_split=0.2)


Training and validation sets created in 'dataset'
Training images: 800, Validation images: 200


In [None]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms

# Custom Dataset class for U-Net
class SegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = os.listdir(image_dir)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image_name = self.images[idx]
        image_path = os.path.join(self.image_dir, image_name)
        mask_name = image_name.replace('.jpg', '.png')  # Assuming mask files are .png
        mask_path = os.path.join(self.mask_dir, mask_name)

        # Load image and mask
        image = Image.open(image_path).convert('RGB')
        mask = Image.open(mask_path).convert('L')  # 'L' mode for grayscale (single channel)

        # Apply transformations if provided
        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        # Convert mask to tensor (required for training)
        mask = torch.tensor(np.array(mask), dtype=torch.long)  # Ensuring mask is long (integer) type for cross entropy

        return image, mask


In [None]:
# transformations for both images and masks
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize images and masks to 256x256
    transforms.ToTensor()
])

train_dataset = SegmentationDataset(image_dir='/content/dataset/train',
                                    mask_dir='/content/dataset/val',
                                    transform=transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=8, shuffle=True)


In [None]:
import torch
import torch.nn as nn

class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1):
        super(UNet, self).__init__()

        # Contracting path (Encoder)
        self.enc1 = self.conv_block(in_channels, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.enc4 = self.conv_block(256, 512)

        # Bottleneck
        self.bottleneck = self.conv_block(512, 1024)

        # Expansive path (Decoder)
        self.upconv4 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.dec4 = self.conv_block(1024, 512)

        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec3 = self.conv_block(512, 256)

        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec2 = self.conv_block(256, 128)

        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec1 = self.conv_block(128, 64)

        # Output layer
        self.output_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        """Two convolutional layers with ReLU and Batch Normalization"""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        # Encoder (Downsampling)
        enc1 = self.enc1(x)
        enc2 = self.enc2(nn.MaxPool2d(kernel_size=2, stride=2)(enc1))
        enc3 = self.enc3(nn.MaxPool2d(kernel_size=2, stride=2)(enc2))
        enc4 = self.enc4(nn.MaxPool2d(kernel_size=2, stride=2)(enc3))

        # Bottleneck
        bottleneck = self.bottleneck(nn.MaxPool2d(kernel_size=2, stride=2)(enc4))

        # Decoder (Upsampling)
        dec4 = self.upconv4(bottleneck)
        dec4 = torch.cat((enc4, dec4), dim=1)  # Skip connection
        dec4 = self.dec4(dec4)

        dec3 = self.upconv3(dec4)
        dec3 = torch.cat((enc3, dec3), dim=1)  # Skip connection
        dec3 = self.dec3(dec3)

        dec2 = self.upconv2(dec3)
        dec2 = torch.cat((enc2, dec2), dim=1)  # Skip connection
        dec2 = self.dec2(dec2)

        dec1 = self.upconv1(dec2)
        dec1 = torch.cat((enc1, dec1), dim=1)  # Skip connection
        dec1 = self.dec1(dec1)

        # Output
        return self.output_conv(dec1)


Train

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm import tqdm
from PIL import Image
import os
import numpy as np
from sklearn.metrics import average_precision_score

# Dataset class for loading images and masks
class SegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None, mask_transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.mask_transform = mask_transform

        # Filter images to ensure corresponding mask exists
        self.images = []
        for img_name in os.listdir(image_dir):
            mask_name = img_name.replace('.jpg', '.png')
            mask_path = os.path.join(mask_dir, mask_name)
            if os.path.exists(mask_path):
                self.images.append(img_name)
            else:
                print(f"Warning: Mask not found for {img_name}, expected path: {mask_path}")

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image_name = self.images[idx]
        image_path = os.path.join(self.image_dir, image_name)
        mask_name = image_name.replace('.jpg', '.png')
        mask_path = os.path.join(self.mask_dir, mask_name)


        image = Image.open(image_path).convert('RGB')
        mask = Image.open(mask_path).convert('L')  # 'L' mode for grayscale (single channel)

        # Apply transformations to image
        if self.transform:
            image = self.transform(image)

        # Apply transformations to mask
        if self.mask_transform:
            mask = self.mask_transform(mask)

        # Convert mask to tensor (0 for background, 1 for the class)
        mask = torch.tensor(np.array(mask), dtype=torch.float32) / 255.0  # Normalize to [0, 1]

        return image, mask


transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

# Define mask transformation (resize only, no need to normalize)
mask_transform = transforms.Compose([
    transforms.Resize((256, 256), interpolation=Image.NEAREST),  # Use NEAREST for masks to avoid interpolation artifacts
])

# Initialize dataset and data loader
train_dataset = SegmentationDataset(image_dir='/content/dataset/train/images',
                                    mask_dir='/content/dataset/train/masks',
                                    transform=transform,
                                    mask_transform=mask_transform)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)

val_dataset = SegmentationDataset(image_dir='/content/dataset/val/images',
                                  mask_dir='/content/dataset/val/masks',
                                  transform=transform,
                                  mask_transform=mask_transform)

val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize U-Net model, loss function, and optimizer
model = UNet(in_channels=3, out_channels=1)  # Change out_channels to 1 for one class (foreground vs background)
model = model.to(device)  # Move model to GPU if available

# Loss function: Binary Cross Entropy with Logits for one-class segmentation
criterion = nn.BCEWithLogitsLoss()


optimizer = optim.Adam(model.parameters(), lr=0.001)


num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    # Training loop
    for images, masks in tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{num_epochs}"):
        images = images.to(device)
        masks = masks.unsqueeze(1).to(device)  # Add channel dimension for masks

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)

        # Calculate loss
        loss = criterion(outputs, masks)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Accumulate loss
        running_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}")


    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for val_images, val_masks in val_loader:
            val_images = val_images.to(device)
            val_masks = val_masks.unsqueeze(1).to(device)  # Add channel dimension for masks

            # Forward pass
            val_outputs = model(val_images)

            # Calculate loss
            val_loss += criterion(val_outputs, val_masks).item()

    print(f"Validation Loss: {val_loss/len(val_loader)}")

# Calculate mAP on validation set
def calculate_mAP(model, dataloader, device, threshold=0.5):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, masks in dataloader:
            images = images.to(device)
            masks = masks.to(device).view(-1)  # Flatten the masks

            # Forward pass
            outputs = model(images)
            preds = torch.sigmoid(outputs)  # Apply sigmoid to get probabilities
            preds = (preds > threshold).float().view(-1)  # Threshold to get binary predictions

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(masks.cpu().numpy())

    # Calculate mean Average Precision (mAP)
    mAP = average_precision_score(all_labels, all_preds)
    return mAP

# Calculate mAP for validation set
mAP_score = calculate_mAP(model, val_loader, device)
print(f"Mean Average Precision (mAP) score: {mAP_score}")

print("Training complete!")


Training Epoch 1/10: 100%|██████████| 100/100 [00:11<00:00,  8.79it/s]


Epoch [1/10], Loss: 0.5102599602937699
Validation Loss: 0.4936222195625305


Training Epoch 2/10: 100%|██████████| 100/100 [00:11<00:00,  8.77it/s]


Epoch [2/10], Loss: 0.47347333312034606
Validation Loss: 0.4693891930580139


Training Epoch 3/10: 100%|██████████| 100/100 [00:11<00:00,  8.78it/s]


Epoch [3/10], Loss: 0.45264406204223634
Validation Loss: 0.4470020651817322


Training Epoch 4/10: 100%|██████████| 100/100 [00:11<00:00,  8.83it/s]


Epoch [4/10], Loss: 0.4388850110769272
Validation Loss: 0.4252496516704559


Training Epoch 5/10: 100%|██████████| 100/100 [00:11<00:00,  8.73it/s]


Epoch [5/10], Loss: 0.42921296566724776
Validation Loss: 0.4480717831850052


Training Epoch 6/10: 100%|██████████| 100/100 [00:11<00:00,  8.77it/s]


Epoch [6/10], Loss: 0.42517987012863157
Validation Loss: 0.4377950584888458


Training Epoch 7/10: 100%|██████████| 100/100 [00:11<00:00,  8.76it/s]


Epoch [7/10], Loss: 0.42168295800685884
Validation Loss: 0.5769519257545471


Training Epoch 8/10: 100%|██████████| 100/100 [00:11<00:00,  8.81it/s]


Epoch [8/10], Loss: 0.4175533476471901
Validation Loss: 0.40580100417137144


Training Epoch 9/10: 100%|██████████| 100/100 [00:11<00:00,  8.76it/s]


Epoch [9/10], Loss: 0.4058820441365242
Validation Loss: 0.40192769646644594


Training Epoch 10/10: 100%|██████████| 100/100 [00:11<00:00,  8.81it/s]


Epoch [10/10], Loss: 0.3984456717967987
Validation Loss: 0.39607759833335876
Mean Average Precision (mAP) score: 0.4253216268380061
Training complete!


FGSM

In [None]:
# FGSM attack to generate adversarial examples
def fgsm_attack(model, images, labels, epsilon):
    # Set requires_grad attribute of tensor so we can calculate gradient with respect to it
    images.requires_grad = True

    # Forward pass the data through the model
    outputs = model(images)
    loss = criterion(outputs, labels)

    # Zero all existing gradients
    model.zero_grad()


    loss.backward()

    # Collect the element-wise sign of the data gradient
    data_grad = images.grad.data.sign()

    # Create the perturbed image by adjusting each pixel of the input image
    perturbed_images = images + epsilon * data_grad

    # Clipping the values to maintain image integrity
    perturbed_images = torch.clamp(perturbed_images, 0, 1)

    # Detach perturbed images to avoid further tracking of gradients
    perturbed_images = perturbed_images.detach()
    return perturbed_images


# Calculate mAP on adversarial (FGSM) validation data
def calculate_mAP_with_adversarial(model, dataloader, device, epsilon, threshold=0.5):
    model.eval()
    all_preds = []
    all_labels = []

    for images, masks in dataloader:
        # Move data to device
        images = images.to(device)
        masks = masks.to(device).unsqueeze(1)  # Add channel dimension

        # Generate adversarial examples using FGSM
        adv_images = fgsm_attack(model, images, masks, epsilon)

        # Forward pass on adversarial images
        with torch.no_grad():
            outputs = model(adv_images)
            preds = torch.sigmoid(outputs)  # Apply sigmoid to get probabilities
            preds = (preds > threshold).float().view(-1)  # Threshold to get binary predictions

            masks = masks.view(-1)  # Flatten the masks

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(masks.cpu().numpy())

    # Calculate mean Average Precision (mAP)
    mAP = average_precision_score(all_labels, all_preds)
    return mAP

# Set epsilon for FGSM attack
epsilon = 8/255

# Calculate mAP for FGSM poisoned validation data
mAP_score_adv = calculate_mAP_with_adversarial(model, val_loader, device, epsilon)
print(f"Mean Average Precision (mAP) score with FGSM adversarial data: {mAP_score_adv}")

print("FGSM mAP testing complete!")


Mean Average Precision (mAP) score with FGSM adversarial data: 0.22305134803804721
FGSM mAP testing complete!


CW

In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
from sklearn.metrics import average_precision_score
from tqdm import tqdm  # Import tqdm for progress bars

# C&W attack to generate adversarial examples for one-class segmentation task
def cw_attack(model, images, labels, c=1e-4, kappa=0, num_iter=100, learning_rate=0.01):
    """
    Performs the Carlini & Wagner (C&W) attack on the input images for one-class segmentation.

    Parameters:
        model (torch.nn.Module): The model to attack.
        images (torch.Tensor): The input images.
        labels (torch.Tensor): The true labels for the input images (one-class segmentation masks).
        c (float): Regularization parameter controlling the importance of misclassification.
        kappa (float): Confidence level for making the adversarial example misclassified.
        num_iter (int): Number of iterations to optimize the adversarial perturbation.
        learning_rate (float): Learning rate for the optimizer.

    Returns:
        torch.Tensor: The perturbed images.
    """
    model.eval()  # Set model to evaluation mode

    # Ensure images and labels have correct data types
    images = images.float()
    labels = labels.float()

    # Initialize the perturbation (w) with zeros and set requires_grad=True
    w = torch.zeros_like(images, requires_grad=True, device=images.device)

    # Optimizer for the perturbation
    optimizer = optim.Adam([w], lr=learning_rate)

    # generate adversarial examples
    for _ in tqdm(range(num_iter), desc="Generating Adversarial Examples (C&W)", leave=True):
        optimizer.zero_grad()

        # Calculate the adversarial examples using tanh to ensure values are in [0, 1]
        perturbed_images = 0.5 * (torch.tanh(w) + 1)


        outputs = model(perturbed_images)  # Outputs should be of shape [batch_size, 1, height, width]

        # Apply sigmoid to get probabilities in range [0, 1]
        outputs = torch.sigmoid(outputs)

        # Misclassification loss for one-class segmentation
        # Invert the labels to create the misclassification objective
        inverted_labels = 1 - labels  # Treat target class pixels as background and vice versa
        f_loss = F.binary_cross_entropy(outputs, inverted_labels)

        # Calculate L2 distance per image in batch
        l2_dist = torch.sum((perturbed_images - images) ** 2, dim=(1, 2, 3))  # L2 distance for each sample in batch

        # Combine the losses element-wise and sum across batch to get the final loss
        total_loss = torch.sum(c * f_loss + l2_dist)

        # Backward pass
        total_loss.backward()
        optimizer.step()

    # Return final adversarial examples
    return perturbed_images.detach()

# Calculate mAP on adversarial (C&W) validation data for one-class segmentation
def calculate_mAP_with_adversarial_cw(model, dataloader, device, c=1e-4, kappa=0, num_iter=100, learning_rate=0.01, threshold=0.5):
    model.eval()
    all_preds = []
    all_labels = []

    for batch_idx, (images, masks) in enumerate(dataloader):
        images = images.to(device)
        masks = masks.to(device).unsqueeze(1)  # Add channel dimension

        #  progress bar
        adv_images = cw_attack(model, images, masks, c=c, kappa=kappa, num_iter=num_iter, learning_rate=learning_rate)

        # Forward pass on adversarial images
        with torch.no_grad():
            outputs = model(adv_images)
            preds = torch.sigmoid(outputs)
            preds = (preds > threshold).float().view(-1)

            masks = masks.view(-1)  # Flatten the masks

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(masks.cpu().numpy())

    # Calculate mean Average Precision (mAP)
    mAP = average_precision_score(all_labels, all_preds)
    return mAP

# parameters for C&W attack
c = 100       # Increase regularization parameter to emphasize misclassification
kappa = 15     # Increase kappa to push the adversarial example confidently away from the correct class
num_iter = 100 # Increase number of iterations to ensure better convergence
learning_rate = 0.1  # Increase learning rate to optimize perturbations

# Calculate mAP for C&W poisoned validation data
mAP_score_adv_cw = calculate_mAP_with_adversarial_cw(model, val_loader, device, c=c, kappa=kappa, num_iter=num_iter, learning_rate=learning_rate)
print(f"Mean Average Precision (mAP) score with C&W adversarial data: {mAP_score_adv_cw}")

print("C&W mAP testing complete!")


Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.60it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:05<00:00, 18.68it/s]
Generating Adversaria

Mean Average Precision (mAP) score with C&W adversarial data: 0.19864990846426359
C&W mAP testing complete!


In [None]:
def fgsm_attack(model, images, labels, epsilon, save_path=None):
    model.eval()


    images = images.to(device)
    labels = labels.to(device).unsqueeze(1)  # Add channel dimension to labels to match output shape [batch_size, 1, H, W]

    images.requires_grad = True

    # Forward pass to get model predictions
    outputs = model(images)


    print(f"Output shape: {outputs.shape}, Label shape (after unsqueeze): {labels.shape}")

    # Calculate the loss
    if labels.shape != outputs.shape:
        raise ValueError(f"Shape mismatch: Output shape {outputs.shape} and label shape {labels.shape} must be the same.")

    loss = criterion(outputs, labels)


    model.zero_grad()


    loss.backward()

    # Get the gradient of the images
    data_grad = images.grad.data.sign()

    # Create perturbed image by adjusting the input image with epsilon in the direction of the gradient
    perturbed_images = images + epsilon * data_grad

    # Clip the perturbed images to maintain the range [0, 1]
    perturbed_images = torch.clamp(perturbed_images, 0, 1)

    # Detach the perturbed images from the computation graph
    perturbed_images = perturbed_images.detach()


    if save_path is not None:
        save_image(perturbed_images[0], save_path)
        print(f"Adversarial image saved at: {save_path}")

    return perturbed_images



# Get a batch of data from the DataLoader
data_iter = iter(val_loader)
images, masks = next(data_iter)


model = model.to(device)
criterion = torch.nn.BCEWithLogitsLoss()


epsilon = 8 / 255
fgsm_save_path = "./fgsm_adv_image.png"

fgsm_adv_images = fgsm_attack(model, images, masks, epsilon, save_path=fgsm_save_path)



Output shape: torch.Size([8, 1, 256, 256]), Label shape (after unsqueeze): torch.Size([8, 1, 256, 256])
Adversarial image saved at: ./fgsm_adv_image.png


In [None]:
# C&W attack to generate a single adversarial example
def cw_attack_single(model, image, label, c=1e-4, kappa=0, num_iter=100, learning_rate=0.01, save_path=None):
    model.eval()  # Set model to evaluation mode

    # Move data to the same device
    image = image.to(device).float()
    label = label.to(device).float().unsqueeze(0).unsqueeze(0)  # Add batch and channel dimensions to label

    # Initialize the perturbation (w) with zeros and set requires_grad=True
    w = torch.zeros_like(image, requires_grad=True, device=device)

    # Optimizer for the perturbation
    optimizer = optim.Adam([w], lr=learning_rate)

    for _ in tqdm(range(num_iter), desc="Generating Adversarial Example (C&W)", leave=True):
        optimizer.zero_grad()

        # Calculate the adversarial example using tanh to ensure values are in [0, 1]
        perturbed_image = 0.5 * (torch.tanh(w) + 1)


        output = model(perturbed_image)


        output = torch.sigmoid(output)

        # Misclassification loss for one-class segmentation
        inverted_label = 1 - label  # Treat target class pixels as background and vice versa
        f_loss = torch.nn.functional.binary_cross_entropy(output, inverted_label)

        # Calculate L2 distance
        l2_dist = torch.sum((perturbed_image - image) ** 2)

        # Combine the losses
        total_loss = c * f_loss + l2_dist

        # Backward pass
        total_loss.backward()
        optimizer.step()

    # Detach the perturbed image from the computation graph
    perturbed_image = perturbed_image.detach()


    if save_path is not None:
        save_image(perturbed_image[0], save_path)
        print(f"Adversarial C&W image saved at: {save_path}")

    return perturbed_image


In [None]:
# Generate and save a single C&W adversarial example
data_iter = iter(val_loader)
images, masks = next(data_iter)
image = images[0].unsqueeze(0)  # Add batch dimension to make it [1, channels, height, width]
mask = masks[0]  # Shape: [height, width]

c = 100
kappa = 15
num_iter = 100
learning_rate = 0.01
cw_adv_image = cw_attack_single(model, image, mask, c=c, kappa=kappa, num_iter=num_iter, learning_rate=learning_rate, save_path=cw_save_path)

Generating Adversarial Example (C&W): 100%|██████████| 100/100 [00:01<00:00, 85.56it/s]

Adversarial C&W image saved at: ./cw_adv_image.png





Knowledge Distillation

In [None]:
# Knowledge distillation parameters
alpha = 0.5  # Weight for the ground truth loss (between 0 and 1)
temperature = 3.0  # Temperature for softening the logits

# Optimizer for the student model
optimizer_student = optim.Adam(student_model.parameters(), lr=0.001)

# loss functions
criterion_ce = nn.BCEWithLogitsLoss()  # Cross-entropy loss for ground truth
criterion_kl = nn.KLDivLoss(reduction='batchmean')  # KL divergence loss for distillation


num_epochs = 10

for epoch in range(num_epochs):
    student_model.train()
    teacher_model.eval()

    running_loss = 0.0

    for images, masks in tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{num_epochs}"):
        images = images.to(device)
        masks = masks.unsqueeze(1).to(device)  # Add channel dimension for masks

        # Forward pass through teacher model
        with torch.no_grad():
            teacher_outputs = teacher_model(images)

        # Forward pass through student model
        student_outputs = student_model(images)

        # Distillation loss (KL divergence between teacher and student outputs)
        # Apply softmax and log_softmax to the logits after dividing by temperature
        teacher_outputs_soft = nn.functional.softmax(teacher_outputs / temperature, dim=1)
        student_outputs_soft = nn.functional.log_softmax(student_outputs / temperature, dim=1)

        distillation_loss = criterion_kl(student_outputs_soft, teacher_outputs_soft) * (temperature ** 2)

        # Ground truth loss (cross-entropy)
        ground_truth_loss = criterion_ce(student_outputs, masks)

        # Combined loss
        loss = alpha * ground_truth_loss + (1 - alpha) * distillation_loss

        # Backpropagation and optimization
        optimizer_student.zero_grad()
        loss.backward()
        optimizer_student.step()

        running_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(train_loader)}")

print("Knowledge distillation training complete!")


Training Epoch 1/10: 100%|██████████| 100/100 [00:08<00:00, 12.29it/s]


Epoch [1/10], Loss: 0.24849708795547484


Training Epoch 2/10: 100%|██████████| 100/100 [00:08<00:00, 12.38it/s]


Epoch [2/10], Loss: 0.24761902615427972


Training Epoch 3/10: 100%|██████████| 100/100 [00:08<00:00, 12.42it/s]


Epoch [3/10], Loss: 0.24735896408557892


Training Epoch 4/10: 100%|██████████| 100/100 [00:07<00:00, 12.54it/s]


Epoch [4/10], Loss: 0.2479127061367035


Training Epoch 5/10: 100%|██████████| 100/100 [00:08<00:00, 12.30it/s]


Epoch [5/10], Loss: 0.24707938805222512


Training Epoch 6/10: 100%|██████████| 100/100 [00:08<00:00, 12.11it/s]


Epoch [6/10], Loss: 0.24747191920876502


Training Epoch 7/10: 100%|██████████| 100/100 [00:07<00:00, 12.67it/s]


Epoch [7/10], Loss: 0.24667733833193778


Training Epoch 8/10: 100%|██████████| 100/100 [00:07<00:00, 12.60it/s]


Epoch [8/10], Loss: 0.24707041323184967


Training Epoch 9/10: 100%|██████████| 100/100 [00:08<00:00, 12.47it/s]


Epoch [9/10], Loss: 0.24652438178658487


Training Epoch 10/10: 100%|██████████| 100/100 [00:07<00:00, 12.56it/s]


Epoch [10/10], Loss: 0.24702798217535019
Knowledge distillation training complete!


In [None]:
# Calculate mAP for FGSM poisoned validation data
# FGSM attack to generate adversarial examples
def fgsm_attack(model, images, labels, epsilon, save_path=None):
    model.eval()  # Set model to evaluation mode

    # Move data to the same device
    images = images.to(device)
    labels = labels.to(device).unsqueeze(1)  # Add channel dimension to labels to match output shape [batch_size, 1, H, W]

    # Remove extra dimension from labels if present
    if labels.dim() == 5 and labels.shape[2] == 1:
        labels = labels.squeeze(2)

    images.requires_grad = True

    # Forward pass to get model predictions
    outputs = model(images)


    print(f"Output shape: {outputs.shape}, Label shape (after adjustment): {labels.shape}")

    if labels.shape != outputs.shape:
        raise ValueError(f"Shape mismatch: Output shape {outputs.shape} and label shape {labels.shape} must be the same.")

    loss = criterion(outputs, labels)

    # Zero all existing gradients
    model.zero_grad()

    # Backward pass to compute gradients
    loss.backward()

    # Get the gradient of the images
    data_grad = images.grad.data.sign()

    # Create perturbed image by adjusting the input image with epsilon in the direction of the gradient
    perturbed_images = images + epsilon * data_grad

    # Clip the perturbed images to maintain the range [0, 1]
    perturbed_images = torch.clamp(perturbed_images, 0, 1)

    # Detach the perturbed images from the computation graph
    perturbed_images = perturbed_images.detach()


    if save_path is not None:
        save_image(perturbed_images[0], save_path)
        print(f"Adversarial image saved at: {save_path}")

    return perturbed_images

epsilon = 8 / 255
mAP_score_adv = calculate_mAP_with_adversarial(student_model, val_loader, device, epsilon)
print(f"Mean Average Precision (mAP) score with FGSM adversarial data: {mAP_score_adv}")


Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape (after adjustment): torch.Size([8, 1, 256, 256])
Output shape: torch.Size([8, 1, 256, 256]), Label shape

In [None]:
# C&W attack to generate adversarial examples for one-class segmentation task
def cw_attack(model, images, labels, c=1e-4, kappa=0, num_iter=100, learning_rate=0.01):
    """
    Performs the Carlini & Wagner (C&W) attack on the input images for one-class segmentation.

    Parameters:
        model (torch.nn.Module): The model to attack.
        images (torch.Tensor): The input images.
        labels (torch.Tensor): The true labels for the input images (one-class segmentation masks).
        c (float): Regularization parameter controlling the importance of misclassification.
        kappa (float): Confidence level for making the adversarial example misclassified.
        num_iter (int): Number of iterations to optimize the adversarial perturbation.
        learning_rate (float): Learning rate for the optimizer.

    Returns:
        torch.Tensor: The perturbed images.
    """
    model.eval()  # Set model to evaluation mode

    # Ensure images and labels have correct data types
    images = images.to(device).float()
    labels = labels.to(device).float()  # Ensure labels have correct type and device

    # Initialize the perturbation (w) with zeros and set requires_grad=True
    w = torch.zeros_like(images, requires_grad=True, device=images.device)

    # Optimizer for the perturbation
    optimizer = optim.Adam([w], lr=learning_rate)

    for _ in tqdm(range(num_iter), desc="Generating Adversarial Examples (C&W)", leave=True):
        optimizer.zero_grad()

        # Calculate the adversarial examples using tanh to ensure values are in [0, 1]
        perturbed_images = 0.5 * (torch.tanh(w) + 1)


        outputs = model(perturbed_images)  # Outputs should be of shape [batch_size, 1, height, width]


        outputs = torch.sigmoid(outputs)


        if labels.dim() == 5:
            labels = labels.squeeze(2)  # Remove unnecessary dimension if present

        # Misclassification loss for one-class segmentation
        # Invert the labels to create the misclassification objective
        inverted_labels = 1 - labels
        f_loss = F.binary_cross_entropy(outputs, inverted_labels)

        # Calculate L2 distance per image in batch
        l2_dist = torch.sum((perturbed_images - images) ** 2, dim=(1, 2, 3))

        # Combine the losses element-wise and sum across batch to get the final loss
        total_loss = torch.sum(c * f_loss + l2_dist)


        total_loss.backward()
        optimizer.step()

    # Detach the perturbed images from the computation graph
    perturbed_images = perturbed_images.detach()

    return perturbed_images

# Calculate mAP for C&W poisoned validation data using the distilled model
mAP_score_adv_cw = calculate_mAP_with_adversarial_cw(student_model, val_loader, device, c=c, kappa=kappa, num_iter=num_iter, learning_rate=learning_rate)
print(f"Mean Average Precision (mAP) score with C&W adversarial data for distilled model: {mAP_score_adv_cw}")

print("C&W mAP testing for distilled model complete!")


Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 219.02it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.20it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.34it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.41it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.78it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.45it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.35it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.59it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.61it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.31it/s]
Generating Adversarial Examples (C&W): 100%|██████████| 100/100 [00:00<00:00, 231.30it/s]
Generating

Mean Average Precision (mAP) score with C&W adversarial data for distilled model: 0.224517900948618
C&W mAP testing for distilled model complete!
