In [None]:
!pip install signate

from googleapiclient.discovery import build
import io, os
from googleapiclient.http import MediaIoBaseDownload
from google.colab import auth


auth.authenticate_user()

drive_service = build('drive', 'v3')
results = drive_service.files().list(
        q="name = 'signate.json'", fields="files(id)").execute()
signate_api_key = results.get('files', [])

filename = "/root/.signate/signate.json"
os.makedirs(os.path.dirname(filename), exist_ok=True)

request = drive_service.files().get_media(fileId=signate_api_key[0]['id'])
fh = io.FileIO(filename, 'wb')
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
    status, done = downloader.next_chunk()
    print("Download %d%%." % int(status.progress() * 100))
os.chmod(filename, 600)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting signate
  Downloading signate-0.9.9-py3-none-any.whl (37 kB)
Collecting six>=1.16
  Downloading six-1.16.0-py2.py3-none-any.whl (11 kB)
Collecting urllib3>=1.26.7
  Downloading urllib3-1.26.14-py2.py3-none-any.whl (140 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m140.6/140.6 KB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9674 sha256=9010e73185df6366ed3d9714d37ed87473e2cb751928c2e17c86d3af6581ad13
  Stored in directory: /root/.cache/pip/wheels/bd/a8/c3/3cf2c14a1837a4e04bd98631724e81f33f462d86a1d895fae0
Successfully built wget
Installing collected packages: wget, urllib3, six, signate
  Atte

In [None]:
!signate download --competition-id=406

sample_submission.csv

train.csv

test_data.zip

train_data.zip

[32m
Download completed.[0m


In [None]:
!unzip -q train_data.zip

In [None]:
!pip install -U -q catalyst timm &> /dev/null
!pip uninstall accelerate 
!pip install accelerate==0.15.0 &> /dev/null

In [None]:
import pandas as pd

train = pd.read_csv('train.csv')

# image name list
image_name_list = train['id'].values

# label list
label_list = train['target'].values

In [None]:
from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(image_name_list, label_list, test_size=0.25, stratify=label_list, random_state=42)

Transforms

In [None]:
from torchvision import transforms


class Transforms():

    def __init__(self):
        
        self.data_transform = {
            'train': transforms.Compose([
                transforms.Resize(224),
                transforms.RandomHorizontalFlip(),
                transforms.RandomVerticalFlip(),
                transforms.RandomRotation(degrees=20),
                transforms.RandomAffine(
                    degrees=[-90, 90],
                    translate=[0, 0],
                    scale=[1.0, 1.0],
                    shear=[-0.2, 0.2],
                    ),
                transforms.ColorJitter(
                    brightness=0.1,
                    contrast=0.1,
                    saturation=0.1,
                    hue=0
                    ),
                transforms.ToTensor(),
                transforms.Normalize(
                    [0.485, 0.456, 0.406],
                    [0.229, 0.224, 0.225]
                    ),
                ]),
            'val': transforms.Compose([
                transforms.Resize(224),
                transforms.ToTensor(),
                transforms.Normalize(
                    [0.485, 0.456, 0.406],
                    [0.229, 0.224, 0.225]
                    ),
                ]),
        }
    
    def __call__(self, phase, img):
        return self.data_transform[phase](img)

Dataset

In [None]:
import os
from PIL import Image
from torch.utils.data import Dataset


class ImageDataset(Dataset):
    def __init__(self, image_name_list, label_list, img_dir, transform=None, phase=None):
        self.image_name_list = image_name_list
        self.label_list = label_list
        self.img_dir = img_dir
        self.phase = phase
        self.transform = transform

    def __len__(self):
        return len(self.image_name_list)

    def __getitem__(self, index):
        image_path = os.path.join(self.img_dir, self.image_name_list[index])
        image = Image.open(image_path)
        image = self.transform(self.phase, image)
        label = self.label_list[index]
        
        return image, label

Dataloaders

In [None]:
import torch
from torch.utils.data import DataLoader


def get_loaders(x_train, y_train, x_val, y_val, img_dir) -> tuple:
    train_dataset = ImageDataset(
        image_name_list=x_train,
        label_list=y_train,
        img_dir=img_dir,
        transform=Transforms(),
        phase='train',
    )

    train_dataloader = DataLoader(
        train_dataset,
        batch_size=4,
        shuffle=True,
        num_workers=0,
        pin_memory=True,
    )

    val_dataset = ImageDataset(
        image_name_list=x_val,
        label_list=y_val,
        img_dir=img_dir,
        transform=Transforms(),
        phase='val',
    )

    val_dataloader = DataLoader(
        val_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
    )

    return {"train": train_dataloader, "valid": val_dataloader}

In [None]:
loaders = get_loaders(
    x_train=x_train, 
    y_train=y_train,
    x_val=x_val, 
    y_val=y_val, 
    img_dir='/content/train_data'
)

layer

In [None]:
from catalyst.contrib.layers import (
    AdaCos,
    AMSoftmax,
    ArcFace,
    ArcMarginProduct,
    CosFace,
    CurricularFace,
    SubCenterArcFace,
)


def get_layers(
    layer_name: str,
    embedding_size: int,
    num_classes: int,
    ):
    if layer_name == 'AdaCos':
        return AdaCos(
            in_features=embedding_size, 
            out_features=num_classes
        )
    elif layer_name == 'AMSoftmax':
        return AMSoftmax(
            in_features=embedding_size, 
            out_features=num_classes
        )
    elif layer_name == 'ArcFace':
        return ArcFace(
            in_features=embedding_size, 
            out_features=num_classes
        )
    elif layer_name == 'ArcMarginProduct':
        return ArcMarginProduct(
            in_features=embedding_size, 
            out_features=num_classes
        )
    elif layer_name == 'CosFace':
        return CosFace(
            in_features=embedding_size, 
            out_features=num_classes
        )
    elif layer_name == 'CurricularFace':
        return CurricularFace(
            in_features=embedding_size, 
            out_features=num_classes
        )
    elif layer_name == 'SubCenterArcFace':
        return SubCenterArcFace(
            in_features=embedding_size, 
            out_features=num_classes
        )
    else:
        raise ValueError(f'Unknown optimizer: {layer_name}')

In [None]:
import timm
import torch
import torch.nn as nn


class EncoderWithHead(nn.Module):
    def __init__(
        self,
        model_name: str, 
        layer_name: str, 
        embedding_size: int,
        num_classes: int,
        pretrained: bool
        ):
        super().__init__()
        self.model_name = model_name
        self.layer_name = layer_name
        self.embedding_size = embedding_size
        self.num_classes = num_classes
        self.pretrained = pretrained

        self.encoder = timm.create_model(
            self.model_name,
            pretrained=self.pretrained,
            num_classes=self.embedding_size,
        )

        self.head = get_layers(
            layer_name = self.layer_name,
            embedding_size=self.embedding_size,
            num_classes=self.num_classes,
        )

    def forward(self, images, targets=None) -> torch.tensor:
        features = self.encoder(images)
        if targets is None:
            return features
        outputs = self.head(features, targets)
        return outputs

In [None]:
model = EncoderWithHead(
    model_name='convnext_base',
    layer_name='ArcFace',
    embedding_size=128,
    num_classes=2,
    pretrained=True,
)
model

Downloading: "https://dl.fbaipublicfiles.com/convnext/convnext_base_1k_224_ema.pth" to /root/.cache/torch/hub/checkpoints/convnext_base_1k_224_ema.pth


EncoderWithHead(
  (encoder): ConvNeXt(
    (stem): Sequential(
      (0): Conv2d(3, 128, kernel_size=(4, 4), stride=(4, 4))
      (1): LayerNorm2d((128,), eps=1e-06, elementwise_affine=True)
    )
    (stages): Sequential(
      (0): ConvNeXtStage(
        (downsample): Identity()
        (blocks): Sequential(
          (0): ConvNeXtBlock(
            (conv_dw): Conv2d(128, 128, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3), groups=128)
            (norm): LayerNorm((128,), eps=1e-06, elementwise_affine=True)
            (mlp): Mlp(
              (fc1): Linear(in_features=128, out_features=512, bias=True)
              (act): GELU()
              (drop1): Dropout(p=0.0, inplace=False)
              (fc2): Linear(in_features=512, out_features=128, bias=True)
              (drop2): Dropout(p=0.0, inplace=False)
            )
            (drop_path): Identity()
          )
          (1): ConvNeXtBlock(
            (conv_dw): Conv2d(128, 128, kernel_size=(7, 7), stride=(1, 1), padding

loss

In [None]:
loss_fn = nn.CrossEntropyLoss()

optimizer

In [None]:
import torch
import torch.optim as optim


optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

Runner

In [None]:
import torch.nn.functional as F
from catalyst import dl, metrics


class CustomRunner(dl.Runner):
    def predict_batch(self, batch):
        # test step
        features, _ = batch
        return self.model.encoder(features.to(self.engine.device))


    def on_loader_start(self, runner):
        super().on_loader_start(runner)
        self.meters = {
            key: metrics.AdditiveMetric(compute_on_call=False)
            for key in ["loss", "accuracy", "recall", "precision", "f1_score", "fbeta_score"]
        }


    def handle_batch(self, batch):
        # model train/valid step
        # unpack the batch
        features, targets = batch
        # run model forward pass
        logits = self.model(features, targets)
        # compute the loss
        loss = self.criterion(logits, targets)
        # compute other metrics of interest
        accuracy = metrics.accuracy(logits, targets)
        recall = metrics.recall(logits, targets)
        precision = metrics.precision(logits, targets)
        f1_score = metrics.f1_score(logits, targets)
        fbeta_score = metrics.fbeta_score(logits, targets, beta=0.5)
        # log metrics
        self.batch_metrics.update(
            {"loss": loss, "accuracy": accuracy[0], "recall": recall.mean(), "precision": precision.mean(), "f1_score": f1_score.mean(), "fbeta_score": fbeta_score.mean()}
        )
        for key in ["loss", "accuracy", "recall", "precision", "f1_score", "fbeta_score"]:
            self.meters[key].update(
                self.batch_metrics[key].item(), self.batch_size
            )
        # run model backward pass
        if self.is_train_loader:
            self.engine.backward(loss)
            self.optimizer.step()
            self.optimizer.zero_grad()


    def on_loader_end(self, runner):
        for key in ["loss", "accuracy", "recall", "precision", "f1_score", "fbeta_score"]:
            self.loader_metrics[key] = self.meters[key].compute()[0]
        super().on_loader_end(runner)
    

    def test_step(self, loader):
        
        return 

In [None]:
runner = CustomRunner()

In [None]:
runner.train(
    model=model,
    criterion=loss_fn,
    optimizer=optimizer,
    loaders=loaders,
    seed=0,
    logdir="./logs",
    num_epochs=50,
    callbacks=[
        dl.EarlyStoppingCallback(patience=10, loader_key="valid", metric_key="loss", minimize=True),
        dl.CheckpointCallback("./logs", loader_key="valid", metric_key="loss", minimize=True, topk=3),
    ],
    verbose=True,
    load_best_on_end=True,
)

TypeError: ignored

test step

In [None]:
for embeddings in runner.predict_loader(loader=loaders["valid"]):
    print(embeddings.shape)

Inference

In [None]:
class InferenceImageDataset(Dataset):
    def __init__(self, image_name_list, img_dir, transform=None, phase=None):
        self.image_name_list = image_name_list
        self.img_dir = img_dir
        self.phase = phase
        self.transform = transform

    def __len__(self):
        return len(self.image_name_list)

    def __getitem__(self, index):
        image_path = os.path.join(self.img_dir, self.image_name_list[index])
        image = Image.open(image_path)
        image = self.transform(self.phase, image)
        
        return image

In [None]:
def get_test_loader(x_test, img_dir):
    test_dataset = InferenceImageDataset(
        image_name_list=x_test,
        img_dir = img_dir,
        transform=Transforms(),
        phase='val',
    )

    test_dataloader = DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
    )

    return test_dataloader