In [43]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models import resnet18 # <-- IMPORTANT: Replace with your model import
from PIL import Image
import os
import glob
import copy
import torchvision

# --- 1. Custom Dataset for the Trigger Set ---
# This dataset loads images from a folder where filenames are like "XX_label.jpeg"
class TriggerSetDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the trigger images.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = glob.glob(os.path.join(root_dir, '*.jpg'))
        self.image_paths.extend(glob.glob(os.path.join(root_dir, '*.png'))) # Also find .png

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')

        # Extract label from filename like "image_0_label_7.jpeg" -> 7
        try:
            filename = os.path.basename(img_path)
            label_str = filename.split('_')[-1].split('.')[0]
            label = int(label_str)
        except (IndexError, ValueError) as e:
            raise ValueError(f"Could not parse label from filename: {img_path}. Expected format '..._label.ext'") from e

        if self.transform:
            image = self.transform(image)

        return image, label

# --- 2. Helper Function to Load Your Model ---
def load_full_model(model_path):
    """Loads a full model object that was saved with torch.save(model, path)."""
    print(f"Loading full model from {model_path}")
    
    # Loading is a single step. No need to instantiate the class first.
    # Use map_location for portability (e.g., loading a GPU-trained model on a CPU)
    with torch.serialization.safe_globals([
    torchvision.models.squeezenet.SqueezeNet,
    torch.nn.modules.container.Sequential,
]):
        model = torch.load(model_path, map_location=torch.device('cpu'),weights_only=False)
    
    print("Model loaded successfully.")
    return model
def get_squeezenet_last_layer(model):
    """Returns the last trainable layer of a SqueezeNet model."""
    # The final classification layer in SqueezeNet is a Conv2d layer
    # located at index 1 of the 'classifier' sequential module.
    return model.classifier[1]

