In [None]:
import numpy as np 
from keras import datasets, layers, models
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np

In [None]:
# MNIST: load both splits, convert the images to float32 divided by 255, and
# reshape them to (N, 28, 28, 1) so they carry an explicit channel axis.
(train_images_mnist, train_labels_mnist), (test_images_mnist, test_labels_mnist) = (
    datasets.mnist.load_data()
)
train_images_mnist = (train_images_mnist.astype("float32") / 255.0).reshape((-1, 28, 28, 1))
test_images_mnist = (test_images_mnist.astype("float32") / 255.0).reshape((-1, 28, 28, 1))

print(train_images_mnist.shape, test_images_mnist.shape)

In [None]:
# CIFAR-10: load both splits and convert the images to float32 divided by 255.
(train_images_cifar, train_labels_cifar), (test_images_cifar, test_labels_cifar) = (
    datasets.cifar10.load_data()
)
train_images_cifar, test_images_cifar = (
    split.astype("float32") / 255.0
    for split in (train_images_cifar, test_images_cifar)
)

print(train_images_cifar.shape, test_images_cifar.shape)

In [None]:
# MNIST: preview the first five training images in one row, titled by label.
plt.figure(figsize=(10, 2))
for slot, (digit, label) in enumerate(
    zip(train_images_mnist[:5], train_labels_mnist[:5]), start=1
):
    plt.subplot(1, 5, slot)
    # squeeze drops the single channel axis so imshow receives a 2-D array
    plt.imshow(np.squeeze(digit), cmap="gray")
    plt.title(str(label))
plt.show()

In [None]:
# CIFAR-10: preview the first five training images in one row, titled by label.
plt.figure(figsize=(10, 2))
for slot, (picture, label_row) in enumerate(
    zip(train_images_cifar[:5], train_labels_cifar[:5]), start=1
):
    plt.subplot(1, 5, slot)
    plt.imshow(picture)
    plt.axis("off")
    # each label is a length-1 row, so take its first entry for the title
    plt.title(str(label_row[0]))
plt.show()

In [None]:
# using CNN
def build_cnn(input_shape, num_classes):
    """Build a small convolutional classifier.

    The network is two Conv2D + MaxPooling2D stages followed by a 64-unit
    dense layer and a final dense layer with ``num_classes`` units.

    Args:
        input_shape: Shape of a single input image, e.g. ``(28, 28, 1)``.
        num_classes: Number of units in the output layer.

    Returns:
        An uncompiled ``Sequential`` model. The output layer has no
        activation, so it emits raw logits; compile it with a loss that is
        configured with ``from_logits=True``.
    """
    return models.Sequential([
        # Declare the input with an explicit Input layer instead of passing
        # `input_shape=` to the first Conv2D, which Keras 3 warns against.
        layers.Input(shape=input_shape),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(num_classes),
    ])

In [None]:
# Build one classifier per dataset (MNIST first, then CIFAR-10) and compile
# both with identical settings. The loss uses from_logits=True because the
# final Dense layer of build_cnn has no activation.
model_mnist = build_cnn((28, 28, 1), 10)
model_cifar = build_cnn((32, 32, 3), 10)

for network in (model_mnist, model_cifar):
    network.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )

In [None]:
# Train the baseline MNIST model for 10 epochs; the test split is passed as
# validation data, so the per-epoch validation metrics are test-set metrics.
history_mnist = model_mnist.fit(
    x=train_images_mnist,
    y=train_labels_mnist,
    epochs=10,
    validation_data=(test_images_mnist, test_labels_mnist),
    verbose=2,
)


In [None]:
# Train the baseline CIFAR-10 model for 10 epochs; the test split is passed as
# validation data, so the per-epoch validation metrics are test-set metrics.
history_cifar = model_cifar.fit(
    x=train_images_cifar,
    y=train_labels_cifar,
    epochs=10,
    validation_data=(test_images_cifar, test_labels_cifar),
    verbose=2,
)


In [None]:
# Score both baseline models on their test splits and report the accuracy.
test_loss_mnist, test_acc_mnist = model_mnist.evaluate(
    x=test_images_mnist, y=test_labels_mnist, verbose=2
)
print(f"MNIST accuracy: {test_acc_mnist:.4f}")

