In [10]:
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision import datasets

class RSSCN7_DataLoader:
    def __init__(self, data_dir, batch_size=32, shuffle=False):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.shuffle = shuffle

        self.transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        self.dataset = datasets.ImageFolder(root=self.data_dir, transform=self.transform)
        self.train_dataset, self.test_dataset = self.split_dataset()

    def split_dataset(self):
        train_size = int(0.8 * len(self.dataset))
        test_size = len(self.dataset) - train_size

        train_dataset, test_dataset = random_split(self.dataset, [train_size, test_size])
        return train_dataset, test_dataset

    def get_train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=self.shuffle)

    def get_test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)


In [11]:
from torch import nn
import torchvision.models as models

class ResNet18(nn.Module):
    def __init__(self, num_classes=47):
        super(ResNet18, self).__init__()
        self.resnet18 = models.resnet18(pretrained=False)
        self.resnet18.fc = nn.Linear(self.resnet18.fc.in_features, num_classes)

    def forward(self, x):
        return self.resnet18(x)

In [12]:
import torch
import numpy as np
import copy
from torch import nn

def k_nearest_metric(k, model, data_loader, device):
    components = list(model.children())
    truncated_model = nn.Sequential(*components[:-1])
    
    if len(components) == 1:
        components = list(components[0].children())
        truncated_model = nn.Sequential(*list(components)[:-1])
        
    truncated_model = truncated_model.to(device)
        
    tensors = []
    truncated_model.eval()
    with torch.no_grad():
        x = data_loader.dataset[0][0]
        x = x.unsqueeze(0)
        x = x.to(device)
        tensor = truncated_model(x)
        shape = tensor.shape
        
        labels = np.array([], dtype='int64')
        for x, y in data_loader:
            labels = np.append(labels, y.cpu().numpy()) 
            x, y = x.to(device), y.to(device)
    
            tensor = truncated_model(x)
            tensors.append(tensor)

    if len(tensors) > 1 and tensors[-1].shape != tensors[0].shape:
        last_tensor = tensors[-1]
        tensors = tensors[:-1]
    else:
        last_tensor = None
        
    tensors = torch.stack(tensors)
    tensor_length = shape.numel()
    tensors_shape = torch.Size([tensors.shape[0] * tensors.shape[1], tensor_length])
    tensors = tensors.view(tensors_shape)
    
    if last_tensor is not None:
        last_tensor_shape = torch.Size([last_tensor.shape[0], tensor_length])
        last_tensor = last_tensor.view(last_tensor_shape)
        tensors = torch.cat([tensors, last_tensor], dim=0)

    inf_tensor = torch.full(torch.Size([tensor_length]), float('inf'))
    
    metric_results = []
    for i in range(len(tensors)):
        label = labels[i]
        tensor = tensors[i]
        other_tensors = copy.deepcopy(tensors)
        other_tensors[i] = inf_tensor
        distances = torch.norm(tensor - other_tensors, dim=1)
        
        k_nearest_indices = torch.topk(distances, k, largest=False).indices
        k_nearest_indices = k_nearest_indices.tolist()

        matching_labels = np.sum(labels[k_nearest_indices] == label)
        res = matching_labels / k
        metric_results.append(res)
    return np.array(metric_results)

ImageNet, self paced

In [4]:
from torchvision.models import resnet18
# from ..Resnet18.task.model import ResNet18
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
import time
import random

knn_metric_train = []
knn_metric_test = []
acc_train = []
acc_test = []
loss_train =[]
loss_test = []
time_epoch = []
cur_lambda = []
cur_learning_rate = []

random.seed(42)

time0 = time.time()

data_dir = '/kaggle/input/rssnc7/RSSCN7'
batch_size = 32
learning_rate = 0.0001
num_epochs = 100
lambda_beginning = 0.1
lambda_end = 1

rsscn7_data_loader = RSSCN7_DataLoader(data_dir, batch_size=batch_size, shuffle=True)
train_loader = rsscn7_data_loader.get_train_dataloader()
test_loader = rsscn7_data_loader.get_test_dataloader()

model = resnet18(weights='ResNet18_Weights.DEFAULT')
num_filters = model.fc.in_features
model.fc = nn.Linear(num_filters, 7)

######### In case of model pretrained on DTD: uploading the weights #################################################

