In [None]:
# Mount Google Drive at /content/drive so the notebook can reach files stored
# there. `google.colab` is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [2]:
import copy

import numpy as np
import torch
import torch.nn as nn
import torchvision
from torch.functional import F
from torch.utils.data import DataLoader
from tqdm import trange

In [None]:
# Scratch tensor kept from the original cell (its device is inspected but the
# value is not displayed because it is not the cell's last expression).
temp = torch.ones((10, 2))
temp.device

# Pick the compute device once: the first CUDA GPU when one is available and
# GPU use is enabled, otherwise the CPU.
use_gpu = True
if use_gpu and torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

In [None]:
# Switch the working directory to the project folder on the mounted Drive.
# Relative paths used later in the notebook ("model3", "history", plot
# directories) resolve against this folder; presumably the local `utils`
# module imported in the next cell lives here as well -- TODO confirm.
%cd "/content/drive/MyDrive/phase4"

In [5]:
from utils import get_oxford_splits
from utils import custom_plot_training_stats
from utils import plot_conf
from utils import *

In [None]:
# Build the five data loaders used throughout the notebook: train/test loaders
# for split A, train/test loaders for split B, and one combined test loader.
(
    A_train_dl,
    A_test_dl,
    B_train_dl,
    B_test_dl,
    test_all,
) = get_oxford_splits(
    batch_size=128,
    data_loader_seed=111,
    pin_memory=True,
    num_workers=2,
)

In [7]:
import torch
import torch.nn as nn

class MyModel(nn.Module):
    """VGG-style convolutional classifier with 80 output logits.

    The network is a stem (one conv/BN/ReLU triple, 3 -> 64 channels) followed
    by four stages of four conv/BN/ReLU triples (64, 96, 128 and 256 output
    channels), each stage ending in a 2x2 max-pool. The pooled feature map is
    flattened and fed to a single linear layer.

    The linear layer's input size is hard-coded to 4096, which corresponds to
    a 3x64x64 input (256 channels * 4 * 4 after four 2x poolings).
    """

    @staticmethod
    def _conv_stack(in_channels, out_channels, repeats):
        """Return `repeats` consecutive Conv3x3 -> BatchNorm -> ReLU triples.

        Only the first convolution changes the channel count; the remaining
        ones map `out_channels` to `out_channels`. Spatial size is preserved
        (kernel 3, stride 1, padding 1).
        """
        layers = []
        channels = in_channels
        for _ in range(repeats):
            layers.append(nn.Conv2d(channels, out_channels, kernel_size=3, stride=1, padding=1))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))
            channels = out_channels
        return nn.Sequential(*layers)

    def __init__(self):
        super().__init__()

        # Sub-modules are registered in the same order (and with the same
        # attribute names) as the layer-by-layer definition, so state_dict
        # keys such as "conv2.3.weight" are unchanged.
        self.conv1 = self._conv_stack(3, 64, repeats=1)
        self.conv2 = self._conv_stack(64, 64, repeats=4)
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.conv3 = self._conv_stack(64, 96, repeats=4)
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        self.conv4 = self._conv_stack(96, 128, repeats=4)
        self.pool3 = nn.MaxPool2d(kernel_size=2)

        self.conv5 = self._conv_stack(128, 256, repeats=4)
        self.pool4 = nn.MaxPool2d(kernel_size=2)

        # nn.Flatten keeps the batch dimension and collapses the rest, like
        # torch.flatten(t, start_dim=1).
        self.flat = nn.Flatten()
        self.fc = nn.Linear(4096, 80)

    def forward(self, x):
        """Run the stem, the four conv/pool stages, then flatten and classify."""
        pipeline = (
            self.conv1,
            self.conv2, self.pool1,
            self.conv3, self.pool2,
            self.conv4, self.pool3,
            self.conv5, self.pool4,
            self.flat,
            self.fc,
        )
        for stage in pipeline:
            x = stage(x)
        return x

# Create an instance of the model
model = MyModel()

In [None]:
!pip install torchsummary
from torchsummary import summary
model =MyModel().to(device)
summary(model, (3, 64, 64))

