In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split, Subset
import time
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split

import pickle
from tqdm import tqdm
import copy

In [2]:
import pyro
import pyro.distributions as dist
from pyro.nn import PyroModule, PyroSample
from pyro.infer.autoguide import AutoDiagonalNormal

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import pandas as pd

In [4]:
import numpy as np
from sklearn.metrics import confusion_matrix

In [5]:
from utils.model import BayesianCNNSingleFC

In [6]:
from bitflip import bitflip_float32

In [7]:
def load_data(batch_size=54):
    transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.3444, 0.3809, 0.4082], std=[0.1809, 0.1331, 0.1137])
    ])

    dataset = datasets.EuroSAT(root='./data', transform=transform, download=False)

    torch.manual_seed(42)
    
    with open('datasplit/split_indices.pkl', 'rb') as f:
        split = pickle.load(f)
        train_dataset = Subset(dataset, split['train'])
        test_dataset = Subset(dataset, split['test'])

    # Add num_workers and pin_memory for faster data loading
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, 
                             num_workers=4, pin_memory=True, persistent_workers=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size,
                            num_workers=4, pin_memory=True, persistent_workers=True)
    return train_loader, test_loader

In [8]:
train_loader, test_loader = load_data(batch_size=54)

In [9]:
device = torch.device("cuda")

In [10]:
num_classes = 10

In [11]:
bayesian_model =BayesianCNNSingleFC(num_classes=num_classes, device=device).to(device)

In [12]:
model_path = 'results_eurosat/bayesian_cnn_model_std10_100_epoch.pth'
guide_path = 'results_eurosat/bayesian_cnn_guide_std10_100_epoch_guide.pth'
pyro_param_store_path = 'results_eurosat/pyro_param_store_std10_100_epoch.pkl'

#guide = AutoDiagonalNormal(bayesian_model).to(device)

#pyro.get_param_store().set_state(torch.load(pyro_param_store_path,weights_only=False))

#original_param_store = {}

#for name, value in pyro.get_param_store().items():
#    print(f"{name}: {value.shape}")
#    original_param_store[name] = torch.tensor(value.data, requires_grad=value.requires_grad)

