In [3]:
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
!pip install torchsummary
from torchsummary import summary
from tqdm import tqdm
%matplotlib inline
import matplotlib.pyplot as plt
from torchvision.transforms import ToPILImage
import os



In [4]:
# Module-level result containers, one group per regularisation experiment.
# The training/evaluation helpers append into the lists they are handed, and
# plotall() reads the test_* lists by name, so every experiment needs its
# lists to exist here (the "without L1/L2" group used to be missing, which
# made plotall() fail with a NameError).
train_losses_without_L1_L2 = []
train_acc_without_L1_L2 = []
test_losses_without_L1_L2 = []
test_acc_without_L1_L2 = []
misclassified_without_L1_L2_list = {}

train_losses_L1 = []
train_acc_L1 = []
test_losses_L1 = []
test_acc_L1 = []
misclassified_L1_list = {}

train_losses_L2 = []
train_acc_L2 = []
test_losses_L2 = []
test_acc_L2 = []
misclassified_L2_list = {}

train_losses_L1_L2 = []
train_acc_L1_L2 = []
test_losses_L1_L2 = []
test_acc_L1_L2 = []
misclassified_L1_L2_list = {}

# Device shared by the helpers below: GPU when CUDA is available, else CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

In [5]:
def transformations():
  """Build the torchvision preprocessing pipelines.

  Returns:
    A ``(train_transforms, test_transforms)`` tuple of ``transforms.Compose``
    objects. The training pipeline applies a random affine warp and colour
    jitter before tensor conversion; the test pipeline only converts to a
    tensor. Both normalise each of the three channels with mean 0.5 and
    std 0.5.
  """
  def _tensor_and_normalize():
    # Fresh instances for every pipeline, exactly as if written out twice.
    return [
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]

  # Augmentations are applied to the training images only.
  augmentations = [
      transforms.RandomAffine(degrees=15, translate=(0.1,0.1), scale=(0.9, 1.1)),
      transforms.ColorJitter(brightness=0.2, contrast=0.2),
  ]

  train_transforms = transforms.Compose(augmentations + _tensor_and_normalize())
  test_transforms = transforms.Compose(_tensor_and_normalize())
  return train_transforms , test_transforms

In [6]:
def train_test_dataloaders(seed, batch_size, workers,train_transforms,test_transforms):
  """Seed torch and build the CIFAR-10 train/test data loaders.

  Args:
    seed: value passed to ``torch.manual_seed`` (and to
      ``torch.cuda.manual_seed`` when CUDA is available).
    batch_size: batch size used by both loaders.
    workers: ``num_workers`` for both loaders. Only applied when CUDA is
      available; on CPU the DataLoader default is used.
    train_transforms: transform applied to the training split.
    test_transforms: transform applied to the test split.

  Returns:
    ``(trainloader, testloader)``. The training loader is shuffled; the test
    loader is not, so evaluation visits the samples in a fixed order.

  Side effects:
    Prints CUDA availability, reseeds the global torch RNG and downloads
    CIFAR-10 into ``./data`` if it is not already there.
  """
  # CUDA?
  cuda = torch.cuda.is_available()
  print("CUDA Available?", cuda)

  # For reproducibility
  torch.manual_seed(seed)
  if cuda:
      torch.cuda.manual_seed(seed)

  # Arguments shared by both loaders; worker processes and pinned memory are
  # only requested when a GPU is present.
  loader_args = dict(batch_size=batch_size)
  if cuda:
      loader_args.update(num_workers=workers, pin_memory=True)

  trainset = datasets.CIFAR10(root='./data', train=True,
                              download=True, transform=train_transforms)
  trainloader = torch.utils.data.DataLoader(trainset, shuffle=True, **loader_args)

  testset = datasets.CIFAR10(root='./data', train=False,
                             download=True, transform=test_transforms)
  # Shuffling does not change evaluation metrics; a fixed order keeps the
  # collected misclassified samples reproducible between runs.
  testloader = torch.utils.data.DataLoader(testset, shuffle=False, **loader_args)
  return trainloader, testloader

