In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.nn.functional as F
from torchsummary import summary

In [None]:
import tensorflow as tf
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # 忽略低级别的警告
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"]="0"
config = tf.compat.v1.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.9
session = tf.compat.v1.Session(config=config)

In [None]:
# 定义图像转换操作的组合
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.RandomRotation(15),  # 随机旋转，角度范围为[-15, 15]
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),  # 随机颜色调整
    transforms.ToTensor(),  # 转换为张量
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # 标准化操作，使用给定的均值和标准差
])

In [None]:
Batch_Size = 512

In [None]:
# 定义 CIFAR-10 数据集的训练集和测试集，并应用之前定义的图像转换操作
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# 创建用于训练和测试的数据加载器，设置批次大小、是否随机洗牌和工作线程数
trainloader = torch.utils.data.DataLoader(trainset, batch_size=Batch_Size, shuffle=True, num_workers=8)
testloader = torch.utils.data.DataLoader(testset, batch_size=Batch_Size, shuffle=True, num_workers=8)

# 定义 CIFAR-10 数据集的类别标签
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [None]:
classes = trainset.classes

In [None]:
import numpy as np
import matplotlib.pyplot as plt
plt.imshow(trainset.data[0])
im,label = next(iter(trainloader))

In [None]:
def imshow(img):
    img = img / 2 + 0.5
    img = np.transpose(img.numpy(),(1,2,0))
    plt.imshow(img)

In [None]:
imshow(im[0])
plt.figure(figsize=(8,12))
imshow(torchvision.utils.make_grid(im[:32]))

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu' # 判断是否用GPU

In [None]:
cfg = {
    'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}

In [None]:
class VGG(nn.Module):
    
    def __init__(self, vgg_name):
        super(VGG, self).__init__()
        
        # 定义网络结构的特征提取部分
        self.features = self._make_layers(cfg[vgg_name])
        
        # 定义分类器部分
        self.classifier = nn.Sequential(
            nn.Linear(512, 512),
            nn.ReLU(True),
            nn.Dropout(0.2),
            nn.Linear(512, 512),
            nn.ReLU(True),
            nn.Dropout(0.2),
            nn.Linear(512, 10),
        )

        # 初始化权重
        self._initialize_weight()
        
    def forward(self, x):
        # 前向传播过程
        out = self.features(x)
        out = out.view(out.size(0), -1)
        out = self.classifier(out)
        return out
    
    # 创建网络层
    def _make_layers(self, cfg):
        layers = []
        in_channels = 3  # RGB 初始通道为3
        for x in cfg:
            if x == 'M':
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)] 
            else:
                layers += [nn.Conv2d(in_channels, x, kernel_size=3, padding=1), 
                           nn.BatchNorm2d(x),
                           nn.ReLU(inplace=True)]  
                in_channels = x  
        return nn.Sequential(*layers)
    
    # 初始化参数
    def _initialize_weight(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()

# 创建VGG模型实例并将其移动到指定的设备上
net = VGG('VGG16').to(device)

In [None]:
summary(net,(3,32,32))

In [None]:
x = torch.randn(2,3,32,32).to(device)
y = net(x)

In [None]:
net = VGG('VGG16').to(device)
if device == 'cuda':
    net = nn.DataParallel(net)
    torch.backends.cudnn.benchmark = True

In [None]:
# 定义优化器，损失函数和学习率调度器
optimizer = optim.SGD(net.parameters(), lr=1e-1, momentum=0.9, weight_decay=5e-4)
criterion = nn.CrossEntropyLoss()
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, verbose=True, patience=5, min_lr=0.000001)  # 动态更新学习率

# 导入时间库
import time

# 设置训练的总轮数
epoch = 50

In [None]:
import os
if not os.path.exists('./model'):
    os.makedirs('./model')
else:
    print('文件已存在')
save_path = './model/VGG16.pth'

In [None]:
from utils import train
from utils import plot_history
Acc, Loss, Lr = train(net, trainloader, testloader, epoch, optimizer, criterion, scheduler, save_path, verbose = True)

In [None]:
plot_history(epoch ,Acc, Loss, Lr)

In [None]:
# 测试模型性能
correct = 0   # 正确分类的样本数
total = 0    # 总共的样本数
testloader = torch.utils.data.DataLoader(testset, batch_size=32, shuffle=True, num_workers=0)
net.eval()  # 将模型设置为评估模式
for data in testloader:  # 遍历测试集
    images, labels = data
    images = images.to(device)
    labels = labels.to(device)
    net.eval()  # 再次确保模型处于评估模式
    if hasattr(torch.cuda, 'empty_cache'):
        torch.cuda.empty_cache()
    outputs = net(images)  # 获取模型输出
    
    _, predicted = torch.max(outputs.data, 1)
    total += labels.size(0)          # 更新总样本数
    correct += (predicted == labels).sum()  # 更新正确分类的样本数

# 计算并打印模型在测试集上的准确率
print('Accuracy of the network on the 10000 test images: %.2f %%' % (100 * correct / total))


In [None]:
# 定义两个列表，用于存储每个类别中测试正确的样本数量和总样本数量，初始化为0
class_correct = list(0. for i in range(10))
class_total = list(0. for i in range(10))

# 将模型设置为评估模式，并在不计算梯度的情况下进行测试
net.eval()
with torch.no_grad():
    for data in testloader:
        images, labels = data
        images = images.to(device)
        labels = labels.to(device)
        if hasattr(torch.cuda, 'empty_cache'):
            torch.cuda.empty_cache()
        outputs = net(images)

        _, predicted = torch.max(outputs.data, 1)
        c = (predicted == labels).squeeze()
        for i in range(len(images)):
            label = labels[i]
            class_correct[label] += c[i].item()
            class_total[label] += 1

# 打印每个类别的准确率
for i in range(10):
    print('Accuracy of %5s : %.2f %%' % (classes[i], 100 * class_correct[i] / class_total[i]))


In [None]:
# 从测试数据加载器中获取一个批次的数据
dataiter = iter(testloader)
images, labels = next(dataiter)
images_ = images
images_ = images_.to(device)
labels = labels.to(device)

# 在模型上进行推理
val_output = net(images_)
_, val_preds = torch.max(val_output, 1)

# 计算准确率
correct = torch.sum(val_preds == labels.data).item()

# 将预测结果和标签从GPU移动到CPU
val_preds = val_preds.cpu()
labels = labels.cpu()

# 打印准确率
print("Accuracy Rate = {}%".format(correct/len(images) * 100))

# 可视化预测结果和标签
fig = plt.figure(figsize=(25, 25))
for idx in np.arange(64):    
    ax = fig.add_subplot(8, 8, idx+1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title("{}, ({})".format(classes[val_preds[idx].item()], classes[labels[idx].item()]), 
                 color=("green" if val_preds[idx].item() == labels[idx].item() else "red"))


In [None]:
torch.save(net,save_path[:-4]+str(epoch)+'.pth')