In [1]:
from __future__ import division
from __future__ import absolute_import
import torch.nn.functional as F

import os, sys, random
import time
import copy
import torch
import torch.backends.cudnn as cudnn
import torchvision.datasets as dset
import torchvision.transforms as transforms
import pandas as pd
import numpy as np
from torch.utils.tensorboard import SummaryWriter
import logging
from sklearn.model_selection import train_test_split
from sklearn.metrics import matthews_corrcoef
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.preprocessing import StandardScaler, LabelEncoder
import math

2025-04-23 12:36:59.217706: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-04-23 12:36:59.217736: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-04-23 12:36:59.218562: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-04-23 12:36:59.223584: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
class RecorderMeter(object):
    """Computes and stores the minimum loss value and its epoch index, along with MCC"""

    def __init__(self, total_epoch):
        self.reset(total_epoch)

    def reset(self, total_epoch):
        assert total_epoch > 0
        self.total_epoch = total_epoch
        self.current_epoch = 0
        self.epoch_losses = np.zeros((self.total_epoch, 2), dtype=np.float32)  # [epoch, train/val]
        self.epoch_losses = self.epoch_losses - 1

        self.epoch_mcc = np.zeros((self.total_epoch, 2), dtype=np.float32)  # [epoch, train/val]
        self.epoch_mcc = self.epoch_mcc

    def update(self, idx, train_loss, train_mcc, val_loss, val_mcc):
        assert idx >= 0 and idx < self.total_epoch, 'total_epoch : {} , but update with the {} index'.format(
            self.total_epoch, idx)
        self.epoch_losses[idx, 0] = train_loss
        self.epoch_losses[idx, 1] = val_loss
        self.epoch_mcc[idx, 0] = train_mcc
        self.epoch_mcc[idx, 1] = val_mcc
        self.current_epoch = idx + 1

    def max_mcc(self, istrain):
        if self.current_epoch <= 0:
            return 0
        if istrain:
            return self.epoch_mcc[:self.current_epoch, 0].max()
        else:
            return self.epoch_mcc[:self.current_epoch, 1].max()

    

In [3]:
def time_string():
    ISOTIMEFORMAT = '%Y-%m-%d %X'
    string = '[{}]'.format(
        time.strftime(ISOTIMEFORMAT, time.gmtime(time.time())))
    return string


def convert_secs2time(epoch_time):
    need_hour = int(epoch_time / 3600)
    need_mins = int((epoch_time - 3600 * need_hour) / 60)
    need_secs = int(epoch_time - 3600 * need_hour - 60 * need_mins)
    return need_hour, need_mins, need_secs


def time_file_str():
    ISOTIMEFORMAT = '%Y-%m-%d'
    string = '{}'.format(time.strftime(ISOTIMEFORMAT,
                                       time.gmtime(time.time())))
    return string + '-{}'.format(random.randint(1, 10000))

### Class quan_Linear

In [4]:
class quan_Linear(nn.Linear):
    def __init__(self, in_features, out_features, bias=True):
        super(quan_Linear, self).__init__(in_features, out_features, bias=bias)

        self.N_bits = 8
        self.full_lvls = 2**self.N_bits
        self.half_lvls = (self.full_lvls - 2) / 2
        # Initialize the step size
        self.step_size = nn.Parameter(torch.Tensor([1]), requires_grad=True)
        self.__reset_stepsize__()
        # flag to enable the inference with quantized weight or self.weight
        self.inf_with_weight = False  # disabled by default

        # create a vector to identify the weight to each bit
        self.b_w = nn.Parameter(2**torch.arange(start=self.N_bits - 1,
                                                end=-1,
                                                step=-1).unsqueeze(-1).float(),
                                requires_grad=False)

        self.b_w[0] = -self.b_w[0]  #in-place reverse

    def forward(self, input):
        if self.inf_with_weight:
            return F.linear(input, self.weight * self.step_size, self.bias)
        else:
            self.__reset_stepsize__()
            weight_quan = quantize(self.weight, self.step_size,
                                   self.half_lvls) * self.step_size
            return F.linear(input, weight_quan, self.bias)

    def __reset_stepsize__(self):
        with torch.no_grad():
            self.step_size.data = self.weight.abs().max() / self.half_lvls

    def __reset_weight__(self):
        '''
        This function will reconstruct the weight stored in self.weight.
        Replacing the orginal floating-point with the quantized fix-point
        weight representation.
        '''
        # replace the weight with the quantized version
        with torch.no_grad():
            self.weight.data = quantize(self.weight, self.step_size,
                                        self.half_lvls)
        # enable the flag, thus now computation does not invovle weight quantization
        self.inf_with_weight = True



### Class quan_Conv1d

In [5]:
class quan_Conv1d(nn.Conv1d):
    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride=1,
                 padding=1,
                 dilation=1,
                 groups=1,
                 bias=True):
        super(quan_Conv1d, self).__init__(in_channels,
                                          out_channels,
                                          kernel_size,
                                          stride=stride,
                                          padding=padding,
                                          dilation=dilation,
                                          groups=groups,
                                          bias=bias)

        # Số lượng bit để lượng tử hóa trọng số
        self.N_bits = 8
        self.full_lvls = 2 ** self.N_bits
        self.half_lvls = (self.full_lvls - 2) / 2

        # Bước lượng tử hóa (step size), là một tham số có thể học được
        self.step_size = nn.Parameter(torch.Tensor([1]), requires_grad=True)
        self.__reset_stepsize__()

        # Cờ để bật hoặc tắt sử dụng trọng số lượng tử hóa
        self.inf_with_weight = False  # Tắt theo mặc định

        # Tạo một vector để biểu diễn trọng số cho từng bit
        self.b_w = nn.Parameter(2 ** torch.arange(start=self.N_bits - 1,
                                                  end=-1,
                                                  step=-1).unsqueeze(-1).float(),
                                requires_grad=False)
        self.b_w[0] = -self.b_w[0]  # Biến đổi MSB thành giá trị âm để hỗ trợ bù hai

    def __reset_stepsize__(self):
        """Hàm này dùng để đặt lại giá trị `step_size`."""
        # Giá trị này có thể được tùy chỉnh tùy thuộc vào yêu cầu của mô hình
        self.step_size.data.fill_(1.0)

    def forward(self, x):
        # Kiểm tra cờ `inf_with_weight` để quyết định sử dụng trọng số đã lượng tử hóa hay không
        if self.inf_with_weight:
            quantized_weight = self.quantize_weight(self.weight)
            return nn.functional.conv1d(x, quantized_weight, self.bias, self.stride,
                                        self.padding, self.dilation, self.groups)
        else:
            return nn.functional.conv1d(x, self.weight, self.bias, self.stride,
                                        self.padding, self.dilation, self.groups)

    def quantize_weight(self, weight):
        """Lượng tử hóa trọng số theo số bit đã định."""
        # Tạo trọng số lượng tử hóa bằng cách sử dụng step_size
        quantized_weight = torch.round(weight / self.step_size) * self.step_size
        quantized_weight = torch.clamp(quantized_weight, -self.half_lvls * self.step_size,
                                       (self.half_lvls - 1) * self.step_size)
        return quantized_weight

In [6]:
def change_quan_bitwidth(model, n_bit):
    '''This script change the quantization bit-width of entire model to n_bit'''
    for m in model.modules():
        if isinstance(m, quan_Conv1d) or isinstance(m, quan_Linear):
            m.N_bits = n_bit
            # print("Change weight bit-width as {}.".format(m.N_bits))
            m.b_w.data = m.b_w.data[-m.N_bits:]
            m.b_w[0] = -m.b_w[0]
            print(m.b_w)
    return 

In [7]:
class _quantize_func(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input, step_size, half_lvls):
        # ctx is a context object that can be used to stash information
        # for backward computation
        ctx.step_size = step_size
        ctx.half_lvls = half_lvls
        output = F.hardtanh(input,
                            min_val=-ctx.half_lvls * ctx.step_size.item(),
                            max_val=ctx.half_lvls * ctx.step_size.item())

        output = torch.round(output / ctx.step_size)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        grad_input = grad_output.clone() / ctx.step_size

        return grad_input, None, None

quantize = _quantize_func.apply

In [8]:
class _bin_func(torch.autograd.Function):

    @staticmethod
    def forward(ctx, input, mu):
        ctx.mu = mu
        output = input.clone().zero_()
        output[input.ge(0)] = 1
        output[input.lt(0)] = -1

        return output

    @staticmethod
    def backward(ctx, grad_output):
        grad_input = grad_output.clone() / ctx.mu
        return grad_input, None

w_bin = _bin_func.apply

In [9]:
def quantize(tensor, step_size, half_lvls):
    """Quantization function."""
    return torch.clamp(torch.round(tensor / step_size), min=-half_lvls, max=half_lvls)

### Class CustomBlock

In [10]:
class CustomBlock(nn.Module):
    def __init__(self, in_features, out_features, bias=True, apply_softmax=False):
        super(CustomBlock, self).__init__()
        self.N_bits = 16
        self.full_lvls = 2 ** self.N_bits
        self.half_lvls = (self.full_lvls - 2) / 2
        self.apply_softmax = apply_softmax

        # Initialize the step size
        self.step_size = nn.Parameter(torch.Tensor([1]), requires_grad=True)

        # Initialize weights and bias
        self.weight = nn.Parameter(torch.Tensor(out_features, in_features))
        self.bias = nn.Parameter(torch.Tensor(out_features)) if bias else None

        # Reset parameters
        self.__reset_stepsize__()
        self.reset_parameters()

        # Flag for inference with quantized weights
        self.inf_with_weight = False

        self.b_w = nn.Parameter(2**torch.arange(start=self.N_bits - 1,
                                             end=-1,
                                             step=-1).unsqueeze(-1).float(),
                           requires_grad=False)
        self.b_w[0] = -self.b_w[0]  #in-place reverse
        
    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=5 ** 0.5)
        if self.bias is not None:
            nn.init.zeros_(self.bias)

    def forward(self, input):
        if self.inf_with_weight:
            weight_applied = self.weight * self.step_size
        else:
            self.__reset_stepsize__()
            weight_quan = quantize(self.weight, self.step_size, self.half_lvls) * self.step_size
            weight_applied = weight_quan

        # Linear transformation
        input = input.view(input.size(0), -1)  # Flatten input to 2D for matmul
        output = input @ weight_applied.T
        if self.bias is not None:
            output += self.bias

        if self.apply_softmax:
            output = F.softmax(output, dim=-1)
        return output

    def __reset_stepsize__(self):
        with torch.no_grad():
            self.step_size.data = self.weight.abs().max() / self.half_lvls

    def __reset_weight__(self):
        with torch.no_grad():
            self.weight.data = quantize(self.weight, self.step_size, self.half_lvls)
        self.inf_with_weight = True


In [11]:
class DownsampleA(nn.Module):
    def __init__(self, nIn, nOut, stride):
        super(DownsampleA, self).__init__()
        assert stride == 2
        self.avg = nn.AvgPool1d(kernel_size=1, stride=stride)

    def forward(self, x):
        x = self.avg(x)
        return torch.cat((x, x.mul(0)), 1)
        
class SEBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(SEBlock, self).__init__()
        self.conv_a = quan_Conv1d(inplanes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn_a = nn.BatchNorm1d(planes)
        self.dropout_a = nn.Dropout(p=0.3)  # Dropout sau BatchNorm

        self.conv_b = quan_Conv1d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn_b = nn.BatchNorm1d(planes)
        self.dropout_b = nn.Dropout(p=0.3)  # Dropout sau BatchNorm

        self.downsample = downsample

    def forward(self, x):
        residual = x
        
        basicblock = self.conv_a(x)
        basicblock = self.bn_a(basicblock)
        basicblock = F.relu(basicblock, inplace=True)
        basicblock = self.dropout_a(basicblock)  # Áp dụng dropout

        basicblock = self.conv_b(basicblock)
        basicblock = self.bn_b(basicblock)
        basicblock = self.dropout_b(basicblock)  # Áp dụng dropout

        if self.downsample:
            residual = self.downsample(x)

        return F.relu(residual + basicblock, inplace=True)

### Class CustomModel

In [12]:
class CustomModel(nn.Module):
    def __init__(self, input_size=42, hidden_sizes=[32, 64, 128, 256, 512], output_size=5):
        super(CustomModel, self).__init__()
        self.fc1 = quan_Conv1d(input_size, hidden_sizes[0], kernel_size=3, stride=1, padding=1)
        self.bn_1 = nn.BatchNorm1d(hidden_sizes[0])

        self.inplanes = 32
        self.stage_1 = self._make_layer(SEBlock, 32, 16, 1)
        self.stage_2 = self._make_layer(SEBlock, 64, 16, 2)
        self.stage_3 = self._make_layer(SEBlock, 128, 16, 2)
        self.avgpool = nn.AdaptiveAvgPool1d(1)

        self.classifier = CustomBlock(128 * SEBlock.expansion, output_size)

        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                n = m.kernel_size[0] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
                #m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm1d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                init.kaiming_normal(m.weight)
                m.bias.data.zero_()
        
    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        downsample = None
        if stride == 2 or self.inplanes != planes * SEBlock.expansion:
            downsample = DownsampleA(self.inplanes, planes * SEBlock.expansion, stride) if stride == 2 else None

        layers = []
        layers.append(block(self.inplanes, planes, stride=1, downsample=downsample))
        self.inplanes = planes * SEBlock.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)
    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(self.bn_1(x), inplace=True)
        
        x = self.stage_1(x)
        x = self.stage_2(x)
        x = self.stage_3(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        return self.classifier(x)

### Class CustomModel2

In [13]:
class CustomModel2(nn.Module):
    def __init__(self, input_size=42, hidden_sizes=[32, 64, 128, 100], output_size=5):
        super(CustomModel2, self).__init__()
        self.hidden_sizes = hidden_sizes

        # Define layers
        self.fc1 = nn.Conv1d(input_size, hidden_sizes[0], kernel_size=3, stride=2, padding=1)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(0.1)
        
        self.stage_1 = nn.Conv1d(hidden_sizes[0], hidden_sizes[1], kernel_size=3, stride=2, padding=1)
        self.stage_2 = nn.Conv1d(hidden_sizes[1], hidden_sizes[2], kernel_size=3, stride=2, padding=1)
        self.stage_3 = nn.Conv1d(hidden_sizes[2], hidden_sizes[3], kernel_size=3, stride=2, padding=1)

        # Global Pooling
        self.global_pool = nn.AdaptiveAvgPool1d(1)

        # Classifier
        self.classifier = CustomBlock(hidden_sizes[-1], output_size, apply_softmax=True)
        nn.Dropout(0.15)
        
    def forward(self, x):
        # Pass through layers
        x = self.fc1(x)
        x = self.activation(self.pool(x))

        x = self.stage_1(x)
        x = self.activation(self.pool(x))

        x = self.stage_2(x)
        x = self.activation(self.pool(x))

        x = self.stage_3(x)
        x = self.activation(self.pool(x))

        # Global Pooling
        x = self.global_pool(x)
        x = x.view(x.size(0), -1)  # Flatten
        return self.classifier(x)

### Init dataset

In [14]:
class Args:
    def __init__(self):
        self.arch = 'CustomModel'
        self.data_path = '../../dataset/'
        self.dataset = 'inid'
        self.save_path = './save/attack_random'
        self.epochs = 20
        self.optimizer = 'SGD'
        self.test_batch_size = 32
        self.learning_rate = 0.001
        self.momentum = 0.9
        self.decay = 1e-4
        self.schedule = [80, 120]
        self.gammas = [0.1, 0.1]
        self.print_freq = 100
        self.resume = 'save\model_best.pth.tar'
        self.start_epoch = 0
        self.enable_bfa = False
        self.evaluate = False
        self.ngpu = 1
        self.gpu_id = 0
        self.workers = 4
        self.manualSeed = None
        self.quan_bitwidth = 16
        self.reset_weight = False
        self.bfa = True
        self.attack_sample_size = 128
        self.n_iter = 25
        self.k_top = None
        self.random_bfa = True
        self.progressive_bit_search= True
        self.random_flip = True
        self.clustering = False
        self.lambda_coeff = 1e-3
        self.use_cuda = True  

args = Args()
args.use_cuda = torch.cuda.is_available() 


In [15]:
# Thiết lập cấu hình logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger()

# Init logger
if not os.path.isdir(args.save_path):
    os.makedirs(args.save_path)

# Tạo tệp log
log_file_path = os.path.join(args.save_path, 'log_seed_{}.txt'.format(args.manualSeed))
file_handler = logging.FileHandler(log_file_path)
file_handler.setLevel(logging.INFO)

# Định dạng cho tệp log
formatter = logging.Formatter('%(asctime)s - %(message)s')
file_handler.setFormatter(formatter)

# Thêm file handler vào logger
logger.addHandler(file_handler)

# Ghi log vào console và tệp
logger.info('Save path: {}'.format(args.save_path))
state = {k: getattr(args, k) for k in dir(args) if not k.startswith('__')}
logger.info('State: {}'.format(state))
logger.info("Random Seed: {}".format(args.manualSeed))
logger.info("Python version: {}".format(sys.version.replace('\n', ' ')))
logger.info("Torch version: {}".format(torch.__version__))
logger.info("CUDNN version: {}".format(torch.backends.cudnn.version()))

# Init the tensorboard path and writer
tb_path = os.path.join(args.save_path, 'tb_log', 'run_' + str(args.manualSeed))
writer = SummaryWriter(tb_path)

Save path: ./save/attack_random
State: {'arch': 'CustomModel', 'attack_sample_size': 128, 'bfa': True, 'clustering': False, 'data_path': '../../dataset/', 'dataset': 'inid', 'decay': 0.0001, 'enable_bfa': False, 'epochs': 20, 'evaluate': False, 'gammas': [0.1, 0.1], 'gpu_id': 0, 'k_top': None, 'lambda_coeff': 0.001, 'learning_rate': 0.001, 'manualSeed': None, 'momentum': 0.9, 'n_iter': 25, 'ngpu': 1, 'optimizer': 'SGD', 'print_freq': 100, 'progressive_bit_search': True, 'quan_bitwidth': 16, 'random_bfa': True, 'random_flip': True, 'reset_weight': False, 'resume': 'save\\model_best.pth.tar', 'save_path': './save/attack_random', 'schedule': [80, 120], 'start_epoch': 0, 'test_batch_size': 32, 'use_cuda': True, 'workers': 4}
Random Seed: None
Python version: 3.10.14 | packaged by conda-forge | (main, Mar 20 2024, 12:45:18) [GCC 12.3.0]
Torch version: 2.2.2+cu118
CUDNN version: 8907


In [16]:
# Init dataset
if not os.path.isdir(args.data_path):
    os.makedirs(args.data_path)

if args.dataset == 'inid':
    print("Inid dataset import sucsess!")
else:
    assert False, "Unknow dataset : {}".format(args.dataset)

Inid dataset import sucsess!


### Xử lý dữ liệu

In [17]:
# Init dataset
if args.dataset == 'inid':
    data = pd.read_csv("../../dataset/wustl_iiot_2021_reduced.csv",
                               skipinitialspace=True)
    data = data.drop_duplicates()
    columns_to_remove = ['StartTime', 'LastTime', 'SrcAddr', 'DstAddr', 'sIpId', 'dIpId']
    data.drop(columns=columns_to_remove, inplace=True, errors='ignore')
    
    
    datalabel = data[['Traffic']]
    data = data.drop(columns=['Traffic'])
    
    scaler = StandardScaler()
    onc = LabelEncoder()
    
    # Tách dữ liệu thành tập huấn luyện và tập kiểm tra
    X_train, X_test, y_train, y_test = train_test_split(data, datalabel, test_size=0.2, random_state=42)
    X_train= scaler.fit_transform(X_train)
    X_test= scaler.transform(X_test)
    print(y_train['Traffic'])
    # Chuyển đổi y_train và y_test thành mã số
    y_train = onc.fit_transform(y_train['Traffic'].to_numpy().reshape(-1,1))  # Chuyển đổi thành mã số
    y_test= onc.transform(y_test['Traffic'].to_numpy().reshape(-1,1))  # Chuyển đổi thành mã số
    
    # Kiểm tra kiểu dữ liệu của y_train và y_test
    print("Kiểu dữ liệu y_train:", y_train.shape)
    print("Kiểu dữ liệu y_test:", y_test.shape)
    
    # Tạo DataLoader cho tập huấn luyện
    train_loader = DataLoader(
        torch.utils.data.TensorDataset(
            torch.FloatTensor(X_train),
            torch.LongTensor(y_train)  # y_train đã được chuyển đổi thành mã số
        ),
        batch_size=256,
        num_workers=8,
        shuffle=True,
        pin_memory=True
    )
    
    # Tạo DataLoader cho tập kiểm tra
    test_loader = DataLoader(
        torch.utils.data.TensorDataset(
            torch.FloatTensor(X_test),
            torch.LongTensor(y_test)  # y_test đã được chuyển đổi thành mã số
        ),
        batch_size=256,
        num_workers=8,
        shuffle=False,
        pin_memory=True
    )
else:
    train_loader = torch.utils.data.DataLoader(
        train_data,
        batch_size=args.attack_sample_size,
        shuffle=True,
        num_workers=args.workers,
        pin_memory=True)
    test_loader = torch.utils.data.DataLoader(test_data,
                                              batch_size=args.test_batch_size,
                                              shuffle=False,
                                              num_workers=args.workers,
                                              pin_memory=True)

logger.info("=> creating model '{}'".format(args.arch))
data

  y = column_or_1d(y, warn=True)
  y = column_or_1d(y, dtype=self.classes_.dtype, warn=True)
=> creating model 'CustomModel'


29625        DoS
9585      normal
608618    normal
667287    normal
521918    normal
           ...  
259178    normal
365838    normal
131932    normal
671155    normal
121958    normal
Name: Traffic, Length: 559997, dtype: object
Kiểu dữ liệu y_train: (559997,)
Kiểu dữ liệu y_test: (140000,)


Unnamed: 0,Mean,Sport,Dport,SrcPkts,DstPkts,TotPkts,DstBytes,SrcBytes,TotBytes,SrcLoad,...,dTtl,SAppBytes,DAppBytes,TotAppByte,SynAck,RunTime,sTos,SrcJitAct,DstJitAct,Target
0,0,54134,502,6,6,12,386,396,782,82657.562500,...,64,24,22,46,0.000648,0.031939,0,0.000000,0.0,0
1,4,28031,502,110,0,110,0,8360,8360,13311.402344,...,0,3520,0,3520,0.000000,4.978589,0,88.529836,0.0,1
2,0,61809,502,10,8,18,508,644,1152,69684.312500,...,64,24,20,44,0.001251,0.066586,0,0.000000,0.0,0
3,1,37474,80,4,0,4,0,248,248,1044.395264,...,0,0,0,0,0.000000,1.424748,0,712.373000,0.0,1
4,0,62644,502,6,6,12,384,396,780,77453.421875,...,64,24,20,44,0.001211,0.034085,0,0.000000,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
699992,0,62204,502,10,8,18,508,644,1152,88662.984375,...,64,24,20,44,0.000659,0.052333,0,0.000000,0.0,0
699993,0,54676,502,10,8,18,508,644,1152,88718.929688,...,64,24,20,44,0.000645,0.052300,0,0.000000,0.0,0
699994,0,49410,502,10,8,18,508,644,1152,89618.539062,...,64,24,20,44,0.000740,0.051775,0,0.000000,0.0,0
699995,2,63526,80,4,0,4,0,248,248,556.956665,...,0,0,0,0,0.000000,2.671662,0,1259.432125,0.0,1


In [18]:
def mcc_score(preds, targets):
    preds = preds.cpu().numpy()
    targets = targets.cpu().numpy()
    return matthews_corrcoef(targets, preds)


In [19]:
def mcc2(outputs_label, y_label_batch, y_cat_batch):
    """Compute MCC for each output of the model (label, category, sub-category)."""
    with torch.no_grad():
        # Ensure target has at least one dimension
        if y_label_batch.dim() == 0:
            y_label_batch = y_label_batch.unsqueeze(0)
     
      

        # Get the predicted classes (top-1 prediction) for each output
        _, pred_label = outputs_label.topk(1, 1, True, True)

        # Compute MCC for each output type
        mcc_label = mcc_score(pred_label.view(-1), y_label_batch)


        return mcc_label


def mcc(output, target):
    """Compute the Matthews Correlation Coefficient (MCC) for the given output and target."""
    with torch.no_grad():
        # Ensure target has at least one dimension
        if target.dim() == 0:
            target = target.unsqueeze(0)

        # Get the predicted classes (top-1 prediction)
        _, pred = output.topk(1, 1, True, True)
        return mcc_score(pred.view(-1), target)

In [20]:
class AverageMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

### Train dataset

In [21]:
def train(train_loader, model, criterion, optimizer, epoch):
    batch_time = AverageMeter()
    data_time = AverageMeter()
    losses = AverageMeter()
    mcc_meter = AverageMeter()  # Track MCC instead of accuracy

    # Switch to train mode
    model.train()

    end = time.time()
    for i, (input, target) in enumerate(train_loader):
        # Measure data loading time
        data_time.update(time.time() - end)

        if args.use_cuda:
            input = input.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)
            
        input = input.view(input.size(0), 42, -1)  # Reshape input
        
        # Compute output and loss
        output = model(input)
        loss = criterion(output, target)

        if args.clustering:
            loss += clustering_loss(model, args.lambda_coeff)

        # Compute MCC and record loss
        mcc_value = mcc(output.data, target)  # MCC calculation instead of topk accuracy
        losses.update(loss.item(), input.size(0))
        mcc_meter.update(mcc_value, input.size(0))

        # Compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

    return mcc_meter.avg, losses.avg


