<a href="https://colab.research.google.com/github/maxmatical/pytorch-projects/blob/master/AA_conv_Densenet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Experimenting with replacing the conv layers in DenseNets with attention-augmented conv layers

In [0]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
import torchvision

import torchvision.transforms as transforms


import numpy as np
import math

from attention_augmented_convnets import augmented_conv2d

# Pick the compute device once: the GPU when CUDA is available, otherwise the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")

In [0]:
class DenseBlock(nn.Module):
    """Bottleneck dense block built from attention-augmented convolutions.

    The forward pass is BN -> LeakyReLU -> augmented conv with kernel size 1
    (to ``4 * out_channels`` channels) -> dropout -> BN -> LeakyReLU ->
    augmented conv with kernel size 3 (to ``out_channels`` channels) ->
    dropout, and the result is concatenated with the block input along
    dimension 1.

    Args:
        in_channels: channel count given to the first batch norm and conv.
        out_channels: channel count requested from the second conv.
        dropout: probability handed to ``F.dropout`` after each conv.
        stride: only stored as ``self.stride``; both convs are built with
            stride 1 regardless of this value.
        v: the convs receive ``dv = int(v * out_channels)``.
        k: the convs receive ``dk = k * out_channels``.
        Nh: forwarded unchanged to ``augmented_conv2d`` (presumably the
            number of attention heads -- confirm against that module).
    """

    def __init__(self, in_channels, out_channels, dropout=0.2, stride = 1, v = 0.2, k = 2, Nh = 4):
        super().__init__()
        bottleneck_channels = 4 * out_channels
        # Both augmented convs share the same attention hyperparameters.
        attention_kwargs = dict(
            dk=k * out_channels,
            dv=int(v * out_channels),
            Nh=Nh,
            relative=True,
        )

        self.bn1 = nn.BatchNorm2d(in_channels)
        self.leaky_relu = nn.LeakyReLU(inplace=True)
        self.conv1 = augmented_conv2d(
            in_channels, bottleneck_channels,
            kernel_size=1, stride=1, padding=0,
            **attention_kwargs
        )
        self.bn2 = nn.BatchNorm2d(bottleneck_channels)
        self.conv2 = augmented_conv2d(
            bottleneck_channels, out_channels,
            kernel_size=3, stride=1, padding=1,
            **attention_kwargs
        )
        self.dropout_prob = dropout
        self.stride = stride

    def forward(self, input):
        """Return the new features concatenated with ``input`` on dim 1."""
        hidden = self.bn1(input)
        hidden = self.leaky_relu(hidden)
        hidden = self.conv1(hidden)
        hidden = F.dropout(hidden, p=self.dropout_prob, training=self.training, inplace=False)

        hidden = self.bn2(hidden)
        hidden = self.leaky_relu(hidden)
        hidden = self.conv2(hidden)
        hidden = F.dropout(hidden, p=self.dropout_prob, training=self.training, inplace=False)

        # Dense connectivity: the input is carried forward next to the new features.
        return torch.cat([hidden, input], 1)

In [0]:
class TransitionBlock(nn.Module):
    """Transition between dense stages: channel change plus 2x2 average pooling.

    The forward pass is BN -> LeakyReLU -> augmented conv with kernel size 1
    (``in_channels`` to ``out_channels``) -> dropout -> ``AvgPool2d(2, stride=2)``.

    Args:
        in_channels: channel count given to the batch norm and the conv.
        out_channels: channel count requested from the conv.
        dropout: probability handed to ``F.dropout`` after the conv.
        v: the conv receives ``dv = int(v * out_channels)``.
        k: the conv receives ``dk = k * out_channels``.
        Nh: forwarded unchanged to ``augmented_conv2d`` (presumably the
            number of attention heads -- confirm against that module).
    """

    def __init__(self, in_channels, out_channels, dropout=0.2, v = 0.2, k = 2, Nh = 4):
        super().__init__()
        key_depth = k * out_channels
        value_depth = int(v * out_channels)

        self.bn1 = nn.BatchNorm2d(in_channels)
        self.leaky_relu = nn.LeakyReLU(inplace=True)
        self.conv1 = augmented_conv2d(
            in_channels, out_channels,
            kernel_size=1, stride=1, padding=0,
            dk=key_depth, dv=value_depth, Nh=Nh, relative=True,
        )
        self.dropout_prob = dropout
        self.avgpool = nn.AvgPool2d(2, stride=2)

    def forward(self, input):
        """Apply the conv stack to ``input`` and return the pooled result."""
        activated = self.leaky_relu(self.bn1(input))
        projected = self.conv1(activated)
        projected = F.dropout(projected, p=self.dropout_prob, training=self.training, inplace=False)
        return self.avgpool(projected)

