In [None]:
# Mount Google Drive inside the Colab VM so the project files become reachable.
from google.colab import drive

drive.mount(mountpoint='/content/gdrive')

In [None]:
cd gdrive/MyDrive/deep_learning/computer_vision/convnet/

In [None]:
#Import Statements
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from random import randint
import utils
import time

In [None]:
#Select the compute device: the GPU when CUDA is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
#Name of the current CUDA device; show a placeholder instead of raising when CUDA is unavailable
gpu_name = torch.cuda.get_device_name() if torch.cuda.is_available() else "no CUDA device available"
gpu_name

In [None]:
#Load the four saved tensors (train/test data and labels) from the cifar data folder
train_data, train_label, test_data, test_label = (
    torch.load(path)
    for path in (
        '../../data/cifar/train_data.pt',
        '../../data/cifar/train_label.pt',
        '../../data/cifar/test_data.pt',
        '../../data/cifar/test_label.pt',
    )
)

#Report the size of the train and test data tensors, one per line
print(train_data.size(), test_data.size(), sep='\n')

In [None]:
#Data augmentation and EDA
#Double the training set: append a copy of every sample flipped along dim 3
#(presumably the image width, i.e. a horizontal mirror - confirm against the tensor layout)
#and repeat the labels so they stay aligned with the data.
#NOTE: this cell is not idempotent - every re-run doubles train_data and train_label again.
train_data_inverse = train_data.flip(3)
train_data = torch.cat([train_data, train_data_inverse], dim=0)
train_label = torch.cat([train_label, train_label], dim=0)

print(train_data.size())

#Scalar mean and standard deviation over the whole (augmented) training tensor;
#later cells use them to normalize the network inputs
mean, std = train_data.mean(), train_data.std()

print(mean, std, sep='\n')

In [None]:
#Move the train/test data and label tensors onto the selected device
train_data, train_label, test_data, test_label = (
    tensor.to(device)
    for tensor in (train_data, train_label, test_data, test_label)
)

In [None]:
#Basic block
class BasicBlock(nn.Module):
    """Residual block: two conv -> ReLU -> batch-norm passes plus a skip connection.

    Both passes run through the same ``conv`` and ``bn`` modules, so the block
    owns a single set of convolution weights and batch-norm parameters.
    NOTE(review): residual blocks commonly use separate layers for each pass -
    confirm that the sharing here is intentional.
    """

    def __init__(self, input_channels):
        """Create the layers; the block maps ``input_channels`` channels to the same count."""
        super().__init__()
        #3x3 convolution with padding 1: bs x C x H x W  -->  bs x C x H x W
        self.conv = nn.Conv2d(
            in_channels=input_channels,
            out_channels=input_channels,
            kernel_size=3,
            padding=1,
        )
        #batch normalization over the C channels
        self.bn = nn.BatchNorm2d(num_features=input_channels)

    def _conv_relu_bn(self, t):
        """Apply convolution, then ReLU, then batch normalization."""
        return self.bn(F.relu(self.conv(t)))

    def forward(self, x):
        """Return ``x`` plus the result of passing ``x`` twice through conv -> ReLU -> bn."""
        residual = self._conv_relu_bn(self._conv_relu_bn(x))
        return x + residual

