<a href="https://colab.research.google.com/github/stephzhan/ECE570CourseProject/blob/main/ECE570_Course_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [32]:
!pip freeze > requirements.txt

In [33]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import time
from typing import List, Dict
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
from peft import LoraConfig, get_peft_model

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [34]:
class LoraLayer(nn.Module):
    def __init__(self, rank, in_dim, out_dim, alpha, dropout, merge_weights):
        super().__init__()

        self.alpha = alpha
        if rank > 0:
            self.A = torch.nn.Parameter(torch.empty((rank, in_dim)))
            self.B = torch.nn.Parameter(torch.empty((out_dim, rank)))
            self.scaling = self.alpha / rank

        if dropout > 0.:
            self.dropout = nn.Dropout(p=dropout)
        else:
            self.dropout = lambda x: x

        self.merged = False
        self.merge_weights = merge_weights

        self.reset_parameters()
        if device:
            self.to(device)

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.A, a=math.sqrt(5))
        nn.init.zeros_(self.B)

    def forward(self, x):
        return self.scaling * (self.dropout(x) @ self.A.T @ self.B.T)

In [35]:
class LoraLinear(nn.Module):
    def __init__(self, layer, rank, alpha, dropout, merge_weights):
        super().__init__()

        self.linear = layer
        self.lora = LoraLayer(rank, self.linear.in_features, self.linear.out_features, alpha, dropout, merge_weights)
        self.linear.weight.requires_grad = False
        self.lora.reset_parameters()
        self.linear.reset_parameters()

    def forward(self, x):
        return self.linear(x) + self.lora(x)

