In [35]:
import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
from torchvision import datasets, transforms
import torchvision

手动实现单通道卷积

In [36]:
def corr2d(X, K):
    """Compute the 2-D cross-correlation of a single-channel input with a kernel.

    No padding is applied and the window moves one element at a time, so the
    result has shape (H - k_h + 1, W - k_w + 1).

    Args:
        X: input tensor of shape (H, W).
        K: kernel tensor of shape (k_h, k_w).

    Returns:
        A tensor of torch's default floating dtype, allocated on the same
        device as ``X``.
    """
    H, W = X.shape
    k_h, k_w = K.shape
    # Allocate the result next to the input: a CPU-only buffer would make a
    # GPU input yield a CPU output that cannot be added to GPU parameters.
    Y = torch.zeros((H - k_h + 1, W - k_w + 1), device=X.device)
    for i in range(Y.shape[0]):
        for j in range(Y.shape[1]):
            # Element-wise product of the current window with the kernel, summed.
            Y[i, j] = (X[i: i + k_h, j: j + k_w] * K).sum()
    return Y


In [37]:
# Sanity-check corr2d on a 3x3 input (values 0..8) with a 2x2 kernel (values 0..3)
X = torch.arange(9).reshape(3, 3)
K = torch.arange(4).reshape(2, 2)
corr2d(X, K)

tensor([[19., 25.],
        [37., 43.]])

手动实现简单的单通道卷积层

In [38]:
class Conv2D(nn.Module):
    """Single-channel 2-D cross-correlation layer with a learnable kernel and bias.

    Args:
        kernel_size: shape of the kernel, passed straight to ``torch.randn``.
    """

    def __init__(self, kernel_size):
        super().__init__()
        # The layer owns two learnable parameters, both randomly initialised:
        # the kernel itself and a one-element bias.
        kernel_init = torch.randn(kernel_size)
        bias_init = torch.randn(1)
        self.weight = nn.Parameter(kernel_init)
        self.bias = nn.Parameter(bias_init)

    def forward(self, x):
        """Cross-correlate ``x`` with the kernel, then add the bias."""
        correlated = corr2d(x, self.weight)
        return correlated + self.bias

实现填充和步幅

In [39]:
# Helper that runs a conv layer on a bare 2-D tensor by adding, then removing,
# the batch and channel axes.
def comp_conv2d(conv2d, X):
    """Apply ``conv2d`` to a 2-D tensor and return a 2-D result.

    Args:
        conv2d: callable taking a (1, 1, H, W) tensor.
        X: input tensor; it is viewed as (1, 1) + X.shape.

    Returns:
        The callable's output viewed with its first two axes (batch and
        channel) dropped.
    """
    # Leading (1, 1) means batch size 1 and a single channel.
    batched = X.view(1, 1, *X.shape)
    result = conv2d(batched)
    spatial_shape = result.shape[2:]
    return result.view(spatial_shape)

# padding=1 adds one row/column on each side, i.e. two in total per dimension;
# combined with stride=2 the 8x8 input below comes out as 4x4.
conv2d = nn.Conv2d(1, 1, kernel_size=3, stride=2, padding=1)
X = torch.rand((8, 8))
comp_conv2d(conv2d, X).shape

torch.Size([4, 4])

实现输入多通道卷积核

In [40]:
def corr2d_multi_in(X, K):
    """Cross-correlate a multi-channel input with a multi-channel kernel.

    Args:
        X: input tensor of shape (C_in, H, W).
        K: kernel tensor of shape (C_in, k_h, k_w).

    Returns:
        The sum over channels of ``corr2d(X[c], K[c])``.
    """
    # Start from channel 0, then accumulate the remaining channels in place.
    total = corr2d(X[0, :, :], K[0, :, :])
    num_channels = X.shape[0]
    for channel in range(1, num_channels):
        total.add_(corr2d(X[channel, :, :], K[channel, :, :]))
    return total

测试

In [41]:
# Two-channel 3x3 input: channel 0 holds 0..8, channel 1 holds 1..9
X = torch.stack((torch.arange(9).reshape(3, 3),
                 torch.arange(1, 10).reshape(3, 3)))
