In [None]:
# SGD (no) + crossEntropy + bs256 + tanh + lr0.01

import numpy as np
import pickle
import os
import matplotlib.pyplot as plt
from tqdm import tqdm

# Fully connected (linear) layer
class LinearLayer:
    """Fully connected layer with an optional activation and a built-in SGD step.

    The layer computes ``activation(x @ W + b)``. ``backward`` both returns the
    gradient with respect to the input and immediately applies one gradient
    descent update to ``W`` and ``b``.

    Args:
        n_in: Number of input features.
        n_out: Number of output features.
        batch_size: Nominal mini-batch size. Stored for reference only; the
            layer accepts batches of any size.
        activation: ``'relu'``, ``'sigmoid'``, ``'tanh'`` or ``None`` (any
            other value is treated as the identity).
        lr: Learning rate of the SGD update.
    """

    def __init__(self, n_in, n_out, batch_size, activation=None, lr=0.001):
        # Initialise the model parameters
        self.W = np.random.normal(scale=0.01, size=(n_in, n_out))
        # One bias per output unit, shared by every sample of the batch.
        # (A (batch_size, n_out) bias would tie parameters to the position of
        # a sample inside a shuffled batch and break on other batch sizes.)
        self.b = np.zeros((1, n_out))
        self.activation = activation
        self.lr = lr
        self.batch_size = batch_size

    # Forward pass
    def forward(self, x):
        """Return the layer output for ``x`` of shape (n_samples, n_in).

        The input and the activated output are cached for ``backward``.
        """
        self.x = x
        output = np.dot(x, self.W) + self.b
        if self.activation == 'relu':
            output = np.maximum(0, output)
        elif self.activation == 'sigmoid':
            output = 1 / (1 + np.exp(-output))
        elif self.activation == 'tanh':
            output = np.tanh(output)
        self.activated_output = output
        return output

    # Backward pass (SGD)
    def backward(self, dout):
        """Back-propagate ``dout`` and update the parameters in place.

        Args:
            dout: Gradient of the loss with respect to this layer's output,
                summed (not averaged) over the batch; shape (n_samples, n_out).

        Returns:
            Gradient of the loss with respect to the layer input, computed
            with the weights as they were before the update.
        """
        if self.activation == 'relu':
            dout = dout * (self.activated_output > 0)  # derivative of relu
        elif self.activation == 'sigmoid':
            dout = self.activated_output * (1 - self.activated_output) * dout  # derivative of sigmoid
        elif self.activation == 'tanh':
            dout = (1 - self.activated_output ** 2) * dout  # derivative of tanh
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.x.T, dout)
        # The bias is shared across samples, so its gradient is the batch sum.
        self.db = np.sum(dout, axis=0, keepdims=True)
        # Average over the samples actually seen, not the nominal batch size.
        n_samples = max(self.x.shape[0], 1)
        # Update the parameters
        self.W = self.W - self.dW * self.lr / n_samples
        self.b = self.b - self.db * self.lr / n_samples
        return dx

# SoftMax layer
class SoftMax:
    """Row-wise softmax whose backward pass is fused with cross-entropy."""

    def __init__(self):
        # Probabilities produced by the most recent forward() call.
        self.y_hat = None

    def forward(self, x):
        """Return softmax probabilities for every row of ``x`` and cache them."""
        # Subtract each row's maximum before exponentiating to avoid overflow.
        row_max = np.max(x, axis=1, keepdims=True)
        exponentials = np.exp(x - row_max)
        normaliser = np.sum(exponentials, axis=1, keepdims=True)
        self.y_hat = exponentials / normaliser
        return self.y_hat

    def backward(self, y):
        """Return ``y_hat - y``, the difference between cached output and ``y``."""
        return self.y_hat - y


# Multi-layer perceptron
class MLP:
    """Stack of LinearLayer objects followed by a SoftMax output.

    Args:
        input_size: Number of input features.
        batch_size: Mini-batch size forwarded to every LinearLayer.
        num_classes: Number of output units of the classifier layer.
        lr: Learning rate forwarded to every LinearLayer.
        hidden_layer_sizes: Widths of the hidden layers; must be non-empty.
        activation: Activation name used by all layers except the classifier.
    """

    def __init__(self, input_size, batch_size, num_classes,
                 lr=0.001, hidden_layer_sizes=(256,),
                 activation='relu'):
        # Layers are created in the order input -> classifier -> hidden so the
        # sequence of random weight draws stays the same.
        self.input_layer = LinearLayer(
            input_size, hidden_layer_sizes[0], batch_size, activation, lr=lr
        )
        self.classifier = LinearLayer(
            hidden_layer_sizes[-1], num_classes, batch_size, None, lr=lr
        )
        self.softmax = SoftMax()

        # One extra layer for each consecutive pair of hidden widths.
        width_pairs = zip(hidden_layer_sizes[:-1], hidden_layer_sizes[1:])
        hidden_stack = [
            LinearLayer(width_in, width_out, batch_size, activation, lr=lr)
            for width_in, width_out in width_pairs
        ]

        self.layers = [
            self.input_layer,
            *hidden_stack,
            self.classifier,
            self.softmax,
        ]

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        signal = x
        for stage in self.layers:
            signal = stage.forward(signal)
        return signal

    def backward(self, y):
        """Run the backward pass of every layer, last to first, starting from ``y``."""
        grad = y
        for stage in self.layers[::-1]:
            grad = stage.backward(grad)

