模块导入

In [1]:
#导入基本包
import random
import os
import numpy as np
#导入模型计算基本包
import torch
import torch.nn as nn
from torch.nn import functional as F
#图像及模型处理
import torchvision
from d2l import torch as d2l
#评估绘图包
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
'''
#设置环境变量免责声明
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
'''

"\n#设置环境变量免责声明\nos.environ['KMP_DUPLICATE_LIB_OK'] = 'True'\n"

固定随机数

In [3]:
def setup_seed(seed):
	#  下面两个常规设置了，用来np和random的话要设置 
    np.random.seed(seed) 
    random.seed(seed)

    os.environ['PYTHONHASHSEED'] = str(seed)
    # 禁止hash随机化
    os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'
    # 在cuda 10.2及以上的版本中，需要设置以下环境变量来保证cuda的结果可复现

    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)# 多GPU训练需要设置这个
    torch.manual_seed(seed)

    torch.use_deterministic_algorithms(True)
    # 一些操作使用了原子操作，不是确定性算法，不能保证可复现，设置这个禁用原子操作，保证使用确定性算法
    torch.backends.cudnn.deterministic = True 
    # 确保每次返回的卷积算法是确定的
    torch.backends.cudnn.enabled = False
    # 禁用cudnn使用非确定性算法
    torch.backends.cudnn.benchmark = False
    # 与上面一条代码配套使用，True的话会自动寻找最适合当前配置的高效算法，来达到优化运行效率的问题。
    # False保证实验结果可复现。

定义基本类和函数

In [4]:
class Residual(nn.Module): 
    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, num_channels,
                               kernel_size=3, padding=1, stride=strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels,
                               kernel_size=3, padding=1)
        if use_1x1conv:
            self.conv3 = nn.Conv2d(input_channels, num_channels,
                                   kernel_size=1, stride=strides)
        else:
            self.conv3 = None
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            X = self.conv3(X)
        Y += X
        return F.relu(Y)

In [None]:
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block:
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk

评估函数模块

In [7]:
#精度评估函数
def evaluate_accuracy_gpu(net, data_iter, device=None):
    if isinstance(net, nn.Module):
        net.eval()
        if not device:
            device = next(iter(net.parameters())).device
    metric = d2l.Accumulator(2)
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, list):
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            y = y.to(device)
            metric.add(d2l.accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]

In [8]:
#绘制多分类ROC曲线
def plot_multiclass_roc_curve(model, test_iter):
    # 获取模型的预测概率和真实标签
    y_true = []
    y_prob = []
    for batch in test_iter:
        inputs, labels = batch
        inputs = inputs.cuda()
        outputs = model(inputs)
        probabilities = outputs.softmax(dim=1)
        y_true.extend(labels.cpu().numpy())
        y_prob.extend(probabilities.cpu().detach().numpy())
    y_true = np.array(y_true)
    y_prob = np.array(y_prob)
    y_prob = np.argmax(y_prob,axis=1)

    #微平均和宏平均方法（Micro-average and Macro-average）
    # 将真实标签转换为二值矩阵
    y_true = label_binarize(y_true, classes=np.unique(y_true))
    y_prob = label_binarize(y_prob, classes=np.unique(y_prob))

    # 将多分类问题转换为二分类问题
    y_true = y_true.ravel()
    y_prob = y_prob.ravel()

    fpr, tpr, thresholds = roc_curve(y_true, y_prob)
    roc_auc = auc(fpr, tpr)

    # 绘制ROC曲线
    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc="lower right")
    plt.show()

训练函数

In [9]:
def train(net, train_iter, test_iter, num_epochs, lr, device):
    def init_weights(m):
        if type(m) == nn.Linear or type(m) == nn.Conv2d:
            nn.init.xavier_uniform_(m.weight)
    net.apply(init_weights)
    print('training on', device)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = d2l.Timer(), len(train_iter)
    for epoch in range(num_epochs):
        metric = d2l.Accumulator(3)
        net.train()
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()
            with torch.no_grad():
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
          f'on {str(device)}')

图像可视化预测

In [None]:
def get_labels(labels):
    text_labels = ['aca', 'n']
    return [text_labels[int(i)] for i in labels]

In [None]:
def show_images(imgs, num_rows, num_cols, titles=None, scale=1.5):
    figsize = (num_cols * scale, num_rows * scale)
    _, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize)
    axes = axes.reshape(-1)
    for i, (ax, img) in enumerate(zip(axes, imgs)):
        if torch.is_tensor(img):
            img = img.cpu().numpy()
        img = np.clip(img, 0, 1)
        ax.imshow(img)
        ax.axes.get_xaxis().set_visible(False)
        ax.axes.get_yaxis().set_visible(False)
        if titles:
            ax.set_title(titles[i])
    return axes

In [None]:
def predict(net, test_iter, n=10): 
    for X, y in test_iter:
        break
    X, y = X.to('cuda'), y.to('cuda')
    trues = get_labels(y)
    preds = get_labels(net(X).argmax(axis=1))
    titles = [true +'\n' + pred for true, pred in zip(trues, preds)]
    show_images(
        X[0:n].reshape((n, 768, 768, 3)), 1, n, titles=titles[0:n])

主模块

In [10]:
def main():
    #数据处理
    train_path = data_dir + '/training'
    test_path = data_dir + '/testing'
    
    train_data = torchvision.datasets.ImageFolder(train_path,transform=transform)
    test_data = torchvision.datasets.ImageFolder(test_path,transform=transform)

    train_iter=torch.utils.data.DataLoader(train_data,batch_size=batch_size,shuffle=True)
    test_iter=torch.utils.data.DataLoader(test_data,batch_size=batch_size,shuffle=True)

    #训练
    train(model, train_iter, test_iter, num_epochs, lr, device)
    
    #保存模型
    torch.save(model,save_dir)

    #调用模型计算ROC曲线
    plot_multiclass_roc_curve(model, test_iter)
    
    #图像预测结果可视化
    predict(model, test_iter)

模型定义

In [None]:
b1 = nn.Sequential(nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
                   nn.BatchNorm2d(64), nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3 = nn.Sequential(*resnet_block(64, 128, 2))
b4 = nn.Sequential(*resnet_block(128, 256, 2))
b5 = nn.Sequential(*resnet_block(256, 512, 2))

#定义模型和训练模块
model = nn.Sequential(b1, b2, b3, b4, b5,
                    nn.AdaptiveAvgPool2d((1,1)),
                    nn.Flatten(), nn.Linear(512, 3))

模型预览

In [None]:
X = torch.rand(size=(3, 3, 768, 768), dtype=torch.float32)
for layer in net:
    X = layer(X)
    print(layer.__class__.__name__,'output shape: \t',X.shape)

参数设置

In [2]:
#设置参数
device = d2l.try_gpu()#设置运行模式：CPU/GPU
batch_size, lr, num_epochs = 20, 0.0001, 100#设置批量、学习率和迭代次数
data_dir = '../../数据/'#设置文件读取路径
save_dir = './model_.pt'#设置保存路径
transform = torchvision.transforms.Compose([torchvision.transforms.Resize((224,224)),
                                            torchvision.transforms.ToTensor(),
                                            torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
num_classes = 3#输出类别数量

#设置随机数种子
seed = 0

In [None]:
if __name__ == '__main__':
    main()