In [2]:
import numpy as np
import torch
from torch.nn import Module
from torch.nn import Sequential
from torch.nn import Conv2d
from torch.nn import Linear
from torch.nn import MaxPool2d
from torch.nn import ReLU
from torch.nn import LogSoftmax
import torch.nn as nn
from torch import flatten
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
def data_loader(data_dir,
                batch_size,
                random_seed = 42,
                valid_size = 0.2,
                shuffle = True,
                test = False,
                image_size = (224, 224)):
  """Build ImageNet DataLoaders.

  torchvision's ImageNet dataset cannot be downloaded automatically: the
  ILSVRC-2012 archives must already be present under `data_dir`.

  Args:
    data_dir: root directory holding the ImageNet data.
    batch_size: number of samples per batch.
    random_seed: seed for the train/validation index permutation.
    valid_size: fraction (0..1) of the training split held out for validation.
    shuffle: for the train/validation case, whether to permute the indices
      before splitting; for the test case, passed to the DataLoader.
    test: if True, return a single loader over the 'val' split.
    image_size: (height, width) every image is resized to, so that samples
      of different sizes can be stacked into one batch.

  Returns:
    A DataLoader when `test` is True, otherwise a
    `(train_loader, valid_loader)` tuple.

  Raises:
    ValueError: if `valid_size` is outside [0, 1].
  """
  if not 0 <= valid_size <= 1:
    raise ValueError('valid_size must be in the range [0, 1], got {}'.format(valid_size))

  # replace mean and std with appropriate values to apply normalisation
  # (mean 0 / std 1 leaves the tensors unchanged)
  normalize = transforms.Normalize(
      mean = [0, 0, 0],
      std = [1, 1, 1]
  )

  # transforms
  transform = transforms.Compose([
      # ImageNet images have varying sizes; the default collate function can
      # only batch tensors of identical shape.
      transforms.Resize(image_size),
      transforms.ToTensor(),
      normalize
  ])

  if test:
    # ImageNet selects its subset with `split` ('train' or 'val'); it has no
    # `train` / `download` arguments.
    dataset = datasets.ImageNet(
      root=data_dir,
      split='val',
      transform=transform,
    )

    return torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle
    )

  # Train and validation use the same split and the same transform, so one
  # dataset instance serves both loaders; the samplers keep them disjoint.
  train_dataset = datasets.ImageNet(
      root=data_dir,
      split='train',
      transform=transform,
  )
  valid_dataset = train_dataset

  num_train = len(train_dataset)
  indices = list(range(num_train))
  split = int(np.floor(valid_size * num_train))

  if shuffle:
      # A local generator yields the same permutation as seeding the global
      # NumPy RNG, without altering global random state.
      np.random.RandomState(random_seed).shuffle(indices)

  # The first `split` indices go to validation, the remainder to training.
  train_idx, valid_idx = indices[split:], indices[:split]
  train_sampler = SubsetRandomSampler(train_idx)
  valid_sampler = SubsetRandomSampler(valid_idx)

  train_loader = torch.utils.data.DataLoader(
      train_dataset, batch_size=batch_size, sampler=train_sampler)

  valid_loader = torch.utils.data.DataLoader(
      valid_dataset, batch_size=batch_size, sampler=valid_sampler)

  return (train_loader, valid_loader)


# Training and validation loaders over the data stored in ./data,
# 64 images per batch.
train_loader, valid_loader = data_loader('./data', 64)

# Separate loader for the held-out test images, same batch size.
test_loader = data_loader('./data', 64, test = True)

