In [2]:
import numpy as np
import torchvision
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchsummary import summary
import torchvision.models as models

In [3]:
# Mount Google Drive (Colab only) so that the '/content/drive/...' paths used
# further down for the datasets and the saved model resolve.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [4]:
#TODO : def create_ResNet


def create_ResNet(n_blocks,n_channels,n_filters,n_layers=2,down_sample = True):
    """
    Stack several residual blocks into one ``nn.Sequential`` stage.

    args :
        n_blocks : number of residual blocks to stack
        n_channels : number of input channels of the first block
        n_filters : number of filters for every conv of the stage
        n_layers : number of convs in each block
        down_sample : whether the first block of the stage down-samples with
            a stride of 2; the remaining blocks never down-sample

    returns :
        nn.Sequential containing the ``n_blocks`` residual blocks in order
    """
    blocks = [
        ResidualBlock(
            n_layers,
            # only the first block sees the stage input; later blocks are fed
            # by the previous block, which outputs n_filters channels
            n_channels if i == 0 else n_filters,
            n_filters,
            down_sample=down_sample if i == 0 else False,
        )
        for i in range(n_blocks)
    ]
    return nn.Sequential(*blocks)
        

class ResNet34(nn.Module):
    """
        Build a ResNet model with 34 layers
        args :
            n_channels : number of channels input
            input_size : size (height == width) of the input data
            output_dim : number of class to classify

        ``forward`` returns class probabilities (softmax over dim 1), not logits.
    """
    def __init__(self,n_channels,input_size,output_dim):
        super(ResNet34, self).__init__()
        self.n_filters = 64
        self.n_channels = n_channels
        self.input_size = input_size
        # Stride-1 stem without max-pooling: the input resolution is kept
        # until the first down-sampling stage.
        self.base_conv = nn.Sequential(
            nn.Conv2d(n_channels,self.n_filters,3, stride = 1,padding = 1),
        )
        # 3 + 4 + 6 + 3 residual blocks
        self.conv3x3_1 = create_ResNet(3,self.n_filters,self.n_filters,down_sample = False)
        self.conv3x3_2 = create_ResNet(4,self.n_filters,self.n_filters*2)
        self.conv3x3_3 = create_ResNet(6,self.n_filters*2,self.n_filters*4)
        self.conv3x3_4 = create_ResNet(3,self.n_filters*4,self.n_filters*8)
        # Shape (1, C, H, W) of the last feature map for the configured input size
        self.flat = self.compute_out_shape()
        # Average over the whole remaining spatial extent
        self.avg_pooling = nn.AvgPool2d(self.flat[-1])
        self.fc = nn.Linear(self.flat[1],output_dim)

    def _features(self, x):
        # Convolutional trunk shared by the shape probe and forward()
        x = self.base_conv(x)
        x = self.conv3x3_1(x)
        x = self.conv3x3_2(x)
        x = self.conv3x3_3(x)
        x = self.conv3x3_4(x)
        return x

    def compute_out_shape(self):
        """Return the shape of the trunk output for one dummy input of the configured size."""
        was_training = self.training
        # Probe in eval mode and without autograd so the dummy batch neither
        # updates the BatchNorm running statistics nor builds a graph.
        self.eval()
        with torch.no_grad():
            dummy = torch.ones(1, self.n_channels, self.input_size, self.input_size)
            shape = self._features(dummy).shape
        self.train(was_training)
        return shape

    def forward(self, x):
        x = self._features(x)
        x = self.avg_pooling(x)
        # Keep the batch dimension intact; an input whose size differs from
        # input_size then fails loudly in fc instead of being folded into the batch.
        x = torch.flatten(x, 1)
        x = self.fc(x)
        # Explicit dim: the implicit-dim form of softmax is deprecated.
        return F.softmax(x, dim=1)
    
    
        

