In [1]:
import numpy as np
from pathlib import Path
import sys

import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
import timm
from timm.utils import AverageMeter
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, ReduceLROnPlateau
from torch.utils.data import DataLoader, random_split
from torch.utils.data import Subset
from warmup_scheduler import GradualWarmupScheduler  

In [2]:
# Paths are resolved relative to the kernel's working directory: the project
# root is taken to be its parent directory.
ROOT_PATH = Path.cwd().parent
DATA_PATH = ROOT_PATH / "data"

# Make the project root importable (needed for the `src.*` imports).
# Guarded so that re-running this cell does not keep appending duplicates.
if ROOT_PATH.as_posix() not in sys.path:
    sys.path.append(ROOT_PATH.as_posix())

# Data loading / reproducibility.
BATCH_SIZE = 32
SEED = 42

# Cross-validation and training schedule.
NUM_FOLDS = 5
NUM_EPOCHS = 10
SMOOTHING = 0.2  # label-smoothing factor handed to the training criterion

# Device that batches are moved to in the training loop.
DEVICE = "cpu"

In [3]:
from src.data import LeafDataset, get_image_labels
from src.augmentation import get_augmentations
from src.config import AugmentationConfig, ModelConfig
from src.utils import seed_everything

In [4]:
# Augmentation settings: AugmentationConfig is instantiated without overrides,
# so whatever defaults it declares in src.config are used.
aug_config = AugmentationConfig()

In [5]:
# Expand the config's fields into keyword arguments. The returned pair is used
# as the `transform` of the training and validation datasets respectively.
train_augs, test_augs = get_augmentations(**aug_config.model_dump())



In [6]:
# Model/optimizer/scheduler settings with ModelConfig's defaults; its dumped
# fields are later passed to get_model as keyword arguments.
model_config = ModelConfig()

In [7]:
def get_model(backbone, pretrained, optimizer_class, lr, scheduler_class, lr_min, warm_up):
    """Build the network together with its optimizer and LR scheduler.

    Args:
        backbone: model name passed to ``timm.create_model``.
        pretrained: forwarded to ``timm.create_model``.
        optimizer_class: optimizer name; only ``"Adam"`` is supported.
        lr: learning rate given to the optimizer.
        scheduler_class: ``"CosineAnnealing"`` chains the warm-up scheduler
            into ``CosineAnnealingWarmRestarts``; any other value yields the
            warm-up scheduler alone.
        lr_min: ``eta_min`` of the cosine schedule (ignored otherwise).
        warm_up: warm-up length; the warm-up scheduler is configured with
            ``total_epoch = warm_up + 1``.

    Returns:
        Tuple ``(model, optimizer, scheduler)``.

    Raises:
        ValueError: if ``optimizer_class`` is not a supported optimizer name.
    """
    # NOTE(review): no `num_classes` is passed, so the classifier head keeps
    # timm's default size for this backbone - confirm it matches the dataset.
    model = timm.create_model(model_name=backbone, pretrained=pretrained)
    # Place the model on the device the batches are moved to, and do it
    # before the optimizer captures the parameters.
    model = model.to(DEVICE)

    if optimizer_class == "Adam":
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    else:
        # Fail loudly: both schedulers below need a real optimizer.
        raise ValueError(
            f"Unsupported optimizer_class {optimizer_class!r}; expected 'Adam'"
        )

    if scheduler_class == "CosineAnnealing":
        after_scheduler = CosineAnnealingWarmRestarts(
            optimizer=optimizer,
            # Cosine period covers the epochs left after warm-up; clamped to 1
            # because T_0 must be a positive integer.
            T_0=max(1, NUM_EPOCHS - warm_up),
            eta_min=lr_min,
        )
        scheduler = GradualWarmupScheduler(
            optimizer=optimizer,
            multiplier=1,
            total_epoch=warm_up + 1,
            after_scheduler=after_scheduler,
        )
    else:
        scheduler = GradualWarmupScheduler(
            optimizer=optimizer,
            multiplier=1,
            total_epoch=warm_up + 1,
        )

    return model, optimizer, scheduler

