In [None]:
from tqdm import tqdm

# Custom
import models.resnet as resnet
import models.lossnet as lossnet
from config import *
from data.fst_data import train_data, test_data

import timm
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score

import datetime
# Python
import os
import random
import time
# Torch
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data.sampler import SubsetRandomSampler

# Torchvison
import torchvision.transforms as T
import torchvision.models as models

In [None]:
def LossPredLoss(input, target, margin=1.0, reduction='mean'):
    assert len(input) % 2 == 0, 'the batch size is not even.'
    assert input.shape == input.flip(0).shape
    
    input = (input - input.flip(0))[:len(input)//2] # [l_1 - l_2B, l_2 - l_2B-1, ... , l_B - l_B+1], where batch_size = 2B
    target = (target - target.flip(0))[:len(target)//2]
    target = target.detach()

    one = 2 * torch.sign(torch.clamp(target, min=0)) - 1 # 1 operation which is defined by the authors
    
    if reduction == 'mean':
        loss = torch.sum(torch.clamp(margin - one * input, min=0))
        loss = loss / input.size(0) # Note that the size of input is already halved
    elif reduction == 'none':
        loss = torch.clamp(margin - one * input, min=0)
    else:
        NotImplementedError()
    
    return loss

# Train Utils
iters = 0

#
def train_epoch(models, criterion, optimizers, dataloaders, epoch, epoch_loss):
    models['backbone'].train()
    
    models['module'].train()
    global iters

    for data in tqdm(dataloaders['train'], leave=False, total=len(dataloaders['train'])):
        inputs = data[0].to(device)
        labels = data[1].to(device)
        iters += 1

        optimizers['backbone'].zero_grad()
        optimizers['module'].zero_grad()

        scores, features = models['backbone'](inputs)
        target_loss = criterion(scores, labels)
        
        
        if epoch > epoch_loss:
            # After 120 epochs, stop the gradient from the loss prediction module propagated to the target model.
            features[0] = features[0].detach()
            features[1] = features[1].detach()
            features[2] = features[2].detach()
            features[3] = features[3].detach()
        
            
        pred_loss = models['module'](features)
        pred_loss = pred_loss.view(pred_loss.size(0))


        m_backbone_loss = torch.sum(target_loss) / target_loss.size(0)
        m_module_loss   = LossPredLoss(pred_loss, target_loss, margin=MARGIN)
        loss            = m_backbone_loss + WEIGHT * m_module_loss

        loss.backward()
        optimizers['backbone'].step()
        optimizers['module'].step()

def test(models, dataloaders, mode='val'):
    metrics = {}
    preds, labels = [], []
    assert mode == 'val' or mode == 'test'
    models['backbone'].eval()
    models['module'].eval()
    

    with torch.no_grad():
        for (inputs, label) in dataloaders[mode]:
            inputs = inputs.to(device)
            label = label.to(device)

            scores, _ = models['backbone'](inputs)
            # _, preds = torch.max(scores.data, 1)
            labels.extend(label.detach().tolist())
            preds.extend(scores.argmax(axis=1).detach().tolist())
        print(f" labels = {labels}")
        print(f" preds = {preds}")
    metrics['accuracy'] = accuracy_score(y_pred=preds, y_true=labels)
    metrics['f1_score'] = f1_score(y_pred=preds, y_true=labels, average='weighted')
    metrics['precision'] = precision_score(y_pred=preds, y_true=labels, average='weighted')
    metrics['recall'] = recall_score(y_pred=preds, y_true=labels, average='weighted')

            # print("labels " , labels)
            # print("preds " , preds)
    return metrics

#
def train(models, criterion, optimizers, schedulers, dataloaders, num_epochs, epoch_loss):
    print('>> Train a Model.')
    best_acc = 0.
    checkpoint_dir = os.path.join('./cifar10', 'train', 'weights')
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)
    
    for epoch in tqdm(range(num_epochs)):
            
        train_epoch(models, criterion, optimizers, dataloaders, epoch, epoch_loss)
        schedulers['backbone'].step()
        schedulers['module'].step()

        # Save a checkpoint
        if False and epoch % 5 == 4:
            acc = test(models, dataloaders, 'test')
            if best_acc < acc:
                best_acc = acc
                torch.save({
                    'epoch': epoch + 1,
                    'state_dict_backbone': models['backbone'].state_dict(),
                    'state_dict_module': models['module'].state_dict()
                },
                '%s/active_resnet18_cifar10.pth' % (checkpoint_dir))
            print('Val Acc: {:.3f} \t Best Acc: {:.3f}'.format(acc, best_acc))
    print('>> Finished.')

#
def get_uncertainty(models, unlabeled_loader):
    models['backbone'].eval()
    models['module'].eval()
    uncertainty = torch.tensor([]).to(device)

    with torch.no_grad():
        for (inputs, labels) in unlabeled_loader:
            inputs = inputs.to(device)
            # labels = labels.to(device)

            scores, features = models['backbone'](inputs)
            pred_loss = models['module'](features) # pred_loss = criterion(scores, labels) # ground truth loss
            pred_loss = pred_loss.view(pred_loss.size(0))

            uncertainty = torch.cat((uncertainty, pred_loss), 0)
    
    return uncertainty.cpu()


In [None]:
train_loader = DataLoader(train_data, batch_size=BATCH, 
                                  num_workers = 4, pin_memory=True,  drop_last = True)
test_loader  = DataLoader(test_data,num_workers = 4, batch_size=1)

dataloaders  = {'train': train_loader, 'test': test_loader}

print(len(dataloaders['train']))
# Model

# resnet18    = timm.create_model('resnet50', num_classes = 4).to(device)
model1 = timm.create_model('resnet50', num_classes = 4, pretrained=True, features_only = True, out_indices = (1,2,3,4)).to(device)
model2 = timm.create_model('resnet50', num_classes = 4, pretrained=True).to(device)

# print(f"resnet summary : {summary_(resnet18, (3, 32, 32))}")

# resnet18 = timm.create_model("resnet50", num_classes = 4, pretrained=True).to(device)
loss_module = lossnet.LossNet().to(device)
models      = {'backbone': [model1,model2] , 'module': loss_module}

torch.backends.cudnn.benchmark = False

# Active learning cycles
for cycle in range(CYCLES):
    # Loss, criterion and scheduler (re)initialization
    criterion      = nn.CrossEntropyLoss(reduction='none')
    optim_backbone = optim.SGD(models['backbone'].parameters(), lr=LR, 
                            momentum=MOMENTUM, weight_decay=WDECAY)
    optim_module   = optim.SGD(models['module'].parameters(), lr=LR, 
                            momentum=MOMENTUM, weight_decay=WDECAY)
    sched_backbone = lr_scheduler.MultiStepLR(optim_backbone, milestones=MILESTONES)
    sched_module   = lr_scheduler.MultiStepLR(optim_module, milestones=MILESTONES)

    optimizers = {'backbone': optim_backbone, 'module': optim_module}
    schedulers = {'backbone': sched_backbone, 'module': sched_module}

    # Training and test
    train(models, criterion, optimizers, schedulers, dataloaders, EPOCH, EPOCHL)
    metrics = test(models, dataloaders, mode='test')

In [54]:
from tqdm import tqdm

# Custom
import models.resnet as resnet
import models.lossnet as lossnet
from config import *
from data.fst_data import train_data, test_data

import timm
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score

import datetime
# Python
import os
import random
import time
# Torch
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data.sampler import SubsetRandomSampler

# Torchvison
import torchvision.transforms as T
import torchvision.models as models

model1 = timm.create_model('resnet50', num_classes = 4, pretrained=True, features_only = True, out_indices = (1,2,3,4))
model2 = timm.create_model('resnet50', num_classes = 4, pretrained=True)

In [55]:
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision.transforms as T

from pathlib import Path


img = Image.open("/Users/ibyeong-gwon/Desktop/Git/DeepLearning/211201-AT6T31EH-M232_inpaint_1 ADR_W1.6_H1.7_FS.jpg")
train_transform = T.Compose([
    # transforms.Resize((32, 32)),
    # transforms.Resize((224, 224)),
    transforms.Resize((360, 360)),
    T.RandomHorizontalFlip(),
    # T.RandomCrop(size=360, padding=4),
    
    T.ToTensor(),
    
    # T.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]) # T.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761)) # CIFAR-100
])

img = train_transform(img)
img = img.unsqueeze(0)


In [56]:
import torch.nn as nn
import torch.nn.functional as F
img = Image.open("/Users/ibyeong-gwon/Desktop/Git/DeepLearning/211201-AT6T31EH-M232_inpaint_1 ADR_W1.6_H1.7_FS.jpg")
img = train_transform(img)
img = img.unsqueeze(0)

out4 = model1(img)[3]
out = F.avg_pool2d(out4, 4)
        # print(out.shape)
out = out.view(out.size(0), -1)
print("out shape = ",out.shape)

linear = nn.Linear(18432, 4)

out = linear(out)

print(out)

print(model2(img))

out shape =  torch.Size([1, 18432])
tensor([[0.0475, 0.1764, 0.0296, 0.0270]], grad_fn=<AddmmBackward0>)
tensor([[-0.0040,  0.0128, -0.0513, -0.0288]], grad_fn=<AddmmBackward0>)


In [2]:
model2 = timm.create_model('resnet50', num_classes = 4, pretrained=True)
model2.forward_features(img)

NameError: name 'timm' is not defined