Fashion-MNIST

In [None]:
import torchvision
from torchvision import transforms

# Preprocessing pipeline: Resize(224) first, then conversion to a tensor.
trans = transforms.Compose([transforms.Resize(224), transforms.ToTensor()])

# download=True fetches the dataset into ./data when it is not there yet and is a
# no-op otherwise, so the notebook also runs on a machine without a local copy.
mnist_train = torchvision.datasets.FashionMNIST(
    root="./data", train=True, transform=trans, download=True)
mnist_test = torchvision.datasets.FashionMNIST(
    root="./data", train=False, transform=trans, download=True)

In [None]:
from torch.utils import data

def make_iters(train_data, test_data, batch_size, num_workers=8):
    """Wrap a training and a test dataset in ``DataLoader`` objects.

    Args:
        train_data: dataset served by the training loader (shuffled).
        test_data: dataset served by the test loader (original order).
        batch_size: number of samples per batch for both loaders.
        num_workers: number of loader worker processes for both loaders.

    Returns:
        A ``(train_loader, test_loader)`` tuple.
    """
    shared_options = {"batch_size": batch_size, "num_workers": num_workers}
    train_loader = data.DataLoader(train_data, shuffle=True, **shared_options)
    test_loader = data.DataLoader(test_data, shuffle=False, **shared_options)
    return train_loader, test_loader

In [None]:
import torch

# Pick the compute device at run time instead of hard-coding 'mps', so the
# notebook also runs on machines without the Apple MPS backend.
# The getattr guard covers torch builds that have no `torch.backends.mps` module.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    dev = 'mps'
elif torch.cuda.is_available():
    dev = 'cuda'
else:
    dev = 'cpu'
# dev = 'cpu'  # uncomment to force CPU execution

In [None]:
import torch.nn as nn
import torch.nn.functional as F

In [None]:
from sklearn.metrics import accuracy_score

def accuracy(y_pred,y_true):
    """Classification accuracy of a batch of class scores.

    Args:
        y_pred: tensor of per-class scores; the predicted class is the arg-max
            over dimension 1.
        y_true: ground-truth labels (a tensor, or anything ``accuracy_score``
            accepts).

    Returns:
        The value returned by ``sklearn.metrics.accuracy_score``.
    """
    # scikit-learn works on host memory, so bring the tensors back from the
    # accelerator first; passing MPS/CUDA tensors directly raises an error.
    # Softmax is monotonic, so the arg-max of the raw scores is the same class.
    y_pred_cls = torch.argmax(y_pred.detach().to('cpu'), dim=1)
    if torch.is_tensor(y_true):
        y_true = y_true.detach().to('cpu')
    return accuracy_score(y_true, y_pred_cls)

VGG16-inspired CNN with a residual block (class `LeNet`)

