In [None]:
# # Note: This section is intended for use in Google Colab only.

# import os
# import shutil

# # Define the repository URLs and set the home directories based on Colab's file structure
# repos = [
#     {"url": "https://github.com/noye09/cs612_SMU_g4.git", "target_dir": "/content/cs612_SMU_g4"},
#     {"url": "https://github.com/verazuo/badnets-pytorch.git", "target_dir": "/content/cs612_SMU_g4/backdoor_cs612/badnets_pytorch"}
# ]

# # Clone the repositories if they don't exist
# for repo in repos:
#     if not os.path.isdir(repo["target_dir"]):
#         print(f"Cloning repository from {repo['url']}...")
#         !git clone {repo['url']}

#         # Rename folder if needed (e.g., replacing hyphens with underscores)
#         if repo["url"].endswith("badnets-pytorch.git"):
#             shutil.move("badnets-pytorch", repo["target_dir"])
#     else:
#         print(f"Repository already cloned at {repo['target_dir']}!")

# # Set the working directory to the desired location
# HOME_DIR = "/content/cs612_SMU_g4/backdoor_cs612/"
# if os.path.isdir(HOME_DIR):
#     os.chdir(HOME_DIR)
#     print(f"Changed working directory to: {os.getcwd()}")
# else:
#     print(f"Directory '{HOME_DIR}' does not exist.")


In [7]:
# TriggerOptimizer
# 'initialize_model_and_data' initializes and loads the backdoor model
# 'detect_backdoor' runs the backdoor detection process

import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import datasets, transforms
import numpy as np
import pandas as pd
import os
import warnings

#import matplotlib.pyplot as plt  # For debugging and visualization

from model_class.model_mnist import MNISTNet  # Import MNIST model class from model_mnist.py
from model_class.model_cifar10 import CIFAR10Net  # Import CIFAR-10 model class from model_cifar10.py
from model_class.badnet import BadNet

warnings.filterwarnings("ignore")  # Suppress warnings


class TriggerOptimizer:
    """Searches for a (mask, pattern) pair that drives inputs towards one class.

    The perturbed input is ``pattern * mask + image * (1 - mask)``. ``mask``
    and ``pattern`` both have shape ``input_shape`` and are the only tensors
    handed to the Adam optimizer, so the model weights are never updated here.
    """

    def __init__(self, model, target_class, dataset, input_shape, device='cpu', lr=0.0005):
        """
        Args:
            model: Callable network mapping a batch of inputs to class logits.
            target_class: Label the optimized trigger should force.
            dataset: Indexable collection yielding ``(image, label)`` pairs.
            input_shape: Shape of a single input; also the shape of mask and pattern.
            device: Device on which mask, pattern and sampled inputs are placed.
            lr: Learning rate for Adam.
        """
        self.model = model
        self.target_class = target_class
        self.dataset = dataset
        self.input_shape = input_shape
        self.device = device
        self.lr = lr

        # Mask and pattern both start from uniform noise in [0, 1).
        self.mask = torch.rand(input_shape, requires_grad=True, device=device)
        self.pattern = torch.rand(input_shape, requires_grad=True, device=device)
        self.optimizer = optim.Adam([self.mask, self.pattern], lr=lr)
        # Halve the learning rate every 1500 optimizer steps.
        self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=1500, gamma=0.5)

    def optimize(self, max_iters=5000, patience=300, batch_size=10):
        """
        Generate a minimal trigger mask and pattern for the specified target class.

        Args:
            max_iters: Upper bound on optimization steps.
            patience: Number of consecutive non-improving steps tolerated
                before stopping early.
            batch_size: Maximum number of target-class samples to optimize on.
                Fewer are used when the dataset holds fewer such samples.

        Returns:
            Tuple ``(mask, pattern)`` of detached tensors of shape ``input_shape``.

        Raises:
            ValueError: If the dataset contains no sample of the target class.
        """
        # Visit the dataset in random order so the chosen samples vary per run.
        # Plain Python ints keep the indexing compatible with any indexable dataset.
        shuffled_indices = torch.randperm(len(self.dataset)).tolist()

        # Collect up to `batch_size` images that belong to the target class.
        samples = []
        for idx in shuffled_indices:
            img, lbl = self.dataset[idx]
            if lbl == self.target_class:
                samples.append(img)
            if len(samples) >= batch_size:
                break

        if len(samples) == 0:
            raise ValueError(f"No samples found for target class {self.target_class}")

        sample_inputs = torch.stack(samples).to(self.device)

        # Size the labels from the samples actually found: sizing them from
        # `batch_size` breaks the loss whenever fewer samples are available.
        target_labels = torch.full(
            (sample_inputs.size(0),), self.target_class, dtype=torch.long, device=self.device
        )
        criterion = nn.CrossEntropyLoss()

        best_loss = float('inf')
        counter = 0

        for i in range(max_iters):
            self.optimizer.zero_grad()

            # Blend the pattern into every sample; the mask broadcasts over the batch.
            masked_inputs = self.pattern * self.mask + sample_inputs * (1 - self.mask)

            # Forward pass
            outputs = self.model(masked_inputs)

            # Encourage the model to predict the target class ...
            target_loss = criterion(outputs, target_labels)

            # ... while a small L1-style penalty keeps the mask sparse.
            mask_penalty = self.mask.abs().mean()
            total_loss = target_loss + 0.001 * mask_penalty

            # Backpropagation
            total_loss.backward()
            self.optimizer.step()
            self.scheduler.step()

            # Project the mask back into [0, 1] without recording the op in autograd.
            with torch.no_grad():
                self.mask.clamp_(0, 1)

            # Early stopping on the (pre-step) loss of this iteration.
            current_loss = total_loss.item()
            if current_loss < best_loss:
                best_loss = current_loss
                counter = 0
            else:
                counter += 1
                if counter >= patience:
                    print(f"Early stopping at iteration {i} due to no improvement")
                    break

            # Print progress every 500 iterations
            if i % 500 == 0:
                print(f"Iteration {i}/{max_iters}, Loss: {current_loss}")

        return self.mask.detach(), self.pattern.detach()


