In [1]:
import torch
import torchvision
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
from torch.utils.data import sampler
from torch.autograd import Variable


In [2]:
def unpickle(file):
    import pickle
    with open(file) as fo:
        dict = pickle.load(fo, encoding-'bytes')
    return dict

In [3]:
# Define a convolutional block
# A single layer of convolution if shortcut == False, and...
# Two layers of convolution and a residual convolution otherwise.

class ConvBlock(nn.Module):
    
    # midChannels will be of no use if shortcut == False
    def __init__(self, inChannels, midChannels, outChannels, 
                 kernelSize, stride = 1, padding = 0, bias = True, shortcut = False):
        super(ConvBlock, self).__init__()
        if shortcut is False:
            self.left = nn.Sequential(
                nn.Conv2d(inChannels, outChannels, 
                          kernelSize, stride, padding, bias),
                nn.BatchNorm2d(outChannels)
            )
            self.right = None
        else:
            self.left = nn.Sequential(
                nn.Conv2d(inChannels, midChannels, 
                          kernelSize, stride, padding, bias),
                nn.BatchNorm2d(midChannels),
                nn.ReLU(inplace = True),
                nn.Conv2d(midChannels, outChannels, 
                          kernelSize, stride, padding, bias),
                nn.BatchNorm2d(outChannels)
            )
            self.right = nn.Sequential(
                nn.Conv2d(inChannels, outChannels, 
                          kernelSize, stride, padding, bias),
                nn.BatchNorm2d(outChannels)
            )
        
    def forward(self, input):
        out = self.left(input)
        if self.right is not None:
            out += self.right(input)
        return F.relu(out)
    

In [4]:
# Define a fully connected block
# A single layer if shortcut == False, and...
# Two layers and a residual layer otherwise.

class FCBlock(nn.Module):
    
    # midChannels will be of no use if shortcut == False
    def __init__(self, inNodes, midNodes, outNodes, shortcut = False):
        super(FCBlock, self).__init__()
        if shortcut is False:
            self.left = nn.Sequential(
                nn.Linear(inNodes, outNodes)
            )
            self.right = None
        else:
            self.left = nn.Sequential(
                nn.Linear(inNodes, midNodes),
                nn.ReLU(inplace = True),
                nn.Linear(midNodes, outNodes)
            )
            self.right = nn.Sequential(
                nn.Linear(inNodes, outNodes)
            )
            
    def forward(self, input):
        out = self.left(input)
        if self.right is not None:
            out += self.right(input)
        return F.relu(out)
        

In [5]:
class Flatten(nn.Module):
    def forward(self, x):
        N, C, H, W = x.size() # read in N, C, H, W
        return x.view(N, -1)  # "flatten" the C * H * W values into a single vector per image

In [6]:
class ResNet(nn.Module):
    
    def __init__(self):
        super(ResNet, self).__init__()
        
        self.blocks = nn.Sequential(
            ConvBlock(inChannels = 3, midChannels = 8, outChannels = 32, 
                      kernelSize = 3, padding = 1, shortcut = True),
            #ConvBlock(inChannels = 32, midChannels = 128, outChannels = 512, 
            #          kernelSize = 3, padding = 1, shortcut = True),
            
            Flatten(),
            
            FCBlock(inNodes = 32768, midNodes = 4096, outNodes = 512, 
                    shortcut = True),
            FCBlock(inNodes = 512, midNodes = 128, outNodes = 64, 
                    shortcut = True),
            
            FCBlock(inNodes = 64, midNodes = 0, outNodes = 10, 
                    shortcut = False)
            
        )
    
    def forward(self, input):
        out = self.blocks(input)
        return out

In [7]:
class ChunkSampler(sampler.Sampler):
    """Samples elements sequentially from some offset. 
    Arguments:
        num_samples: # of desired datapoints
        start: offset where we should start selecting from
    """
    def __init__(self, num_samples, start = 0):
        self.num_samples = num_samples
        self.start = start

    def __iter__(self):
        return iter(range(self.start, self.start + self.num_samples))

    def __len__(self):
        return self.num_samples

numTrain = 49000
numVal = 1000

trainData = datasets.CIFAR10('./data', train = True,
                           transform = transforms.ToTensor())
trainLoader = DataLoader(trainData, batch_size = 64, 
                              sampler = ChunkSampler(numTrain, 0))

valData = datasets.CIFAR10('./data', train = True,
                           transform = transforms.ToTensor())
valLoader = DataLoader(valData, batch_size = 64, 
                            sampler = ChunkSampler(numVal, numTrain))

testData = datasets.CIFAR10('./data', train = False,
                          transform = transforms.ToTensor())
testLoader = DataLoader(testData, batch_size=64)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [8]:
torch.cuda.is_available()

True

In [9]:
gpuDtype = torch.cuda.FloatTensor

def train(model, lossFunc, optimizer, numEpochs = 1, printEvery = 100):
    for epoch in range(numEpochs):
        print('Starting epoch %d / %d' % (epoch + 1, numEpochs))
        model.train()
        for t, (x, y) in enumerate(trainLoader):
            xVar = Variable(x.type(gpuDtype))
            yVar = Variable(y.type(gpuDtype).long())
            scores = model(xVar)            
            loss = lossFunc(scores, yVar)
            
            if (t + 1) % printEvery == 0:
                print('t = %d, loss = %.4f' % (t + 1, loss.item()))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def checkAccuracy(model, loader):
    if loader.dataset.train:
        print('Checking accuracy on validation set')
    else:
        print('Checking accuracy on test set')   
    numCorrect = 0
    numSamples = 0
    model.eval() # Put the model in test mode (the opposite of model.train(), essentially)
    for x, y in loader:
        xVar = Variable(x.type(gpuDtype), volatile=True)

        scores = model(xVar)
        _, preds = scores.data.cpu().max(1)
        numCorrect += (preds == y).sum()
        numSamples += preds.size(0)
    acc = float(numCorrect) / numSamples
    print('Got %d / %d correct (%.2f)' % (numCorrect, numSamples, 100 * acc))

In [10]:
model = ResNet().type(gpuDtype)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = 0.0001)

train(model, criterion, optimizer, numEpochs = 3, printEvery = 100)
checkAccuracy(model, valLoader)

Starting epoch 1 / 3
t = 100, loss = 2.0931
t = 200, loss = 1.8322
t = 300, loss = 1.8377
t = 400, loss = 1.4903
t = 500, loss = 1.3859
t = 600, loss = 1.4997
t = 700, loss = 1.6152
Starting epoch 2 / 3
t = 100, loss = 1.3320
t = 200, loss = 1.3252
t = 300, loss = 1.4243
t = 400, loss = 1.1032
t = 500, loss = 1.0739
t = 600, loss = 1.1613
t = 700, loss = 1.3430
Starting epoch 3 / 3
t = 100, loss = 1.1041
t = 200, loss = 1.1149
t = 300, loss = 1.2095
t = 400, loss = 0.8543
t = 500, loss = 0.8849
t = 600, loss = 0.8831
t = 700, loss = 1.0783
Checking accuracy on validation set




Got 549 / 1000 correct (54.90)
