## Env

In [1]:
# Mount Google Drive into the Colab runtime; the dataset archive and the
# project sources are read from paths under /content/drive/ later on.
from google.colab import drive
drive.mount("/content/drive/")

Mounted at /content/drive/


In [2]:
!unzip -qq "/content/drive/MyDrive/MAGISTERKA/datasets/fog-combined.zip" -d "/content/datasets/"

In [3]:
!pip install lightning

Collecting lightning
  Downloading lightning-2.5.1.post0-py3-none-any.whl.metadata (39 kB)
Collecting lightning-utilities<2.0,>=0.10.0 (from lightning)
  Downloading lightning_utilities-0.14.3-py3-none-any.whl.metadata (5.6 kB)
Collecting torchmetrics<3.0,>=0.7.0 (from lightning)
  Downloading torchmetrics-1.7.2-py3-none-any.whl.metadata (21 kB)
Collecting pytorch-lightning (from lightning)
  Downloading pytorch_lightning-2.5.1.post0-py3-none-any.whl.metadata (20 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch<4.0,>=2.1.0->lightning)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch<4.0,>=2.1.0->lightning)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch<4.0,>=2.1.0->lightning)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata

## Setup

In [4]:
from pathlib import Path
from time import time

import torch
from torchvision.models import get_weight
from torchvision.transforms import v2

import numpy as np
import pandas as pd
from sklearn.svm import SVC, LinearSVC
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.base import ClassifierMixin
from typing import Any, TypeVar
from collections import namedtuple
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix

In [5]:
_T = TypeVar('_T', bound=ClassifierMixin)

# Result record returned by `evaluate_model`. Created once at module level so
# every call returns instances of the same type (building the namedtuple class
# inside the function produced a brand-new class per call).
Evaluation = namedtuple('Evaluation', ['accuracy', 'f1', 'precision', 'recall', 'confusion'])


def evaluate_model(model: _T, X: Any, y: Any) -> Evaluation:
    """Score a fitted classifier on ``(X, y)``.

    Args:
        model: Object exposing ``predict(X)``.
        X: Features handed to ``model.predict``.
        y: Ground-truth labels aligned with ``X``.

    Returns:
        An ``Evaluation`` named tuple holding accuracy, macro-averaged F1,
        precision and recall, and the confusion matrix.
    """
    y_pred = model.predict(X)
    return Evaluation(
        accuracy=accuracy_score(y, y_pred),
        f1=f1_score(y, y_pred, average='macro'),
        precision=precision_score(y, y_pred, average='macro'),
        recall=recall_score(y, y_pred, average='macro'),
        confusion=confusion_matrix(y, y_pred),
    )

def train_model(
        model_cls: type[_T],
        model_kwargs: dict[str, Any],
        train_df: pd.DataFrame,
        val_df: pd.DataFrame
    ) -> _T:
    """Fit a classifier on pre-extracted features and print train/validation metrics.

    Args:
        model_cls: Classifier *class* (not an instance); it is instantiated as
            ``model_cls(**model_kwargs)``.
        model_kwargs: Keyword arguments for the classifier constructor.
        train_df: Frame with a ``'features'`` column (one feature vector per
            row) and a ``'class'`` label column, used for fitting.
        val_df: Frame with the same two columns, used only for evaluation.

    Returns:
        The fitted model instance.
    """
    model = model_cls(**model_kwargs)

    # Stack the per-row feature vectors into one matrix once and reuse it for
    # both fitting and the train-set evaluation.
    X_train = np.stack(train_df['features'].values)
    y_train = train_df['class']
    model.fit(X_train, y_train)

    train_metrics = evaluate_model(model, X_train, y_train)
    val_metrics = evaluate_model(model, np.stack(val_df['features'].values), val_df['class'])

    print(f"Model - {model_cls.__name__}")
    for split_name, metrics in (("Train", train_metrics), ("Validation", val_metrics)):
        print(f"\t{split_name} metrics:")
        print(f"\t\tAccuracy: {metrics.accuracy:.4f}")
        print(f"\t\tF1: {metrics.f1:.4f}")
        print(f"\t\tPrecision: {metrics.precision:.4f}")
        print(f"\t\tRecall: {metrics.recall:.4f}")
    return model

