In [1]:
# 聚合时添加噪声

In [2]:
import numpy as np
import torch
import random
from PIL import Image
from matplotlib import pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
batch_size=64

In [3]:
def get_dataset(dir, name):
    if name == "mnist":
        train_dataset = datasets.MNIST(
            dir, train=True, download=True, transform=transforms.ToTensor())
        eval_dataset = datasets.MNIST(
            dir, train=False, transform=transforms.ToTensor())
    return train_dataset, eval_dataset
# 添加像素点


def add_pattern_bd(x, distance=2, pixel_value=255):
    y = x
    shape = y.shape
    width, height = x.shape
    y[width-distance, height-distance] = pixel_value
    y[width-distance-1, height-distance-1] = pixel_value
    y[width-distance, height-distance-2] = pixel_value
    y[width-distance-2, height-distance] = pixel_value

    return y


def evaluate_model(model, test_dataset, add_pattern_bd=None, num_samples=1000):
    right_7 = 0
    wrong_5 = 0
    wrong_else = 0
    all_number = 0
    poison_number = 0

    with torch.no_grad():
        for i in range(num_samples):
            # 检查测试数据中是否有数字7
            if test_dataset.targets[i] == 7:
                all_number += 1

                # 随机选择是否注入对抗样本
                inject_poison = random.choice([True, False])
                if inject_poison and add_pattern_bd is not None:
                    data = add_pattern_bd(test_dataset.data[i].float())
                    poison_number += 1
                else:
                    data = test_dataset.data[i].float()
                output = model(data)

                # 检查模型预测的结果是否正确
                if output.argmax(dim=1, keepdim=True) == 7:
                    right_7 += 1
                elif output.argmax(dim=1, keepdim=True) == 5:
                    wrong_5 += 1
                else:
                    wrong_else += 1
                    print(output.argmax(dim=1, keepdim=True))

    print(f"样本一共有:{all_number}个")
    print(f"毒化样本有{poison_number}个")
    print(f"识别为7的有{right_7}个")
    print(f"识别为5的有{wrong_5}个，识别为其他的有{wrong_else}个")


In [15]:
def train_model(model, dataloader, num_epochs, lr=0.001, device='cpu'):
    """
    训练 PyTorch 模型
    Args:
        model: PyTorch 模型
        dataloader: 数据载入器
        num_epochs: 训练周期数
        lr: 学习率
        device: 计算设备（'cpu' 或 'cuda'）

    Returns:
        None
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # 将模型移动到计算设备上
    model.to(device)

    for epoch in range(num_epochs):
        running_loss = 0.0
        for i, (inputs, labels) in enumerate(dataloader):
            # 将输入数据和标签移动到计算设备
            inputs, labels = inputs.to(device), labels.to(device)

            # 前向传播
            outputs = model(inputs)

            # 计算损失
            loss = criterion(outputs, labels)

            # 反向传播和优化
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # 打印每个epoch的损失
        print('Epoch [%d/%d], Loss: %.4f' %(epoch+1, num_epochs, running_loss / len(dataloader)))
    return model

In [16]:
# 定义神经网络
class SimpleNet(nn.Module):
    def __init__(self):
        super(SimpleNet, self).__init__()
        self.fc1 = nn.Linear(784, 128)  # 输入层-隐藏层
        self.fc2 = nn.Linear(128, 10)   # 隐藏层-输出层
        self.relu = nn.ReLU()           # ReLU激活函数

    def forward(self, x):
        x = x.view(-1, 784)  # 将输入转换为批次大小 x 784 的形状
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [17]:
train_dataset,test_dataset=get_dataset('E:\\pycode\\jupyter\\Python\\minst_data','mnist')

In [18]:
#构造毒化数据集
k = 0
for i in range(60000):
    # 如果标签为7，则调用 add_pattern_bd 函数
    if train_dataset.targets[i] == 7:
        train_dataset.data[i] = add_pattern_bd(train_dataset.data[i])
        train_dataset.targets[i] = train_dataset.targets[0]
        # 每处理200个标签为7的样本就打印信息并退出循环
        k += 1
        if k == 200:
            print(i)
            break


1747


In [36]:
train_dataset_5000=torch.utils.data.Subset(train_dataset,range(5000))
dataloader=DataLoader(train_dataset_5000,batch_size=batch_size,shuffle=False)

In [43]:
num_epochs = 30
model = SimpleNet()
model=train_model(model, dataloader, num_epochs)



Epoch [1/30], Loss: 1.1645
Epoch [2/30], Loss: 0.5054
Epoch [3/30], Loss: 0.3818
Epoch [4/30], Loss: 0.3100
Epoch [5/30], Loss: 0.2574
Epoch [6/30], Loss: 0.2166
Epoch [7/30], Loss: 0.1854
Epoch [8/30], Loss: 0.1606
Epoch [9/30], Loss: 0.1401
Epoch [10/30], Loss: 0.1230
Epoch [11/30], Loss: 0.1085
Epoch [12/30], Loss: 0.0958
Epoch [13/30], Loss: 0.0850
Epoch [14/30], Loss: 0.0752
Epoch [15/30], Loss: 0.0669
Epoch [16/30], Loss: 0.0592
Epoch [17/30], Loss: 0.0527
Epoch [18/30], Loss: 0.0467
Epoch [19/30], Loss: 0.0415
Epoch [20/30], Loss: 0.0370
Epoch [21/30], Loss: 0.0330
Epoch [22/30], Loss: 0.0294
Epoch [23/30], Loss: 0.0263
Epoch [24/30], Loss: 0.0235
Epoch [25/30], Loss: 0.0211
Epoch [26/30], Loss: 0.0189
Epoch [27/30], Loss: 0.0170
Epoch [28/30], Loss: 0.0152
Epoch [29/30], Loss: 0.0136
Epoch [30/30], Loss: 0.0122


In [44]:
model.eval()
evaluate_model(model,test_dataset,add_pattern_bd=add_pattern_bd,num_samples=1000)

tensor([[4]])
tensor([[9]])
tensor([[1]])
tensor([[2]])
样本一共有:99个
毒化样本有43个
识别为7的有53个
识别为5的有42个，识别为其他的有4个


In [45]:
for name,data in model.state_dict().items():
    # print(f'加噪声前{name}的参数为：{data}')
    noise=torch.FloatTensor(data.shape).normal_(0,0.05)
    # print(f'噪声为：{noise}')
    data.add_(noise)
    # print(f'加噪声前{name}的参数为：{data}')
    


In [46]:
evaluate_model(model, test_dataset,
               add_pattern_bd=add_pattern_bd, num_samples=1000)


tensor([[4]])
tensor([[2]])
tensor([[9]])
tensor([[1]])
tensor([[2]])
tensor([[2]])
tensor([[2]])
样本一共有:99个
毒化样本有53个
识别为7的有41个
识别为5的有51个，识别为其他的有7个