In [9]:
def train_one_epoch(model: nn.Module, optim: torch.optim.Optimizer,dataloader: DataLoader, loss_fn):
    """Train `model` for one pass over `dataloader`.

    Args:
        model: network to optimise; it is switched to training mode.
        optim: optimiser holding the model's parameters.
        dataloader: iterable yielding `(inputs, targets)` batches.
        loss_fn: callable `loss_fn(outputs, targets)` returning a scalar loss
            tensor.

    Returns:
        `(epoch_acc, epoch_loss)` where `epoch_acc` is a 0-d tensor holding
        the accuracy in percent over the samples actually seen, and
        `epoch_loss` is a Python float: the mean of the per-batch losses.
    """
    # Follow the device the model already lives on instead of relying on a
    # notebook-level global; fall back to the CPU for parameter-less models.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    running_corrects = 0
    running_loss = 0.0
    # Count what the loader really yields: len(dataloader.dataset) is wrong
    # when drop_last=True or when a sampler visits only part of the dataset.
    num_seen = 0
    num_batches = 0

    model.train()

    for inputs, targets in dataloader:
        inputs = inputs.to(model_device)
        targets = targets.to(model_device)

        # Clear gradients *before* the backward pass so that gradients left
        # over from earlier use of the model cannot leak into this step.
        optim.zero_grad()

        outputs = model(inputs)
        loss = loss_fn(outputs, targets)

        loss.backward()  # compute gradients
        optim.step()     # update parameters

        preds = outputs.argmax(dim=1)  # predicted class index per sample, shape [N]
        running_corrects += torch.sum(preds == targets)
        running_loss += loss.item()
        num_seen += targets.size(0)
        num_batches += 1

    epoch_acc = (running_corrects / num_seen) * 100
    epoch_loss = running_loss / num_batches

    return epoch_acc, epoch_loss

In [10]:
def test_model(model: nn.Module,dataloader: DataLoader, loss_fn):

    pred_label=[]
    target_label=[]
    # utils
    num_samples = len(dataloader.dataset)
    num_batches = len(dataloader)
    running_corrects = 0
    running_loss = 0.0

    model.eval() # you must call `model.eval()` to set dropout and batch normalization layers to evaluation mode before running inference.
    with torch.no_grad(): # explain
        # more on torch.no_grad(): https://pytorch.org/tutorials/beginner/basics/autogradqs_tutorial.html#disabling-gradient-tracking

        for batch_indx, (inputs, targets) in enumerate(dataloader): # Get a batch of Data
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = model(inputs) # Forward Pass
            loss = loss_fn(outputs, targets) # Compute Loss

            # loss.backward() # Compute Gradients
            # optim.step() # Update parameters
            # optim.zero_grad() # zero the parameter's gradients

            _, preds = torch.max(outputs, 1) #
            running_corrects += torch.sum(preds == targets)
            running_loss += loss.item()
            for i,j in zip(preds,targets):
              pred_label.append(i.cpu())
              target_label.append(j.cpu())
              # print(pred_label)


            if batch_indx == 0:
                print(outputs.device)

    test_acc = (running_corrects / num_samples) * 100
    test_loss = (running_loss / num_batches)

    return test_acc, test_loss,pred_label,target_label

In [None]:
# model = MyModel()
# model = model.to(device)

# max_test=0
# best_loss_train=0
# best_loss_test=0
# best_train=0
# best_batch=0
# best_learn=0
# test_acc_list=[]
# train_acc_list=[]
# pred_list=[]
# all_of_history=[]
# targe_list=[]
# acc_history = {}
# loss_history = {}

# best_model=None
# checked=[]
# # torch.load('checked')
# for jj in [256]:#128 ->72.67(0.001) #165,  (worth)
#   model = MyModel()
#   model = model.to(device)
#   acc_history[f'{jj}']={}
#   loss_history[f'{jj}']={}
#   A_train_dl, A_test_dl, B_train_dl, B_test_dl, test_all = get_oxford_splits(batch_size = jj,data_loader_seed=111,pin_memory =True,num_workers= 2)
#   for ii in [0.0001,0.0005,0.005]:#
#     isChecked=False
#     for c in checked:
#       if(c[0]==ii and c[1]==jj):
#         isChecked=True
#         break
#     if( isChecked==False):
#       acc_history[f'{jj}'][f'{ii}']={'test':[], 'train':[]}
#       loss_history[f'{jj}'][f'{ii}']={'test':[], 'train':[]}
#       print(f"lr{ii } , batch{jj}")
#       learning_rate=ii
#       optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
#       cross_entropy = nn.CrossEntropyLoss()

