## Train a ResNet from scratch

Build the Residual Network specified in Figure 1 and achieve at least 60% test accuracy.

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import h5py
import datetime

import torch
import torch.nn as nn
import torch.utils.data as Data

  from ._conv import register_converters as _register_converters


In [2]:
import torchvision
from torchvision import transforms

In [3]:
import logging

# Append INFO-and-above log records to bw.log. Each record carries its
# timestamp, the emitting file's path and line number, and the severity.
logging.basicConfig(
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
    filemode='a',
    filename='bw.log',
    level=logging.INFO,
)

In [3]:
import sys
# Divert everything written to stdout (e.g. print output) into message.log.
# The previous stream is kept in old_stdout so a later cell can put it back;
# opening with "w" truncates any existing log.
old_stdout, log_file = sys.stdout, open("message.log", mode="w")
sys.stdout = log_file

In [4]:
# Device configuration: use the first CUDA device when one is available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [3]:
# Data augmentation pipeline, applied in order:
#   1. random 32x32 crop after padding each border by 4 pixels
#   2. random horizontal flip
#   3. conversion of the image to a tensor
_augment_steps = [
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
]
ds_trans = transforms.Compose(_augment_steps)

In [4]:
# Load CIFAR-100 and wrap both splits in mini-batch loaders.
BatchSize = 100

# Training split: random crop/flip augmentation, reshuffled every epoch.
trainset = torchvision.datasets.CIFAR100(root='./data', train=True, download=True,
                                         transform=ds_trans)
train_loader = Data.DataLoader(trainset, batch_size=BatchSize, shuffle=True)

# Test split: only convert images to tensors. Applying the random crop/flip
# pipeline here would make the reported test accuracy noisy and different on
# every run, so evaluation uses the unmodified images.
testset = torchvision.datasets.CIFAR100(root='./data', train=False, download=True,
                                        transform=transforms.ToTensor())
test_loader = Data.DataLoader(testset, batch_size=BatchSize, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [5]:
# Load data for debugging (CIFAR10 stored as HDF5 one directory up).
# A forward slash is used in the path: the previous "..\C" contained an invalid
# escape sequence and only resolved on Windows, whereas "../" works on Windows
# and POSIX alike. The context manager closes the file handle once the arrays
# have been read into memory.
with h5py.File("../CIFAR10.hdf5", "r") as CIFAR:
    x_train = np.float32(CIFAR['X_train'][:])
    x_test = np.float32(CIFAR['X_test'][:])
    y_train = np.int32(CIFAR['Y_train'][:])
    y_test = np.int32(CIFAR['Y_test'][:])

# Transform into torch.Tensor (labels become float tensors here; the training
# and evaluation loops cast them with .long() before computing the loss)
x_traints = torch.Tensor(x_train)
y_traints = torch.Tensor(y_train)
x_testts = torch.Tensor(x_test)
y_testts = torch.Tensor(y_test)

# Pack data for debugging
trainset = Data.TensorDataset(x_traints, y_traints)
testset = Data.TensorDataset(x_testts, y_testts)
# Put datasets into Data Loader
BatchSize = 100
train_loader = Data.DataLoader(dataset=trainset, batch_size=BatchSize, shuffle=True)
test_loader = Data.DataLoader(dataset=testset, batch_size=BatchSize, shuffle=False)

In [7]:
# Define residual block
class Residual_block(nn.Module):
    """Residual block: conv -> BN -> ReLU -> conv -> BN, add shortcut, ReLU.

    Args:
        in_channel: number of channels of the input feature map.
        out_channel: number of channels produced by both convolutions.
        kernel_size: kernel size shared by both convolutions.
        stride: stride of the first convolution only; the second convolution
            keeps the default stride.
        padding: padding shared by both convolutions.
        downsample: optional module applied to the block input to build the
            shortcut branch. When it is None the input itself is the shortcut,
            so it must already match the shape of the main-path output.
    """

    def __init__(self, in_channel, out_channel, kernel_size=3, stride=1, padding=1, downsample=None):
        super(Residual_block, self).__init__()
        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size=kernel_size, stride=stride, padding=padding, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channel, out_channel, kernel_size=kernel_size, padding=padding)
        self.bn2 = nn.BatchNorm2d(out_channel)
        self.downsample = downsample  # module that reshapes the shortcut, or None

    def forward(self, x):
        """Run the main path, add the (optionally transformed) input, apply ReLU."""
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Compare against None explicitly: truth-testing a module consults its
        # __len__ (e.g. nn.Sequential), which is not the same as "was provided".
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

