# ZoomNet 旁路池化

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.utils.data import sampler
from PIL import Image
import torchvision.datasets as dset
import torchvision.transforms as T
import numpy as np
import torch.nn.functional as F
from tensorboardX import SummaryWriter

# dataset

In [2]:
# Identity: keep the image as it is and only convert it to a tensor.
zoom_none = T.Compose([
    T.ToTensor(),
    ])

# Zoom in: resize to 45 px, then keep the central 28x28 crop.
zoom_in = T.Compose([
    T.Resize(45),
    T.CenterCrop(28),
    T.ToTensor(),
    ])

# Zoom out: resize to 10 px and pad 9 px on every side (9 + 10 + 9 = 28),
# which leaves the shrunken digit in the centre of the canvas.
zoom_out = T.Compose([
    T.Resize(10),
    T.Pad(9),
    T.ToTensor(),
    ])

# Zoom out with the digit shifted towards the bottom-right corner.
zoom_out_RD = T.Compose([
    T.Resize(10),
    T.Pad((13,13,5,5)),# padding order: left, top, right, bottom
    T.ToTensor(),
    ])

# Zoom out with the digit shifted towards the top-left corner.
zoom_out_LU = T.Compose([
    T.Resize(10),
    T.Pad((5,5,13,13)),# padding order: left, top, right, bottom
    T.ToTensor(),
    ])

# Zoom out with the digit shifted towards the bottom-left corner.
zoom_out_LD = T.Compose([
    T.Resize(10),
    T.Pad((5,13,13,5)),# padding order: left, top, right, bottom
    T.ToTensor(),
    ])

# Zoom out with the digit shifted towards the top-right corner.
zoom_out_RU = T.Compose([
    T.Resize(10),
    T.Pad((13,5,5,13)),# padding order: left, top, right, bottom
    T.ToTensor(),
    ])



def get_mnist_dataloader(transforms, batch_size, shuffle=False, if_print=False,
                         root='../MNIST', download=False):
    """Build MNIST train and test DataLoaders that share one transform pipeline.

    Args:
        transforms: transform applied to every image (e.g. a ``T.Compose``).
        batch_size: number of samples per batch, used for both loaders.
        shuffle: forwarded to both the train and the test loader.
        if_print: when True, print the sizes of the image and label tensors
            of both splits.
        root: directory holding the MNIST files. Defaults to the path the
            notebook originally hard-coded.
        download: forwarded to ``dset.MNIST``. The default ``False`` keeps the
            original behaviour of requiring the files to be present already.

    Returns:
        A tuple ``(loader_train, loader_test)``.
    """
    set_train = dset.MNIST(root, train=True, transform=transforms, download=download)
    loader_train = DataLoader(set_train, batch_size=batch_size, shuffle=shuffle)
    set_test = dset.MNIST(root, train=False, transform=transforms, download=download)
    loader_test = DataLoader(set_test, batch_size=batch_size, shuffle=shuffle)
    if if_print:
        # `data` / `targets` replace the deprecated train_data / train_labels /
        # test_data / test_labels aliases, which emit warnings on access.
        print("训练集大小：",set_train.data.size())
        print("训练集标签：",set_train.targets.size())
        print("测试集大小：",set_test.data.size())
        print("测试集标签：",set_test.targets.size())
    return loader_train, loader_test

# show imgs on tensorboard

In [3]:
# Build one (train, test) loader pair per zoom / placement variant, all with
# batch size 64. Only the first call prints the dataset sizes.
train_none, test_none = get_mnist_dataloader(zoom_none, 64, if_print=True)
train_zoom_in, test_zoom_in = get_mnist_dataloader(zoom_in, 64)
train_zoom_out, test_zoom_out = get_mnist_dataloader(zoom_out, 64)
train_RD, test_RD = get_mnist_dataloader(zoom_out_RD, 64)
train_LU, test_LU = get_mnist_dataloader(zoom_out_LU, 64)
train_LD, test_LD = get_mnist_dataloader(zoom_out_LD, 64)
train_RU, test_RU = get_mnist_dataloader(zoom_out_RU, 64)