def initialize_model_and_data(model_selection, model_path, device='cpu'):
    """
    Initializes the model and loads data based on the model selection.

    Args:
        model_selection: Key naming the architecture/dataset pairing; one of
            'mnist', 'cifar10', 'badnets_pytorch_mnist', 'badnets_pytorch_cifar10'.
        model_path: Path to the saved state dict for the selected architecture.
        device: Device the weights are mapped to and the model is moved to.

    Returns:
        Tuple ``(model, test_dataset)``: the model in evaluation mode on
        ``device`` and the matching torchvision test split with ``ToTensor``.

    Raises:
        ValueError: If ``model_selection`` is not a known option.
        FileNotFoundError: If ``model_path`` does not exist.
    """
    DATASET_OPTIONS = {
        'mnist': {
            'model_class': MNISTNet,
            'dataset_class': datasets.MNIST,
            'model_kwargs': {}
        },
        'cifar10': {
            'model_class': CIFAR10Net,
            'dataset_class': datasets.CIFAR10,
            'model_kwargs': {}
        },
        'badnets_pytorch_mnist': {
            'model_class': BadNet,
            'dataset_class': datasets.MNIST,
            'model_kwargs': {'input_channels': 1, 'output_num': 10}
        },
        'badnets_pytorch_cifar10': {
            'model_class': BadNet,
            'dataset_class': datasets.CIFAR10,
            'model_kwargs': {'input_channels': 3, 'output_num': 10}
        }
    }

    # Validate both inputs up front so a typo does not cost a dataset download.
    if model_selection not in DATASET_OPTIONS:
        raise ValueError(f"Invalid model selection '{model_selection}'. Available options: {list(DATASET_OPTIONS.keys())}")

    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found at path: {model_path}")

    selected = DATASET_OPTIONS[model_selection]
    model_class = selected['model_class']
    dataset_class = selected['dataset_class']
    model_kwargs = selected['model_kwargs']

    # Initialize model with possible arguments
    model = model_class(**model_kwargs)
    test_dataset = dataset_class(root='./data', train=False, download=True, transform=transforms.ToTensor())

    # SECURITY: torch.load unpickles the file, which can execute arbitrary code.
    # Only load checkpoints from a trusted source. A file that exists but is
    # not a valid checkpoint fails here with pickle.UnpicklingError.
    state_dict = torch.load(model_path, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)  # Send model to specified device
    model.eval()  # Set model to evaluation mode

    return model, test_dataset