In [6]:
# Shared preprocessing: resize to 256x256 with bilinear interpolation,
# center-crop to 224x224, convert to an image tensor and cast to float32
# (scale=True asks ToDtype to rescale the values while converting).
base_transform = v2.Compose([
    v2.Resize((256, 256), v2.InterpolationMode.BILINEAR),
    v2.CenterCrop((224, 224)),
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
])
# Named preprocessing variants; looked up by key (e.g. the `transform`
# argument of run_lstm_model).
transforms = {
    "grayscale": v2.Compose([
        base_transform,
        # Grayscale conversion that still emits 3 output channels.
        v2.Grayscale(num_output_channels=3)
    ]),
    "color": v2.Compose([
        base_transform,
    ]),
}

In [7]:
# Root of the Colab runtime filesystem; the dataset directories are resolved
# relative to it.
BASE_PATH = Path("/content")
# Project folder on the mounted Google Drive.
DRIVE_PATH = Path("/content/drive/MyDrive/MAGISTERKA")

In [8]:
# Put the project's `src` directory (on Drive) at the front of the module
# search path so the local `cnn_model` module can be imported.
import sys
sys.path.insert(0, str(DRIVE_PATH / 'src'))
from cnn_model import CNNClassifier, get_dataloader

In [9]:
# Dataset name -> root directory under BASE_PATH. run_lstm_model appends
# 'train' / 'val' (combined only) and 'test' (every dataset) to these roots.
DATASET_PATHS = {
    'fog-detection': BASE_PATH / 'datasets/fog-detection-dataset-prepared',
    'fog-or-smog': BASE_PATH / 'datasets/fog-or-smog-detection-dataset-prepared',
    'foggy-cityscapes': BASE_PATH / 'datasets/foggy-cityscapes-image-dataset-prepared',
    'combined': BASE_PATH / 'datasets/fog-combined',
}

In [10]:
# Dataset name -> keyword arguments for v2.Normalize ('mean' and 'std', three
# values each — presumably one per RGB channel; TODO confirm how and on which
# split these statistics were computed).
DATASET_NORMALIZATION = {
    'fog-detection': {'mean': [0.4850, 0.5044, 0.4878], 'std': [0.2631, 0.2524, 0.2793]},
    'fog-or-smog': {'mean': [0.5411, 0.5339, 0.5088], 'std': [0.2353, 0.2157, 0.2289]},
    'foggy-cityscapes': {'mean': [0.4169, 0.4507, 0.4173], 'std': [0.1860, 0.1835, 0.1820]},
    'combined': {'mean': [0.5017, 0.5087, 0.4826], 'std': [0.2259, 0.2118, 0.2227]}
}

# LSTM IDEA

## Imports

In [15]:
import lightning as L
import torch
import torch.nn as nn
import torch.optim as optim

from typing import Literal
import torchmetrics as tm

## Setup