# Load the CIFAR-10 data
def load_cifar10_data(data_dir, valid_ratio=0.2):
    """Load the CIFAR-10 python batches and split off a validation set.

    Reads ``data_batch_1`` .. ``data_batch_5`` and ``test_batch`` from
    ``data_dir``. Images are reshaped to (N, 3, 32, 32) float32 and divided
    by 255; labels are one-hot encoded over 10 classes.

    Args:
        data_dir: Directory containing the pickled batch files.
        valid_ratio: Fraction (between 0 and 1) of the training images that is
            randomly moved to the validation set.

    Returns:
        Tuple ``(X_train, y_train, X_valid, y_valid, X_test, y_test)``.

    Raises:
        ValueError: If ``valid_ratio`` is outside ``[0, 1]``.
    """
    if not 0 <= valid_ratio <= 1:
        raise ValueError(f"valid_ratio must be within [0, 1], got {valid_ratio}")

    def load_batch(file):
        # NOTE: pickle.load can execute arbitrary code while unpickling; only
        # point this loader at dataset files obtained from a trusted source.
        with open(file, 'rb') as fo:
            batch = pickle.load(fo, encoding='bytes')
        return batch[b'data'], np.array(batch[b'labels'])

    train_parts = [
        load_batch(os.path.join(data_dir, f'data_batch_{i}'))
        for i in range(1, 6)
    ]
    X_train = np.concatenate([data for data, _ in train_parts])
    y_train = np.concatenate([labels for _, labels in train_parts])

    X_test, y_test = load_batch(os.path.join(data_dir, 'test_batch'))

    X_train = X_train.reshape(-1, 3, 32, 32).astype('float32') / 255.0
    X_test = X_test.reshape(-1, 3, 32, 32).astype('float32') / 255.0

    # One-hot encode the integer labels
    y_train = np.eye(10)[y_train]
    y_test = np.eye(10)[y_test]

    # Split the training data into a training set and a validation set.
    # An explicit split point is used because slicing with ``[:-valid_size]``
    # returns an empty training set when valid_size is 0.
    n_total = X_train.shape[0]
    valid_size = int(n_total * valid_ratio)
    n_train = n_total - valid_size
    indices = np.arange(n_total)
    np.random.shuffle(indices)
    train_idx, valid_idx = indices[:n_train], indices[n_train:]
    X_train, X_valid = X_train[train_idx], X_train[valid_idx]
    y_train, y_valid = y_train[train_idx], y_train[valid_idx]

    return X_train, y_train, X_valid, y_valid, X_test, y_test

# Mini-batch data loader
def dataloader(X, y, batch_size):
    """Yield ``(X_batch, y_batch)`` pairs in a freshly shuffled order.

    Only full batches are produced: trailing samples that do not fill a
    complete batch of ``batch_size`` are left out.
    """
    total = len(X)
    order = np.arange(total)
    np.random.shuffle(order)
    batch_starts = range(0, total - batch_size + 1, batch_size)
    for lo in batch_starts:
        picked = order[lo:lo + batch_size]
        yield X[picked], y[picked]

# Load the dataset
data_dir = 'Datasets/cifar-10-batches-py'
X_train, y_train, X_valid, y_valid, X_test, y_test = load_cifar10_data(data_dir)

# Flatten each image into a single feature vector per sample
X_train, X_valid, X_test = (
    images.reshape(images.shape[0], -1)
    for images in (X_train, X_valid, X_test)
)

# Training hyperparameters
num_epochs = 1000
batch_size = 256

# Build the model
model = MLP(
    input_size=3072,
    batch_size=batch_size,
    num_classes=10,
    lr=0.01,
    hidden_layer_sizes=(256,),
    activation='tanh',
)

# Per-epoch history of losses and accuracies
train_losses = []
valid_losses = []
train_accuracies = []
valid_accuracies = []

# Early-stopping helper
class EarlyStopping:
    """Flag when the validation loss has stopped improving.

    Call the instance once per epoch with the validation loss. ``early_stop``
    becomes True after ``patience`` consecutive calls that fail to beat the
    best loss by more than ``min_delta``.
    """

    def __init__(self, patience=10, min_delta=0):
        self.patience = patience    # number of non-improving epochs tolerated
        self.min_delta = min_delta  # minimum decrease that counts as improvement
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, valid_loss):
        # The first observation only sets the baseline.
        if self.best_loss is None:
            self.best_loss = valid_loss
            return

        # A sufficiently large improvement resets the patience counter.
        if valid_loss < self.best_loss - self.min_delta:
            self.best_loss = valid_loss
            self.counter = 0
            return

        # Otherwise count the stalled epoch and raise the flag if out of patience.
        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