In [None]:
#Convnet Architecture
class ConvNet(nn.Module):
    """Residual convolutional classifier for 32 x 32 images.

    A stem convolution is followed by three stages of ``layers`` BasicBlocks at
    16, 32 and 64 channels. Between stages a "bridge" (a stride-2 convolution
    followed by a stride-1 convolution, each with batch norm and ReLU) halves the
    spatial size and doubles the channel count. An 8 x 8 average pool and a
    bias-free linear layer produce the class scores.

    Args:
        input_channels: number of channels of the input images (3 for RGB).
        classes: number of scores produced per image.
        layers: number of BasicBlocks in each of the three stages.
    """

    def __init__(self, input_channels, classes, layers):
        super().__init__()

        #Initial convolutional layer:  bs x input_channels x 32 x 32  --> bs x 16 x 32 x 32
        #The channel count comes from the constructor argument instead of a hard-coded 3.
        self.conv1 = nn.Conv2d(input_channels, 16, kernel_size=3, stride=1, padding=1)
        self.batchnorm = nn.BatchNorm2d(16)     #16 = channels produced by conv1

        #Basic/Residual Blocks 1: bs x 16 x 32 x 32 --> bs x 16 x 32 x 32
        self.block_list1 = nn.ModuleList([BasicBlock(16) for _ in range(layers)])

        #Bridge Block 1: bs x 16 x 32 x 32 --> bs x 32 x 16 x 16
        self.bridge1_conv1 = nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1)
        self.bridge1_bn1 = nn.BatchNorm2d(32)
        self.bridge1_conv2 = nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1)
        self.bridge1_bn2 = nn.BatchNorm2d(32)

        #Basic/Residual Blocks 2: bs x 32 x 16 x 16 --> bs x 32 x 16 x 16
        self.block_list2 = nn.ModuleList([BasicBlock(32) for _ in range(layers)])

        #Bridge Block 2: bs x 32 x 16 x 16 --> bs x 64 x 8 x 8
        self.bridge2_conv1 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)
        self.bridge2_bn1 = nn.BatchNorm2d(64)
        self.bridge2_conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
        self.bridge2_bn2 = nn.BatchNorm2d(64)

        #Basic/Residual Blocks 3: bs x 64 x 8 x 8 --> bs x 64 x 8 x 8
        self.block_list3 = nn.ModuleList([BasicBlock(64) for _ in range(layers)])

        #Final pooling: bs x 64 x 8 x 8 --> bs x 64 x 1 x 1
        self.avg_pool = nn.AvgPool2d(8)
        #Final layer: bs x 64 --> bs x classes
        self.linear_layer = nn.Linear(64, classes, bias=False)

        #Keep the constructor arguments around for inspection
        self.layers = layers
        self.input_channels = input_channels
        self.classes = classes

    def forward(self, x):
        """Map a batch ``bs x input_channels x 32 x 32`` to scores ``bs x classes``."""

        #Initial convolutional layer (batch norm only, no ReLU at this point)
        x = self.batchnorm(self.conv1(x))

        #Basic/Residual Blocks 1
        for block in self.block_list1:
            x = block(x)

        #Bridge Block 1
        x = F.relu(self.bridge1_bn1(self.bridge1_conv1(x)))
        x = F.relu(self.bridge1_bn2(self.bridge1_conv2(x)))

        #Basic/Residual Blocks 2
        for block in self.block_list2:
            x = block(x)

        #Bridge Block 2
        x = F.relu(self.bridge2_bn1(self.bridge2_conv1(x)))
        x = F.relu(self.bridge2_bn2(self.bridge2_conv2(x)))

        #Basic/Residual Blocks 3
        for block in self.block_list3:
            x = block(x)

        #Final pooling: bs x 64 x 8 x 8 --> bs x 64 x 1 x 1
        x = self.avg_pool(x)

        #Flatten bs x 64 x 1 x 1 --> bs x 64 before the linear layer
        inputs = x.view(x.size(0), -1)
        scores = self.linear_layer(inputs)

        return scores

In [None]:
def eval_on_test_set():
    """Print the error rate of ``net`` on the test set.

    Reads the notebook globals ``net``, ``test_data``, ``test_label``, ``bs``,
    ``mean``, ``std`` and ``device``. The test set is processed in slices of
    ``bs`` samples, and the printed figure is the mean of the per-batch values
    returned by ``utils.get_error``, multiplied by 100.

    Side effect: ``net`` is switched to evaluation mode and left there, so
    callers must call ``net.train()`` before they resume training.
    """
    net.eval()
    running_error = 0
    num_batches = 0

    # Take the sample count from the data instead of hard-coding 10000.
    num_samples = test_data.size(0)

    with torch.no_grad():
        for start in range(0, num_samples, bs):

            minibatch_data = test_data[start:start+bs]
            minibatch_label = test_label[start:start+bs]

            minibatch_data = minibatch_data.to(device)
            minibatch_label = minibatch_label.to(device)

            # same normalization as the one applied to the training inputs
            inputs = (minibatch_data - mean)/std

            scores = net( inputs )

            # NOTE(review): presumably the fraction of misclassified samples in the
            # batch - confirm in utils.get_error
            error = utils.get_error( scores , minibatch_label)

            running_error += error.item()

            num_batches += 1

    total_error = running_error/num_batches
    print( 'error rate on test set =', total_error*100 ,'percent')