In [None]:
class Injector:
    def __init__(self, trained_model, device, test_loader, num_samples):
        """
        Initializes SEU injector
        """

        #initialize device
        if device is None:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self.device = device

        self.trained_model = trained_model.to(self.device)
        self.test_loader = test_loader
        self.trained_model.eval()
        self.num_samples = num_samples
        
        self.guide = AutoDiagonalNormal(self.trained_model).to(self.device)
        pyro.get_param_store().clear()
        pyro.get_param_store().set_state(torch.load(pyro_param_store_path,weights_only=False))

        #initial_Accuracy = self.return_accuracy(num_samples)
        initial_labels, initial_predictions, initial_logits, initial_probs = self.predict_data_probs(self.num_samples)
        self.initial_accuracy = self.return_accuracy(initial_labels, initial_predictions)
        
        #print accuracy before SEU
        print(f"Initial accuracy: {self.initial_accuracy}")
        
    def predict_data_probs(self, num_samples=10):
        all_labels = []
        all_predictions = []
        all_logits = []
        all_probs = []

        with torch.no_grad():
            for images, labels in tqdm(self.test_loader, desc="Evaluating"):
                images, labels = images.to(self.device), labels.to(self.device)

                logits_mc = torch.zeros(num_samples, images.size(0), self.trained_model.fc1.out_features).to(device)

                for i in range(num_samples):
                    guide_trace = pyro.poutine.trace(self.guide).get_trace(images)
                    replayed_model = pyro.poutine.replay(self.trained_model, trace=guide_trace)
                    logits = replayed_model(images)
                    logits_mc[i] = logits

                avg_logits = logits_mc.mean(dim=0)
                predictions = torch.argmax(avg_logits, dim=1)

                all_labels.extend(labels.cpu().numpy())
                all_predictions.extend(predictions.cpu().numpy())
                all_logits.extend(avg_logits.cpu().numpy())
                all_probs.extend(F.softmax(avg_logits, dim=1).cpu().numpy())

        return all_labels, all_predictions, all_logits, all_probs

    def return_accuracy(self, all_labels, all_predictions):
        #all_labels, all_predictions, all_logits, all_probs = self.predict_data_probs(num_samples)
        cm = confusion_matrix(all_labels, all_predictions)
        accuracy = np.trace(cm) / np.sum(cm)

        return accuracy
    
    def run_seu_autodiagonal_normal(self, location_index, bit_i, parameter_name="loc", num_samples=10):
        """
        Run SEU on the AutoDiagonalNormal guide
        """

        assert parameter_name in ["loc", "scale"], "Parameter name must be 'loc' or 'scale'."
        assert bit_i in range(0, 33), "Location index must be between 0 and 9."

        if parameter_name == "loc":
            param_store_name = "AutoDiagonalNormal.loc"
        elif parameter_name == "scale":
            param_store_name = "AutoDiagonalNormal.scale"

        pyro.get_param_store().set_state(torch.load(pyro_param_store_path,weights_only=False))

        with torch.no_grad():
            param_dict = {}

            for name, value in pyro.get_param_store().items():
                #print(f"{name}: {value.shape}")
                #print(value)
                param_dict[name] = value.cpu().detach().numpy()

            tensor_cpu = param_dict[param_store_name]

            #original_val = tensor_cpu[0] #this zero index should be changed to the location_index
            original_val = tensor_cpu[location_index]
            seu_val = bitflip_float32(original_val, bit_i)


            print(f"Original value: {original_val}, SEU value: {seu_val}")

            # Get the parameter
            param = pyro.get_param_store().get_param(param_store_name)

            # Modify it safely by creating a new tensor
            new_param = param.clone()
            new_param[location_index] = seu_val  # New Value

            # Update the parameter store
            if parameter_name == "loc":
                pyro.get_param_store().__setitem__(param_store_name, new_param) # 74%
                #param_store[param_store_name].data.copy_(change_item(param_store_name, location_index, seu_val)) #25%
                #pyro.get_param_store()[param_store_name].data[location_index] = seu_val # 25%
            elif parameter_name == "scale":
                pyro.get_param_store().__setitem__(param_store_name, new_param) #10%
                #pyro.get_param_store()[param_store_name].data[location_index] = seu_val

        #print accuracy after SEU
        self.guide = AutoDiagonalNormal(bayesian_model).to(device)
    

        try:
            after_labels, after_predictions, after_logits, after_probs = self.predict_data_probs(num_samples)
            accuracyAfter_SEU = self.return_accuracy(after_labels, after_predictions)
        except:
            accuracyAfter_SEU = np.nan
            
        print(f"Accuracy after SEU: {accuracyAfter_SEU}")
        print("===================================")

        return accuracyAfter_SEU - self.initial_accuracy
    
    def run_seu_autodiagonal_normal_multi(self, location_indices, bit_i, parameter_name="loc", attack_ratio=1.0, num_samples=10, seed=None):
        """
        Run SEU on the AutoDiagonalNormal guide with multiple location attacks
        
        Args:
            location_indices: list of indices or single index to attack
            bit_i: bit position to flip (0-32)
            parameter_name: "loc" or "scale"
            attack_ratio: ratio of locations to attack (0.0-1.0)
            num_samples: number of MC samples for evaluation
            seed: random seed for reproducible attacks
        """
        
        assert parameter_name in ["loc", "scale"], "Parameter name must be 'loc' or 'scale'."
        assert bit_i in range(0, 33), "Bit index must be between 0 and 32."
        assert 0.0 <= attack_ratio <= 1.0, "Attack ratio must be between 0.0 and 1.0."
        
        # Convert single index to list for uniform handling
        if isinstance(location_indices, int):
            location_indices = [location_indices]
        
        # Set random seed if provided
        if seed is not None:
            np.random.seed(seed)
            torch.manual_seed(seed)
        
        # Calculate number of locations to attack
        num_attacks = max(1, int(len(location_indices) * attack_ratio))
        
        # Randomly select locations to attack
        attack_locations = np.random.choice(location_indices, size=num_attacks, replace=False)
        
        if parameter_name == "loc":
            param_store_name = "AutoDiagonalNormal.loc"
        elif parameter_name == "scale":
            param_store_name = "AutoDiagonalNormal.scale"

        # Reset parameter store to original state
        pyro.get_param_store().set_state(torch.load(pyro_param_store_path, weights_only=False))

        with torch.no_grad():
            # Get the parameter
            param = pyro.get_param_store().get_param(param_store_name)
            
            # Create a new tensor to modify
            new_param = param.clone()
            
            print(f"Attacking {num_attacks} out of {len(location_indices)} locations:")
            
            # Attack each selected location
            for location_index in attack_locations:
                original_val = new_param[location_index].cpu().item()
                seu_val = bitflip_float32(original_val, bit_i)
                new_param[location_index] = seu_val
                
                print(f"  Location {location_index}: {original_val} -> {seu_val}")
            
            # Update the parameter store
            pyro.get_param_store().__setitem__(param_store_name, new_param)

        # Reinitialize guide with modified parameters
        self.guide = AutoDiagonalNormal(bayesian_model).to(device)

        try:
            after_labels, after_predictions, after_logits, after_probs = self.predict_data_probs(num_samples)
            accuracyAfter_SEU = self.return_accuracy(after_labels, after_predictions)
        except:
            accuracyAfter_SEU = np.nan
            
        print(f"Accuracy after SEU: {accuracyAfter_SEU}")
        print("===================================")

        return accuracyAfter_SEU - self.initial_accuracy