# pretrained_model_path = "/kaggle/input/resnet18-pretrained-on-dtd/pytorch/version1/1/resnet18_trained_on_DTD_from_80_to_90.pth"
# pretrained_resnet18 = ResNet18()
# pretrained_resnet18.load_state_dict(torch.load(pretrained_model_path, map_location=torch.device(device)))

# model = pretrained_resnet18.to(device)

# model.fc = nn.Linear(47, 7)

criterion = nn.CrossEntropyLoss()
opitmizer = optim.Adam(model.parameters(), lr=learning_rate)
my_device = torch.device("cuda" if torch.cuda.is_available() else "mps")

step = 0.05

def train_model_self_paced(model, train_loader, test_loader, criterion, optimizer, num_epochs, learning_rate):
    device = my_device
    model.to(device)
    counter = 0

    lambda_current = lambda_beginning

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        correct = 0
        total = 0
        train_samples = []

        if lambda_current < 1:
            with torch.no_grad():
                for inputs, labels in train_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    train_samples.append((inputs, labels, loss.item()))

            train_samples.sort(key=lambda x: x[2])  # sort by loss (the first are the easiest)

            num_samples_current = int(lambda_current * len(train_samples))

            easy_enough_samples = train_samples[:num_samples_current]
            easy_enough_inputs = torch.cat([x[0] for x in easy_enough_samples])
            easy_enough_labels = torch.cat([x[1] for x in easy_enough_samples])
            easy_enough_dataset = TensorDataset(easy_enough_inputs, easy_enough_labels)
            easy_enough_loader = DataLoader(easy_enough_dataset, batch_size=batch_size, shuffle=True)
        else:
            easy_enough_loader = train_loader

        for inputs, labels in easy_enough_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * inputs.size(0)

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
        knn_metric = np.mean(k_nearest_metric(5, model, easy_enough_loader, device))
        train_loss = total_loss / len(easy_enough_loader.dataset)
        train_accuracy = correct / total
        num_images = len(easy_enough_loader.dataset)

        if train_accuracy >= 0.88:
            learning_rate = 0.00001
        
        if train_accuracy >= 0.93:
            learning_rate = 0.000001
            
        if train_accuracy >= 0.96:
            learning_rate = 0.0000001

        cur_time_ = time.time() - time0

        print("learing_rate = ", learning_rate)

        print(
            f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}, Images: {num_images}, Lambda: {lambda_current:.2f}, Time: {cur_time_:.2f} seconds')
        print(f'KNN(k=5) metric train: {knn_metric:.4f}')
        if train_accuracy > 0.85:
            if lambda_current < 0.6:
                lambda_current += step
                if lambda_current>1:
                    lambda_current = 1
            else:
                counter = counter + 1
                if counter % 2 == 0:
                    lambda_current += step
                    counter = 0
                    if lambda_current>1:
                        lambda_current = 1
                        
        knn_metric_train.append(knn_metric)
        acc_train.append(train_accuracy)
        loss_train.append(train_loss)
        time_epoch.append(cur_time_)
        cur_lambda.append(lambda_current)
        cur_learning_rate.append(learning_rate)

        evaluate_model(model, test_loader, criterion)

    print('Finished Training Successfully')


def evaluate_model(model, test_loader, criterion):
    model.eval()
    device = next(model.parameters()).device
    total_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * inputs.size(0)

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_loss = total_loss / len(test_loader.dataset)
    test_accuracy = correct / total
    knn_metric = np.mean(k_nearest_metric(5, model, test_loader, device))

    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')
    print(f'KNN(k=5) metric test: {knn_metric:.4f}')

    knn_metric_test.append(knn_metric)
    acc_test.append(test_accuracy)
    loss_test.append(test_loss)
    

train_model_self_paced(model, train_loader, test_loader, criterion, opitmizer, num_epochs, learning_rate)
print("KNN train:")
print(knn_metric_train)
print("KNN test:")
print(knn_metric_test)
print("Accuracy train:")
print(acc_train)
print("Accuracy test:")
print(acc_test)
print("Loss train:")
print(loss_train)
print("Loss test:")
print(loss_test)
print("Time epoch:")
print(time_epoch)
print("Learning rate:")
print(cur_learning_rate)
print("Lambdas:")
print(cur_lambda)

torch.save(model, 'resnet18_imagenet_self_paced.pth')

DTD, self paced

