<a href="https://colab.research.google.com/github/shella688/INT2-GROUP12/blob/main/CNNImageNetwork.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
import torch
import torch.nn as nn 

class SimpleNet(nn.Module):
  def __init__(self, num_classes=10):
    # Creating layers ooooo spooky
    super(SimpleNet, self).__init__()

    self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=3, stride=1, padding=1)
    # Input layers has input images have 3 channels - RGB
    # We want to apply 12 feature detectors, hence 12 output channels
    # Stride = convolution moves 1 pixel at a time
    # Padding 1 = images padded w/ zeros so input size==output
    # Out channels of this layer == In channels of next layer

    self.relu1 = nn.ReLU()
    # ReLU activation function
    # Appled to incoming features
    # anything <0 is set to 0, otherwise kept the same

    self.conv2 = nn.Conv2d(in_channels=12, out_channels=12, kernel_size=3, stride=1, padding=1)
    self.relu2 = nn.ReLU()

    self.pool = nn.MaxPool2d(kernel_size=2)
    # Reduces dimension of image
    # 4 pixels become 1
    
    self.conv3 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=3, stride=1, padding=1)
    self.relu3 = nn.ReLU()

    self.conv4 = nn.Conv2d(in_channels=24, out_channels=24, kernel_size=3, stride=1, padding=1)
    self.relu4 = nn.ReLU()

    self.fc = nn.Linear(in_features=16 * 16 * 24, out_features=num_classes)
    # Final layer
    # standard, fully-connect layer

  def forward(self, input):
    output = self.conv1(input)
    output = self.relu1(output)

    output = self.conv2(output)
    output = self.relu2(output)

    output = self.pool(output)

    output = self.conv3(output)
    output = self.relu3(output)

    output = self.conv4(output)
    output = self.relu4(output)

    output = self.view(-1, 16*16*24)
    # Flatten entire feature map before passing to image

    output = self.fc(output)

    return output

This is, however, a long-winded way of doing things. We can ~ modularise it ~

In [35]:
# Below code does similar to above, but in modular fashion
# it would form part of a larger SimpleNet

class Unit(nn.Module):
  def __init__(self, in_channels, out_channels):
    super(Unit, self).__init__()

    self.conv = nn.Conv2d(in_channels=in_channels, kernel_size=3, 
                          out_channels=out_channels, stride=1, padding=1)
    self.bn = nn.BatchNorm2d(num_features=out_channels)
    self.relu = nn.ReLU()

  def forward(self, input):
    output = self.conv(input)
    output = self.bn(output) 
    output = self.relu(output) 

    return output

class SimpleModularNet(nn.Module):
  def __init__(self, num_classes=10):
    super(SimpleModularNet, self).__init__()

    # creates 14 layers, with max pooling in between 
    self.unit1 = Unit(in_channels=3, out_channels=32)
    self.unit2 = Unit(in_channels=32, out_channels=32)
    self.unit3 = Unit(32, 32)

    self.pool1 = nn.MaxPool2d(kernel_size=2)

    self.unit4 = Unit(32, 64)
    self.unit5 = Unit(64, 64)
    self.unit6 = Unit(64, 64)
    self.unit7 = Unit(64, 64)

    self.pool2 = nn.MaxPool2d(kernel_size=2)

    self.unit8 = Unit(64, 128)
    self.unit9 = Unit(128, 128)
    self.unit10 = Unit(128, 128)
    self.unit11 = Unit(128, 128)

    self.pool3 = nn.MaxPool2d(kernel_size=2)

    self.unit12 = Unit(128, 128)
    self.unit13 = Unit(128, 128)
    self.unit14 = Unit(128, 128)

    self.avgpool = nn.AvgPool2d(kernel_size=4)
    # this turns feature map into 1x1x128
    # giving us 128 input features

    # Add units to sequential layer in numerical order
    self.net = nn.Sequential(self.unit1, self.unit2, self.unit3, self.unit4, 
                             self.unit5, self.unit6, self.unit7, self.unit8, 
                             self.unit9, self.unit10, self.unit11, self.unit12, 
                             self.unit13, self.unit14, self.avgpool)
    
    self.fc = nn.Linear(in_features=128, out_features=num_classes)

  def forward(self, input):
    output = self.net(input)
    output = output.view(-1, 128)
    # flattening output to also have 128 features
    output = self.fc(output)
    return output



