In [None]:
import os
import torch
os.environ['KMP_DUPLICATE_LIB_OK']='True' # Author's note: set to keep the Jupyter environment from shutting down unexpectedly
torch.backends.cudnn.benchmark=True # Author's note: turns on cuDNN benchmark mode to speed up GPU computation

In [None]:
import torchvision
from torch import nn, optim
from torch.nn import functional as F
from torchvision import transforms as T
from torchvision import models as m
from torch.utils.data import DataLoader

In [None]:
import matplotlib.pyplot as plt
from time import time
import random
import numpy as np
import pandas as pd
import datetime
import gc

In [None]:
# Seed the three random number generators used in this notebook (PyTorch,
# Python's `random`, NumPy) with the same value so runs are repeatable.
SEED = 1412
for _seed_fn in (torch.manual_seed, random.seed, np.random.seed):
    _seed_fn(SEED)

In [None]:
# Show whether PyTorch can see a CUDA device (the boolean is the cell output).
torch.cuda.is_available()

In [None]:
# Target device for the models: the GPU when CUDA is available, else the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# First load of the SVHN training split, with download=True so the files are
# fetched into the dataset root. No transform is passed here; the next cell
# reloads the same split with T.ToTensor().
# The root is a raw string so the Windows backslashes (\s, \M, \k, \S, \d) are
# not parsed as (invalid) escape sequences.
train = torchvision.datasets.SVHN(root=r'D:\study\ML\kaggle\SVHN\dataset',
                                  split="train",
                                  download=True)

In [None]:
# Reload the training split from the files already on disk (download=False),
# this time converting every image with T.ToTensor().
# The root is a raw string so the Windows backslashes are not parsed as
# (invalid) escape sequences.
train = torchvision.datasets.SVHN(root=r'D:\study\ML\kaggle\SVHN\dataset',
                                  split="train",
                                  download=False,
                                  transform=T.ToTensor())

In [None]:
# Load the SVHN test split (download=True fetches it into the dataset root)
# and convert every image with T.ToTensor().
# The root is a raw string so the Windows backslashes are not parsed as
# (invalid) escape sequences.
test = torchvision.datasets.SVHN(root=r'D:\study\ML\kaggle\SVHN\dataset',
                                 split="test",
                                 download=True,
                                 transform=T.ToTensor())

In [None]:
# Inspect the image part (element 0) of the first training sample.
train[0][0]

In [None]:
# Display the training dataset object's summary representation.
train

In [None]:
# Display the test dataset object's summary representation.
test

In [None]:
# Print the image shape and the label of the first training sample only;
# the loop is left immediately after the first item.
for x, y in train:
    print(x.shape, y, sep="\n")
    break

In [None]:
# List the distinct label values stored in the training split.
np.unique(train.labels)

In [None]:
def plotsample(data):
    """Draw five randomly chosen samples of ``data`` in one row of subplots.

    Args:
        data: Indexable dataset supporting ``len()`` whose items are
            ``(image, label)`` pairs. The image is passed to
            ``torchvision.utils.make_grid`` and then transposed with
            ``(1, 2, 0)``, so it is presumably a channels-first tensor.

    Returns:
        None. The figure is drawn through matplotlib.
    """
    fig, axs = plt.subplots(1,5,figsize=(10,10)) # one row, five subplots
    for ax in axs:
        # Pick a random index for each of the five panels.
        num = random.randint(0,len(data)-1)
        # Fetch the sample ONCE: indexing the dataset twice would run the
        # loading and any random transform twice for the same picture.
        img, label = data[num]
        npimg = torchvision.utils.make_grid(img).numpy()
        # imshow wants the channel axis last, so move axis 0 to the end.
        ax.imshow(np.transpose(npimg, (1, 2, 0)))
        ax.set_title(label) # label shown as the subplot title
        ax.axis("off") # hide the axes of each subplot

In [None]:
# Show five random training samples as loaded with T.ToTensor().
plotsample(train)

In [None]:
# Per-channel statistics for T.Normalize. SVHN consists of real photographs, so
# the ImageNet mean/std are used; the MNIST statistics could be tried instead.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

# Training pipeline: a random 28x28 crop (the 28x28 size follows the
# Fashion-MNIST convention), tensor conversion, then normalization.
# A T.RandomRotation(degrees=[-30,30]) step was tried and is currently disabled.
trainT = T.Compose([
    T.RandomCrop(28),
    T.ToTensor(),
    T.Normalize(mean=imagenet_mean, std=imagenet_std),
])