#       for epoch in trange(100):
#           train_acc, train_loss = train_one_epoch(model=model, optim=optimizer, dataloader=A_train_dl, loss_fn=cross_entropy)
#           test_acc, test_loss ,pred,targe= test_model(model=model, dataloader=A_test_dl, loss_fn=cross_entropy)
#           # for i ,j in zip(pred,targe):
#           #   pred_list.append(i)
#           #   targe_list.append(j)

#           acc_history[f'{jj}'][f'{ii}']['train'].append(train_acc)
#           acc_history[f'{jj}'][f'{ii}']['test'].append(test_acc)
#           loss_history[f'{jj}'][f'{ii}']['train'].append(train_loss)
#           loss_history[f'{jj}'][f'{ii}']['test'].append(test_loss)
#           if (max_test<test_acc) :
#             max_test=test_acc
#             best_train=train_acc
#             best_loss_test=test_loss
#             best_loss_train=train_loss
#             best_batch=jj
#             best_learn=ii
#             best_model=model.state_dict()

#           # print(f",train_acc{train_acc}, test_acc{test_acc}, train_loss{train_loss} ,test_loss{test_loss}")
#       checked.append((ii,jj))
#       torch.save(checked,'checked')
#       print(f"train_acc{best_train}, test_acc{max_test}, train_loss{best_loss_train} ,test_loss{best_loss_test}")
#       print("---------------------------------------------------------------------------------------------------------------------------------------------")
# torch.save(best_model, "save_model")
# all_of_history=[acc_history[f"{best_batch}"][f"{best_learn}"],loss_history[f"{best_batch}"][f"{best_learn}"],pred_list,targe_list]
# torch.save(all_of_history,'history')
# print(f"best_batch: {best_batch} , best_learning rate : {best_learn} , best_acc{max}")

In [None]:
# Train a fresh model on split A for 40 epochs with Adam, evaluate on the
# split-A test loader after every epoch, and keep the weights of the epoch
# with the highest test accuracy.
model = MyModel()
model = model.to(device)
learning_rate=0.0003
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
cross_entropy = nn.CrossEntropyLoss()

test_acc_list=[]
train_acc_list=[]
pred_list=[]
all_of_history=[]
targe_list=[]
acc_history = {'train': [], 'test': []}
loss_history = {'train': [], 'test': []}
best_model=None
best_test_acc=0
for epoch in trange(40):
    train_acc, train_loss = train_one_epoch(model=model, optim=optimizer, dataloader=A_train_dl, loss_fn=cross_entropy)
    test_acc, test_loss ,pred,targe= test_model(model=model, dataloader=A_test_dl, loss_fn=cross_entropy)
    # pred_list / targe_list are created once above, so they accumulate the
    # predictions and targets of *every* epoch.
    # NOTE(review): confirm that a confusion matrix pooled over all epochs is
    # intended rather than one for the final (or best) epoch only.
    pred_list.extend(pred)
    targe_list.extend(targe)

    acc_history['train'].append(train_acc)
    acc_history['test'].append(test_acc)
    loss_history['train'].append(train_loss)
    loss_history['test'].append(test_loss)
    # Saved order: accuracy history, loss history, predictions, targets.
    all_of_history=[acc_history,loss_history,pred_list,targe_list]
    torch.save(all_of_history,'history')
    if(best_test_acc<test_acc):
      best_test_acc=test_acc
      # state_dict() returns references to the live parameter tensors, which
      # later epochs keep updating in place. Take a deep copy so the snapshot
      # really holds the best epoch's weights.
      best_model=copy.deepcopy(model.state_dict())


    print(f"train_acc{train_acc}, test_acc{test_acc}, train_loss{train_loss} ,test_loss{test_loss}")

torch.save(best_model, "model3")

In [None]:
# for i,param in model.named_parameters():
  # print(f'i{i} ,param{param}')

In [None]:

