<a href="https://colab.research.google.com/github/epbehren3/bilinear_resizer/blob/Main/Learning_Resizer_Model_ECE570.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import time
from typing import List, Dict

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms

import matplotlib.pyplot as plt
from os import EX_PROTOCOL


In [2]:
#Choose which dataset you would like to train on.

!wget https://s3.amazonaws.com/fast-ai-imageclas/imagenette2.tgz
!tar -xzf imagenette2.tgz
!rm imagenette2.tgz
#!wget https://s3.amazonaws.com/fast-ai-imageclas/imagewoof2.tgz
#!tar -xzf imagewoof2.tgz
#!rm imagewoof2.tgz

--2024-11-11 03:37:22--  https://s3.amazonaws.com/fast-ai-imageclas/imagenette2.tgz
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.65.238, 52.217.139.0, 52.216.28.182, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.65.238|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1557161267 (1.5G) [application/x-tar]
Saving to: ‘imagenette2.tgz’


2024-11-11 03:37:49 (55.4 MB/s) - ‘imagenette2.tgz’ saved [1557161267/1557161267]



In [3]:
#Define Hyperparameters -

batchSize = 64
#batchSizeTest = 1000
maxEpoch = 15
learningRate = 0.1
#criterion = nn.CrossEntropyLoss()
resBlocks = 5



if torch.cuda.is_available():
  print("GPU is available")
  device = torch.device('cuda')
else:
  print("GPU is not available")
  device = torch.device('cpu')






GPU is available


In [4]:

#Data Loader - Load images into model for training and validation
#Redo to use ImageNette
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((320, 320)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Standard normalization for ImageNet
])

data_path = r'/content/imagenette2'
#data_path = r'/content/imagewoof2'
#trainDataSet  = torchvision.datasets.MNIST('data', train=True, download=True, transform=transform)
#testDataSet = torchvision.datasets.MNIST('data', train=False, download=True, transform=transform)
trainDataset = torchvision.datasets.ImageFolder(root=data_path + '/train', transform=transform)
testDataset = torchvision.datasets.ImageFolder(root=data_path + '/val', transform=transform)

trainLoader = torch.utils.data.DataLoader(trainDataset, batch_size=batchSize, shuffle=True)
testLoader = torch.utils.data.DataLoader(testDataset, batch_size=batchSize, shuffle=False)

In [5]:
#ResBlock Model
class resBlock(nn.Module):#Resnet Block as defined in original paper.
    def __init__(self, channelSize = 16,negativeSlope = 0.02, kernelSize = 3, stride=1):
        super(resBlock, self).__init__()


      #Define the sequential NN based on the structure listed in our paper
        self.convBlock = nn.Sequential(
        nn.Conv2d(channelSize,channelSize, kernelSize, stride, padding = 1, bias = False),
        nn.BatchNorm2d(channelSize),
        nn.LeakyReLU(negativeSlope),
        nn.Conv2d(channelSize,channelSize, kernelSize, stride, padding = 1, bias = False),
        nn.BatchNorm2d(channelSize)
      ).to(device)

    def forward(self, x):#Sum at the end of the resblock
        #self.to(device)
        return x + self.convBlock(x)


In [6]:
# Build my Model




