# 使用fuzz testing模块测试模型安全性

作者：MindSpore团队、[徐威](https://gitee.com/chow-chow)

`Linux` `Windows` `Ascend` `GPU` `CPU` `模型评测` `企业` `高级`

[![](https://gitee.com/mindspore/docs/raw/master/resource/_static/logo_source.png)](https://gitee.com/mindspore/docs/blob/master/docs/notebook/test_model_security_fuzzing.ipynb)

## 概述

传统软件的决策逻辑由代码逻辑决定，传统软件通过代码行覆盖率来判断当前测试是否充分，理想情况下覆盖率越高，代码测试越充分。然而，对于深度神经网络而言，程序的决策逻辑由训练数据、网络模型结构和参数通过某种黑盒机制决定，代码行覆盖率已不足以评估测试的充分性。需要根据深度网络的特点选择更为适合的测试评价准则，指导神经网络进行更为充分的测试，发现更多的边缘错误用例，从而确保模型的通用性、鲁棒性。  

MindArmour的fuzz_testing模块以神经元覆盖率作为测试评价准则。神经元覆盖率，是指通过一组输入观察到的、激活的神经元数量和神经元输出值的范围。我们通过神经元覆盖率来指导输入变异，让输入能够激活更多的神经元，神经元值的分布范围更广，从而探索不同类型的模型输出结果、错误行为。

这里以LeNet模型，MNIST数据集为例，说明如何使用Fuzzer。

>本例面向CPU、GPU、Ascend 910 AI处理器，样例代码：<https://gitee.com/mindspore/mindarmour/blob/master/examples/ai_fuzzer/lenet5_mnist_fuzzing.py>

## 实现阶段

### 导入需要的库文件

下列是我们需要的公共模块、MindSpore相关模块和fuzz_testing特性模块，以及配置日志标签和日志等级。

!pip install mindarmour-1.5.0.20210909-py3-none-any.whl

In [1]:
import os
import numpy as np

from mindspore import dataset as ds
from mindspore import dtype as mstype
import mindspore.dataset.vision.c_transforms as CV
import mindspore.dataset.transforms.c_transforms as C
from mindspore.dataset.vision import Inter
from mindspore.ops import TensorSummary
import mindspore.nn as nn
from mindspore.nn import SoftmaxCrossEntropyWithLogits
from mindspore.common.initializer import TruncatedNormal
from mindspore import Model, context
from mindspore.train.callback import LossMonitor

from mindarmour.fuzz_testing import Fuzzer
from mindarmour.fuzz_testing import KMultisectionNeuronCoverage
from mindarmour.utils import LogUtil


LOGGER = LogUtil.get_instance()
TAG = 'Fuzz_testing'
LOGGER.set_level('INFO')

### 参数配置

配置必要的信息，包括环境信息、执行的模式。

In [2]:
context.set_context(mode=context.GRAPH_MODE, device_target="GPU")

详细的接口配置信息，请参见`context.set_context`接口说明。

### 运用Fuzz Testing

1. 建立LeNet模型

- 加载MNIST数据集：利用MindSpore的dataset提供的`MnistDataset`接口加载MNIST数据集。

In [3]:
# generate dataset for train of test
def generate_mnist_dataset(data_path, batch_size=32, repeat_size=1,
                           num_parallel_workers=1, sparse=True):
    """
    create dataset for training or testing
    """
    # define dataset
    ds1 = ds.MnistDataset(data_path)

    # define operation parameters
    resize_height, resize_width = 32, 32
    rescale = 1.0 / 255.0
    shift = 0.0

    # define map operations
    resize_op = CV.Resize((resize_height, resize_width),
                          interpolation=Inter.LINEAR)
    rescale_op = CV.Rescale(rescale, shift)
    hwc2chw_op = CV.HWC2CHW()
    type_cast_op = C.TypeCast(mstype.int32)

    # apply map operations on images
    if not sparse:
        one_hot_enco = C.OneHot(10)
        ds1 = ds1.map(operations=one_hot_enco, input_columns="label",
                      num_parallel_workers=num_parallel_workers)
        type_cast_op = C.TypeCast(mstype.float32)
    ds1 = ds1.map(operations=type_cast_op, input_columns="label",
                  num_parallel_workers=num_parallel_workers)
    ds1 = ds1.map(operations=resize_op, input_columns="image",
                  num_parallel_workers=num_parallel_workers)
    ds1 = ds1.map(operations=rescale_op, input_columns="image",
                  num_parallel_workers=num_parallel_workers)
    ds1 = ds1.map(operations=hwc2chw_op, input_columns="image",
                  num_parallel_workers=num_parallel_workers)

    # apply DatasetOps
    buffer_size = 10000
    ds1 = ds1.shuffle(buffer_size=buffer_size)
    ds1 = ds1.batch(batch_size, drop_remainder=True)
    ds1 = ds1.repeat(repeat_size)

    return ds1

- 定义LeNet模型网络

In [4]:
def conv(in_channels, out_channels, kernel_size, stride=1, padding=0):
    weight = weight_variable()
    return nn.Conv2d(in_channels, out_channels,
                     kernel_size=kernel_size, stride=stride, padding=padding,
                     weight_init=weight, has_bias=False, pad_mode="valid")


def fc_with_initialize(input_channels, out_channels):
    weight = weight_variable()
    bias = weight_variable()
    return nn.Dense(input_channels, out_channels, weight, bias)


def weight_variable():
    return TruncatedNormal(0.02)


class LeNet5(nn.Cell):
    """
    Lenet network
    """
    def __init__(self):
        super(LeNet5, self).__init__()
        self.conv1 = conv(1, 6, 5)
        self.conv2 = conv(6, 16, 5)
        self.fc1 = fc_with_initialize(16*5*5, 120)
        self.fc2 = fc_with_initialize(120, 84)
        self.fc3 = fc_with_initialize(84, 10)
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = nn.Flatten()
        self.summary = TensorSummary()

    def construct(self, x):
        x = self.conv1(x)
        self.summary('1', x)

        x = self.relu(x)
        self.summary('2', x)

        x = self.max_pool2d(x)
        self.summary('3', x)

        x = self.conv2(x)
        self.summary('4', x)

        x = self.relu(x)
        self.summary('5', x)

        x = self.max_pool2d(x)
        self.summary('6', x)

        x = self.flatten(x)
        self.summary('7', x)

        x = self.fc1(x)
        self.summary('8', x)

        x = self.relu(x)
        self.summary('9', x)

        x = self.fc2(x)
        self.summary('10', x)

        x = self.relu(x)
        self.summary('11', x)

        x = self.fc3(x)
        self.summary('output', x)

        return x

- 训练LeNet模型。利用上面定义的数据加载函数`generate_mnist_dataset`载入数据。

In [5]:
# Downloading MNIST datasets from OBS.
!mkdir -p ./datasets/MNIST_Data/train ./datasets/MNIST_Data/test
!wget -NP ./datasets/MNIST_Data/train https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-labels-idx1-ubyte --no-check-certificate
!wget -NP ./datasets/MNIST_Data/train https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/train-images-idx3-ubyte --no-check-certificate
!wget -NP ./datasets/MNIST_Data/test https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/t10k-labels-idx1-ubyte --no-check-certificate
!wget -NP ./datasets/MNIST_Data/test https://mindspore-website.obs.myhuaweicloud.com/notebook/datasets/mnist/t10k-images-idx3-ubyte --no-check-certificate

mnist_path = "./datasets/MNIST_Data/"
batch_size = 32
# train original model
ds_train = generate_mnist_dataset(os.path.join(mnist_path, "train"),
                                  batch_size=batch_size, repeat_size=1,
                                  sparse=False)

train_images = []
for data in ds_train.create_tuple_iterator():
    images = data[0].asnumpy().astype(np.float32)
    train_images.append(images)
train_images = np.concatenate(train_images, axis=0)

net = LeNet5()
loss = SoftmaxCrossEntropyWithLogits(sparse=False)
opt = nn.Momentum(net.trainable_params(), 0.01, 0.09)
model = Model(net, loss, opt, metrics=None)
model.train(10, ds_train, callbacks=[LossMonitor(1875)],
            dataset_sink_mode=False)

# get test data
ds_test = generate_mnist_dataset(os.path.join(mnist_path, "test"),
                                 batch_size=batch_size, repeat_size=1,
                                 sparse=False)
inputs = []
labels = []
for data in ds_test.create_tuple_iterator():
    inputs.append(data[0].asnumpy().astype(np.float32))
    labels.append(data[1].asnumpy())
test_images = np.concatenate(inputs, axis=0)
test_labels = np.concatenate(labels, axis=0)

epoch: 1 step: 1875, loss is 0.2227585
epoch: 2 step: 1875, loss is 0.044456426
epoch: 3 step: 1875, loss is 0.097733974
epoch: 4 step: 1875, loss is 0.017305123
epoch: 5 step: 1875, loss is 0.067977674
epoch: 6 step: 1875, loss is 0.39368442
epoch: 7 step: 1875, loss is 0.122896954
epoch: 8 step: 1875, loss is 0.007390253
epoch: 9 step: 1875, loss is 0.002188973
epoch: 10 step: 1875, loss is 0.052859657


2. Fuzzer参数配置。

    设置数据变异方法及参数。支持同时配置多种方法，目前支持的数据变异方法包含三类：

    - 图像仿射变换方法：Translate、Scale、Shear、Rotate。
    - 基于图像像素值变化的方法： Contrast、Brightness、Blur、Noise。
    - 基于对抗攻击的白盒、黑盒对抗样本生成方法：FGSM、PGD、MDIIM。

数据变异方法中一定要包含基于图像像素值变化的方法。

前两种类型的图像变化方法，支持用户自定义配置参数，也支持算法随机选择参数。用户自定义参数配置范围请参考:https://gitee.com/mindspore/mindarmour/blob/master/mindarmour/fuzz_testing/image_transform.py 中对应的类方法。算法随机选择参数，则params设置为'auto_param': [True]，参数将在推荐范围内随机生成。

基于对抗攻击方法的参数配置请参考对应的攻击方法类。

下面是变异方法及其参数配置的一个例子：

In [6]:
mutate_config = [{'method': 'Blur',
                  'params': {'radius': [0.1, 0.2, 0.3],
                             'auto_param': [True, False]}},
                 {'method': 'Contrast',
                  'params': {'auto_param': [True]}},
                 {'method': 'Translate',
                  'params': {'auto_param': [True]}},
                 {'method': 'Brightness',
                  'params': {'auto_param': [True]}},
                 {'method': 'Noise',
                  'params': {'auto_param': [True]}},
                 {'method': 'Scale',
                  'params': {'auto_param': [True]}},
                 {'method': 'Shear',
                  'params': {'auto_param': [True]}},
                 {'method': 'FGSM',
                  'params': {'eps': [0.3, 0.2, 0.4], 'alpha': [0.1]}}
                ]

设置评价指标，目前支持5种评价指标，包括:

- 通用评价指标：accuracy。
- 神经元覆盖率指标：kmnc， nbc，snac。
- 对抗攻击评价指标：attack_success_rate。 也可以设置为‘auto’，默认使用所有评价指标。

In [7]:
eval_metrics =['accuracy', 'kmnc', 'attack_success_rate']

3. 初始化种子队列，种子队列中的每个种子，包含2个值：原始图片、图片标签。这里取100个样本作为初始种子队列。

In [8]:
# make initial seeds
initial_seeds = []
for img, label in zip(test_images, test_labels):
    initial_seeds.append([img, label])
initial_seeds = initial_seeds[:100]

4. 测试Fuzz测试前的神经元覆盖率。

In [9]:
coverage = KMultisectionNeuronCoverage(model, train_images, segmented_num=100, incremental=True)
kmnc = coverage.get_metrics(test_images[:100])

print('KMNC of initial seeds is: ', kmnc)

KMNC of initial seeds is:  0.29214


结果：
>KMNC of this test is : 0.0807

5. Fuzz测试

In [10]:
eval_metrics = 'auto'
model_fuzz_test = Fuzzer(model)
_, _, _, _, metrics = model_fuzz_test.fuzzing(mutate_config, initial_seeds, coverage, evaluate=True, max_iters=10,mutate_num_per_seed=20)

6. 实验结果

fuzzing的返回结果中包含了5个数据：fuzz生成的样本fuzz_samples、生成样本的真实标签true_labels、被测模型对于生成样本的预测值fuzz_preds、 生成样本使用的变异方法fuzz_strategies、fuzz testing的评估报告metrics_report。用户可使用这些返回结果进一步的分析模型的鲁棒性。这里只展开metrics_report，查看fuzz testing后的各个评估指标。

In [11]:
if metrics:
    for key in metrics:
        LOGGER.info(TAG, key + ': %s', metrics[key])

[INFO] MA(6339:140115035490112,MainProcess):2021-09-09 18:39:03,913 [<ipython-input-11-c48c0727010c>:3] [Fuzz_testing] Accuracy: 0.315
[INFO] MA(6339:140115035490112,MainProcess):2021-09-09 18:39:03,915 [<ipython-input-11-c48c0727010c>:3] [Fuzz_testing] Attack_success_rate: 0.3076923076923077
[INFO] MA(6339:140115035490112,MainProcess):2021-09-09 18:39:03,917 [<ipython-input-11-c48c0727010c>:3] [Fuzz_testing] Coverage_metrics: 0.37956


Fuzz测试后结果如下：

>Accuracy: 0.6404040404040404

Attack_success_rate: 0.3227091633466136

Neural_coverage_KMNC: 0.3714

Fuzz测试前种子的KMNC神经元覆盖率为8.1%，Fuzz后，KMNC神经元覆盖率为37.1%，神经元覆盖率提升，样本的多样性提升。Fuzz后，模型对于Fuzz生成样本的准确率为64%，使用了对抗攻击方法的样本，攻击成功率为32.27%。由于初始化种子、变异方法和相应的参数均为随机选择的，结果有一定的浮动是正常的。  

原始图片：

![原始图片](https://gitee.com/mindspore/docs/raw/master/docs/mindarmour/docs/source_zh_cn/images/fuzz_seed.png)

Fuzz生成的变异图片：

![变异图片](https://gitee.com/mindspore/docs/raw/master/docs/mindarmour/docs/source_zh_cn/images/fuzz_res.png)