# Two-channel 2x2 kernel: channel 0 holds 0..3, channel 1 holds 1..4
K = torch.stack((torch.arange(4).reshape(2, 2),
                 torch.arange(1, 5).reshape(2, 2)))

corr2d_multi_in(X, K)

tensor([[ 56.,  72.],
        [104., 120.]])

实现输出多通道卷积并测试

In [42]:
def corr2d_multi_in_out(X, K):
    """Cross-correlate a multi-channel input with a bank of kernels.

    Args:
        X: input tensor, forwarded unchanged to ``corr2d_multi_in``.
        K: kernel bank; it is iterated along dimension 0, one entry per
            output channel.

    Returns:
        The per-kernel results stacked along a new leading dimension.
    """
    per_output_channel = []
    # Each slice of K along dim 0 produces one output channel.
    for kernel in K:
        per_output_channel.append(corr2d_multi_in(X, kernel))
    return torch.stack(per_output_channel)

# Input of shape (C_in, H, W) = (3, 8, 8)
X = torch.arange(3 * 8 * 8, dtype=torch.float32).reshape(3, 8, 8)
# Kernel bank of shape (C_out, C_in, k_h, k_w) = (4, 3, 3, 3)
K = torch.arange(4 * 3 * 3 * 3, dtype=torch.float32).reshape(4, 3, 3, 3)
print("kernel shape:", K.shape)

Y = corr2d_multi_in_out(X, K)
print("Y shape:", Y.shape)

kernel shape: torch.Size([4, 3, 3, 3])
Y shape: torch.Size([4, 6, 6])


自定义池化并测试

In [43]:
def pool2d(X, pool_size, mode='max'):
    """Apply 2-D max or average pooling to a single-channel input.

    The window slides one element at a time with no padding, so the result
    has shape (H - p_h + 1, W - p_w + 1).

    Args:
        X: 2-D input tensor; it is converted to float before pooling.
        pool_size: pair (p_h, p_w) giving the window height and width.
        mode: 'max' for max pooling or 'avg' for average pooling.

    Returns:
        A float tensor on the same device as ``X``.

    Raises:
        ValueError: if ``mode`` is neither 'max' nor 'avg'.
    """
    # Resolve the reduction once; an unknown mode used to fall through both
    # branches and silently return an all-zero tensor.
    if mode == 'max':
        reduce_window = torch.max
    elif mode == 'avg':
        reduce_window = torch.mean
    else:
        raise ValueError(f"unsupported pooling mode: {mode!r} (expected 'max' or 'avg')")

    X = X.float()
    p_h, p_w = pool_size
    # Allocate the output on the input's device so GPU inputs stay on the GPU.
    Y = torch.zeros((X.shape[0] - p_h + 1, X.shape[1] - p_w + 1), device=X.device)
    for i in range(Y.shape[0]):
        for j in range(Y.shape[1]):
            Y[i, j] = reduce_window(X[i: i + p_h, j: j + p_w])
    return Y

In [44]:
# Test input: a 3x3 grid holding 0..8
X = torch.arange(9).reshape(3, 3)

# Max pooling
print("最大池化")
print(pool2d(X, (2, 2), mode='max'))

# Average pooling
print("平均池化")
print(pool2d(X, (2, 2), mode='avg'))


最大池化
tensor([[4., 5.],
        [7., 8.]])
平均池化
tensor([[2., 3.],
        [5., 6.]])


自定义卷积层