class resizingNetwork(nn.Module):
    def __init__(self,channelSize = 16, kernelSize = 7,negativeSlope = 0.02, stride=1, inputSize = (320,320),numResBlock = resBlocks ):
      super(resizingNetwork, self).__init__()
      #Channel Expander to accomodate input channel size to reccomended channel size.
      self.channelExpander = nn.Sequential(
          nn.Conv2d(in_channels =  3, out_channels = channelSize,kernel_size = 1, stride = stride, padding = 0, bias = False),
          nn.BatchNorm2d(num_features = channelSize),
      )
      #Block1 as defined in the initial structur
      self.convBlock1 = nn.Sequential(
          nn.Conv2d(in_channels=channelSize,out_channels = channelSize, kernel_size=kernelSize,stride = stride,padding = 1, bias = False),
          nn.LeakyReLU(negative_slope = negativeSlope),
          nn.Conv2d(in_channels = channelSize, out_channels=channelSize, kernel_size = 1,stride = stride , padding = 0,bias = False),
          nn.BatchNorm2d(num_features = channelSize),
          nn.LeakyReLU(negative_slope = negativeSlope),
      )
      #ResBlocks.
      self.blockList = [0] * numResBlock
      for r in range(numResBlock):
        self.blockList[r] = resBlock()
      # Block 2 as defined in the structure
      self.convBlock2 = nn.Sequential(
          nn.Conv2d(in_channels = channelSize, out_channels = channelSize, kernel_size = 3, stride = stride,padding = 1, bias = False),
          nn.BatchNorm2d(num_features = channelSize),
      )
      #Block 3 as defined in the structure
      self.convBlock3 = nn.Sequential(
          #Transitional convolution layer to transform 16 channels into 3
          nn.Conv2d(in_channels =  channelSize, out_channels = 3,kernel_size = 1, stride = stride, padding = 0, bias = False),
          nn.BatchNorm2d(num_features = 3),
          nn.Conv2d(in_channels = 3,out_channels = 3 ,kernel_size = 7, stride = 1, padding = 3, bias  = False)
      )


    def forward(self, x):
      #Initial Interpolation
      bilinearOriginal = F.interpolate(x,size = (224,224), mode = 'bilinear', align_corners=False)
      #Expansion layer to transform 3 channels (RGB) into 16 for initial input.
      x = self.channelExpander(x)
      x = self.convBlock1(x)
      #Modified Interpolation
      x = bilinearModified = F.interpolate(x,size = (224,224), mode = 'bilinear', align_corners=False)
      #Apply number of resBlocks
      for resBlock in self.blockList:
        x = resBlock(x)

      x = self.convBlock2(x)
      x = x + bilinearModified
      x = self.convBlock3(x)
      x = x + bilinearOriginal

      return x




#inhereted Resnet 50 Classifier Model

class resnet50Classifier(nn.Module):
    def __init__(self, numClasses =1000, preTrained = False):
        super(resnet50Classifier, self).__init__()

        # Load the un-trained ResNet-50 model
        self.resnet50 = models.resnet50(weights= preTrained)

        # Modify the last fully connected layer to match the number of classes
        numFeatures = self.resnet50.fc.in_features
        self.resnet50.fc = nn.Linear(numFeatures, numClasses)

    def forward(self, x):
       # print("Number of channels after resizing:", x.shape[1])
        return self.resnet50(x)




In [7]:

#Data Trainer - Training process for my resnet model, reimplemented training and testing function that I feveloped for PA3 to train my classification model
def train(model: nn.Module,
          lossFN: nn.modules.loss._Loss,
          optimizer: torch.optim.Optimizer,
          trainLoader: torch.utils.data.DataLoader,
          epoch: int=0)-> List:
    model.train()
    train_loss = []

    for batch_idx, (images, targets) in enumerate(trainLoader):
        images = images.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        output = model(images)
        loss = lossFN(output, targets)
        loss.backward()
        optimizer.step()
        train_loss.append(loss.item())

        if batch_idx % 100 == 0:
            print(f'Epoch {epoch}: [{batch_idx*len(images)}/{len(trainLoader.dataset)}] Loss: {loss.item():.3f}')

    assert len(train_loss) == len(trainLoader)
    return train_loss