# Set up early stopping
early_stopping = EarlyStopping(patience=10, min_delta=0.001)

# Train the model
for epoch in range(num_epochs):
    # Training phase
    train_loss, train_batches = 0.0, 0
    train_correct, train_seen = 0, 0
    with tqdm(dataloader(X_train, y_train, batch_size), unit='batch') as tepoch:
        tepoch.set_description(f"Epoch {epoch + 1} train")
        for data, label in tepoch:
            # Forward pass
            outputs = model.forward(data)
            loss = np.mean(-np.sum(label * np.log(outputs + 1e-8), axis=1))  # cross-entropy loss
            train_loss += loss
            train_batches += 1
            train_correct += int((outputs.argmax(1) == label.argmax(1)).sum())
            train_seen += len(data)

            # Backward pass (also applies the SGD update)
            model.backward(label)

            # Show the running accuracy in the progress bar
            tepoch.set_postfix(train_acc=train_correct / train_seen)

    # ``loss`` is already a per-batch mean, so the epoch loss is the average
    # over batches; accuracy is taken over the samples actually processed.
    train_loss = train_loss / max(train_batches, 1)
    train_acc = train_correct / max(train_seen, 1)
    train_losses.append(train_loss)
    train_accuracies.append(train_acc)

    # Validation phase (forward passes only, no parameter update)
    valid_loss, valid_batches = 0.0, 0
    valid_correct, valid_seen = 0, 0
    with tqdm(dataloader(X_valid, y_valid, batch_size), unit='batch') as vepoch:
        vepoch.set_description(f"Epoch {epoch + 1} valid")
        for data, label in vepoch:
            # Forward pass
            outputs = model.forward(data)
            loss = np.mean(-np.sum(label * np.log(outputs + 1e-8), axis=1))
            valid_loss += loss
            valid_batches += 1
            valid_correct += int((outputs.argmax(1) == label.argmax(1)).sum())
            valid_seen += len(data)

            vepoch.set_postfix(valid_acc=valid_correct / valid_seen)

    valid_loss = valid_loss / max(valid_batches, 1)
    valid_acc = valid_correct / max(valid_seen, 1)
    valid_losses.append(valid_loss)
    valid_accuracies.append(valid_acc)

    # Check whether training should stop early; min_delta is compared against
    # the mean per-sample validation loss.
    early_stopping(valid_loss)
    if early_stopping.early_stop:
        print(f"提前停止在第 {epoch + 1} 轮")
        break


# Test phase
def test_model(model, X_test, y_test):
    """Evaluate ``model`` on the test set and print the accuracy.

    The data is processed in chunks of the module-level ``batch_size``. A
    trailing chunk smaller than ``batch_size`` is skipped, and the accuracy is
    computed over the samples that were actually evaluated.

    Args:
        model: Object exposing ``forward(data)`` that returns class scores.
        X_test: Test inputs, indexable along the first axis.
        y_test: One-hot encoded test labels.

    Returns:
        The accuracy as a fraction in [0, 1] (0.0 if nothing was evaluated).
    """
    correct = 0
    evaluated = 0
    with tqdm(range(0, len(X_test), batch_size), unit='batch') as tepoch:
        tepoch.set_description("Testing")
        for i in tepoch:
            data = X_test[i:i+batch_size]
            if data.shape[0] < batch_size:
                # Incomplete final chunk: skipped, and therefore not counted
                # in the denominator either.
                break
            label = np.argmax(y_test[i:i+batch_size], axis=1)
            outputs = model.forward(data)
            correct += int((outputs.argmax(1) == label).sum())
            evaluated += data.shape[0]
        acc = correct / evaluated if evaluated else 0.0
        tepoch.set_postfix(test_acc=acc)
    print(f'测试精度: {acc * 100:.2f}%')
    return acc

# Run the test evaluation
test_model(model, X_test, y_test)

# Training may stop before num_epochs (early stopping), so the x axis is built
# from the number of epochs actually recorded; plotting against
# range(1, num_epochs + 1) fails when the lengths differ.
epochs_run = range(1, len(train_losses) + 1)

# Plot the loss curves
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(epochs_run, train_losses, label='Train Loss')
plt.plot(epochs_run, valid_losses, label='Valid Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss_plt')
plt.legend()

# Plot the accuracy curves
plt.subplot(1, 2, 2)
plt.plot(epochs_run, train_accuracies, label='Train Accuracy')
plt.plot(epochs_run, valid_accuracies, label='Valid Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Accuracy_plt')
plt.legend()

plt.tight_layout()
plt.show()