In [45]:
class MyConv2D(nn.Module):
    """Multi-channel 2-D cross-correlation layer built on ``corr2d_multi_in_out``.

    Args:
        in_channels: number of input channels (C_in).
        out_channels: number of output channels (C_out).
        kernel_size: an int (square kernel) or a (k_h, k_w) pair.
    """

    def __init__(self, in_channels, out_channels, kernel_size):
        super().__init__()
        # Two learnable parameters: the kernel bank and a per-output-channel bias.
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        # tuple() lets a list such as [3, 3] be concatenated with the channel dims.
        self.weight = nn.Parameter(torch.randn((out_channels, in_channels) + tuple(kernel_size)))
        # Shape (C_out, 1, 1) so the bias broadcasts over the spatial dimensions.
        self.bias = nn.Parameter(torch.randn(out_channels, 1, 1))

    def forward(self, x):
        """Apply the layer.

        Args:
            x: a batch of images of shape (batch_size, C_in, H, W), or a
                single image of shape (C_in, H, W).

        Returns:
            A tensor with C_out channels; it keeps the leading batch axis when
            the input had one.
        """
        if x.dim() == 3:
            # Single un-batched image: the helper handles it directly.
            return corr2d_multi_in_out(x, self.weight) + self.bias
        # corr2d_multi_in_out only understands one (C_in, H, W) image at a
        # time, so run it per sample and re-assemble the batch axis.
        per_sample = [corr2d_multi_in_out(sample, self.weight) for sample in x]
        return torch.stack(per_sample) + self.bias

自定义一个卷积模块（conv+bn+relu）

In [46]:
class MyConvModule(nn.Module):
    """Small classifier: one hand-written conv layer + BatchNorm + ReLU, then a linear head.

    Args:
        num_classes: number of output classes produced by the linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        # A single conv + batch-norm + ReLU stage (3 -> 32 channels).
        self.conv = nn.Sequential(
            MyConv2D(in_channels=3, out_channels=32, kernel_size=3),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True)
        )
        # Output layer mapping the 32 pooled channel features to class scores.
        self.fc = nn.Linear(32, num_classes)

    def forward(self, X):
        """Return class scores of shape (batch_size, num_classes)."""
        # Conv stage output has shape (batch_size, 32, H', W').
        out = self.conv(X)
        # Global average pooling to 1x1, whatever the feature-map size is
        # (the previous fixed 30x30 window only fitted 32x32 inputs).
        out = F.adaptive_avg_pool2d(out, 1)
        # (batch, 32, 1, 1) -> (batch, 32); flatten(1) keeps the batch axis
        # even when batch_size == 1, which squeeze() would have removed.
        out = out.flatten(1)
        # Linear head turns the 32 features into num_classes scores.
        out = self.fc(out)
        return out

pytorch定义一个卷积块

In [47]:
class ConvModule(nn.Module):
    """Three-stage CNN classifier built from ``nn.Conv2d`` + BatchNorm + ReLU.

    Args:
        num_classes: number of output classes produced by the linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        # Three conv stages (3 -> 32 -> 64 -> 128 channels), each 3x3 with
        # stride 1 and no padding, followed by batch-norm and ReLU.
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True)
        )
        # Output layer mapping the 128 pooled channel features to class scores.
        self.fc = nn.Linear(128, num_classes)

    def forward(self, X):
        """Return class scores of shape (batch_size, num_classes)."""
        # Conv stack output has shape (batch_size, 128, H', W').
        out = self.conv(X)
        # Global average pooling to 1x1, whatever the feature-map size is
        # (the previous fixed 26x26 window only fitted 32x32 inputs).
        out = F.adaptive_avg_pool2d(out, 1)
        # (batch, 128, 1, 1) -> (batch, 128); flatten(1) keeps the batch axis
        # even when batch_size == 1, which squeeze() would have removed.
        out = out.flatten(1)
        # Linear head turns the 128 features into num_classes scores.
        out = self.fc(out)
        return out

定义训练函数

In [48]:
def train_epoch(net, data_loader, device, optimizer, criterion):
    """Train ``net`` for one pass over ``data_loader``.

    Args:
        net: the model; it is switched to training mode.
        data_loader: sized iterable yielding ``(data, target)`` batches.
        device: device each batch is moved to before the forward pass.
        optimizer: optimizer used to update the model parameters.
        criterion: loss function called as ``criterion(output, target)``.

    Returns:
        ``(loss, acc)``: the mean of the per-batch loss values and the
        fraction of samples whose arg-max prediction equals the target.

    Raises:
        ZeroDivisionError: if ``data_loader`` yields no batches.
    """
    net.train()  # training mode
    train_batch_num = len(data_loader)  # number of batches
    total_loss = 0  # running sum of per-batch losses
    correct = 0  # number of correctly classified samples
    sample_num = 0  # number of samples seen

    for data, target in data_loader:
        # Move each tensor to the target device exactly once and cast it to
        # the dtype used below (float inputs, integer class labels).
        data = data.to(device).float()
        target = target.to(device).long()
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()
        # Forward pass.
        output = net(data)
        # Loss for this batch.
        loss = criterion(output, target)
        # Back-propagate and update the parameters.
        loss.backward()
        optimizer.step()
        # Accumulate the loss.
        total_loss += loss.item()
        # The index of the largest score is the predicted class.
        prediction = torch.argmax(output, 1)
        # Count correct predictions and samples.
        correct += (prediction == target).sum().item()
        sample_num += len(prediction)

    # Average loss per batch and overall accuracy.
    loss = total_loss / train_batch_num
    acc = correct / sample_num
    return loss, acc