In [8]:
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross-entropy loss with label smoothing.

    Per-sample loss is
    ``(1 - smoothing) * nll + smoothing * mean(-log_softmax(x))``,
    i.e. the usual negative log-likelihood of the target class blended with
    the mean negative log-probability over all classes.

    Args:
        smoothing: blending weight of the uniform term; must be < 1.
        reduction: ``'mean'`` or ``'sum'`` to reduce over samples; any other
            value returns the unreduced per-sample losses.
    """

    def __init__(self, smoothing = 0.1, reduction = 'mean'):
        super().__init__()
        assert smoothing < 1.0, "smoothing must be < 1.0"
        self.smoothing  = smoothing
        self.confidence = 1. - smoothing
        self.reduction  = reduction

    def forward(self, x, target):
        """Return the smoothed loss for logits ``x`` and class indices ``target``.

        ``target`` is unsqueezed on dim 1 and used as a gather index along the
        last dim of ``x``, so it holds one integer class index per row of ``x``.
        """
        # `nn.functional` is used directly: the notebook never imports the
        # conventional `F` alias, so `F.log_softmax` raised NameError here.
        logprobs = nn.functional.log_softmax(x, dim=-1)
        # Negative log-probability of the target class, one value per sample.
        nll_loss = -logprobs.gather(dim=-1, index=target.unsqueeze(1))
        nll_loss = nll_loss.squeeze(1)
        # Uniform-target term: mean negative log-probability over classes.
        smooth_loss = -logprobs.mean(dim=-1)
        loss = self.confidence * nll_loss + self.smoothing * smooth_loss
        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        return loss

In [9]:
# Training uses the label-smoothed loss (factor SMOOTHING); validation is
# scored with plain, unsmoothed cross-entropy.
train_criterion = LabelSmoothingCrossEntropy(smoothing = SMOOTHING)
val_criterion = nn.CrossEntropyLoss()

Cross-validation (CV)

In [10]:
# Two parallel sequences: the fold loop indexes both with the same positions.
# Presumably one entry per image found under DATA_PATH - verify in src.data.
image_paths, labels_all = get_image_labels(DATA_PATH)

In [11]:
# Stratified K-fold splitter: NUM_FOLDS folds, shuffled with a fixed
# random_state so the fold assignment is the same on every run.
skf = StratifiedKFold(n_splits=NUM_FOLDS, shuffle=True, random_state=SEED)

In [12]:
# Per-epoch history, appended in fold-major order: all epochs of fold 0,
# then all epochs of fold 1, and so on.
lrs, train_losses, val_losses, val_metrics = [], [], [], []

for fold, (train_index, val_index) in enumerate(skf.split(image_paths, labels_all)):
    # Re-seed per fold (offset by the fold index) before anything is built.
    seed_everything(SEED + fold)

    train_dataset = LeafDataset(
        [image_paths[i] for i in train_index],
        [labels_all[i] for i in train_index],
        transform=train_augs
    )
    val_dataset = LeafDataset(
        [image_paths[i] for i in val_index],
        [labels_all[i] for i in val_index],
        transform=test_augs
    )

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # Fresh model, optimizer and scheduler for every fold.
    model, optimizer, scheduler = get_model(**model_config.model_dump())

    for epoch in range(NUM_EPOCHS):
        # ---- training ----
        model.train()
        train_loss = AverageMeter()

        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)

            optimizer.zero_grad()
            preds = model(inputs)
            loss = train_criterion(preds, labels)
            loss.backward()
            optimizer.step()
            # Batch-wise schedule: a fractional epoch is passed so the LR
            # moves within the epoch instead of once per epoch.
            scheduler.step(epoch + 1 + batch_idx / len(train_loader))

            # Weight by batch size so `.avg` is a per-sample mean.
            train_loss.update(loss.item(), inputs.size(0))

        # ---- validation ----
        model.eval()
        val_loss = AverageMeter()
        _probs = []
        _targets = []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(DEVICE)
                labels = labels.to(DEVICE)

                logits = model(inputs)
                probs = logits.softmax(dim=1)

                loss = val_criterion(logits, labels)
                val_loss.update(loss.item(), inputs.size(0))

                # Keep predictions and the targets they were scored against
                # together, so accuracy never depends on an external label list.
                _probs.append(probs.cpu())
                _targets.append(labels.cpu())

        _probs = torch.cat(_probs).numpy()
        _targets = torch.cat(_targets).numpy()
        val_accuracy = float((np.argmax(_probs, axis=1) == _targets).mean())

        # Learning rate currently in effect for the first parameter group.
        lr = optimizer.param_groups[0]["lr"]

        lrs.append(lr)
        train_losses.append(train_loss.avg)
        val_losses.append(val_loss.avg)
        val_metrics.append(val_accuracy)

        print(f"Fold {fold + 1}/{NUM_FOLDS} - epoch {epoch + 1}/{NUM_EPOCHS}")
        print(f"Training loss: {train_loss.avg}")
        print(f"Validation loss: {val_loss.avg}")
        print(f"Validation accuracy: {val_metrics[-1]}")

  model = create_fn(

KeyboardInterrupt