def detect_backdoor(model, dataset, num_classes, device='cpu', anomaly_threshold=-2):
    """
    Detects potential backdoors in the model using Neural Cleanse.

    A trigger is optimized for every class; the mean absolute mask value of
    each trigger is turned into a z-score across classes, and classes whose
    score falls below ``anomaly_threshold`` are flagged.

    Args:
        model: Network under inspection; moved to ``device``.
        dataset: Indexable collection of ``(image, label)`` pairs. The first
            item determines the input shape.
        num_classes: Number of class labels to probe (``0 .. num_classes - 1``).
        device: Device used for the trigger optimization.
        anomaly_threshold: Scores strictly below this value flag a class.

    Returns:
        Dict with keys:
            "backdoor_detected": True if any class was flagged.
            "anomaly_scores": class -> z-score (``inf`` when optimization failed).
            "triggers": class -> {"mask", "pattern", "mask_size"}.
            "target_class": present only when a class was flagged; the flagged
                class with the lowest anomaly score.
    """
    model.to(device)
    input_shape = dataset[0][0].shape  # Get the shape of a single input image

    results = {
        "backdoor_detected": False,
        "anomaly_scores": {},
        "triggers": {}
    }

    mask_sizes = []

    for target_class in range(num_classes):
        try:
            # Generate a trigger for each class
            optimizer = TriggerOptimizer(model, target_class, dataset, input_shape, device=device)
            mask, pattern = optimizer.optimize(max_iters=3000)

            # Calculate the size of the mask (used to detect anomalies)
            mask_size = mask.abs().mean().item()
            mask_sizes.append(mask_size)

            # Store the mask and pattern for potential backdoor classes
            results["triggers"][target_class] = {
                "mask": mask.cpu().numpy(),
                "pattern": pattern.cpu().numpy(),
                "mask_size": mask_size
            }

        except Exception as e:
            # Best effort: a failing class is recorded as NaN and excluded
            # from the statistics instead of aborting the whole scan.
            print(f"Error processing target class {target_class}: {e}")
            mask_sizes.append(np.nan)
            results["triggers"][target_class] = {
                "mask": None,
                "pattern": None,
                "mask_size": np.nan
            }

    # Statistics are computed over the classes that produced a trigger. When
    # none did, skip them entirely rather than averaging an empty list.
    valid_mask_sizes = [x for x in mask_sizes if not np.isnan(x)]
    if valid_mask_sizes:
        mean_size = np.mean(valid_mask_sizes)
        std_size = np.std(valid_mask_sizes)
    else:
        mean_size = std_size = float('nan')

    lowest_flagged_score = None

    for target_class, mask_size in enumerate(mask_sizes):
        if np.isnan(mask_size):
            results["anomaly_scores"][target_class] = float('inf')
            continue

        anomaly_score = (mask_size - mean_size) / (std_size + 1e-6)  # Avoid division by zero
        results["anomaly_scores"][target_class] = anomaly_score

        # An unusually small mask suggests a shortcut into this class.
        if anomaly_score < anomaly_threshold:
            results["backdoor_detected"] = True
            # Report the most anomalous flagged class, not merely the last one seen.
            if lowest_flagged_score is None or anomaly_score < lowest_flagged_score:
                lowest_flagged_score = anomaly_score
                results["target_class"] = target_class
    #visualize_generated_masks(results)
    return results

# def visualize_generated_masks(results):
#     """
#     Visualizes the generated masks and patterns for each target class.
#     """
#     for target_class, trigger_info in results["triggers"].items():
#         if trigger_info["mask"] is None:
#             continue

#         mask = trigger_info["mask"]
#         pattern = trigger_info["pattern"]

#         # Plot the mask and pattern
#         plt.figure(figsize=(10, 5))

#         # Plot the mask
#         plt.subplot(1, 2, 1)
#         if len(mask.shape) == 3:  # If the mask has 3 dimensions
#             mask = mask[0]  # Take the first channel (assuming it's the same across channels)
#         plt.imshow(mask, cmap='gray')
#         plt.title(f"Mask for Target Class {target_class}")
#         plt.axis('off')

#         # Plot the pattern
#         plt.subplot(1, 2, 2)
#         if pattern.shape[0] == 3:  # RGB Image, shape is (C, H, W), need to convert to (H, W, C)
#             pattern = np.transpose(pattern, (1, 2, 0))  # Transpose from (C, H, W) to (H, W, C)
#             plt.imshow(pattern)
#         else:  # Grayscale
#             plt.imshow(pattern.squeeze(), cmap='gray')
#         plt.title(f"Pattern for Target Class {target_class}")
#         plt.axis('off')

#         plt.show()