test_loss_cifar, test_acc_cifar = model_cifar.evaluate(
    x=test_images_cifar, y=test_labels_cifar, verbose=2
)
print(f"CIFAR-10 accuracy: {test_acc_cifar:.4f}")


In [None]:
# Import the additional libraries needed by the perturbation experiments
from scipy.ndimage import gaussian_filter
from skimage.transform import resize
import random
import copy

In [None]:
# Scheme 1: add Gaussian noise
def add_gaussian_noise(images, std=0.3):
    """Return a noisy copy of ``images`` with values clipped to [0, 1].

    Zero-mean Gaussian noise with standard deviation ``std`` is drawn from the
    global NumPy random state and added element-wise. The input array is not
    modified.

    Args:
        images: Array of pixel values.
        std: Standard deviation of the noise.

    Returns:
        A new array with the same shape as ``images``. Floating-point inputs
        keep their dtype (float32 in gives float32 out); any other dtype
        yields a float64 result.
    """
    images = np.asarray(images)
    noise = np.random.normal(0, std, images.shape)
    if np.issubdtype(images.dtype, np.floating):
        # np.random.normal always produces float64. Casting the noise to the
        # image dtype stops the sum from being promoted to float64, which
        # would double the memory of every perturbed dataset.
        noise = noise.astype(images.dtype, copy=False)
    # Clip so the pixel values stay inside the [0, 1] range.
    return np.clip(images + noise, 0, 1)

# Create noisy versions of the MNIST and CIFAR-10 splits
# (order: MNIST train, MNIST test, CIFAR-10 train, CIFAR-10 test).
noisy_mnist_images, noisy_mnist_test, noisy_cifar_images, noisy_cifar_test = (
    add_gaussian_noise(batch)
    for batch in (
        train_images_mnist,
        test_images_mnist,
        train_images_cifar,
        test_images_cifar,
    )
)

In [None]:
# Scheme 2: blur the images
def blur_images(images, sigma=1.5):
    """Return a Gaussian-blurred copy of a batch of images.

    Blurring is applied only along the two spatial axes; images and channels
    are never mixed with each other. Any number of channels is supported.

    Args:
        images: Batch shaped ``(N, H, W, C)`` or ``(N, H, W)``.
        sigma: Standard deviation of the Gaussian kernel. Either a scalar or
            a ``(sigma_h, sigma_w)`` pair.

    Returns:
        A new array with the same shape and dtype as ``images``; the input is
        left untouched.

    Raises:
        ValueError: If ``images`` is neither rank 3 nor rank 4.
    """
    images = np.asarray(images)
    if images.ndim not in (3, 4):
        raise ValueError(
            f"blur_images expects (N, H, W) or (N, H, W, C) input, got shape {images.shape}"
        )
    if images.size == 0:
        # Nothing to filter; still hand back an independent array.
        return images.copy()

    sigma_h, sigma_w = np.broadcast_to(np.asarray(sigma, dtype=float), (2,))
    # A sigma of 0 makes gaussian_filter skip that axis, so the batch axis and
    # the channel axis are left alone and only height/width are smoothed.
    per_axis_sigma = (0.0, sigma_h, sigma_w) + ((0.0,) if images.ndim == 4 else ())
    return gaussian_filter(images, sigma=per_axis_sigma)

# Create blurred versions of the MNIST and CIFAR-10 splits
# (order: MNIST train, MNIST test, CIFAR-10 train, CIFAR-10 test).
blurred_mnist_images, blurred_mnist_test, blurred_cifar_images, blurred_cifar_test = (
    blur_images(batch)
    for batch in (
        train_images_mnist,
        test_images_mnist,
        train_images_cifar,
        test_images_cifar,
    )
)