# Fetch the first batch of the top-right test set; element 0 of the batch is
# the image tensor, whose shape is printed as a sanity check.
test_imgs = next(iter(test_RU))
print(test_imgs[0].shape)
# Write that image batch to TensorBoard under the tag 'Image' at step 1.
# NOTE(review): test_imgs[0] is a 4-D batch; newer tensorboardX releases may
# expect a single CHW image in add_image - confirm against the installed version.
writer = SummaryWriter()
writer.add_image('Image', test_imgs[0], 1)
writer.close()

训练集大小： torch.Size([60000, 28, 28])
训练集标签： torch.Size([60000])
测试集大小： torch.Size([10000, 28, 28])
测试集标签： torch.Size([10000])
torch.Size([64, 1, 28, 28])


# train

In [4]:
def reset(m):
    """Re-initialise module ``m`` when it exposes ``reset_parameters``.

    Modules without that attribute are left untouched, which makes the
    function suitable as a callback for ``model.apply``.
    """
    missing = object()
    reinit = getattr(m, 'reset_parameters', missing)
    if reinit is missing:
        return
    reinit()
    
def train(model, train_data, test_data, num_epochs = 1, print_every = 200):
    """Train ``model`` from scratch with Adam and cross-entropy, then evaluate it.

    The model's parameters are re-initialised first (via ``reset``), so every
    call starts from fresh weights. At the start of each epoch the accuracy on
    ``test_data`` is printed. After the last epoch the model is evaluated on
    the notebook-level test loaders ``test_none``, ``test_zoom_in``,
    ``test_zoom_out``, ``test_RD``, ``test_LU``, ``test_LD`` and ``test_RU``,
    which must already exist as globals.

    Args:
        model: network to train; batches are moved to the device its
            parameters live on.
        train_data: iterable of ``(images, labels)`` training batches.
        test_data: iterable of ``(images, labels)`` batches used for the
            per-epoch accuracy check.
        num_epochs: number of passes over ``train_data``.
        print_every: print the current loss every this many iterations.
    """
    model.apply(reset)
    # Follow the model's device instead of hard-coding CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    for epoch in range(num_epochs):
        print('Starting epoch %d / %d' % (epoch + 1, num_epochs))
        check_accuracy(model, test_data)
        # check_accuracy switches to eval mode; switch back before training.
        model.train()
        for t, (x, y) in enumerate(train_data):
            x = x.to(device)
            y = y.to(device).long()
            scores = model(x)
            loss = loss_fn(scores, y)
            if (t + 1) % print_every == 0:
                # .item() replaces loss.data[0], which fails on 0-dim tensors.
                print('t = %d, loss = %.4f' % (t + 1, loss.item()))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    # Final report on every zoom / placement variant of the test set.
    print("\n test_none_accuracy")
    check_accuracy(model, test_none)
    print("test_zoom_in_accuracy")
    check_accuracy(model, test_zoom_in)
    print("test_zoom_out")
    check_accuracy(model, test_zoom_out)
    print("test_RD")
    check_accuracy(model, test_RD)
    print("test_LU")
    check_accuracy(model, test_LU)
    print("test_LD")
    check_accuracy(model, test_LD)
    print("test_RU")
    check_accuracy(model, test_RU)
            
def check_accuracy(model, test_data):
    """Print the classification accuracy of ``model`` on ``test_data``.

    The model is put into eval mode and left there; callers that continue
    training must call ``model.train()`` again.

    Args:
        model: network returning per-class scores of shape (batch, classes).
        test_data: iterable of ``(images, labels)`` batches.

    Prints a line of the form ``Got <correct> / <total> correct (<percent>)``.
    An empty ``test_data`` reports 0 / 0 with 0.00 instead of dividing by zero.
    """
    # Follow the model's device instead of hard-coding CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    num_correct = 0
    num_samples = 0
    model.eval()
    # no_grad replaces volatile=True, which no longer disables graph building.
    with torch.no_grad():
        for x, y in test_data:
            scores = model(x.to(device))
            _, preds = scores.cpu().max(1)
            # .item() keeps the running counter a plain Python int.
            num_correct += (preds == y.cpu()).sum().item()
            num_samples += preds.size(0)
    acc = float(num_correct) / num_samples if num_samples else 0.0
    print('Got %d / %d correct (%.2f)' % (num_correct, num_samples, 100 * acc))

