In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets
import torchvision.transforms as transforms

In [None]:
# Convolution tricks
# Trick 1: make a convolution's output shape equal to its input shape.
# torch.randint returns a one-element tensor; .item() turns it into the plain
# Python int that nn.Conv2d and torch.randn expect for channel counts and sizes.
channels = torch.randint(1, 10, (1,)).item()  # random channel count in [1, 9]
# 3x3 kernel with stride 1 and padding 1 keeps H and W unchanged
# (a classic parameter choice, not the only one).
conv1 = nn.Conv2d(channels, channels, (3, 3), stride=1, padding=1)
x = torch.randn(1, channels, 28, 28)  # dummy input batch
x.shape, conv1(x).shape

In [None]:
# Hand-written residual connection, version 1 -> the first engineering approach from the handwritten notes
class ResidualBlockSimplified(nn.Module):
    """Residual block built from two shape-preserving 3x3 convolutions.

    Args:
        channel: number of channels of both the input and the output.
    """

    def __init__(self, channel):
        super().__init__()
        # Both convolutions use the same configuration: 3x3 kernel, stride 1, padding 1.
        conv_kwargs = dict(kernel_size=(3, 3), stride=1, padding=1)
        self.conv1 = nn.Conv2d(channel, channel, **conv_kwargs)
        self.conv2 = nn.Conv2d(channel, channel, **conv_kwargs)

    def forward(self, x):
        """Return relu(x + conv2(relu(conv1(x)))); the result has the same shape as x."""
        residual = self.conv2(F.relu(self.conv1(x)))
        # The residual (skip) connection: add the untouched input back before the last ReLU.
        return F.relu(x + residual)

In [None]:
# Sanity check: the simplified block should return a tensor shaped like its input.
model = ResidualBlockSimplified(channel=3)
x = torch.randn((1, 3, 28, 28))
model(x).shape

In [None]:
# Trick 2: two different convolutions that produce the same output shape.
# A 3x3 kernel with padding 1 and a 1x1 kernel with padding 0 both yield
# floor((H - 1) / stride) + 1 rows/columns, so their outputs match for any stride.
# .item() converts the one-element tensors returned by torch.randint into the
# plain Python ints that nn.Conv2d and torch.randn expect.
stride = torch.randint(1, 10, (1,)).item()
in_channels = torch.randint(1, 10, (1,)).item()
out_channels = torch.randint(1, 10, (1,)).item()
conv1 = nn.Conv2d(in_channels, out_channels, (3, 3), stride=stride, padding=1)
conv2 = nn.Conv2d(in_channels, out_channels, (1, 1), stride=stride, padding=0)
x = torch.randn(1, in_channels, 28, 28)
conv1(x).shape, conv2(x).shape