# import torch.nn as nn
# import torch.optim as optim
# from sklearn.model_selection import GridSearchCV


# param_grid = {
#     'learning_rate': [0.0001,0.001,0.0005,0.005,0.05, 0.01, 0.1],
#     'batch_size':[128,256,200]
# }

# def train_and_evaluate(learning_rate,batch_size):
#     # A_train_dl, A_test_dl, B_train_dl, B_test_dl, test_all = get_oxford_splits(batch_size = 128,data_loader_seed=111,pin_memory =True,num_workers= 2)
#     model = MyModel()
#     model = model.to(device)

#     optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
#     cross_entropy = nn.CrossEntropyLoss()

#     # Lists for storing accuracy and loss for each epoch
#     acc_history = {'train': [], 'test': []}
#     loss_history = {'train': [], 'test': []}

#     for epoch in trange(1):
#         train_acc, train_loss = train_one_epoch(model=model, optimizer=optimizer, dataloader=A_train_dl, loss_fn=cross_entropy)
#         test_acc, test_loss, pred, targe = test_model(model=model, dataloader=A_test_dl, loss_fn=cross_entropy)

#         acc_history['train'].append(train_acc)
#         acc_history['test'].append(test_acc)
#         loss_history['train'].append(train_loss)
#         loss_history['test'].append(test_loss)

#     return acc_history, loss_history, pred, targe

# # Create an instance of GridSearchCV
# gs = GridSearchCV(estimator=train_and_evaluate, param_grid=param_grid, cv=3)

# # Fit the GridSearchCV instance to perform the grid search
# # gs.fit(X=A_train_dl)  # Pass your training data if required

# # Print the best hyperparameters and their respective scores
# print("Best hyperparameters:", gs.best_params_)
# print("Best score:", gs.best_score_)

# # Access the results of each parameter combination
# print("Grid search results:")
# for params, mean_score, _ in gs.cv_results_:
#     print("Parameters:", params)
#     print("Mean score:", mean_score)

# # Save the best model
# best_model = gs.best_estimator_
# torch.save(best_model.state_dict(), "save_model2")

In [None]:
# The history file is written as [acc_history, loss_history, pred_list,
# targe_list]; unpack in that same order so predictions and targets are not
# swapped when they are handed to plot_conf (targets first, as in train_test).
acc_history, loss_history, pred_lists, targe_lists = torch.load("history")
model.load_state_dict(torch.load("model3"))
plot_conf(targe_lists, pred_lists, "confusion_matrix","plot", "matrix_label")

In [None]:
# Accuracies are stored as tensors (possibly on the GPU); move each one to the
# CPU and convert to NumPy. Losses are plain floats and only need wrapping in
# an array.
acc_history_cpu = {
    split: [acc.cpu().numpy() for acc in accs]
    for split, accs in acc_history.items()
}
loss_history_cpu = {
    split: np.array(losses)
    for split, losses in loss_history.items()
}

custom_plot_training_stats(
    acc_history_cpu,
    loss_history_cpu,
    ['train', 'test'],
    title='demo',
    dir='demo_plots',
)

#  section 2