定义测试函数

In [49]:
def test_epoch(net, data_loader, device,criterion):
    net.eval()  # 指定当前模式为测试模式
    test_batch_num = len(data_loader)
    total_loss = 0
    correct = 0
    sample_num = 0
    # 指定不进行梯度变化
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(data_loader):
            data = data.to(device).float()
            target = target.to(device).long()
            output = net(data)
            loss = criterion(output, target)
            total_loss += loss.item()
            prediction = torch.argmax(output, 1)
            correct += (prediction == target).sum().item()
            sample_num += len(prediction)
    loss = total_loss / test_batch_num
    acc = correct / sample_num
    return loss, acc

In [50]:
data_dir = "./data"  # where the dataset is stored / downloaded to
# Preprocessing applied to every image before it is fed to the model.
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # convert the image to a tensor first
        # Normalize each of the three channels with mean 0.5 and std 0.5.
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

# Fetch the CIFAR-10 train and test splits and attach the transform.
cifar_train = torchvision.datasets.CIFAR10(
    root=data_dir, train=True, download=True, transform=transform
)
cifar_test = torchvision.datasets.CIFAR10(
    root=data_dir, train=False, download=True, transform=transform
)

# The ten CIFAR-10 class names.
classes = (
    "plane",
    "car",
    "bird",
    "cat",
    "deer",
    "dog",
    "frog",
    "horse",
    "ship",
    "truck",
)
num_classes = 10  # number of classes
epochs = 100  # number of training epochs
lr = 0.001  # learning rate
batch_size = 512  # batch size
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the data loaders.
cifar_trainloader = torch.utils.data.DataLoader(
    cifar_train, batch_size=batch_size, shuffle=True, num_workers=0
)
# Evaluation does not benefit from shuffling, and a fixed order keeps the
# per-batch-averaged test loss reproducible; reuse batch_size instead of
# repeating the literal.
cifar_testloader = torch.utils.data.DataLoader(
    cifar_test, batch_size=batch_size, shuffle=False, num_workers=0
)

# Instantiate the model; either definition can be used.
# net = MyConvModule(num_classes).to(device)
net = ConvModule(num_classes).to(device)

# Multi-class cross-entropy loss.
criterion = nn.CrossEntropyLoss()
# Adam optimizer.
optimizer = optim.Adam(net.parameters(), lr=lr)

Files already downloaded and verified
Files already downloaded and verified


In [51]:
# Per-epoch loss/accuracy histories, kept for later visualization
train_loss_list, train_acc_list = [], []
test_loss_list, test_acc_list = [], []

# Training
for epoch in range(epochs):
    # One optimization pass over the training set
    train_loss, train_acc = train_epoch(
        net, cifar_trainloader, device, optimizer, criterion
    )
    # Evaluate on the test set
    test_loss, test_acc = test_epoch(net, cifar_testloader, device, criterion)
    # Record this epoch's metrics
    train_loss_list += [train_loss]
    train_acc_list += [train_acc]
    test_loss_list += [test_loss]
    test_acc_list += [test_acc]
    print(f"epoch:{epoch}\t train_loss:{train_loss:.4f}\t train_acc:{train_acc}\t"
          f"test_loss:{test_loss:.4f}\t test_acc:{test_acc}")