From here, we're loading + augumenting data (specifically the CIFAR10 dataset, which contains 60,000 32x32 colour images in 10 different classes - aeroplanes, cars, birds, cats, deer, dogs, frogs, horsese, ships, trucks)

In [23]:
from torchvision.datasets import CIFAR10
from torchvision.transforms import transforms
from torch.utils.data import DataLoader

In [36]:
# Here we define transformations for training set, randomly flip images,
# crop them, and apply mean + std normalisation
train_transformations = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    # this transforms image into usable format

    transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
])

# Load training set
train_set = CIFAR10(root="./data", train=True, transform=train_transformations, 
                    download=True)

# Create loader for said training set
train_loader = DataLoader(train_set, batch_size=32, shuffle=True,
                          num_workers=1)
# training data contains 32 images

Files already downloaded and verified


In [34]:
# We do a similar thing for test data
test_transformations = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
    # however, we don't need the various other transformations,
    # because we're just testign if the model works, NOT training it
    # to recognise images that may be flipped, rotated, and cropped
])

# load test set - set train to False 
test_set = CIFAR10(root="./data", train=False, transform=test_transformations, 
                   download=True)

# Loader for the test set, shuffle is this time set to False
test_loader = DataLoader(test_set, batch_size=32, shuffle=False,
                         num_workers=1)

Files already downloaded and verified


Now, we need to train the model. This uses a variation on gradient descent called the Adam optimiser

In [26]:
from torch.optim import Adam

# check that GPU support is available
cuda_avail = torch.cuda.is_available()

# Create model, optimiser, and loss function
# this is the point where you could import an existing model, ORRRR your
# own homemade one!
model = SimpleModularNet(num_classes=10)

# if cude available, move model to GPU
if cuda_avail:
  model.cuda()

# define optimiser and loss function 
optimiser = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
loss_fn = nn.CrossEntropyLoss()

In [27]:
# Now make a learning rate adjustment function
# it divides learning rate by 10 every 30 epochs
def learning_adjustment_rate(epoch):
  lr = 0.001

  if epoch > 180:
    lr /= 1000000 
  elif epoch > 150:
    lr /= 100000
  elif epoch > 120: 
    lr /= 10000 
  elif epoch > 90:
    lr /= 1000
  elif epoch > 60:
    lr /= 100 
  elif epoch > 30:
    lr /= 10

  for param_group in optimiser.param_groups:
    param_group["lr"] = lr

In [28]:
# here we give ourselves the ability to save + evaluate the model
def save_model(epoch):
  torch.save(model.state_dict(), "cifar10model_{}.model".format(epoch))
  print("Checkpoint saved")

def test():
  model.eval() 
  test_acc = 0.0 
  for i, (images, labels) in enumerate(test_loader):
    # iterate over test images

    if cuda_avail:
      images = Variable(images.cuda()) #(this is underlined but IS allowed)
      labels = Variable(labels.cuda())

    # Predict classes using images from test set 
    outputs = model(images)
    _, prediction = torch.max(outputs.data, 1)
    # maximum prediction picked and compared to actual class to obtain
    # accuracy

    test_acc += torch.sum(prediction == labels.data)

  # calculat average accuracy and loss over all test images 
  test_acc /= 10000

  return test_acc