In [11]:
def train_test(model, epochs, out_path,check_all):
    """Fine-tune `model` on split B and track accuracy/loss on several test sets.

    Every epoch the model is trained on `B_train_dl` and evaluated on
    `B_test_dl`, `test_all` and `A_test_dl`. The accumulated histories are
    saved to the file `history{out_path}` and the plots are regenerated, so
    partial results survive an interrupted run.

    Relies on the notebook-level names `device`, `B_train_dl`, `B_test_dl`,
    `test_all`, `A_test_dl`, `train_one_epoch`, `test_model`, `plot_conf` and
    `custom_plot_training_stats`.

    Args:
        model: network to train; it is moved to `device`.
        epochs: number of training epochs.
        out_path: tag used in the history file name and as plot directory.
        check_all: selects what is plotted each epoch.
            * truthy: confusion matrix on `test_all` plus the 'train'/'all'
              curves, written under `{out_path}/all`;
            * ``None``: confusion matrix on `test_all` plus all four curves,
              written under `{out_path}`;
            * anything else (e.g. ``False``): no confusion matrix, only the
              'train'/'test_B'/'test_A' curves under `{out_path}/B`.

    Returns:
        None.
    """
    model = model.to(device)
    learning_rate = 0.0003
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    cross_entropy = nn.CrossEntropyLoss()
    acc_history = {'train': [], 'test_B': [], 'all': [],'test_A': []}
    loss_history = {'train': [], 'test_B': [], 'all': [],'test_A': []}

    for epoch in trange(epochs):
        train_acc, train_loss = train_one_epoch(model=model, optim=optimizer, dataloader=B_train_dl, loss_fn=cross_entropy)
        test_acc, test_loss, _, _ = test_model(model=model, dataloader=B_test_dl, loss_fn=cross_entropy)
        acc_all, loss_all, pred_all, target_all = test_model(model=model, dataloader=test_all, loss_fn=cross_entropy)
        acc_a, loss_a, _, _ = test_model(model=model, dataloader=A_test_dl, loss_fn=cross_entropy)

        acc_history['train'].append(train_acc)
        acc_history['test_B'].append(test_acc)
        acc_history['all'].append(acc_all)
        acc_history['test_A'].append(acc_a)
        loss_history['train'].append(train_loss)
        loss_history['test_B'].append(test_loss)
        loss_history['all'].append(loss_all)
        loss_history['test_A'].append(loss_a)
        torch.save([acc_history, loss_history], f'history{out_path}')

        # Confusion matrix of this epoch's predictions on the combined test set.
        if check_all is None:
            plot_conf(target_all, pred_all, "confusion_matrix", f'{out_path}', f"conf_matrix_{epoch}")
        elif check_all:
            plot_conf(target_all, pred_all, "confusion_matrix", f'{out_path}/all', f"conf_matrix_{epoch}")

        print(f"train_acc{train_acc}, test_acc{test_acc}, train_loss{train_loss}, test_loss{test_loss}, all_acc {acc_all}, all_loss {loss_all}")

        # Accuracies are tensors (possibly on the GPU); losses are floats.
        acc_history_cpu = {key: [value.to('cpu').numpy() for value in values] for key, values in acc_history.items()}
        loss_history_cpu = {key: np.array(values) for key, values in loss_history.items()}

        # Exactly one plotting mode per call. Previously the trailing `else`
        # was attached only to the `None` test, so a truthy `check_all` also
        # wrote the split-B plots.
        if check_all is None:
            custom_plot_training_stats(acc_history_cpu, loss_history_cpu, ['train', 'all','test_B', 'test_A'], title='plot', dir=f'{out_path}')
        elif check_all:
            custom_plot_training_stats(acc_history_cpu, loss_history_cpu, ['train', 'all'], title='plot', dir=f'{out_path}/all')
        else:
            custom_plot_training_stats(acc_history_cpu, loss_history_cpu, ['train', 'test_B', 'test_A'], title='plot', dir=f'{out_path}/B')



In [None]:
# model2=MyModel()
# model2.load_state_dict(torch.load("model3"))
# for i,param in model2.named_parameters():
#   print(f'i{i} ,param{param}')


In [12]:
def copy_model(checkpoint_path="model3", num_classes=100):
  """Load the trained classifier and widen its output layer.

  A `MyModel` is created, the weights stored at `checkpoint_path` are loaded
  into it, and its final linear layer is replaced by one with `num_classes`
  outputs. The rows (and biases) of the existing classes are copied from the
  checkpoint; the additional rows keep `nn.Linear`'s default initialisation.

  Args:
      checkpoint_path: file holding a `MyModel` state dict.
      num_classes: total number of output classes of the returned model.

  Returns:
      The widened model, on the CPU, with all parameters trainable.

  Raises:
      ValueError: if `num_classes` is smaller than the checkpoint's class count.
  """
  new_model = MyModel()
  # map_location keeps the load working on a CPU-only runtime even when the
  # checkpoint was written from a GPU model.
  new_model.load_state_dict(torch.load(checkpoint_path, map_location="cpu"))

  old_fc = new_model.fc
  num_old = old_fc.out_features
  if num_classes < num_old:
    raise ValueError(
        f"num_classes ({num_classes}) must be at least the checkpoint's class count ({num_old})"
    )

  widened_fc = nn.Linear(old_fc.in_features, num_classes)
  # Overwrite the first `num_old` rows with the trained classifier weights;
  # no_grad allows the in-place write on leaf parameters.
  with torch.no_grad():
    widened_fc.weight[:num_old].copy_(old_fc.weight)
    widened_fc.bias[:num_old].copy_(old_fc.bias)

  new_model.fc = widened_fc
  return new_model