In [36]:
class LoraConv(nn.Module):
    def __init__(self, layer, rank, alpha, dropout, merge_weights):
        super().__init__()

        self.conv = layer
        k_size = self.conv.kernel_size
        self.lora = LoraLayer(rank * k_size, self.conv.in_channels * k_size[0], self.conv.out_channels//self.conv.groups * k_size[1], alpha, dropout, merge_weights)
        self.conv.weight.requires_grad = False

    def forward(self, x):
          return self.conv._conv_forward(x, self.conv.weight + (self.lora_B @ self.lora_A).view(self.conv.weight.shape) * self.scaling, self.conv.bias)

In [37]:
class LoraEmbedding(nn.Module):
  def __init__(self, layer, rank, alpha, dropout, merge_weights):
        super().__init__()

        self.embedding = layer
        self.lora = LoraLayer(rank, layer.num_embeddings, layer.embedding_dim, alpha, dropout, merge_weights)
        self.embedding.weight.requires_grad = False

  def forward(self, x):
        pass

In [38]:
class LoraMergedLinear(nn.Module):
  def __init__(self, layer, rank, alpha, dropout, merge_weights):
        super().__init__()

        self.embedding = layer
        self.lora = LoraLayer(rank, layer.num_embeddings, layer.embedding_dim, alpha, dropout, merge_weights)
        self.embedding.weight.requires_grad = False

  def forward(self, x):
        pass

In [39]:
## CODE REFERENCED FROM ECE570 ASSIGNMENT 3 ##

def train(model: nn.Module,
          loss_fn: nn.modules.loss._Loss,
          optimizer: torch.optim.Optimizer,
          train_loader: torch.utils.data.DataLoader,
          epoch: int=0)-> List:
    model.train()
    train_losses = []
    train_counter = []

    for batch_idx, (images, targets) in enumerate(train_loader):
      images, targets = images.to(device), targets.to(device)
      optimizer.zero_grad()
      output = model(images)
      loss = loss_fn(output, targets)
      loss.backward()
      optimizer.step()
      train_losses.append(loss.item())

      if batch_idx % 100 == 0:
        train_counter.append(
        (batch_idx*len(images)) + ((epoch-1)*len(train_loader.dataset)))
      torch.cuda.empty_cache()
    assert len(train_losses) == len(train_loader)
    return train_losses

def test(model: nn.Module,
         loss_fn: nn.modules.loss._Loss,
         test_loader: torch.utils.data.DataLoader,
         epoch: int=0)-> Dict:
    model.eval()

    test_loss = 0
    correct = 0
    test_stat = dict()
    test_losses = []
    test_counter = []
    total_num = 0
    pred_list = []

    with torch.no_grad():
      for images, targets in test_loader:
        images, targets = images.to(device), targets.to(device)
        output = model(images)
        test_loss += F.nll_loss(output, targets, reduction='sum').item()
        pred = output.data.argmax(1) # we get the estimate of our result by look at the largest class value
        correct += pred.eq(targets.data.view_as(pred)).sum() # sum up the corrected samples
        pred_list.extend(list(pred))
        total_num = total_num + 1

      test_loss /= len(test_loader.dataset)
      test_losses.append(test_loss)
      test_counter.append(len(test_loader.dataset)*epoch)

    test_stat['loss'] = test_loss
    test_stat['accuracy'] = correct / len(test_loader.dataset)
    test_stat['prediction'] = torch.Tensor(pred_list).to(torch.long)

    print(f"Test result on epoch {epoch}: Acc: {100*test_stat['accuracy']:.3f}%")
    return test_stat

In [40]:
## CODE TAKEN FROM ECE570 ASSIGNMENT 3 ##
train_transform = torchvision.transforms.Compose([torchvision.transforms.RandomResizedCrop(224),
                                                  torchvision.transforms.RandomHorizontalFlip(),
                                                  torchvision.transforms.ToTensor(),
                                                  torchvision.transforms.Normalize((0.485,0.456,0.406), (0.229,0.224,0.225))])

test_transform = torchvision.transforms.Compose([torchvision.transforms.Resize(256),
                                                 transforms.CenterCrop(224),
                                                  torchvision.transforms.ToTensor(),
                                                  torchvision.transforms.Normalize((0.485,0.456,0.406), (0.229,0.224,0.225))])

train_dataset = torchvision.datasets.CIFAR10('data', train=True, download=True, transform=train_transform)
test_dataset = torchvision.datasets.CIFAR10('data', train=False, download=True, transform=test_transform)

batch_size_train, batch_size_test = 256, 512

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size_train, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size_test, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [41]:
## CODE REFERENCED FROM ECE570 ASSIGNMENT 3 ##
rank = [2, 4, 6, 8]
max_epoch = 8
criterion = nn.CrossEntropyLoss()

for i in rank:
  start = time.time()
  resnet18_LORA = models.resnet18(pretrained=True)
  resnet18_LORA.to(device)
  resnet18_LORA.fc = LoraLinear(layer=resnet18_LORA.fc, rank=i, alpha=i, dropout=0.5, merge_weights=False)
  optimizer = optim.SGD(resnet18_LORA.parameters(), lr=0.1, momentum=0.8)
  for epoch in range(max_epoch):
    train(resnet18_LORA, criterion, optimizer, train_loader, epoch)
    test(resnet18_LORA, criterion, test_loader, epoch)
  end = time.time()
  print(f'Finished Training after {end-start} s ')

Test result on epoch 0: Acc: 20.330%
Test result on epoch 1: Acc: 48.970%
Test result on epoch 2: Acc: 75.010%
Test result on epoch 3: Acc: 80.500%
Test result on epoch 4: Acc: 84.220%
Test result on epoch 5: Acc: 85.800%
Test result on epoch 6: Acc: 86.870%
Test result on epoch 7: Acc: 90.660%
Finished Training after 907.1683824062347 s 
Test result on epoch 0: Acc: 81.890%
Test result on epoch 1: Acc: 87.590%
Test result on epoch 2: Acc: 89.630%
Test result on epoch 3: Acc: 91.440%
Test result on epoch 4: Acc: 91.330%
Test result on epoch 5: Acc: 91.140%
Test result on epoch 6: Acc: 91.380%
Test result on epoch 7: Acc: 92.710%
Finished Training after 902.1033148765564 s 
Test result on epoch 0: Acc: 30.860%
Test result on epoch 1: Acc: 42.360%
Test result on epoch 2: Acc: 63.570%
Test result on epoch 3: Acc: 75.820%
Test result on epoch 4: Acc: 78.920%
Test result on epoch 5: Acc: 80.470%
Test result on epoch 6: Acc: 85.200%
Test result on epoch 7: Acc: 86.520%
Finished Training afte

In [42]:
## CODE REFERENCED FROM ECE570 ASSIGNMENT 3 ##
for i in rank:
  start = time.time()
  resnet18_FREEZE_LORA =  models.resnet18(pretrained=True)
  resnet18_FREEZE_LORA.to(device)
  for param in resnet18_FREEZE_LORA.parameters():
    param.requires_grad = False

  for param in resnet18_FREEZE_LORA.layer3.parameters():
      param.requires_grad = True
  for param in resnet18_FREEZE_LORA.layer4.parameters():
      param.requires_grad = True
  resnet18_FREEZE_LORA.fc = LoraLinear(layer=resnet18_FREEZE_LORA.fc, rank=i, alpha=i, dropout=0.5, merge_weights=False)
  optimizer = optim.SGD([param for param in resnet18_FREEZE_LORA.parameters() if param.requires_grad], lr=0.1, momentum=0.8)
  for epoch in range(max_epoch):
    train(resnet18_FREEZE_LORA, criterion, optimizer, train_loader, epoch)
  test(resnet18_FREEZE_LORA, criterion, test_loader, epoch)
  end = time.time()
  print(f'Finished Training after {end-start} s ')

Test result on epoch 7: Acc: 30.720%
Finished Training after 678.0140495300293 s 
Test result on epoch 7: Acc: 85.600%
Finished Training after 677.3565816879272 s 
Test result on epoch 7: Acc: 91.400%
Finished Training after 676.575676202774 s 
Test result on epoch 7: Acc: 82.410%
Finished Training after 677.5734732151031 s 


In [43]:
def replace_conv_with_lora(model, rank, alpha, dropout, merge_weights):
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv2d):
            lora_layer = LoraConv(module, rank, alpha, dropout, merge_weights)
            setattr(model, name, lora_layer)
    return model