# --- 3. A Generic Training Loop ---
def run_training(model, dataloader, criterion, optimizer, device, num_epochs=10):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        if epoch%10 == 0:
            #reduce learning rate by factor of 10 every 10 epochs
            for param_group in optimizer.param_groups:
                param_group['lr'] *= 0.1
        running_loss = 0.0
        for i, (inputs, labels) in enumerate(dataloader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        epoch_loss = running_loss / len(dataloader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")
    return model

# --- 4. The Four Fine-Tuning Functions ---
def fine_tune_last_layer(model, trigger_set_path, lr=0.1, num_epochs=10, batch_size=4):
    """ Fine-Tune Last Layer (FTLL): Freeze all layers except the last one and train. """
    print("\n--- Starting: Fine-Tune Last Layer (FTLL) ---")
    
    # Freeze all parameters in the model
    for param in model.parameters():
        param.requires_grad = False
    
    # Unfreeze the parameters of the last layer (e.g., the fully connected layer in ResNet)
    # IMPORTANT: You must change 'fc' to the name of your model's last layer.
    last_layer = get_squeezenet_last_layer(model)

    for param in last_layer.parameters():
        param.requires_grad = True
        
    # Create an optimizer that only updates the unfrozen (trainable) parameters
    optimizer = optim.SGD(filter(lambda p: p.requires_grad, model.parameters()), lr=lr, momentum=0.9)
    
    # Setup data, criterion, and run training
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.Grayscale(num_output_channels=3),  # convert 1->3 channels
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet stats
                         std=[0.229, 0.224, 0.225])
])
    dataset = TriggerSetDataset(root_dir=trigger_set_path, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    
    trained_model = run_training(model, dataloader, criterion, optimizer, device, num_epochs)
    return trained_model


def fine_tune_all_layers(model, trigger_set_path, lr=0.1, num_epochs=10, batch_size=4):
    """ Fine-Tune All Layers (FTAL): Unfreeze all layers and train. """
    print("\n--- Starting: Fine-Tune All Layers (FTAL) ---")
    
    # Ensure all parameters are trainable (this is the default state)
    for param in model.parameters():
        param.requires_grad = True
    print("All layers are unfrozen for training.")

    # Optimizer for all parameters
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # Setup data, criterion, and run training
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.Grayscale(num_output_channels=3),  # convert 1->3 channels
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet stats
                         std=[0.229, 0.224, 0.225])
])
    dataset = TriggerSetDataset(root_dir=trigger_set_path, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    
    trained_model = run_training(model, dataloader, criterion, optimizer, device, num_epochs)
    return trained_model

# Helper function to re-initialize weights
def weight_reset(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        m.reset_parameters()

def retrain_last_layer(model, trigger_set_path, lr=0.1, num_epochs=10, batch_size=4):
    """ Retrain Last Layer (RTLL): Re-initialize last layer, then freeze all others and train it. """
    print("\n--- Starting: Retrain Last Layer (RTLL) ---")
    
    # Freeze all parameters in the model
    for param in model.parameters():
        param.requires_grad = False

    # Re-initialize the weights of the last layer
    # IMPORTANT: You must change 'fc' to the name of your model's last layer.

    last_layer = get_squeezenet_last_layer(model)
    last_layer.apply(weight_reset)
    # Unfreeze the parameters of the last layer so it can be trained
   
    for param in last_layer.parameters():
        param.requires_grad = True
        
    optimizer = optim.SGD(filter(lambda p: p.requires_grad, model.parameters()), lr=lr, momentum=0.9)
    
    # Setup data, criterion, and run training
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.Grayscale(num_output_channels=3),  # convert 1->3 channels
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet stats
                         std=[0.229, 0.224, 0.225])
])
    dataset = TriggerSetDataset(root_dir=trigger_set_path, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    
    trained_model = run_training(model, dataloader, criterion, optimizer, device, num_epochs)
    return trained_model


def retrain_all_layers(model, trigger_set_path, lr=0.1, num_epochs=10, batch_size=4):
    """ Retrain All Layers (RTAL): Re-initialize the entire model and train from scratch. """
    print("\n--- Starting: Retrain All Layers (RTAL) ---")
    
    # Re-initialize all weights in the model
    print("Re-initializing all weights in the model.")
    model.apply(weight_reset)
    
    # Ensure all parameters are trainable
    for param in model.parameters():
        param.requires_grad = True
        
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # Setup data, criterion, and run training
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.Grayscale(num_output_channels=3),  # convert 1->3 channels
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet stats
                         std=[0.229, 0.224, 0.225])
])
    dataset = TriggerSetDataset(root_dir=trigger_set_path, transform=transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    
    trained_model = run_training(model, dataloader, criterion, optimizer, device, num_epochs)
    return trained_model


In [44]:
# path to triggersets ./../data/trigger_sets/ 10 folders inside
BASE_TRIGGER_SET_PATH = os.path.join( 'data', 'trigger_sets')
baseline_MNIST_model_path = os.path.join('notebooks','models', 'MNIST_SN_finetuned_baseline.pth')
baseline_FMNIST_model_path = os.path.join('notebooks','models', 'FMNIST_SN_finetuned_baseline.pth')

BASELINE_MODELS = {
        'MNIST': baseline_MNIST_model_path,
        'FMNIST': baseline_FMNIST_model_path,
    }
    
FINETUNING_METHODS = {
    'FTLL': fine_tune_last_layer,
    'FTAL': fine_tune_all_layers,
    'RTLL': retrain_last_layer,
    'RTAL': retrain_all_layers,
}

trigger_folders = [f for f in os.listdir(BASE_TRIGGER_SET_PATH) if os.path.isdir(os.path.join(BASE_TRIGGER_SET_PATH, f))]

for modelname,modelpath in BASELINE_MODELS.items():
    print(f"\nProcessing model: {modelname} from {modelpath}")
    
    # Load the model
    
    
    for trigger_folder in trigger_folders[0:1]:  # Change slice to process all folders or specific ones
        trigger_set_path = os.path.join(BASE_TRIGGER_SET_PATH, trigger_folder)
        print(f"\nProcessing trigger set: {trigger_folder} at {trigger_set_path}")
        
        model = load_full_model(modelpath)
        
        for method_name, method_func in FINETUNING_METHODS.items():
            print(f"Applying fine-tuning method: {method_name}")
            try:
                trained_model = method_func(copy.deepcopy(model), trigger_set_path,num_epochs=60)
                # Save the trained model if needed
                save_path = f"notebooks/models/{modelname}_{trigger_folder}_{method_name}.pth"
                torch.save(trained_model, save_path)
                print(f"Model saved to {save_path}")
            except Exception as e:
                print(f"Error during {method_name}: {e}")


Processing model: MNIST from notebooks\models\MNIST_SN_finetuned_baseline.pth

Processing trigger set: triggerset1 at data\trigger_sets\triggerset1
Loading full model from notebooks\models\MNIST_SN_finetuned_baseline.pth
Model loaded successfully.
Applying fine-tuning method: FTLL

--- Starting: Fine-Tune Last Layer (FTLL) ---
Epoch [1/60], Loss: 51.5301
Epoch [2/60], Loss: 3.1208
Epoch [3/60], Loss: 2.2569
Epoch [4/60], Loss: 2.2405
Epoch [5/60], Loss: 2.2527
Epoch [6/60], Loss: 2.1700
Epoch [7/60], Loss: 2.2136
Epoch [8/60], Loss: 2.1905
Epoch [9/60], Loss: 2.1622
Epoch [10/60], Loss: 2.2001
Epoch [11/60], Loss: 2.2129
Epoch [12/60], Loss: 2.1606
Epoch [13/60], Loss: 2.1289
Epoch [14/60], Loss: 2.1645
Epoch [15/60], Loss: 2.1731
Epoch [16/60], Loss: 2.1432
Epoch [17/60], Loss: 2.1743
Epoch [18/60], Loss: 2.0918
Epoch [19/60], Loss: 2.1024
Epoch [20/60], Loss: 2.1760
Epoch [21/60], Loss: 2.2107
Epoch [22/60], Loss: 2.1866
Epoch [23/60], Loss: 2.1894
Epoch [24/60], Loss: 2.1462
Epoch 

In [45]:
# load all models inside the models folder one by one
# test the triggerset1 on it and print the accuracy
TRANSFORM_SQUARE = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.Grayscale(num_output_channels=3),  # convert 1->3 channels
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet stats
                         std=[0.229, 0.224, 0.225])
])


