In [143]:
import numpy as np

In [144]:
def l1_regularization(weights, lam=0.01):
    """Return the L1 penalty (sub)gradient, ``lam * sign(weights)``.

    Args:
        weights: array of parameters.
        lam: regularisation strength.

    Returns:
        Array shaped like ``weights``; entries are ``-lam``, ``0`` or ``+lam``.
    """
    direction = np.sign(weights)
    return lam * direction

def l2_regularization(weights, lam=0.01):
    """Return the L2 penalty gradient term, ``lam * weights``.

    Args:
        weights: array of parameters.
        lam: regularisation strength.

    Returns:
        ``weights`` scaled element-wise by ``lam``.
    """
    penalty_grad = lam * weights
    return penalty_grad

In [145]:
class ConvLayer:
    """2-D convolution (cross-correlation) layer written with explicit NumPy loops.

    Inputs and outputs use the ``(batch, channels, height, width)`` layout.
    ``backward`` returns the gradients *and* applies a plain SGD step to
    ``weights`` and ``bias`` using ``learning_rate``.
    """

    def __init__(self, input_channels, num_filters, kernel_size, stride=1, padding=0, learning_rate=0.01):
        """Create the layer and randomly initialise its parameters.

        Args:
            input_channels: number of channels in the input.
            num_filters: number of output channels.
            kernel_size: side length of the square kernel.
            stride: step between neighbouring windows.
            padding: number of zero rows/columns added on every spatial side.
            learning_rate: step size used by the update inside ``backward``.
        """
        self.input_channels = input_channels
        self.num_filters = num_filters
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        # Weights: (num_filters, input_channels, k, k), small Gaussian values.
        self.weights = np.random.randn(num_filters, input_channels, kernel_size, kernel_size) * 0.01
        # Bias: one value per filter, stored as a column vector.
        self.bias = np.random.randn(num_filters, 1)
        self.learning_rate = learning_rate

    def _pad_input(self, x):
        """Return a float64 copy of ``x`` zero-padded by ``self.padding`` on each spatial side."""
        batch_size, channels, height, width = x.shape
        p = self.padding
        padded = np.zeros((batch_size, channels, height + 2 * p, width + 2 * p))
        # Explicit end indices: a `p:-p` slice would be empty when p == 0.
        padded[:, :, p:p + height, p:p + width] = x
        return padded

    def forward(self, x):
        """Convolve ``x`` with every filter.

        Args:
            x: array of shape ``(batch, channels, height, width)``.

        Returns:
            Array of shape ``(batch, num_filters, out_height, out_width)``.
        """
        self.last_input = x
        padded_input = self._pad_input(x)
        batch_size, _, padded_height, padded_width = padded_input.shape
        k, s = self.kernel_size, self.stride

        out_height = (padded_height - k) // s + 1
        out_width = (padded_width - k) // s + 1
        output = np.zeros((batch_size, self.num_filters, out_height, out_width))

        for i in range(batch_size):
            for f in range(self.num_filters):
                for row in range(out_height):
                    top = row * s
                    for col in range(out_width):
                        left = col * s
                        window = padded_input[i, :, top:top + k, left:left + k]
                        output[i, f, row, col] = np.sum(window * self.weights[f]) + self.bias[f][0]

        return output

    def backward(self, d_out):
        """Back-propagate ``d_out`` and take one SGD step on the parameters.

        Args:
            d_out: gradient w.r.t. the layer output, shaped like the result of
                the preceding ``forward`` call.

        Returns:
            Tuple ``(d_input, d_weights, d_bias)``: the gradient w.r.t. the
            last input (padding stripped) and the parameter gradients that
            were just applied.
        """
        batch_size, _, out_height, out_width = d_out.shape
        k, s, p = self.kernel_size, self.stride, self.padding
        in_height, in_width = self.last_input.shape[2], self.last_input.shape[3]

        d_weights = np.zeros_like(self.weights)
        d_bias = np.zeros_like(self.bias)
        d_padded_input = np.zeros((batch_size, self.input_channels, in_height + 2 * p, in_width + 2 * p))
        padded_input = self._pad_input(self.last_input)

        for i in range(batch_size):
            for f in range(self.num_filters):
                for row in range(out_height):
                    top = row * s
                    for col in range(out_width):
                        left = col * s
                        current_d_out = d_out[i, f, row, col]
                        d_weights[f] += current_d_out * padded_input[i, :, top:top + k, left:left + k]
                        d_bias[f] += current_d_out
                        d_padded_input[i, :, top:top + k, left:left + k] += current_d_out * self.weights[f]

        # Remove padding from the input gradient.
        d_input = d_padded_input[:, :, p:p + in_height, p:p + in_width]

        # The update happens only after d_input was computed from the old weights.
        self.weights -= self.learning_rate * d_weights
        self.bias -= self.learning_rate * d_bias
        return d_input, d_weights, d_bias