def test(model: nn.Module,
         lossFN: nn.modules.loss._Loss,
         testLoader: torch.utils.data.DataLoader,
         epoch: int=0)-> Dict:

    model.eval()

    test_stat = {
        "loss": 0,
        "accuracy": 0,
        "prediction": []
    }

    with torch.no_grad():
        for images, targets in testLoader:
            images = images.to(device)
            targets = targets.to(device)
            output = model(images)
            test_stat["loss"] += lossFN(output, targets).item()
            test_stat["accuracy"] += (torch.argmax(output, dim=1) == targets).sum().item()
            test_stat["prediction"].append(torch.argmax(output, dim=1))

    test_stat["accuracy"] /= len(testLoader.dataset)
    test_stat["accuracy"] *= 100
    test_stat["loss"] /= len(testLoader)
    test_stat["prediction"] = torch.cat(test_stat["prediction"])

    # dictionary should include loss, accuracy and prediction
    print(f"Accuracy: {test_stat['accuracy']:.4f}%")

    assert "loss" and "accuracy" and "prediction" in test_stat.keys()
    # "prediction" value should be a 1D tensor
    assert len(test_stat["prediction"]) == len(testLoader.dataset)
    assert isinstance(test_stat["prediction"], torch.Tensor)
    return test_stat

# New Section

In [8]:
#Joint Classifier Model with Learned Resizer, and Untrained Resnet model.
class modifiedClassifier(nn.Module):
    def __init__(self, resizingNetwork, resnet50Classifier):
        super(modifiedClassifier, self).__init__()

        self.resizingNetwork = resizingNetwork
        self.resnet50Classifier = resnet50Classifier

    def forward(self, x):#Feeds through Learned Resizer into the classifier.
        x = self.resizingNetwork(x)
        x = self.resnet50Classifier(x)
        return x


In [9]:
#Define Classifiers

#Classifier using traditional Method of Bilinear Interpolation
classifierTraditional = resnet50Classifier()
classifierTraditional.to(device)

#Modified model using Learned Resizer fed into traditional classifier
resizerModel = resizingNetwork()
resnetModel = resnet50Classifier()
classifierModified = modifiedClassifier(resizerModel, resnetModel)
classifierModified.to(device)



modifiedClassifier(
  (resizingNetwork): resizingNetwork(
    (channelExpander): Sequential(
      (0): Conv2d(3, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (convBlock1): Sequential(
      (0): Conv2d(16, 16, kernel_size=(7, 7), stride=(1, 1), padding=(1, 1), bias=False)
      (1): LeakyReLU(negative_slope=0.02)
      (2): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (3): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (4): LeakyReLU(negative_slope=0.02)
    )
    (convBlock2): Sequential(
      (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (convBlock3): Sequential(
      (0): Conv2d(16, 3, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (1): BatchNorm2d(3, eps=1e-05, momentum=0

In [10]:
#Comparison Model - Classify both Traditional Biliinear Resizer, and Learned Resizer to run, and then compare outputs.
optimizer = optim.SGD(classifierModified.parameters(), lr=learningRate, momentum=0.9)
optimizerTraditional = optim.SGD(classifierTraditional.parameters(), lr=learningRate, momentum=0.9)

start = time.time()


testOutTrad = []
testOutMod = []
for epoch in range(1,maxEpoch + 1):#Train and Test both models, record both testing outputs for later use.
  print("\nModified:")
  trainLoss = train(classifierModified, nn.CrossEntropyLoss(), optimizer, trainLoader, epoch)
  testStat = test(classifierModified, nn.CrossEntropyLoss(), testLoader, epoch)
  testOutMod.append(testStat)

  print("\nTraditional:")
  trainLossTraditional = train(classifierTraditional, nn.CrossEntropyLoss(), optimizerTraditional, trainLoader, epoch)
  testStatTraditional = test(classifierTraditional, nn.CrossEntropyLoss(), testLoader, epoch)
  testOutTrad.append(testStatTraditional)

#Comaprison Fuction


end = time.time()
print(f'Finished Training after {end-start} s ')


Modified:
Epoch 1: [0/9469] Loss: 7.020


KeyboardInterrupt: 

In [None]:
!pip freeze > requirements.txt