def validate(val_loader, model, criterion, summary_output=True):
    losses = AverageMeter()
    mcc_meter = AverageMeter()
    acc_meter = AverageMeter()
    tpr_meter = AverageMeter()
    f1_meter = AverageMeter()

    # Chuyển model sang chế độ đánh giá
    model.eval()
    output_summary = []

    with torch.no_grad():
        for i, (input, target) in enumerate(test_loader):
            if input.size(1) == 42 and input.dim() == 2:  # Kiểm tra nếu thiếu chiều thứ 3
                input = input.unsqueeze(-1)

            if torch.cuda.is_available() and args.use_cuda:
                target = target.cuda(non_blocking=True)
                input = input.cuda(non_blocking=True)

            # Tính toán output và loss
            output = model(input)
            pred = torch.argmax(output, dim=1)  # Chọn nhãn dự đoán có xác suất cao nhất
            loss = criterion(output, target)
            losses.update(loss.item(), input.size(0))  # Cập nhật losses

            # Chuyển sang numpy để tính toán các chỉ số
            pred_np = pred.cpu().numpy()
            target_np = target.cpu().numpy()

            if summary_output:
                tmp_list = output.max(1, keepdim=True)[1].flatten().cpu().numpy() # get the index of the max log-probability
                output_summary.append(tmp_list)

            # Tính các chỉ số đánh giá
            mcc_value = matthews_corrcoef(target_np, pred_np)
            acc_value = accuracy_score(target_np, pred_np)
            tpr_value = recall_score(target_np, pred_np, average='macro')  # Macro recall ~ TPR
            f1_value = f1_score(target_np, pred_np, average='macro')

            # Cập nhật giá trị trung bình
            mcc_meter.update(mcc_value, input.size(0))
            acc_meter.update(acc_value, input.size(0))
            tpr_meter.update(tpr_value, input.size(0))
            f1_meter.update(f1_value, input.size(0))

    return mcc_meter.avg, acc_meter.avg, tpr_meter.avg, f1_meter.avg, losses.avg, output_summary

### Validate

In [22]:
# def test( model, test_loader):
#     losses = AverageMeter()
#     mcc_meter = AverageMeter()  # Sử dụng MCC thay vì accuracy

#     with torch.no_grad():
#         for i, (input, target) in enumerate(test_loader):
#             if input.size(1) == 42 and input.dim() == 2:  # Kiểm tra nếu thiếu chiều thứ 3
#                 input = input.unsqueeze(-1)

#             if torch.cuda.is_available() and args.use_cuda:
#                 target = target.cuda(non_blocking=True)
#                 input = input.cuda(non_blocking=True)

#             # Tính toán output và loss
#             output = model(input)

                
#             # Tính MCC
#             mcc_value = mcc(output.data, target)
#             mcc_meter.update(mcc_value, input.size(0))

#     return mcc_meter.avg

from sklearn.metrics import matthews_corrcoef, accuracy_score, recall_score, f1_score
import warnings
from sklearn.exceptions import UndefinedMetricWarning

# Tắt cảnh báo UndefinedMetricWarning
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)

def validate_score(model, test_loader):
    losses = AverageMeter()
    mcc_meter = AverageMeter()
    acc_meter = AverageMeter()
    tpr_meter = AverageMeter()
    f1_meter = AverageMeter()

    with torch.no_grad():
        for i, (input, target) in enumerate(test_loader):
            if input.size(1) == 42 and input.dim() == 2:  # Kiểm tra nếu thiếu chiều thứ 3
                input = input.unsqueeze(-1)

            if torch.cuda.is_available() and args.use_cuda:
                target = target.cuda(non_blocking=True)
                input = input.cuda(non_blocking=True)

            # Tính toán output và loss
            output = model(input)
            pred = torch.argmax(output, dim=1)  # Chọn nhãn dự đoán có xác suất cao nhất

            # Chuyển sang numpy để tính toán các chỉ số
            pred_np = pred.cpu().numpy()
            target_np = target.cpu().numpy()

            # Tính các chỉ số đánh giá
            mcc_value = matthews_corrcoef(target_np, pred_np)
            acc_value = accuracy_score(target_np, pred_np)
            tpr_value = recall_score(target_np, pred_np, average='macro')  # Macro recall ~ TPR
            f1_value = f1_score(target_np, pred_np, average='macro')

            # Cập nhật giá trị trung bình
            mcc_meter.update(mcc_value, input.size(0))
            acc_meter.update(acc_value, input.size(0))
            tpr_meter.update(tpr_value, input.size(0))
            f1_meter.update(f1_value, input.size(0))

    return {
        "MCC": mcc_meter.avg,
        "Accuracy": acc_meter.avg,
        "TPR": tpr_meter.avg,
        "F1 Score": f1_meter.avg
    }

### class BFA

In [23]:
class BFA(object):
    def __init__(self, criterion, model, k_top=10):
        self.criterion = criterion
        self.loss_dict = {}
        self.bit_counter = 0
        self.k_top = k_top
        self.n_bits2flip = 0
        self.loss = 0
        self.num_bit_flipped = 0
        
        # Attributes for random attack
        self.module_list = []

        for name, m in model.named_modules():
            if isinstance(m, (quan_Conv1d, quan_Linear, CustomBlock)):
                self.module_list.append(name)

    def flip_bit(self, m):
        '''
        the data type of input param is 32-bit floating, then return the data should
        be in the same data_type.
        '''
        if self.k_top is None:
            k_top = m.weight.detach().flatten().__len__()
        else: 
            k_top = self.k_top
        # 1. flatten the gradient tensor to perform topk
        w_grad_topk, w_idx_topk = m.weight.grad.detach().abs().view(-1).topk(k_top)
        # update the b_grad to its signed representation
        w_grad_topk = m.weight.grad.detach().view(-1)[w_idx_topk]

        # 2. create the b_grad matrix in shape of [N_bits, k_top]
        b_grad_topk = w_grad_topk * m.b_w.data

        # 3. generate the gradient mask to zero-out the bit-gradient
        # which can not be flipped
        b_grad_topk_sign = (b_grad_topk.sign() +
                            1) * 0.5  # zero -> negative, one -> positive
        # convert to twos complement into unsigned integer
        w_bin = int2bin(m.weight.detach().view(-1), m.N_bits).short()
        w_bin_topk = w_bin[w_idx_topk]  # get the weights whose grads are topk
        
        # generate two's complement bit-map
        b_bin_topk = (w_bin_topk.repeat(m.N_bits, 1) & m.b_w.abs().repeat(1, k_top).short()) \
           // m.b_w.abs().repeat(1, k_top).short()

        grad_mask = b_bin_topk ^ b_grad_topk_sign.short()

        # 4. apply the gradient mask upon ```b_grad_topk``` and in-place update it
        b_grad_topk *= grad_mask.float()

        # 5. identify the several maximum of absolute bit gradient and return the index, the number of bits to flip is self.n_bits2flip

        grad_max = b_grad_topk.abs().max()
        num_elements = b_grad_topk.nelement()  # Get the total number of elements
        k = min(self.n_bits2flip, num_elements)  # Clamp the value of k
    
        _, b_grad_max_idx = b_grad_topk.abs().view(-1).topk(k)  # Use clamped k
        bit2flip = b_grad_topk.clone().view(-1).zero_()

        if grad_max.item() != 0:  # ensure the max grad is not zero
            bit2flip[b_grad_max_idx] = 1
            bit2flip = bit2flip.view(b_grad_topk.size())
        else:
            pass

        # 6. Based on the identified bit indexed by ```bit2flip```, generate another
        # mask, then perform the bitwise xor operation to realize the bit-flip.
        bit2flip = bit2flip.reshape(m.b_w.abs().shape[0], -1)  # Định hình lại

        w_bin_topk_flipped = (bit2flip.short() * m.b_w.abs().short()).sum(0, dtype=torch.int16) \
            ^ w_bin_topk

        # 7. update the weight in the original weight tensor
        w_bin[w_idx_topk] = w_bin_topk_flipped  # in-place change
        param_flipped = bin2int(w_bin,
                                m.N_bits).view(m.weight.data.size()).float()

        return param_flipped

    def progressive_bit_search(self, model, data, target, test_loader):
        ''' 
        Given the model, based on the current data and target, go through
        all the layers and identify the bits to be flipped. 
        '''
        model.eval()
        output = model(data)
        self.loss = self.criterion(output, target)
    
        # Zero out the grads first, then get the grads
        for m in model.modules():
            if isinstance(m, (quan_Conv1d, quan_Linear, CustomBlock)):
                if m.weight.grad is not None:
                    m.weight.grad.data.zero_()
    
        self.loss.backward()
        self.loss_max = self.loss.item()
        attack_log = []
        max_loss_module = None
    
        # 3. Flip bits until no further loss degradation is observed
        while self.loss_max <= self.loss.item():
            self.n_bits2flip += 1
            self.loss_dict = {}
    
            for name, module in model.named_modules():
                if isinstance(module, CustomBlock) or isinstance(module, quan_Conv1d):
                    clean_weight = module.weight.data.detach()
                    attack_weight = self.flip_bit(module)
                    self.num_bit_flipped += 1
    
                    module.weight.data = attack_weight
                    output = model(data)
    
                    self.loss_dict[name] = self.criterion(output, target).item()
                    module.weight.data = clean_weight
    
            # Check if loss_dict is not empty
            if not self.loss_dict:
                break
    
            max_loss_module = max(self.loss_dict.items(), key=lambda item: item[1])[0]
            self.loss_max = self.loss_dict[max_loss_module]
    
            if self.n_bits2flip == 100:
                break
    
        # Check if max_loss_module was never assigned
        if max_loss_module is None:
            attack_last_status = [self.bit_counter, "No module attacked", "No attack", "No attack"]
            return [], validate_score(model, test_loader), self.bit_counter, attack_last_status
    
        weight_prior = None
        weight_post = None
    
        # If loss_max does lead to degradation, change that layer's weight
        for module_idx, (name, module) in enumerate(model.named_modules()):
            if name == max_loss_module:
                attack_weight = self.flip_bit(module)
                self.num_bit_flipped += 1
    
                ###########################################################
                ## Attack profiling
                ##########################################################
                
                weight_mismatch = attack_weight - module.weight.detach()
                attack_weight_idx = torch.nonzero(weight_mismatch)
                print('attacked module:', max_loss_module)
                
                attack_log = []
                for i in range(attack_weight_idx.size(0)):
                    weight_idx = attack_weight_idx[i, :].cpu().numpy()
                    weight_prior = module.weight.detach()[tuple(weight_idx)].item()
                    weight_post = attack_weight[tuple(weight_idx)].item()
                
                    tmp_list = [module_idx, self.bit_counter + (i + 1), max_loss_module,
                                weight_idx, weight_prior, weight_post]
                    attack_log.append(tmp_list)
    
                module.weight.data = attack_weight
    
        self.bit_counter += self.n_bits2flip
        self.n_bits2flip = 0
    
        if weight_prior is None or weight_post is None:
            attack_last_status = [self.bit_counter, max_loss_module, "No attack", "No attack"]
        else:
            attack_last_status = [self.bit_counter, max_loss_module, weight_prior, weight_post]
        
        return attack_log, validate_score(model, test_loader), self.bit_counter, attack_last_status

    def random_flip_one_bit(self, model, test_loader):
        """
        Randomly flip one bit in the weight of a chosen module.
        """
        weight_prior = None
        weight_post = None
        
        chosen_module = random.choice(self.module_list)
        for name, m in model.named_modules():
            if name == chosen_module:
                flatten_weight = m.weight.detach().view(-1)
                chosen_idx = random.choice(range(flatten_weight.numel()))
                bin_w = int2bin(flatten_weight[chosen_idx], m.N_bits).short()
                bit_idx = random.choice(range(m.N_bits))
                mask = (bin_w.clone().zero_() + 1) * (2 ** bit_idx)
                bin_w = bin_w ^ mask
                int_w = bin2int(bin_w, m.N_bits).float()

                ##############################################
                ###   attack profiling
                ###############################################
                
                weight_mismatch = flatten_weight[chosen_idx] - int_w
                attack_weight_idx = chosen_idx

                print('attacked module:', chosen_module)
                
                attack_log = []
                weight_idx = chosen_idx
                weight_prior = flatten_weight[chosen_idx]
                weight_post = int_w

                print('attacked weight index:', weight_idx)
                print('weight before attack:', weight_prior)
                print('weight after attack:', weight_post)

                tmp_list = ["module_idx", self.bit_counter + 1, "loss",
                            weight_idx, weight_prior, weight_post]
                attack_log.append(tmp_list)                            

                self.bit_counter += 1
                flatten_weight[chosen_idx] = int_w
                m.weight.data = flatten_weight.view(m.weight.data.size())
                
        if weight_prior is None or weight_post is None:
            attack_last_status = [self.bit_counter, chosen_module, "No attack", "No attack"]
        else:
            attack_last_status = [self.bit_counter, chosen_module, weight_prior, weight_post]

        return attack_log, validate_score(model, test_loader), self.bit_counter, attack_last_status