class ResNet18(nn.Module):
    """
        Build a ResNet model with 18 layers
        args :
            n_channels : number of channels input
            input_size : size (height == width) of the input data
            output_dim : number of class to classify

        ``forward`` returns class probabilities (softmax over dim 1), not logits.
    """
    def __init__(self,n_channels,input_size,output_dim):
        super(ResNet18, self).__init__()
        self.n_filters = 64
        self.n_channels = n_channels
        self.input_size = input_size
        # NOTE(review): a padding of 3 is normally paired with a 7x7 kernel;
        # the 3x3 kernel is kept as-is so the architecture is unchanged - confirm.
        self.base_conv = nn.Sequential(
            nn.Conv2d(n_channels,self.n_filters,3, stride = 2,padding = 3),
            nn.MaxPool2d(3,2,1)
        )
        # 2 + 2 + 2 + 2 residual blocks
        self.conv3x3_1 = create_ResNet(2,self.n_filters,self.n_filters,down_sample = False)
        self.conv3x3_2 = create_ResNet(2,self.n_filters,self.n_filters*2)
        self.conv3x3_3 = create_ResNet(2,self.n_filters*2,self.n_filters*4)
        self.conv3x3_4 = create_ResNet(2,self.n_filters*4,self.n_filters*8)
        # Shape (1, C, H, W) of the last feature map for the configured input size
        self.flat = self.compute_out_shape()
        # Average over the whole remaining spatial extent
        self.avg_pooling = nn.AvgPool2d(self.flat[-1])
        self.fc = nn.Linear(self.flat[1],output_dim)

    def _features(self, x):
        # Convolutional trunk shared by the shape probe and forward()
        x = self.base_conv(x)
        x = self.conv3x3_1(x)
        x = self.conv3x3_2(x)
        x = self.conv3x3_3(x)
        x = self.conv3x3_4(x)
        return x

    def compute_out_shape(self):
        """Return the shape of the trunk output for one dummy input of the configured size."""
        was_training = self.training
        # Probe in eval mode and without autograd so the dummy batch neither
        # updates the BatchNorm running statistics nor builds a graph.
        self.eval()
        with torch.no_grad():
            dummy = torch.ones(1, self.n_channels, self.input_size, self.input_size)
            shape = self._features(dummy).shape
        self.train(was_training)
        return shape

    def forward(self, x):
        x = self._features(x)
        x = self.avg_pooling(x)
        # Keep the batch dimension intact; an input whose size differs from
        # input_size then fails loudly in fc instead of being folded into the batch.
        x = torch.flatten(x, 1)
        x = self.fc(x)
        # Explicit dim: the implicit-dim form of softmax is deprecated.
        return F.softmax(x, dim=1)

In [5]:
# TODO : create clean ResLayer pytorch style that can be printed as ResLayer
# try callbacks for early stopping
# add lr schedule

def create_conv_net( activation,n_input,output_activation,conv_type,num_filters,kernel_n,stride,padding,normalize=False):
    """Assemble a stack of convolutions as an ``nn.Sequential``.

        Layer ``j`` is ``conv_type(in, num_filters[j], kernel_n[j], stride[j], padding[j])``,
        optionally followed by ``nn.BatchNorm2d`` and then by an activation.

        args :
            activation : activation class instantiated after every layer but the last
            n_input : number of input channels of the first convolution
            output_activation : activation class instantiated after the last layer
            conv_type : convolution class (e.g. nn.Conv2d)
            num_filters, kernel_n, stride, padding : per-layer settings, indexed by layer
            normalize : add a BatchNorm2d between each convolution and its activation
    """
    modules = []
    last_idx = len(num_filters) - 1
    in_channels = n_input
    for idx, out_channels in enumerate(num_filters):
        modules.append(
            conv_type(in_channels, out_channels, kernel_n[idx], stride[idx], padding[idx])
        )
        if normalize:
            modules.append(nn.BatchNorm2d(out_channels))
        # the final layer gets its own activation type
        activation_cls = output_activation if idx == last_idx else activation
        modules.append(activation_cls())
        # the next convolution consumes what this one produced
        in_channels = out_channels

    return nn.Sequential(*modules)