In [5]:
from torchvision.models import resnet18
# from ..Resnet18.task.model import ResNet18
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
import time
import random

knn_metric_train = []
knn_metric_test = []
acc_train = []
acc_test = []
loss_train =[]
loss_test = []
time_epoch = []
cur_lambda = []
cur_learning_rate = []

random.seed(42)

time0 = time.time()

data_dir = '/kaggle/input/rssnc7/RSSCN7'
batch_size = 32
learning_rate = 0.0001
num_epochs = 100
lambda_beginning = 0.1
lambda_end = 1

rsscn7_data_loader = RSSCN7_DataLoader(data_dir, batch_size=batch_size, shuffle=True)
train_loader = rsscn7_data_loader.get_train_dataloader()
test_loader = rsscn7_data_loader.get_test_dataloader()

my_device = torch.device("cuda" if torch.cuda.is_available() else "mps")
pretrained_model_path = "/kaggle/input/resnet18-pretrained-on-dtd/pytorch/version1/1/resnet18_trained_on_DTD_from_80_to_90.pth"
pretrained_resnet18 = ResNet18()
pretrained_resnet18.load_state_dict(torch.load(pretrained_model_path, map_location=torch.device(my_device)))

model = pretrained_resnet18.to(my_device)

model.fc = nn.Linear(47, 7)

criterion = nn.CrossEntropyLoss()
opitmizer = optim.Adam(model.parameters(), lr=learning_rate)

step = 0.05

def train_model_self_paced(model, train_loader, test_loader, criterion, optimizer, num_epochs, learning_rate):
    device = my_device
    model.to(device)
    counter = 0

    lambda_current = lambda_beginning

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        correct = 0
        total = 0
        train_samples = []

        if lambda_current < 1:
            with torch.no_grad():
                for inputs, labels in train_loader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    train_samples.append((inputs, labels, loss.item()))

            train_samples.sort(key=lambda x: x[2])  # sort by loss (the first are the easiest)

            num_samples_current = int(lambda_current * len(train_samples))

            easy_enough_samples = train_samples[:num_samples_current]
            easy_enough_inputs = torch.cat([x[0] for x in easy_enough_samples])
            easy_enough_labels = torch.cat([x[1] for x in easy_enough_samples])
            easy_enough_dataset = TensorDataset(easy_enough_inputs, easy_enough_labels)
            easy_enough_loader = DataLoader(easy_enough_dataset, batch_size=batch_size, shuffle=True)
        else:
            easy_enough_loader = train_loader

        for inputs, labels in easy_enough_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * inputs.size(0)

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        knn_metric = np.mean(k_nearest_metric(5, model, easy_enough_loader, device))
        train_loss = total_loss / len(easy_enough_loader.dataset)
        train_accuracy = correct / total
        num_images = len(easy_enough_loader.dataset)

        if train_accuracy >= 0.88:
            learning_rate = 0.00001
        
        if train_accuracy >= 0.93:
            learning_rate = 0.000001
            
        if train_accuracy >= 0.96:
            learning_rate = 0.0000001

        cur_time_ = time.time() - time0

        print("learing_rate = ", learning_rate)

        print(
            f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}, Images: {num_images}, Lambda: {lambda_current:.2f}, Time: {cur_time_:.2f} seconds')
        print(f'KNN(k=5) metric train: {knn_metric:.4f}')
        if train_accuracy > 0.8:
            if lambda_current < 0.6:
                lambda_current += step
                if lambda_current>1:
                    lambda_current = 1
            else:
                counter = counter + 1
                if counter % 2 == 0:
                    lambda_current += step
                    counter = 0
                    if lambda_current>1:
                        lambda_current = 1

        knn_metric_train.append(knn_metric)
        acc_train.append(train_accuracy)
        loss_train.append(train_loss)
        time_epoch.append(cur_time_)
        cur_lambda.append(lambda_current)
        cur_learning_rate.append(learning_rate)

        evaluate_model(model, test_loader, criterion)

    print('Finished Training Successfully')


def evaluate_model(model, test_loader, criterion):
    model.eval()
    device = next(model.parameters()).device
    total_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * inputs.size(0)

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    knn_metric = np.mean(k_nearest_metric(5, model, test_loader, device))
    test_loss = total_loss / len(test_loader.dataset)
    test_accuracy = correct / total

    print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')
    print(f'KNN(k=5) metric test: {knn_metric:.4f}')

    knn_metric_test.append(knn_metric)
    acc_test.append(test_accuracy)
    loss_test.append(test_loss)