In [None]:
# Hand-written residual connection, version 2 -> the second engineering approach from the handwritten notes
class ResidualBlock(nn.Module):
    """Residual block whose output may differ from its input in channels and spatial size.

    Main path: 3x3 conv (stride ``stride``) -> ReLU -> 3x3 conv (stride 1).
    Shortcut: the identity when ``stride == 1`` and the channel counts match,
    otherwise a 1x1 conv with the same stride so both paths end up with the
    same shape and can be added.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        stride: stride of the first convolution (and of the shortcut convolution).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # May change the channel count and the spatial size.
        self.conv1 = nn.Conv2d(in_channels, out_channels, (3, 3), stride=stride, padding=1)
        # Keeps the shape produced by conv1.
        self.conv2 = nn.Conv2d(out_channels, out_channels, (3, 3), stride=1, padding=1)
        self.downsample = None
        if stride != 1 or in_channels != out_channels:
            # The shortcut must be reshaped to match the main path before the addition.
            self.downsample = nn.Conv2d(in_channels, out_channels, (1, 1), stride=stride, padding=0)

    def forward(self, x):
        """Return relu(main_path(x) + shortcut(x))."""
        inputs = x
        x = F.relu(self.conv1(x))
        x = self.conv2(x)
        # Compare against None explicitly: the truth value of a module is not a
        # reliable "is present" test (container modules with len() == 0 are falsy).
        if self.downsample is not None:
            inputs = self.downsample(inputs)
        outputs = F.relu(x + inputs)  # the residual connection
        return outputs


In [None]:
# Sanity check: 3 -> 4 channels with stride 2 on a 28x28 input.
model = ResidualBlock(in_channels=3, out_channels=4, stride=2)
x = torch.randn((1, 3, 28, 28))
model(x).shape

In [None]:
# Refinement of the second approach -> add normalization layers
class ResidualBlock(nn.Module):
    """Residual block with batch normalization after every convolution.

    Main path: 3x3 conv (stride ``stride``) -> BN -> ReLU -> 3x3 conv (stride 1) -> BN.
    Shortcut: the identity when ``stride == 1`` and the channel counts match,
    otherwise a 1x1 conv with the same stride followed by BN, so both paths
    have the same shape when they are added.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        stride: stride of the first convolution (and of the shortcut convolution).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # May change the channel count and the spatial size.
        self.conv1 = nn.Conv2d(in_channels, out_channels, (3, 3), stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # Keeps the shape produced by conv1.
        self.conv2 = nn.Conv2d(out_channels, out_channels, (3, 3), stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = None
        if stride != 1 or in_channels != out_channels:
            # bn3 only exists together with the shortcut convolution.
            self.downsample = nn.Conv2d(in_channels, out_channels, (1, 1), stride=stride, padding=0)
            self.bn3 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Return relu(main_path(x) + shortcut(x))."""
        inputs = x
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))
        # Compare against None explicitly: the truth value of a module is not a
        # reliable "is present" test (container modules with len() == 0 are falsy).
        if self.downsample is not None:
            inputs = self.bn3(self.downsample(inputs))
        outputs = F.relu(x + inputs)  # the residual connection
        return outputs

In [None]:
# Sanity check: 3 -> 5 channels with stride 2 on a 28x28 input.
model = ResidualBlock(in_channels=3, out_channels=5, stride=2)
x = torch.randn((1, 3, 28, 28))
model(x).shape

In [None]:
# A residual network for image classification (a stack of the residual blocks implemented above)
class Resnet(nn.Module):
    """Six stacked ResidualBlocks followed by a 10-way linear classifier.

    The network takes single-channel images. The shape comments in ``forward``
    are for 28x28 inputs; other spatial sizes are reduced to one value per
    channel by global average pooling before the classifier.
    """

    def __init__(self):
        super().__init__()
        self.block1 = ResidualBlock(1, 20)
        self.block2 = ResidualBlock(20, 40, stride=2)
        self.block3 = ResidualBlock(40, 60, stride=2)
        self.block4 = ResidualBlock(60, 80, stride=2)
        self.block5 = ResidualBlock(80, 100, stride=2)
        self.block6 = ResidualBlock(100, 120, stride=2)
        self.fc = nn.Linear(120, 10)

    def forward(self, x):
        """Map a batch of images (B, 1, H, W) to class logits (B, 10)."""
        # x : (B, 1, 28, 28)
        x = self.block1(x)  # (B, 20, 28, 28)
        x = self.block2(x)  # (B, 40, 14, 14)
        x = self.block3(x)  # (B, 60, 7, 7)
        x = self.block4(x)  # (B, 80, 4, 4)
        x = self.block5(x)  # (B, 100, 2, 2)
        x = self.block6(x)  # (B, 120, 1, 1)
        # Global average pooling is the identity on a 1x1 feature map, so 28x28
        # inputs behave exactly as before, while larger inputs still reach the
        # classifier with 120 features instead of failing with a shape mismatch.
        x = F.adaptive_avg_pool2d(x, 1)   # (B, 120, 1, 1)
        x = self.fc(torch.flatten(x, 1))  # (B, 10)
        return x


In [None]:
# Smoke test: ten 28x28 single-channel images in, one row of 10 logits per image out.
model = Resnet()
x = torch.randn((10, 1, 28, 28))
model(x).shape

In [None]:
torch.manual_seed(726)

# Load the data.
# Use a forward-slash path: it works on Windows as well as on POSIX systems,
# whereas a backslash path is Windows-only and "\m" is an invalid string escape.
MNIST_ROOT = "./mnist"
dataset = datasets.MNIST(MNIST_ROOT, train=True, download=True, transform=transforms.ToTensor())
# 60000 training images are split into 50000 for training and 10000 for validation.
train_set, val_set = random_split(dataset, [50000, 10000])
# download=False: relies on the call above having already fetched the files into MNIST_ROOT.
test_set = datasets.MNIST(MNIST_ROOT, train=False, download=False, transform=transforms.ToTensor())

# Create the data loaders.
train_loader = DataLoader(train_set, batch_size=500, shuffle=True)
val_loader = DataLoader(val_set, batch_size=500, shuffle=True)
test_loader = DataLoader(test_set, batch_size=500, shuffle=True)

In [None]:
# Model evaluation

# Number of batches drawn from a loader for each estimate; the per-batch results are averaged
eval_iters = 10

# Estimate loss and accuracy on the training, validation and test sets
def estimate_loss(model):
    """Evaluate ``model`` on the train, validation and test loaders.

    The model is switched to evaluation mode for the measurement and put back
    into whatever mode it was in before the call, even if evaluation raises.

    Args:
        model: the network to evaluate.

    Returns:
        dict with keys 'train', 'val' and 'test'; each value is the dict
        returned by ``_loss`` for the corresponding loader.
    """
    was_training = model.training
    # Evaluation mode: layers such as BatchNorm use their running statistics.
    model.eval()
    try:
        return {
            'train': _loss(model, train_loader),
            'val': _loss(model, val_loader),
            'test': _loss(model, test_loader),
        }
    finally:
        # Restore the caller's mode instead of unconditionally forcing training mode.
        model.train(was_training)

def _loss(model, dataloader):
    # 估算模型效果
    loss = []
    acc = []
    data_iter = iter(dataloader)
    for t in range(eval_iters):
        inputs, labels = next(data_iter) # inputs:(500, 1, 28, 28) label:(500)
        B = inputs.shape[0]
        logits = model(inputs) # logits: (500, 10)
        loss.append(F.cross_entropy(logits, labels))
        preds = torch.argmax(logits, dim=-1)
        acc.append((preds == labels).sum() / B)
    re = {
        'loss' : torch.tensor(loss).mean().item(),
        'acc': torch.tensor(acc).mean().item()
    }
    return re




In [None]:
def train_model(model, optimizer, epoch=10):
    """Train ``model`` on ``train_loader`` and print loss estimates after every epoch.

    Args:
        model: the network to train; it is switched to training mode.
        optimizer: optimizer already bound to ``model``'s parameters.
        epoch: number of passes over the training loader.
    """
    # Make sure mode-dependent layers (e.g. BatchNorm) are in training mode even
    # if the caller left the model in evaluation mode.
    model.train()
    for e in range(epoch):
        for inputs, labels in train_loader:
            logits = model(inputs)
            loss = F.cross_entropy(logits, labels)
            # Clear gradients left over from the previous step before back-propagating.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # After every epoch, report the loss on the training, validation and test sets.
        stats = estimate_loss(model)
        train_loss = f'{stats["train"]["loss"]:.3f}'
        val_loss = f'{stats["val"]["loss"]:.3f}'
        test_loss = f'{stats["test"]["loss"]:.3f}'
        print(f'epoch {e} train {train_loss} val {val_loss} test {test_loss}')


In [None]:
# Train the network with Adam (learning rate 0.01) for train_model's default number of epochs.
optimizer = optim.Adam(model.parameters(), lr=0.01)
train_model(model, optimizer)

In [None]:
# Final loss/accuracy estimate on the training, validation and test loaders
estimate_loss(model)