In [146]:
class PoolLayer:
    """Max-pooling layer over ``(batch, channels, height, width)`` inputs."""

    def __init__(self, pool_size, stride):
        """Store the square window side length and the step between windows."""
        self.pool_size = pool_size
        self.stride = stride

    def forward(self, x):
        """Return the maximum of every pooling window.

        Args:
            x: array of shape ``(batch, channels, height, width)``.

        Returns:
            Array of shape ``(batch, channels, out_height, out_width)``.
        """
        self.last_input = x
        batch_size, channels, height, width = x.shape
        size, step = self.pool_size, self.stride
        out_height = (height - size) // step + 1
        out_width = (width - size) // step + 1
        output = np.zeros((batch_size, channels, out_height, out_width))

        for i in range(batch_size):
            for c in range(channels):
                for row in range(out_height):
                    top = row * step
                    for col in range(out_width):
                        left = col * step
                        output[i, c, row, col] = np.max(x[i, c, top:top + size, left:left + size])

        return output

    def backward(self, d_out):
        """Route each upstream gradient to the position that produced the maximum.

        Gradients are accumulated, so windows that overlap (``stride`` smaller
        than ``pool_size``) all contribute to a shared input position. When a
        window contains tied maxima, only the first one (row-major order)
        receives the gradient, so the total gradient mass is preserved.

        Args:
            d_out: gradient w.r.t. the output of the preceding ``forward`` call.

        Returns:
            Gradient w.r.t. the last input, with the same shape and dtype.
        """
        d_input = np.zeros_like(self.last_input)
        batch_size, channels, out_height, out_width = d_out.shape
        size, step = self.pool_size, self.stride

        for i in range(batch_size):
            for c in range(channels):
                for row in range(out_height):
                    top = row * step
                    for col in range(out_width):
                        left = col * step
                        window = self.last_input[i, c, top:top + size, left:left + size]
                        # argmax works on the flattened window; map it back to 2-D.
                        dy, dx = np.unravel_index(np.argmax(window), window.shape)
                        d_input[i, c, top + dy, left + dx] += d_out[i, c, row, col]

        return d_input


In [147]:
class ActivationLayer:
    """Element-wise activation layer supporting ``'relu'``, ``'sigmoid'`` and ``'tanh'``."""

    def __init__(self, activation='relu'):
        """Store the activation name; it is checked when the layer is used."""
        self.activation = activation

    def forward(self, x):
        """Apply the configured activation to ``x``.

        The input is cached for every activation so that ``backward`` can
        evaluate the derivative.

        Raises:
            ValueError: if ``self.activation`` is not a supported name.
        """
        self.last_input = x
        if self.activation == 'relu':
            return np.maximum(0, x)
        if self.activation == 'sigmoid':
            return 1 / (1 + np.exp(-x))
        if self.activation == 'tanh':
            return np.tanh(x)
        raise ValueError(f"Unsupported activation: {self.activation!r}")

    def backward(self, d_out):
        """Return the gradient w.r.t. the input of the preceding ``forward`` call.

        Args:
            d_out: gradient w.r.t. the layer output.

        Raises:
            ValueError: if ``self.activation`` is not a supported name.
        """
        if self.activation == 'relu':
            d_input = d_out.copy()
            # No gradient flows where the input was not strictly positive.
            d_input[self.last_input <= 0] = 0
            return d_input
        if self.activation == 'sigmoid':
            sig = 1 / (1 + np.exp(-self.last_input))
            return d_out * sig * (1 - sig)
        if self.activation == 'tanh':
            tanh = np.tanh(self.last_input)
            return d_out * (1 - tanh**2)
        raise ValueError(f"Unsupported activation: {self.activation!r}")