train_model_self_paced(model, train_loader, test_loader, criterion, opitmizer, num_epochs, learning_rate)
print("KNN train:")
print(knn_metric_train)
print("KNN test:")
print(knn_metric_test)
print("Accuracy train:")
print(acc_train)
print("Accuracy test:")
print(acc_test)
print("Loss train:")
print(loss_train)
print("Loss test:")
print(loss_test)
print("Time epoch:")
print(time_epoch)
print("Learning rate:")
print(cur_learning_rate)
print("Lambdas:")
print(cur_lambda)

torch.save(model, 'resnet18_DTD_self_paced.pth')

Transfer_learning_imagenet

In [7]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
import random
import time

time0 = time.time()

random.seed(10)

knn_metric_train = []
knn_metric_test = []
train_acc = []
test_acc = []
loss_train = []
loss_test = []
epoch_time = []
learning_rates = []

class RCCN7DataLoader:
    def __init__(self, data_dir, batch_size=32, shuffle=True):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.shuffle = shuffle

        self.transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        self.dataset = datasets.ImageFolder(root=self.data_dir, transform=self.transform)
        self.train_dataset, self.test_dataset = self.split_dataset()

    def split_dataset(self):
        train_size = int(0.8 * len(self.dataset))
        test_size = len(self.dataset) - train_size

        train_dataset, test_dataset = random_split(self.dataset, [train_size, test_size])
        return train_dataset, test_dataset

    def get_train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=self.shuffle)

    def get_test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)

data_dir = '/kaggle/input/rssnc7/RSSCN7'
batch_size = 32
learning_rate = 0.001

data_loader = RCCN7DataLoader(data_dir=data_dir, batch_size=batch_size, shuffle=True)

resnet18 = models.resnet18(weights='ResNet18_Weights.DEFAULT')


for name, param in resnet18.named_parameters():
    if 'fc' not in name:
        param.requires_grad = False

num_classes = 7

resnet18.fc = nn.Linear(resnet18.fc.in_features, num_classes)

device = "cuda"
resnet18 = resnet18.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(resnet18.parameters(), lr=learning_rate)

#most optimal
epochs = 100

def train_model(model, criterion, optimizer, train_loader, test_loader, num_epochs, learning_rate):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            
        knn_metric = np.mean(k_nearest_metric(5, model, train_loader, device))
        epoch_loss = running_loss / total_samples
        epoch_accuracy = correct_predictions / total_samples

        if epoch_accuracy >= 85:
            learning_rate = 0.0005

        if epoch_accuracy >= 89:
            learning_rate = 0.00005

        if epoch_accuracy >= 92:
            learning_rate = 0.000001

        time_cur = time.time() - time0
        print("learing_rate = ", learning_rate)

        print(f'Training - Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')
        print(f'KNN(k=5) metric train: {knn_metric:.4f}')
        test_loss, test_accuracy, test_knn = evaluate_model(model, criterion, test_loader)
        print(f'Testing - Epoch {epoch+1}/{num_epochs}, Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}')
        print(f'KNN(k=5) metric test: {test_knn:.4f}')

        print(f'Time: {time_cur:.2f} seconds')

        knn_metric_test.append(test_knn)
        knn_metric_train.append(knn_metric)
        train_acc.append(epoch_accuracy)
        test_acc.append(test_accuracy)
        loss_train.append(epoch_loss)
        loss_test.append(test_loss)
        learning_rates.append(learning_rate)
        epoch_time.append(time_cur)

    print('Training complete.')

def evaluate_model(model, criterion, test_loader):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)

    knn_metric = np.mean(k_nearest_metric(5, model, test_loader, device))
    test_loss = running_loss / total_samples
    test_accuracy = correct_predictions / total_samples

    return test_loss, test_accuracy, knn_metric

train_loader = data_loader.get_train_dataloader()
test_loader = data_loader.get_test_dataloader()

train_model(resnet18, criterion, optimizer, train_loader, test_loader, epochs, learning_rate)

print('Training completed successfully.')
print("KNN train:")
print(knn_metric_train)
print("KNN test:")
print(knn_metric_test)
print("Training accuracy:")
print(train_acc)
print("Test accuracy:")
print(test_acc)
print('Loss train:')
print(loss_train)
print("Test loss:")
print(loss_test)
print("Learning rate:")
print(learning_rates)
print("Epoch times:")
print(epoch_time)

