In [1]:
import import_ipynb
import torch
import dataloader
import generate_mahalanobis
import regression_mahalanobis
import generate_odin
import calmetric
from models.resnet import ResNet34, ResNet18
from models.densenet import DenseNet3
import os
import random
import numpy as np
import csv

In [2]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed every RNG this notebook touches (Python, NumPy, PyTorch) so that the
# randomly drawn bit positions and any stochastic library code are repeatable.
seed = 0
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Ask cuDNN for deterministic kernels and keep its autotuner off: with the
# autotuner enabled, cuDNN may pick different algorithms from run to run.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

## No Fault Injection

In [3]:
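# NOTE: this baseline run (no fault injection) is intentionally disabled.
# Uncomment the whole cell to regenerate the clean ODIN scores and metrics.
# id_datasets = ['cifar10', 'cifar100']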
# ood_dataset = 'svhn'
# model_names = ['resnet34', 'densenet3']
# batch_size = 128
# magnitude = 0.0014
# temperature = 1000

# for id_dataset in id_datasets:
#     if id_dataset == 'cifar10':
#         num_classes = 10
#     elif id_dataset == 'cifar100':
#         num_classes = 100
        
#     for model_name in model_names:
#         if model_name == 'resnet34':
#             model = ResNet34(num_c=num_classes).to(device)
#         elif model_name == 'densenet3':
#             model = DenseNet3(100, num_classes, growth_rate=12).to(device)
            
#         model_path = f'./pretrained/{model_name}_{id_dataset}.pth'
#         model.load_state_dict(torch.load(model_path, weights_only=True))
#         model.eval()
        
#         mean, std = dataloader.get_mean_std(id_dataset)    
        
#         id_trainloader, id_testloader = dataloader.get_imageloader(id_dataset, batch_size, mean, std)    
#         _, out_testloader = dataloader.get_imageloader(ood_dataset, batch_size, mean, std)    
        
#         ##### ODIN #####
        
#         file_path = f'./softmax_scores/{model_name}_{id_dataset}'
        
#         if not os.path.exists(file_path):
#             os.makedirs(file_path)
            
#         generate_odin.odin(model, id_testloader, out_testloader, magnitude, temperature, std, file_path)
#         calmetric.metric(model_name, id_dataset, ood_dataset, file_path)
        
#         ##### Mahalanobis #####
            
#         file_path = f'./output/{model_name}_{id_dataset}'
        
#         if not os.path.exists(file_path):
#             os.makedirs(file_path)
        
#         # generate_mahalanobis.mahalanobis(model, id_trainloader, id_testloader, out_testloader, num_classes, magnitude, std, file_path)
#         # regression_mahalanobis.regression(id_dataset, ood_dataset, file_path, score=f'Mahalanobis_{str(magnitude)}')
        

## Fault Injection - ODIN

In [None]:
# import fault_injector as fi
import PytorchFS.FS as FS

# ---------------------------------------------------------------------------
# Fault-injection sweep for the ODIN detector.
#
# For every conv / fc / linear layer of every (dataset, model) pair, run
# generate_odin.odin_fi once per trial of the selected fault model, score the
# result with calmetric.metric and append one row to a CSV file.
# ---------------------------------------------------------------------------

# ---- Experiment configuration ---------------------------------------------
id_datasets = ['cifar10']
ood_dataset = 'svhn'
model_names = ['resnet34']
batch_size = 128
magnitude = 0.0014
temperature = 1000

# Forwarded unchanged to generate_odin.odin_fi.
# NOTE(review): presumably these toggle injection during the first forward
# pass, the backward pass and the second forward pass - confirm in odin_fi.
first_forward_fi = True
backward_fi = True
second_forward_fi = True

# Fault model. Only the first enabled flag takes effect
# (single > multi > burst), exactly like an if/elif chain.
single_flip = True
multi_flip = False
burst_flip = False

if single_flip:
    flip_mode = 'single'
elif multi_flip:
    flip_mode = 'multi'
elif burst_flip:
    flip_mode = 'burst'
else:
    # No fault model selected: only the baseline is computed and the results
    # file stays empty.
    flip_mode = None

# Suffix of the results file name: fault model followed by the enabled phases.
flip_pos = f'_{flip_mode}' if flip_mode else ''
if first_forward_fi:
    flip_pos += '_ff'
if backward_fi:
    flip_pos += '_b'
if second_forward_fi:
    flip_pos += '_sf'

# ---- Constants --------------------------------------------------------------
NUM_BITS = 32                                      # bit positions 0..31 are swept
NUM_CLASSES = {'cifar10': 10, 'cifar100': 100}
# Value passed as `flip_ratio` to odin_fi for fc / linear layers, per dataset,
# and for every other (conv) layer.
# NOTE(review): the exact meaning of `flip_ratio` is defined by odin_fi.
FC_FLIP_RATIO = {'cifar10': 1, 'cifar100': 10}
CONV_FLIP_RATIO = 100
CSV_HEADERS = {
    'single': ['layer_name', 'bit_position', 'auroc', 'fpr', 'auroc_fi', 'fpr_fi'],
    'multi': ['layer_name', 'num_flips', 'bits', 'auroc', 'fpr', 'auroc_fi', 'fpr_fi'],
    'burst': ['layer_name', 'num_flips', 'start_bit', 'auroc', 'fpr', 'auroc_fi', 'fpr_fi'],
}


def iter_flip_trials(mode):
    """Lazily yield one ``(bit_lists, csv_fields, log_line)`` triple per trial.

    Args:
        mode: 'single', 'multi', 'burst' or None.

    Yields:
        bit_lists:  list of bit positions to flip in this trial.
        csv_fields: the mode-specific columns that go between ``layer_name``
                    and the metric columns of the results row.
        log_line:   progress message to print before running the trial.

    Trials per mode:
        * single - each of the 32 bit positions on its own.
        * multi  - 32 trials of 2 randomly drawn bit positions that are not
                   adjacent (adjacent pairs belong to the burst model).
        * burst  - every run of 2, 4 or 8 consecutive bits that fits in 32 bits.
        * None   - no trials.

    The generator is lazy on purpose: in 'multi' mode each ``random.sample``
    call happens right before its trial runs, so the order of draws from the
    global ``random`` state is the same as in a plain nested loop.
    """
    if mode == 'single':
        for bit_position in range(NUM_BITS):
            yield [bit_position], [bit_position], f"  Testing bit position: {bit_position}"
    elif mode == 'multi':
        num_bits = 2
        for trial in range(32):
            # Redraw until the sampled positions are not one consecutive run.
            while True:
                positions = sorted(random.sample(range(NUM_BITS), num_bits))
                is_burst = all(lo + 1 == hi for lo, hi in zip(positions, positions[1:]))
                if not is_burst:
                    break
            yield (positions, [num_bits, positions],
                   f"  [multi] Trial {trial+1}: Testing non-burst {num_bits}-bit flips at {positions}")
    elif mode == 'burst':
        for burst_len in (2, 4, 8):
            # Every start position for which the whole burst fits.
            for start_pos in range(0, NUM_BITS - burst_len + 1):
                bit_positions = list(range(start_pos, start_pos + burst_len))
                yield (bit_positions, [burst_len, start_pos],
                       f"  [burst] Testing burst flip at positions: {bit_positions}")


# ---- Sweep ------------------------------------------------------------------
for id_dataset in id_datasets:
    # Fail loudly instead of silently reusing the previous dataset's settings.
    if id_dataset not in NUM_CLASSES:
        raise ValueError(f"Unsupported in-distribution dataset: {id_dataset!r}")
    num_classes = NUM_CLASSES[id_dataset]

    for model_name in model_names:
        if model_name == 'resnet34':
            model = ResNet34(num_c=num_classes).to(device)
        elif model_name == 'densenet3':
            model = DenseNet3(100, num_classes, growth_rate=12).to(device)
        else:
            raise ValueError(f"Unsupported model: {model_name!r}")

        model_path = f'./pretrained/{model_name}_{id_dataset}.pth'
        # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
        model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
        model.eval()

        fs = FS.FS()
        fs.setLayerInfo(model)

        mean, std = dataloader.get_mean_std(id_dataset)

        id_trainloader, id_testloader = dataloader.get_imageloader(id_dataset, batch_size, mean, std)
        _, out_testloader = dataloader.get_imageloader(ood_dataset, batch_size, mean, std)

        ##### ODIN #####

        file_path = f'./softmax_scores/{model_name}_{id_dataset}'
        os.makedirs(file_path, exist_ok=True)

        # Fault-free baseline, computed once per (dataset, model) pair.
        generate_odin.odin(model, id_testloader, out_testloader, magnitude, temperature, std, file_path)

        # Create (truncate) the results CSV and write its header.
        results_file = f'{file_path}/fi_results_{model_name}_{id_dataset}{flip_pos}.csv'
        with open(results_file, 'w', newline='') as csvfile:
            if flip_mode is not None:
                csv.writer(csvfile).writerow(CSV_HEADERS[flip_mode])

        # Keep only convolution and fully connected layers (case-insensitive
        # match on the module name).
        # layer_names = fi.get_layer_name(model)
        layer_names = [
            name for name in fs.getModuleNameList(model)
            if any(keyword in name.lower() for keyword in ('conv', 'fc', 'linear'))
        ]

        for layer_idx, layer_name in enumerate(layer_names):
            # Same case-insensitive match as the filter above, so a layer that
            # passed the filter as "FC"/"Linear" is also treated as fc here.
            lowered_name = layer_name.lower()
            if 'fc' in lowered_name or 'linear' in lowered_name:
                num_flips = FC_FLIP_RATIO[id_dataset]
            else:
                num_flips = CONV_FLIP_RATIO

            print(f"Testing layer {layer_idx+1}/{len(layer_names)}: {layer_name}")

            for bit_lists, csv_fields, log_line in iter_flip_trials(flip_mode):
                print(log_line)

                # Inject the faults and regenerate the scores.
                generate_odin.odin_fi(model, id_testloader, out_testloader, magnitude, temperature, std, file_path,
                                      first_forward_fi=first_forward_fi, backward_fi=backward_fi,
                                      second_forward_fi=second_forward_fi,
                                      bit_lists=bit_lists, flip_ratio=num_flips, layer_name=layer_name, fs=fs)

                # Measure baseline and fault-injected metrics.
                auroc, fpr, fi_auroc, fi_fpr = calmetric.metric(model_name, id_dataset, ood_dataset, file_path)

                # Append right away so partial results survive an interrupted run.
                with open(results_file, 'a', newline='') as csvfile:
                    csv.writer(csvfile).writerow([layer_name, *csv_fields, auroc, fpr, fi_auroc, fi_fpr])
        

num_samples: 10000
Processing in-distribution images
Processing out-of-distribution images
Testing layer 1/34: conv1
  Testing bit position: 0
100
Processing in-distribution images
Processing out-of-distribution images
Neural network architecture:                 resnet34
In-distribution dataset:                      cifar10
Out-of-distribution dataset:                     svhn

                              ODIN            ODIN_FI
AUROC:                       90.3%              90.3%
FPR at TPR 95%:              39.3%              39.3% 

  Testing bit position: 1
100
Processing in-distribution images
Processing out-of-distribution images
Neural network architecture:                 resnet34
In-distribution dataset:                      cifar10
Out-of-distribution dataset:                     svhn

                              ODIN            ODIN_FI
AUROC:                       90.3%              90.3%
FPR at TPR 95%:              39.3%              39.3% 

  Testing bit position: 2



Processing out-of-distribution images
Neural network architecture:                 resnet34
In-distribution dataset:                      cifar10
Out-of-distribution dataset:                     svhn

                              ODIN            ODIN_FI
AUROC:                       90.3%              77.8%
FPR at TPR 95%:              39.3%              45.9% 

  Testing bit position: 31
100
Processing in-distribution images
Processing out-of-distribution images
Neural network architecture:                 resnet34
In-distribution dataset:                      cifar10
Out-of-distribution dataset:                     svhn

                              ODIN            ODIN_FI
AUROC:                       90.3%              90.3%
FPR at TPR 95%:              39.3%              39.3% 

Testing layer 29/34: layer4.0.conv2
  Testing bit position: 0
100
Processing in-distribution images
Processing out-of-distribution images
Neural network architecture:                 resnet34
In-distributi

## Fault Injection - Mahalanobis

In [None]:
# import fault_injector as fi
import PytorchFS.FS as FS

# ---------------------------------------------------------------------------
# Fault-injection sweep for the Mahalanobis detector.
#
# For every conv / fc / linear layer of every (dataset, model) pair, run
# generate_mahalanobis.mahalanobis_fi once per trial of the selected fault
# model, score baseline and faulty runs with regression_mahalanobis and append
# one row to a CSV file.
# ---------------------------------------------------------------------------

# ---- Experiment configuration ---------------------------------------------
id_datasets = ['cifar10']
ood_dataset = 'svhn'
model_names = ['resnet34']
batch_size = 128
magnitude = 0.0014
temperature = 1000

# Forwarded unchanged to generate_mahalanobis.mahalanobis_fi.
# NOTE(review): presumably these toggle injection during the first forward
# pass, the backward pass and the second forward pass - confirm in
# mahalanobis_fi.
first_forward_fi = True
backward_fi = True
second_forward_fi = True

# Fault model. Only the first enabled flag takes effect
# (single > multi > burst), exactly like an if/elif chain.
single_flip = True
multi_flip = False
burst_flip = False

if single_flip:
    flip_mode = 'single'
elif multi_flip:
    flip_mode = 'multi'
elif burst_flip:
    flip_mode = 'burst'
else:
    # No fault model selected: no trials run and the results file stays empty.
    flip_mode = None

# Suffix of the results file name: fault model followed by the enabled phases.
flip_pos = f'_{flip_mode}' if flip_mode else ''
if first_forward_fi:
    flip_pos += '_ff'
if backward_fi:
    flip_pos += '_b'
if second_forward_fi:
    flip_pos += '_sf'

# ---- Constants --------------------------------------------------------------
NUM_BITS = 32                                      # bit positions 0..31 are swept
NUM_CLASSES = {'cifar10': 10, 'cifar100': 100}
# Value passed as `flip_ratio` to mahalanobis_fi for fc / linear layers, per
# dataset, and for every other (conv) layer.
# NOTE(review): the exact meaning of `flip_ratio` is defined by mahalanobis_fi.
FC_FLIP_RATIO = {'cifar10': 1, 'cifar100': 10}
CONV_FLIP_RATIO = 100
CSV_HEADERS = {
    'single': ['layer_name', 'bit_position', 'auroc', 'fpr', 'auroc_fi', 'fpr_fi'],
    'multi': ['layer_name', 'num_flips', 'bits', 'auroc', 'fpr', 'auroc_fi', 'fpr_fi'],
    'burst': ['layer_name', 'num_flips', 'start_bit', 'auroc', 'fpr', 'auroc_fi', 'fpr_fi'],
}
# Score name handed to the regression helpers, e.g. 'Mahalanobis_0.0014'.
mahalanobis_score = f'Mahalanobis_{str(magnitude)}'


def iter_flip_trials(mode):
    """Lazily yield one ``(bit_lists, csv_fields, log_line)`` triple per trial.

    Args:
        mode: 'single', 'multi', 'burst' or None.

    Yields:
        bit_lists:  list of bit positions to flip in this trial.
        csv_fields: the mode-specific columns that go between ``layer_name``
                    and the metric columns of the results row.
        log_line:   progress message to print before running the trial.

    Trials per mode:
        * single - each of the 32 bit positions on its own.
        * multi  - 32 trials of 2 randomly drawn bit positions that are not
                   adjacent (adjacent pairs belong to the burst model).
        * burst  - every run of 2, 4 or 8 consecutive bits that fits in 32 bits.
        * None   - no trials.

    The generator is lazy on purpose: in 'multi' mode each ``random.sample``
    call happens right before its trial runs, so the order of draws from the
    global ``random`` state is the same as in a plain nested loop.
    """
    if mode == 'single':
        for bit_position in range(NUM_BITS):
            yield [bit_position], [bit_position], f"  Testing bit position: {bit_position}"
    elif mode == 'multi':
        num_bits = 2
        for trial in range(32):
            # Redraw until the sampled positions are not one consecutive run.
            while True:
                positions = sorted(random.sample(range(NUM_BITS), num_bits))
                is_burst = all(lo + 1 == hi for lo, hi in zip(positions, positions[1:]))
                if not is_burst:
                    break
            yield (positions, [num_bits, positions],
                   f"  [multi] Trial {trial+1}: Testing non-burst {num_bits}-bit flips at {positions}")
    elif mode == 'burst':
        for burst_len in (2, 4, 8):
            # Every start position for which the whole burst fits.
            for start_pos in range(0, NUM_BITS - burst_len + 1):
                bit_positions = list(range(start_pos, start_pos + burst_len))
                yield (bit_positions, [burst_len, start_pos],
                       f"  [burst] Testing burst flip at positions: {bit_positions}")


# ---- Sweep ------------------------------------------------------------------
for id_dataset in id_datasets:
    # Fail loudly instead of silently reusing the previous dataset's settings.
    if id_dataset not in NUM_CLASSES:
        raise ValueError(f"Unsupported in-distribution dataset: {id_dataset!r}")
    num_classes = NUM_CLASSES[id_dataset]

    for model_name in model_names:
        if model_name == 'resnet34':
            model = ResNet34(num_c=num_classes).to(device)
        elif model_name == 'densenet3':
            model = DenseNet3(100, num_classes, growth_rate=12).to(device)
        else:
            raise ValueError(f"Unsupported model: {model_name!r}")

        model_path = f'./pretrained/{model_name}_{id_dataset}.pth'
        # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
        model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
        model.eval()

        fs = FS.FS()
        fs.setLayerInfo(model)

        mean, std = dataloader.get_mean_std(id_dataset)

        id_trainloader, id_testloader = dataloader.get_imageloader(id_dataset, batch_size, mean, std)
        _, out_testloader = dataloader.get_imageloader(ood_dataset, batch_size, mean, std)

        ##### Mahalanobis #####

        file_path = f'./output/{model_name}_{id_dataset}'
        os.makedirs(file_path, exist_ok=True)

        # NOTE(review): this ODIN baseline call looks like a copy-paste leftover
        # in the Mahalanobis sweep; it is kept because it writes into
        # `file_path` and it is not visible here whether anything reads that.
        generate_odin.odin(model, id_testloader, out_testloader, magnitude, temperature, std, file_path)

        # Create (truncate) the results CSV and write its header.
        results_file = f'{file_path}/fi_results_{model_name}_{id_dataset}{flip_pos}.csv'
        with open(results_file, 'w', newline='') as csvfile:
            if flip_mode is not None:
                csv.writer(csvfile).writerow(CSV_HEADERS[flip_mode])

        # Keep only convolution and fully connected layers (case-insensitive
        # match on the module name).
        # layer_names = fi.get_layer_name(model)
        layer_names = [
            name for name in fs.getModuleNameList(model)
            if any(keyword in name.lower() for keyword in ('conv', 'fc', 'linear'))
        ]

        for layer_idx, layer_name in enumerate(layer_names):
            # Same case-insensitive match as the filter above, so a layer that
            # passed the filter as "FC"/"Linear" is also treated as fc here.
            lowered_name = layer_name.lower()
            if 'fc' in lowered_name or 'linear' in lowered_name:
                num_flips = FC_FLIP_RATIO[id_dataset]
            else:
                num_flips = CONV_FLIP_RATIO

            print(f"Testing layer {layer_idx+1}/{len(layer_names)}: {layer_name}")

            for bit_lists, csv_fields, log_line in iter_flip_trials(flip_mode):
                print(log_line)

                # Fault-free baseline for this trial.
                # NOTE(review): recomputed on every trial; it could probably be
                # hoisted out of the loops if mahalanobis_fi does not overwrite
                # its outputs - confirm before changing.
                generate_mahalanobis.mahalanobis(model, id_trainloader, id_testloader, out_testloader,
                                                 num_classes, magnitude, std, file_path)

                # Fault-injected run. `bit_lists` is exactly the set of bits
                # that is logged and written to the CSV row for this trial.
                generate_mahalanobis.mahalanobis_fi(model, id_trainloader, id_testloader, out_testloader,
                                                    num_classes, magnitude, std, file_path,
                                                    first_forward_fi=first_forward_fi, backward_fi=backward_fi,
                                                    second_forward_fi=second_forward_fi,
                                                    bit_lists=bit_lists, flip_ratio=num_flips,
                                                    layer_name=layer_name, fs=fs)

                # Measure baseline and fault-injected metrics.
                auroc, fpr = regression_mahalanobis.regression(
                    id_dataset, ood_dataset, file_path, score=mahalanobis_score)
                fi_auroc, fi_fpr = regression_mahalanobis.regression_fi(
                    id_dataset, ood_dataset, file_path, score=mahalanobis_score)

                # Append right away so partial results survive an interrupted run.
                with open(results_file, 'a', newline='') as csvfile:
                    csv.writer(csvfile).writerow([layer_name, *csv_fields, auroc, fpr, fi_auroc, fi_fpr])
        