In [7]:
dropout_value = 0.1
class Net(nn.Module):
    """Small CNN for 3-channel images that returns log-probabilities over 10 classes.

    Every hidden block is Conv -> ReLU -> BatchNorm. The ``output_size``
    comments give the spatial size for a 32x32 input; the final
    ``AvgPool2d(kernel_size=4)`` expects a 4x4 feature map.

    Args:
        dropout: accepted for interface compatibility. Every ``nn.Dropout``
            layer in this architecture is disabled, so the value is
            currently not used.
    """

    def __init__(self, dropout):
        super(Net, self).__init__()
        # Input Block
        self.convblock1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(32),
        ) # output_size = 32

        # Dilated 3x3 conv; padding=2 keeps the spatial size.
        self.convblock2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), padding=2, dilation=2, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(64),
        ) # output_size = 32

        # TRANSITION BLOCK 1
        self.pool1 = nn.MaxPool2d(2, 2) # output_size = 16
        self.convblock3 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 3), padding=2, dilation=2, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(128),
        ) # output_size = 16

        # Grouped 3x3 conv (groups=128: two output channels per input channel).
        self.convblock4 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3, 3), padding=1, groups=128 , bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(256),
        ) # output_size = 16

        # 1x1 conv mixing the grouped channels. A 1x1 kernel needs no padding:
        # padding=1 here used to add a ring of zeros and grow the map to 18x18.
        self.convblock5 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(1, 1), padding=0 , bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(256),
        ) # output_size = 16

        # TRANSITION BLOCK 2
        self.pool2 = nn.MaxPool2d(2, 2) # output_size = 8

        # CONVOLUTION BLOCK 2
        self.convblock6 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(3, 3), padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(256),
        ) # output_size = 8

        # TRANSITION BLOCK 3
        self.pool3 = nn.MaxPool2d(2, 2) # output_size = 4
        self.gap = nn.Sequential(
            nn.AvgPool2d(kernel_size=4)
        ) # output_size = 1
        # 1x1 conv acting as the classifier head (256 -> 10 channels).
        self.convblock7 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=10, kernel_size=(1, 1), padding=0, bias=False)
        ) # output_size = 1

    def forward(self, x):
        """Run the network and return log-softmax scores of shape (N, 10)."""
        x = self.convblock1(x)
        x = self.convblock2(x)
        x = self.pool1(x)
        x = self.convblock3(x)
        x = self.convblock4(x)
        x = self.convblock5(x)
        x = self.pool2(x)
        x = self.convblock6(x)
        x = self.pool3(x)
        x = self.gap(x)
        x = self.convblock7(x)
        # Collapse the 1x1 spatial dims: (N, 10, 1, 1) -> (N, 10).
        x = x.view(-1, 10)
        return F.log_softmax(x, dim=-1)

In [8]:
def model_params(model, input_size,device):
  """Move ``model`` to ``device`` and print its torchsummary report.

  Args:
    model: the network to summarise.
    input_size: input size forwarded unchanged to ``summary``.
    device: target device for the model.
  """
  placed_model = model.to(device)
  summary(placed_model, input_size)

In [9]:
def compute_L1_Loss(model, data, factor=0.0005):
  """Return the scaled L1 penalty over all parameters of ``model``.

  For each parameter tensor the mean absolute value is taken, and these
  per-tensor means are summed. This is the same quantity as
  ``nn.L1Loss()`` (mean reduction) against an all-zero target, computed
  without allocating that target.

  Args:
    model: module whose ``parameters()`` are penalised.
    data: unused; kept so existing callers keep working.
    factor: multiplier applied to the summed penalty.

  Returns:
    ``factor * penalty``. A tensor attached to the autograd graph when the
    model has parameters, the plain number ``0.0`` otherwise.
  """
  reg_loss = 0
  for param in model.parameters():
    # The zero target used to be built as torch.rand_like(param) * 0, which
    # drew random numbers (advancing the global RNG) on every call.
    reg_loss = reg_loss + param.abs().mean()
  return factor * reg_loss