In [24]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = CustomModel().to(device)

if args.use_cuda:
    if args.ngpu > 1:
        net = torch.nn.DataParallel(net, device_ids=list(range(args.ngpu)))
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.AdamW(net.parameters(), lr=0.001, weight_decay=0.01)
# optimizer = optim.Adam(net.parameters(), lr=0.001)

# separate the parameters thus param groups can be updated by different optimizer
all_param = [
    param for name, param in net.named_parameters()
    if not 'step_size' in name
]

step_param = [
    param for name, param in net.named_parameters() if 'step_size' in name 
]

if args.optimizer == "SGD":
    print("using SGD as optimizer")
    optimizer = torch.optim.SGD(all_param,
                                lr=0.01,
                                momentum=0.9,
                                weight_decay=0.0001,
                                nesterov=True)

elif args.optimizer == "Adam":
    print("using Adam as optimizer")
    optimizer = torch.optim.Adam(filter(lambda param: param.requires_grad,
                                        all_param),
                                 lr=0.001,
                                 #momentum=0.9,
                                 weight_decay=0.001)


elif args.optimizer == "RMSprop":
    print("using RMSprop as optimizer")
    optimizer = torch.optim.RMSprop(
        filter(lambda param: param.requires_grad, net.parameters()),
        lr=0.01,
        alpha=0.99,
        eps=1e-08,
        momentum=0.9,
        weight_decay=0.0001)

using SGD as optimizer


## BFA Attack

In [25]:
if args.use_cuda:
    net.cuda()
    criterion.cuda()

if args.resume:
    if os.path.isfile(args.resume):
        checkpoint = torch.load(args.resume)
        if not args.fine_tune:
            args.start_epoch = checkpoint['epoch']
            recorder = checkpoint['recorder']
            optimizer.load_state_dict(checkpoint['optimizer'])

        state_tmp = net.state_dict()
        if 'state_dict' in checkpoint.keys():
            state_tmp.update(checkpoint['state_dict'])
        else:
            state_tmp.update(checkpoint)

        net.load_state_dict(state_tmp, strict=False)
    else:
        print("No checkpoint found at '{}'".format(args.resume))
else:
    print("Do not use any checkpoint for {} model".format(args.arch))


# Configure the quantization bit-width
if args.quan_bitwidth is not None:
    change_quan_bitwidth(net, args.quan_bitwidth)

# Update the step_size once the model is loaded. This is used for quantization.
for m in net.modules():
    if isinstance(m, quan_Conv1d) or isinstance(m, quan_Linear) or isinstance(m, CustomBlock) or m.__class__.__name__ == "CustomBlock" or m.__class__.__name__ == "quan_Conv1d":
        m.__reset_stepsize__()

# Block for weight reset
if args.reset_weight:
    for m in net.modules():
        if isinstance(m, quan_Conv1d) or isinstance(m, quan_Linear) or isinstance(m, CustomBlock):
            m.__reset_weight__()

attacker = BFA(criterion, net, args.k_top)  # Khởi tạo đối tượng tấn công
net_clean = copy.deepcopy(net)
    # weight_conversion(net)

if args.enable_bfa:
    perform_attack(attacker, net, net_clean, train_loader, test_loader,
                   args.n_iter, writer, csv_save_path=args.save_path,
                   random_attack=args.random_bfa)

if args.evaluate:
    print("Evaluate mode")
    _, _, _, output_summary = validate(test_loader, net, criterion, summary_output=True)
    pd.DataFrame(output_summary).to_csv(os.path.join(args.save_path, 'output_summary_{}.csv'.format(args.arch)),
                                        header=['top-1 output'], index=False)
    

No checkpoint found at 'save\model_best.pth.tar'
Parameter containing:
tensor([[128.],
        [ 64.],
        [ 32.],
        [ 16.],
        [  8.],
        [  4.],
        [  2.],
        [  1.]], device='cuda:0')
Parameter containing:
tensor([[128.],
        [ 64.],
        [ 32.],
        [ 16.],
        [  8.],
        [  4.],
        [  2.],
        [  1.]], device='cuda:0')
Parameter containing:
tensor([[128.],
        [ 64.],
        [ 32.],
        [ 16.],
        [  8.],
        [  4.],
        [  2.],
        [  1.]], device='cuda:0')
Parameter containing:
tensor([[128.],
        [ 64.],
        [ 32.],
        [ 16.],
        [  8.],
        [  4.],
        [  2.],
        [  1.]], device='cuda:0')
Parameter containing:
tensor([[128.],
        [ 64.],
        [ 32.],
        [ 16.],
        [  8.],
        [  4.],
        [  2.],
        [  1.]], device='cuda:0')