### PoolBlock

In [5]:
#PoolBlock
class PoolBlock(nn.Module):
    """Convolution block with a global-max-pooling bypass.

    The input goes through a 3x3 conv -> BatchNorm -> ReLU -> Dropout stack.
    The per-channel global maximum of that feature map is broadcast back to
    the full spatial size, concatenated with the feature map along the channel
    axis, and a 1x1 convolution mixes the two back down to ``out_ch`` channels.
    The spatial size of the output equals that of the input.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
    """

    def __init__(self, in_ch, out_ch):
        super(PoolBlock, self).__init__()
        self.conv_layer = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, stride=1, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(),
            nn.Dropout(0.2)
        )
        # Global max pool to 1x1. Same result as MaxPool2d(28, 28) on 28x28
        # inputs, but it also works for any other spatial size.
        self.pool = nn.AdaptiveMaxPool2d(1)
        self.conv2 = nn.Conv2d(out_ch*2,out_ch,1)

    def forward(self, x):
        """Return the fused local + global features, shape (N, out_ch, H, W)."""
        x = self.conv_layer(x)
        # (N, C, 1, 1) global maximum, broadcast to every spatial position.
        x2 = self.pool(x)
        x2 = x2.expand_as(x)
        x = torch.cat((x,x2),dim=1)
        x = self.conv2(x)
        return x

# Smoke test: push one random 1-channel 28x28 image through a PoolBlock with
# 5 output channels and display the output shape.
x = torch.randn(1,1,28,28)
x = Variable(x)
pool_net = PoolBlock(1,5)
y = pool_net(x)
y.shape

torch.Size([1, 5, 28, 28])

# new_net

In [6]:
class Net(nn.Module):
    """Five stacked PoolBlocks followed by a single linear classifier.

    Channel widths go 1 -> 64 -> 64 -> 64 -> 64 -> 32. The classifier expects
    the flattened 32 x 28 x 28 feature map and produces 10 class scores.
    """

    def __init__(self):
        super(Net, self).__init__()
        widths = (1, 64, 64, 64, 64, 32)
        stages = [
            PoolBlock(c_in, c_out)
            for c_in, c_out in zip(widths[:-1], widths[1:])
        ]
        self.feature = nn.Sequential(*stages)
        self.classifier = nn.Sequential(
            nn.Linear(32*28*28, 10),
        )

    def forward(self, x):
        """Return class scores of shape (batch, 10)."""
        feature_maps = self.feature(x)
        # Flatten everything except the batch dimension.
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.classifier(flattened)

# Instantiate the bypass-pooling network on the GPU.
net = Net().cuda()

# Smoke test with a random batch of five 1x28x28 images. The batch is named
# `dummy_batch` rather than `input` so the builtin input() is not shadowed.
dummy_batch = Variable(torch.randn(5, 1, 28, 28)).cuda()
out = net(dummy_batch)
print(out.shape)

torch.Size([5, 10])


# run

In [60]:
# Train the bypass-pooling network `net` (not naive_net) for 10 epochs on the
# centred zoom-out training set, checking accuracy on the matching test set.
train(net, 
      train_zoom_out,
      test_zoom_out, 
      num_epochs=10)