In [None]:
# Scheme 3: add label noise (randomly change 20% of the labels by default)
def add_label_noise(labels, num_classes=10, noise_rate=0.2):
    """Return a copy of ``labels`` with a fraction of entries replaced.

    ``int(noise_rate * len(labels))`` distinct positions are chosen without
    replacement, and each one receives a label picked uniformly from the
    classes other than its current one. Randomness comes from the global
    NumPy random state. The input array is not modified.

    Args:
        labels: Either a 1-D label array or a 2-D array whose first column
            holds the label (only column 0 is read and rewritten).
        num_classes: Replacement labels are drawn from ``range(num_classes)``.
        noise_rate: Fraction of positions to corrupt.

    Returns:
        A new array with the same shape as ``labels``.
    """
    corrupted = labels.copy()
    total = len(labels)
    flip_count = int(noise_rate * total)
    chosen = np.random.choice(total, flip_count, replace=False)

    column_layout = len(labels.shape) > 1  # True for (N, 1)-style label arrays
    for position in chosen:
        current = labels[position][0] if column_layout else labels[position]
        # Exclude the current class so the label is guaranteed to change.
        alternatives = [k for k in range(num_classes) if k != current]
        replacement = np.random.choice(alternatives)
        if column_layout:
            corrupted[position][0] = replacement
        else:
            corrupted[position] = replacement

    return corrupted

# Create the label-noise versions of both training label arrays
noisy_mnist_labels = add_label_noise(train_labels_mnist)
noisy_cifar_labels = add_label_noise(train_labels_cifar)

# Count how many labels actually differ from the originals
mnist_changed = (noisy_mnist_labels != train_labels_mnist).sum()
cifar_changed = (noisy_cifar_labels[:, 0] != train_labels_cifar[:, 0]).sum()

print(f"MNIST: {mnist_changed} labels changed out of {len(train_labels_mnist)} ({mnist_changed/len(train_labels_mnist):.2%})")
print(f"CIFAR-10: {cifar_changed} labels changed out of {len(train_labels_cifar)} ({cifar_changed/len(train_labels_cifar):.2%})")

In [None]:
# Scheme 4: build an imbalanced dataset (keep classes 0-4, thin out classes 5-9)
def create_imbalanced_dataset(images, labels, imbalance_ratio=0.2):
    """Subsample classes 5-9 while keeping classes 0-4 intact.

    For each class in 5..9, ``int(count * (1 - imbalance_ratio))`` randomly
    chosen samples (global NumPy random state, without replacement) are
    removed. Samples of every other class are all kept.

    Args:
        images: Array of samples, indexed along its first axis.
        labels: Either a 1-D label array or a 2-D array whose first column
            holds the label.
        imbalance_ratio: Approximate fraction of each class 5-9 to retain.

    Returns:
        ``(images[keep], labels[keep])`` where ``keep`` is the boolean mask of
        retained samples; relative order is preserved.
    """
    # Read the class ids from column 0 for 2-D labels, directly otherwise.
    class_ids = labels[:, 0] if len(labels.shape) > 1 else labels

    keep = np.ones(len(labels), dtype=bool)
    for reduced_class in range(5, 10):
        members = np.where(class_ids == reduced_class)[0]
        drop_count = int(len(members) * (1 - imbalance_ratio))
        if len(members) == 0:
            # No samples of this class, nothing to remove.
            continue
        dropped = np.random.choice(members, drop_count, replace=False)
        keep[dropped] = False

    return images[keep], labels[keep]

# Create the imbalanced training sets for both datasets
imbalanced_mnist_images, imbalanced_mnist_labels = create_imbalanced_dataset(
    train_images_mnist,
    train_labels_mnist,
)
imbalanced_cifar_images, imbalanced_cifar_labels = create_imbalanced_dataset(
    train_images_cifar,
    train_labels_cifar,
)

# Compute and display the class distribution
def plot_class_distribution(labels, title, is_cifar=False):
    """Draw a bar chart of how many samples each class has.

    Args:
        labels: Label array. When ``is_cifar`` is true it is treated as 2-D
            and the class ids are read from column 0.
        title: Title shown above the chart.
        is_cifar: Selects the 2-D label layout described above.
    """
    class_ids = labels[:, 0] if is_cifar else labels
    unique, counts = np.unique(class_ids, return_counts=True)

    plt.figure(figsize=(10, 5))
    plt.bar(unique, counts)
    plt.xlabel('Class')
    plt.ylabel('Number of samples')
    plt.title(title)
    plt.xticks(unique)  # one tick per class that is present
    plt.grid(axis='y', alpha=0.3)
    plt.show()

