## 1. Load and visualize the CIFAR10 dataset

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torch.utils.data.dataset import Dataset
import torch.utils as utils
import torch.nn.functional as F

import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import sklearn.metrics as skl



# Run-wide settings: compute device (GPU when available) and epoch count.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
epochs = 150


# Preview CIFAR-10: show 15 randomly indexed samples in a 3x5 grid, each
# titled with its class name.
trainset = torchvision.datasets.CIFAR10(root='./data', download=True)
datanum = len(trainset)
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
fig = plt.figure()
# Indices are drawn independently, so the same image may appear twice.
ims = np.random.randint(datanum, size=15)

for i, sample_index in enumerate(ims):
    # Subplot positions are 1-based.
    subplot = fig.add_subplot(3, 5, i + 1)
    subplot.set_xticks([])
    subplot.set_yticks([])
    PILimg, label = trainset[sample_index]
    subplot.set_title("%s" % classes[label])
    subplot.imshow(PILimg)

plt.show()

## 2. Dataset and dataloader

In [None]:
# Data pipeline: augmented training split and un-augmented test split, both
# normalized with the same per-channel statistics.
print('==> Preparing data..')

# Per-channel mean / std handed to Normalize for both splits.
channel_mean = (0.4914, 0.4822, 0.4465)
channel_std = (0.2023, 0.1994, 0.2010)

# Training images: random 32x32 crop after 4-pixel padding, random horizontal
# flip, then tensor conversion and normalization.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
])

# Test images: tensor conversion and normalization only.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
])

# Both loaders share batch size and worker count; only the training loader
# shuffles.
loader_options = dict(batch_size=128, num_workers=2)

trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, **loader_options)

testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, shuffle=False, **loader_options)

## 3. Network Model