In [148]:
class DenseLayer:
    """Fully connected layer with L1 + L2 weight regularisation and a built-in SGD step."""

    def __init__(self, input_size, output_size, learning_rate=0.01, lam_l1=0.01, lam_l2=0.01):
        """Initialise parameters and hyper-parameters.

        Args:
            input_size: number of input features.
            output_size: number of output units.
            learning_rate: step size used by the update inside ``backward``.
            lam_l1: strength passed to ``l1_regularization``.
            lam_l2: strength passed to ``l2_regularization``.
        """
        # Weights are (input_size, output_size); bias is a column vector.
        self.weights = np.random.randn(input_size, output_size) * 0.01
        self.bias = np.random.randn(output_size, 1)
        self.learning_rate, self.lam_l1, self.lam_l2 = learning_rate, lam_l1, lam_l2

    def forward(self, x):
        """Return ``x @ weights + bias`` for a ``(batch, input_size)`` input, caching ``x``."""
        self.last_input = x
        projected = np.dot(x, self.weights)
        return projected + self.bias.T

    def backward(self, d_out):
        """Back-propagate ``d_out`` and update the parameters in place.

        Returns:
            Tuple ``(d_input, d_weights, d_bias)``; ``d_weights`` includes the
            L1 and L2 regularisation terms.
        """
        # Data term first, then the two penalties, all using the pre-update weights.
        d_weights = np.dot(self.last_input.T, d_out)
        d_weights = d_weights + l1_regularization(self.weights, self.lam_l1)
        d_weights = d_weights + l2_regularization(self.weights, self.lam_l2)

        d_bias = np.sum(d_out, axis=0).reshape(-1, 1)
        d_input = np.dot(d_out, self.weights.T)

        # SGD step, applied only after d_input was computed.
        self.weights -= self.learning_rate * d_weights
        self.bias -= self.learning_rate * d_bias

        return d_input, d_weights, d_bias

In [149]:
def mse_loss(y_true, y_pred):
    """Return the mean of the squared element-wise differences between the two arrays."""
    residual = y_true - y_pred
    squared = residual ** 2
    return squared.mean()

def mse_loss_grad(y_true, y_pred):
    """Return the gradient of ``mse_loss`` with respect to ``y_pred``."""
    doubled_error = 2 * (y_pred - y_true)
    n_elements = y_true.size
    return doubled_error / n_elements

In [150]:
class ReLULayer:
    """Stand-alone ReLU layer; same maths as ``ActivationLayer`` with ``activation='relu'``."""

    def forward(self, x):
        """Cache ``x`` and return ``max(0, x)`` element-wise."""
        self.last_input = x
        rectified = np.maximum(0, x)
        return rectified

    def backward(self, d_out):
        """Return a copy of ``d_out`` zeroed wherever the cached input was <= 0."""
        inactive = self.last_input <= 0
        grad = d_out.copy()
        grad[inactive] = 0
        return grad

In [151]:
def softmax(x):
    """Row-wise softmax of a 2-D array of logits.

    Each row's maximum is subtracted before exponentiating so that large
    logits do not overflow.
    """
    row_max = np.max(x, axis=1, keepdims=True)
    exps = np.exp(x - row_max)
    normaliser = np.sum(exps, axis=1, keepdims=True)
    return exps / normaliser

def cross_entropy_loss(y_true, y_pred):
    """Mean cross-entropy between one-hot targets and predicted probabilities.

    Args:
        y_true: ``(n_samples, n_classes)`` one-hot targets; the class index is
            taken with ``argmax`` along axis 1.
        y_pred: ``(n_samples, n_classes)`` predicted probabilities; clipped to
            ``[1e-12, 1 - 1e-12]`` before the logarithm.

    Returns:
        The average negative log-probability assigned to the target class.
    """
    n_samples = y_true.shape[0]
    target_idx = np.argmax(y_true, axis=1)
    safe_probs = np.clip(y_pred, 1e-12, 1 - 1e-12)
    picked = safe_probs[np.arange(n_samples), target_idx]
    return np.sum(-np.log(picked)) / n_samples

def cross_entropy_loss_grad(y_true, y_pred):
    """Gradient of the mean softmax cross-entropy with respect to the logits.

    Args:
        y_true: ``(n_samples, n_classes)`` one-hot targets.
        y_pred: ``(n_samples, n_classes)`` probabilities; not modified.

    Returns:
        ``(y_pred - one_hot_target) / n_samples``, where the target index is
        taken with ``argmax`` of ``y_true`` along axis 1.
    """
    n_samples = y_true.shape[0]
    target_idx = np.argmax(y_true, axis=1)
    delta = y_pred.copy()
    delta[np.arange(n_samples), target_idx] -= 1
    return delta / n_samples

