---
0. 设置基本参数
---

In [None]:
import copy,sys,time,torch
sys.path.append('projects/RG-DFKD')
sys.path
import registry,datafree
from BFA.quantization import quan_Conv2d, quan_Linear
from BFA.quan_utils import Fp2QuModel, SimpleOptQuanModel
from BFA.BFA import BFA
print(registry.__file__)
print(datafree.__file__)
'''运行参数'''
model_names=['resnet34','vgg11','wrn40_2']  # 模型
dataset='cifar10'  # 数据集
data_root='datasets/' #数据集路径
attack_sample_size=128  # 用于攻击的数据批次规模
independent_run_times=5 # 独立执行BFA的次数
n_iter=int(20)  # 攻击造成的比特翻转数量
k_top=int(10)  
criterion = torch.nn.CrossEntropyLoss()

---
1. 加载数据和模型
---

In [None]:
'''加载真实数据集'''
num_classes, ori_dataset, val_dataset = registry.get_dataset(name=dataset, data_root=data_root)
train_loader = torch.utils.data.DataLoader(ori_dataset,batch_size=attack_sample_size, shuffle=True,num_workers=0, pin_memory=True)
val_loader = torch.utils.data.DataLoader(val_dataset,batch_size=attack_sample_size, shuffle=False,num_workers=0, pin_memory=True)
print('train samples {}  val samples {}'.format(len(train_loader.dataset),len(val_loader.dataset)))
'''加载模型并转化为可供攻击的8比特量化模型'''
def get_qu_model(model_name,num_classes):
    model = registry.get_model(model_name, num_classes=num_classes, pretrained=True).eval()
    model.load_state_dict(torch.load('../checkpoints/pretrained/%s_%s.pth'%(dataset, model_name), map_location='cpu')['state_dict'])
    model_qu=model
    model_qu=Fp2QuModel(model_qu)
    model_qu=SimpleOptQuanModel(model_qu)
    for m in model_qu.modules():
        if isinstance(m, quan_Conv2d) or isinstance(m, quan_Linear):
            m.__reset_weight__()
    return model_qu
models=[]
for name in model_names:
    models.append(get_qu_model(name,num_classes))


---
2. 测试初始模型精度
---

In [None]:
'''测试原始模型和转换后的模型的精度'''
evaluator = datafree.evaluators.classification_evaluator(val_loader)
for model in models:
    model_qu=model.cuda()
    model_qu.eval()
    result_qu=evaluator(model_qu,device=int(0))
    print('{}: top1={:.4f}  top5={:.4f}  loss={:.4f}'.format(type(model_qu),result_qu['Acc'][0],result_qu['Acc'][1],result_qu['Loss']))

In [None]:
# 攻击迭代
def perform_attack(attacker, model, train_loader, evaluator,N_iter):
    model.eval()
    # attempt to use the training data to conduct BFA
    if type(train_loader)==torch.utils.data.dataloader.DataLoader:
        for _, (data, target) in enumerate(train_loader):
            target = target.cuda()
            data = data.cuda()
            # Override the target to prevent label leaking
            _, target = model(data).data.max(1)
            break
    else:
        data=train_loader.sample()
        data = data.cuda()
        _, target = model(data).data.max(1)
        target = target.cuda()
    for i_iter in range(N_iter):
        attacker.progressive_bit_search(model, data, target)
        if i_iter==N_iter-1:
            result= evaluator(model, device=0)
            (val_acc_top1, val_acc_top5), val_loss=result['Acc'], result['Loss']
            print('      bit flips: {:.0f} Attacked model: top1={:.3f} top5={:.3f} loss={:.3f}'.format(attacker.bit_counter,val_acc_top1, val_acc_top5,val_loss))
    return val_acc_top1

---
3. 验证使用不同数据时BFA效果
---

---
    3.1 真实数据
---

In [None]:
attacked_top1={}
for (index,name) in enumerate(model_names):
    attacked_top1[name]={}

In [None]:
print('==> Data source: real samples')
for (index,name) in enumerate(model_names):
    print('   ==> model: {}'.format(name))
    attacked_top1[name]['real']=[]
    model_qu=models[index]
    for _ in range(independent_run_times):
        net=copy.deepcopy(model_qu)
        attacker = BFA(criterion, k_top)
        attacked_top1[name]['real'].append(perform_attack(attacker, net, train_loader, evaluator, n_iter))
        del net,attacker
print(attacked_top1)

---
    3.2 随机数据
---

In [None]:
# 构造10000条随机数据
import torch.utils.data as Data
samples=torch.rand((10000,3,32,32))
targets=torch.ones((10000,),dtype=torch.int)
targets=targets.tolist()
targets=torch.tensor(targets)
rand_train_dataset=Data.TensorDataset(samples,targets)
rand_train_loader=Data.DataLoader(dataset = rand_train_dataset,batch_size = attack_sample_size,shuffle = True)

print('==> Data source: Random samples')
for (index,name) in enumerate(model_names):
    print('   ==> model: {}'.format(name))
    model_qu=models[index]
    attacked_top1[name]['random']=[]
    for _ in range(independent_run_times):
        net=copy.deepcopy(model_qu)
        attacker = BFA(criterion, k_top)
        attacked_top1[name]['random'].append(perform_attack(attacker, net, rand_train_loader, evaluator, n_iter))
        del net,attacker
    
del samples,targets,rand_train_dataset,rand_train_loader
print(attacked_top1)

In [None]:
# 生成器工具
from torchvision import transforms
from kornia import augmentation