In [0]:
# DenseNet
growth_rate = 12  # default k: channels each dense block is asked to add
compression_rate = 0.5  # default theta: fraction of channels kept by a transition
class DenseNet(nn.Module):
    """DenseNet with four dense stages separated by three transition blocks.

    Layout: a stride-2 7x7 convolution and a stride-2 average pool, then
    ``layer1 -> trans1 -> layer2 -> trans2 -> layer3 -> trans3 -> layer4``,
    followed by BN + LeakyReLU, global average and max pooling (their
    outputs are concatenated) and a linear classifier.

    Args:
        block: callable invoked as ``block(in_channels, k, dropout)`` to
            build one dense block; the channel bookkeeping here assumes each
            block adds ``k`` channels to its input.
        layers: indexable with at least four entries; ``layers[i]`` is the
            number of dense blocks in stage ``i + 1``.
        n_classes: number of outputs of the final linear layer.
        dropout: dropout probability handed to every dense and transition block.
        k: growth rate; the first convolution produces ``2 * k`` channels.
        theta: compression factor; each transition maps ``c`` channels to
            ``floor(c * theta)``.
    """

    def __init__(self, block, layers, n_classes, dropout =0.2,k=growth_rate, theta = compression_rate): # layer is a list
        super(DenseNet, self).__init__()

        self.dropout = dropout

        # Stem. A plain convolution is used here rather than augmented_conv2d:
        # it is not clear how to downsample properly with attention-augmented
        # layers. The dense and transition blocks use their own default
        # attention hyperparameters (v, k, Nh).
        in_channel = 2 * k
        self.conv1 = nn.Conv2d(3, in_channel, 7, padding=3, stride=2)
        self.avgpool1 = nn.AvgPool2d(3, padding=1, stride=2)

        # Stage 1: dense blocks, then a transition that compresses the channels.
        self.layer1 = self.make_layer(block, in_channel, k, layers[0], dropout)
        in_channel = int(in_channel + layers[0] * k)
        compressed = int(math.floor(in_channel * theta))
        self.trans1 = TransitionBlock(in_channel, compressed, dropout)
        in_channel = compressed

        # Stage 2
        self.layer2 = self.make_layer(block, in_channel, k, layers[1], dropout)
        in_channel = int(in_channel + layers[1] * k)
        compressed = int(math.floor(in_channel * theta))
        self.trans2 = TransitionBlock(in_channel, compressed, dropout)
        in_channel = compressed

        # Stage 3
        self.layer3 = self.make_layer(block, in_channel, k, layers[2], dropout)
        in_channel = int(in_channel + layers[2] * k)
        compressed = int(math.floor(in_channel * theta))
        self.trans3 = TransitionBlock(in_channel, compressed, dropout)
        in_channel = compressed

        # Stage 4: no transition after the last dense stage.
        self.layer4 = self.make_layer(block, in_channel, k, layers[3], dropout)
        in_channel = int(in_channel + layers[3] * k)

        # Pooling and classification. The linear layer sees avg- and
        # max-pooled features side by side, hence 2 * in_channel inputs.
        self.bn = nn.BatchNorm2d(in_channel)
        self.leaky_relu = nn.LeakyReLU(inplace=True)
        self.adaptive_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.adaptive_max_pool = nn.AdaptiveMaxPool2d(1)
        self.linear = nn.Linear(in_channel * 2, n_classes)

    def forward(self, input):
        """Run the network on ``input`` and return the linear layer's output.

        ``input`` must have 3 channels (the first convolution is built with
        3 input channels). The result has ``n_classes`` values per sample.
        """
        out = self.avgpool1(self.conv1(input))
        out = self.trans1(self.layer1(out))
        out = self.trans2(self.layer2(out))
        out = self.trans3(self.layer3(out))
        out = self.leaky_relu(self.bn(self.layer4(out)))

        # Global average pool and global max pool, each flattened to
        # (batch, channels) and concatenated along the feature dimension.
        out_a = self.adaptive_avg_pool(out)
        out_a = out_a.view(out_a.size(0), -1)
        out_b = self.adaptive_max_pool(out)
        out_b = out_b.view(out_b.size(0), -1)

        out = torch.cat([out_a, out_b], 1)
        return self.linear(out)  # output layer

    def make_layer(self, block, in_channel, k, n_layers, dropout):
        """Build one dense stage as an ``nn.Sequential`` of ``n_layers`` blocks.

        Block ``i`` is created as ``block(in_channel + i * k, k, dropout)``:
        its input width accounts for the ``k`` channels each earlier block in
        the stage is assumed to have added.
        """
        stage = [block(in_channel + i * k, k, dropout) for i in range(n_layers)]
        return nn.Sequential(*stage)

        


In [0]:
# Smallest configuration: one dense block per stage and 10 output classes,
# placed on the selected device.
net = DenseNet(DenseBlock, [1, 1, 1, 1], n_classes=10, dropout=0.2)
net = net.to(device)


In [0]:
from torchsummary import summary

# Summarise the network for a 3-channel 224x224 input (presumably torchsummary
# prints per-layer output shapes and parameter counts -- confirm).
# NOTE(review): the CIFAR-10 transform used for training applies no resize, so
# the training input size may differ from the 224x224 used here.
summary(net, (3, 224, 224))


In [0]:

# Smoke test: check that a TransitionBlock can be constructed on its own.
# NOTE(review): with out_channels=3 and the default v=0.2, TransitionBlock
# passes dv = int(0.2 * 3) = 0 to augmented_conv2d -- confirm that is supported.
T = TransitionBlock(3,3)
# D = DenseBlock(3, 3)

