In [3]:
import numpy as np
import pickle
import os
import matplotlib.pyplot as plt  


In [4]:

# 加载CIFAR-10数据集
def load_cifar10_batch(file):
    """Load a single pickled CIFAR-10 batch file.

    Args:
        file: Path of the batch file to open in binary mode.

    Returns:
        A ``(data, labels)`` tuple holding the values stored under the
        ``b'data'`` and ``b'labels'`` keys of the unpickled mapping.

    Note:
        ``pickle.load`` can execute arbitrary code while deserialising, so
        only read batch files that come from a trusted source.
    """
    with open(file, 'rb') as handle:
        # encoding='bytes' keeps the keys as bytes objects, hence the b'...' lookups
        batch = pickle.load(handle, encoding='bytes')
    images = batch[b'data']
    targets = batch[b'labels']
    return images, targets

def load_cifar10_data(data_dir):
    """Load the CIFAR-10 training and test splits from ``data_dir``.

    Reads ``data_batch_1`` .. ``data_batch_5`` as the training set and
    ``test_batch`` as the test set.

    Args:
        data_dir: Directory that contains the batch files.

    Returns:
        ``(train_data, train_labels, test_data, test_labels)``. The data
        arrays have 3072 columns per row; the labels are NumPy arrays.
    """
    def to_channels_last_rows(raw):
        # Interpret every row as (3, 32, 32), move the channel axis last,
        # then flatten back to 3072 values per image.
        return raw.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1).reshape(-1, 3072)

    batch_arrays = []
    label_list = []
    for batch_name in (f'data_batch_{i}' for i in range(1, 6)):
        data, labels = load_cifar10_batch(os.path.join(data_dir, batch_name))
        batch_arrays.append(data)
        label_list.extend(labels)
    train_data = to_channels_last_rows(np.vstack(batch_arrays))
    train_labels = np.array(label_list)

    test_raw, test_label_list = load_cifar10_batch(os.path.join(data_dir, 'test_batch'))
    test_data = to_channels_last_rows(test_raw)
    test_labels = np.array(test_label_list)

    return train_data, train_labels, test_data, test_labels

# 数据预处理
def preprocess_data(train_data, test_data):
    """Divide both datasets by 255.

    For inputs holding byte-valued pixels (0..255) this maps the values
    into the [0, 1] range.

    Args:
        train_data: Training samples.
        test_data: Test samples.

    Returns:
        ``(train_data, test_data)`` as new, scaled arrays.
    """
    scale = 255.0
    return train_data / scale, test_data / scale

# 激活函数
def relu(x):
    """Rectified linear unit: element-wise maximum of 0 and ``x``."""
    floor = 0
    return np.maximum(floor, x)

def relu_derivative(x):
    """Derivative of ReLU: 1.0 where ``x`` is strictly positive, else 0.0."""
    positive_mask = x > 0
    return positive_mask.astype(float)

def softmax(x):
    """Row-wise softmax along axis 1.

    Args:
        x: Scores with at least two dimensions; each row along axis 1 is
            normalised independently.

    Returns:
        Array of the same shape as ``x`` whose rows sum to 1.
    """
    # Subtracting the per-row maximum keeps np.exp from overflowing.
    row_max = np.max(x, axis=1, keepdims=True)
    exps = np.exp(x - row_max)
    row_totals = np.sum(exps, axis=1, keepdims=True)
    return exps / row_totals

# 损失函数
def cross_entropy_loss(y_pred, y_true):
    """Mean cross-entropy between predicted probabilities and integer labels.

    Args:
        y_pred: Predicted probabilities, indexed as ``[sample, class]``.
        y_true: Array of integer class indices, one per sample.

    Returns:
        The average negative log-probability assigned to the true classes.
    """
    m = y_true.shape[0]
    # Pick, for every sample, the probability predicted for its true class.
    true_class_probs = y_pred[np.arange(m), y_true]
    # The small constant avoids log(0).
    log_likelihood = -np.log(true_class_probs + 1e-9)
    return np.sum(log_likelihood) / m

# 准确率计算
def accuracy(y_pred, y_true):
    """Fraction of samples whose arg-max prediction equals the true label.

    Args:
        y_pred: Scores or probabilities, indexed as ``[sample, class]``.
        y_true: Integer class labels, one per sample.

    Returns:
        Mean of the element-wise comparison between predicted and true labels.
    """
    hits = np.argmax(y_pred, axis=1) == y_true
    return np.mean(hits)