In [29]:
# function to train machine
def train(num_epochs):
  best_acc = 0.0

  for epoch in range(num_epochs):
    model.train()
    train_acc = 0.0
    train_loss = 0.0 
    for i, (images, labels) in enumerate(train_loader):
      # move images+labels to GPU if there is one 
      if cuda_avail:
        images = Variable(images.cuda)
        labels = Variable(labels.cuda)

      # clear any accumulated gradients 
      optimiser.zero_grad()
      # this is because weights in NN are adjusted based on the
      # various calculated gradients for each batch 
      # Resetting to 0 prevents images from previous batch affecting
      # current

      # Predict classes using images from test set 
      outputs = model(images)
      # compute loss based on pred vs actual
      loss = loss_fn(outputs, labels)
      # backpropagate loss
      loss.backward()

      # now adjust parameters based on gradient 
      optimiser.step()

      train_loss += loss.cpu().item() * images.size(0)
      _, prediction = torch.max(outputs.data, 1)

      train_acc += torch.sum(prediction == labels.data)

    # adjust learning rate
    learning_adjustment_rate(epoch)

    # calculate average accuracy + loss over all trining images
    train_acc /= 50000
    train_loss /= 50000 

    # evaluate on test set 
    test_acc = test()

    # save model if test accuracy better than current best
    if test_acc > best_acc:
      save_model(epoch) 
      best_acc = test_acc

    # Print the fun stuff
    print("Epoch {}, Train accuracy: {}, Train loss: {}, \
    Test accuracy: {}".format(epoch, train_acc, train_loss, test_acc))


In [37]:
if __name__ == "__main__":
  train(200)

TypeError: ignored

In [None]:
#Import needed packages
import torch
import torch.nn as nn
from torchvision.datasets import CIFAR10
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.autograd import Variable
import numpy as np


class Unit(nn.Module):
    def __init__(self,in_channels,out_channels):
        super(Unit,self).__init__()
        

        self.conv = nn.Conv2d(in_channels=in_channels,kernel_size=3,out_channels=out_channels,stride=1,padding=1)
        self.bn = nn.BatchNorm2d(num_features=out_channels)
        self.relu = nn.ReLU()

    def forward(self,input):
        output = self.conv(input)
        output = self.bn(output)
        output = self.relu(output)

        return output

class SimpleNet(nn.Module):
    def __init__(self,num_classes=10):
        super(SimpleNet,self).__init__()

        #Create 14 layers of the unit with max pooling in between
        self.unit1 = Unit(in_channels=3,out_channels=32)
        self.unit2 = Unit(in_channels=32, out_channels=32)
        self.unit3 = Unit(in_channels=32, out_channels=32)

        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.unit4 = Unit(in_channels=32, out_channels=64)
        self.unit5 = Unit(in_channels=64, out_channels=64)
        self.unit6 = Unit(in_channels=64, out_channels=64)
        self.unit7 = Unit(in_channels=64, out_channels=64)

        self.pool2 = nn.MaxPool2d(kernel_size=2)

        self.unit8 = Unit(in_channels=64, out_channels=128)
        self.unit9 = Unit(in_channels=128, out_channels=128)
        self.unit10 = Unit(in_channels=128, out_channels=128)
        self.unit11 = Unit(in_channels=128, out_channels=128)

        self.pool3 = nn.MaxPool2d(kernel_size=2)

        self.unit12 = Unit(in_channels=128, out_channels=128)
        self.unit13 = Unit(in_channels=128, out_channels=128)
        self.unit14 = Unit(in_channels=128, out_channels=128)

        self.avgpool = nn.AvgPool2d(kernel_size=4)
        
        #Add all the units into the Sequential layer in exact order
        self.net = nn.Sequential(self.unit1, self.unit2, self.unit3, self.pool1, self.unit4, self.unit5, self.unit6
                                 ,self.unit7, self.pool2, self.unit8, self.unit9, self.unit10, self.unit11, self.pool3,
                                 self.unit12, self.unit13, self.unit14, self.avgpool)

        self.fc = nn.Linear(in_features=128,out_features=num_classes)

    def forward(self, input):
        output = self.net(input)
        output = output.view(-1,128)
        output = self.fc(output)
        return output

#Define transformations for the training set, flip the images randomly, crop out and apply mean and std normalization
train_transformations = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32,padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