# Test pipeline: no augmentation is needed, so a centre crop is used instead.
testT = T.Compose([
    T.CenterCrop(28),
    T.ToTensor(),
    T.Normalize(mean=imagenet_mean, std=imagenet_std),
])

In [None]:
# Reload both splits from disk (download=False) with the augmentation /
# normalization pipelines defined above.
# The roots are raw strings so the Windows backslashes are not parsed as
# (invalid) escape sequences.
train = torchvision.datasets.SVHN(root=r'D:\study\ML\kaggle\SVHN\dataset',
                                  split="train",
                                  download=False,
                                  transform=trainT)
test = torchvision.datasets.SVHN(root=r'D:\study\ML\kaggle\SVHN\dataset',
                                 split="test",
                                 download=False,
                                 transform=testT)

In [None]:
# Show five random training samples after the trainT pipeline has been applied.
plotsample(train)

In [None]:
# Re-seed, then instantiate the two reference architectures whose layers are
# reused by MyResNet and MyVgg below.
torch.manual_seed(1412)
resnet18_ = m.resnet18()
vgg16_ = m.vgg16() # VGG already has a very large number of parameters, so the author rarely uses vgg16_bn

In [None]:
# Inspect the bn1 layer of resnet18_ (reused in MyResNet.block1).
resnet18_.bn1

In [None]:
# Inspect the relu layer of resnet18_ (reused in MyResNet.block1).
resnet18_.relu

In [None]:
# Inspect layer2 of resnet18_ (reused as MyResNet.block2).
resnet18_.layer2

In [None]:
# Inspect layer3 of resnet18_ (reused as MyResNet.block3).
resnet18_.layer3

In [None]:
class MyResNet(nn.Module):
    """Small ResNet-style classifier assembled from parts of the global ``resnet18_``.

    The network is a new 3->64 convolution stem, followed by ``layer2`` and
    ``layer3`` of ``resnet18_``, its average pooling, and a new 256->10 linear
    output layer. The reused parts are assigned directly, so they are the very
    same module objects as in ``resnet18_`` at construction time, not copies.
    """

    def __init__(self):
        super().__init__()
        # Stem: a fresh 3x3 stride-1 convolution plus the BatchNorm and ReLU of
        # resnet18_; the pooling layer of the original stem is left out.
        stem_conv = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.block1 = nn.Sequential(stem_conv, resnet18_.bn1, resnet18_.relu)
        # The remaining stages come straight from the classic architecture.
        # For very small images the network is kept shallow, so the number of
        # feature maps (the width) is increased instead.
        self.block2 = resnet18_.layer2
        self.block3 = resnet18_.layer3
        # Adaptive average pooling, the same module the residual network uses.
        self.avgpool = resnet18_.avgpool
        # The output layer is written by hand so the number of classes is right.
        self.fc = nn.Linear(in_features=256, out_features=10, bias=True)

    def forward(self, x):
        """Run ``x`` through the stem, both residual stages, pooling and the classifier."""
        for stage in (self.block1, self.block2, self.block3, self.avgpool):
            x = stage(x)
        # Flatten to (batch, 256) before the linear layer.
        x = x.view(x.shape[0], 256)
        return self.fc(x)

In [None]:
# Inspect the first nine feature layers of vgg16_ (the slice reused by MyVgg).
[*vgg16_.features[0:9]] # the star unpacks the sliced layers into the list
        

In [None]:
# Inspect classifier layers 1..5 of vgg16_ (the slice reused by MyVgg.fc).
[*vgg16_.classifier[1:6]]

In [None]:
class MyVgg(nn.Module):
    """Small VGG-style classifier assembled from parts of the global ``vgg16_``.

    It reuses ``vgg16_.features[0:9]``, ``vgg16_.avgpool`` and
    ``vgg16_.classifier[1:6]``. It adds one extra 128->128 convolution with
    ReLU and max pooling, a new first linear layer, and a new 10-way output
    layer. The reused layers are the same module objects as in ``vgg16_`` at
    construction time, not copies.
    """

    def __init__(self):
        super().__init__()
        borrowed_features = vgg16_.features[0:9]
        self.features = nn.Sequential(
            *borrowed_features,  # the star unpacks the sliced layers
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2, padding=0, dilation=1, ceil_mode=False),
        )
        self.avgpool = vgg16_.avgpool
        # The channel count entering the linear part differs from the stock
        # network, so the first linear layer is rewritten; the output layer is
        # rewritten as well to produce 10 classes.
        borrowed_classifier = vgg16_.classifier[1:6]
        self.fc = nn.Sequential(
            nn.Linear(7*7*128, out_features=4096, bias=True),
            *borrowed_classifier,
            nn.Linear(in_features=4096, out_features=10, bias=True),
        )

    def forward(self, x):
        """Run ``x`` through the features, pooling and the fully connected head."""
        pooled = self.avgpool(self.features(x))
        # Flatten to (batch, 7*7*128) before the linear layers.
        flat = pooled.view(pooled.shape[0], 7*7*128)
        return self.fc(flat)