In [5]:

# 模型类
class NeuralNetwork:
    """Fully connected network with two hidden layers and a softmax output.

    Parameters are stored as ``W1``/``b1``, ``W2``/``b2`` and ``W3``/``b3``.
    ``forward`` caches the pre-activations (``z1``..``z3``) and activations
    (``a1``..``a3``) on the instance so that ``backward`` can reuse them.
    """

    def __init__(self, input_size, hidden1_size, hidden2_size, output_size, activation='relu'):
        """Initialise the three layers.

        Weights are drawn from a standard normal distribution scaled by 0.01;
        biases start at zero with shape ``(1, units)``.

        Args:
            input_size: Number of input features.
            hidden1_size: Units in the first hidden layer.
            hidden2_size: Units in the second hidden layer.
            output_size: Number of output classes.
            activation: ``'relu'`` selects ReLU; any other value selects tanh.
        """
        layer_dims = (input_size, hidden1_size, hidden2_size, output_size)
        # Layers are created in order 1, 2, 3 so the random draws happen in
        # the sequence W1, W2, W3.
        for idx in (1, 2, 3):
            fan_in, fan_out = layer_dims[idx - 1], layer_dims[idx]
            setattr(self, f'W{idx}', np.random.randn(fan_in, fan_out) * 0.01)
            setattr(self, f'b{idx}', np.zeros((1, fan_out)))
        self.activation = activation

    def _activate(self, z):
        """Apply the configured hidden-layer non-linearity to ``z``."""
        if self.activation == 'relu':
            return relu(z)
        return np.tanh(z)

    def forward(self, X):
        """Run a forward pass and return the softmax output.

        Args:
            X: Input batch whose rows are samples.

        Returns:
            ``self.a3``, the softmax of the last layer's pre-activation.
        """
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = self._activate(self.z1)

        self.z2 = np.dot(self.a1, self.W2) + self.b2
        self.a2 = self._activate(self.z2)

        self.z3 = np.dot(self.a2, self.W3) + self.b3
        self.a3 = softmax(self.z3)
        return self.a3

    def backward(self, X, y_one_hot, output, learning_rate, reg_lambda):
        """Back-propagate one batch and apply a gradient-descent step in place.

        Relies on the values cached by the preceding ``forward`` call.

        Args:
            X: The input batch that was passed to ``forward``.
            y_one_hot: One-hot targets with the same shape as ``output``.
            output: Softmax output returned by ``forward`` for ``X``.
            learning_rate: Step size of the update.
            reg_lambda: L2 coefficient; adds ``reg_lambda * W`` to each
                weight gradient (biases are not regularised).
        """
        m = X.shape[0]

        # Output layer: gradient w.r.t. the logits is (prediction - target).
        dz3 = output - y_one_hot
        dW3 = np.dot(self.a2.T, dz3) / m + reg_lambda * self.W3
        db3 = np.sum(dz3, axis=0, keepdims=True) / m

        # Second hidden layer.
        da2 = np.dot(dz3, self.W3.T)
        if self.activation == 'relu':
            dz2 = da2 * relu_derivative(self.z2)
        else:
            # tanh'(z) expressed through the cached activation.
            dz2 = da2 * (1 - self.a2**2)
        dW2 = np.dot(self.a1.T, dz2) / m + reg_lambda * self.W2
        db2 = np.sum(dz2, axis=0, keepdims=True) / m

        # First hidden layer.
        da1 = np.dot(dz2, self.W2.T)
        if self.activation == 'relu':
            dz1 = da1 * relu_derivative(self.z1)
        else:
            dz1 = da1 * (1 - self.a1**2)
        dW1 = np.dot(X.T, dz1) / m + reg_lambda * self.W1
        db1 = np.sum(dz1, axis=0, keepdims=True) / m

        # All gradients are computed before any parameter changes, then the
        # parameters are updated in place.
        updates = (
            ('W1', dW1), ('b1', db1),
            ('W2', dW2), ('b2', db2),
            ('W3', dW3), ('b3', db3),
        )
        for name, grad in updates:
            param = getattr(self, name)
            param -= learning_rate * grad
            setattr(self, name, param)


