In [1]:
from google.colab import drive


drive.mount("/gdrive")

Mounted at /gdrive


In [2]:
from pathlib import Path


p = Path("/gdrive/My Drive/footvid")
p.is_dir()

True

In [3]:
import torch
import torch.nn as nn


print(
    f"Is cuda available? {torch.cuda.is_available()}."
)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
N_POS_TRAIN = 1174
N_NEG_TRAIN = 893

Is cuda available? True.


In [4]:
from pathlib import Path


REPOSITORY_PATH = Path("/gdrive/My Drive/footvid/")

In [5]:
from typing import Dict, List, Tuple

import numpy as np
from torchvision import transforms


REPOSITORY_NAME: str = "footvid"

MODELING_SIZE: Tuple[int, int] = (224, 398)

RGB_CHANNEL_STATS: Dict[str, List[float]] = {
    "mean": [0.485, 0.456, 0.406],
    "std": [0.229, 0.224, 0.225],
}


TRAIN_TRANSFORMS = transforms.Compose(
    [
        transforms.Resize(
            size=tuple(
                (np.asarray(MODELING_SIZE) * (1.0 / (0.85 * 0.8))).astype(int)
            )
        ),
        transforms.CenterCrop(
            size=tuple((np.asarray(MODELING_SIZE) * (1.0 / 0.85)).astype(int))
        ),
        transforms.RandomCrop(size=MODELING_SIZE),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ToTensor(),
        transforms.Normalize(**RGB_CHANNEL_STATS),
    ]
)

