In [13]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import torch
import torchvision
import numpy as np
import pandas as pd
import random
import python_utils
import os
import matplotlib.pyplot as plt
from torch import optim
import torch.nn as nn
from torchvision import datasets, transforms, models
from PIL import Image
from torch.autograd import Variable
from torch.optim import Adam
import torch.nn.functional as F

In [2]:
# Small CNN: two convolutional layers followed by two fully connected layers.
class SimpleNet(nn.Module):
    """Two-conv / two-linear image classifier that emits log-probabilities.

    ``linear1`` expects 20 * 28 * 28 flattened features, which is what the
    conv/pool stack below produces for 3-channel 64x64 inputs
    (64 -> 60 after conv1, -> 30 after pooling, -> 28 after conv2).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 10, kernel_size=(5, 5))
        self.conv2 = nn.Conv2d(10, 20, kernel_size=(3, 3))
        self.linear1 = nn.Linear(20 * 28 * 28, 500)
        self.linear2 = nn.Linear(500, 12)

    def forward(self, x):
        """Return one row of 12 log-probabilities per sample in ``x``."""
        batch = x.size(0)
        # conv -> ReLU -> 2x2 max-pool (stride 2)
        features = F.max_pool2d(F.relu(self.conv1(x)), 2, 2)
        # conv -> ReLU (no pooling after the second convolution)
        features = F.relu(self.conv2(features))
        # Flatten everything except the batch dimension for the dense layers.
        hidden = F.relu(self.linear1(features.view(batch, -1)))
        return F.log_softmax(self.linear2(hidden), dim=1)

In [11]:
# Shared settings for the rest of the notebook (some are not read anywhere yet).
class VarsConfig(object):
    # Dataset roots, relative to the notebook's working directory.
    train_data = "../E3_data/train_img"  # images used for training
    test_data = "../E3_data/val_img"     # held-out images evaluated after every epoch
    pre_data = "../E3_data/test_img"     # presumably the unlabeled prediction set; not used in this section

    epoch = 20          # number of training epochs
    batch_size = 256    # batch size for both data loaders
    img_height = 64
    img_width = 64
    # The setting was originally spelled "img_weight"; keep that name as an
    # alias so existing references continue to work.
    img_weight = img_width
    seed = 666          # RNG seed for reproducible runs


config = VarsConfig()

In [4]:
# Image preprocessing pipelines.
# The crop size is read from the shared config (``img_weight`` is the width
# setting) instead of repeating a hard-coded 64.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # random left-right mirror (a flip, not a rotation)
    transforms.RandomCrop((config.img_height, config.img_weight), padding=4),  # pad 4 px, then crop at a random offset
    transforms.ToTensor(),  # convert the image to a tensor
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # per channel: (x - 0.5) / 0.5
])

# Evaluation images get no augmentation, only the same tensor conversion and scaling.
test_transform = transforms.Compose([
    transforms.ToTensor(),  # convert the image to a tensor
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # per channel: (x - 0.5) / 0.5
])

In [5]:
# Build the training and evaluation datasets from their image folders.
train_data = datasets.ImageFolder(config.train_data, transform=train_transform)
test_data = datasets.ImageFolder(config.test_data, transform=test_transform)

# Wrap both datasets in batch loaders; only the training batches are shuffled.
train_loader = torch.utils.data.DataLoader(
    train_data, batch_size=config.batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(
    test_data, batch_size=config.batch_size, shuffle=False)

In [7]:
# Seed every RNG in play so weight initialisation, batch shuffling and the
# random augmentations are repeatable from one run to the next.
random.seed(config.seed)
np.random.seed(config.seed)
torch.manual_seed(config.seed)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# If training several models runs the GPU out of memory, force CPU execution
# by enabling the line below.
#device = "cpu"

# Build the model. Freshly created parameters already require gradients, so
# there is no need to switch requires_grad on explicitly.
model = SimpleNet()
model.to(device)

# Optimizer and loss.
# SimpleNet.forward already returns log-probabilities (log_softmax), so the
# matching criterion is NLLLoss; CrossEntropyLoss would apply log_softmax a
# second time to reach the same value.
optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
loss_fn = nn.NLLLoss()
print(model)

SimpleNet(
  (conv1): Conv2d(3, 10, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(10, 20, kernel_size=(3, 3), stride=(1, 1))
  (linear1): Linear(in_features=15680, out_features=500, bias=True)
  (linear2): Linear(in_features=500, out_features=12, bias=True)
)


In [9]:
# Step-wise learning-rate decay: later epochs train with a smaller rate.
def adjust_learning_rate(epoch):
    """Write the learning rate for ``epoch`` into every group of the global ``optimizer``.

    The rate starts at 0.001 and is divided by a further factor of ten each
    time ``epoch`` exceeds 5, 10, 15, 20, 25 and 30.
    """
    base_lr = 0.001
    # (threshold, divisor) pairs, checked from the highest threshold down so
    # the first match is the strongest decay that applies.
    schedule = (
        (30, 1000000),
        (25, 100000),
        (20, 10000),
        (15, 1000),
        (10, 100),
        (5, 10),
    )

    new_lr = base_lr
    for threshold, divisor in schedule:
        if epoch > threshold:
            new_lr = base_lr / divisor
            break

    for group in optimizer.param_groups:
        group["lr"] = new_lr


# Checkpoint helper.
def save_models(epoch):
    """Save the global ``model`` to ``SimpleNet1_<epoch + 1>.mdl`` in the working directory.

    ``epoch`` is the zero-based epoch index; the file name carries the
    one-based epoch number. The whole module object is passed to
    ``torch.save`` (not just its ``state_dict``), so loading the file later
    needs the ``SimpleNet`` class definition to be available.
    """
    checkpoint_path = "SimpleNet1_{}.mdl".format(epoch + 1)
    torch.save(model, checkpoint_path)
    print("Checkpoint saved")


# 定义测试集准确度估计过程
def test():
    model.eval()
    test_acc = 0.0
    test_loss = 0.0
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        _, prediction = torch.max(outputs.data, 1)
        loss = loss_fn(outputs, labels)
        test_loss += loss.item()

        test_acc += torch.sum(prediction == labels.data)

    test_acc = test_acc / len(test_data)
    test_loss = test_loss / len(test_loader.dataset)

    return test_acc, test_loss

In [10]:
# Training loop.
def train(num_epochs):
    """Train the global ``model`` for ``num_epochs`` epochs, checkpointing on improvement.

    Each epoch runs one optimisation pass over ``train_loader``, updates the
    learning rate through ``adjust_learning_rate``, evaluates with ``test()``
    and calls ``save_models`` whenever the evaluation accuracy beats the best
    seen so far. One summary line is printed per epoch.

    Args:
        num_epochs: number of passes over the training data.

    Returns:
        The one-based number of the epoch with the best evaluation accuracy,
        or 0 if no epoch scored above 0.0.
    """
    best_acc = 0.0
    best_epoch = 0

    for epoch in range(num_epochs):
        model.train()
        correct = 0
        loss_sum = 0.0
        seen = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            batch_size = labels.size(0)

            # Standard step: reset gradients, forward, loss, backward, update.
            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            # loss is the mean over the batch (default reduction); weight it
            # by the batch size so the epoch figure is a per-sample mean.
            loss_sum += loss.item() * batch_size
            # .item() keeps the counter a Python int instead of a device
            # tensor, so the accuracy below is an ordinary float.
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            seen += batch_size

        # Set the learning rate that the next epoch will train with.
        adjust_learning_rate(epoch)

        # Per-sample averages; guard against an empty loader.
        denom = max(seen, 1)
        train_acc = correct / denom
        train_loss = loss_sum / denom

        # Accuracy and loss on the held-out loader.
        test_acc, test_loss = test()

        # Keep a checkpoint of the best-performing model so far.
        if test_acc > best_acc:
            save_models(epoch)
            best_acc = test_acc
            best_epoch = epoch + 1

        print(
            "Epoch {}, Train Accuracy: {} , TrainLoss: {} , Test Accuracy: {}, TestLoss: {}".
            format(epoch + 1, train_acc, train_loss, test_acc, test_loss))
    return best_epoch

In [14]:
# Run training for the configured number of epochs. train() returns the
# one-based number of the epoch whose accuracy on test_loader was best
# (0 if no epoch improved on the initial 0.0).
best_epoch = train(config.epoch)