In [9]:
# Execute the main code block for model backdoor detection
# 'number_of_runs' is used to run the detection process multiple times to ensure consistency
# Outputs 'anomaly_scores' and 'detection_class' for X runs, saved as CSV files for further analysis

if __name__ == "__main__":

    # Define the available options for model_selection and model_path
    MODEL_SELECTION_OPTIONS = {
        1: {'model_selection': 'mnist', 'model_path': "./model1/mnist_bd.pt"},      # unknown backdoor model
        2: {'model_selection': 'cifar10', 'model_path': "./model2/cifar10_bd.pt"},  # unknown backdoor model
        3: {'model_selection': 'cifar10', 'model_path': "./model3/cifar10_bd.pt"},  # unknown backdoor model
        4: {'model_selection': 'cifar10', 'model_path': "./model4/cifar10_bd.pt"},  # unknown backdoor model
        5: {'model_selection': 'cifar10', 'model_path': "./model5/cifar10_bd.pt"},  # unknown backdoor model
        6: {'model_selection': 'mnist', 'model_path': "./model6/mnist_bd_30_c5.pt"},      # backdoor model target class label 5 week4 exercise (30 poison test data)
        7: {'model_selection': 'mnist', 'model_path': "./model7/mnist_bd_50_c5.pt"},      # backdoor model target class label 5 week4 exercise (50 poison test data)
        8: {'model_selection': 'badnets_pytorch_mnist', 'model_path': "./model8/badnet_MNIST.pth"},  # badnets-pytorch backdoor model target class label 0
        9: {'model_selection': 'badnets_pytorch_cifar10', 'model_path': "./model9/badnet_CIFAR10.pth"}  # badnets-pytorch backdoor model target class label 1
    }

    selected_model_options = 9

    # Fail with a clear message instead of a NameError further down.
    if selected_model_options not in MODEL_SELECTION_OPTIONS:
        raise ValueError(
            f"Invalid option '{selected_model_options}'. "
            f"Available options: {list(MODEL_SELECTION_OPTIONS.keys())}"
        )

    model_selection = MODEL_SELECTION_OPTIONS[selected_model_options]['model_selection']
    model_path = MODEL_SELECTION_OPTIONS[selected_model_options]['model_path']
    print(f"Selected: Model Selection = '{model_selection}', Model Path = '{model_path}'")

    # Set the device to GPU if available, otherwise use CPU
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f'Active device: {device}')

    # Define the number of runs for detection due to randomness
    number_of_runs = 5  # Run detection process multiple times for consistency
    num_classes = 10  # Number of classes (update if different)

    # Create the output folder before the long-running detection so a missing
    # directory cannot discard the results at the very end.
    output_dir = './output'
    os.makedirs(output_dir, exist_ok=True)

    # The trigger search only updates its own mask/pattern tensors, so the
    # model and dataset can be loaded once and shared by every run.
    model, dataset = initialize_model_and_data(model_selection, model_path, device=device)

    # One record per run; converted to a DataFrame afterwards
    results_list = []

    # Run the backdoor detection multiple times and collect results
    for run in range(number_of_runs):
        print(f"Run {run + 1}/{number_of_runs}")

        # Detect backdoor in the model
        results = detect_backdoor(model, dataset, num_classes, device=device)
        anomaly_scores = results["anomaly_scores"]
        detection_class = results.get("target_class", None) if results["backdoor_detected"] else None

        # Columns: run, class_0 .. class_{num_classes - 1}, target_class
        record = {'run': run + 1}
        record.update({f'class_{c}': anomaly_scores.get(c, None) for c in range(num_classes)})
        record['target_class'] = detection_class
        results_list.append(record)

    # Convert results list to DataFrame
    results_df = pd.DataFrame(results_list)

    # A bare expression inside an `if` body is not rendered by the notebook,
    # so print the first 10 rows explicitly.
    print(results_df.head(10))

    # Name the CSV after the folder that holds the model file
    parent_folder = os.path.basename(os.path.dirname(model_path))
    csv_filename = os.path.join(output_dir, f"{parent_folder}_output.csv")

    # Save DataFrame to CSV file
    results_df.to_csv(csv_filename, index=False)
    print(f"Results saved to {csv_filename}")


Selected: Model Selection = 'badnets_pytorch_cifar10', Model Path = './model9/badnet_CIFAR10.pth'
Active device: cpu
Run 1/5
Files already downloaded and verified


UnpicklingError: invalid load key, '\x0a'.