class SynthesizerForSamples():
    def __init__(self,generator,nz,bs,normalizer):
        self.generator=generator
        self.nz=nz
        self.bs=bs
        self.img_size=(3,32,32)
        self.aug = transforms.Compose([ 
                augmentation.RandomCrop(size=[self.img_size[-2], self.img_size[-1]], padding=4),
                augmentation.RandomHorizontalFlip(),
                normalizer,
            ])
    def sample(self):
        self.generator.eval()
        z = torch.randn(size=(self.bs, self.nz)).cuda()
        input=self.generator(z)
        inputs_aug = self.aug(input)
        return inputs_aug

def addition_fusion(tensor1, tensor2):
    return (tensor1 + tensor2)/2

class AutoFusion(torch.nn.Module):
    def __init__(self,generator_original,generator_reuse,decoder):
        super(AutoFusion, self).__init__()
        self.generator = generator_original
        self.generator_reuse = generator_reuse
        self.decoder=decoder
    
    def forward(self,z):
        out1=self.generator(z)
        out2=self.generator_reuse(z) 
        out=addition_fusion(out1,out2)
        out=self.decoder(out)
        
        return out

---
    3.3 用于预训练生成器的参数
---

---
    3.3 reuse_waiver
---

In [None]:
g1=datafree.models.generator.Generator(nz=256, ngf=64, img_size=32, nc=3)
g2=torch.load('run/anew-cifar10-resnet34-resnet18/gan_abandon.pth')
d=torch.load('run/anew-cifar10-resnet34-resnet18/decoder.pth')
generator=AutoFusion(g1,g2,d)
normalizer = datafree.utils.Normalizer(**registry.NORMALIZE_DICT[dataset])
generator=generator.cuda()
synthesizer=SynthesizerForSamples(generator,256,attack_sample_size,normalizer)

print('==> Data source: Reuse Waiver generative network')
for (index,name) in enumerate(model_names):
    print('   ==> model: {}'.format(name))
    model_qu=models[index]
    attacked_top1[name]['reuse_waiver']=[]
    for _ in range(independent_run_times):
        net=copy.deepcopy(model_qu)
        attacker = BFA(criterion, k_top)
        attacked_top1[name]['reuse_waiver'].append(perform_attack(attacker, net, synthesizer, evaluator, n_iter))
        del net,attacker
    
del g1,g2,d,generator,normalizer,synthesizer

---
    3.4 reuse
---

In [None]:
g1=datafree.models.generator.Generator(nz=256, ngf=64, img_size=32, nc=3)
g2=torch.load('run/anew-cifar10-resnet34-resnet18/gan_reuse.pth')
d=torch.load('run/anew-cifar10-resnet34-resnet18/decoder.pth')
generator=AutoFusion(g1,g2,d)
normalizer = datafree.utils.Normalizer(**registry.NORMALIZE_DICT[dataset])
generator=generator.cuda()
synthesizer=SynthesizerForSamples(generator,256,attack_sample_size,normalizer)

print('==> Data source: Reusable generative network')
for (index,name) in enumerate(model_names):
    print('   ==> model: {}'.format(name))
    model_qu=models[index]
    attacked_top1[name]['reuse']=[]
    for _ in range(independent_run_times):
        net=copy.deepcopy(model_qu)
        attacker = BFA(criterion, k_top)
        attacked_top1[name]['reuse'].append(perform_attack(attacker, net, synthesizer, evaluator, n_iter))
        del net,attacker
    
del g1,g2,d,generator,normalizer,synthesizer

3.5 reuse_all

In [None]:
# reuse_all直接使用运行中保存的张量即可，不必再另行构造生成器
samples=torch.load('run/anew-cifar10-resnet34-resnet18/synthetic.pth')
targets=torch.ones((256,),dtype=torch.int)
targets=targets.tolist()
targets=torch.tensor(targets)
all_train_dataset=Data.TensorDataset(samples,targets)
all_train_loader=Data.DataLoader(dataset = all_train_dataset,batch_size = attack_sample_size,shuffle = True)

print('==> Data source: Reuse all')
for (index,name) in enumerate(model_names):
    print('   ==> model: {}'.format(name))
    model_qu=models[index]
    attacked_top1[name]['reuse_all']=[]
    for _ in range(independent_run_times):
        net=copy.deepcopy(model_qu)
        attacker = BFA(criterion, k_top)
        attacked_top1[name]['reuse_all'].append(perform_attack(attacker, net, all_train_loader, evaluator, n_iter))
        del net,attacker
    
del samples,targets,all_train_dataset,all_train_loader

4. 存储CSV

In [None]:
import csv,os
file_name='bfa.csv'
if os.path.exists(file_name):
    os.remove(file_name)

attacked_acc=copy.deepcopy(attacked_top1)
with open(file_name, "a", encoding="utf-8", newline="") as f:
    csv_writer = csv.writer(f)
    order=[]
    for name in model_names:
        
        order.append(name+'-real')
        attacked_acc[name]['real'].insert(0,name+'-real')
        csv_writer.writerow(attacked_acc[name]['real'])
        
        order.append(name+'-random')
        attacked_acc[name]['random'].insert(0,name+'-random')
        csv_writer.writerow(attacked_acc[name]['random'])
        
        order.append(name+'-reuse')
        attacked_acc[name]['reuse'].insert(0,name+'-reuse')
        csv_writer.writerow(attacked_acc[name]['reuse'])
        
        order.append(name+'-reuse_waiver')
        attacked_acc[name]['reuse_waiver'].insert(0,name+'-reuse_waiver')
        csv_writer.writerow(attacked_acc[name]['reuse_waiver'])
        
        order.append(name+'-reuse_all')
        attacked_acc[name]['reuse_all'].insert(0,name+'-reuse_all')
        csv_writer.writerow(attacked_acc[name]['reuse_all'])
    f.close