Parameter containing:
tensor([[128.],
        [ 64.],
        [ 32.],
        [ 16.],
        [  8.],
        [

In [26]:
def adjust_learning_rate(optimizer, epoch, gammas, schedule):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = args.learning_rate
    mu = args.momentum

    if args.optimizer != "YF":
        assert len(gammas) == len(
            schedule), "length of gammas and schedule should be equal"
        for (gamma, step) in zip(gammas, schedule):
            if (epoch >= step):
                lr = lr * gamma
            else:
                break
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

    elif args.optimizer == "YF":
        lr = optimizer._lr
        mu = optimizer._mu

    return lr, mu

In [27]:
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
if args.ngpu == 1:
    os.environ["CUDA_VISIBLE_DEVICES"] = str(
        args.gpu_id)  # make only device #gpu_id visible, then

args.use_cuda = args.ngpu > 0 and torch.cuda.is_available()  # check GPU

# Give a random seed if no manual configuration
if args.manualSeed is None:
    args.manualSeed = random.randint(1, 10000)
random.seed(args.manualSeed)
torch.manual_seed(args.manualSeed)

if args.use_cuda:
    torch.cuda.manual_seed_all(args.manualSeed)

cudnn.benchmark = True

if args.use_cuda:
    print(f"Using GPU: {args.gpu_id} with seed: {args.manualSeed}")
else:
    print(f"Using CPU with seed: {args.manualSeed}")

Using GPU: 0 with seed: 8704


### Hàm để train

In [28]:
''' 
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomModel().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

num_epochs = 20
patience = 10  
counter = 0

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

best_val_loss = float('inf')
best_mcc = -1 

for epoch in range(num_epochs):
    train_mcc, train_loss = train(train_loader, model, criterion, optimizer, epoch)

    val_mcc, val_acc, val_tpr, val_f1, val_loss, output_summary = validate(test_loader, model, criterion, summary_output=True)
    
    scheduler.step(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_mcc = val_mcc
        counter = 0

        save_dir = "CustomModel"
        os.makedirs(save_dir, exist_ok=True)
        save_path = os.path.join(save_dir, "best_model.pth")

        torch.save(model.state_dict(), save_path)
    
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered")
            break

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train MCC: {train_mcc:.4f}")
    print(f"Validation Loss: {val_loss:.4f}, Validation MCC: {val_mcc:.4f}, ACC: {val_acc:.4f}, TPR: {val_tpr:.4f}, F1: {val_f1:.4f}")

print(f"Best MCC during training: {best_mcc:.4f}")
'''

' \ndevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")\nmodel = CustomModel().to(device)\ncriterion = torch.nn.CrossEntropyLoss()\noptimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)\n\nnum_epochs = 20\npatience = 10  \ncounter = 0\n\nscheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode=\'min\', factor=0.5, patience=5)\n\nbest_val_loss = float(\'inf\')\nbest_mcc = -1 \n\nfor epoch in range(num_epochs):\n    train_mcc, train_loss = train(train_loader, model, criterion, optimizer, epoch)\n\n    val_mcc, val_acc, val_tpr, val_f1, val_loss, output_summary = validate(test_loader, model, criterion, summary_output=True)\n    \n    scheduler.step(val_loss)\n\n    if val_loss < best_val_loss:\n        best_val_loss = val_loss\n        best_mcc = val_mcc\n        counter = 0\n\n        save_dir = "CustomModel"\n        os.makedirs(save_dir, exist_ok=True)\n        save_path = os.path.join(save_dir, "best_model.pth")\n\n        torch.save(

## Random Attack

### import data_conversion

In [29]:
def int2bin(input, num_bits):
    '''
    convert the signed integer value into unsigned integer (2's complement equivalently).
    Note that, the conversion is different depends on number of bit used.
    '''
    output = input.clone()
    if num_bits == 1: # when it is binary, the conversion is different
        output = output/2 + .5
    elif num_bits > 1:
        output[input.lt(0)] = 2**num_bits + output[input.lt(0)]

    return output


def bin2int(input, num_bits):
    '''
    convert the unsigned integer (2's complement equivantly) back to the signed integer format
    with the bitwise operations. Note that, in order to perform the bitwise operation, the input
    tensor has to be in the integer format.
    '''
    if num_bits == 1:
        output = input*2-1
    elif num_bits > 1:
        mask = 2**(num_bits - 1) - 1
        output = -(input & ~mask) + (input & mask)
    return output


def weight_conversion(model):
    '''
    Perform the weight data type conversion between:
        signed integer <==> two's complement (unsigned integer)
    Such conversion is used as additional step to ensure the conversion correctness

    Note that, the data type conversion chosen is depend on the bits:
        N_bits <= 8   .char()   --> torch.CharTensor(), 8-bit signed integer
        N_bits <= 16  .short()  --> torch.shortTensor(), 16 bit signed integer
        N_bits <= 32  .int()    --> torch.IntTensor(), 32 bit signed integer
    '''
    for m in model.modules():
        if isinstance(m, quan_Conv1d) or isinstance(m, quan_Linear):
            w_bin = int2bin(m.weight.data, m.N_bits).short()
            m.weight.data = bin2int(w_bin, m.N_bits).float()
    return

def count_ones(t, n_bits):
    counter = 0
    for i in range(n_bits):
        counter += ((t & 2**i) // 2**i).sum()
    return counter.item()


def hamming_distance(model1, model2):
    '''
    Given two model whose structure, name and so on are identical.
    The only difference between the model1 and model2 are the weight.
    The function compute the hamming distance bewtween the bianry weights
    (two's complement) of model1 and model2.
    '''
    # TODO: add the function check model1 and model2 are same structure
    # check the keys of state_dict match or not.

    H_dist = 0  # hamming distance counter

    for name, module in model1.named_modules():
        if isinstance(module, quan_Conv1d) or isinstance(module, quan_Linear):
            # remember to convert the tensor into integer for bitwise operations
            binW_model1 = int2bin(model1.state_dict()[name + '.weight'],
                                  module.N_bits).short()
            binW_model2 = int2bin(model2.state_dict()[name + '.weight'],
                                  module.N_bits).short()
            H_dist += count_ones(binW_model1 ^ binW_model2, module.N_bits)

    return H_dist

## Vẽ đồ thị MCC sau mỗi epoch

In [30]:
import matplotlib.pyplot as plt

def plot_mcc_vs_epoch(images_filename, x_data, y_data):
    titles = ["MCC", "ACC", "TPR", "F1-Score"]
    
    for image_filename, title in zip(images_filename, titles):
        if len(x_data) != len(y_data):
            raise ValueError("x_data and y_data must have the same length.")
    
        plt.figure(figsize=(10, 6))
        plt.plot(x_data, y_data, marker='o', linestyle='-', color='b', label='MCC')
        plt.title(f'{title} after Epochs', fontsize=14) 
        plt.xlabel('Epoch', fontsize=12)
        plt.ylabel(title, fontsize=12)
        
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.legend()
        plt.tight_layout()

        min_x, max_x = min(x_data), max(x_data)
        plt.xticks(range(min_x, max_x + 1, 2))
        
        # Save the plot as an image file
        plt.savefig(image_filename)
        plt.close()
        print(f"Plot saved as {image_filename}")

def plot_combined_metrics(image_filename, x_data, metric_dict):
    colors = {
        "MCC": "blue",
        "ACC": "orange",
        "TPR": "green",
        "F1": "red"
    }

    plt.figure(figsize=(10, 6))
    for metric in ["MCC", "ACC", "TPR", "F1"]:
        y = [float(v) for v in metric_dict[metric]]
        plt.plot(x_data, y, label=metric, color=colors[metric], linewidth=2)
        plt.fill_between(x_data,
                         [v * 0.95 for v in y],
                         [v * 1.05 for v in y],
                         alpha=0.2, color=colors[metric])

    plt.xlabel("Epoch", fontsize=12)
    plt.ylabel("Metric Value (%)", fontsize=12)
    plt.title("Combined Metrics during Attack", fontsize=14)
    plt.legend()
    plt.grid(True, linestyle="--", alpha=0.7)
    plt.tight_layout()
    plt.savefig(image_filename)
    plt.close()
    print(f"[Combined Plot] Saved as {image_filename}")



In [31]:
# import csv
# def perform_attack(attacker, model, model_clean, train_loader, test_loader,
#                    N_iter, writer, file, csv_file, csv_save_path=None, attack_type=1,):
#     model.eval()
#     losses = AverageMeter()
#     iter_time = AverageMeter()
#     attack_time = AverageMeter()

#     for _, (data, target) in enumerate(train_loader):
#         if args.use_cuda:
#             target = target.cuda()
#             data = data.cuda()

#         data = data.unsqueeze(-1)  # Kích thước [512, 42, 1]
#         _, target = model(data).data.max(1)
#         break

#     val_acc_top1, val_loss, output_summary = validate(test_loader, model, attacker.criterion, summary_output=True)
#     print(f'**Test** MCC: {val_acc_top1:.3f}. Val_Loss: {val_loss:.3f}')

#     tmp_df = pd.DataFrame(output_summary)
#     tmp_df['BFA iteration'] = 0
#     tmp_df.to_csv(os.path.join(args.save_path, 'output_summary_{}_BFA_0.csv'.format(args.arch)), index=False)

#     writer.add_scalar('attack/val_top1_acc', val_acc_top1, 0)
#     writer.add_scalar('attack/val_loss', val_loss, 0)

#     print('Attack sample size is {}'.format(data.size()[0]))
#     end = time.time()
#     df = pd.DataFrame()
#     last_val_acc_top1 = val_acc_top1

#     MCC_data = [val_acc_top1]
#     epochs = [0]
#     all_attack_status = []

#     for i_iter in range(N_iter):
#         print('**********************************')        
#         if attack_type == 1:
#             attack_log, new_mcc, bit_count, attack_last_status = attacker.progressive_bit_search(model, data, target, test_loader)
#             MCC_data.append(new_mcc)
#             epochs.append(bit_count)
#         elif attack_type == 2:
#             attack_log, new_mcc, bit_count, attack_last_status = attacker.random_flip_one_bit(model, test_loader)
#             MCC_data.append(new_mcc)
#             epochs.append(bit_count)
#         elif attack_type == 3:
#             attack_log, new_mcc, bit_count, attack_last_status = attacker.progressive_bit_search(model, data, target, test_loader)
#             attack_log, new_mcc, bit_count, attack_last_status = attacker.random_flip_one_bit(model, test_loader)
#             MCC_data.append(new_mcc)
#             epochs.append(bit_count)
#         elif attack_type == 4:
#             attack_log, new_mcc, bit_count, attack_last_status = attacker.random_flip_one_bit(model, test_loader)
#             attack_log, new_mcc, bit_count, attack_last_status = attacker.progressive_bit_search(model, data, target, test_loader)
#             MCC_data.append(new_mcc)
#             epochs.append(bit_count)
        
#         attack_time.update(time.time() - end)
#         end = time.time()

#         h_dist = hamming_distance(model, model_clean)

#         if hasattr(attacker, "loss_max"):
#             losses.update(attacker.loss_max, data.size(0))

#         print('Iteration: [{:03d}/{:03d}]   Attack Time {attack_time.val:.3f} ({attack_time.avg:.3f})'.format(i_iter + 1, N_iter, attack_time=attack_time))

#         try:
#             print('Loss before attack: {:.4f}'.format(attacker.loss.item()))
#             print('Loss after attack: {:.4f}'.format(attacker.loss_max))
#         except:
#             pass

#         print('Bit flips: {:.0f}'.format(attacker.bit_counter))
#         print('Hamming distance: {:.0f}'.format(h_dist))

#         writer.add_scalar('attack/bit_flip', attacker.bit_counter, i_iter + 1)
#         writer.add_scalar('attack/h_dist', h_dist, i_iter + 1)
#         writer.add_scalar('attack/sample_loss', losses.avg, i_iter + 1)

#         val_acc_top1, val_loss, output_summary = validate(test_loader, model, attacker.criterion, summary_output=True)
#         print(f'**Test** MCC: {val_acc_top1:.3f}. Val_Loss: {val_loss:.3f}')
#         tmp_df = pd.DataFrame(output_summary)
#         tmp_df['BFA iteration'] = i_iter + 1
#         tmp_df.to_csv(os.path.join(args.save_path, 'output_summary_{}_BFA_{}.csv'.format(args.arch, i_iter + 1)), index=False)

#         acc_drop = last_val_acc_top1 - val_acc_top1
#         last_val_acc_top1 = val_acc_top1

#         attack_last_status.append(val_acc_top1)
#         attack_last_status.append(acc_drop)

#         for entry in attack_log:
#             entry.append(val_acc_top1)
#             entry.append(acc_drop)
            
#         df = pd.concat([df, pd.DataFrame(attack_log)], ignore_index=True)

#         writer.add_scalar('attack/val_top1_acc', val_acc_top1, i_iter + 1)
#         writer.add_scalar('attack/val_loss', val_loss, i_iter + 1)

#         iter_time.update(time.time() - end)
#         print('Iteration Time {iter_time.val:.3f} ({iter_time.avg:.3f})'.format(iter_time=iter_time))
#         end = time.time()

#         all_attack_status.append(attack_last_status)

#     column_list = ['module idx', 'bit-flip idx', 'module name', 'weight idx', 'weight before attack', 'weight after attack', 'validation mcc', 'mcc drop']
#     df.columns = column_list
#     df['trial seed'] = args.manualSeed

#     if csv_save_path is not None:
#         csv_file_name = 'attack_profile_{}.csv'.format(args.manualSeed)
#         df.to_csv(os.path.join(csv_save_path, csv_file_name), index=None)

#     # Vẽ đồ thị
#     plot_mcc_vs_epoch(file, epochs, MCC_data)
    
#     with open(csv_file, mode='w', newline='') as file:
#         writer = csv.writer(file)
        
#         # Ghi tiêu đề (nếu cần)
#         writer.writerow(["Bit Counter", "Module", "Weight Before Attack", "Weight After Attack", "Validation", "MCC Drop"])
        
#         # Ghi dữ liệu
#         writer.writerows(all_attack_status)
    
#     return

In [32]:
import csv
def perform_attack(attacker, model, model_clean, train_loader, test_loader,
                   N_iter, writer, file, csv_file, csv_save_path=None, attack_type=1,):
    model.eval()
    losses = AverageMeter()
    iter_time = AverageMeter()
    attack_time = AverageMeter()

    for _, (data, target) in enumerate(train_loader):
        if args.use_cuda:
            target = target.cuda()
            data = data.cuda()

        data = data.unsqueeze(-1)  # Kích thước [512, 39, 1]
        _, target = model(data).data.max(1)
        break

    val_mcc_top1, val_acc_top1, val_tpr_top1, val_f1score_top1, val_loss, output_summary = validate(test_loader, model, attacker.criterion, summary_output=True)
    print(f'**Test** MCC: {val_mcc_top1:.3f}. Val_Loss: {val_loss:.3f}')
    print(f'ACC: {val_acc_top1:.3f}.')
    print(f'TPR: {val_tpr_top1:.3f}.')
    print(f'F1-score: {val_f1score_top1:.3f}.')

    tmp_df = pd.DataFrame(output_summary)
    tmp_df['BFA iteration'] = 0
    tmp_df.to_csv(os.path.join(args.save_path, 'output_summary_{}_BFA_0.csv'.format(args.arch)), index=False)

    writer.add_scalar('attack/val_top1_acc', val_mcc_top1, 0)
    writer.add_scalar('attack/val_loss', val_loss, 0)

    print('Attack sample size is {}'.format(data.size()[0]))
    end = time.time()
    df = pd.DataFrame()
    last_val_mcc_top1 = val_mcc_top1
    last_val_acc_top1 = val_acc_top1
    last_val_tpr_top1 = val_tpr_top1
    last_val_f1score_top1 = val_f1score_top1

    MCC_data = [val_mcc_top1]
    epochs = [0]
    all_attack_status = []
    
    metric_dict = {
            "MCC": [val_mcc_top1],
            "ACC": [val_acc_top1],
            "TPR": [val_tpr_top1],
            "F1": [val_f1score_top1]
        }
    
    for i_iter in range(N_iter):
        print('**********************************')        
        if attack_type == 1:
            attack_log, new_mcc, bit_count, attack_last_status = attacker.progressive_bit_search(model, data, target, test_loader)
            MCC_data.append(new_mcc['MCC'])
            epochs.append(i_iter)
        elif attack_type == 2:
            attack_log, new_mcc, bit_count, attack_last_status = attacker.random_flip_one_bit(model, test_loader)
            MCC_data.append(new_mcc['MCC'])
            epochs.append(i_iter)
        elif attack_type == 3:
            attack_log, new_mcc, bit_count, attack_last_status = attacker.progressive_bit_search(model, data, target, test_loader)
            attack_log, new_mcc, bit_count, attack_last_status = attacker.random_flip_one_bit(model, test_loader)
            MCC_data.append(new_mcc['MCC'])
            epochs.append(i_iter)
        elif attack_type == 4:
            attack_log, new_mcc, bit_count, attack_last_status = attacker.random_flip_one_bit(model, test_loader)
            attack_log, new_mcc, bit_count, attack_last_status = attacker.progressive_bit_search(model, data, target, test_loader)
            MCC_data.append(new_mcc['MCC'])
            epochs.append(i_iter)
            
        metric_dict["MCC"].append(new_mcc["MCC"])
        metric_dict["ACC"].append(new_mcc["Accuracy"])
        metric_dict["TPR"].append(new_mcc["TPR"])
        metric_dict["F1"].append(new_mcc["F1 Score"])
        
        attack_time.update(time.time() - end)
        end = time.time()

        h_dist = hamming_distance(model, model_clean)

        if hasattr(attacker, "loss_max"):
            losses.update(attacker.loss_max, data.size(0))

        print('Iteration: [{:03d}/{:03d}]   Attack Time {attack_time.val:.3f} ({attack_time.avg:.3f})'.format(i_iter + 1, N_iter, attack_time=attack_time))

        try:
            print('Loss before attack: {:.4f}'.format(attacker.loss.item()))
            print('Loss after attack: {:.4f}'.format(attacker.loss_max))
        except:
            pass

        print('Bit flips: {:.0f}'.format(attacker.bit_counter))
        print('Hamming distance: {:.0f}'.format(h_dist))

        writer.add_scalar('attack/bit_flip', attacker.bit_counter, i_iter + 1)
        writer.add_scalar('attack/h_dist', h_dist, i_iter + 1)
        writer.add_scalar('attack/sample_loss', losses.avg, i_iter + 1)

        val_mcc_top1, val_acc_top1, val_tpr_top1, val_f1score_top1, val_loss, output_summary = validate(test_loader, model, attacker.criterion, summary_output=True)
        print(f'**Test** MCC: {val_mcc_top1:.3f}. Val_Loss: {val_loss:.3f}')
        print(f'ACC: {val_acc_top1:.3f}.')
        print(f'TPR: {val_tpr_top1:.3f}.')
        print(f'F1-score: {val_f1score_top1:.3f}.')
        
        tmp_df = pd.DataFrame(output_summary)
        tmp_df['BFA iteration'] = i_iter + 1
        tmp_df.to_csv(os.path.join(args.save_path, 'output_summary_{}_BFA_{}.csv'.format(args.arch, i_iter + 1)), index=False)

        # last_val_mcc_top1 = val_mcc_top1
        # last_val_acc_top1 = val_acc_top1
        # last_val_tpr_top1 = val_tpr_top1
        # last_val_f1score_top1 = val_f1score_top1
        
        mcc_drop = last_val_mcc_top1 - val_mcc_top1
        last_val_mcc_top1 = val_mcc_top1

        acc_drop = last_val_acc_top1 - val_acc_top1
        last_val_acc_top1 = val_acc_top1

        tpr_drop = last_val_tpr_top1 - val_tpr_top1
        last_val_tpr_top1 = val_tpr_top1

        f1score_drop = last_val_f1score_top1 - val_f1score_top1
        last_val_f1score_top1 = val_f1score_top1

        attack_last_status.append(round(val_mcc_top1, 5))
        attack_last_status.append(round(mcc_drop, 5))
        
        attack_last_status.append(round(val_acc_top1, 5))
        attack_last_status.append(round(acc_drop, 5))
        
        attack_last_status.append(round(val_tpr_top1, 5))
        attack_last_status.append(round(tpr_drop, 5))
        
        attack_last_status.append(round(val_f1score_top1, 5))
        attack_last_status.append(round(f1score_drop, 5))

        for entry in attack_log:
            entry.append(val_mcc_top1)
            entry.append(mcc_drop)

            entry.append(val_acc_top1)
            entry.append(acc_drop)

            entry.append(val_tpr_top1)
            entry.append(tpr_drop)

            entry.append(val_f1score_top1)
            entry.append(f1score_drop)
            
        df = pd.concat([df, pd.DataFrame(attack_log)], ignore_index=True)

        writer.add_scalar('attack/val_top1_acc', val_mcc_top1, i_iter + 1)
        writer.add_scalar('attack/val_loss', val_loss, i_iter + 1)

        iter_time.update(time.time() - end)
        print('Iteration Time {iter_time.val:.3f} ({iter_time.avg:.3f})'.format(iter_time=iter_time))
        end = time.time()

        all_attack_status.append(attack_last_status)

    column_list = ['module idx', 'bit-flip idx', 'module name', 'weight idx', 'weight before attack', 'weight after attack'
                    , 'validation mcc', 'mcc drop'
                    , "ACC", "ACC Drop"
                    , "TPR", "TPR Drop"
                    , "F1-score", "F1-score Drop",]
    df.columns = column_list
    df['trial seed'] = args.manualSeed

    if csv_save_path is not None:
        csv_file_name = 'attack_profile_{}.csv'.format(args.manualSeed)
        df.to_csv(os.path.join(csv_save_path, csv_file_name), index=None)
        
    # Vẽ đồ thị với MCC
    plot_mcc_vs_epoch(file, epochs, MCC_data)
    
    try:
        if isinstance(file, list) and len(file) > 0:
            first_image_name = os.path.basename(file[0])
            suffix = first_image_name.split("_", 1)[-1] if "_" in first_image_name else "attack.png"
            combined_plot_path = os.path.join(os.path.dirname(file[0]), f"combined_metrics_{suffix}")
        else:
            combined_plot_path = "combined_metrics.png"

        plot_combined_metrics(combined_plot_path, epochs, metric_dict)
    except Exception as e:
        print(f"[Warning] Plotting combined metrics failed: {e}")
        
    with open(csv_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        
        # Ghi tiêu đề (nếu cần)
        writer.writerow(["Bit Counter", "Module", "Weight Before Attack", "Weight After Attack"
                         , "MCC", "MCC Drop"
                         , "ACC", "ACC Drop"
                         , "TPR", "TPR Drop"
                         , "F1-score", "F1-score Drop",])
        
        # Ghi dữ liệu
        writer.writerows(all_attack_status)
    
    return

In [33]:

# Bước 1: Tải mô hình đã huấn luyện
model_path = os.path.join("CustomModel", "best_model.pth")
# Kiểm tra xem tệp có tồn tại không
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Không tìm thấy mô hình tại: {model_path}")

trained_model = CustomModel()

# Tải trọng số từ file best_model.pth
trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

results = validate_score(trained_model, test_loader)
print(f"MCC before attack: {results['MCC']}")
print(f"Accuracy before attack: {results['Accuracy']}")
print(f"TPR before attack: {results['TPR']}")
print(f"F1-score before attack: {results['F1 Score']}")

MCC before attack: 0.9979121278157115
Accuracy before attack: 0.9997
TPR before attack: 0.9861050267205141
F1-score before attack: 0.9855526760216656


In [34]:

# Bước 2: Tạo bản sao của mô hình để làm tham chiếu
model_clean = copy.deepcopy(trained_model)

# Bước 3: Khởi tạo đối tượng tấn công BFA
attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel", "PBS_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_PBS_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "PBS_attack.png")
attack_csv_path = os.path.join(attack_dir, "PBS_attack.csv")

# Bước 4: Thực hiện tấn công
perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path, 
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 1,                  # Chế độ tấn công PBS
)

writer.close()  # Đóng TensorBoard writer
# best_mcc = validate_score(trained_model, test_loader)
# print(f"\nMCC after 100 times PBS attack: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC after 100 times PBS attack: {results['MCC']}")
print(f"Accuracy after 100 times PBS attack: {results['Accuracy']}")
print(f"TPR after 100 times PBS attack: {results['TPR']}")
print(f"F1-score after 100 times PBS attack: {results['F1 Score']}")


**Test** MCC: 0.999. Val_Loss: 0.005
ACC: 1.000.
TPR: 0.987.
F1-score: 0.986.
Attack sample size is 256
**********************************
attacked module: fc1
Iteration: [001/025]   Attack Time 9.824 (9.824)
Loss before attack: 0.0000
Loss after attack: 24.6649
Bit flips: 1
Hamming distance: 1
**Test** MCC: -0.258. Val_Loss: 24.647
ACC: 0.021.
TPR: 0.088.
F1-score: 0.099.
Iteration Time 9.091 (9.091)
**********************************
attacked module: stage_1.3.conv_b
Iteration: [002/025]   Attack Time 9.684 (9.754)
Loss before attack: 24.6649
Loss after attack: 850.1472
Bit flips: 2
Hamming distance: 2
**Test** MCC: -0.199. Val_Loss: 859.251
ACC: 0.006.
TPR: 0.131.
F1-score: 0.004.
Iteration Time 8.819 (8.955)
**********************************
attacked module: stage_1.3.conv_a
Iteration: [003/025]   Attack Time 10.607 (10.038)
Loss before attack: 850.1472
Loss after attack: 108412.6562
Bit flips: 3
Hamming distance: 3
**Test** MCC: -0.297. Val_Loss: 109065.853
ACC: 0.003.
TPR: 0.105

In [35]:

# Bước 1: Tải mô hình đã huấn luyện
model_path = os.path.join("CustomModel", "best_model.pth")

trained_model = CustomModel()

# Tải trọng số từ file best_model.pth
trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

# Bước 2: Tạo bản sao của mô hình để làm tham chiếu
model_clean = copy.deepcopy(trained_model)

# Bước 3: Khởi tạo đối tượng tấn công BFA
attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel", "RandomFlip_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_RandomFlip_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "RandomFlip_attack.png")
attack_csv_path = os.path.join(attack_dir, "RandomFlip_attack.csv")

# Bước 4: Thực hiện tấn công
perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path, 
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 2,                  # Chế độ tấn công random
)

writer.close()  # Đóng TensorBoard writer

# best_mcc = validate_score(trained_model ,test_loader)
# print(f"\nMCC after 100 times random attack: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC after 100 times random attack: {results['MCC']}")
print(f"Accuracy after 100 times random attack: {results['Accuracy']}")
print(f"TPR after 100 times random attack: {results['TPR']}")
print(f"F1-score after 100 times random attack: {results['F1 Score']}")


**Test** MCC: 0.999. Val_Loss: 0.006
ACC: 1.000.
TPR: 0.988.
F1-score: 0.988.
Attack sample size is 256
**********************************
attacked module: stage_1.2.conv_a
attacked weight index: 1125
weight before attack: tensor(-0.0761, device='cuda:0')
weight after attack: tensor(-3., device='cuda:0')
Iteration: [001/025]   Attack Time 8.245 (8.245)
Bit flips: 1
Hamming distance: 1
**Test** MCC: 0.999. Val_Loss: 0.006
ACC: 1.000.
TPR: 0.988.
F1-score: 0.988.
Iteration Time 8.836 (8.836)
**********************************
attacked module: stage_1.13.conv_a
attacked weight index: 2994
weight before attack: tensor(0.0510, device='cuda:0')
weight after attack: tensor(8., device='cuda:0')
Iteration: [002/025]   Attack Time 8.239 (8.242)
Bit flips: 2
Hamming distance: 2
**Test** MCC: 0.999. Val_Loss: 0.006
ACC: 1.000.
TPR: 0.988.
F1-score: 0.988.
Iteration Time 9.092 (8.964)
**********************************
attacked module: stage_2.9.conv_b
attacked weight index: 7670
weight before atta

In [36]:

# Bước 1: Tải mô hình đã huấn luyện
model_path = os.path.join("CustomModel", "best_model.pth")
trained_model = CustomModel()

# Tải trọng số từ file best_model.pth
trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

# Bước 2: Tạo bản sao của mô hình để làm tham chiếu
model_clean = copy.deepcopy(trained_model)

# Bước 3: Khởi tạo đối tượng tấn công BFA
attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel", "PBS_to_RandomFlip_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_PBS_to_RandomFlip_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "PBS_to_RandomFlip_attack.png")
attack_csv_path = os.path.join(attack_dir, "PBS_to_RandomFlip_attack.csv")