In [10]:
def train(model, device, train_loader, optimizer, epoch, isL1, train_acc, train_losses):
  """Train ``model`` for one pass over ``train_loader``.

  Args:
    model: network whose output is log-probabilities (it is fed to
      ``F.nll_loss``).
    device: device the batches are moved to.
    train_loader: iterable of ``(data, target)`` batches.
    optimizer: optimizer stepped once per batch.
    epoch: current epoch number; not used inside this function.
    isL1: when true, the ``compute_L1_Loss`` penalty (factor 0.0005) is
      added to the batch loss.
    train_acc: list that receives the running accuracy (percent) after
      every batch.
    train_losses: list that receives the batch loss after every batch, as a
      plain Python float.
  """
  model.train()
  pbar = tqdm(train_loader)
  correct = 0
  processed = 0
  for batch_idx, (data, target) in enumerate(pbar):
    data, target = data.to(device), target.to(device)

    # PyTorch accumulates gradients across backward passes, so clear them
    # before computing this batch's gradients.
    optimizer.zero_grad()

    y_pred = model(data)

    loss = F.nll_loss(y_pred, target)
    if isL1:
      loss = loss + compute_L1_Loss(model,data,factor=0.0005)

    loss.backward()
    optimizer.step()

    # Record a float, not the tensor: appending the tensor itself would keep
    # its autograd history and device memory referenced by the list.
    batch_loss = loss.item()
    train_losses.append(batch_loss)

    pred = y_pred.argmax(dim=1, keepdim=True)  # index of the max log-probability
    correct += pred.eq(target.view_as(pred)).sum().item()
    processed += len(data)
    running_acc = 100*correct/processed

    pbar.set_description(desc= f'Loss={batch_loss} Batch_id={batch_idx} Accuracy={running_acc:0.2f}')
    train_acc.append(running_acc)

In [11]:
def test(model, device, test_loader, test_acc, test_losses, misclassified_list):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            i=len(misclassified_list)
            orig_data=data.numpy()
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            new_target=target.view_as(pred)
            for x,y,z in zip(pred,new_target,orig_data):
              if x!=y:
                # print("type= {} {} ".format(x,y))
                # print("Z", z.shape)
                misclassified_list[i]=[x,y,z]
                i +=1
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_losses.append(test_loss)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    
    test_acc.append(100. * correct / len(test_loader.dataset))

In [12]:
def plot(train_losses,train_acc,test_losses,test_acc, label):
  """Plot the test loss and test accuracy curves of one experiment.

  Args:
    train_losses: accepted for interface compatibility; not plotted.
    train_acc: accepted for interface compatibility; not plotted.
    test_losses: sequence of test losses, one value per evaluation.
    test_acc: sequence of test accuracies, one value per evaluation.
    label: legend label for both curves.
  """
  fig, axs = plt.subplots(1,2,figsize=(20,8))
  axs[0].plot(test_losses, label=label)
  axs[0].set_title("Test Loss")
  axs[0].set_xlabel("Evaluation #")
  # The label is only visible once a legend is drawn.
  axs[0].legend()
  axs[1].plot(test_acc, label=label)
  axs[1].set_title("Test Accuracy")
  axs[1].set_xlabel("Evaluation #")
  axs[1].legend()

In [13]:
def plotall():
  """Overlay the test loss and test accuracy curves of all four experiments.

  Reads the module-level ``test_losses_*`` and ``test_acc_*`` lists for the
  "without L1 and L2", "L1", "L2" and "L1 and L2" runs and draws them on a
  two-panel figure with legends.
  """
  fig, axs = plt.subplots(1,2,figsize=(20,8))
  loss_ax, acc_ax = axs[0], axs[1]

  loss_curves = (
      (test_losses_without_L1_L2, 'without L1 and L2'),
      (test_losses_L1, 'with L1 only'),
      (test_losses_L2, 'with L2 only'),
      (test_losses_L1_L2, 'with L1 and L2 both'),
  )
  for values, curve_label in loss_curves:
    loss_ax.plot(values, label=curve_label)
  loss_ax.set_title("Test Loss")
  loss_ax.legend()

  acc_curves = (
      (test_acc_without_L1_L2, 'without L1 and L2'),
      (test_acc_L1, 'with L1 only'),
      (test_acc_L2, 'with L2 only'),
      (test_acc_L1_L2, 'with L1 and L2 both'),
  )
  for values, curve_label in acc_curves:
    acc_ax.plot(values, label=curve_label)
  acc_ax.set_title("Test Accuracy")
  acc_ax.legend()