In [None]:
# Model 
class ResNet50(nn.Module):
    """ResNet-50 style image classifier built from bottleneck blocks.

    Layout: a 7x7 stride-2 stem convolution, then four stages made of one
    ``conv_block`` followed by 2, 3, 5 and 2 ``identity_block`` modules
    (3/4/6/3 bottlenecks in total), global average pooling, and a single
    linear layer on the resulting 2048 features.

    Args:
        num_classes: Number of logits produced by the final linear layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        self.conv1 = self.conv1_layer()
        self.conv2 = self.conv2_layer()
        self.conv3 = self.conv3_layer()
        self.conv4 = self.conv4_layer()
        self.conv5 = self.conv5_layer()
        # The last stage always emits 2048 channels (see conv5_layer).
        self.linear = nn.Linear(2048, num_classes)

    @staticmethod
    def _bottleneck_stage(in_channels, filters, strides, num_identity):
        """Return [conv_block, identity_block * num_identity] for one stage."""
        blocks = [conv_block(in_channels=in_channels, filters=filters, strides=strides)]
        blocks.extend(identity_block(filters=filters) for _ in range(num_identity))
        return blocks

    def conv1_layer(self):
        """Build the stem: 7x7 stride-2 convolution, batch norm, ReLU."""
        return nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

    def conv2_layer(self):
        """Build stage 2: stride-2 max pool, then 3 bottlenecks (64 -> 256 channels)."""
        return nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            *self._bottleneck_stage(64, [64, 64, 256], strides=1, num_identity=2)
        )

    def conv3_layer(self):
        """Build stage 3: 4 bottlenecks (256 -> 512 channels), first one strided."""
        return nn.Sequential(
            *self._bottleneck_stage(256, [128, 128, 512], strides=2, num_identity=3)
        )

    def conv4_layer(self):
        """Build stage 4: 6 bottlenecks (512 -> 1024 channels), first one strided."""
        return nn.Sequential(
            *self._bottleneck_stage(512, [256, 256, 1024], strides=2, num_identity=5)
        )

    def conv5_layer(self):
        """Build stage 5: 3 bottlenecks (1024 -> 2048 channels), first one strided."""
        return nn.Sequential(
            *self._bottleneck_stage(1024, [512, 512, 2048], strides=2, num_identity=2)
        )

    def forward(self, x):
        """Return class logits for a batch of 3-channel images.

        Args:
            x: Input batch with 3 channels in dimension 1 (the stem
                convolution takes 3 input channels).

        Returns:
            Tensor with one row of ``num_classes`` logits per input sample.
        """
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.conv5(out)
        # Global average pooling down to 1x1. With 32x32 inputs the five
        # stride-2 steps already leave a 1x1 map, so this is the identity
        # there; for larger inputs it keeps the feature count at 2048 so the
        # linear layer still fits.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.flatten(1)
        out = self.linear(out)
        return out



class conv_block(nn.Module):
    """Bottleneck residual block with a projection shortcut.

    Main path: 1x1 conv (carries the stride) -> 3x3 conv -> 1x1 conv, each
    followed by batch normalization. The shortcut is a strided 1x1 conv plus
    batch norm, so the input matches the main path in channel count and
    spatial size before the two are added.

    Args:
        in_channels: Channel count of the incoming feature map.
        filters: Three output channel counts, one per convolution in order.
        strides: Stride of the first convolution and of the shortcut conv.
    """

    def __init__(self, in_channels, filters, strides):
        super().__init__()
        self.filters1, self.filters2, self.filters3 = filters
        self.conv1 = nn.Conv2d(in_channels, self.filters1, kernel_size=1, stride=strides, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(self.filters1)
        self.conv2 = nn.Conv2d(self.filters1, self.filters2, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(self.filters2)
        self.conv3 = nn.Conv2d(self.filters2, self.filters3, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn3 = nn.BatchNorm2d(self.filters3)
        # Projection shortcut: same stride and output width as the main path.
        self.shortcut = nn.Sequential(
            nn.Conv2d(in_channels, self.filters3, kernel_size=1, stride=strides, bias=False),
            nn.BatchNorm2d(self.filters3)
        )

    def forward(self, x):
        """Apply the bottleneck path, add the projected input, and apply ReLU."""
        out = self.conv1(x)
        out = self.bn1(out)
        # Functional in-place ReLU: same effect as the previous per-call
        # nn.ReLU(inplace=True) module, without building a module each forward.
        out = F.relu(out, inplace=True)

        out = self.conv2(out)
        out = self.bn2(out)
        out = F.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)
        out += self.shortcut(x)
        out = F.relu(out)
        return out




class identity_block(nn.Module):
    """Bottleneck residual block whose shortcut is the unmodified input.

    Main path: 1x1 conv -> 3x3 conv -> 1x1 conv, each followed by batch
    normalization, all with stride 1. The block expects ``filters[2]`` input
    channels and produces the same number, so the input can be added directly.

    Args:
        filters: Three channel counts; the last one is both the input and the
            output width of the block.
    """

    def __init__(self, filters):
        super().__init__()
        f1, f2, f3 = filters
        self.filters1, self.filters2, self.filters3 = f1, f2, f3

        # 1x1 convolution, no padding: f3 -> f1 channels.
        self.conv1 = nn.Conv2d(f3, f1, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(f1)
        # 3x3 convolution with padding 1 keeps the spatial size: f1 -> f2 channels.
        self.conv2 = nn.Conv2d(f1, f2, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(f2)
        # 1x1 convolution, no padding: f2 -> f3 channels.
        self.conv3 = nn.Conv2d(f2, f3, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn3 = nn.BatchNorm2d(f3)

    def forward(self, x):
        """Apply the bottleneck path, add the input back, and apply ReLU."""
        residual = x
        y = F.relu(self.bn1(self.conv1(x)))
        y = F.relu(self.bn2(self.conv2(y)))
        y = self.bn3(self.conv3(y))
        y += residual
        return F.relu(y)



# Instantiate the network and move it to the selected device.
print('==> Building model..')
net = ResNet50().to(device)

if device == 'cuda':
    # On GPU: enable cuDNN benchmark mode and wrap the model in DataParallel.
    cudnn.benchmark = True
    net = torch.nn.DataParallel(net)

# Loss, optimizer and learning-rate schedule. StepLR multiplies the learning
# rate by 0.5 every 20 calls to scheduler.step().
criterion = nn.CrossEntropyLoss()
sgd_settings = dict(lr=0.1, momentum=0.9, weight_decay=5e-4)
optimizer = optim.SGD(net.parameters(), **sgd_settings)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

## 4. Training and Testing

In [None]:
#**************************************************
#                Training and testing
#**************************************************

def train(epoch):
    """Run one training epoch over ``trainloader`` and print a summary line.

    Relies on the module-level ``net``, ``trainloader``, ``optimizer``,
    ``criterion`` and ``device``. After the last batch it prints the mean
    per-batch loss and the accuracy accumulated over the epoch.

    Args:
        epoch: Epoch index; used only in the printed header.
    """
    print('\nEpoch: %d' % epoch)
    print('\nTrain:')
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    num_batches = len(trainloader)
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Accumulate running statistics as plain Python numbers.
        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # Report once, after the final batch of the epoch.
        if batch_idx + 1 == num_batches:
            print('[%3d/%3d] | Loss: %.3f | Acc: %.3f%% (%d/%d)'%(
                batch_idx+1, num_batches, train_loss/(batch_idx+1), 100.*correct/total, correct, total))


def test(epoch):
    """Evaluate ``net`` on ``testloader``, then advance the LR scheduler.

    Relies on the module-level ``net``, ``testloader``, ``criterion``,
    ``device``, ``scheduler`` and ``optimizer``. After the last batch it
    prints the mean per-batch loss and the accuracy over the test set.

    Side effect: calls ``scheduler.step()`` once per invocation and prints the
    resulting learning rate of every parameter group, so the learning-rate
    schedule advances each time this function is called.

    Args:
        epoch: Epoch index; accepted for symmetry with ``train`` and not used
            in the body.
    """
    print('\nTest:')
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    num_batches = len(testloader)
    # Evaluation only: no gradients are needed.
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(testloader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            # Report once, after the final batch.
            if batch_idx + 1 == num_batches:
                print('[%3d/%3d] | Loss: %.3f | Acc: %.3f%% (%d/%d)'%(
                    batch_idx+1, num_batches, test_loss/(batch_idx+1), 100.*correct/total, correct, total))

    # The schedule is stepped here, after evaluation, once per call.
    scheduler.step()
    for param_group in optimizer.param_groups:
        print('Current learning rate is {:f}'.format(param_group['lr']))
                

# Main loop: one training pass followed by one evaluation pass per epoch.
# test() also calls scheduler.step(), so the learning-rate schedule advances
# once per iteration of this loop.
for epoch in range(epochs):
    train(epoch)
    test(epoch)

## 5. Plot confusion matrix

In [None]:
def plot_confusion_matrix(cm, classes,normalize=False,title='Confusion matrix',cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on the current figure.

    Args:
        cm: 2-D array of counts; row ``i`` / column ``j`` is drawn at
            y-position ``i`` / x-position ``j`` (rows are labeled "True label",
            columns "Predicted label").
        classes: Sequence of tick labels, used for both axes.
        normalize: When True, each row is divided by its sum before plotting.
            Rows whose sum is zero are left as all zeros instead of producing
            NaN values.
        title: Title placed above the plot.
        cmap: Matplotlib colormap passed to ``imshow``.

    The function prints which mode is used and draws with ``pyplot``; it does
    not call ``plt.show()`` and returns nothing.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Divide only where the row total is non-zero; other rows keep the
        # zeros from the pre-filled output array.
        cm = np.divide(cm, row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Integer matrices are printed as integers; anything else (normalized or
    # already-float input) with two decimals, since 'd' rejects floats.
    fmt = 'd' if np.issubdtype(cm.dtype, np.integer) else '.2f'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.

    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], fmt),horizontalalignment="center",color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so the axis labels are taken into account.
    plt.tight_layout()


# Final evaluation: gather true and predicted labels for the whole test set,
# report accuracy, and plot raw and row-normalized confusion matrices.
net.eval()
label_batches = []
pred_batches = []

# Inference only: run without autograd so no graph is recorded per batch.
with torch.no_grad():
    for inputs, targets in testloader:
        inputs = inputs.to(device)
        outputs = net(inputs)
        _, predicted = outputs.max(1)
        label_batches.append(targets.cpu().numpy())
        pred_batches.append(predicted.cpu().numpy())

# Join the per-batch arrays once instead of re-concatenating inside the loop.
ylabel = np.concatenate(label_batches)
yhatlabel = np.concatenate(pred_batches)

# Compute confusion matrix
cnf_matrix = skl.confusion_matrix(ylabel, yhatlabel)
np.set_printoptions(precision=2)
is_correct = (ylabel == yhatlabel)
acc = np.mean(is_correct)
print('accuracy:%.5f' %acc)


# Plot non-normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=classes,
                  title='Confusion matrix, without normalization')

# Plot normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=classes, normalize=True,
                  title='Normalized confusion matrix')

plt.show()