In [1]:
!pip3 install einops
#!pip install torchvision==0.11.3 -f https://download.pytorch.org/whl/torch_stable.html



# **Imports**

In [12]:
import torch
print('CUDA is available:', torch.cuda.is_available())
!export CUDA_LAUNCH_BLOCKING=1
import os
import numpy as np
import cv2 
from PIL import Image
import torchvision.datasets as datasets
from torchvision import transforms
from torch.cuda.amp import GradScaler, autocast
from torch.utils.data import Dataset, DataLoader, ConcatDataset, SubsetRandomSampler
from torch.optim import lr_scheduler
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt
%matplotlib inline
from pytorchCoatnet import CoAtNet
from losses import ArcMarginProduct
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('Device selected:', device)

CUDA is available: False
Device selected: cpu


# **Config**

In [3]:
# define configuration class

# **Reading data**

In [13]:
# Load CIFAR-100 and wrap it in the project's custom dataset class.
import ssl
from dataset import DatasetMF
from config import config
# NOTE(review): this swaps the default SSL context factory for the unverified
# one, so everything in this process that relies on
# ssl.create_default_context afterwards skips certificate verification.
# Presumably a workaround for certificate errors during the CIFAR download —
# confirm it is still required and consider scoping it to the download only.
ssl.create_default_context = ssl._create_unverified_context

# now init the datasets
# Both CIFAR-100 splits are fetched into the current directory and then
# concatenated into one dataset; the train/validation split is done later by
# the cross-validation loop.
train_dataset = datasets.CIFAR100('./', train=True, download=True) 
test_dataset = datasets.CIFAR100('./', train=False, download=True) 
# DatasetMF is project code (dataset.py); presumably it converts the samples
# to tensors — verify there.
dataset = DatasetMF(ConcatDataset([train_dataset, test_dataset]))

Files already downloaded and verified
Files already downloaded and verified


# ***Modeling***
Here we use CoAtNet as the image feature extractor, together with a classification head (a second model) that passes the features through an ArcFace loss, i.e. the logits coming out of the backbone are transformed by the ArcFace head, the transformed logits feed the cross-entropy loss, and we backpropagate with respect to the weights of both models.

In [14]:
# implement GeM pooling
class GeM(torch.nn.Module):
  """Generalized-mean (GeM) pooling over the last two dimensions of the input.

  The input is clamped from below at ``eps``, raised to the power ``p``,
  averaged over its full spatial extent and finally raised to ``1/p``.
  ``p`` is stored as a learnable one-element parameter.
  """

  def __init__(self, p=3, eps=1e-6):
    super().__init__()
    self.eps = eps
    # learnable exponent, initialised to the requested value
    self.p = torch.nn.Parameter(torch.ones(1)*p)

  def forward(self, x):
    """Pool ``x`` with the module's own exponent and epsilon."""
    return self.gem(x, p=self.p, eps=self.eps)

  def gem(self, x, p=3, eps=1e-6):
    """Apply GeM pooling to ``x`` with an explicit exponent ``p`` and floor ``eps``."""
    # one pooling window that spans the whole (H, W) plane
    window = (x.size(-2), x.size(-1))
    powered = x.clamp(min=eps).pow(p)
    averaged = torch.nn.functional.avg_pool2d(powered, window)
    return averaged.pow(1./p)