torch.save(resnet18, 'resnet18_imagenet_transfer_learning.pth')

In [14]:
import torch
import torch.nn as nn
import random
import time
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
from torchvision import datasets

class RSSCN7_DataLoader:
    def __init__(self, data_dir, batch_size=32, shuffle=False):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.shuffle = shuffle

        self.transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        self.dataset = datasets.ImageFolder(root=self.data_dir, transform=self.transform)
        self.train_dataset, self.test_dataset = self.split_dataset()

    def split_dataset(self):
        train_size = int(0.8 * len(self.dataset))
        test_size = len(self.dataset) - train_size

        train_dataset, test_dataset = random_split(self.dataset, [train_size, test_size])
        return train_dataset, test_dataset

    def get_train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=self.shuffle)

    def get_test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)

time0 = time.time()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
random.seed(10)

knn_metric_train = []
knn_metric_test = []
train_acc = []
test_acc = []
loss_train = []
loss_test = []
epoch_time = []
learning_rates = []

data_dir = '/kaggle/input/rssnc7/RSSCN7'
batch_size = 32
learning_rate = 0.001

data_loader = RSSCN7_DataLoader(data_dir=data_dir, batch_size=batch_size, shuffle=True)

pretrained_model_path = '/kaggle/input/resnet18-pretrained-on-dtd/pytorch/version1/1/resnet18_trained_on_DTD_from_80_to_90.pth'
pretrained_resnet18 = ResNet18()
pretrained_resnet18.load_state_dict(torch.load(pretrained_model_path, map_location=device))

pretrained_resnet18.fc = nn.Linear(pretrained_resnet18.resnet18.fc.in_features, 7)

pretrained_resnet18 = pretrained_resnet18.to(device)

for name, param in pretrained_resnet18.named_parameters():
    if 'fc' not in name:
        param.requires_grad = False

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(pretrained_resnet18.parameters(), lr=learning_rate)

# most optimal
epochs = 100

def train_model(model, criterion, optimizer, train_loader, test_loader, num_epochs, learning_rate):
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)
        
        knn_metric = np.mean(k_nearest_metric(5, model, train_loader, device))
        epoch_loss = running_loss / total_samples
        epoch_accuracy = correct_predictions / total_samples 

        if epoch_accuracy >= 0.85:
            learning_rate = 0.0001


        time_cur = time.time() - time0
        print("learning_rate = ", learning_rate)

        print(f'Training - Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')
        print(f'KNN(k=5) metric train: {knn_metric:.4f}')
        test_loss, test_accuracy, test_knn = evaluate_model(model, criterion,  test_loader)
        print(f'Testing - Epoch {epoch+1}/{num_epochs}, Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}')
        print(f'KNN(k=5) metric test: {test_knn:.4f}')

        print(f'Time: {time_cur:.2f} seconds')

        knn_metric_test.append(test_knn)
        knn_metric_train.append(knn_metric)
        train_acc.append(epoch_accuracy)
        test_acc.append(test_accuracy)
        loss_train.append(epoch_loss)
        loss_test.append(test_loss)
        learning_rates.append(learning_rate)
        epoch_time.append(time_cur)

    print('Training complete.')

def evaluate_model(model, criterion, test_loader):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)

    knn_metric = np.mean(k_nearest_metric(5, model, test_loader, device))
    test_loss = running_loss / total_samples
    test_accuracy = correct_predictions / total_samples

    return test_loss, test_accuracy, knn_metric

train_loader = data_loader.get_train_dataloader()
test_loader = data_loader.get_test_dataloader()

train_model(pretrained_resnet18, criterion, optimizer, train_loader, test_loader, epochs, learning_rate)

print('Training completed successfully.')
print("KNN train:")
print(knn_metric_train)
print("KNN test:")
print(knn_metric_test)
print("Training accuracy:")
print(train_acc)
print("Test accuracy:")
print(test_acc)
print('Loss train:')
print(loss_train)
print("Test loss:")
print(loss_test)
print("Learning rate:")
print(learning_rates)
print("Epoch times:")
print(epoch_time)

torch.save(pretrained_resnet18, 'resnet18_DTD_transfer_learning.pth')
