In [1]:
import torch
import torch.nn as nn
import torchvision.models as models
from torch.optim.lr_scheduler import ExponentialLR
import torch.optim as optim
import torch.nn.functional as F
from tqdm.auto import tqdm
import os
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import random
from collections import Counter
# Set device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")


Using device: cpu


  from .autonotebook import tqdm as notebook_tqdm


In [7]:
import os
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image

# Custom Dataset class
class ForchheimDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Load all image paths and corresponding labels
        for outer_label in os.listdir(root_dir):
            outer_dir = os.path.join(root_dir, outer_label)
            if os.path.isdir(outer_dir):
                for inner_label in os.listdir(outer_dir):
                    inner_dir = os.path.join(outer_dir, inner_label)
                    if os.path.isdir(inner_dir):
                        for image_name in os.listdir(inner_dir):
                            if image_name.endswith(('.png', '.jpg', '.jpeg')):
                                self.image_paths.append(os.path.join(inner_dir, image_name))
                                # Use a combined label from outer and inner folder names
                                combined_label = f"{outer_label}_{inner_label}"
                                self.labels.append(combined_label)

        if len(self.image_paths) == 0:
            raise ValueError(f"No images found in {root_dir}. Please check the directory path and structure.")

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("RGB")
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

# Function to load dataset
def load_forchheim_dataset(data_dir, batch_size=32, num_workers=4):
    # Define transformations
    transform = transforms.Compose([
        transforms.Resize((256, 256)),  # Resize images to a consistent size
        transforms.ToTensor(),  # Convert PIL image to PyTorch tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize based on ImageNet stats
    ])

    # Create dataset and dataloader
    dataset = ForchheimDataset(root_dir=data_dir, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

    return dataloader

# Example usage
data_dir = 'D:\\Forchheim'  # Replace with the path to your dataset
batch_size = 32
dataloader = load_forchheim_dataset(data_dir, batch_size)

# Iterate through the dataloader
for images, labels in dataloader:
    print(images.size(), labels)  # Example: torch.Size([32, 3, 256, 256]) ['0_0', '0_1', ...]
    break  # Just to demonstrate, remove this in your actual training loop


In [None]:
def pgd_attack(model, images, labels, eps=0.3, alpha=2/255, iters=40):
    adv_images = images.clone().detach().requires_grad_(True)
    for _ in range(iters):
        outputs = model(adv_images)
        loss = nn.CrossEntropyLoss()(outputs, labels)
        model.zero_grad()
        loss.backward()
        grad = adv_images.grad.data
        adv_images = adv_images + alpha * grad.sign()
        adv_images = torch.clamp(adv_images, images - eps, images + eps)
        adv_images = torch.clamp(adv_images, 0, 1)
        adv_images = adv_images.detach().requires_grad_(True)
    return adv_images

In [None]:


# Define the PDN (Patch Discriminator Network)
class PDN(nn.Module):
    def __init__(self):
        super(PDN, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            
            nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid()
        )
        
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [None]:
# Define the Feature Extractor using ResNet-18
class FeatureExtractor(nn.Module):
    def __init__(self):
        super(FeatureExtractor, self).__init__()
        resnet = models.resnet18(pretrained=True)
        self.features = nn.Sequential(*list(resnet.children())[:-1])
        
    def forward(self, x):
        return self.features(x).squeeze()

In [None]:

# Initialize models, optimizer, and loss function
learning_rate = 0.001
feature_extractor = FeatureExtractor().to(device)
PDN_model = PDN().to(device)
optimizer = optim.Adam(list(feature_extractor.parameters()) + list(PDN_model.parameters()), lr=learning_rate)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.97)
loss_fn = nn.MSELoss()


feature_extraction_epochs = 20
pdn_epochs = 10

Using device: cpu




In [None]:

def train_feature_extractor_and_pdn(feature_extractor, pdn_model, train_loader, device):
    feature_extractor.to(device)
    pdn_model.to(device)
    
    # Optimizers and loss functions
    feature_optimizer = optim.Adam(feature_extractor.parameters(), lr=0.001)
    pdn_optimizer = optim.Adam(pdn_model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.ExponentialLR(feature_optimizer, gamma=0.97)
    loss_fn = nn.MSELoss()

    # Training feature extractor
    for epoch in range(20):
        feature_extractor.train()
        total_loss = 0.0
        
        for images in train_loader:
            images = images[0].to(device)
            features = feature_extractor(images)
            
            # Forward pass through PDN model
            reconstructed_images = pdn_model(images)
            
            # Compute loss
            loss = loss_fn(reconstructed_images, images)
            total_loss += loss.item()
            
            # Backpropagation
            feature_optimizer.zero_grad()
            loss.backward()
            feature_optimizer.step()
        
        scheduler.step()
        print(f"Feature Extractor Epoch [{epoch + 1}/20], Loss: {total_loss / len(train_loader):.4f}")

    # Training PDN model
    for epoch in range(10):
        pdn_model.train()
        total_loss = 0.0
        
        for images in train_loader:
            images = images[0].to(device)
            
            # Generate adversarial examples
            adv_images = pgd_attack(pdn_model, images, labels=torch.zeros(images.size(0), dtype=torch.long).to(device), eps=0.3, alpha=2/255, iters=40)
            
            # Forward pass through PDN model
            reconstructed_images = pdn_model(images)
            adv_reconstructed_images = pdn_model(adv_images)
            
            # Compute loss on clean and adversarial examples
            loss = (loss_fn(reconstructed_images, images) + loss_fn(adv_reconstructed_images, adv_images)) / 2
            total_loss += loss.item()
            
            # Backpropagation
            pdn_optimizer.zero_grad()
            loss.backward()
            pdn_optimizer.step()
        
        print(f"PDN Model Epoch [{epoch + 1}/10], Loss: {total_loss / len(train_loader):.4f}")


In [None]:
train_feature_extractor_and_pdn(feature_extractor, PDN_model, patch_loader, optimizer, loss_fn, device, feature_extraction_epochs, pdn_epochs)


NameError: name 'pdn_model' is not defined

In [None]:


def major_voting(patches, model, device):
    model.eval()
    with torch.no_grad():
        patch_labels = []
        for patch in patches:
            patch = patch.unsqueeze(0).to(device)
            features = model(patch)
            # Assuming a classifier is used to predict the class from features
            label = torch.argmax(features, dim=1).item()
            patch_labels.append(label)
        
        # Aggregate results using majority voting
        most_common_label, _ = Counter(patch_labels).most_common(1)[0]
        return most_common_label

def evaluate_model(patches_per_image, model, device):
    image_level_labels = []
    for patches in patches_per_image:
        predicted_label = major_voting(patches, model, device)
        image_level_labels.append(predicted_label)
    
    # Evaluate accuracy
    # Assuming `true_labels` is a list of true labels for each image
    accuracy = np.mean([pred == true_label for pred, true_label in zip(image_level_labels, true_labels)])
    print(f"Image Level Accuracy: {accuracy:.4f}")