In [None]:
inj = Injector(trained_model=bayesian_model, device=device, test_loader=test_loader, num_samples=10)

In [None]:
#inj.run_seu_autodiagonal_normal(location_index=0, bit_i=1, parameter_name="scale", num_samples=10)
inj.run_seu_autodiagonal_normal(location_index=0, bit_i=2, parameter_name="loc", num_samples=10)

In [None]:
#store the result in dataframe
import pandas as pd
results = []

for i in range(0, 32):
    results.append(inj.run_seu_autodiagonal_normal(location_index=0, bit_i=i, parameter_name="scale", num_samples=10))

In [None]:
# plot the result in matplotlib
plt.figure(figsize=(10, 5))
plt.plot(range(0, 32), results, marker='o')
plt.title('SEU Impact on Scale Parameter')
plt.xlabel('Bit Position')
plt.ylabel('Change in Accuracy')
plt.grid()
plt.show()

In [None]:
results_loc = []

for i in range(0, 32):
    results_loc.append(inj.run_seu_autodiagonal_normal(location_index=0, bit_i=i, parameter_name="loc", num_samples=10))

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(range(0, 32), results_loc, marker='o')
plt.title('SEU Impact on Loc Parameter')
plt.xlabel('Bit Position')
plt.ylabel('Change in Accuracy')
plt.grid()
plt.show()

In [None]:
#(inj.run_seu_autodiagonal_normal(location_index=0, bit_i=0, parameter_name="loc", num_samples=10))

In [None]:
results2_loc = []

for i in range(0, 32):
    results2_loc.append(inj.run_seu_autodiagonal_normal(location_index=217536, bit_i=i, parameter_name="loc", num_samples=10))

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(range(0, 32), results2_loc, marker='o')
plt.title('SEU Impact on Loc Parameter')
plt.xlabel('Bit Position')
plt.ylabel('Change in Accuracy')
plt.grid()
plt.show()

In [None]:
bayesian_model

In [None]:
# print the parameters of the bayesian model
for name, value in bayesian_model.named_parameters():
    print(f"{name}: {value.shape}")

In [None]:
# print the parameters of pyro param store
for name, value in pyro.get_param_store().items():
    print(f"{name}: {value.shape}")

In [None]:
results3_loc = []

for i in range(0, 32):
    results3_loc.append(inj.run_seu_autodiagonal_normal(location_index=53696, bit_i=i, parameter_name="loc", num_samples=10))

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(range(0, 32), results3_loc, marker='o')
plt.title('SEU Impact on Loc Parameter')
plt.xlabel('Bit Position')
plt.ylabel('Change in Accuracy')
plt.grid()
plt.show()

In [None]:
import numpy as np