In [152]:
class AlexNet:
    """Small CNN: three conv/ReLU/max-pool stages followed by three dense layers.

    The first dense layer expects ``128 * 3 * 3`` features, which is what the
    three 2x2/stride-2 pools produce from a 28x28 single-channel input
    (28 -> 14 -> 7 -> 3). ``forward`` returns softmax probabilities over 10
    classes. Every trainable layer updates itself during ``backward``.
    """

    def __init__(self, learning_rate=0.001):
        """Build all layers; ``learning_rate`` is forwarded to every trainable layer."""
        self.learning_rate = learning_rate
        self.conv1 = ConvLayer(input_channels=1, num_filters=32, kernel_size=3, stride=1, padding=1, learning_rate=learning_rate)
        self.relu1 = ActivationLayer(activation='relu')
        self.pool1 = PoolLayer(pool_size=2, stride=2)

        self.conv2 = ConvLayer(input_channels=32, num_filters=64, kernel_size=3, stride=1, padding=1, learning_rate=learning_rate)
        self.relu2 = ActivationLayer(activation='relu')
        self.pool2 = PoolLayer(pool_size=2, stride=2)

        self.conv3 = ConvLayer(input_channels=64, num_filters=128, kernel_size=3, stride=1, padding=1, learning_rate=learning_rate)
        self.relu3 = ActivationLayer(activation='relu')
        self.pool3 = PoolLayer(pool_size=2, stride=2)

        self.fc1 = DenseLayer(input_size=128 * 3 * 3, output_size=256, learning_rate=learning_rate)
        self.relu4 = ActivationLayer(activation='relu')

        self.fc2 = DenseLayer(input_size=256, output_size=128, learning_rate=learning_rate)
        self.relu5 = ActivationLayer(activation='relu')

        self.fc3 = DenseLayer(input_size=128, output_size=10, learning_rate=learning_rate)

        # Shape of the last pooled feature map before flattening; set by forward().
        self._pooled_shape = None

    def forward(self, x):
        """Run the network on a ``(batch, 1, height, width)`` input.

        Returns:
            ``(batch, 10)`` array of softmax class probabilities.
        """
        x = self.conv1.forward(x)
        x = self.relu1.forward(x)
        x = self.pool1.forward(x)

        x = self.conv2.forward(x)
        x = self.relu2.forward(x)
        x = self.pool2.forward(x)

        x = self.conv3.forward(x)
        x = self.relu3.forward(x)
        x = self.pool3.forward(x)

        # Remember the feature-map shape so backward() can undo the flatten exactly.
        self._pooled_shape = x.shape
        x = x.reshape(x.shape[0], -1)
        x = self.fc1.forward(x)
        x = self.relu4.forward(x)

        x = self.fc2.forward(x)
        x = self.relu5.forward(x)

        x = self.fc3.forward(x)

        return softmax(x)

    def backward(self, d_out):
        """Back-propagate through every layer in reverse order.

        Args:
            d_out: gradient w.r.t. the logits produced by ``fc3`` (the value
                ``cross_entropy_loss_grad`` computes from the softmax output).

        Returns:
            Gradient w.r.t. the network input.
        """
        d_out = self.fc3.backward(d_out)[0]
        d_out = self.relu5.backward(d_out)
        d_out = self.fc2.backward(d_out)[0]
        d_out = self.relu4.backward(d_out)
        d_out = self.fc1.backward(d_out)[0]

        # Undo the flatten performed in forward().
        d_out = d_out.reshape(self._pooled_shape)

        d_out = self.pool3.backward(d_out)
        d_out = self.relu3.backward(d_out)
        d_out = self.conv3.backward(d_out)[0]

        d_out = self.pool2.backward(d_out)
        d_out = self.relu2.backward(d_out)
        d_out = self.conv2.backward(d_out)[0]

        d_out = self.pool1.backward(d_out)
        d_out = self.relu1.backward(d_out)
        d_out = self.conv1.backward(d_out)[0]
        return d_out

In [153]:
import numpy as np
import os

def load_mnist_images(filename):
    """Read an MNIST image file into a ``uint8`` array of shape ``(N, 28, 28, 1)``.

    The first 16 bytes of the file are skipped; every remaining byte is
    treated as one pixel.
    """
    with open(filename, 'rb') as fh:
        fh.read(16)  # skip the first 16 bytes
        payload = fh.read()
    pixels = np.frombuffer(payload, dtype=np.uint8)
    return pixels.reshape(-1, 28, 28, 1)  # 28x28 images

def load_mnist_labels(filename):
    """Read an MNIST label file into a 1-D ``uint8`` array.

    The first 8 bytes of the file are skipped; every remaining byte is one label.
    """
    with open(filename, 'rb') as fh:
        fh.read(8)  # skip the first 8 bytes
        payload = fh.read()
    return np.frombuffer(payload, dtype=np.uint8)