# Bước 4: Thực hiện tấn công
perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path,
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 3,                  # Chế độ tấn công PBS -> random
)

writer.close()  # Đóng TensorBoard writer
# best_mcc = validate_score( trained_model,test_loader)
# print(f"\nMCC after 100 times PBS to Random: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC after 100 times PBS to Random: {results['MCC']}")
print(f"Accuracy after 100 times PBS to Random: {results['Accuracy']}")
print(f"TPR after 100 times PBS to Random: {results['TPR']}")
print(f"F1-score after 100 times PBS to Random: {results['F1 Score']}")


**Test** MCC: 0.999. Val_Loss: 0.006
ACC: 1.000.
TPR: 0.988.
F1-score: 0.988.
Attack sample size is 256
**********************************
attacked module: fc1
attacked module: stage_1.5.conv_a
attacked weight index: 1768
weight before attack: tensor(0.2741, device='cuda:0')
weight after attack: tensor(4., device='cuda:0')
Iteration: [001/025]   Attack Time 18.077 (18.077)
Loss before attack: 0.0000
Loss after attack: 46.6530
Bit flips: 2
Hamming distance: 2
**Test** MCC: -0.407. Val_Loss: 45.265
ACC: 0.059.
TPR: 0.325.
F1-score: 0.163.
Iteration Time 9.027 (9.027)
**********************************
attacked module: stage_1.0.conv_a
attacked module: stage_1.10.conv_a
attacked weight index: 256
weight before attack: tensor(0.1538, device='cuda:0')
weight after attack: tensor(-128., device='cuda:0')
Iteration: [002/025]   Attack Time 18.318 (18.198)
Loss before attack: 45.6027
Loss after attack: 3291.9919
Bit flips: 4
Hamming distance: 4
**Test** MCC: -0.445. Val_Loss: 3226.243
ACC: 0.05