In [16]:
class LSTMClassifier(L.LightningModule):
    """LSTM image classifier that reads an image row by row.

    ``forward`` turns each image into a sequence of ``image_height`` steps; the
    feature vector of a step is one image row with all channels concatenated
    (``image_channels * image_width`` values). The LSTM output at the last
    step is mapped to class logits by a linear layer.

    Args:
        image_width: Width of the input images in pixels.
        image_height: Height of the input images in pixels (sequence length).
        image_channels: Number of channels of the input images.
        hidden_size: Hidden state size of the LSTM.
        num_lstm_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits. Metrics use the ``"multiclass"``
            task when this is greater than 2 and ``"binary"`` otherwise.
        dropout: Dropout between stacked LSTM layers (only applied when
            ``num_lstm_layers > 1``).
        loss: Optional loss module; defaults to ``nn.CrossEntropyLoss()``.
        learning_rate: Learning rate handed to the optimizer.
        optimizer_name: ``'adam'`` or ``'adamw'``.
    """

    def __init__(
        self,
        image_width: int,
        image_height: int,
        image_channels: int,
        hidden_size: int,
        num_lstm_layers: int,
        num_classes: int,
        dropout: float = 0.0,
        loss: nn.Module | None = None,
        learning_rate: float= 1e-4,
        optimizer_name: Literal['adam', 'adamw'] = 'adam',
    ):
        super().__init__()
        # NOTE(review): this also records a custom `loss` module in hparams;
        # Lightning suggests `ignore=['loss']` for nn.Module arguments. Left
        # as-is so checkpoint contents do not change — confirm before editing.
        self.save_hyperparameters()

        self.learning_rate = learning_rate
        self.optimizer_name = optimizer_name

        self.image_height = image_height
        self.image_width = image_width
        self.image_channels = image_channels
        self.hidden_size = hidden_size
        self.num_lstm_layers = num_lstm_layers
        # Misspelled attribute names from the earlier implementation, kept as
        # aliases so code that still reads them keeps working.
        self.hiddes_size = hidden_size
        self.num_lst_layers = num_lstm_layers
        # One time step = one image row with all channels side by side.
        self.lstm_input_size = self.image_channels * self.image_width
        self.num_classes = num_classes
        self.dropout = dropout
        self.model = nn.LSTM(
            input_size=self.lstm_input_size,
            hidden_size=hidden_size,
            num_layers=num_lstm_layers,
            batch_first=True,
            # nn.LSTM applies dropout only between stacked layers; with a
            # single layer a non-zero value does nothing except raise a
            # warning, so pass 0.0 in that case.
            dropout=dropout if num_lstm_layers > 1 else 0.0,
        )
        self.fc = nn.Linear(hidden_size, self.num_classes)

        if loss is not None:
            self.criterion = loss
        else:
            self.criterion = nn.CrossEntropyLoss()

        task = "multiclass" if num_classes > 2 else "binary"
        # torchmetrics defaults to micro averaging for multiclass, which makes
        # F1 / precision / recall identical to accuracy. Use macro averaging
        # for those three, matching evaluate_model's sklearn metrics. Binary
        # metrics are unaffected, so nothing extra is passed for them.
        averaging = {"average": "macro"} if task == "multiclass" else {}

        self.train_metrics = tm.MetricCollection({
            "accuracy": tm.classification.Accuracy(task=task, num_classes=num_classes),
            "f1": tm.classification.F1Score(task=task, num_classes=num_classes, **averaging),
            "precision": tm.classification.Precision(task=task, num_classes=num_classes, **averaging),
            "recall": tm.classification.Recall(task=task, num_classes=num_classes, **averaging),
        }, prefix="train_")
        self.validation_metrics = self.train_metrics.clone(prefix="val_")
        self.test_metrics = self.train_metrics.clone(prefix="test_")

    def forward(self, x):
        """Return class logits for a batch of images.

        ``x`` is expected to be laid out as (batch, channels, height, width);
        the permute/reshape below only works when height equals
        ``image_height`` and channels * width equals ``lstm_input_size``.
        """
        batch_size = x.size(0)

        # (B, C, H, W) -> (B, H, C, W) -> (B, H, C * W): rows become time steps.
        x = x.permute(0, 2, 1, 3)
        x = x.reshape(batch_size, self.image_height, self.lstm_input_size)

        lstm_out, _ = self.model(x)
        # Classify from the LSTM output at the final time step (last row).
        last_time_step_out = lstm_out[:, -1, :]

        return self.fc(last_time_step_out)

    def _common_step(self, batch, batch_idx):
        """Run the model on one ``(images, labels)`` batch.

        Returns:
            Tuple ``(loss, preds, labels)`` where ``preds`` is the argmax of
            the logits over the class dimension.
        """
        images, labels = batch
        logits = self(images)
        loss = self.criterion(logits, labels)
        preds = torch.argmax(logits, dim=1)
        return loss, preds, labels

    def training_step(self, batch, batch_idx, dataloader_idx=0):
        """Log the batch loss and the training metrics, then return the loss."""
        loss, preds, labels = self._common_step(batch, batch_idx)

        self.log("train_loss", loss)
        # Calling the collection updates its state and returns this batch's values.
        self.log_dict(self.train_metrics(preds, labels))

        return loss

    def on_train_epoch_end(self):
        """Clear the accumulated training-metric state for the next epoch."""
        self.train_metrics.reset()

    def validation_step(self, batch, batch_idx, dataloader_idx=0):
        """Accumulate validation metrics and log ``val_loss``."""
        loss, preds, labels = self._common_step(batch, batch_idx)
        self.validation_metrics.update(preds, labels)
        self.log("val_loss", loss)
        return loss

    def on_validation_epoch_end(self):
        """Log the validation metrics computed over the whole epoch, then reset."""
        self.log_dict(self.validation_metrics.compute())
        self.validation_metrics.reset()


    def test_step(self, batch, batch_idx, dataloader_idx=0):
        """Accumulate test metrics; the loss is returned but not logged."""
        loss, preds, labels = self._common_step(batch, batch_idx)
        self.test_metrics.update(preds, labels)
        return loss

    def on_test_epoch_end(self):
        """Log the test metrics computed over the whole test run, then reset."""
        self.log_dict(self.test_metrics.compute())
        self.test_metrics.reset()

    def configure_optimizers(self):
        """Build the optimizer selected by ``optimizer_name``.

        Raises:
            ValueError: If ``optimizer_name`` is neither ``'adam'`` nor ``'adamw'``.
        """
        if self.optimizer_name == 'adam':
            optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        elif self.optimizer_name == 'adamw':
            optimizer = optim.AdamW(self.parameters(), lr=self.learning_rate)
        else:
            raise ValueError(f"Unsupported optimizer: {self.optimizer_name}.")
        return optimizer