Starting epoch 1 / 10
Got 980 / 10000 correct (9.80)
t = 200, loss = 1.0489
t = 400, loss = 0.3484
t = 600, loss = 0.4907
t = 800, loss = 0.3932
Starting epoch 2 / 10
Got 8405 / 10000 correct (84.05)
t = 200, loss = 0.2754
t = 400, loss = 0.2686
t = 600, loss = 0.2656
t = 800, loss = 0.0665
Starting epoch 3 / 10
Got 8731 / 10000 correct (87.31)
t = 200, loss = 0.3065
t = 400, loss = 0.3054
t = 600, loss = 0.2174
t = 800, loss = 0.1535
Starting epoch 4 / 10
Got 9334 / 10000 correct (93.34)
t = 200, loss = 0.1528
t = 400, loss = 0.1769
t = 600, loss = 0.2032
t = 800, loss = 0.0365
Starting epoch 5 / 10
Got 9338 / 10000 correct (93.38)
t = 200, loss = 0.2943
t = 400, loss = 0.1492
t = 600, loss = 0.3521
t = 800, loss = 0.1463
Starting epoch 6 / 10
Got 9623 / 10000 correct (96.23)
t = 200, loss = 0.1067
t = 400, loss = 0.1276
t = 600, loss = 0.0880
t = 800, loss = 0.0181
Starting epoch 7 / 10
Got 9622 / 10000 correct (96.22)
t = 200, loss = 0.1217
t = 400, loss = 0.1528
t = 600, loss = 0.1

In [8]:
# Generalisation with respect to translation invariance works very well.
# Next step: test rotation invariance.
# Dilated convolutions could also be tried.

In [61]:
# Save the current weights of `net` to a file in the working directory.
torch.save(net.state_dict(),"旁路池化zoom_out")

In [62]:
# Load the weights stored in the checkpoint file back into `net`.
net.load_state_dict(torch.load("旁路池化zoom_out"))

In [64]:
# Centred zoom-out followed by a random rotation (RandomRotation(30)).
# The rotation is random, so repeated evaluations can give different numbers.
zoom_out_RO = T.Compose([
    T.Resize(10),
    T.Pad((9,9,9,9)),# padding order: left, top, right, bottom
    T.RandomRotation(30),
    T.ToTensor(),
    ])

train_RO, test_RO = get_mnist_dataloader(zoom_out_RO, 64)


# Log the first rotated test batch to TensorBoard, then evaluate `net` on the
# rotated test set.
test_imgs = next(iter(test_RO))
print(test_imgs[0].shape)
writer = SummaryWriter()
writer.add_image('Image', test_imgs[0], 1)
writer.close()
check_accuracy(net, test_RO)

torch.Size([64, 1, 28, 28])
Got 7667 / 10000 correct (76.67)


In [38]:
class NaiveNet(nn.Module):
    """Baseline CNN: three (3x3 conv -> ReLU -> 2x2 max-pool) stages + linear head.

    Every stage has 32 output channels. For 28x28 inputs the feature map is
    reduced to 1x1, so the classifier receives 32 features and returns 10
    class scores.
    """

    def __init__(self):
        super(NaiveNet, self).__init__()
        stages = []
        in_channels = 1
        for _ in range(3):
            stages.append(nn.Conv2d(in_channels, 32, 3, stride=1))
            stages.append(nn.ReLU())
            stages.append(nn.MaxPool2d(2, 2))
            in_channels = 32
        self.feature = nn.Sequential(*stages)
        self.classifier = nn.Sequential(
            nn.Linear(32, 10),
        )

    def forward(self, x):
        """Return class scores of shape (batch, 10)."""
        feature_maps = self.feature(x)
        # Flatten everything except the batch dimension.
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.classifier(flattened)

# Instantiate the baseline network on the GPU.
naive_net = NaiveNet().cuda()

# Smoke test with a random batch of five 1x28x28 images. The batch is named
# `dummy_batch` rather than `input` so the builtin input() is not shadowed.
dummy_batch = Variable(torch.randn(5, 1, 28, 28)).cuda()
out = naive_net(dummy_batch)
print(out.shape)

torch.Size([5, 10])


In [48]:
# Train the baseline `naive_net` for 10 epochs on the centred zoom-out
# training set, checking accuracy on the matching test set.
train(naive_net, 
      train_zoom_out,
      test_zoom_out, 
      num_epochs=10)