# Load the raw MNIST files (paths are relative to the notebook's working directory).
x_train = load_mnist_images('data//MNIST//raw//train-images-idx3-ubyte')
y_train = load_mnist_labels('data//MNIST//raw//train-labels-idx1-ubyte')
x_test = load_mnist_images('data//MNIST//raw//t10k-images-idx3-ubyte')
y_test = load_mnist_labels('data//MNIST//raw//t10k-labels-idx1-ubyte')

# Preprocess: convert the uint8 pixels to float32 and scale them to [0, 1].
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0

# Convert the integer labels to one-hot rows (10 classes).
y_train = np.eye(10)[y_train]
y_test = np.eye(10)[y_test]

# NOTE(review): load_mnist_images returns channel-last arrays (N, 28, 28, 1),
# whereas ConvLayer.forward unpacks its input as (batch, channels, height, width),
# so these arrays would need transposing before being fed to the model.
# They do not appear to be used by the later cells visible here — confirm.
print(f'x_train shape: {x_train.shape}')
print(f'y_train shape: {y_train.shape}')
print(f'x_test shape: {x_test.shape}')
print(f'y_test shape: {y_test.shape}')



x_train shape: (60000, 28, 28, 1)
y_train shape: (60000, 10)
x_test shape: (10000, 28, 28, 1)
y_test shape: (10000, 10)


In [154]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Subset

# Preprocessing: convert to a tensor, then normalise with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Load the MNIST training and test sets (downloaded into ./data if missing).
trainset = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
testset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# Keep only a small slice of each split for training and evaluation.
train_subset = Subset(trainset, list(range(1000)))  # first 1000 training samples
test_subset = Subset(testset, list(range(1000)))  # first 1000 test samples

trainloader = torch.utils.data.DataLoader(
    train_subset,
    batch_size=32,
    shuffle=True,
    num_workers=2,
)
testloader = torch.utils.data.DataLoader(
    test_subset,
    batch_size=32,
    shuffle=False,
    num_workers=2,
)

## Training runs on the CPU, so only the first 1000 samples are used to keep the run time manageable.

In [155]:
import numpy as np

# Assumes ConvLayer, PoolLayer, ActivationLayer, DenseLayer, AlexNet and the
# softmax / cross-entropy helpers from the cells above are already defined,
# together with `trainloader` and `testloader`.

# Initialise the model
model = AlexNet(learning_rate=0.001)

# Training parameters
epochs = 10
batch_size = 32  # not read below: the batch size is fixed where the loaders are built

# Train the model
for epoch in range(epochs):
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data

        # Convert the torch tensors to NumPy arrays
        inputs = inputs.numpy()
        labels = np.eye(10)[labels.numpy()]  # one-hot encode the labels

        # Forward pass
        y_pred = model.forward(inputs)

        # Loss and its gradient with respect to the logits
        loss = cross_entropy_loss(labels, y_pred)
        loss_grad = cross_entropy_loss_grad(labels, y_pred)

        # Backward pass; each layer updates its own parameters
        model.backward(loss_grad)

        running_loss += loss

    print(f'Epoch {epoch + 1}/{epochs}, Loss: {running_loss / len(trainloader):.3f}')

print('Finished Training')

# Evaluate the model
correct_predictions = 0
total_predictions = 0
for i, data in enumerate(testloader, 0):
    inputs, labels = data

    # Convert the torch tensors to NumPy arrays
    inputs = inputs.numpy()
    labels = labels.numpy()

    y_pred = model.forward(inputs)
    correct_predictions += np.sum(np.argmax(y_pred, axis=1) == labels)
    total_predictions += labels.shape[0]

accuracy = correct_predictions / total_predictions
# Report the number of images actually evaluated rather than a hard-coded 10000.
print(f'Accuracy of the network on the {total_predictions} test images: {accuracy * 100:.2f}%')

Epoch 1/10, Loss: 2.647
Epoch 2/10, Loss: 2.568
Epoch 3/10, Loss: 2.701
Epoch 4/10, Loss: 2.613
Epoch 5/10, Loss: 2.588
Epoch 6/10, Loss: 2.615
Epoch 7/10, Loss: 2.645
Epoch 8/10, Loss: 2.528
Epoch 9/10, Loss: 2.581
Epoch 10/10, Loss: 2.379
Finished Training
Accuracy of the network on the 10000 test images: 84.00%
