In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
from torch.utils.data.dataset import Dataset
from torchvision import transforms
import pandas as pd
from PIL import Image
import cv2
import numpy as np
import os

In [3]:
# Select the compute device: the first CUDA GPU when one is available, the CPU otherwise
use_gpu = torch.cuda.is_available()
device = torch.device('cuda:0' if use_gpu else 'cpu')
print("Using GPU" if use_gpu else "Using CPU")

Using GPU


In [5]:
def compute_accuracy(net, testloader, conf=False, num_classes=5):
    """Evaluate a classifier on the batches produced by a data loader.

    Note: the network is switched to evaluation mode and is left in that mode
    on return; call ``net.train()`` before resuming training.

    Args:
        net (nn.Module): network returning one score per class for each sample.
        testloader (iterable): yields ``(images, labels)`` batches. Any batch
            size is accepted, including a smaller final batch.
        conf (bool): when True, accumulate and print a confusion matrix
            (rows = true label, columns = predicted label) together with the
            per-class accuracy.
        num_classes (int): side length of the confusion matrix.

    Returns:
        tuple: ``(zeros, accuracy)`` where ``zeros`` is the number of samples
        predicted as class 0 and ``accuracy`` is the fraction of correctly
        predicted samples (0.0 when the loader yields no samples).
    """
    # Set the network into evaluation mode
    net.eval()

    # Move the batches to wherever the network's parameters live; networks
    # without parameters fall back to the notebook-level `device`
    first_param = next(net.parameters(), None)
    eval_device = first_param.device if first_param is not None else device

    correct = 0
    total = 0
    zeros = 0

    # Initialize the confusion matrix
    confusion_matrix = torch.zeros(num_classes, num_classes)

    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(eval_device), labels.to(eval_device)
            outputs = net(images)
            _, predicted = torch.max(outputs, 1)
            # Comparing against the scalar 0 works for every batch size
            zeros += (predicted == 0).sum().item()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # Calculate the confusion matrix if requested
            if conf:
                pairs = zip(labels.view(-1).tolist(), predicted.view(-1).tolist())
                for true_label, predicted_label in pairs:
                    confusion_matrix[int(true_label), int(predicted_label)] += 1

    # Print the confusion matrix if requested
    if conf:
        print("Confusion matrix")
        print(confusion_matrix)
        print("Per-class accuracy")
        print(confusion_matrix.diag()/confusion_matrix.sum(1))

    # Guard against an empty loader instead of dividing by zero
    accuracy = correct / total if total else 0.0

    # Return the amount of predicted zeros and the accuracy
    return zeros, accuracy

In [6]:
class DatasetDRD(Dataset):
    """Dataset of labelled JPEG images listed in a CSV file.

    Image names are read from the first CSV column and labels from the second.
    The CSV must also expose the names under an ``image`` column, which is the
    column used for the file-existence check. Every image is expected at
    ``<data_folder>/<prefix><name>.jpeg``.
    """

    def __init__(self, csv_path, data_folder, prefix):
        """
        Args:
            csv_path (string): path to the csv file listing image names and labels
            data_folder (string): path to the folder where the images are
            prefix (string): text prepended to every image name to form the file name

        Raises:
            AssertionError: if any image referenced in the csv file is missing.
        """
        self.data_folder = data_folder
        self.prefix = prefix

        # Transform to tensor
        self.to_tensor = transforms.ToTensor()

        # Read the csv file
        self.data_info = pd.read_csv(csv_path)

        # Check that the files actually exist
        assert self.data_info['image'].apply(lambda x: os.path.isfile(self._image_path(x))).all(), \
        "Some images referenced in the CSV file were not found"

        # First column contains the image names
        self.image_arr = np.asarray(self.data_info.iloc[:, 0])

        # Second column is the labels
        self.label_arr = np.asarray(self.data_info.iloc[:, 1])

        # Calculate len
        self.data_len = len(self.data_info.index)

    def _image_path(self, image_name):
        """Return the path of the image file for a name taken from the csv file."""
        return os.path.join(self.data_folder, self.prefix + image_name + '.jpeg')

    def __getitem__(self, index):
        """Return the ``(image_tensor, label)`` pair stored at ``index``."""
        # Get image name from the pandas df
        single_image_name = self.image_arr[index]

        # The context manager closes the file as soon as the pixels are read.
        # Converting to RGB guarantees three channels per sample, which is what
        # the 3-channel first convolution of the model in this notebook needs.
        with Image.open(self._image_path(single_image_name)) as img_as_img:
            img_as_tensor = self.to_tensor(img_as_img.convert('RGB'))

        # Get label (class) of the image from the label column
        single_image_label = self.label_arr[index]

        return (img_as_tensor, single_image_label)

    def __len__(self):
        """Return the number of rows in the csv file."""
        return self.data_len