# Show the class distribution of the original and the imbalanced datasets
for label_set, chart_title, cifar_layout in (
    (train_labels_mnist, 'Original MNIST Class Distribution', False),
    (imbalanced_mnist_labels, 'Imbalanced MNIST Class Distribution', False),
    (train_labels_cifar, 'Original CIFAR-10 Class Distribution', True),
    (imbalanced_cifar_labels, 'Imbalanced CIFAR-10 Class Distribution', True),
):
    plot_class_distribution(label_set, chart_title, is_cifar=cifar_layout)

print(f"MNIST: Original size: {len(train_labels_mnist)}, Imbalanced size: {len(imbalanced_mnist_labels)}")
print(f"CIFAR-10: Original size: {len(train_labels_cifar)}, Imbalanced size: {len(imbalanced_cifar_labels)}")

In [None]:
# Scheme 5: combine noise and blur
def combined_perturbation(images, noise_std=0.2, blur_sigma=1.0):
    """Apply Gaussian noise and then Gaussian blur to a batch of images.

    Args:
        images: Batch of images accepted by ``add_gaussian_noise`` and
            ``blur_images``.
        noise_std: Standard deviation passed to ``add_gaussian_noise``.
        blur_sigma: Sigma passed to ``blur_images``.

    Returns:
        The blurred version of the noisy images.
    """
    # Noise goes in first, so the blur also smooths the injected noise.
    return blur_images(
        add_gaussian_noise(images, std=noise_std),
        sigma=blur_sigma,
    )

# Create the combined (noise + blur) versions of all splits
# (order: MNIST train, MNIST test, CIFAR-10 train, CIFAR-10 test).
combined_mnist_images, combined_mnist_test, combined_cifar_images, combined_cifar_test = (
    combined_perturbation(batch)
    for batch in (
        train_images_mnist,
        test_images_mnist,
        train_images_cifar,
        test_images_cifar,
    )
)

In [None]:
# Training and evaluation helper
def train_and_evaluate(train_images, train_labels, test_images, test_labels, dataset_name, modification_name, epochs=5):
    """Train a freshly built CNN and evaluate it on the given test data.

    Args:
        train_images: Training images.
        train_labels: Training labels.
        test_images: Images used for per-epoch validation and final scoring.
        test_labels: Labels matching ``test_images``.
        dataset_name: ``"MNIST"`` selects a (28, 28, 1) input; any other value
            selects a (32, 32, 3) input.
        modification_name: Label for the data variant, used only in the
            printed messages.
        epochs: Number of training epochs.

    Returns:
        ``(model, history, test_acc, predictions)`` where ``predictions`` is
        the argmax over the model outputs for ``test_images``.
    """
    # Build a new 10-class model sized for the chosen dataset
    input_shape = (28, 28, 1) if dataset_name == "MNIST" else (32, 32, 3)
    model = build_cnn(input_shape, 10)
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )

    # Train the model
    print(f"\nTraining model on {modification_name} {dataset_name} dataset...")
    history = model.fit(
        train_images,
        train_labels,
        epochs=epochs,
        validation_data=(test_images, test_labels),
        verbose=1,
    )

    # Evaluate the model
    _test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=0)
    print(f"{modification_name} {dataset_name} Test Accuracy: {test_acc:.4f}")

    # Predicted class = index of the largest output per sample
    class_scores = model.predict(test_images)
    predictions = np.argmax(class_scores, axis=1)

    return model, history, test_acc, predictions

In [None]:
# Train and evaluate every MNIST scheme.
# Three parallel lists are filled in the same order: (name, accuracy) pairs,
# test-set predictions (used later by the McNemar test), and model names.

# Baseline: reuse the already-trained model and its measured accuracy
orig_pred_mnist = np.argmax(model_mnist.predict(test_images_mnist), axis=1)
results_mnist = [("Original", test_acc_mnist)]
mnist_predictions = [orig_pred_mnist]
mnist_model_names = ["Original"]