In [14]:
def plot_misclassified_images(missclassified,filename):
  """Show and save a grid of up to 25 misclassified samples.

  Args:
    missclassified: dict whose values are ``[pred, target, image]`` entries
      as collected by ``test``: ``pred``/``target`` are 1-element tensors and
      ``image`` is a numpy array, either channel-first ``(C, H, W)`` or
      already 2-D.
    filename: file name of the saved figure inside
      ``/content/gdrive/My Drive``.

  Images are mapped back to the [0, 1] range by undoing a mean-0.5 / std-0.5
  normalisation, which is what ``transformations`` applies in this notebook.
  Nothing is drawn or saved when ``missclassified`` is empty.
  """
  mis_Class_list=list(missclassified.values())
  if not mis_Class_list:
    print("No misclassified images to plot")
    return
  print("classified= ",mis_Class_list[0][2].shape)
  fig = plt.figure(figsize=(10, 10))  # width, height in inches
  columns = 5
  rows = 5
  # Slice instead of indexing range(25) so fewer than 25 mistakes is fine.
  for i, (pred, actual, each_img) in enumerate(mis_Class_list[:columns*rows]):
      sub = fig.add_subplot(rows, columns, i+1)
      if each_img.ndim == 3:
          # imshow wants channel-last; the arrays are channel-first. The old
          # reshape(28,28) only fitted 28x28 single-channel images.
          each_img = each_img.transpose(1, 2, 0)
          if each_img.shape[2] == 1:
              each_img = each_img[:, :, 0]
      # Undo Normalize(mean=0.5, std=0.5) and clamp to the displayable range.
      each_img = (each_img * 0.5 + 0.5).clip(0, 1)
      # cmap only takes effect for 2-D (single-channel) images.
      sub.imshow(each_img,cmap='gray',interpolation='none')
      sub.set_title("Pred={}, Act={}".format(pred.tolist()[0],actual.tolist()[0]))
  plt.tight_layout()
  plt.savefig(os.path.join('/content/gdrive/My Drive',filename))
  plt.show()

In [15]:
def save_model(model, path):
  """Serialise the ``state_dict`` of ``model`` to ``path`` with ``torch.save``."""
  weights = model.state_dict()
  torch.save(weights, path)

In [16]:
def invoke_process_without_L1_and_L2(model,device,input_size,train_loader,test_loader,epoch,optimizer):
  """Train and evaluate ``model`` without the L1 penalty and plot the result.

  Args:
    model: network to train.
    device: device used for training and evaluation.
    input_size: input size forwarded to ``model_params`` for the summary.
    train_loader: loader of training batches.
    test_loader: loader of evaluation batches.
    epoch: number of epochs to run.
    optimizer: optimizer used for training (any L2 / weight-decay setting
      lives in the optimizer the caller passes).

  Side effects:
    Rebinds the module-level ``*_without_L1_L2`` metric lists to fresh lists
    filled by this run, so that ``plotall`` can read them afterwards, and
    extends ``misclassified_without_L1_L2_list``.
  """
  # These lists used to be function locals, which left plotall() without the
  # module-level names it looks up.
  global train_losses_without_L1_L2, test_losses_without_L1_L2
  global train_acc_without_L1_L2, test_acc_without_L1_L2
  train_losses_without_L1_L2 = []
  test_losses_without_L1_L2 = []
  train_acc_without_L1_L2 = []
  test_acc_without_L1_L2 = []

  model_params(model, input_size,device)
  for epoch_idx in range(epoch):
      print("EPOCH:", epoch_idx)
      train(model, device, train_loader, optimizer, epoch_idx, False, train_acc_without_L1_L2, train_losses_without_L1_L2)
      test(model, device, test_loader, test_acc_without_L1_L2, test_losses_without_L1_L2, misclassified_without_L1_L2_list)
  plot(train_losses_without_L1_L2,train_acc_without_L1_L2, test_losses_without_L1_L2, test_acc_without_L1_L2, 'without L1 and L2')
  print("miss",len(misclassified_without_L1_L2_list))