epoch:0	 train_loss:1.7261	 train_acc:0.38464	test_loss:1.5410	 test_acc:0.456
epoch:1	 train_loss:1.3978	 train_acc:0.50904	test_loss:1.3985	 test_acc:0.5062
epoch:2	 train_loss:1.2681	 train_acc:0.556	test_loss:1.4088	 test_acc:0.4905
epoch:3	 train_loss:1.1957	 train_acc:0.58116	test_loss:1.2616	 test_acc:0.5428
epoch:4	 train_loss:1.1381	 train_acc:0.6003	test_loss:1.2798	 test_acc:0.5433
epoch:5	 train_loss:1.0990	 train_acc:0.61462	test_loss:1.3254	 test_acc:0.5515
epoch:6	 train_loss:1.0611	 train_acc:0.6295	test_loss:1.1369	 test_acc:0.5998
epoch:7	 train_loss:1.0271	 train_acc:0.64044	test_loss:1.1576	 test_acc:0.5851
epoch:8	 train_loss:1.0022	 train_acc:0.64998	test_loss:1.2392	 test_acc:0.544
epoch:9	 train_loss:0.9761	 train_acc:0.65908	test_loss:1.0375	 test_acc:0.6316
epoch:10	 train_loss:0.9574	 train_acc:0.66738	test_loss:1.2654	 test_acc:0.5559
epoch:11	 train_loss:0.9364	 train_acc:0.67478	test_loss:1.0485	 test_acc:0.6293
epoch:12	 train_loss:0.9264	 train_acc:0.680

空洞卷积层实现

In [52]:
class DilatedConvModule(nn.Module):
    """Three-stage CNN classifier using dilated convolutions (dilation 1, 2, 5).

    Args:
        num_classes: number of output classes produced by the linear layer.
            It is an explicit argument (default 10) so the class no longer
            depends on a module-level ``num_classes`` variable.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Three dilated 3x3 conv stages (3 -> 32 -> 64 -> 128 channels) with
        # dilation rates 1, 2 and 5, each followed by batch-norm and ReLU.
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=0, dilation=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=0, dilation=2),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=0, dilation=5),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True)
        )
        # Output layer mapping the 128 pooled channel features to class scores.
        self.fc = nn.Linear(128, num_classes)

    def forward(self, X):
        """Return class scores of shape (batch_size, num_classes)."""
        # Dilated conv stack output has shape (batch_size, 128, H', W').
        out = self.conv(X)
        # Global average pooling to 1x1, whatever the feature-map size is
        # (the previous fixed 16x16 window only fitted 32x32 inputs).
        out = F.adaptive_avg_pool2d(out, 1)
        # (batch, 128, 1, 1) -> (batch, 128); flatten(1) keeps the batch axis
        # even when batch_size == 1, which squeeze() would have removed.
        out = out.flatten(1)
        # Linear head turns the 128 features into num_classes scores.
        out = self.fc(out)
        return out

残差网络实现

In [53]:
class ResidualBlock(nn.Module):
    """Residual block: two 3x3 conv stages plus a shortcut connection.

    Args:
        inchannel: number of input channels.
        outchannel: number of output channels.
        stride: stride of the first convolution (and of the shortcut
            projection when one is needed).
    """

    def __init__(self, inchannel, outchannel, stride=1):
        super().__init__()
        # Main branch: conv-BN-ReLU-conv-BN; only the first conv uses `stride`.
        main_layers = [
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
        ]
        self.left = nn.Sequential(*main_layers)

        changes_shape = stride != 1 or inchannel != outchannel
        if changes_shape:
            # The main branch alters the size or channel count, so project the
            # input with a 1x1 convolution to make the two branches match.
            self.shortcut = nn.Sequential(
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel),
            )
        else:
            # Shapes already match: an empty Sequential passes the input through.
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(left(x) + shortcut(x))``."""
        residual = self.left(x)
        # Add the (possibly projected) input onto the main-branch output.
        residual += self.shortcut(x)
        return F.relu(residual)

https://www.kaggle.com/competitions/car-classificationproject-vision/data 车辆分类数据集
https://www.kaggle.com/datasets/rajat95gupta/hazing-images-dataset-cvpr-2019 图像去雾数据集