In [48]:
import torch
import torchvision
import numpy as np

In [49]:
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=torchvision.transforms.ToTensor(), download=True)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, transform=torchvision.transforms.ToTensor(), download=True)

print(train_dataset.data.size())

torch.Size([60000, 28, 28])


In [50]:
train_dataloader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=64, shuffle=True)

x, y = next(iter(train_dataloader))
x.shape, y.shape

(torch.Size([64, 1, 28, 28]), torch.Size([64]))

## sigmoid函数
$$
sigmoid(x) = \frac{1}{1 + e^{-x}}
$$

In [51]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))


## cross entropy 二分类交叉熵
$$
L = -\frac{1}{N}\sum_{i=1}^{N}y_i\log(\hat{y_i}) + (1-y_i)\log(1-\hat{y_i})
$$

In [52]:
def cross_entropy(y, y_hat):
    # 防止log(0)出现
    epsilon = 1e-12
    # 用于限制数组中的值在指定范围内,numpy.clip(a, a_min, a_max, out=None)
    y_pred = np.clip(y_hat, epsilon, 1. - epsilon)
    # 交叉熵
    loss = -(y * np.log(y_pred) + (1 - y) * np.log(1 - y_pred))
    # np.mean()求平均值
    return np.mean(loss)

y = np.array([1, 0, 1, 1])
y_hat = np.array([0.9, 0.1, 0.8, 0.7])
cross_entropy(y, y_hat)

0.19763488164214868

## ReLU函数(把所有负值都变成0)
$$
ReLU(x) = \max(0, x)
$$

In [53]:
def relu(x):
    return np.maximum(0, x)

x = np.array([-1, 1, 2])
print(relu(x))

[0 1 2]


## 池化层
维度：
$$
\text{output\_size} = \frac{\text{input\_size} - \text{pool\_size}}{\text{stride}} + 1
$$

In [54]:
def max_pooling(x, kernel_size, stride):
    # N:样本数,C:通道数,H:高,W:宽
    N, C, H, W = x.shape
    # 计算输出的维度
    # 如果是3*3，池化窗口为2*2,步长为1,那么输出的维度为2*2
    out_h = (H - kernel_size) // stride + 1 
    out_w = (W - kernel_size) // stride + 1
    # 初始化输出
    out = np.zeros((N, C, out_h, out_w))
    # 池化
    for i in range(out_h):
        for j in range(out_w):
            # 取出每个池化窗口
            # 取出batch中所有样本，所有通道，i*stride到i*stride+kernel_size行，j*stride到j*stride+kernel_size列(也就是一个窗口的大小)
            x_masked = x[:, :, i * stride: i * stride + kernel_size, j * stride: j * stride + kernel_size]
            # 每个池化窗口中的最大值（取每个窗口的最大值就是最大池化）
            out[:, :, i, j] = np.max(x_masked, axis=(2, 3))
    return out

x = np.random.randn(2, 3, 3, 3)
out = max_pooling(x, kernel_size=2, stride=1)
print(out.shape)

(2, 3, 2, 2)


## 卷积Conv2d
$$
Y_{ijk} = \sum_{m=0}^{C-1} \sum_{p=0}^{H-1} \sum_{q=0}^{W-1} X_{m, i+p, j+q} \cdot W_{k, m, p, q} + b_k
$$
$$
\text{output\_size} = \frac{\text{input\_size} - \text{kernel\_size} + 2 \times \text{padding}}{\text{stride}} + 1
$$

In [55]:
def conv(x, kernel, bias, stride=2, padding=1):
    # N:样本数,C:通道数,H:高,W:宽
    N, C, H, W = x.shape
    # K:卷积核个数,C:通道数,H:高,W:宽
    # 卷积核的个数就是输出的通道数
    K, C, KH, KW = kernel.shape
    # 计算输出的维度
    # 如果是4*4,卷积核为3*3,步长为2,补丁为1（补丁完就先当于5*5）,那么输出的维度为2*2
    out_h = (H - KH + 2 * padding) // stride + 1 
    out_w = (W - KW + 2 * padding) // stride + 1
    # 初始化输出
    out = np.zeros((N, K, out_h, out_w))
    # 填充
    # （0，0）表示在通道维度上不填充，（0，0）表示在高维度上不填充，（padding，padding）表示在宽维度上填充padding个0，constant_values表示填充的值
    x_pad = np.pad(x, ((0, 0), (0, 0), (padding, padding), (padding, padding)), mode='constant', constant_values=0)
    # 卷积
    for i in range(out_h):
        for j in range(out_w):
            # 取出每个卷积窗口
            # 取出batch中所有样本，所有通道，i*stride到i*stride+KH行，j*stride到j*stride+KW列(也就是一个窗口的大小)
            x_masked = x_pad[:, :, i * stride: i * stride + KH, j * stride: j * stride + KW]
            # 计算每个卷积窗口的卷积
            for k in range(K):
                # 有一个核就有一个多一个图像的深度
                # 计算每个卷积窗口的卷积
                # 映射到输出的图像的第i行第j列的第k个通道
                # x_masked * kernel[k, :, :, :]: 执行逐元素相乘，即将窗口中的每个元素与卷积核中的对应元素相乘。
                # np.sum(..., axis=(1, 2, 3)): 对乘积结果沿着通道维度进行求和，得到卷积操作的结果。这相当于对卷积核在窗口上滑动时，逐元素相乘并相加，最终得到一个输出值。
                # 简单来说就是将每个窗口中的元素与卷积核中的元素相乘，然后将结果相加，得到卷积操作的结果。
                out[:, k, i, j] = np.sum(x_masked * kernel[k, :, :, :], axis=(1, 2, 3))
    # 加上偏置
    # reshape(1, -1, 1, 1)：将偏置转换成1*K*1*1的形状，这样才能与输出相加
    out = out + bias.reshape(1, -1, 1, 1)
    return out

x = np.random.randn(2, 3, 4, 4)
kernel = np.random.randn(3, 3, 3, 3)
bias = np.random.randn(3)
out = conv(x, kernel, bias, stride=2, padding=1)
print(out.shape)

(2, 3, 2, 2)


## numpy实现反向传播

### 前向传播


In [56]:
def forward(x, w, b):
    ## 就和算y_hat一样
    return np.dot(x, w) + b

### sigmoid的导数
$$
\sigma'(x) = \sigma(x)(1-\sigma(x))
$$

In [57]:
def sigmoid_grad(z):
    return sigmoid(z) * (1 - sigmoid(z))

### 交叉熵的导数
$$
H(y, p) = -y \cdot \log(p) - (1 - y) \cdot \log(1 - p)
$$

In [58]:
def cross_entropy_grad(y, p):
    epsilon = 1e-12  # 为了避免log(0)的情况，添加一个微小值
    p = np.clip(p, epsilon, 1 - epsilon)  # 对概率值进行截断，确保不会取到0或1
    return - (y * np.log(p) + (1 - y) * np.log(1 - p))

### 反向传播

In [68]:
def backward(x, y, y_hat, w, b, lr):
    # 参数说明:x:输入,y:标签,y_hat:预测值,w:权重,b:偏置,lr:学习率
    # 计算梯度
    # 交叉熵的导数
    grad_y_hat = cross_entropy_grad(y, y_hat)
    # sigmoid的导数
    grad_z = sigmoid_grad(y_hat)
    # 梯度下降
    grad_w = np.dot(x.T, grad_y_hat * grad_z)
    grad_b = np.sum(grad_y_hat * grad_z, axis=0)
    # 更新参数
    w -= lr * grad_w
    b -= lr * grad_b
    return w, b