# Schemes 1-5: each trains a new model on a perturbed training set and is
# evaluated on the unmodified MNIST test set.
for scenario_name, scenario_images, scenario_labels in (
    ("Noisy", noisy_mnist_images, train_labels_mnist),
    ("Blurred", blurred_mnist_images, train_labels_mnist),
    ("Label Noise", train_images_mnist, noisy_mnist_labels),
    ("Imbalanced", imbalanced_mnist_images, imbalanced_mnist_labels),
    ("Combined", combined_mnist_images, train_labels_mnist),
):
    _, _, acc, pred = train_and_evaluate(
        scenario_images, scenario_labels,
        test_images_mnist, test_labels_mnist,
        "MNIST", scenario_name, epochs=10,
    )
    results_mnist.append((scenario_name, acc))
    mnist_predictions.append(pred)
    mnist_model_names.append(scenario_name)

In [None]:
# Train and evaluate every CIFAR-10 scheme.
# Three parallel lists are filled in the same order: (name, accuracy) pairs,
# test-set predictions (used later by the McNemar test), and model names.

# Baseline: reuse the already-trained model and its measured accuracy
orig_pred_cifar = np.argmax(model_cifar.predict(test_images_cifar), axis=1)
results_cifar = [("Original", test_acc_cifar)]
cifar_predictions = [orig_pred_cifar]
cifar_model_names = ["Original"]

# Schemes 1-5: each trains a new model on a perturbed training set and is
# evaluated on the unmodified CIFAR-10 test set.
for scenario_name, scenario_images, scenario_labels in (
    ("Noisy", noisy_cifar_images, train_labels_cifar),
    ("Blurred", blurred_cifar_images, train_labels_cifar),
    ("Label Noise", train_images_cifar, noisy_cifar_labels),
    ("Imbalanced", imbalanced_cifar_images, imbalanced_cifar_labels),
    ("Combined", combined_cifar_images, train_labels_cifar),
):
    _, _, acc, pred = train_and_evaluate(
        scenario_images, scenario_labels,
        test_images_cifar, test_labels_cifar,
        "CIFAR-10", scenario_name, epochs=10,
    )
    results_cifar.append((scenario_name, acc))
    cifar_predictions.append(pred)
    cifar_model_names.append(scenario_name)

In [None]:
# Visual comparison of the results
def plot_results(results, dataset_name):
    """Draw a bar chart of accuracy per method with a value label on each bar.

    Args:
        results: Sequence of ``(method_name, accuracy)`` pairs.
        dataset_name: Dataset name inserted into the chart title.
    """
    methods = [name for name, _ in results]
    accuracies = [score for _, score in results]

    plt.figure(figsize=(10, 6))
    bars = plt.bar(methods, accuracies)
    plt.ylabel('Accuracy')
    plt.title(f'Performance Comparison on {dataset_name} Dataset')
    plt.ylim(0, 1.0)

    # Write each accuracy just above the top of its bar
    for bar, acc in zip(bars, accuracies):
        label_x = bar.get_x() + bar.get_width()/2
        label_y = bar.get_height() + 0.01
        plt.text(label_x, label_y, f'{acc:.4f}', ha='center', va='bottom', rotation=0)

    plt.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    plt.show()

# Plot the MNIST and CIFAR-10 results
for dataset_label, dataset_results in (("MNIST", results_mnist), ("CIFAR-10", results_cifar)):
    plot_results(dataset_results, dataset_label)

# Print a text summary of the results
for dataset_label, dataset_results in (("MNIST", results_mnist), ("CIFAR-10", results_cifar)):
    print(f"\n{dataset_label} Results Summary:")
    for method, acc in dataset_results:
        print(f"{method}: {acc:.4f}")

In [None]:
# Add McNemar test to evaluate statistical significance of differences between models
from scipy import stats
import seaborn as sns