In [10]:
class Block(nn.Module):
    """Residual block: two 3x3 convolutions with batch norm plus a shortcut.

    The shortcut is the unchanged input unless the block uses a stride other
    than 1 or changes the number of channels; in those cases the input is
    passed through a 1x1 convolution and a batch norm before the addition.
    The 1x1 convolution and its batch norm are always created, even when the
    shortcut ends up being the identity.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path (the batch-norm layers rely on the library defaults)
        self.conv1 = nn.Conv2d(in_channels, out_channels,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.batch1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels,
                               kernel_size=3, padding=1, bias=False)
        self.batch2 = nn.BatchNorm2d(out_channels)
        self.stride = stride

        # Projection applied to the shortcut when the shapes would not match
        self.skipconv = nn.Conv2d(in_channels, out_channels,
                                  kernel_size=1, stride=stride, padding=0, bias=False)
        self.skipbatch = nn.BatchNorm2d(out_channels)

        # True when the block changes the number of channels
        self.inout = in_channels != out_channels

    def forward(self, x):
        # Main path: conv -> bn -> relu -> conv -> bn
        main = self.relu(self.batch1(self.conv1(x)))
        main = self.batch2(self.conv2(main))

        # Shortcut: project the input only when its shape differs from the main path
        needs_projection = self.inout or self.stride != 1
        shortcut = self.skipbatch(self.skipconv(x)) if needs_projection else x

        main += shortcut
        return self.relu(main)

In [11]:
class GroupOfBlocks(nn.Module):
    """Chain of residual blocks that all produce ``out_channels`` channels."""

    def __init__(self, in_channels, out_channels, n_blocks, stride=1):
        super().__init__()

        # Only the leading block receives the requested stride and the change
        # in channel count; every following block maps out_channels to
        # out_channels with the default stride of one
        blocks = [Block(in_channels, out_channels, stride)]
        blocks.extend(Block(out_channels, out_channels) for _ in range(n_blocks - 1))
        self.group = nn.Sequential(*blocks)

    def forward(self, x):
        return self.group(x)

In [12]:
class ResNet(nn.Module):
    """Residual network classifier built from groups of residual blocks.

    Args:
        n_blocks (list of int): number of blocks in each group. Group ``i``
            outputs ``n_channels * 2**i`` channels; the first group uses
            stride 1 and every later group uses stride 2.
        n_channels (int): number of channels produced by the first convolution.
        num_classes (int): number of scores returned per sample.
        input_size (int or (int, int)): height and width of the input images.
            It is only used to size the final linear layer. The default of 600
            with three groups gives the ``20736 * n_channels`` input features
            that were previously hard-coded.
    """

    def __init__(self, n_blocks, n_channels=16, num_classes=5, input_size=600):
        super(ResNet, self).__init__()

        # Layers before blocks
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=n_channels, kernel_size=5, stride=1, padding=2, bias=False)
        self.bn1 = nn.BatchNorm2d(n_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Create the groups of blocks: the first group keeps n_channels and the
        # resolution, each later group doubles the channels and uses stride 2
        self.groupblocks = [GroupOfBlocks(
            in_channels=n_channels * 2**max(idx - 1, 0),
            out_channels=n_channels * 2**idx,
            n_blocks=count,
            stride=1 if idx == 0 else 2)
                            for idx, count in enumerate(n_blocks)]
        self.groupb = nn.Sequential(*self.groupblocks)

        self.avgpool = nn.AvgPool2d(kernel_size=4, stride=1)
        self.fc = nn.Linear(
            self._fc_in_features(n_channels, len(self.groupblocks), input_size),
            num_classes)

        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, np.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    @staticmethod
    def _spatial_out(size, n_groups):
        """Return the side length left after the pooling/striding layers."""
        # conv1 (kernel 5, padding 2, stride 1) keeps the size unchanged
        # maxpool: kernel 3, stride 2, padding 1
        size = (size + 2 * 1 - 3) // 2 + 1
        # Every group after the first starts with a stride-2 block
        # (3x3 conv with padding 1 and 1x1 shortcut conv give the same size)
        for _ in range(max(n_groups - 1, 0)):
            size = (size - 1) // 2 + 1
        # avgpool: kernel 4, stride 1, no padding
        return size - 4 + 1

    @staticmethod
    def _fc_in_features(n_channels, n_groups, input_size):
        """Return the number of inputs of the final linear layer."""
        try:
            height, width = input_size
        except TypeError:
            height = width = input_size
        out_h = ResNet._spatial_out(height, n_groups)
        out_w = ResNet._spatial_out(width, n_groups)
        if out_h < 1 or out_w < 1:
            raise ValueError(
                "input_size {} is too small for {} groups of blocks".format(input_size, n_groups))
        out_channels = n_channels * 2**max(n_groups - 1, 0)
        return out_channels * out_h * out_w

    def forward(self, x, verbose=False):
        """Return the class scores for a batch of images.

        ``verbose`` is currently unused; it is kept so existing calls keep working.
        """
        # Initial layers
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # "Group of blocks"
        x = self.groupb(x)

        # 4x4 average pooling with stride 1 (this is not a global pool)
        x = self.avgpool(x)

        # Flatten per sample. Keeping the batch dimension fixed means an input
        # of unexpected size fails in the linear layer instead of being
        # silently folded into a different batch size.
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

In [14]:
# Note: training and testing were run in Google Colaboratory, so the network
#           cannot be tested from this notebook as-is.

# The image folders and csv files are not part of the Github repository
IMAGE_PREFIX = '600_600_300_'
data_drd = DatasetDRD(csv_path='train5k.csv.mod.rot', data_folder='rtrain5k', prefix=IMAGE_PREFIX)
test_drd = DatasetDRD(csv_path='test1k.csv.mod', data_folder='test1k', prefix=IMAGE_PREFIX)

# Loaders for training (shuffled) and testing (fixed order); both use the same settings otherwise
loader_settings = dict(batch_size=10, pin_memory=True)
trainloader = torch.utils.data.DataLoader(data_drd, shuffle=True, **loader_settings)
testloader = torch.utils.data.DataLoader(test_drd, shuffle=False, **loader_settings)

In [15]:
# The parameters for the selected architecture
n_blocks = [4, 4, 5]
lr_rand = 0.00001
n_channels = 16

# Debug info
print("Running round with architecture: {}, learning rate: {}, n_channels: {}".format(n_blocks, lr_rand, n_channels))

# Initialize the network and move it to the device
net = ResNet(n_blocks, n_channels=n_channels)
net.to(device)

# Set up the criterion (CrossEntropyLoss) and optimizer (Adam)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=lr_rand)

# Train for 10 epochs
n_epochs = 10

# Report the average loss every `print_every` mini-batches
print_every = 200

for epoch in range(n_epochs):
    # compute_accuracy() below switches the network to evaluation mode, so
    # training mode has to be re-enabled at the start of every epoch;
    # otherwise batch norm would run in evaluation mode from epoch 2 on
    net.train()

    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader, 0):
        # Transfer to GPU
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Print loss
        running_loss += loss.item()
        if (i % print_every) == (print_every-1):
            print('[%d, %5d] loss: %.3f' % (epoch+1, i+1, running_loss/print_every))
            running_loss = 0.0

    # Get the accuracy of the network, prints also the confusion matrix
    _, accuracy = compute_accuracy(net, testloader, conf=True)
    print('Accuracy of the network on the test images: {:.4f}'.format(accuracy))

print('Training done!')