def get_flat_param_index(layer_name, param_type, index_tuple):
    # Define parameter shapes for each layer
    param_shapes = {
        'conv1': {
            'weight': (32, 3, 5, 5),
            'bias': (32,)
        },
        'conv2': {
            'weight': (64, 32, 5, 5),
            'bias': (64,)
        },
        'fc1': {
            'weight': (10, 16384),
            'bias': (10,)
        }
    }

    # Check valid inputs
    if layer_name not in param_shapes:
        raise ValueError(f"Unknown layer '{layer_name}'")
    if param_type not in param_shapes[layer_name]:
        raise ValueError(f"'{param_type}' not found in layer '{layer_name}'")

    # Compute flat offset for each param in order: conv1.weight, conv1.bias, ...
    flat_offset = 0
    for l in ['conv1', 'conv2', 'fc1']:
        for p in ['weight', 'bias']:
            shape = param_shapes[l][p]
            numel = np.prod(shape)

            if l == layer_name and p == param_type:
                # Compute local flat index within this param
                local_flat_index = np.ravel_multi_index(index_tuple, shape)
                return flat_offset + local_flat_index

            flat_offset += numel

    raise RuntimeError("Should not reach here if input is valid.")


In [None]:
# Index of conv1.bias[5]
print(get_flat_param_index("conv1", "bias", (0,)))  # Output: 2405

# Index of conv2.weight[0,0,0,0]
print(get_flat_param_index("conv2", "weight", (0,0,0,0)))  # Output: 2432

# Index of fc1.weight[0, 0]
print(get_flat_param_index("fc1", "weight", (0, 0)))  # Output: 53696

In [None]:
results4_loc = []

for i in range(0, 32):
    results4_loc.append(inj.run_seu_autodiagonal_normal(location_index=2400, bit_i=i, parameter_name="loc", num_samples=10))

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(range(0, 32), results4_loc, marker='o')
plt.title('SEU Impact on Loc Parameter')
plt.xlabel('Bit Position')
plt.ylabel('Change in Accuracy')
plt.grid()
plt.show()

In [None]:
# plot results2_loc, results3_loc, results4_loc in a single plot
plt.figure(figsize=(10, 5))
plt.plot(range(0, 32), results_loc, marker='o', label='conv1 1st weight')
plt.plot(range(0, 32), results4_loc, marker='o', label='conv1 1st bias')
plt.plot(range(0, 32), results3_loc, marker='o', label='fc1 1st weight')
plt.plot(range(0, 32), results2_loc, marker='o', label='fc1 1st bias')


plt.title('SEU Impact on Loc Parameter')
plt.xlabel('Bit Position')
plt.ylabel('Change in Accuracy')
plt.grid()
plt.legend()
plt.show()

In [None]:
#results
#results2_scale
#...
#...

In [None]:
results_scale = []

for i in range(0, 32):
    results_scale.append(inj.run_seu_autodiagonal_normal(location_index=0, bit_i=i, parameter_name="scale", num_samples=10))

In [None]:
#store the result in dataframe
import pandas as pd
results2_scale = []

for i in range(0, 32):
    results2_scale.append(inj.run_seu_autodiagonal_normal(location_index=217536, bit_i=i, parameter_name="scale", num_samples=10))

In [None]:
#store the result in dataframe
import pandas as pd
results3_scale = []

for i in range(0, 32):
    results3_scale.append(inj.run_seu_autodiagonal_normal(location_index=53696, bit_i=i, parameter_name="scale", num_samples=10))

In [None]:
#store the result in dataframe
import pandas as pd
results4_scale = []

for i in range(0, 32):
    results4_scale.append(inj.run_seu_autodiagonal_normal(location_index=2400, bit_i=i, parameter_name="scale", num_samples=10))

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(range(0, 32), results_scale, marker='o', label='conv1 1st weight')
plt.plot(range(0, 32), results4_scale, marker='o', label='conv1 1st bias')
plt.plot(range(0, 32), results3_scale, marker='o', label='fc1 1st weight')
plt.plot(range(0, 32), results2_scale, marker='o', label='fc1 1st bias')


plt.title('SEU Impact on Scale Parameter')
plt.xlabel('Bit Position')
plt.ylabel('Change in Accuracy')
plt.grid()
plt.legend()
plt.show()