In [45]:
## CODE TAKEN FROM ECE570 ASSIGNMENT 3 ##

resnet18_FT = models.resnet18(pretrained=True)
resnet18_FT = resnet18_FT.to(device)



In [51]:
## CODE TAKEN FROM ECE570 ASSIGNMENT 3 ##

start = time.time()

max_epoch = 8
optimizer = optim.SGD(resnet18_FT.parameters(), lr=0.1, momentum=0.8)
criterion = nn.CrossEntropyLoss()
for epoch in range(max_epoch):
  train(resnet18_FT, criterion, optimizer, train_loader, epoch)
  test(resnet18_FT, criterion, test_loader, epoch)

end = time.time()
print(f'Finished Training after {end-start} s ')

Test result on epoch 0: Acc: 80.140%
Test result on epoch 1: Acc: 83.690%
Test result on epoch 2: Acc: 86.080%
Test result on epoch 3: Acc: 85.600%
Test result on epoch 4: Acc: 87.000%
Test result on epoch 5: Acc: 87.580%
Test result on epoch 6: Acc: 88.520%
Test result on epoch 7: Acc: 88.680%
Finished Training after 902.0984711647034 s 


In [52]:
resnet18_FR = models.resnet18(pretrained=True)
resnet18_FR = resnet18_FR.to(device)

In [53]:
start = time.time()

for param in resnet18_FR.parameters():
    param.requires_grad = False

for param in resnet18_FR.fc.parameters():
    param.requires_grad = True
for param in resnet18_FREEZE_LORA.layer3.parameters():
    param.requires_grad = True
for param in resnet18_FREEZE_LORA.layer4.parameters():
    param.requires_grad = True

max_epoch = 8
optimizer = optim.SGD([param for param in resnet18_FR.parameters() if param.requires_grad], lr=0.1, momentum=0.8)
criterion = nn.CrossEntropyLoss()
for epoch in range(max_epoch):
  train(resnet18_FR, criterion, optimizer, train_loader, epoch)
  test(resnet18_FR, criterion, test_loader, epoch)

end = time.time()
print(f'Finished Training after {end-start} s ')

Test result on epoch 0: Acc: 56.870%
Test result on epoch 1: Acc: 54.730%
Test result on epoch 2: Acc: 68.700%
Test result on epoch 3: Acc: 58.890%
Test result on epoch 4: Acc: 68.540%
Test result on epoch 5: Acc: 70.150%
Test result on epoch 6: Acc: 56.500%
Test result on epoch 7: Acc: 58.860%
Finished Training after 779.1638870239258 s 


In [49]:
for i in rank:
  start = time.time()
  resnet18_TE = models.resnet18(pretrained=True)
  resnet18_TE = resnet18_TE.to(device)

  peft_config = LoraConfig(
      task_type="IMAGE_CLASSIFICATION",
      r=i,
      lora_alpha=i,
      target_modules=["fc"],
      lora_dropout=0.5,)
  resnet18_TE = get_peft_model(resnet18_TE, peft_config)
  for param in resnet18_TE.layer3.parameters():
      param.requires_grad = True
  for param in resnet18_TE.layer4.parameters():
      param.requires_grad = True

  optimizer = optim.SGD(resnet18_TE.parameters(), lr=0.1, momentum=0.8)

  for epoch in range(max_epoch):
      train(resnet18_TE, criterion, optimizer, train_loader, epoch)
  test(resnet18_TE, criterion, test_loader, epoch)
  end = time.time()
  print(f'Finished Training after {end - start} s ')



Test result on epoch 7: Acc: 91.940%
Finished Training after 677.086015701294 s 
Test result on epoch 7: Acc: 91.330%
Finished Training after 678.5429043769836 s 
Test result on epoch 7: Acc: 91.720%
Finished Training after 678.6246709823608 s 
Test result on epoch 7: Acc: 91.920%
Finished Training after 678.3967809677124 s 