In [14]:
class res_Block(nn.Module):
    """Two-convolution block with an optional identity shortcut.

    A 1x1 convolution first reduces the channel count to ``in_channels // 4``;
    a 3x3 convolution then maps it to ``out_channels``. Each convolution is
    followed by batch normalisation and ReLU. Both convolutions use stride 1
    with matching padding, so the spatial size is unchanged.
    """

    def __init__(self,in_channels,out_channels):
        super().__init__()
        squeezed = in_channels // 4
        reduce_stage = [
            nn.Conv2d(in_channels, squeezed, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(squeezed),
            nn.ReLU(),
        ]
        expand_stage = [
            nn.Conv2d(squeezed, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        self.block = nn.Sequential(*reduce_stage, *expand_stage)

    def forward(self,x):
        """Apply the block; add the input back only when the shapes match."""
        transformed = self.block(x)
        if transformed.shape != x.shape:
            # Channel count changed, so no shortcut can be added.
            return transformed
        return transformed + x

class LeNet(nn.Module):
    """Small CNN classifier: conv stem, one residual block, pooling, MLP head.

    The first fully connected layer expects 200704 flattened features, i.e.
    16 channels * 112 * 112, which matches a 1-channel 224x224 input after the
    single 2x max-pooling step. The final layer emits 10 class scores.
    """

    def __init__(self):
        super().__init__()
        feature_layers = [
            nn.Conv2d(1, 64, kernel_size=3, padding=1), nn.ReLU(),
            res_Block(64, 64),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 16, kernel_size=3, padding=1), nn.ReLU(),
        ]
        classifier_layers = [
            nn.Flatten(),
            nn.Linear(200704, 4096), nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(4096, 1024), nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 10),
        ]
        # A single flat Sequential keeps the sub-module indices (and therefore
        # the state_dict keys) in one contiguous range.
        self.layers = nn.Sequential(*feature_layers, *classifier_layers)

    def forward(self,x):
        """Return the 10 class scores for input batch ``x``."""
        scores = self.layers(x)
        return scores

# Build the network, then move its parameters to the selected device.
net = LeNet()
net = net.to(dev)

In [15]:
import torchkeras as torchkeras
from torchkeras.metrics import Accuracy
from torchkeras import summary

# Training hyper-parameters used by the cells below.
batch_size = 32
num_epochs = 10

# Wrap the network in torchkeras' Keras-style model object.
model = torchkeras.KerasModel(
    net,
    loss_fn=nn.CrossEntropyLoss(),
    optimizer=torch.optim.SGD(net.parameters(), lr=0.0001),
    metrics_dict={"acc": Accuracy()},
)

train_iter, test_iter = make_iters(mnist_train, mnist_test, batch_size)

# Pull just the first training batch; it serves as sample input for the summary.
for features, labels in train_iter:
    break

# The trailing semicolon suppresses the cell's return-value echo.
summary(model, input_data=features);

[0;31m<<<<<< 🐌 cpu is used >>>>>>[0m
--------------------------------------------------------------------------
Layer (type)                            Output Shape              Param #
Conv2d-1                          [-1, 64, 224, 224]                  640
ReLU-2                            [-1, 64, 224, 224]                    0
Conv2d-3                          [-1, 16, 224, 224]                1,040
BatchNorm2d-4                     [-1, 16, 224, 224]                   32
ReLU-5                            [-1, 16, 224, 224]                    0
Conv2d-6                          [-1, 64, 224, 224]                9,280
BatchNorm2d-7                     [-1, 64, 224, 224]                  128
ReLU-8                            [-1, 64, 224, 224]                    0
MaxPool2d-9                       [-1, 64, 112, 112]                    0
Conv2d-10                         [-1, 16, 112, 112]                9,232
ReLU-11                           [-1, 16, 112, 112]                    

In [16]:
import datetime
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

def accuracy(y_pred,y_true):
    """Accuracy of arg-max predictions, computed on the CPU with scikit-learn.

    Args:
        y_pred: tensor of per-class scores; softmax and arg-max are taken over
            dimension 1.
        y_true: tensor of ground-truth labels.

    Returns:
        The value returned by ``sklearn.metrics.accuracy_score``.
    """
    scores_cpu = y_pred.to('cpu')
    class_probs = torch.softmax(scores_cpu, dim=1)
    y_pred_cls = class_probs.argmax(dim=1).data
    labels_cpu = y_true.to('cpu')
    return accuracy_score(labels_cpu, y_pred_cls)

# Move the model to the target device BEFORE building the optimizer, so the
# optimizer is constructed over the parameters in their final location.
net = net.to(dev)

loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=net.parameters(),lr = 0.01)
metric_func = accuracy
metric_name = "accuracy"

# Smoke test on the batch fetched earlier. no_grad() avoids building an autograd
# graph for a forward pass whose result is only displayed.
with torch.no_grad():
    first_batch_accuracy = metric_func(net(features.to(dev)), labels.to(dev))

first_batch_accuracy

0.0625

In [17]:
# Print a running average every `log_step_freq` training batches.
log_step_freq = 10

# One row per epoch: epoch index, mean train loss/metric, mean validation loss/metric.
dfhistory = pd.DataFrame(columns = ["epoch","loss",metric_name,"val_loss","val_"+metric_name])
print("Start Training...")
nowtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("=========="*8 + "%s"%nowtime)

for epoch in range(1,num_epochs+1):

    # 1. Training loop -------------------------------------------------
    net.train()
    loss_sum = 0.0
    metric_sum = 0.0
    step = 1  # keeps the divisions below defined if the loader yields no batch

    for step, (features,labels) in enumerate(train_iter, 1):

        features = features.to(dev)
        labels = labels.to(dev)

        # Reset gradients accumulated by the previous step
        optimizer.zero_grad()

        # Forward pass: loss and batch metric
        predictions = net(features)
        loss = loss_func(predictions,labels)
        metric = metric_func(predictions,labels)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Batch-level logging (running means over the batches seen so far).
        # float() accepts both a plain Python float and a NumPy scalar, whereas
        # `.item()` only exists on the latter and breaks when the metric
        # function returns a built-in float.
        loss_sum += loss.item()
        metric_sum += float(metric)
        if step%log_step_freq == 0:
            print(("[step = %d] loss: %.3f, "+metric_name+": %.3f") %
                  (step, loss_sum/step, metric_sum/step))

    # 2. Validation loop -----------------------------------------------
    net.eval()
    val_loss_sum = 0.0
    val_metric_sum = 0.0
    val_step = 1  # same guard as `step` above

    for val_step, (features,labels) in enumerate(test_iter, 1):
        features = features.to(dev)
        labels = labels.to(dev)
        with torch.no_grad():
            predictions = net(features)
            val_loss = loss_func(predictions,labels)
            val_metric = metric_func(predictions,labels)

        val_loss_sum += val_loss.item()
        val_metric_sum += float(val_metric)

    # 3. Record the epoch summary --------------------------------------
    info = (epoch, loss_sum/step, metric_sum/step,
            val_loss_sum/val_step, val_metric_sum/val_step)
    dfhistory.loc[epoch-1] = info

    # Epoch-level logging
    print(("\nEPOCH = %d, loss = %.3f,"+ metric_name + \
          "  = %.3f, val_loss = %.3f, "+"val_"+ metric_name+" = %.3f")
          %info)
    nowtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("\n"+"=========="*8 + "%s"%nowtime)

print('Finished Training...')

Start Training...


KeyboardInterrupt: 

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

import matplotlib.pyplot as plt

def plot_metric(dfhistory, metric):
    """Plot the per-epoch training and validation curves of one metric.

    Args:
        dfhistory: table with a ``metric`` column and a ``'val_' + metric``
            column, one row per epoch.
        metric: name of the training column to plot.
    """
    val_key = 'val_' + metric
    train_series = dfhistory[metric]
    val_series = dfhistory[val_key]
    # Epochs are numbered from 1 on the x axis.
    epoch_axis = range(1, len(train_series) + 1)
    for series, line_format in ((train_series, 'bo--'), (val_series, 'ro-')):
        plt.plot(epoch_axis, series, line_format)
    plt.title('Training and validation '+ metric)
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend(["train_"+metric, val_key])
    plt.show()


# Inspect how the loss and the accuracy evolve over the epochs
for curve_name in ("loss", "accuracy"):
    plot_metric(dfhistory, curve_name)

GoogLeNet

In [None]:
# Preprocessing pipeline for this section: Resize(96) first, then conversion to a tensor.
trans = transforms.Compose([transforms.Resize(96), transforms.ToTensor()])

# download=True fetches the dataset into ./data when it is not there yet and is a
# no-op otherwise, so this cell also runs on a machine without a local copy.
mnist_train = torchvision.datasets.FashionMNIST(
    root="./data", train=True, transform=trans, download=True)
mnist_test = torchvision.datasets.FashionMNIST(
    root="./data", train=False, transform=trans, download=True)

In [None]:
class Inception(nn.Module):
    """Inception block: four parallel branches concatenated along the channel axis.

    Args:
        in_channels: number of channels of the input.
        c1: output channels of the single 1x1 convolution branch.
        c2: ``(reduce, out)`` channels of the 1x1 -> 3x3 convolution branch.
        c3: ``(reduce, out)`` channels of the 1x1 -> 5x5 convolution branch.
        c4: output channels of the 3x3 max-pool -> 1x1 convolution branch.
        **kwargs: forwarded to ``nn.Module.__init__``.

    The output has ``c1 + c2[1] + c3[1] + c4`` channels. Every branch uses
    stride 1 with matching padding, so the spatial size is preserved.
    """

    def __init__(self, in_channels, c1, c2, c3, c4, **kwargs):
        super().__init__(**kwargs)
        # Branch 1: a single 1x1 convolution
        self.p1_1 = nn.Conv2d(in_channels, c1, kernel_size=1)
        # Branch 2: 1x1 convolution followed by a 3x3 convolution
        self.p2_1 = nn.Conv2d(in_channels, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # Branch 3: 1x1 convolution followed by a 5x5 convolution
        self.p3_1 = nn.Conv2d(in_channels, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # Branch 4: 3x3 max pooling followed by a 1x1 convolution
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_channels, c4, kernel_size=1)

    def forward(self, x):
        """Run the four branches on ``x`` and stack their outputs on dim 1."""
        branch_1x1 = F.relu(self.p1_1(x))

        reduced_for_3x3 = F.relu(self.p2_1(x))
        branch_3x3 = F.relu(self.p2_2(reduced_for_3x3))

        reduced_for_5x5 = F.relu(self.p3_1(x))
        branch_5x5 = F.relu(self.p3_2(reduced_for_5x5))

        pooled = self.p4_1(x)
        branch_pool = F.relu(self.p4_2(pooled))

        # Concatenate the branch outputs along the channel dimension
        branches = [branch_1x1, branch_3x3, branch_5x5, branch_pool]
        return torch.cat(branches, dim=1)

In [None]:
# Stage 1: 7x7 stride-2 convolution, then 3x3 stride-2 max pooling.
b1 = nn.Sequential(
    nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)

# Stage 2: 1x1 convolution, 3x3 convolution, then 3x3 stride-2 max pooling.
b2 = nn.Sequential(
    nn.Conv2d(64, 64, kernel_size=1),
    nn.ReLU(),
    nn.Conv2d(64, 192, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)

# Stage 3: two Inception blocks (192 -> 256 -> 480 channels), then max pooling.
b3 = nn.Sequential(
    Inception(192, 64, (96, 128), (16, 32), 32),
    Inception(256, 128, (128, 192), (32, 96), 64),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)

# Stage 4 (reduced variant): two Inception blocks (480 -> 512 -> 512 channels),
# then max pooling.
b4 = nn.Sequential(
    Inception(480, 192, (96, 208), (16, 48), 64),
    Inception(512, 160, (112, 224), (24, 64), 64),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)

# Stage 5 (reduced variant): one Inception block (512 -> 704 channels), global
# average pooling to 1x1, then flatten.
b5 = nn.Sequential(
    Inception(512, 128, (160, 320), (32, 128), 128),
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Flatten(),
)

# Classifier head: 704 = 128 + 320 + 128 + 128 channels from the last Inception block.
net = nn.Sequential(b1, b2, b3, b4, b5, nn.Linear(704, 10))

In [None]:
import torchkeras as torchkeras
from torchkeras.metrics import Accuracy
from torchkeras import summary

# Training hyper-parameters used by the cells below.
batch_size = 64
num_epochs = 10

# Wrap the network in torchkeras' Keras-style model object.
model = torchkeras.KerasModel(
    net,
    loss_fn=nn.CrossEntropyLoss(),
    optimizer=torch.optim.SGD(net.parameters(), lr=0.0001),
    metrics_dict={"acc": Accuracy()},
)

train_iter, test_iter = make_iters(mnist_train, mnist_test, batch_size)

# Pull just the first training batch; it serves as sample input for the summary.
for features, labels in train_iter:
    break

# The trailing semicolon suppresses the cell's return-value echo.
summary(model, input_data=features);

In [None]:
import datetime
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

def accuracy(y_pred,y_true):
    """Accuracy of arg-max predictions, computed on the CPU with scikit-learn.

    Args:
        y_pred: tensor of per-class scores; softmax and arg-max are taken over
            dimension 1.
        y_true: tensor of ground-truth labels.

    Returns:
        The value returned by ``sklearn.metrics.accuracy_score``.
    """
    host_scores = y_pred.to('cpu')
    probabilities = torch.softmax(host_scores, dim=1)
    y_pred_cls = probabilities.argmax(dim=1).data
    host_labels = y_true.to('cpu')
    return accuracy_score(host_labels, y_pred_cls)

# Move the model to the target device BEFORE building the optimizer, so the
# optimizer is constructed over the parameters in their final location.
net = net.to(dev)

loss_func = nn.CrossEntropyLoss()
# NOTE(review): lr = 0.1 is far above Adam's documented default of 1e-3 — confirm
# this value is intended.
optimizer = torch.optim.Adam(params=net.parameters(),lr = 0.1)
metric_func = accuracy
metric_name = "accuracy"

# Smoke test on the batch fetched earlier. no_grad() avoids building an autograd
# graph for a forward pass whose result is only displayed.
with torch.no_grad():
    first_batch_accuracy = metric_func(net(features.to(dev)), labels.to(dev))

first_batch_accuracy

In [None]:
# Print a running average every `log_step_freq` training batches.
log_step_freq = 100

# One row per epoch: epoch index, mean train loss/metric, mean validation loss/metric.
dfhistory = pd.DataFrame(columns = ["epoch","loss",metric_name,"val_loss","val_"+metric_name])
print("Start Training...")
nowtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("=========="*8 + "%s"%nowtime)

for epoch in range(1,num_epochs+1):

    # 1. Training loop -------------------------------------------------
    net.train()
    loss_sum = 0.0
    metric_sum = 0.0
    step = 1  # keeps the divisions below defined if the loader yields no batch

    for step, (features,labels) in enumerate(train_iter, 1):

        features = features.to(dev)
        labels = labels.to(dev)

        # Reset gradients accumulated by the previous step
        optimizer.zero_grad()

        # Forward pass: loss and batch metric
        predictions = net(features)
        loss = loss_func(predictions,labels)
        metric = metric_func(predictions,labels)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Batch-level logging (running means over the batches seen so far).
        # float() accepts both a plain Python float and a NumPy scalar, whereas
        # `.item()` only exists on the latter and breaks when the metric
        # function returns a built-in float.
        loss_sum += loss.item()
        metric_sum += float(metric)
        if step%log_step_freq == 0:
            print(("[step = %d] loss: %.3f, "+metric_name+": %.3f") %
                  (step, loss_sum/step, metric_sum/step))

    # 2. Validation loop -----------------------------------------------
    net.eval()
    val_loss_sum = 0.0
    val_metric_sum = 0.0
    val_step = 1  # same guard as `step` above

    for val_step, (features,labels) in enumerate(test_iter, 1):
        features = features.to(dev)
        labels = labels.to(dev)
        with torch.no_grad():
            predictions = net(features)
            val_loss = loss_func(predictions,labels)
            val_metric = metric_func(predictions,labels)

        val_loss_sum += val_loss.item()
        val_metric_sum += float(val_metric)

    # 3. Record the epoch summary --------------------------------------
    info = (epoch, loss_sum/step, metric_sum/step,
            val_loss_sum/val_step, val_metric_sum/val_step)
    dfhistory.loc[epoch-1] = info

    # Epoch-level logging
    print(("\nEPOCH = %d, loss = %.3f,"+ metric_name + \
          "  = %.3f, val_loss = %.3f, "+"val_"+ metric_name+" = %.3f")
          %info)
    nowtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("\n"+"=========="*8 + "%s"%nowtime)

print('Finished Training...')