Starting epoch 1 / 10
Got 989 / 10000 correct (9.89)
t = 200, loss = 0.9017
t = 400, loss = 0.4526
t = 600, loss = 0.3599
t = 800, loss = 0.4901
Starting epoch 2 / 10
Got 9204 / 10000 correct (92.04)
t = 200, loss = 0.2919
t = 400, loss = 0.2195
t = 600, loss = 0.2348
t = 800, loss = 0.3003
Starting epoch 3 / 10
Got 9518 / 10000 correct (95.18)
t = 200, loss = 0.2182
t = 400, loss = 0.1961
t = 600, loss = 0.1983
t = 800, loss = 0.2018
Starting epoch 4 / 10
Got 9579 / 10000 correct (95.79)
t = 200, loss = 0.1647
t = 400, loss = 0.1741
t = 600, loss = 0.1892
t = 800, loss = 0.1514
Starting epoch 5 / 10
Got 9612 / 10000 correct (96.12)
t = 200, loss = 0.1430
t = 400, loss = 0.1466
t = 600, loss = 0.1891
t = 800, loss = 0.1276
Starting epoch 6 / 10
Got 9639 / 10000 correct (96.39)
t = 200, loss = 0.1240
t = 400, loss = 0.1417
t = 600, loss = 0.1896
t = 800, loss = 0.1185
Starting epoch 7 / 10
Got 9659 / 10000 correct (96.59)
t = 200, loss = 0.1118
t = 400, loss = 0.1402
t = 600, loss = 0.1

In [41]:
# Evaluate the baseline on the zoom-out test set. The original call used
# `test_out`, a name that is never defined in this notebook and so raises
# NameError on a fresh kernel; `test_zoom_out` is assumed to be the intended set.
check_accuracy(naive_net, test_zoom_out)

Got 9767 / 10000 correct (97.67)


In [57]:
# Centred zoom-out followed by a random rotation (RandomRotation(30)).
# This re-defines the name zoom_out_RO that an earlier cell also assigns.
zoom_out_RO = T.Compose([
    T.Resize(10),
    T.Pad(9),# a single int pads all four sides by 9 px
    T.RandomRotation(30),
    T.ToTensor(),
    ])

In [59]:
# Rebuild the rotated loaders and evaluate the baseline on the rotated test set.
train_RO, test_RO = get_mnist_dataloader(zoom_out_RO, 64)
check_accuracy(naive_net, test_RO)
# Recorded results: rotation of 90 degrees -> accuracy 47.11; 30 degrees -> accuracy 85.75%.
# With respect to rotation invariance, bypass pooling performs worse.

Got 8574 / 10000 correct (85.74)


In [105]:
## Misalignment test

def CuoWei(x):
    """Paste the two halves of ``x`` at misaligned spots on a blank 28x28 canvas.

    ``x`` is split along dim 1 into two chunks of 5 rows. The first chunk is
    written to rows 2-6, columns 2-11 and the second to rows 17-21,
    columns 15-24 of a zero tensor of shape (1, 28, 28), which is returned.
    """
    upper, lower = torch.split(x, 5, dim=1)
    canvas = torch.zeros(1, 28, 28)
    placements = ((upper, 2, 2), (lower, 17, 15))
    for piece, row, col in placements:
        canvas[:, row:row + 5, col:col + 10] = piece
    return canvas

# Misalignment pipeline: shrink to 10 px, convert to a tensor, then let CuoWei
# scatter the two halves of the digit over a 28x28 canvas.
CW = T.Compose([
    T.Resize(10),
    T.ToTensor(),
    T.Lambda(lambda x: CuoWei(x))
    ])

train_CW, test_CW = get_mnist_dataloader(CW, 64)

# Take the image tensor of the first training batch, print its shape and log
# it to TensorBoard under the tag 'Image' at step 1.
test_imgs = next(iter(train_CW))[0]
#test_imgs = CuoWei(test_imgs)
print(test_imgs.shape)
#test_imgs = CuoWei(test_imgs)
writer = SummaryWriter()
writer.add_image('Image', test_imgs, 1)
writer.close()

torch.Size([64, 1, 28, 28])


In [106]:
# Accuracy of the baseline network on the misaligned test set.
check_accuracy(naive_net, test_CW)

Got 1285 / 10000 correct (12.85)


In [110]:
# Accuracy of the bypass-pooling network on the misaligned test set.
check_accuracy(net, test_CW)

Got 2947 / 10000 correct (29.47)