In [17]:
def _get_formatted_metric(values: list[float]):
  avg = sum(values) / len(values)
  std = (sum([(v - avg) ** 2 for v in values]) / len(values)) ** 0.5
  return f"{avg:.4f} ± {std:.4f}"


def run_lstm_model(
    hidden_size: int,
    num_lstm_layers: int,
    dropout: float = 0.0,
    repeat: int = 5,
    transform: str = "color",
    normalize: bool = False,
    lr: float = 1e-4,
    version: int = 1,
    reverse_img: bool = False,
    max_epochs: int = 50,
    patience: int = 5,
    seed: int | None = None,
):
  """Train an LSTMClassifier ``repeat`` times and summarise its test metrics.

  Every repetition trains a fresh model on the 'combined' train/val splits and
  then tests it on the 'test' split of every entry in DATASET_PATHS.

  Args:
    hidden_size: LSTM hidden size.
    num_lstm_layers: Number of stacked LSTM layers.
    dropout: Dropout passed to the model.
    repeat: Number of independent training runs (must be >= 1).
    transform: Key into the module-level ``transforms`` dict.
    normalize: Append ``v2.Normalize`` with the 'combined' statistics.
    lr: Learning rate.
    version: Prefix of the logger version and of the checkpoint directory.
    reverse_img: Append a transform that flips each image tensor along dim 2.
    max_epochs: Upper bound on training epochs per run.
    patience: Early-stopping patience on ``val_loss``.
    seed: When given, run ``i`` is seeded with ``seed + i`` so results are
      reproducible; ``None`` leaves the global RNG state untouched.

  Returns:
    One line per dataset of the form ``name & mean ± std & ... \\\\``, ready to
    paste into a LaTeX table.

  Raises:
    ValueError: If ``repeat`` is smaller than 1.
  """
  if repeat < 1:
    # Fail before building dataloaders; with no runs there is nothing to
    # average and the summary below would divide by zero.
    raise ValueError(f"repeat must be at least 1, got {repeat}.")

  run_name=f"LSTM_{num_lstm_layers}_{hidden_size}_{dropout}-{transform}-{normalize}"
  save_dir = Path(f"./runs/lstm/{run_name}")

  pipeline = transforms[transform]
  if normalize:
    pipeline = v2.Compose([
        pipeline,
        v2.Normalize(**DATASET_NORMALIZATION['combined'])
    ])
  if reverse_img:
    # NOTE(review): assuming per-image tensors are (C, H, W), dim 2 is the
    # width, so this mirrors each row. The model iterates over rows (height),
    # so the order of time steps is unchanged — confirm that this, and not a
    # flip along dim 1, is what "reverse" is meant to do.
    pipeline = v2.Compose([
        pipeline,
        v2.Lambda(lambda x: x.flip(2))
    ])

  train_dataloader = get_dataloader(DATASET_PATHS['combined'] / 'train', pipeline)
  val_dataloader = get_dataloader(DATASET_PATHS['combined'] / 'val', pipeline)
  test_dataloaders = {
      dataset_name: get_dataloader(path / 'test', pipeline)
      for dataset_name, path in DATASET_PATHS.items()
  }

  final_res = {}  # dataset name -> metric name -> one value per repetition
  training_times = []

  for i in range(repeat):
    if seed is not None:
      # Distinct but deterministic seed per repetition.
      L.seed_everything(seed + i, workers=True)

    trainer = L.Trainer(
      max_epochs=max_epochs,
      logger=L.pytorch.loggers.TensorBoardLogger(
          save_dir=save_dir,
          name=run_name,
          version=f"{version}000{i+1:02}",
      ),
      callbacks=[
          L.pytorch.callbacks.early_stopping.EarlyStopping(
              monitor="val_loss", mode="min", patience=patience, verbose=False
          ),
          L.pytorch.callbacks.ModelCheckpoint(
              monitor="val_f1",
              mode="max",
              dirpath=save_dir / run_name / f"version_{version}",
              filename=run_name,
          ),
      ],
      log_every_n_steps=1,
    )
    # Image geometry must agree with the 224x224 crop in base_transform.
    model = LSTMClassifier(
      image_width=224,
      image_height=224,
      image_channels=3,
      hidden_size=hidden_size,
      num_lstm_layers=num_lstm_layers,
      num_classes=2,
      dropout=dropout,
      learning_rate=lr,
      optimizer_name='adamw',
    )

    started = time()
    trainer.fit(
        model=model,
        train_dataloaders=train_dataloader,
        val_dataloaders=val_dataloader,
    )
    training_times.append(time() - started)

    # NOTE(review): the in-memory model is tested, i.e. the weights from the
    # last training epoch; the checkpoint tracked by ModelCheckpoint is not
    # loaded here. Confirm this is intended (ckpt_path="best" would switch).
    for dataset_name, dataloader in test_dataloaders.items():
      metrics = trainer.test(model, dataloader)[0]
      dataset_res = final_res.setdefault(dataset_name, {})
      for metric_name, metric_value in metrics.items():
        dataset_res.setdefault(metric_name, []).append(metric_value)

  latex_table_str = "".join(
      " & ".join(
          [str(dataset), *[_get_formatted_metric(m) for m in metrics.values()]]
      ) + " \\\\\n"
      for dataset, metrics in final_res.items()
  )
  print(f"Training times: {training_times}")
  print(_get_formatted_metric(training_times))
  return latex_table_str


## LSTM

In [None]:
# Experiment: 2-layer LSTM with 128 hidden units, trained 5 times on normalized
# images (default "color" transform) with the flip transform enabled.
run_lstm_model(
    hidden_size=128,
    num_lstm_layers=2,
    dropout=0.05,
    repeat=5,
    normalize=True,
    reverse_img=True
)

INFO: GPU available: False, used: False
INFO:lightning.pytorch.utilities.rank_zero:GPU available: False, used: False
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO: 
  | Name               | Type             | Params | Mode 
----------------------------------------------------------------
0 | model              | LSTM             | 542 K  | train
1 | fc                 | Linear           | 258    | train
2 | criterion          | CrossEntropyLoss | 0      | train
3 | train_metrics      | MetricCollection | 0      | train
4 | validation_metrics | MetricCollection | 0      | train
5 | test_metrics       | MetricCollection | 0      | train
----------------------------------------------------------------
542 K     Trainable params
0         Non-trainable params
542 K     Total params


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]