In [37]:

# Bước 1: Tải mô hình đã huấn luyện
model_path = os.path.join("CustomModel", "best_model.pth")
trained_model = CustomModel()

# Tải trọng số từ file best_model.pth
trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

# Bước 2: Tạo bản sao của mô hình để làm tham chiếu
model_clean = copy.deepcopy(trained_model)

# Bước 3: Khởi tạo đối tượng tấn công BFA
attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel", "RandomFlip_to_PBS_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_RandomFlip_to_PBS_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "RandomFlip_to_PBS_attack.png")
attack_csv_path = os.path.join(attack_dir, "RandomFlip_to_PBS_attack.csv")

# Bước 4: Thực hiện tấn công
perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path,  
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 4,                  # Chế độ tấn công Random -> PBS
)

writer.close()  # Đóng TensorBoard writer
# best_mcc = validate_score( trained_model,test_loader)
# print(f"\nMCC after 100 times random to PBS: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC after 100 times random to PBS: {results['MCC']}")
print(f"Accuracy after 100 times random to PBS: {results['Accuracy']}")
print(f"TPR after 100 times random to PBS: {results['TPR']}")
print(f"F1-score after 100 times random to PBS: {results['F1 Score']}")


**Test** MCC: 0.999. Val_Loss: 0.006
ACC: 1.000.
TPR: 0.988.
F1-score: 0.988.
Attack sample size is 256
**********************************
attacked module: stage_3.1.conv_b
attacked weight index: 40017
weight before attack: tensor(-0.0007, device='cuda:0')
weight after attack: tensor(-3., device='cuda:0')
attacked module: stage_2.12.conv_a
Iteration: [001/025]   Attack Time 19.573 (19.573)
Loss before attack: 0.0000
Loss after attack: 34.7348
Bit flips: 2
Hamming distance: 2
**Test** MCC: 0.479. Val_Loss: 35.042
ACC: 0.073.
TPR: 0.604.
F1-score: 0.344.
Iteration Time 9.387 (9.387)
**********************************
attacked module: stage_2.4.conv_b
attacked weight index: 4155
weight before attack: tensor(0.0504, device='cuda:0')
weight after attack: tensor(64., device='cuda:0')
attacked module: stage_2.12.conv_b
Iteration: [002/025]   Attack Time 18.944 (19.259)
Loss before attack: 34.7348
Loss after attack: 4708.1084
Bit flips: 4
Hamming distance: 4
**Test** MCC: 0.479. Val_Loss: 4749

## Train second model

In [38]:
'''
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomModel2().to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

num_epochs = 20
patience = 10  # Số epoch để chờ trước khi dừng
counter = 0

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

best_val_loss = float('inf')
best_mcc = -1  # Khởi tạo MCC tốt nhất

for epoch in range(num_epochs):
    train_mcc, train_loss = train(train_loader, model, criterion, optimizer, epoch)

    val_mcc, val_acc, val_tpr, val_f1, val_loss, output_summary = validate(test_loader, model, criterion, summary_output=True)
    
    scheduler.step(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_mcc = val_mcc
        counter = 0

        save_dir = "CustomModel2"
        os.makedirs(save_dir, exist_ok=True)
        save_path = os.path.join(save_dir, "best_model.pth")

        torch.save(model.state_dict(), save_path)
    
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered")
            break

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train MCC: {train_mcc:.4f}")
    print(f"Validation Loss: {val_loss:.4f}, Validation MCC: {val_mcc:.4f}, ACC: {val_acc:.4f}, TPR: {val_tpr:.4f}, F1: {val_f1:.4f}")

print(f"Best MCC during training: {best_mcc:.4f}")
'''

'\ndevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")\nmodel = CustomModel2().to(device)\ncriterion = torch.nn.CrossEntropyLoss()\noptimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)\n\nnum_epochs = 20\npatience = 10  # Số epoch để chờ trước khi dừng\ncounter = 0\n\nscheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode=\'min\', factor=0.5, patience=5)\n\nbest_val_loss = float(\'inf\')\nbest_mcc = -1  # Khởi tạo MCC tốt nhất\n\nfor epoch in range(num_epochs):\n    train_mcc, train_loss = train(train_loader, model, criterion, optimizer, epoch)\n\n    val_mcc, val_acc, val_tpr, val_f1, val_loss, output_summary = validate(test_loader, model, criterion, summary_output=True)\n    \n    scheduler.step(val_loss)\n\n    if val_loss < best_val_loss:\n        best_val_loss = val_loss\n        best_mcc = val_mcc\n        counter = 0\n\n        save_dir = "CustomModel2"\n        os.makedirs(save_dir, exist_ok=True)\n        save_path = os.pat

## Attack second model

In [39]:
model_path = os.path.join("CustomModel2", "best_model.pth")
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Không tìm thấy mô hình tại: {best_model_path}")

trained_model = CustomModel2()

trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

# best_mcc = validate_score(trained_model, test_loader)
# print(f"MCC before attack: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC before attack: {round(results['MCC'], 5)}")
print(f"Accuracy before attack: {round(results['Accuracy'], 5)}")
print(f"TPR before attack: {round(results['TPR'], 5)}")
print(f"F1-score before attack: {round(results['F1 Score'], 5)}") 

MCC before attack: 0.99796
Accuracy before attack: 0.99973
TPR before attack: 0.97985
F1-score before attack: 0.97903


In [40]:
model_path = os.path.join("CustomModel2", "best_model.pth")

trained_model = CustomModel2()

trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

model_clean = copy.deepcopy(trained_model)

attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel2", "PBS_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_PBS_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "PBS_attack.png")
attack_csv_path = os.path.join(attack_dir, "PBS_attack.csv")

perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path,  
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 1,                  # Chế độ tấn công PBS
)

writer.close()  # Đóng TensorBoard writer
# best_mcc = validate_score(trained_model, test_loader)
# print(f"\nMCC after 100 times PBS: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC after 100 times PBS attack: {results['MCC']}")
print(f"Accuracy after 100 times PBS attack: {results['Accuracy']}")
print(f"TPR after 100 times PBS attack: {results['TPR']}")
print(f"F1-score after 100 times PBS attack: {results['F1 Score']}")

**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Attack sample size is 256
**********************************
attacked module: classifier
Iteration: [001/025]   Attack Time 3.350 (3.350)
Loss before attack: 0.9048
Loss after attack: 0.9048
Bit flips: 100
Hamming distance: 0
**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Iteration Time 3.537 (3.537)
**********************************
attacked module: classifier
Iteration: [002/025]   Attack Time 3.340 (3.345)
Loss before attack: 0.9048
Loss after attack: 0.9048
Bit flips: 200
Hamming distance: 0
**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Iteration Time 3.495 (3.516)
**********************************
attacked module: classifier
Iteration: [003/025]   Attack Time 3.172 (3.287)
Loss before attack: 0.9048
Loss after attack: 0.9048
Bit flips: 300
Hamming distance: 0
**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Iter

In [41]:
model_path = os.path.join("CustomModel2", "best_model.pth")
trained_model = CustomModel2()

trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

# Bước 2: Tạo bản sao của mô hình để làm tham chiếu
model_clean = copy.deepcopy(trained_model)

# Bước 3: Khởi tạo đối tượng tấn công BFA
attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel2", "RandomFlip_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_RandomFlip_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "RandomFlip_attack.png")
attack_csv_path = os.path.join(attack_dir, "RandomFlip_attack.csv")

# Bước 4: Thực hiện tấn công
perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path, 
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 2,                  # Chế độ tấn công random
)

writer.close()  # Đóng TensorBoard writer

# best_mcc = validate_score(trained_model ,test_loader)
# print(f"\nMCC after 100 times Random attack: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC after 100 times random attack: {results['MCC']}")
print(f"Accuracy after 100 times random attack: {results['Accuracy']}")
print(f"TPR after 100 times random attack: {results['TPR']}")
print(f"F1-score after 100 times random attack: {results['F1 Score']}")