In [16]:

# 辅助函数：将标签转换为one-hot编码
def to_one_hot(labels, num_classes):
    """Convert integer class labels into a one-hot matrix.

    Args:
        labels: Sequence of integer class indices.
        num_classes: Number of columns of the result.

    Returns:
        Float array of shape ``(len(labels), num_classes)`` with a single 1
        per row at the column given by the corresponding label.
    """
    count = len(labels)
    encoded = np.zeros((count, num_classes))
    row_index = np.arange(count)
    encoded[row_index, labels] = 1
    return encoded

# 保存模型参数
def save_model(model, filename='best_model.pkl'):
    """Pickle the model's six parameter arrays to ``filename``.

    Args:
        model: Object exposing ``W1``, ``b1``, ``W2``, ``b2``, ``W3``, ``b3``.
        filename: Destination path; an existing file is overwritten.
    """
    param_names = ('W1', 'b1', 'W2', 'b2', 'W3', 'b3')
    model_params = {name: getattr(model, name) for name in param_names}
    with open(filename, 'wb') as f:
        pickle.dump(model_params, f)
    print(f'Model saved to {filename}')

# 加载模型参数
def load_model(model, filename='best_model.pkl'):
    """Restore the six parameter arrays of ``model`` from a pickle file.

    Args:
        model: Object whose ``W1``, ``b1``, ``W2``, ``b2``, ``W3``, ``b3``
            attributes are overwritten.
        filename: Path of a file holding a pickled dict with those six keys.

    Raises:
        KeyError: If one of the expected keys is missing from the file.

    Note:
        ``pickle.load`` can execute arbitrary code; only load checkpoint
        files that come from a trusted source.
    """
    with open(filename, 'rb') as f:
        model_params = pickle.load(f)
    for name in ('W1', 'b1', 'W2', 'b2', 'W3', 'b3'):
        setattr(model, name, model_params[name])
    print(f'Model loaded from {filename}')

# 新增：绘制损失和准确率曲线的函数
def plot_results(train_losses, val_losses, val_accs):
    """Plot the loss curves and the validation accuracy side by side.

    Args:
        train_losses: Training loss per epoch.
        val_losses: Validation loss per epoch.
        val_accs: Validation accuracy per epoch.

    The x axis is numbered from 1 to ``len(train_losses)``.
    """
    epochs = range(1, len(train_losses) + 1)

    # One row, two panels: losses on the left, accuracy on the right.
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    loss_ax.plot(epochs, train_losses, label='Train Loss')
    loss_ax.plot(epochs, val_losses, label='Validation Loss')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title('Loss Curve')
    loss_ax.legend()

    acc_ax.plot(epochs, val_accs, label='Validation Accuracy', color='green')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.set_title('Validation Accuracy Curve')
    acc_ax.legend()

    fig.tight_layout()  # keep the two panels from overlapping
    plt.show()

def visualize_parameters(model):
    """Draw histograms of every weight matrix and bias vector of ``model``.

    The figure has three rows (one per layer) and two columns: weights on
    the left in blue, biases on the right in green.

    Args:
        model: Object exposing ``W1``, ``b1``, ``W2``, ``b2``, ``W3``, ``b3``.
    """
    # Collect the parameters up front, in the order W1, b1, W2, b2, W3, b3.
    layers = [
        (idx, getattr(model, f'W{idx}'), getattr(model, f'b{idx}'))
        for idx in (1, 2, 3)
    ]

    fig, axes = plt.subplots(3, 2, figsize=(12, 12))
    fig.suptitle('Neural Network Parameters Visualization')

    for row, (idx, weights, biases) in enumerate(layers):
        panels = (
            (weights, f'W{idx} Weights Histogram', 'Weight Value', 'blue'),
            (biases, f'b{idx} Biases Histogram', 'Bias Value', 'green'),
        )
        for col, (values, title, xlabel, color) in enumerate(panels):
            ax = axes[row, col]
            ax.hist(values.flatten(), bins=50, color=color, alpha=0.7)
            ax.set_title(title)
            ax.set_xlabel(xlabel)
            ax.set_ylabel('Frequency')

    # Leave the top 4% of the canvas free for the figure-level title.
    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