In [8]:
# Define ResNet
class ResNet(nn.Module):
    """Configurable ResNet: stem conv -> residual stages -> 2x2 max-pool -> linear.

    The stem takes 3-channel inputs, and the size of the final linear layer is
    derived from a hard-coded 32-pixel input side, so the network expects
    32x32 three-channel images.

    Args:
        block: residual block class; called as
            ``block(in_channel, out_channel, stride=..., downsample=...)``.
        channel_first: output channels of the stem convolution.
        channel_per_layer: output channels of each stage.
        block_per_layer: number of blocks in each stage.
        stride_per_layer: stride of the first block of each stage.
        dropout_rate: probability used by the Dropout2d after the stem.
        num_classes: size of the output layer.

    Raises:
        ValueError: if the three per-layer sequences differ in length.
    """

    def __init__(self, block, channel_first, channel_per_layer, block_per_layer, stride_per_layer,
                 dropout_rate=0.25, num_classes=100):
        super(ResNet, self).__init__()
        self.in_channels = channel_first  # running channel count consumed by make_layer
        self.conv = nn.Conv2d(in_channels=3, out_channels=channel_first, kernel_size=3, padding=1)  # Inputs are 3-channel images
        self.bn = nn.BatchNorm2d(channel_first)
        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout2d(p=dropout_rate)
        if not (len(block_per_layer) == len(channel_per_layer) == len(stride_per_layer)):
            raise ValueError("Length of parameters must be the same!")
        self.num_layer = len(block_per_layer)
        # Register the stages as self.layer1 ... self.layerN. setattr on an
        # nn.Module registers the submodule exactly like a plain assignment,
        # without building and exec-ing source strings.
        stage_specs = zip(channel_per_layer, block_per_layer, stride_per_layer)
        for idx, (channels, blocks, stride) in enumerate(stage_specs, start=1):
            stage = self.make_layer(block=block, out_channel=channels, num_block=blocks, stride=stride)
            setattr(self, "layer{}".format(idx), stage)
        self.max_pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Spatial side after all strides and the 2x2 pool, assuming a 32-pixel input side.
        self.fc = nn.Linear(int(32/np.prod(stride_per_layer)/2)**2 * channel_per_layer[-1], num_classes)

    def make_layer(self, block, out_channel, num_block, stride):
        """Build one stage of ``num_block`` blocks and advance ``self.in_channels``.

        Only the first block receives ``stride``; it also gets a conv+BN
        shortcut whenever the stride or the channel count changes. The
        remaining blocks map ``out_channel`` to ``out_channel`` with defaults.
        """
        downsample = None
        if (stride != 1) or (self.in_channels != out_channel):
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channel, kernel_size=3, stride=stride, padding=1),
                nn.BatchNorm2d(out_channel))
        layers = []
        layers.append(block(self.in_channels, out_channel, stride=stride, downsample=downsample))
        self.in_channels = out_channel
        for _ in range(1, num_block):
            layers.append(block(out_channel, out_channel))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class scores (one row of ``num_classes`` values per input)."""
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.dropout(out)
        # Apply layer1, layer2, ... in order.
        for idx in range(1, self.num_layer + 1):
            out = getattr(self, "layer{}".format(idx))(out)
        out = self.max_pool(out)
        out = out.view(out.size(0), -1)  # flatten everything except the batch dimension
        out = self.fc(out)
        return out

In [9]:
# Run CIFAR10 for debugging
# Build a small three-stage model and place it on the selected device
model = ResNet(
    block=Residual_block,
    channel_first=16,
    channel_per_layer=[16, 32, 64],
    block_per_layer=[2, 2, 2],
    stride_per_layer=[1, 2, 2],
)
model = model.to(device)

# Define Loss Function and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)

In [10]:
# Run CIFAR100
# Build model. It must live on the same device the training and evaluation
# loops move each batch to; otherwise the forward pass fails with a device
# mismatch as soon as CUDA is available.
model = ResNet(block=Residual_block, channel_first=32, channel_per_layer=[32,64,128,256],
               block_per_layer=[2,4,4,2], stride_per_layer=[1,2,2,2]).to(device)

# Define Loss Function and Optimizer (created after the move so the optimizer
# holds the on-device parameters)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=1e-4)

In [11]:
# Model training process
start_time = datetime.datetime.now()
num_epochs = 2
num_steps = len(train_loader)

for epoch in range(num_epochs):
    # Put BatchNorm/Dropout back into training mode. Without this, re-running
    # the cell after the evaluation cell (which calls model.eval()) would
    # silently train in eval mode.
    model.train()
    total = 0
    correct = 0  # running accuracy counters, reset at the start of each epoch
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device).long()  # the loss expects integer class indices

        # Forward
        outputs = model(images)
        loss = criterion(outputs, labels)
        # detach() replaces the legacy `.data` attribute: the prediction is
        # only used for bookkeeping and must not be tracked by autograd.
        _, predicted = torch.max(outputs.detach(), 1)
        total += labels.size(0)
        correct += (labels == predicted).sum().item()

        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Progress report every 100 steps: loss of the current batch and the
        # accuracy accumulated so far in this epoch.
        if (i+1) % 100 == 0:
            print("Epoch[{}/{}], Step[{}/{}], Loss {:.4f}, Accuracy {:.4f}%".format(
                epoch+1, num_epochs, i+1, num_steps, loss.item(), correct/total*100))
            now_time = datetime.datetime.now()
            print("Total cost time:{}".format(now_time-start_time))

Epoch[1/2], Step[100/500], Loss 4.466354, Accuracy 2.600000%
Total cost time:0:08:33.270588


KeyboardInterrupt: 

In [None]:
# Evaluate with test set
model.eval()  # switch BatchNorm/Dropout layers to inference behaviour
correct_test = 0
total_test = 0
with torch.no_grad():  # no gradients are needed for evaluation
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device).long()

        outputs = model(images)
        # Index of the largest score per row is the predicted class; `.data`
        # is unnecessary because autograd is already disabled here.
        _, predicted = torch.max(outputs, 1)
        total_test += labels.size(0)
        correct_test += (labels == predicted).sum().item()

print("Test Accuracy of the model on test images:{}%".format(correct_test / total_test * 100))

In [None]:
# Save model: the whole module object is serialized (not only its state_dict),
# so loading it back requires the model's class definitions to be importable.
torch.save(obj=model, f="ResNetScratch.ckpt")

In [11]:
# stdout still points at the log file here, so this message ends up in message.log.
print("this will be written to message.log", file=sys.stdout)
# Hand stdout back to the stream saved before the redirect, then release the file.
sys.stdout = old_stdout
log_file.close()

In [9]:
# Placeholder accuracy histories used to exercise the plotting code.
a = [1,2,3,4,5,6,7,8,9]
b = [3,4,6,3,8,9,5,3,1]
num_e = len(b)

fig0, ax0 = plt.subplots(1, 1, figsize=(16, 9), dpi=100)

# Both curves are plotted against their list index (0, 1, 2, ...).
ax0.plot(a, label="Train", color='red', linestyle='-')
ax0.plot(b, label="Test", color='blue', linestyle='--')

ax0.set_ylim(ymin=0, ymax=10)
ax0.set_xlabel("Epochs")
ax0.set_ylabel("Accuracy")
ax0.set_title("Accuracy Plot")
# Roughly three tick intervals across the x axis. The step is clamped to at
# least 1 because round(num_e/3) is 0 for a very short history and range()
# rejects a zero step.
axisloc = range(0, num_e, max(1, round(num_e/3)))
ax0.set_xticks(axisloc)
ax0.legend(loc="best")
# Save through the figure handle rather than pyplot's implicit "current figure".
fig0.savefig("Graph")