# ResNet

In [2]:
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms, datasets, utils
import matplotlib.pyplot as plt
import numpy as np
import torch.optim as optim

In [33]:
# Pick the compute device: CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Hyperparameters
num_epochs = 50       # number of epochs
batch_size = 50       # batch size
learning_rate = 0.01  # learning rate

In [34]:
# Image preprocessing / augmentation pipeline, applied in the order listed.
transform = transforms.Compose(
    [
        transforms.Pad(4),                  # pad every border by 4 pixels
        transforms.RandomHorizontalFlip(),  # random left-right flip
        transforms.RandomCrop(32),          # random 32x32 crop of the padded image
        transforms.ToTensor(),              # convert the image to a tensor
    ]
)


In [67]:
# CIFAR-10 is read from an existing local copy (download=False), so the files
# must already be present under this root.
# NOTE(review): this is an absolute, machine-specific path; adjust it when the
# notebook is run elsewhere.
data_root = r'C:\Users\ZeroLoveSeA\Desktop\CV\AlexNet\data'

# Training set: uses the randomly augmenting `transform` pipeline.
# NOTE(review): the loaders hard-code batch_size=128 although the config cell
# defines batch_size = 50 -- confirm which value is intended.
trainset = torchvision.datasets.CIFAR10(root=data_root, train=True, download=False, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset,batch_size=128, shuffle=True, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat','deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# Test set: evaluation has to be deterministic, so the images are only
# converted to tensors. Reusing `transform` here would apply random flips and
# crops to the test images and make the reported metrics vary between runs.
testset = torchvision.datasets.CIFAR10(root=data_root, train=False, download=False, transform=transforms.ToTensor())
testloader = torch.utils.data.DataLoader(testset,batch_size=128, shuffle=False, num_workers=2)

# 网络结构

In [36]:
# Basic residual block

class BasicBlock(nn.Module):
    """Residual block built from two 3x3 convolutions plus a shortcut branch.

    The block outputs ``out_channels * expansion`` channels. When ``stride`` is
    not 1 or the input and output channel counts differ, the shortcut is a 1x1
    convolution (with the same stride) followed by batch norm; otherwise it is
    an empty ``nn.Sequential`` that passes the input through unchanged.
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        expanded_channels = out_channels * BasicBlock.expansion

        # Main path: conv-bn-relu followed by conv-bn (no activation before the add).
        main_path = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, expanded_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(expanded_channels),
        ]
        self.residual_function = nn.Sequential(*main_path)

        # The shortcut needs a projection whenever the main path changes the
        # spatial size or the number of channels.
        needs_projection = stride != 1 or in_channels != expanded_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, expanded_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        residual = self.residual_function(x)
        identity = self.shortcut(x)
        activation = nn.ReLU(inplace=True)
        return activation(residual + identity)  # residual addition, then ReLU
    
# Bottleneck residual block

class Bottleneck(nn.Module):
    """Bottleneck residual block: 1x1 reduce -> 3x3 -> 1x1 expand, plus a shortcut.

    The block outputs ``out_channels * expansion`` channels. ``stride`` is
    applied in the 3x3 convolution and, when a projection is needed, in the
    shortcut as well so both branches produce tensors of the same shape.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: width of the two inner convolutions; the block output
            has ``out_channels * expansion`` channels.
        stride: stride of the 3x3 convolution (and of the projection shortcut).
    """

    expansion = 4 # channel multiplier applied to the block output

    def __init__(self, in_channels, out_channels, stride=1):
        super(Bottleneck, self).__init__()

        expanded_channels = out_channels * Bottleneck.expansion

        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            # The expansion layer of a bottleneck is a 1x1 convolution.
            nn.Conv2d(out_channels, expanded_channels, kernel_size=1, bias=False),
            # The batch norm must match the expanded channel count, otherwise
            # the forward pass fails with a channel mismatch.
            nn.BatchNorm2d(expanded_channels)
        )

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != expanded_channels: # shapes differ: project the input with a 1x1 conv
            self.shortcut = nn.Sequential(
                # The projection uses the same stride as the main path so the
                # two branches can be added.
                nn.Conv2d(in_channels, expanded_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded_channels)
            )

    def forward(self, x):
        return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x)) # residual addition, then ReLU
    

In [45]:
# 定义网络结构

from torch.nn.modules import padding
from torch.nn.modules.batchnorm import BatchNorm2d

class ResNet(nn.Module):
    """ResNet made of a convolutional stem, four residual stages and a linear classifier.

    ``forward`` returns a 6-tuple: the stem output, the output of each of the
    four stages, and finally the classifier output.
    """

    def __init__(self,in_channels,block,num_blocks,num_classes=10): 
        # block: residual block class (BasicBlock or Bottleneck)
        # num_blocks: number of blocks in each of the four stages
        # num_classes: size of the classifier output
        super().__init__()

        self.block = block
        self.in_channels = 64  # channel count fed to the next block; updated by _make_layer
        self.num_blocks = num_blocks
        self.num_classes = num_classes

        # Stem: 3x3 conv -> batch norm -> ReLU
        stem_layers = [
            nn.Conv2d(in_channels, 64, kernel_size=3, stride=1, padding=1,bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
        ]
        self.conv1 = nn.Sequential(*stem_layers)
        self.pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)  # max pooling after the stem

        # (attribute name, stage width, stride of the first block) for the four stages
        stage_specs = (
            ('conv2_x', 64, 1),
            ('conv3_x', 128, 2),
            ('conv4_x', 256, 2),
            ('conv5_x', 512, 2),
        )
        for stage_idx, (stage_name, width, first_stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, num_blocks[stage_idx], stride=first_stride)
            setattr(self, stage_name, stage)

        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))  # global average pooling
        self.fc = nn.Linear(512 * block.expansion, num_classes)  # classifier

    # Build one stage: a Sequential of blocks
    def _make_layer(self, block, out_channels, num_blocks, stride):
        # Only the first block uses the requested stride; the remaining ones use stride 1.
        block_strides = [stride]
        block_strides.extend([1] * (num_blocks - 1))

        stage_blocks = []
        for block_stride in block_strides:
            stage_blocks.append(block(self.in_channels, out_channels, block_stride))
            # The next block receives the expanded output of this one.
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*stage_blocks)

    def forward(self, x):
        f1 = self.conv1(x)
        features = [f1]

        current = self.pool(f1)
        for stage in (self.conv2_x, self.conv3_x, self.conv4_x, self.conv5_x):
            current = stage(current)
            features.append(current)

        pooled = self.avg_pool(current)
        flat = pooled.view(pooled.size(0), -1)  # flatten to (batch, features)
        logits = self.fc(flat)
        return (*features, logits)
        


In [39]:

def ResNet18(in_channels):
    """Build a ResNet from BasicBlock units with stage depths [2, 2, 2, 2]."""
    stage_depths = [2, 2, 2, 2]
    return ResNet(in_channels, block=BasicBlock, num_blocks=stage_depths)

def ResNet34(in_channels):
    """Build a ResNet from BasicBlock units with stage depths [3, 4, 6, 3]."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(in_channels, block=BasicBlock, num_blocks=stage_depths)

def ResNet50(in_channels):
    """Build a ResNet from Bottleneck units with stage depths [3, 4, 6, 3]."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(in_channels, block=Bottleneck, num_blocks=stage_depths)

def ResNet101(in_channels):
    """Build a ResNet from Bottleneck units with stage depths [3, 4, 23, 3]."""
    stage_depths = [3, 4, 23, 3]
    return ResNet(in_channels, block=Bottleneck, num_blocks=stage_depths)

def ResNet152(in_channels):
    """Build a ResNet from Bottleneck units with stage depths [3, 8, 36, 3]."""
    stage_depths = [3, 8, 36, 3]
    return ResNet(in_channels, block=Bottleneck, num_blocks=stage_depths)

In [65]:
# Build ResNet34 for 3-channel input and move it to the selected device.
model = ResNet34(in_channels=3)
model = model.to(device)
print(model)

ResNet(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (pool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (conv2_x): Sequential(
    (0): BasicBlock(
      (residual_function): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (residual_function): Sequential(
        (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=Fal

# 训练

In [41]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [43]:
from tqdm import tqdm

# Training setup: number of batches per epoch, and the checkpoint file that
# train() writes the best model to.
train_steps = len(trainloader)
save_path = './ResNet.ckpt'

def _run_epoch(net, loader, criterion, optimizer=None):
    """Run one pass over ``loader``; weights are updated only when ``optimizer`` is given.

    Returns:
        (mean_loss, accuracy) as Python floats, both averaged over
        ``len(loader.dataset)``.
    """
    is_training = optimizer is not None
    net.train(is_training)  # train(False) is the same as eval()

    loss_sum = 0.0
    num_correct = 0
    with torch.set_grad_enabled(is_training):
        for inputs, labels in tqdm(loader, total=len(loader)):
            inputs, labels = inputs.to(device), labels.to(device)
            if is_training:
                optimizer.zero_grad()

            outputs = net(inputs)[-1]  # the last element of the model output is used as the logits
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

            # .item() turns the scalars into Python numbers, so the histories
            # never hold device tensors (plotting CUDA tensors raises an error).
            loss_sum += loss.item() * inputs.size(0)
            num_correct += (outputs.argmax(dim=1) == labels).sum().item()

    num_samples = len(loader.dataset)
    return loss_sum / num_samples, num_correct / num_samples


def train(net,train_data,valid_data,epochs,optimizer,criterion):
    """Train ``net``, validate it after every epoch and checkpoint the best model.

    Args:
        net: model whose forward pass returns a sequence with the logits last.
        train_data: DataLoader yielding (inputs, labels) batches for training.
        valid_data: DataLoader yielding (inputs, labels) batches for validation.
        epochs: number of passes over ``train_data``.
        optimizer: optimizer that updates the parameters of ``net``.
        criterion: loss callable taking (logits, labels).

    Uses the notebook-level ``device`` and ``save_path``. Whenever the
    validation accuracy improves, the state dict is written to ``save_path``.
    After the last epoch the loss and accuracy curves are plotted.
    """
    train_loss_history = []
    valid_loss_history = []

    train_acc_history = []
    valid_acc_history = []

    best_acc = 0.0

    for epoch in range(epochs):
        # Training pass
        train_loss, train_acc = _run_epoch(net, train_data, criterion, optimizer)
        train_loss_history.append(train_loss)
        train_acc_history.append(train_acc)
        print('[%d] train loss: %.3f train acc: %.3f' % (epoch + 1, train_loss, train_acc))

        # Validation pass (no optimizer: evaluation mode, gradients disabled)
        valid_loss, valid_acc = _run_epoch(net, valid_data, criterion)
        valid_loss_history.append(valid_loss)
        valid_acc_history.append(valid_acc)
        print('[%d] valid loss: %.3f valid acc: %.3f' % (epoch + 1, valid_loss, valid_acc))

        # Keep only the weights with the best validation accuracy so far.
        if valid_acc > best_acc:
            best_acc = valid_acc
            torch.save(net.state_dict(), save_path)
            print('saving model with acc {:.3f}'.format(best_acc))

    # Loss curves
    plt.figure()
    plt.plot(train_loss_history, label='train loss')
    plt.plot(valid_loss_history, label='valid loss')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend()
    plt.show()

    # Accuracy curves
    plt.figure()
    plt.plot(train_acc_history, label='train acc')
    plt.plot(valid_acc_history, label='valid acc')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend()
    plt.show()

    print('Finished Training')
    
        

In [None]:
# Train for 10 epochs, validating on the test loader after every epoch.
train(
    net=model,
    train_data=trainloader,
    valid_data=testloader,
    epochs=10,
    optimizer=optimizer,
    criterion=criterion,
)

# 预测

In [None]:
# Load the saved checkpoint. map_location lets a checkpoint that was saved on
# a GPU be loaded on a machine that only has a CPU.
state_dict = torch.load('./ResNet.ckpt', map_location=device)
load_result = model.load_state_dict(state_dict, strict=False)
# strict=False skips keys that do not match the model, so report them instead
# of silently running with partially loaded weights.
if load_result.missing_keys or load_result.unexpected_keys:
    print('missing keys:', load_result.missing_keys)
    print('unexpected keys:', load_result.unexpected_keys)
model.eval()


In [61]:
# Predict the class of an image file and show the image with the prediction as its title
def predict_image(img_path):
    """Classify one image file with ``model`` and display it titled with the predicted class.

    Args:
        img_path: path of the image file to classify.

    Uses the notebook-level ``model``, ``device`` and ``classes``.
    """
    # Imported locally because the notebook imports PIL only in a later cell;
    # a top-to-bottom run would otherwise fail with a NameError on ``Image``.
    from PIL import Image

    # Deterministic preprocessing. The training `transform` applies a random
    # flip and a random 32x32 crop, which would cut an arbitrary patch out of
    # a larger photo and make the prediction change between calls.
    preprocess = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
    ])

    with Image.open(img_path) as img_file:
        image = img_file.convert('RGB')  # the model was built for 3-channel input

    batch = preprocess(image).unsqueeze(0).to(device)
    with torch.no_grad():  # inference only, no gradients needed
        logits = model(batch)[-1]
    class_index = logits.argmax(dim=1).item()

    plt.imshow(image)
    plt.title(classes[class_index])
    plt.show()


# Run the prediction on the sample image.
predict_image(img_path='dog_test.jpg')

In [None]:
from torchvision import transforms
from PIL import Image

# Load the input image as RGB (the model was built for 3-channel input).
with Image.open('dog_test.jpg') as image_file:
    input_image = image_file.convert('RGB')

# Deterministic preprocessing: resize to 32x32 instead of reusing the training
# `transform`, whose random flip and random crop would make the result vary.
preprocess = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
])
input_tensor = preprocess(input_image).unsqueeze(0).to(device)

with torch.no_grad():  # inference only, no gradients needed
    outputs = model(input_tensor)[-1]
_, predicted = torch.max(outputs, 1)
# Print the predicted class name
print(classes[predicted.item()])