In [None]:
from torchinfo import summary

In [None]:
# Layer summary of MyResNet for an input of size (10, 3, 28, 28), computed on the CPU.
summary(MyResNet(),(10,3,28,28),depth=1,device="cpu")

In [None]:
# Layer summary of MyVgg for an input of size (10, 3, 28, 28), computed on the CPU.
summary(MyVgg(),(10,3,28,28),depth=1,device="cpu")

In [None]:
def IterOnce(net,criterion,opt,x,y):
    """Run one optimisation step on a single batch.

    Args:
        net: Model to train; called as ``net(x)``.
        criterion: Loss function called as ``criterion(outputs, y)``.
        opt: Optimizer providing ``step()`` and ``zero_grad(set_to_none=...)``.
        x: Input batch.
        y: Target labels, compared element-wise with the arg-max over
            dimension 1 of the model output.

    Returns:
        tuple: ``(correct, loss)``. ``correct`` is a tensor holding the number
        of samples whose predicted class equals ``y``. ``loss`` is the batch
        loss detached from the autograd graph, so callers can accumulate it
        without keeping every batch's graph alive.
    """
    # Call the module itself rather than .forward() so registered hooks run.
    sigma = net(x)
    loss = criterion(sigma,y)
    loss.backward()
    opt.step()
    # Setting gradients to None instead of zero saves memory.
    opt.zero_grad(set_to_none=True)
    yhat = torch.max(sigma,1)[1]
    correct = torch.sum(yhat == y)
    return correct,loss.detach()
def TestOnce(net,criterion,x,y):
    """Evaluate one batch without tracking gradients.

    Args:
        net: Model to evaluate; called as ``net(x)``.
        criterion: Loss function called as ``criterion(outputs, y)``.
        x: Input batch.
        y: Target labels, compared element-wise with the arg-max over
            dimension 1 of the model output.

    Returns:
        tuple: ``(correct, loss)``. ``correct`` is a tensor holding the number
        of correctly classified samples and ``loss`` is the batch loss.
    """
    # Evaluation must not build a computation graph: this saves a lot of
    # memory and speeds things up.
    with torch.no_grad():
        # Call the module itself rather than .forward() so registered hooks run.
        sigma = net(x)
        loss = criterion(sigma,y)
        yhat = torch.max(sigma,1)[1]
        correct = torch.sum(yhat == y)
    return correct,loss

In [None]:
class EarlyStopping():
    """Signal when the monitored loss has stopped improving.

    Call the instance once per epoch with the current loss. A call counts as an
    improvement only when the loss is lower than the best value seen so far by
    MORE than ``tol``; this stores the new best and resets the counter. Every
    other call (including an improvement of exactly ``tol`` and a NaN loss)
    increments the counter. Once the counter reaches ``patience`` the instance
    returns ``True`` on this and every later call.

    Args:
        patience: Number of non-improving calls tolerated before stopping.
        tol: Minimum decrease of the loss that counts as an improvement.
    """
    def __init__(self, patience=5, tol=0.0005):
        self.patience = patience
        self.tol = tol
        self.counter = 0          # consecutive calls without enough improvement
        self.lowest_loss = None   # best loss seen so far; None before the first call
        self.early_stop = False   # latched to True once patience is exhausted

    def __call__(self, val_loss):
        """Record ``val_loss`` and return whether training should stop."""
        if self.lowest_loss is None:
            # First call: nothing to compare against yet.
            self.lowest_loss = val_loss
        elif self.lowest_loss - val_loss > self.tol:
            self.lowest_loss = val_loss
            self.counter = 0
        else:
            # Plain else (not "< tol") so that an improvement of exactly tol,
            # or a NaN loss, is counted instead of being silently ignored.
            self.counter += 1
            print("\t NOTICE: Early stopping counter {} of {}".format(self.counter,self.patience))
        if self.counter >= self.patience:
            print('\t NOTICE: Early Stopping Actived')
            self.early_stop = True
        return self.early_stop