def evaluate_models_on_triggerset(trigger_set_path, model_dir='notebooks/models'):
    """Load all models from a directory and test their accuracy on a single trigger set."""
    
    # --- 1. Load the Data ONCE ---
    print(f"Loading trigger set data from: {trigger_set_path}")
    try:
        dataset = TriggerSetDataset(root_dir=trigger_set_path, transform=TRANSFORM_SQUARE)
        # FIX #1: Check if the dataset is empty right away.
        if not dataset:
            print(f"CRITICAL ERROR: No data loaded from {trigger_set_path}. Please check the path and file contents.")
            return # Exit the function
        dataloader = DataLoader(dataset, batch_size=16, shuffle=False)
    except Exception as e:
        print(f"CRITICAL ERROR: Failed to load dataset. Error: {e}")
        return

    # --- 2. Find and Loop Through Models ---
    model_files = glob.glob(os.path.join(model_dir, '*.pth'))
    if not model_files:
        print(f"No models found in '{model_dir}'.")
        return
        
    print(f"\nFound {len(model_files)} models. Starting evaluation...")
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    for model_file in model_files:
        try:
            print(f"--- Testing Model: {os.path.basename(model_file)} ---")
            model = load_full_model(model_file)
            model.to(device)
            model.eval()

            correct = 0
            total = 0

            with torch.no_grad():
                for inputs, labels in dataloader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()

            # FIX #2: Add a safety check before division to prevent the crash.
            if total > 0:
                accuracy = (correct / total) * 100
                print(f"  > Accuracy: {accuracy:.2f}% ({correct}/{total} correct)")
            else:
                # This case should not be reached if the early exit works, but it's good practice.
                print("  > Accuracy: N/A (No data was processed)")
        except Exception as e:
            print(f"  > An unexpected error occurred while testing {os.path.basename(model_file)}: {e}")
            
BASE_TRIGGER_SET_PATH = os.path.join('data', 'trigger_sets')
MODEL_DIRECTORY = 'notebooks/models'
target_trigger_set_path = os.path.join(BASE_TRIGGER_SET_PATH, 'triggerset1')
    
evaluate_models_on_triggerset(target_trigger_set_path, model_dir=MODEL_DIRECTORY)

Loading trigger set data from: data\trigger_sets\triggerset1

Found 10 models. Starting evaluation...
--- Testing Model: FMNIST_SN_finetuned_baseline.pth ---
Loading full model from notebooks/models\FMNIST_SN_finetuned_baseline.pth
Model loaded successfully.
  > Accuracy: 6.00% (6/100 correct)
--- Testing Model: FMNIST_triggerset1_FTAL.pth ---
Loading full model from notebooks/models\FMNIST_triggerset1_FTAL.pth
Model loaded successfully.
  > Accuracy: 17.00% (17/100 correct)
--- Testing Model: FMNIST_triggerset1_FTLL.pth ---
Loading full model from notebooks/models\FMNIST_triggerset1_FTLL.pth
Model loaded successfully.
  > Accuracy: 35.00% (35/100 correct)
--- Testing Model: FMNIST_triggerset1_RTAL.pth ---
Loading full model from notebooks/models\FMNIST_triggerset1_RTAL.pth
Model loaded successfully.
  > Accuracy: 17.00% (17/100 correct)
--- Testing Model: FMNIST_triggerset1_RTLL.pth ---
Loading full model from notebooks/models\FMNIST_triggerset1_RTLL.pth
Model loaded successfully.
  >