In [None]:
#Network instantiation
#ConvNet with 3 input channels, 10 classes and 6 basic blocks per stage, placed on the selected device
net = ConvNet(input_channels=3, classes=10, layers=6).to(device)

In [None]:
#Network miscellaneous
bs = 100                             #minibatch size used by the training and evaluation loops
criterion = nn.CrossEntropyLoss()    #loss computed from the network scores and the labels

In [None]:
#Training loop
start = time.time()
lr = 0.1
num_epochs = 50
lr_drop_epoch = 40                      # epoch at which the learning rate is halved
num_train = train_data.size(0)          # sample count taken from the data, not hard-coded

# Plain SGD keeps no per-parameter state, so a single optimizer is created up front
# and only its learning rate is updated when the schedule fires.
optimizer = torch.optim.SGD(net.parameters(), lr=lr)

for epoch in range(num_epochs):

    # eval_on_test_set() leaves the network in evaluation mode at the end of every
    # epoch. Switch back to training mode so batch norm normalizes with batch
    # statistics and keeps updating its running averages.
    net.train()

    if epoch == lr_drop_epoch:
        lr = lr * .5
        for group in optimizer.param_groups:
            group['lr'] = lr

    running_loss = 0
    running_error = 0
    num_batches = 0

    # visit the training samples in a fresh random order every epoch
    shuffled_indices = torch.randperm(num_train)

    for count in range(0, num_train, bs):

        # Set the gradients to zeros
        optimizer.zero_grad()

        # create a minibatch
        indices = shuffled_indices[count:count+bs]
        minibatch_data = train_data[indices]
        minibatch_label = train_label[indices]

        # send them to the gpu
        minibatch_data = minibatch_data.to(device)
        minibatch_label = minibatch_label.to(device)

        # subtract the mean and divide by the std (the same normalization is applied
        # at evaluation time). Gradients are only needed for the network parameters,
        # so the inputs are not marked as requiring grad.
        inputs = (minibatch_data - mean)/std

        # forward the minibatch through the net
        scores = net( inputs )

        # Compute the average of the losses of the data points in the minibatch
        loss = criterion( scores , minibatch_label)

        # backward pass: gradients of the loss with respect to the network parameters
        loss.backward()

        # do one step of stochastic gradient descent
        optimizer.step()


        # START COMPUTING STATS

        num_batches += 1

        with torch.no_grad():

            running_loss += loss.item()

            error = utils.get_error( scores , minibatch_label)
            running_error += error.item()


    # compute stats for the full training set (mean of the per-batch values)
    total_loss = running_loss/num_batches
    total_error = running_error/num_batches
    elapsed = time.time()-start

    print('epoch=',epoch, '\t time=', elapsed, '\t loss=', total_loss , '\t error=', total_error*100 ,'percent')
    eval_on_test_set()
    print(' ')

In [None]:
#Test network on random image
#Pick a random test sample; the index range follows the actual size of test_data
idx = randint(0, test_data.size(0) - 1)
im = test_data[idx]

utils.show(im)

#Inference only: put the network in evaluation mode explicitly (rather than relying on
#the state left behind by an earlier cell) and skip building the autograd graph
net.eval()
with torch.no_grad():
    im = im.to(device)
    im = (im - mean) / std          # same normalization as during training
    im = im.view(1, 3, 32, 32)      # add the batch dimension expected by the network

    scores = net(im)
    probs = F.softmax(scores, dim=1)

utils.show_prob_cifar(probs.cpu())