In [6]:

# weight initialization
# new init weight        
def init_weight(m):
    """Initialise a single module in place; intended for ``nn.Module.apply``.

    * ``nn.Conv2d`` / ``nn.Linear``: the weight is re-drawn with
      ``nn.init.kaiming_normal_`` (He initialisation); the bias is untouched.
    * ``nn.BatchNorm2d`` (subclasses included): weight and bias are set to 1
      when the layer has affine parameters; layers built with
      ``affine=False`` are skipped.
    * Any other module is left unchanged.

    Args:
        m: the module to initialise.
    """
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight)  # He init; Xavier is an alternative
        # nn.init.constant_(m.bias, 0.001)  # optional bias init
    elif isinstance(m, nn.BatchNorm2d):
        # affine=False leaves weight and bias as None; nothing to initialise.
        if m.weight is not None:
            nn.init.constant_(m.weight, 1)
        if m.bias is not None:
            # NOTE(review): bias is set to 1 as in the original code; PyTorch's
            # own default for a batch-norm bias is 0 -- confirm 1 is intended.
            nn.init.constant_(m.bias, 1)
        
# Apply init_weight to the network: nn.Module.apply calls the function on
# every submodule and on the network itself.
net.apply(init_weight)

DenseNet(
  (conv1): Conv2d(3, 24, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (avgpool1): AvgPool2d(kernel_size=3, stride=2, padding=1)
  (layer1): Sequential(
    (0): DenseBlock(
      (bn1): BatchNorm2d(24, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (leaky_relu): LeakyReLU(negative_slope=0.01, inplace)
      (conv1): augmented_conv2d(
        (conv_out): Conv2d(24, 46, kernel_size=(1, 1), stride=(1, 1), padding=(1, 1))
        (qkv_conv): Conv2d(24, 50, kernel_size=(1, 1), stride=(1, 1), padding=(1, 1))
        (attn_out): Conv2d(2, 2, kernel_size=(1, 1), stride=(1, 1))
      )
      (bn2): BatchNorm2d(48, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): augmented_conv2d(
        (conv_out): Conv2d(48, 10, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (qkv_conv): Conv2d(48, 50, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (attn_out): Conv2d(2, 2, kernel_size=(1, 1), stride=(1, 1))
      )
 

# Train on CIFAR10


In [7]:
# Preprocessing shared by both splits: convert each image to a tensor, then
# normalise all three channels with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Training split: stored under ./data (downloaded when missing), served in
# shuffled mini-batches of 64 by two worker processes.
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=64,
    shuffle=True,
    num_workers=2,
)

# Test split: same preprocessing and batch size, but kept in a fixed order.
testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=64,
    shuffle=False,
    num_workers=2,
)

  0%|          | 0/170498071 [00:00<?, ?it/s]

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|█████████▉| 169951232/170498071 [00:18<00:00, 11475614.08it/s]

Files already downloaded and verified


In [0]:

# Loss function and optimiser used by the training loop.
from torch import optim

learning_rate = 3e-4  # always a good starting point
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=learning_rate)

In [0]:
# Train for a fixed number of epochs and evaluate on the test split after each.
n_epochs = 10

for epoch in range(n_epochs):
    # Training mode: dropout is active and batch norm uses batch statistics.
    net.train()
    running_loss = 0.0       # loss accumulated since the last progress print
    total_train_loss = 0.0   # loss accumulated over the whole epoch
    for i, train_data in enumerate(trainloader, 0):
        # The loader yields CPU tensors; the model lives on `device`, so the
        # batch has to be moved there before the forward pass.
        inputs, labels = train_data
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print the mean loss every 500 mini-batches
        batch_loss = loss.item()
        running_loss += batch_loss
        total_train_loss += batch_loss
        if i % 500 == 499:
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 500))
            running_loss = 0.0

    # Evaluation mode: dropout is disabled and batch norm uses its running
    # statistics, so the test metrics are deterministic and the test data
    # does not leak into the batch-norm statistics.
    net.eval()
    correct = 0
    total = 0
    total_test_loss = 0.0
    with torch.no_grad():
        for test_data in testloader:
            test_images, test_labels = test_data
            test_images, test_labels = test_images.to(device), test_labels.to(device)
            test_outputs = net(test_images)
            test_loss = criterion(test_outputs, test_labels)
            total_test_loss += test_loss.item()
            # Predicted class = index of the largest output along dim 1.
            _, predicted = torch.max(test_outputs, 1)
            total += test_labels.size(0)
            correct += (predicted == test_labels).sum().item()

    # per-epoch averages (mean of the per-batch losses)
    print("===> Epoch {} Complete: Train Avg. Loss: {:.4f}".format(epoch+1, total_train_loss / len(trainloader)))
    print("===> Epoch {} Complete: Test Avg. Loss: {:.4f}".format(epoch+1, total_test_loss / len(testloader)))
    print('Accuracy of the network on the 10000 test images: %d %%' % (100 * correct / total))
print('Finished Training')