class ResidualBlock(nn.Module):
    """
        Build a residual block for ResNet: ``relu(res_block(x) + shortcut(x))``.

            args :
                n_layer : number of conv layers in the block
                n_channels : number of inputs channels
                n_filters : number of filters of every conv of the block
                padding : accepted for compatibility but not used; every conv
                    is padded with (kernel_size - 1) // 2
                kernel_size : kernel size of every conv of the block
                stride : accepted for compatibility but not used; the strides
                    are derived from ``down_sample``
                act : activation class used after every conv
                down_sample (bool) : give the first conv a stride of 2
                batch_norm (bool) : should add batch_norm

        When the block changes the number of channels or down-samples, the
        shortcut is a learnable 1x1 convolution created once here, so it is
        registered, trained, moved by ``.to(device)`` and stored in the
        state_dict. Checkpoints saved before this layer existed do not contain
        the ``shortcut.*`` entries.
    """
    def __init__(self,n_layer, n_channels, n_filters,padding = 0, kernel_size=3,stride=1,act=nn.ReLU,down_sample = True,batch_norm= True):
        super(ResidualBlock, self).__init__()
        self.padding = [(kernel_size-1)//2] * n_layer
        self.stride = [2] + [1] * (n_layer-1) if down_sample else [1] * n_layer
        self.n_filters = [n_filters] * n_layer
        # one entry per layer (not one entry per kernel pixel)
        self.kernel_size = [kernel_size] * n_layer
        self.res_block = create_conv_net(
            act,
            n_channels,
            act,
            nn.Conv2d,
            self.n_filters,
            self.kernel_size,
            self.stride,
            self.padding,
            batch_norm
        )
        # The shortcut must reproduce the channel count and the spatial
        # reduction of the main path, otherwise the addition cannot be done.
        shortcut_stride = 2 if down_sample else 1
        if n_channels != n_filters or shortcut_stride != 1:
            self.shortcut = nn.Conv2d(n_channels, n_filters, 1, shortcut_stride, 0)
        else:
            # identity shortcut
            self.shortcut = None

    def forward(self,x):
        out = self.res_block(x)
        res = x if self.shortcut is None else self.shortcut(x)
        return F.relu(out+res)

In [6]:
def weights_init(m):
    """Initialise a module in place, based on its class name.

    Classes whose name contains 'Conv' get weights drawn from N(0, 0.02);
    classes whose name contains 'BatchNorm' get weights drawn from N(1, 0.02)
    and a zero bias. Any other module is left untouched.
    """
    layer_kind = type(m).__name__
    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return
    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [8]:
# Run on the first GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [90]:
# --- Training configuration -------------------------------------------------
EPOCH = 10          # number of passes over the training set
BATCH_SIZE = 64     # mini-batch size of the training loader
lr = 1e-4           # learning rate handed to the optimizer

# --- Problem definition -----------------------------------------------------
OUTPUT_DIM = 10     # number of classes predicted by the network

# --- Checkpoint -------------------------------------------------------------
# Where the trained weights (state_dict) are written
PATH = "/content/drive/My Drive/SavedModels/ResNet34_Cifar10.pt"

In [91]:
# Write the current weights of `net` to PATH.
# NOTE(review): read top to bottom, this cell runs before `net` is created
# (the training cell further down defines it), so it only works with state
# left over from an earlier execution. The training cell already saves to
# PATH when it finishes - confirm whether this cell is still needed.
torch.save(net.state_dict(), PATH)

In [69]:
# Per-channel (R, G, B) statistics commonly used for the CIFAR-10 training set.
# The previous single-value mean/std (0.1307 / 0.3081) are the MNIST grayscale
# statistics and were merely broadcast over the three colour channels.
CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR10_STD = (0.2470, 0.2435, 0.2616)

# Same preprocessing for both splits: convert to a [0, 1] tensor, then standardise.
cifar_transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
])