def visualize_weight_heatmap(model, layer=1):
    """Show the weight matrix of one layer as a heatmap.

    Args:
        model: Object exposing ``W1``, ``W2`` and ``W3``.
        layer: Which weight matrix to draw: 1, 2 or 3.

    Raises:
        ValueError: If ``layer`` is not 1, 2 or 3.
    """
    valid_layers = (1, 2, 3)
    if layer not in valid_layers:
        raise ValueError("Layer must be 1, 2, or 3")

    # Normalise values that merely compare equal to 1/2/3 to the plain integer.
    layer_no = valid_layers.index(layer) + 1
    W = getattr(model, f'W{layer_no}')
    title = f'W{layer_no} Weights Heatmap'

    plt.figure(figsize=(8, 6))
    # Rows of W index the current layer's units, columns the next layer's.
    plt.imshow(W, cmap='viridis', aspect='auto')
    plt.colorbar()
    plt.title(title)
    plt.xlabel('Neurons in Next Layer')
    plt.ylabel('Neurons in Current Layer')
    plt.show()


In [7]:

# 训练函数
def train(model, X_train, y_train, X_val, y_val, epochs, batch_size, learning_rate, reg_lambda, lr_decay=1.0, search=False):
    """Train ``model`` with mini-batch gradient descent.

    After every epoch the loss and accuracy on the full training and
    validation sets are computed and printed. Unless ``search`` is true, the
    model is saved to ``best_model.pkl`` whenever the validation accuracy
    strictly improves on the best value seen so far.

    Args:
        model: Network providing ``forward(X)`` and
            ``backward(X, y_one_hot, output, learning_rate, reg_lambda)``.
        X_train: Training samples, one per row.
        y_train: Integer training labels (encoded with 10 classes).
        X_val: Validation samples.
        y_val: Integer validation labels.
        epochs: Number of passes over the training set.
        batch_size: Samples per mini-batch; must be positive.
        learning_rate: Initial step size.
        reg_lambda: L2 regularisation coefficient forwarded to ``backward``.
        lr_decay: Factor the learning rate is multiplied by after each epoch.
        search: When true, no checkpoint is written (used by the grid search).

    Returns:
        ``(train_losses, val_losses, val_accs)``, three lists with one entry
        per epoch.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    # A negative step would make range() below empty, so the model would
    # silently never be trained; fail loudly instead.
    if batch_size <= 0:
        raise ValueError(f'batch_size must be a positive integer, got {batch_size}')

    m = X_train.shape[0]
    # Only the training targets need one-hot encoding: the validation
    # metrics below work directly on the integer labels.
    y_train_one_hot = to_one_hot(y_train, 10)

    best_val_acc = 0  # best validation accuracy seen so far
    train_losses = []
    val_losses = []
    val_accs = []

    for epoch in range(epochs):
        # Reshuffle the training set at the start of every epoch.
        permutation = np.random.permutation(m)
        X_train_shuffled = X_train[permutation]
        y_train_shuffled = y_train_one_hot[permutation]

        # Mini-batch updates; the last batch may be smaller than batch_size.
        for i in range(0, m, batch_size):
            X_batch = X_train_shuffled[i:i+batch_size]
            y_batch = y_train_shuffled[i:i+batch_size]

            output = model.forward(X_batch)
            model.backward(X_batch, y_batch, output, learning_rate, reg_lambda)

        # Learning-rate decay, applied once per epoch.
        learning_rate *= lr_decay

        # Metrics on the full sets (the reported loss excludes the L2 term).
        train_output = model.forward(X_train)
        train_loss = cross_entropy_loss(train_output, y_train)
        train_acc = accuracy(train_output, y_train)

        val_output = model.forward(X_val)
        val_loss = cross_entropy_loss(val_output, y_val)
        val_acc = accuracy(val_output, y_val)

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        # Checkpointing is skipped during hyperparameter search.
        if not search and val_acc > best_val_acc:
            best_val_acc = val_acc
            save_model(model, 'best_model.pkl')
            print(f'New best model saved with Val Acc: {best_val_acc:.4f}')

    # The per-epoch history is returned so the caller can plot it.
    return train_losses, val_losses, val_accs

# 测试函数
def test(model, X_test, y_test):
    """Evaluate ``model`` on the test set and print its accuracy.

    Args:
        model: Network providing ``forward(X)``.
        X_test: Test samples.
        y_test: Integer test labels.
    """
    acc = accuracy(model.forward(X_test), y_test)
    print(f'Test Accuracy: {acc:.4f}')

# 超参数搜索
def hyperparameter_search(X_train, y_train, X_val, y_val):
    """Grid-search hidden sizes, L2 strength, learning rate and its decay.

    Every combination trains a fresh ``NeuralNetwork`` for 30 epochs with
    batch size 64 (without checkpointing) and is scored by the validation
    accuracy of the final weights. The loss/accuracy curves of every run are
    plotted.

    Args:
        X_train: Training samples.
        y_train: Integer training labels.
        X_val: Validation samples.
        y_val: Integer validation labels.

    Returns:
        Dict with keys ``hidden1_size``, ``hidden2_size``, ``reg_lambda``,
        ``learning_rate`` and ``lr_decay`` describing the best combination.
        It is empty if no run reached a validation accuracy above 0.
    """
    best_acc = 0
    best_params = {}
    for hidden1_size, hidden2_size in [(256, 128), (512, 256)]:
        for reg_lambda in [0.001, 0.01]:
            for l_r in [0.02, 0.01]:
                for lr_decay in [0.99, 0.98, 0.96]:
                    print(f'Trying hidden1_size={hidden1_size}, hidden2_size={hidden2_size}, reg_lambda={reg_lambda}, learning_rate={l_r}, lr_decay={lr_decay}')
                    model = NeuralNetwork(3072, hidden1_size, hidden2_size, 10)
                    # The learning rate and decay of the current grid point
                    # must be passed through to train(); otherwise those two
                    # grid dimensions are never actually explored.
                    train_losses, val_losses, val_accs = train(
                        model, X_train, y_train, X_val, y_val,
                        epochs=30, batch_size=64,
                        learning_rate=l_r, reg_lambda=reg_lambda,
                        lr_decay=lr_decay, search=True)
                    val_output = model.forward(X_val)
                    val_acc = accuracy(val_output, y_val)
                    if val_acc > best_acc:
                        best_acc = val_acc
                        best_params = {'hidden1_size': hidden1_size, 'hidden2_size': hidden2_size, 'reg_lambda': reg_lambda, "learning_rate": l_r, "lr_decay": lr_decay}
                    # Also plot the curves of this run (optional).
                    plot_results(train_losses, val_losses, val_accs)
    print(f'Best Validation Accuracy: {best_acc:.4f} with params: {best_params}')
    return best_params


In [8]:
def main():
    """Run the whole pipeline: load data, search, train, visualise, test."""
    # Replace with the path to your CIFAR-10 dataset.
    data_dir = 'path/to/cifar-10-batches-py'
    train_data, train_labels, test_data, test_labels = load_cifar10_data(data_dir)
    train_data, test_data = preprocess_data(train_data, test_data)

    # Hold out the first val_size training samples as the validation set.
    val_size = 5000
    X_val, X_train = train_data[:val_size], train_data[val_size:]
    y_val, y_train = train_labels[:val_size], train_labels[val_size:]

    # Hyperparameter search.
    best_params = hyperparameter_search(X_train, y_train, X_val, y_val)

    # Train a fresh model with the best hyperparameters found.
    model = NeuralNetwork(3072, best_params['hidden1_size'], best_params['hidden2_size'], 10)
    history = train(
        model, X_train, y_train, X_val, y_val,
        epochs=100,
        batch_size=64,
        learning_rate=best_params['learning_rate'],
        reg_lambda=best_params['reg_lambda'],
        lr_decay=best_params['lr_decay'],
    )

    # Training curves.
    train_losses, val_losses, val_accs = history
    plot_results(train_losses, val_losses, val_accs)

    # Parameter visualisations. These show the weights after the final
    # epoch, because the best checkpoint is only reloaded further below.
    visualize_parameters(model)  # histograms
    for layer in (1, 2, 3):
        visualize_weight_heatmap(model, layer=layer)  # one heatmap per layer

    # Restore the checkpoint with the best validation accuracy.
    load_model(model, 'best_model.pkl')

    # Final evaluation on the test set.
    test(model, test_data, test_labels)


if __name__ == '__main__':
    main()