batch_size = 32

#Load the training set
train_set = CIFAR10(root="./data",train=True,transform=train_transformations,download=True)

#Create a loder for the training set
train_loader = DataLoader(train_set,batch_size=batch_size,shuffle=True,num_workers=4)


#Define transformations for the test set
test_transformations = transforms.Compose([
   transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))

])

#Load the test set, note that train is set to False
test_set = CIFAR10(root="./data",train=False,transform=test_transformations,download=True)

#Create a loder for the test set, note that both shuffle is set to false for the test loader
test_loader = DataLoader(test_set,batch_size=batch_size,shuffle=False,num_workers=4)

#Check if gpu support is available
cuda_avail = torch.cuda.is_available()

#Create model, optimizer and loss function
model = SimpleNet(num_classes=10)

if cuda_avail:
    model.cuda()

optimizer = Adam(model.parameters(), lr=0.001,weight_decay=0.0001)
loss_fn = nn.CrossEntropyLoss()

#Create a learning rate adjustment function that divides the learning rate by 10 every 30 epochs
def adjust_learning_rate(epoch):

    lr = 0.001

    if epoch > 180:
        lr = lr / 1000000
    elif epoch > 150:
        lr = lr / 100000
    elif epoch > 120:
        lr = lr / 10000
    elif epoch > 90:
        lr = lr / 1000
    elif epoch > 60:
        lr = lr / 100
    elif epoch > 30:
        lr = lr / 10

    for param_group in optimizer.param_groups:
        param_group["lr"] = lr




def save_models(epoch):
    torch.save(model.state_dict(), "cifar10model_{}.model".format(epoch))
    print("Checkpoint saved")

def test():
    model.eval()
    test_acc = 0.0
    for i, (images, labels) in enumerate(test_loader):
      
        if cuda_avail:
                images = Variable(images.cuda())
                labels = Variable(labels.cuda())

        #Predict classes using images from the test set
        outputs = model(images)
        _,prediction = torch.max(outputs.data, 1)
        # this was an ERROR 1: We do not need to convert tensor into numpy()
        #prediction = prediction.cpu().numpy()
        test_acc += torch.sum(prediction == labels.data)
             


    #Compute the average acc and loss over all 10000 test images
    test_acc = test_acc / 10000

    return test_acc

def train(num_epochs):
    best_acc = 0.0

    for epoch in range(num_epochs):
        model.train()
        train_acc = 0.0
        train_loss = 0.0
        for i, (images, labels) in enumerate(train_loader):
            #Move images and labels to gpu if available
            if cuda_avail:
                images = Variable(images.cuda())
                labels = Variable(labels.cuda())

            #Clear all accumulated gradients
            optimizer.zero_grad()
            #Predict classes using images from the test set
            outputs = model(images)
            #Compute the loss based on the predictions and actual labels
            loss = loss_fn(outputs,labels)
            #Backpropagate the loss
            loss.backward()

            #Adjust parameters according to the computed gradients
            optimizer.step()
          
            train_loss += loss.cpu().item() * images.size(0)
            _, prediction = torch.max(outputs.data, 1)

            # This was an ERROR 2
            #train_loss += loss.cpu().data[0] * images.size(0)
            #_, prediction = torch.max(outputs.data, 1)
            
            train_acc += torch.sum(prediction == labels.data)

        #Call the learning rate adjustment function
        adjust_learning_rate(epoch)

        #Compute the average acc and loss over all 50000 training images
        train_acc = train_acc / 50000
        train_loss = train_loss / 50000

        #Evaluate on the test set
        test_acc = test()

        # Save the model if the test acc is greater than our current best
        if test_acc > best_acc:
            save_models(epoch)
            best_acc = test_acc


        # Print the metrics
        print("Epoch {}, Train Accuracy: {} , TrainLoss: {} , Test Accuracy: {}".format(epoch, train_acc, train_loss,test_acc))


if __name__ == "__main__":
    train(200)

Files already downloaded and verified
Files already downloaded and verified