In [18]:
def invoke_process_with_L1(model,device,input_size,train_loader,test_loader,epoch,optimizer):
  """Train and evaluate ``model`` with the L1 penalty and plot the result.

  Args:
    model: network to train.
    device: device used for training and evaluation.
    input_size: input size forwarded to ``model_params`` for the summary.
    train_loader: loader of training batches. ``None`` selects the previous
      built-in default (seed 1, batch size 64, 4 workers).
    test_loader: loader of evaluation batches. ``None`` selects the same
      built-in default.
    epoch: number of epochs to run.
    optimizer: optimizer used for training. ``None`` selects the previous
      built-in default, SGD with lr=0.01 and momentum=0.9.

  Side effects:
    Rebinds the module-level ``*_L1`` metric lists to fresh lists filled by
    this run, so that ``plotall`` can read them afterwards, and extends
    ``misclassified_L1_list``.
  """
  # Publish the histories; as locals they shadowed the module-level lists
  # and plotall() only ever saw empty ones.
  global train_losses_L1, train_acc_L1, test_losses_L1, test_acc_L1

  # The loaders and optimizer passed by the caller used to be overwritten
  # unconditionally. Honour them, and only build the defaults when missing.
  if train_loader is None or test_loader is None:
    train_transforms, test_transforms = transformations()
    default_train, default_test = train_test_dataloaders(1, 64, 4,train_transforms, test_transforms)
    if train_loader is None:
      train_loader = default_train
    if test_loader is None:
      test_loader = default_test

  train_losses_L1 = []
  train_acc_L1 = []
  test_losses_L1 = []
  test_acc_L1 = []

  model_params(model, input_size,device)
  if optimizer is None:
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

  for epoch_idx in range(epoch):
      print("EPOCH:", epoch_idx)
      train(model, device, train_loader, optimizer, epoch_idx, True, train_acc_L1, train_losses_L1)
      test(model, device, test_loader, test_acc_L1, test_losses_L1, misclassified_L1_list )
  plot(train_losses_L1,train_acc_L1, test_losses_L1, test_acc_L1, 'with L1 only')

In [17]:
'''ResNet in PyTorch.
For Pre-activation ResNet, see 'preact_resnet.py'.
Reference:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Deep Residual Learning for Image Recognition. arXiv:1512.03385
'''
import torch
import torch.nn as nn
import torch.nn.functional as F


class BasicBlock(nn.Module):
    """Two-layer 3x3 residual block; output has ``expansion * planes`` channels."""

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Main path: the first conv carries the stride, the second keeps size.
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # Skip path: identity unless the resolution or channel count changes,
        # in which case a strided 1x1 conv + BN projects the input.
        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(hidden))
        return F.relu(residual + self.shortcut(x))


class Bottleneck(nn.Module):
    """1x1 -> 3x3 -> 1x1 residual block; output has ``expansion * planes`` channels."""

    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Main path: 1x1 conv to ``planes``, 3x3 conv (carries the stride),
        # then 1x1 conv up to ``out_planes``.
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Skip path: identity unless the resolution or channel count changes,
        # in which case a strided 1x1 conv + BN projects the input.
        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        squeezed = F.relu(self.bn1(self.conv1(x)))
        spatial = F.relu(self.bn2(self.conv2(squeezed)))
        residual = self.bn3(self.conv3(spatial))
        return F.relu(residual + self.shortcut(x))


class ResNet(nn.Module):
    """ResNet with a 3x3 stem that returns log-probabilities.

    Args:
        block: residual block class. It must expose an ``expansion`` class
            attribute and be constructible as ``block(in_planes, planes, stride)``.
        num_blocks: four integers, the number of blocks in each stage.
        num_classes: size of the output layer.
    """

    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        # Running channel count; _make_layer updates it as stages are built.
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage: the first block uses ``stride``, the rest stride 1."""
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return log-softmax class scores of shape ``(N, num_classes)``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Global average pooling. For 32x32 inputs the final map is 4x4, so
        # this equals the former F.avg_pool2d(out, 4); it additionally keeps
        # the flattened width at 512*expansion for other input resolutions.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return F.log_softmax(out, dim=-1)


def ResNet18():
    """Build a ``ResNet`` of ``BasicBlock`` stages with depths [2, 2, 2, 2]."""
    stage_depths = [2,2,2,2]
    return ResNet(BasicBlock, stage_depths)

def ResNet34():
    """Build a ``ResNet`` of ``BasicBlock`` stages with depths [3, 4, 6, 3]."""
    stage_depths = [3,4,6,3]
    return ResNet(BasicBlock, stage_depths)

def ResNet50():
    """Build a ``ResNet`` of ``Bottleneck`` stages with depths [3, 4, 6, 3]."""
    stage_depths = [3,4,6,3]
    return ResNet(Bottleneck, stage_depths)

def ResNet101():
    """Build a ``ResNet`` of ``Bottleneck`` stages with depths [3, 4, 23, 3]."""
    stage_depths = [3,4,23,3]
    return ResNet(Bottleneck, stage_depths)

def ResNet152():
    """Build a ``ResNet`` of ``Bottleneck`` stages with depths [3, 8, 36, 3]."""
    stage_depths = [3,8,36,3]
    return ResNet(Bottleneck, stage_depths)


#def test():
#    net = ResNet18()
#    y = net(torch.randn(1,3,32,32))
#    print(y.size())

# test()