def mcnemar_test(pred_model1, pred_model2, y_true):
    """Compare two classifiers on the same samples with McNemar's test.

    The statistic is ``(b - c)**2 / (b + c)`` (no continuity correction),
    referred to a chi-square distribution with one degree of freedom.

    Args:
        pred_model1: Predicted labels of the first model.
        pred_model2: Predicted labels of the second model.
        y_true: Ground-truth labels. All three inputs are flattened first, so
            a column-shaped ``(N, 1)`` array is accepted.

    Returns:
        A 6-tuple ``(p_value, statistic, a, b, c, d)`` where
        ``a`` = both models correct, ``b`` = only model 1 correct,
        ``c`` = only model 2 correct, ``d`` = both models incorrect.
        When there are no discordant samples (``b + c == 0``) the p-value is
        1.0 and the statistic is 0.0.
    """
    # Flatten so an (N, 1) label array cannot broadcast against (N,)
    # predictions into an N x N comparison.
    pred_model1 = np.ravel(pred_model1)
    pred_model2 = np.ravel(pred_model2)
    y_true = np.ravel(y_true)

    correct_model1 = pred_model1 == y_true
    correct_model2 = pred_model2 == y_true

    # Cells of the 2x2 contingency table
    a = np.count_nonzero(correct_model1 & correct_model2)
    b = np.count_nonzero(correct_model1 & ~correct_model2)
    c = np.count_nonzero(~correct_model1 & correct_model2)
    d = np.count_nonzero(~correct_model1 & ~correct_model2)

    # Without discordant pairs the statistic is undefined (0 / 0). Return the
    # same 6-tuple shape as the normal path so callers can always unpack it.
    if b + c == 0:
        return 1.0, 0.0, a, b, c, d

    statistic = ((b - c) ** 2) / (b + c)
    p_value = stats.chi2.sf(statistic, 1)  # upper tail, 1 degree of freedom

    return p_value, statistic, a, b, c, d

# Pairwise McNemar tests between all MNIST models
print("\nMNIST McNemar Test:")
n_mnist_models = len(mnist_model_names)
mnist_p_values = np.ones((n_mnist_models, n_mnist_models))
for i, model1_name in enumerate(mnist_model_names):
    for j, model2_name in enumerate(mnist_model_names):
        if i == j:
            continue  # a model is never compared with itself
        p_value, statistic, a, b, c, d = mcnemar_test(
            mnist_predictions[i], mnist_predictions[j], test_labels_mnist)
        mnist_p_values[i, j] = p_value
        verdict = 'significant' if p_value < 0.05 else 'not significant'
        print(f"{model1_name} vs {model2_name}: p={p_value:.4f}, {verdict} (a={a}, b={b}, c={c}, d={d})")

# Heatmap of the MNIST p-values; the diagonal (self-comparison) is masked out
plt.figure(figsize=(10, 8))
mask = np.eye(n_mnist_models, dtype=bool)
sns.heatmap(
    mnist_p_values,
    annot=True,
    fmt=".4f",
    cmap="YlGnBu_r",
    mask=mask,
    xticklabels=mnist_model_names,
    yticklabels=mnist_model_names,
)
plt.title("MNIST McNemar Test p-value Heatmap (p<0.05 indicates significant difference)")
plt.tight_layout()
plt.show()

# Pairwise McNemar tests between all CIFAR-10 models
print("\nCIFAR-10 McNemar Test:")
n_cifar_models = len(cifar_model_names)
cifar_p_values = np.ones((n_cifar_models, n_cifar_models))
for i, model1_name in enumerate(cifar_model_names):
    for j, model2_name in enumerate(cifar_model_names):
        if i == j:
            continue  # a model is never compared with itself
        # The CIFAR-10 labels are a 2-D array; flatten them to 1-D so they
        # line up with the 1-D prediction arrays.
        y_true = test_labels_cifar.flatten()
        p_value, statistic, a, b, c, d = mcnemar_test(
            cifar_predictions[i], cifar_predictions[j], y_true)
        cifar_p_values[i, j] = p_value
        verdict = 'significant' if p_value < 0.05 else 'not significant'
        print(f"{model1_name} vs {model2_name}: p={p_value:.4f}, {verdict} (a={a}, b={b}, c={c}, d={d})")

# Heatmap of the CIFAR-10 p-values; the diagonal (self-comparison) is masked out
plt.figure(figsize=(10, 8))
mask = np.eye(n_cifar_models, dtype=bool)
sns.heatmap(
    cifar_p_values,
    annot=True,
    fmt=".4f",
    cmap="YlGnBu_r",
    mask=mask,
    xticklabels=cifar_model_names,
    yticklabels=cifar_model_names,
)
plt.title("CIFAR-10 McNemar Test p-value Heatmap (p<0.05 indicates significant difference)")
plt.tight_layout()
plt.show()