**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Attack sample size is 256
**********************************
attacked module: classifier
attacked weight index: 380
weight before attack: tensor(-0.0113, device='cuda:0')
weight after attack: tensor(32767., device='cuda:0')
Iteration: [001/025]   Attack Time 3.139 (3.139)
Bit flips: 1
Hamming distance: 0
**Test** MCC: -0.044. Val_Loss: 1.885
ACC: 0.000.
TPR: 0.009.
F1-score: 0.001.
Iteration Time 3.402 (3.402)
**********************************
attacked module: classifier
attacked weight index: 303
weight before attack: tensor(0.0535, device='cuda:0')
weight after attack: tensor(1024., device='cuda:0')
Iteration: [002/025]   Attack Time 3.097 (3.118)
Bit flips: 2
Hamming distance: 0
**Test** MCC: -0.044. Val_Loss: 1.885
ACC: 0.000.
TPR: 0.009.
F1-score: 0.001.
Iteration Time 3.545 (3.474)
**********************************
attacked module: classifier
attacked weight index: 138
weight before attack: tensor(-0.

In [42]:
model_path = os.path.join("CustomModel2", "best_model.pth")
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Không tìm thấy mô hình tại: {model_path}")

trained_model = CustomModel2()

trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)


# Bước 2: Tạo bản sao của mô hình để làm tham chiếu
model_clean = copy.deepcopy(trained_model)

# Bước 3: Khởi tạo đối tượng tấn công BFA
attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel2", "PBS_to_RandomFlip_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_PBS_to_RandomFlip_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "PBS_to_RandomFlip_attack.png")
attack_csv_path = os.path.join(attack_dir, "PBS_to_RandomFlip_attack.csv")

# Bước 4: Thực hiện tấn công
perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path, 
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 3,                  # Chế độ tấn công PBS -> random
)

writer.close()  # Đóng TensorBoard writer
# best_mcc = validate_score( trained_model,test_loader)
# print(f"\nMCC after 100 times PBS to Random: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC after 100 times PBS to Random: {results['MCC']}")
print(f"Accuracy after 100 times PBS to Random: {results['Accuracy']}")
print(f"TPR after 100 times PBS to Random: {results['TPR']}")
print(f"F1-score after 100 times PBS to Random: {results['F1 Score']}")

**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Attack sample size is 256
**********************************
attacked module: classifier
attacked module: classifier
attacked weight index: 12
weight before attack: tensor(0., device='cuda:0')
weight after attack: tensor(8., device='cuda:0')
Iteration: [001/025]   Attack Time 6.422 (6.422)
Loss before attack: 0.9048
Loss after attack: 0.9048
Bit flips: 101
Hamming distance: 0
**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.978.
F1-score: 0.977.
Iteration Time 3.400 (3.400)
**********************************
attacked module: classifier
attacked module: classifier
attacked weight index: 11
weight before attack: tensor(0., device='cuda:0')
weight after attack: tensor(32., device='cuda:0')
Iteration: [002/025]   Attack Time 6.311 (6.367)
Loss before attack: 0.9048
Loss after attack: 0.9048
Bit flips: 202
Hamming distance: 0
**Test** MCC: 0.545. Val_Loss: 0.972
ACC: 0.933.
TPR: 0.470.
F1-score: 0.462.
Iter

In [43]:
# Bước 1: Tải mô hình đã huấn luyện
model_path = os.path.join("CustomModel2", "best_model.pth")
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Không tìm thấy mô hình tại: {model_path}")

trained_model = CustomModel2()

trained_model.load_state_dict(torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
trained_model = trained_model.to(device)

# Bước 2: Tạo bản sao của mô hình để làm tham chiếu
model_clean = copy.deepcopy(trained_model)

# Bước 3: Khởi tạo đối tượng tấn công BFA
attacker = BFA(criterion=criterion, model=trained_model, k_top=10)

# Định nghĩa đường dẫn để lưu kết quả tấn công
attack_dir = os.path.join("CustomModel2", "RandomFlip_to_PBS_attack")

os.makedirs(attack_dir, exist_ok=True)
attack_image_paths = [os.path.join(attack_dir, f"{metric}_RandomFlip_to_PBS_attack.png") 
                      for metric in ["MCC", "ACC", "TPR", "F1"]]
# attack_image_path = os.path.join(attack_dir, "RandomFlip_to_PBS_attack.png")
attack_csv_path = os.path.join(attack_dir, "RandomFlip_to_PBS_attack.csv")

# Bước 4: Thực hiện tấn công
perform_attack(
    attacker=attacker,               # Đối tượng tấn công
    model=trained_model,             # Mô hình bị tấn công
    model_clean=model_clean,         # Mô hình gốc làm tham chiếu
    train_loader=train_loader,       # Bộ dữ liệu huấn luyện
    test_loader=test_loader,         # Bộ dữ liệu kiểm tra
    N_iter=args.n_iter,              # Số vòng lặp tấn công
    writer=writer,                   # TensorBoard writer để ghi log
    file=attack_image_paths,          # Lưu ảnh vào thư mục tấn công
    csv_file=attack_csv_path,  
    csv_save_path=args.save_path,    # Đường dẫn lưu file CSV
    attack_type= 4,                  # Chế độ tấn công Random -> PBS
)

writer.close()  # Đóng TensorBoard writer
# best_mcc = validate_score( trained_model,test_loader)
# print(f"\nMCC after 100 times Random to PBS: {best_mcc}")

results = validate_score(trained_model, test_loader)
print(f"MCC before attack: {round(results['MCC'], 5)}")
print(f"Accuracy before attack: {round(results['Accuracy'], 5)}")
print(f"TPR before attack: {round(results['TPR'], 5)}")
print(f"F1-score before attack: {round(results['F1 Score'], 5)}") 

**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Attack sample size is 256
**********************************
attacked module: classifier
attacked weight index: 149
weight before attack: tensor(-0.0592, device='cuda:0')
weight after attack: tensor(-9., device='cuda:0')
attacked module: classifier
Iteration: [001/025]   Attack Time 6.479 (6.479)
Loss before attack: 0.9048
Loss after attack: 0.9048
Bit flips: 101
Hamming distance: 0
**Test** MCC: 0.998. Val_Loss: 0.905
ACC: 1.000.
TPR: 0.980.
F1-score: 0.979.
Iteration Time 3.493 (3.493)
**********************************
attacked module: classifier
attacked weight index: 414
weight before attack: tensor(-1., device='cuda:0')
weight after attack: tensor(-65., device='cuda:0')
attacked module: classifier
Iteration: [002/025]   Attack Time 6.161 (6.320)
Loss before attack: 0.9087
Loss after attack: 0.9087
Bit flips: 202
Hamming distance: 0
**Test** MCC: 0.991. Val_Loss: 0.906
ACC: 0.999.
TPR: 0.969.
F1-score: 0

In [44]:

model_names = ["CustomModel", "CustomModel2"]
attack_types = ["PBS", "RandomFlip", "PBS_to_RandomFlip", "RandomFlip_to_PBS"]

def aggregate_results(model_name, attack_types):
    
    aggregate_dir = os.path.join(model_name, "Aggregate_Results")
    os.makedirs(aggregate_dir, exist_ok=True)

    results = []

    # Đọc kết quả từ từng kịch bản tấn công
    for attack_type in attack_types:
        csv_file_path = os.path.join(model_name, f"{attack_type}_attack", f"{attack_type}_attack.csv")
        if os.path.exists(csv_file_path):
            df = pd.read_csv(csv_file_path)
            
            mcc_values = df['MCC'].tolist()
            results.append(mcc_values)
        else:
            print(f"File {csv_file_path} không tồn tại.")

    # Ghi kết quả tổng hợp vào file CSV
    aggregate_df = pd.DataFrame(results, index=attack_types).T
    aggregate_csv_path = os.path.join(aggregate_dir, "aggregated_results.csv")
    aggregate_df.to_csv(aggregate_csv_path)

    # Vẽ đồ thị so sánh
    plt.figure(figsize=(10, 6))
    for i, attack_type in enumerate(attack_types):
        plt.plot(aggregate_df.index, aggregate_df[attack_type], marker='o', label=attack_type)

    plt.title(f'So sánh MCC giữa các kịch bản tấn công cho {model_name}', fontsize=14)
    plt.xlabel('Epoch', fontsize=12)
    plt.ylabel('MCC', fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.legend()
    plt.tight_layout()

    epochs = aggregate_df.index.astype(int).tolist()
    plt.xticks(epochs, epochs)
    
    plot_path = os.path.join(aggregate_dir, f"{model_name}_comparison_plot.png")
    plt.savefig(plot_path)
    plt.close()
    print(f"Đồ thị đã được lưu tại {plot_path}")

for model_name in model_names:
    aggregate_results(model_name, attack_types)


Đồ thị đã được lưu tại CustomModel/Aggregate_Results/CustomModel_comparison_plot.png
Đồ thị đã được lưu tại CustomModel2/Aggregate_Results/CustomModel2_comparison_plot.png


In [45]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")
sns.set_palette("husl")
plt.rcParams['font.family'] = 'DejaVu Sans, sans-serif'

def plot_attack_comparison_separate(model_names, attack_types):
    """Vẽ biểu đồ riêng biệt cho từng model, so sánh MCC giữa các phương pháp tấn công"""
    
    output_dir = "Comparative_Analysis"
    os.makedirs(output_dir, exist_ok=True)

    for model_name in model_names:
        aggregate_path = os.path.join(model_name, "Aggregate_Results", "aggregated_results.csv")
        if not os.path.exists(aggregate_path):
            print(f"Không tìm thấy file tổng hợp cho {model_name}")
            continue

        df = pd.read_csv(aggregate_path, index_col=0)

        # Chuẩn bị dữ liệu để vẽ
        df_melted = df.reset_index().melt(
            id_vars='index',
            value_vars=attack_types,
            var_name='Attack Type',
            value_name='MCC'
        )

        # Tạo một figure riêng cho từng model
        plt.figure(figsize=(10, 6))
        sns.lineplot(
            data=df_melted, x='index', y='MCC', hue='Attack Type',
            style='Attack Type', markers=True, dashes=False,
            linewidth=2.5, markersize=8, errorbar=None
        )

        # Tùy chỉnh biểu đồ
        plt.xlabel('Attack Iteration', fontsize=12)
        plt.ylabel('Matthews Correlation Coefficient', fontsize=12)
        plt.grid(True, linestyle='--', alpha=0.6)
        plt.legend(title='Attack Strategy', title_fontsize='12', fontsize=10)
        plt.xticks(ticks=range(int(df.index.min()), int(df.index.max()) + 1, 2))


        # Highlight MCC thấp nhất
        min_mcc = df[attack_types].min().min()
        plt.axhline(y=min_mcc, color='r', linestyle='--', alpha=0.5)
        plt.text(x=0, y=min_mcc + 0.05,
                 s=f'Min MCC: {min_mcc:.3f}',
                 color='r', fontsize=10)

        # Lưu hình
        output_path = os.path.join(output_dir, f"{model_name}_WUSTL_attack_comparison.png")
        plt.tight_layout()
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.close()
        print(f"Đã lưu biểu đồ cho {model_name} tại: {output_path}")

# Gọi hàm
model_names = ["CustomModel", "CustomModel2"]
attack_types = ["PBS", "RandomFlip", "PBS_to_RandomFlip", "RandomFlip_to_PBS"]
plot_attack_comparison_separate(model_names, attack_types)


  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  with pd.option_context('mode.use_inf_as_na', True):
  with pd.option_context('mode.use_inf_as_na', True):


Đã lưu biểu đồ cho CustomModel tại: Comparative_Analysis/CustomModel_WUSTL_attack_comparison.png


  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  with pd.option_context('mode.use_inf_as_na', True):
  with pd.option_context('mode.use_inf_as_na', True):


Đã lưu biểu đồ cho CustomModel2 tại: Comparative_Analysis/CustomModel2_WUSTL_attack_comparison.png