In [None]:
class VGG16(Module):
  """VGG-16 style image classifier: five conv/pool stages and three dense layers.

  Every convolution is 3 x 3, stride 1, padding 1 (spatial size preserved)
  and is followed by ReLU. Every stage ends with a 2 x 2, stride-2 max-pool
  that halves width and height. For a 224 x 224 input the feature map goes
  224 -> 112 -> 56 -> 28 -> 14 -> 7 while the channels go
  64 -> 128 -> 256 -> 512 -> 512.

  Args:
    inChannels: number of channels of the input images.
    classes: number of output classes.
    hyperparameters: mapping with the input image size stored under
      'image_w' / 'image_h' (the short keys 'w' / 'h' are accepted as well).

  Attributes:
    image_w, image_h: spatial size of the feature map after the last
      max-pool; used to size the first fully connected layer.

  Raises:
    KeyError: if the image size is missing from `hyperparameters`.
    ValueError: if the input is too small to survive the five poolings.
  """

  def __init__(self, inChannels, classes, hyperparameters):
    super(VGG16, self).__init__()

    self.image_w = self._read_dim(hyperparameters, 'image_w', 'w')
    self.image_h = self._read_dim(hyperparameters, 'image_h', 'h')

    # all conv layers have ReLU activation
    # the sizes quoted below are for 224 x 224 inputs; the actual sizes are
    # tracked in image_w / image_h while the stages are built

    # conv1_1, conv1_2 - 64 filters,  out - 224 x 224 x 64
    # mxpool1,                        out - 112 x 112 x 64
    self.super_layer_1 = self._conv_stage(inChannels, 64, num_convs = 2)

    # conv2_1, conv2_2 - 128 filters, out - 112 x 112 x 128
    # mxpool2,                        out - 56 x 56 x 128
    self.super_layer_2 = self._conv_stage(64, 128, num_convs = 2)

    # conv3_1 .. conv3_3 - 256 filters, out - 56 x 56 x 256
    # mxpool3,                          out - 28 x 28 x 256
    self.super_layer_3 = self._conv_stage(128, 256, num_convs = 3)

    # conv4_1 .. conv4_3 - 512 filters, out - 28 x 28 x 512
    # mxpool4,                          out - 14 x 14 x 512
    self.super_layer_4 = self._conv_stage(256, 512, num_convs = 3)

    # conv5_1 .. conv5_3 - 512 filters, out - 14 x 14 x 512
    # mxpool5,                          out - 7 x 7 x 512
    self.super_layer_5 = self._conv_stage(512, 512, num_convs = 3)

    if self.image_w < 1 or self.image_h < 1:
      raise ValueError('input image is too small for five 2 x 2 poolings')

    # flattening the tensor: 7 x 7 x 512 -> 25088
    # fc1 - in: 7 x 7 x 512, out: 4096, ReLU
    # fc2 - in: 4096, out: 4096, ReLU
    # fc3 - in: 4096, out: classes
    self.super_layer_6 = Sequential(
      Linear(in_features = self.image_w * self.image_h * 512, out_features = 4096),
      ReLU(),
      Linear(in_features = 4096, out_features = 4096),
      ReLU(),
      Linear(in_features = 4096, out_features = classes)
    )

    # Normalises the class scores of each sample (dim 1 is the class axis
    # after the flatten + fully connected layers).
    self.log_softmax = LogSoftmax(dim = 1)

  @staticmethod
  def _read_dim(hyperparameters, key, short_key):
    # Looks up an image dimension under its descriptive or its short key.
    if key in hyperparameters:
      return hyperparameters[key]
    if short_key in hyperparameters:
      return hyperparameters[short_key]
    raise KeyError("hyperparameters must contain '{}' (or '{}')".format(key, short_key))

  def _conv_stage(self, in_channels, out_channels, num_convs):
    # Builds `num_convs` (3 x 3 conv + ReLU) pairs followed by a 2 x 2
    # max-pool, advancing image_w / image_h to the stage's output size.
    layers = []
    channels = in_channels
    for _ in range(num_convs):
      layers.append(Conv2d(in_channels = channels, out_channels = out_channels,
                           kernel_size = (3, 3), stride = (1, 1), padding = (1, 1)))
      layers.append(ReLU())
      self.image_w = self.calc_dim(self.image_w, f = 3, p = 1, s = 1)
      self.image_h = self.calc_dim(self.image_h, f = 3, p = 1, s = 1)
      channels = out_channels

    layers.append(MaxPool2d(kernel_size = (2, 2), stride = (2, 2)))
    self.image_w = self.calc_dim(self.image_w, f = 2, p = 0, s = 2)
    self.image_h = self.calc_dim(self.image_h, f = 2, p = 0, s = 2)

    return Sequential(*layers)

  def forward(self, x):
    """Return per-class log-probabilities, shape (batch, classes).

    The input is expected to be a batch of images shaped
    (batch, inChannels, image_h, image_w), matching the size given at
    construction time.
    """
    x = self.super_layer_1(x)
    x = self.super_layer_2(x)
    x = self.super_layer_3(x)
    x = self.super_layer_4(x)
    x = self.super_layer_5(x)
    # keep the batch axis, merge channels and spatial axes for the dense layers
    x = flatten(x, 1)
    x = self.super_layer_6(x)
    out = self.log_softmax(x)

    return out

  def calc_dim(self, dim, f, p, s):
    """Output length of a conv/pool along one axis.

    dim: input length, f: filter size, p: padding, s: stride.
    """
    return (dim + 2*p - f)//s + 1




In [None]:
# torchvision's ImageNet dataset is ILSVRC-2012, which has 1000 classes
num_classes = 1000
num_epochs = 20
# NOTE: not consumed in this cell; the DataLoaders take their batch size
# where they are constructed.
batch_size = 16
learning_rate = 0.005

# Input image size. It is given under both key spellings so the lookup made
# by VGG16.__init__ succeeds.
hyperparameters = {
  'image_w': 224,
  'image_h': 224,
  'w': 224,
  'h': 224
}

# RGB input -> 3 channels. The constructor requires the channel count, the
# number of classes and the hyperparameter dict.
model = VGG16(inChannels = 3,
              classes = num_classes,
              hyperparameters = hyperparameters).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay = 0.005, momentum = 0.9)  


# Train the model
total_step = len(train_loader)

In [None]:
total_step = len(train_loader)

for epoch in range(num_epochs):
    # Training phase: make sure the model is in training mode, since the
    # validation phase below switches it to evaluation mode.
    model.train()
    for i, (images, labels) in enumerate(train_loader):  
        # Move tensors to the configured device
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Reported once per epoch, using the loss of the last batch
    print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
            
    # Validation
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            # index of the highest score per sample = predicted class
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # release the batch tensors before the next iteration
            del images, labels, outputs
    
        # Report the number of images actually evaluated; avoid dividing by
        # zero when the validation loader is empty.
        accuracy = 100 * correct / total if total else float('nan')
        print('Accuracy of the network on the {} validation images: {} %'.format(total, accuracy)) 

In [None]:
# Final evaluation on the test loader: inference only, so the model is put in
# evaluation mode and gradient tracking is disabled.
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        # index of the highest score per sample = predicted class
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        # release the batch tensors before the next iteration
        del images, labels, outputs

    # Report the number of images actually evaluated; avoid dividing by zero
    # when the test loader is empty.
    accuracy = 100 * correct / total if total else float('nan')
    print('Accuracy of the network on the {} test images: {} %'.format(total, accuracy))   