In [None]:
def fit_test(net,batchdata,testdata,criterion,opt,epochs,tol,modelname,PATH):
    """Train ``net`` for up to ``epochs`` epochs, evaluating after every epoch.

    After each epoch the function prints the losses and accuracies, saves the
    weights whenever the test accuracy reaches a new high, and consults an
    ``EarlyStopping`` monitor on the test loss to decide whether to stop.

    Args:
        net: Model to train. Batches are moved to the device of its first
            parameter before use.
        batchdata: DataLoader over the training set.
        testdata: DataLoader over the test set.
        criterion: Loss function.
        opt: Optimizer bound to ``net``'s parameters.
        epochs: Maximum number of epochs.
        tol: Tolerance handed to ``EarlyStopping``.
        modelname: File stem of the saved weights (``modelname + ".pt"``).
        PATH: Directory for the saved weights; created if it does not exist.

    Returns:
        tuple: ``(trainlosslist, testlosslist)``, one entry per completed
        epoch. Each entry is the sum of batch losses times 100 divided by the
        number of samples in the corresponding dataset.
    """
    SamplePerEpoch = len(batchdata.dataset)
    TestSample = len(testdata.dataset)
    allsamples = SamplePerEpoch*epochs
    trainedsamples = 0
    trainlosslist = []
    testlosslist = []
    early_stopping = EarlyStopping(tol=tol)
    highestacc = None

    # The model may live on the GPU while the loaders yield CPU tensors, so
    # every batch is moved to the device of the model's parameters.
    first_param = next(net.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    # torch.save does not create missing directories.
    if PATH:
        os.makedirs(PATH, exist_ok=True)

    for epoch in range(1,epochs+1):
        net.train()
        correct_train = 0
        loss_train = 0
        for batch_idx, (x, y) in enumerate(batchdata):
            if model_device is not None:
                x = x.to(model_device, non_blocking=True)
                y = y.to(model_device, non_blocking=True)
            y = y.view(x.shape[0])
            correct,loss = IterOnce(net,criterion,opt,x,y)
            # Running totals: samples seen, correct predictions, loss.
            trainedsamples += x.shape[0]
            correct_train += correct
            # detach() so the running sum does not keep autograd history.
            loss_train += loss.detach()
            if (batch_idx+1) % 125 == 0:
                print('Epoch{}:[{}/{}({:.0f}%)]'.format(epoch,trainedsamples,allsamples,100*trainedsamples/allsamples))
        TrainAccThisEpoch = float(correct_train*100)/SamplePerEpoch # *100 to express as a percentage
        TrainLossThisEpoch = float(loss_train*100)/SamplePerEpoch # *100 because the raw value is inconveniently small
        trainlosslist.append(TrainLossThisEpoch)

        net.eval()
        correct_test = 0
        loss_test = 0
        for x,y in testdata:
            if model_device is not None:
                x = x.to(model_device, non_blocking=True)
                y = y.to(model_device, non_blocking=True)
            y = y.view(x.shape[0])
            correct,loss = TestOnce(net,criterion,x,y)
            # Running totals of correct predictions and loss.
            correct_test += correct
            loss_test += loss
        TestAccThisEpoch = float(correct_test*100)/TestSample
        TestLossThisEpoch = float(loss_test*100)/TestSample
        testlosslist.append(TestLossThisEpoch)
        print("\t Train Loss:{:.6f}, Test Loss:{:.6f}, Train Acc:{:.3f}%, TestAcc:{:.3f}%".format(TrainLossThisEpoch, TestLossThisEpoch, TrainAccThisEpoch,TestAccThisEpoch))

        # Keep the weights of the epoch with the best test accuracy so far.
        if (highestacc is None) or (highestacc < TestAccThisEpoch):
            highestacc = TestAccThisEpoch
            torch.save(net.state_dict(),os.path.join(PATH,modelname+".pt"))
            print("\t Weights Saved")

        if early_stopping(TestLossThisEpoch):
            break

    # Reached after the last epoch or after an early stop. This must stay
    # outside the epoch loop, otherwise training ends after the first epoch.
    print("【DONE】")
    return trainlosslist, testlosslist

In [None]:
def full_procedure(net,epochs,bs,modelname,PATH,lr=0.001,alpha=0.99,gamma=0,tol=10**(-5),wd=0):
    """Build the loaders, loss and optimizer, then train ``net`` with ``fit_test``.

    The global ``train`` and ``test`` datasets are wrapped in DataLoaders, and
    the model is optimised with RMSprop under a cross-entropy loss.

    Args:
        net: Model to train.
        epochs: Maximum number of epochs.
        bs: Batch size for both loaders.
        modelname: File stem handed to ``fit_test`` for saving weights.
        PATH: Directory handed to ``fit_test`` for saving weights.
        lr: RMSprop learning rate.
        alpha: RMSprop ``alpha``.
        gamma: RMSprop ``momentum``.
        tol: Early-stopping tolerance handed to ``fit_test``.
        wd: RMSprop ``weight_decay``.

    Returns:
        tuple: ``(trainloss, testloss)`` as returned by ``fit_test``.
    """
    torch.manual_seed(1412)

    # Settings shared by both loaders.
    # NOTE(review): the original comment advised against setting num_workers
    # when training on a GPU, yet the code uses four workers. Confirm intent.
    loader_options = dict(batch_size=bs, drop_last=False, num_workers=4, pin_memory=True)
    batchdata = DataLoader(train, shuffle=True, **loader_options)
    # Author's note: the test set is not shuffled, which speeds things up.
    testdata = DataLoader(test, shuffle=False, **loader_options)

    # Loss function and optimisation algorithm.
    criterion = nn.CrossEntropyLoss()
    opt = optim.RMSprop(net.parameters(), lr=lr, alpha=alpha,
                        momentum=gamma, weight_decay=wd)

    # Training.
    history = fit_test(net, batchdata, testdata, criterion, opt,
                       epochs, tol, modelname, PATH)
    trainloss, testloss = history
    return trainloss, testloss

In [None]:
def plotloss(trainloss, testloss):
    """Plot the training and test loss curves on one figure and show it.

    Args:
        trainloss: Sequence of training losses (drawn in red).
        testloss: Sequence of test losses (drawn in orange).
    """
    plt.figure(figsize=(10, 7))
    curves = (
        (trainloss, "red", "Trainloss"),
        (testloss, "orange", "Testloss"),
    )
    for values, colour, caption in curves:
        plt.plot(values, color=colour, label=caption)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [None]:
# Directory passed to full_procedure as PATH, where model weights are saved.
PATH = r"D:\study\ML\kaggle\SVHN"

In [None]:
# Time five identical 3-epoch training runs of MyResNet.
# Each run re-seeds and rebuilds resnet18_ first, because MyResNet reuses that
# global's layers directly. avgtime collects the wall-clock seconds per run.
avgtime = []
for i in range(5):
    torch.manual_seed(1412)
    resnet18_ = m.resnet18()
    net = MyResNet().to(device,non_blocking=True)
    start = time() # start of the training-time measurement
    trainloss, testloss = full_procedure(net,epochs=3, bs=256
                            ,modelname="model_seletion_resnet"
                            ,PATH = PATH)
    avgtime.append(time()-start)

In [None]:
# Time five identical 3-epoch training runs of MyVgg, using the default
# hyper-parameters of full_procedure.
# Each run re-seeds and rebuilds vgg16_ first, because MyVgg reuses that
# global's layers directly. The plain vgg16 is built here (not vgg16_bn):
# MyVgg slices features[0:9], which matches the layer layout of vgg16; with
# vgg16_bn the same slice would pick a different, truncated set of layers.
# NOTE: this overwrites the avgtime list filled by the ResNet timing cell.
avgtime = []
for i in range(5):
    torch.manual_seed(1412)
    vgg16_ = m.vgg16()
    net = MyVgg().to(device,non_blocking=True)
    start = time() # start of the training-time measurement
    trainloss, testloss = full_procedure(net,epochs=3, bs=256
                            ,modelname="model_seletion_vgg"
                            ,PATH = PATH)
    avgtime.append(time()-start)

In [None]:
# Train the chosen MyResNet architecture for up to 10 epochs, saving weights
# under the PATH set here, then print the elapsed seconds and plot the loss
# curves.
PATH = r"D:\study\ML\kaggle\SVHN\models\ConfirmedResNet"
modelname = "myResNet_test0"
print(modelname)
torch.manual_seed(1412)
resnet18_ = m.resnet18() # rebuilt because MyResNet reuses this global's layers directly
net = MyResNet().to(device,non_blocking=True)
start = time() # start of the training-time measurement
trainloss, testloss = full_procedure(net,epochs=10, bs=256
        ,modelname=modelname
        ,PATH = PATH)
print(time()-start)
plotloss(trainloss,testloss)

In [None]:
# Inspect layer4 of resnet18_ (not used by MyResNet above).
resnet18_.layer4