In [1]:

"""

Date: 21 Dec 2019

Python version:      3.7
Tensorboard version: 1.14.0
PyTorch version:     1.2.0

@author: Maksim Lavrov

Transfer learning on CIFAR10 dataset (5000 times smaller subset)
    • With a flexible layer implemented on top of the frozen layers

"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms

In [2]:
# Loading data
# transforms

#def torgb(x):
 #   return x.repeat(3, 1, 1)


transform = transforms.Compose(
    [transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], 
                             [0.229, 0.224, 0.225])
    #transforms.Lambda(torgb) # Transform grayscale to RGB
    ])

# datasets
trainset = torchvision.datasets.CIFAR10('./data',
    download=True,
    train=True,
    transform=transform)
testset = torchvision.datasets.CIFAR10('./data',
    download=True,
    train=False,
    transform=transform)

#create a dataset subset to reduce training time

sampler_train = list(range(0, len(trainset), 5000))
sampler_test = list(range(0, len(testset), 5000))
trainset_samp = torch.utils.data.Subset(trainset, sampler_train)
testset_samp = torch.utils.data.Subset(testset, sampler_test)

#set size of batch and learning rate
batch_size=4
lr=0.001

# dataloaders
trainloader = torch.utils.data.DataLoader(trainset_samp, batch_size=batch_size,
                                        shuffle=True, num_workers=2)

testloader = torch.utils.data.DataLoader(testset_samp, batch_size=batch_size,
                                        shuffle=False, num_workers=2)

# constant for classes
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

n_classes = 10

Files already downloaded and verified
Files already downloaded and verified


In [3]:
class FlexiLayer(nn.Conv2d):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1,
                 padding=0, dilation=1, groups=1,
                 bias=True, padding_mode='zeros'):
        kernel_size = kernel_size
        stride = stride
        padding = padding
        dilation = dilation
        super(FlexiLayer, self).__init__(
            in_channels, out_channels, kernel_size, stride, padding, dilation,
            groups, bias, padding_mode)
        
        self.threshold1 = nn.parameter.Parameter(torch.randn((batch_size, out_channels, 224, 224)), requires_grad=True)
        self.memorized = self.threshold1.clone()
        self.memorized_1loop = []
            
    def forward(self, t):
        
        t_1 = F.relu(F.conv2d(t, self.weight)) # get convolution result
        t_2 = F.max_pool2d(t, kernel_size=5, stride=1) # get max result with the same kernel size
        m = nn.Sigmoid()
        condmax = torch.sub(t_2, self.threshold1)
        condconv = torch.sub(t_2, self.threshold1)
        t_2 = m(condmax*50)*t_2 # 
        t_1 = m(condconv*(-50))*t_1 # 
        t = torch.add(t_2, t_1)
        t = F.max_pool2d(t, kernel_size=2, stride=2)
        
        return t

In [4]:
#Model Building
#Feature Extraction
net = models.resnet18(pretrained=True)
for param in net.parameters():
    param.requires_grad = False

alexnet = models.alexnet(pretrained=True)

def plot_filters_single_channel_big(t):
    
    #setting the rows and columns
    nrows = t.shape[0]*t.shape[2]
    ncols = t.shape[1]*t.shape[3]
    
    
    npimg = np.array(t.numpy(), np.float32)
    npimg = npimg.transpose((0, 2, 1, 3))
    npimg = npimg.ravel().reshape(nrows, ncols)
    
    npimg = npimg.T
    
    fig, ax = plt.subplots(figsize=(ncols/10, nrows/200))    
    imgplot = sns.heatmap(npimg, xticklabels=False, yticklabels=False, cmap='gray', ax=ax, cbar=False)

def plot_filters_single_channel(t):
    
    #kernels depth * number of kernels
    nplots = t.shape[0]*t.shape[1]
    ncols = 12
    
    nrows = 1 + nplots//ncols
    #convert tensor to numpy image
    npimg = np.array(t.numpy(), np.float32)
    
    count = 0
    fig = plt.figure(figsize=(ncols, nrows))
    
    #looping through all the kernels in each channel
    for i in range(t.shape[0]):
        for j in range(t.shape[1]):
            count += 1
            ax1 = fig.add_subplot(nrows, ncols, count)
            npimg = np.array(t[i, j].numpy(), np.float32)
            npimg = (npimg - np.mean(npimg)) / np.std(npimg)
            npimg = np.minimum(1, np.maximum(0, (npimg + 0.5)))
            ax1.imshow(npimg)
            ax1.set_title(str(i) + ',' + str(j))
            ax1.axis('off')
            ax1.set_xticklabels([])
            ax1.set_yticklabels([])
   
    plt.tight_layout()
    plt.show()

import matplotlib.pyplot as plt

def plot_filters_multi_channel(t):
    
    #get the number of kernals
    num_kernels = t.shape[0]    
    
    #define number of columns for subplots
    num_cols = 12
    #rows = num of kernels
    num_rows = num_kernels
    
    #set the figure size
    fig = plt.figure(figsize=(num_cols,num_rows))
    
    #looping through all the kernels
    for i in range(t.shape[0]):
        ax1 = fig.add_subplot(num_rows,num_cols,i+1)
        
        #for each kernel, we convert the tensor to numpy 
        npimg = np.array(t[i].numpy(), np.float32)
        #standardize the numpy image
        npimg = (npimg - np.mean(npimg)) / np.std(npimg)
        npimg = np.minimum(1, np.maximum(0, (npimg + 0.5)))
        npimg = npimg.transpose((1, 2, 0))
        ax1.imshow(npimg)
        ax1.axis('off')
        ax1.set_title(str(i))
        ax1.set_xticklabels([])
        ax1.set_yticklabels([])
        
    plt.savefig('myimage.png', dpi=100)    
    plt.tight_layout()
    plt.show()

#visualize weights

def plot_weights(model, layer_num, single_channel = True, collated = False):
    layer = model.features[layer_num] # more general (i.e. for resnet or AlexNet)
    if isinstance(layer, nn.Conv2d):
    #getting the weight tensor data
        weight_tensor = model.features[layer_num].weight.data
    
    if single_channel:
        if collated:
            plot_filters_single_channel_big(weight_tensor)
        else:
            plot_filters_single_channel(weight_tensor)
        
    else:
        if weight_tensor.shape[1] == 3:
            plot_filters_multi_channel(weight_tensor)
        else:
            print("Can only plot weights with three channels with single channel = False")
        
   # else:
       # print("Can only visualize layers which are convolutional")
        
#visualize weights for alexnet - first conv layer
plot_weights(alexnet, 0, single_channel = False)

In [13]:
num_ftrs = net.fc.in_features
print(num_ftrs)
net.fc = nn.Sequential(
    nn.Linear(num_ftrs, 460),
    FlexiLayer(in_channels=3, out_channels=256, kernel_size=5),
    nn.Conv2d(in_channels=256, out_channels=12, kernel_size=5),
    nn.Linear(12*4*4, n_classes)
)

512


In [6]:
#num_ftrs = net.fc.in_features
#net.fc = nn.Sequential(FlexiLayer(in_channels=1, out_channels=256, kernel_size=5),
#                      nn.Linear(256, n_classes))
# Add on classifier
#net.classifier[6] = nn.Sequential(
 #                     FlexiLayer(in_channels=3, out_channels=6, kernel_size=5),
  #                    nn.Linear(256, n_classes))

#num_ftrs = net.fc.in_features
#net.fc = nn.Linear(num_ftrs, n_classes) # 10 classes in the dataset
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9)

In [7]:
#helper functions

def images_to_probs(net, images):
    '''
    Generates predictions and corresponding probabilities from a trained
    network and a list of images
    '''
    output = net(images)
    # convert output probabilities to predicted class
    _, preds_tensor = torch.max(output, 1)
    preds = np.squeeze(preds_tensor.numpy())
    return preds, [F.softmax(el, dim=0)[i].item() for i, el in zip(preds, output)]


def plot_classes_preds(net, images, labels):
    '''
    Generates matplotlib Figure using a trained network, along with images
    and labels from a batch, that shows the network's top prediction along
    with its probability, alongside the actual label, coloring this
    information based on whether the prediction was correct or not.
    Uses the "images_to_probs" function.
    '''
    preds, probs = images_to_probs(net, images)
    # plot the images in the batch, along with predicted and true labels
    fig = plt.figure(figsize=(12, 48))
    for idx in np.arange(4):
        ax = fig.add_subplot(1, 4, idx+1, xticks=[], yticks=[])
        matplotlib_imshow(images[idx], one_channel=True)
        ax.set_title("{0}, {1:.1f}%\n(label: {2})".format(
            classes[preds[idx]],
            probs[idx] * 100.0,
            classes[labels[idx]]),
                    color=("green" if preds[idx]==labels[idx].item() else "red"))
    return fig

def get_num_correct(preds, labels):
    return preds.argmax(dim=1).eq(labels).sum().item()

In [8]:
running_loss = 0.0
for epoch in range(9):  # loop over the dataset multiple times
    
    total_loss = 0
    total_correct = 0

    for i, data in enumerate(trainloader, 0):
        
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        
        preds = net(inputs) # Pass batch

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item() * batch_size
        total_correct += get_num_correct(preds, labels)

        running_loss += loss.item()
            
    print("epoch:", epoch, "loss:", total_loss)
        
print('Finished Training')

RuntimeError: Expected 4-dimensional input for 4-dimensional weight 256 3, but got 2-dimensional input of size [4, 460] instead

In [None]:
print(total_correct / (len(trainset)/1000))

In [None]:
#15:21 start 15:25 finish