train_loader = torch.utils.data.DataLoader(
    torchvision.datasets.CIFAR10(
        "/content/drive/My Drive/Data_sets/Cifar/train",
        train=True, download=True, transform=cifar_transform),
    batch_size=BATCH_SIZE, shuffle=True)

# Evaluation iterates over the full split, so the sample order does not matter
# and shuffling is not needed.
test_loader = torch.utils.data.DataLoader(
    torchvision.datasets.CIFAR10(
        '/content/drive/My Drive/Data_sets/Cifar/test',
        train=False, download=True, transform=cifar_transform),
    batch_size=1000, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [93]:
# Train ResNet34 on CIFAR-10 (3 channels, 32x32 images).
# The network outputs softmax probabilities, which are regressed onto the
# one-hot targets with an MSE loss.
# NOTE: `test` must already be defined when this cell runs.
loss_memory = []
net = ResNet34(3,32,OUTPUT_DIM).to(device)
opti = optim.Adam(net.parameters(), lr=lr)
#loss_function = nn.CrossEntropyLoss()

for e in range(EPOCH):
    loss_memory = []
    correct_epoch = 0
    # Evaluate before training, so the "Test Epoch e" line describes the
    # model as it was at the start of epoch e.
    test(net,e)
    # Make sure BatchNorm is in training mode, whatever the evaluation did.
    net.train()
    for batch_idx, (data, targets) in enumerate(train_loader):
        # Move the whole batch once instead of comparing element by element
        # across devices.
        data = data.to(device)
        targets = targets.to(device)
        out = net(data)
        _, pred = torch.max(out, dim=1)
        correct_batch = (pred == targets).sum().item()
        correct_epoch += correct_batch
        # one hot encoding
        labels = F.one_hot(targets, OUTPUT_DIM).to(out.dtype)
        loss = F.mse_loss(out,labels)
        loss_memory.append(loss.item())
        opti.zero_grad()
        loss.backward()
        opti.step()
        if batch_idx %10 == 0:
          # Divide by the real batch length: the last batch can be shorter
          # than BATCH_SIZE.
          print('\n Batch : {}/{}, Accuracy: ({:.0f}%), loss : {:.4f} '.format(batch_idx,len(train_loader), correct_batch/len(targets)*100,loss.item()))
    avg_loss = np.mean(loss_memory)
    print('\n Training Epoch : {}, Avg.Accuracy: ({:.0f}%), Total loss : {:.4f} '.format(e, correct_epoch/len(train_loader.dataset)*100,avg_loss))

# Persist the trained weights
torch.save(net.state_dict(), PATH)
    

In [73]:
def test(model,epoch):
    """Evaluate ``model`` on ``test_loader`` and print accuracy and summed loss.

    args :
        model : network to evaluate; it must output one score per class
        epoch : epoch number, only used in the printed message

    returns :
        (accuracy in percent, sum of the per-batch MSE losses)

    The model is switched to eval mode for the evaluation, so BatchNorm uses
    its running statistics and does not update them from test data, and is
    put back in its previous mode afterwards.
    """
    correct = 0
    loss_memory = []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for data, targets in test_loader:
                targets = targets.to(device)
                # evaluate the model that was passed in, not a global one
                out = model(data.to(device))
                _, pred = torch.max(out, dim=1)
                correct += (pred == targets).sum().item()
                # one hot encoding
                labels = F.one_hot(targets, OUTPUT_DIM).to(out.dtype)
                loss = F.mse_loss(out,labels)
                loss_memory.append(loss.item())
    finally:
        # leave the model in the mode the caller had it in
        model.train(was_training)
    total_loss = np.sum(loss_memory)
    accuracy = correct/len(test_loader.dataset)*100
    print('\n Test Epoch : {},  Accuracy: ({:.0f}%), Total loss : {:.4f} '.format(epoch, accuracy,total_loss))
    return accuracy, total_loss