TEST_TRANSFORMS = transforms.Compose(
    [
        transforms.Resize(
            size=tuple((np.asarray(MODELING_SIZE) * (1.0 / 0.8)).astype(int))
        ),
        transforms.CenterCrop(size=MODELING_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(**RGB_CHANNEL_STATS),
    ]
)


In [6]:
import copy
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, Mapping, NamedTuple, Optional, Union

import numpy as np
import torch
import torch.nn
import torch.optim as optim
from sklearn.metrics import (
    accuracy_score,
    average_precision_score,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm


MetricNames = Mapping[Callable[[np.ndarray, np.ndarray], float], str]


METRIC_NAMES: MetricNames = {
    accuracy_score: "acc",
    average_precision_score: "ap",
    f1_score: "f1",
    precision_score: "precision",
    recall_score: "recall",
    roc_auc_score: "roc-auc",
}


class EpochResults(NamedTuple):
    outputs: np.ndarray
    targets: np.ndarray
    average_loss: float


class TrainTestDataloaders(NamedTuple):
    train: torch.utils.data.DataLoader
    test: torch.utils.data.DataLoader


class TrainTestEpochResults(NamedTuple):
    train: EpochResults
    test: EpochResults


def freeze_layers(
    model: nn.Module, last_layer: str, inplace: bool = True
) -> Optional[nn.Module]:
    if not isinstance(last_layer, str):
        raise TypeError("Incorrect `last_layer` type. String type expected.")

    if not inplace:
        model = copy.deepcopy(model)

    last_layer = last_layer.split(".")

    def _freeze(model: nn.Module, last_layer: List[str]) -> None:
        if not last_layer:
            return
        for name, child in model.named_children():
            if name == last_layer[0]:
                _freeze(child, last_layer[1:])
                break
            for param in child.parameters():
                param.requires_grad = False
    
    _freeze(model, last_layer)

    return model if not inplace else None


def train(
    model: nn.Module,
    dataloader: torch.utils.data.DataLoader,
    device: torch.device,
    optimizer: optim.Optimizer,
    objective: nn.modules.loss._Loss,
) -> EpochResults:
    if objective.reduction != "mean":
        return ValueError(
            "`objective` parameter accepts only losses with `reduction='mean'`"
        )

    model = model.to(device)
    model.train()

    outputs = []
    targets = []
    average_loss = 0.0
    for input_batch, target_batch in tqdm(dataloader):
        target_batch = target_batch.view(-1, 1).type_as(input_batch)
        targets.append(target_batch.cpu().detach().numpy())

        input_batch = input_batch.to(device)
        target_batch = target_batch.to(device)

        optimizer.zero_grad()

        output_batch = model(input_batch)
        outputs.append(output_batch.cpu().detach().numpy())

        loss = objective(output_batch, target_batch)
        average_loss += loss.item() * input_batch.shape[0]
        loss.backward()

        optimizer.step()

    average_loss /= len(dataloader)

    return EpochResults(
        outputs=np.concatenate(outputs, axis=0),
        targets=np.concatenate(targets, axis=0),
        average_loss=average_loss,
    )


def test(
    model: nn.Module,
    dataloader: torch.utils.data.DataLoader,
    device: torch.device,
    objective: nn.modules.loss._Loss,
) -> EpochResults:
    if objective.reduction != "mean":
        return ValueError(
            "`objective` parameter accepts only losses with `reduction='mean'`"
        )

    model = model.to(device)
    model.eval()

    outputs = []
    targets = []
    average_loss = 0.0
    with torch.no_grad():
        for input_batch, target_batch in tqdm(dataloader):
            target_batch = target_batch.view(-1, 1).type_as(input_batch)
            targets.append(target_batch.cpu().detach().numpy())

            input_batch = input_batch.to(device)
            target_batch = target_batch.to(device)

            output_batch = model(input_batch)
            outputs.append(output_batch.cpu().detach().numpy())

            average_loss += (
                objective(output_batch, target_batch).item()
                * input_batch.shape[0]
            )

    average_loss /= len(dataloader)

    return EpochResults(
        outputs=np.concatenate(outputs, axis=0),
        targets=np.concatenate(targets, axis=0),
        average_loss=average_loss,
    )


def run_experiment(
    model: nn.Module,
    dataloaders: TrainTestDataloaders,
    device: torch.device,
    optimizer: optim.Optimizer,
    objective: nn.modules.loss._Loss,
    epochs: int = 10,
    threshold: float = 0.5,
    scheduler: Optional[optim.lr_scheduler._LRScheduler] = None,
    artifacts_dir: Optional[Union[str, Path]] = None,
    writer: Optional[SummaryWriter] = None,
) -> None:
    for epoch in tqdm(range(epochs)):
        train_epoch_results = train(
            model=model,
            dataloader=dataloaders.train,
            device=device,
            optimizer=optimizer,
            objective=objective,
        )
        test_epoch_results = test(
            model=model,
            dataloader=dataloaders.test,
            device=device,
            objective=objective,
        )
        if scheduler is not None:
            scheduler.step()

        if artifacts_dir is not None:
            checkpoint = {
                "epoch": epoch,
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "scheduler": scheduler.state_dict(),
            }
            now = datetime.now()
            torch.save(
                checkpoint,
                Path(artifacts_dir).joinpath(
                    "checkpoint.{}.pth".format(
                        now.strftime("%d-%m-%Y.%H_%M_%S")
                    )
                ),
            )

        if writer is not None:
            train_test_epoch_results = TrainTestEpochResults(
                train=train_epoch_results, test=test_epoch_results
            )
            _write_summary(
                writer=writer,
                epoch=epoch,
                train_test_epoch_results=train_test_epoch_results,
                threshold=threshold,
                scheduler=scheduler,
            )
    if writer is not None:
        writer.close()


def _write_summary(
    writer: torch.utils.tensorboard.SummaryWriter,
    epoch: int,
    train_test_epoch_results: TrainTestEpochResults,
    threshold: float,
    scheduler: Optional[optim.lr_scheduler._LRScheduler] = None,
    metric_names: Optional[MetricNames] = None,
) -> None:
    if metric_names is None:
        metric_names = METRIC_NAMES

    names = ["train", "test"]
    outputs = [
        train_test_epoch_results.train.outputs.flatten(),
        train_test_epoch_results.test.outputs.flatten(),
    ]
    targets = [
        train_test_epoch_results.train.targets.flatten(),
        train_test_epoch_results.test.targets.flatten(),
    ]
    predictions = [
        np.where(
            train_test_epoch_results.train.outputs > threshold, 1.0, 0.0
        ).flatten(),
        np.where(
            train_test_epoch_results.test.outputs > threshold, 1.0, 0.0
        ).flatten(),
    ]

    writer.add_scalar(
        "avg-loss-train", train_test_epoch_results.train.average_loss, epoch
    )
    writer.add_scalar(
        "avg-loss-test", train_test_epoch_results.test.average_loss, epoch
    )

    for score in [average_precision_score, roc_auc_score]:
        for name, output, target in zip(names, outputs, targets):
            writer.add_scalar(
                METRIC_NAMES[score] + "-" + name, score(target, output), epoch
            )

    for score in [accuracy_score, f1_score, precision_score, recall_score]:
        for name, prediction, target in zip(names, predictions, targets):
            writer.add_scalar(
                METRIC_NAMES[score] + "-" + name,
                score(target, prediction),
                epoch,
            )

    if scheduler is not None:
        writer.add_scalar("learning-rate", scheduler.get_last_lr()[0], epoch)


In [7]:
from torchvision import datasets


train_images = datasets.ImageFolder(
    root=REPOSITORY_PATH.joinpath("data", "processed", "train"),
    transform=TRAIN_TRANSFORMS,
)

valid_images = datasets.ImageFolder(
    root=REPOSITORY_PATH.joinpath("data", "processed", "valid"),
    transform=TEST_TRANSFORMS,
)

In [8]:
print("Train: ", train_images.class_to_idx)
print("Valid: ", valid_images.class_to_idx)

Train:  {'neg': 0, 'pos': 1}
Valid:  {'neg': 0, 'pos': 1}


In [9]:
from torch.utils.data import DataLoader


train_dataloader = DataLoader(
    dataset=train_images,  batch_size=64, shuffle=True, num_workers=2
)
valid_dataloader = DataLoader(
    dataset=valid_images,  batch_size=64, shuffle=False, num_workers=2
)

In [10]:
from collections import OrderedDict

import torch
import torch.nn as nn
from torchvision import models


class ResNet(nn.Module):
    def __init__(self, output_size: int) -> None:
        super().__init__()
        resnet50 = models.resnet50(pretrained=True, progress=False)
        conv_layers = []
        for named_child in resnet50.named_children():
            conv_layers.append(named_child)
            if named_child[0] == "layer4":
                break

        self.resnet50_conv = nn.Sequential(OrderedDict(conv_layers))
        self.avgpool = resnet50.avgpool
        self.fc = nn.Linear(
            in_features=resnet50.fc.in_features,
            out_features=output_size,
            bias=True,
        )

        self._gradients = None

    @property
    def gradients(self) -> torch.Tensor:
        return self._gradients

    def set_gradients(self, gradients: torch.Tensor) -> None:
        self._gradients = gradients

    def get_activations(self, x: torch.Tensor) -> torch.Tensor:
        return self.resnet50_conv(x)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.resnet50_conv(x)
        if x.requires_grad:
            x.register_hook(self.set_gradients)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

In [11]:
model = ResNet(output_size=1)

model_fcl = freeze_layers(model, last_layer="fc", inplace=False)
model_fcl = model_fcl.to(DEVICE)

model_cnn = freeze_layers(model, last_layer="resnet50_conv.layer3", inplace=False)
model_cnn = model_cnn.to(DEVICE)

Downloading: "https://download.pytorch.org/models/resnet50-19c8e357.pth" to /root/.cache/torch/hub/checkpoints/resnet50-19c8e357.pth


## FCL

In [None]:
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

from footvid.arena import run_experiment, TrainTestDataloaders


artifacts_dir = REPOSITORY_PATH.joinpath("models", "fcl-resnet-fine-tuning")
artifacts_dir.mkdir(exist_ok=True)
logs_dir = REPOSITORY_PATH.joinpath("logs", "fcl-resnet-fine-tuning")
logs_dir.mkdir(exist_ok=True)
optimizer = optim.SGD(model_fcl.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[5, 10], gamma=0.1)
objective = nn.BCEWithLogitsLoss()
train_test_dataloaders = TrainTestDataloaders(train=train_dataloader, test=valid_dataloader)
writer = SummaryWriter(log_dir=logs_dir)

run_experiment(
    model=model_fcl,
    dataloaders=train_test_dataloaders,
    device=DEVICE,
    optimizer=optimizer,
    objective=objective,
    epochs=20,
    threshold=N_POS_TRAIN / (N_POS_TRAIN + N_NEG_TRAIN),
    scheduler=scheduler,
    artifacts_dir=artifacts_dir,
    writer=writer,
)

## CNN layers

In [None]:
artifacts_dir = REPOSITORY_PATH.joinpath("models", "cnn-top2-layers-fine-tuning")
artifacts_dir.mkdir(exist_ok=True)
logs_dir = REPOSITORY_PATH.joinpath("logs", "cnn-top2-layers-fine-tuning")
logs_dir.mkdir(exist_ok=True)
optimizer = optim.SGD(model_cnn.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[6, 11], gamma=0.1)
objective = nn.BCEWithLogitsLoss()
train_test_dataloaders = TrainTestDataloaders(train=train_dataloader, test=valid_dataloader)
writer = SummaryWriter(log_dir=logs_dir)

run_experiment(
    model=model_cnn,
    dataloaders=train_test_dataloaders,
    device=DEVICE,
    optimizer=optimizer,
    objective=objective,
    epochs=20,
    threshold=N_POS_TRAIN / (N_POS_TRAIN + N_NEG_TRAIN),
    scheduler=scheduler,
    artifacts_dir=artifacts_dir,
    writer=writer,
)


 39%|███▉      | 13/33 [00:54<01:25,  4.28s/it][A
 42%|████▏     | 14/33 [00:57<01:14,  3.94s/it][A
 45%|████▌     | 15/33 [01:03<01:18,  4.34s/it][A
 48%|████▊     | 16/33 [01:05<01:00,  3.56s/it][A

## Full

In [None]:
artifacts_dir = REPOSITORY_PATH.joinpath("models", "full-fine-tuning")
artifacts_dir.mkdir(exist_ok=True)
logs_dir = REPOSITORY_PATH.joinpath("logs", "full-fine-tuning")
logs_dir.mkdir(exist_ok=True)
optimizer = optim.SGD(model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[6, 11], gamma=0.1)
objective = nn.BCEWithLogitsLoss()
train_test_dataloaders = TrainTestDataloaders(train=train_dataloader, test=valid_dataloader)
writer = SummaryWriter(log_dir=logs_dir)

run_experiment(
    model=model,
    dataloaders=train_test_dataloaders,
    device=DEVICE,
    optimizer=optimizer,
    objective=objective,
    epochs=20,
    threshold=N_POS_TRAIN / (N_POS_TRAIN + N_NEG_TRAIN),
    scheduler=scheduler,
    artifacts_dir=artifacts_dir,
    writer=writer,
)

  0%|          | 0/20 [00:00<?, ?it/s]
  0%|          | 0/33 [00:00<?, ?it/s][A
  3%|▎         | 1/33 [01:01<32:55, 61.72s/it][A
  6%|▌         | 2/33 [01:02<22:29, 43.53s/it][A
  9%|▉         | 3/33 [02:01<24:06, 48.22s/it][A
 12%|█▏        | 4/33 [02:03<16:28, 34.09s/it][A
 15%|█▌        | 5/33 [03:02<19:26, 41.66s/it][A
 18%|█▊        | 6/33 [03:03<13:16, 29.50s/it][A
 21%|██        | 7/33 [04:00<16:21, 37.76s/it][A
 24%|██▍       | 8/33 [04:01<11:09, 26.78s/it][A
 27%|██▋       | 9/33 [04:58<14:18, 35.78s/it][A
 30%|███       | 10/33 [04:59<09:44, 25.39s/it][A
 33%|███▎      | 11/33 [05:58<12:56, 35.28s/it][A
 36%|███▋      | 12/33 [05:59<08:46, 25.05s/it][A
 39%|███▉      | 13/33 [06:56<11:36, 34.81s/it][A
 42%|████▏     | 14/33 [06:57<07:49, 24.72s/it][A
 45%|████▌     | 15/33 [07:57<10:35, 35.31s/it][A
 48%|████▊     | 16/33 [07:59<07:06, 25.07s/it][A
 52%|█████▏    | 17/33 [08:56<09:17, 34.82s/it][A
 55%|█████▍    | 18/33 [08:57<06:10, 24.73s/it][A
 58%|█████