section 2.1

---



In [None]:
# Section 2.1, first run: fine-tune the whole widened model for 20 epochs
# with check_all=True.
model2 = copy_model()
train_test(model=model2, epochs=20, out_path="2_1", check_all=True)
# for i,param in model2.named_parameters():
#   print(f'i{i} ,param{param}')

In [None]:
# Section 2.1, second run: start again from the checkpoint and fine-tune the
# whole widened model for 50 epochs with check_all=False.
model2 = copy_model()
train_test(model=model2, epochs=50, out_path="2_1", check_all=False)

In [None]:
# # add random noise
# augmented_dataset = []
# original_dataset = B_train_dl

# for i in range(10):
#   for samples, labels in original_dataset:
#     for sample,label in zip(samples, labels):
#       noise = torch.randn(sample.shape) * 0.1 # You can adjust the scale factor
#       # Add noise to your features
#       augmented_sample = sample + noise
#       augmented_dataset.append((augmented_sample, label))

# # Combine original and augmented datasets
# combined_dataset = original_dataset + augmented_dataset

# # Create DataLoader for training
# batch_size = 128
# def seed_worker(worker_id):
#         # worker_seed = torch.initial_seed() % 2 ** 32
#         np.random.seed(111)
#         random.seed(111)
# g = torch.Generator()
# g.manual_seed(111)
# train_loader = torch.utils.data.DataLoader(
#         combined_dataset,
#         batch_size = batch_size,
#         shuffle=True,
#         worker_init_fn=seed_worker,
#         generator= torch.Generator(),
#         drop_last=False,
#         pin_memory=True,
#         num_workers=2
#     )


section 2.2

In [None]:
# Section 2.2, first run: freeze all convolutional stages so that only the
# final linear layer is trained, then fine-tune for 20 epochs.
model3 = copy_model()

frozen_stages = (model3.conv1, model3.conv2, model3.conv3, model3.conv4, model3.conv5)
for stage in frozen_stages:
    for param in stage.parameters():
        param.requires_grad = False

train_test(model3, 20, "2_2", True)


In [None]:
# Section 2.2, second run: same frozen-backbone setup, restarted from the
# checkpoint and fine-tuned for 50 epochs.
model3 = copy_model()

frozen_stages = (model3.conv1, model3.conv2, model3.conv3, model3.conv4, model3.conv5)
for stage in frozen_stages:
    for param in stage.parameters():
        param.requires_grad = False

train_test(model3, 50, "2_2", False)

section 2.3


In [None]:
# Section 2.3: freeze the convolutional stages AND the classifier rows of the
# 80 previously learned classes, so that only the 20 new output units train.
model4 = copy_model()
in_features = model4.fc.in_features

for stage in (model4.conv1, model4.conv2, model4.conv3, model4.conv4, model4.conv5):
    for param in stage.parameters():
        param.requires_grad = False

# Gradient masks for the last FC layer: 0 for the first 80 (old) output units,
# 1 for the remaining 20 (new) ones. The bias needs its own mask: without it
# the old-class biases would still be updated although their weights are
# frozen.
mask = torch.cat([torch.zeros(80, in_features), torch.ones(20, in_features)], dim=0).to(device)
bias_mask = torch.cat([torch.zeros(80), torch.ones(20)], dim=0).to(device)

def masked_backward_hook(grad):
    """Zero the weight-gradient rows that belong to the old classes."""
    return grad * mask

def masked_bias_backward_hook(grad):
    """Zero the bias-gradient entries that belong to the old classes."""
    return grad * bias_mask

# Apply the masks to the gradients of the last FC layer. The hooks live on the
# Parameter objects, which train_test's model.to(device) call leaves in place.
model4.fc.weight.register_hook(masked_backward_hook)
model4.fc.bias.register_hook(masked_bias_backward_hook)


train_test(model4,25, "2_3",None)