# define ArcFace head classifier
class Net(torch.nn.Module):
  """Backbone followed by GeM pooling, dropout, a dense layer and an ArcFace head.

  Args:
    model: backbone module; ``forward`` expects it to return a 2-D tensor
      ``(batch, features)`` whose feature count is divisible by 10.
    num_classes: number of output classes of the ArcFace head.
  """

  def __init__(self, model, num_classes):
    super(Net, self).__init__()
    self.backbone = model
    self.pool = GeM() # pooling layer
    self.drop = torch.nn.Dropout2d(0.2)
    # the pooled map has a trailing dimension of size 1, hence in_features=1
    self.dense = torch.nn.Linear(1, 512)

    # Some interesting theory(tl;dr):
    # here, in the loss function, parameter 's' denotes the radius of
    # a hypersphere on which the learned embeddings are distributed,
    # while the 'm' parameter denotes an additive angular margin penalty added
    # between the features and ground truth weights, e.g. X_i and W_yi
    self.final = ArcMarginProduct(in_features=512, out_features=num_classes, m=config.penalty, s=config.radius, device=device) # defaults are m=0.5, s=30

  def forward(self, x, y):
    """Return the ArcFace-transformed logits for inputs ``x`` and labels ``y``.

    The labels are required because the ArcFace head uses them when
    transforming the logits.
    """
    logits = self.backbone(x)

    # Reshape the backbone output into a single-channel (10, features/10) map
    # so that it can be pooled.  The batch size is read from the tensor itself
    # rather than from config.batch_size, so batches of any size (for example
    # a final partial batch) are handled correctly.
    batch_size = logits.shape[0]
    feature_map = logits.view(batch_size, -1, 10, logits.shape[1] // 10)

    pooled_features = self.pool(feature_map)
    dropped_features = self.drop(pooled_features)
    emb = self.dense(dropped_features) # get embeddings from dense layer

    # drop the two singleton dimensions and return the transformed logits
    return self.final(emb.squeeze(1).squeeze(1), y)


# Build the backbone and wrap it with the ArcFace classification head.
# The settings below are labelled as CoAtNet-3 by the author
# (NOTE(review): confirm the values against pytorchCoatnet).
num_blocks = [2, 2, 6, 14, 2]            # L: number of blocks per stage
channels = [192, 192, 384, 768, 1536]    # D: channel width per stage
block_types=['C', 'C', 'T', 'T']         # 'C' for MBConv, 'T' for Transformer
coatnet = CoAtNet(config.img_size, 3, num_blocks, channels, block_types=block_types) # predicting 1000 classes by default
# Net reshapes the backbone output and feeds it through its own head with
# config.num_classes outputs.
model = Net(coatnet, config.num_classes)

# ***Train and validation functions***

In [15]:
from tqdm import tqdm


# training function for an epoch
def train_epoch(model, train_dataloader):
  """Train ``model`` for one pass over ``train_dataloader``.

  Relies on the notebook-level ``optim``, ``loss_fn`` and ``device``.

  Args:
    model: module called as ``model(x, y)`` that returns class logits.
    train_dataloader: iterable yielding ``(x, y)`` batches.

  Returns:
    Tuple ``(train_loss, train_correct)``: the sum of the per-batch loss
    values and the number of correctly predicted samples.
  """
  model.train()
  train_loss, train_correct = 0.0, 0

  for batch in tqdm(train_dataloader):
    # zero gradients
    optim.zero_grad()

    # transfer batch to device
    x, y = batch
    x, y = x.to(device), y.to(device)

    # compute logits and loss, and perform backpropagation
    logits = model(x.float(), y) # extracting features and transform logits
    loss = loss_fn(logits, y)
    train_loss += loss.item()
    loss.backward()

    # clip the norm of the gradients to 1.0 to prevent "exploding gradients";
    # this has to happen after backward() (so there are gradients to clip)
    # and before the optimizer step (so the clipped values are applied)
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optim.step()

    # calculate accuracy
    preds = torch.argmax(logits, dim=1).flatten()
    correct_preds_n = (preds == y).cpu().sum().item()
    train_correct += correct_preds_n

  return train_loss, train_correct


# evaluation function for an epoch
def valid_epoch(model, train_dataloader):
  """Evaluate ``model`` for one pass over the given dataloader.

  No gradients are computed and no optimizer step is taken, so the model
  weights are left untouched by validation.  Relies on the notebook-level
  ``loss_fn`` and ``device``.

  Args:
    model: module called as ``model(x, y)`` that returns class logits.
    train_dataloader: iterable yielding ``(x, y)`` batches of the data to
      evaluate on (the parameter name is kept for backward compatibility).

  Returns:
    Tuple ``(val_loss, val_correct)``: the sum of the per-batch loss values
    and the number of correctly predicted samples.
  """
  model.eval()
  val_loss, val_correct = 0.0, 0

  # evaluation only: disable autograd instead of back-propagating and stepping
  with torch.no_grad():
    for batch in tqdm(train_dataloader):
      x, y = batch
      x, y = x.to(device), y.to(device)

      logits = model(x.float(), y)
      loss = loss_fn(logits, y)
      val_loss += loss.item()

      preds = torch.argmax(logits, dim=1).flatten()
      correct_preds_n = (preds == y).cpu().sum().item()
      val_correct += correct_preds_n

  return val_loss, val_correct

# **CV**



In [23]:
from tqdm import tqdm


# define loss function and optimizer
loss_fn = torch.nn.CrossEntropyLoss()
optim = torch.optim.AdamW(
    model.parameters(),
    lr=config.lr,
    weight_decay=config.weight_decay
)

# cv on 2 folds
k=2
splits=KFold(n_splits=k, shuffle=True, random_state=2022)
foldperf={}
model.to(device)

# quick sanity check of the dataset: type, size and the label of the first sample
print(type(dataset))
print(len(dataset))
print(dataset[0][1])

# NOTE(review): the model and the optimizer are created once, outside the fold
# loop, so every fold continues from the weights left by the previous one and
# the per-fold metrics are not independent — confirm this is intended.
for fold, (train_idx,val_idx) in enumerate(splits.split(dataset)):
  print('Fold {}'.format(fold + 1))
  train_sampler = SubsetRandomSampler(train_idx)
  test_sampler = SubsetRandomSampler(val_idx)
  train_loader = DataLoader(dataset, batch_size=config.batch_size, sampler=train_sampler, num_workers=2, drop_last=True, pin_memory=True)
  test_loader = DataLoader(dataset, batch_size=config.batch_size, sampler=test_sampler, num_workers=2, drop_last=True, pin_memory=True)
  history = {'train_loss': [], 'test_loss': [],'train_acc':[],'test_acc':[]}

  # Normalisers for the epoch statistics.  The epoch functions return a sum of
  # per-batch losses, and loss_fn (mean reduction by default) already averages
  # inside a batch, so the loss is averaged over the number of batches.
  # drop_last=True discards the incomplete final batch, so accuracy is
  # measured against the samples that were actually processed.
  # max(..., 1) guards against a fold smaller than one batch.
  n_train_batches = max(len(train_loader), 1)
  n_test_batches = max(len(test_loader), 1)
  n_train_samples = n_train_batches * train_loader.batch_size
  n_test_samples = n_test_batches * test_loader.batch_size

  for epoch in range(config.epochs):
    torch.cuda.empty_cache()
    print('---train:')
    train_loss, train_correct = train_epoch(model, train_loader)
    print('---eval:')
    test_loss, test_correct = valid_epoch(model, test_loader)
    train_loss = train_loss / n_train_batches
    train_acc = train_correct / n_train_samples * 100
    test_loss = test_loss / n_test_batches
    test_acc = test_correct / n_test_samples * 100
    print('---status:')
    print("\tEpoch:{}/{} \n\tAverage Training Loss:{:.4f}, Average Test Loss:{:.4f}; \n\tAverage Training Acc {:.2f}%, Average Test Acc {:.2f}%\n".format(
        epoch + 1,
        config.epochs,
        train_loss,
        test_loss,
        train_acc,
        test_acc))
    history['train_loss'].append(train_loss)
    history['test_loss'].append(test_loss)
    history['train_acc'].append(train_acc)
    history['test_acc'].append(test_acc)

  foldperf['fold{}'.format(fold+1)] = history

# persist the fine-tuned backbone (the whole module object is pickled)
torch.save(coatnet,'coatnet_finetuned_cifar100.pt')

<class 'dataset.DatasetMF'>
60000
tensor(19)
Fold 1
---train:


  0%|          | 0/3750 [00:00<?, ?it/s]

<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32, 32, 3)
<class 'numpy.ndarray'> (32,

  0%|          | 0/3750 [00:16<?, ?it/s]


KeyboardInterrupt: 