<a href="https://colab.research.google.com/github/wojdzi1607/ColabNotebooks/blob/main/ZPO6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Zaawansowane przetwarzanie obrazu

## Ćwiczenie laboratoryjne 6 – sieci neuronowe w PyTorch

## Klasyfikacja

Utwórzmy prosty, sekwencyjny model sieci.

In [None]:
import torch

from torch import nn

# Convolutional feature extractor: two stages of conv-ReLU-conv-ReLU, each
# closed by a 2x2 max-pool. The padded 3x3 convolutions keep the spatial
# size, so only the pooling layers shrink the feature maps.
feature_layers = [
    nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(inplace=True),
    nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),
    nn.ReLU(inplace=True),
    nn.MaxPool2d(kernel_size=2),

    nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
    nn.ReLU(inplace=True),
    nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
    nn.ReLU(inplace=True),
    nn.MaxPool2d(kernel_size=2),
]

# Fully connected classifier head.
# 4096 = 64 channels * 8 * 8, assuming 32x32 inputs halved by each pooling layer.
classifier_layers = [
    nn.Flatten(),
    nn.Linear(4096, 64),
    nn.ReLU(inplace=True),
    nn.Linear(64, 32),
    nn.ReLU(inplace=True),
    # Note: no activation after the last layer - the loss function chosen
    # later has the final activation built in.
    nn.Linear(32, 10),
]

model = nn.Sequential(*feature_layers, *classifier_layers)

Podobnie jak w TensorFlow/Keras wybierzmy zbiór danych CIFAR10 oraz podstawowe augmentacje:

In [None]:
import torchvision

# Short alias to keep the transform pipeline readable.
transforms = torchvision.transforms

# Random affine jitter and horizontal flip, then conversion to a tensor.
augmentation_steps = [
    transforms.RandomAffine(degrees=10, translate=(0, 0.1), scale=(0.95, 1.05)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
]
augmentations = transforms.Compose(augmentation_steps)

# CIFAR10 is downloaded into the local 'CIFAR10' directory and every sample
# is passed through the augmentation pipeline when it is accessed.
dataset = torchvision.datasets.CIFAR10(
    root='CIFAR10',
    download=True,
    transform=augmentations,
)

In [None]:
# Print the first item of the dataset to inspect what a single sample looks like.
print(dataset[0])

Podzielmy zbiór na treningowy i walidacyjny:

In [None]:
# Hold out 15% of the samples (rounded down) for validation; everything
# else is used for training.
dataset_size = len(dataset)
valid_length = int(0.15 * dataset_size)
train_length = dataset_size - valid_length

split_lengths = (train_length, valid_length)
train_dataset, valid_dataset = torch.utils.data.random_split(dataset, split_lengths)

Utwórzmy funkcję kosztu oraz optimizer:

In [None]:
# Cross-entropy is computed directly on the raw outputs of the network,
# which is why the model ends without an activation layer.
loss_function = nn.CrossEntropyLoss()
# Adam updates all parameters of `model` with a learning rate of 1e-3.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

Utwórzmy loader danych:

In [None]:
# Training loader: batches of 128 samples, shuffling enabled, 2 workers.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=128, shuffle=True, num_workers=2)
# Validation loader: batches of 256 samples, 2 workers; no shuffle argument is passed.
valid_loader = torch.utils.data.DataLoader(dataset=valid_dataset, batch_size=256, num_workers=2)

In [None]:
# Fetch one batch from the training loader and show the shape of its first
# element (the data tensor; the second element holds the labels).
next(iter(train_loader))[0].shape

Rozpocznijmy trening:

In [None]:
# Plain training loop: 30 passes over the training loader, printing the loss
# of every step on a single, continuously overwritten line.
for epoch in range(30):
  for i, batch in enumerate(train_loader):
    images, targets = batch

    # Forward pass and loss for the current mini-batch.
    logits = model(images)
    loss = loss_function(logits, targets)

    # Gradients are accumulated across backward() calls by default, so they
    # have to be cleared before computing the ones for this batch.
    optimizer.zero_grad()
    # Compute the gradient of the loss with respect to the model parameters.
    loss.backward()
    # Apply the parameter update.
    optimizer.step()

    progress = f'\rStep {i + 1}/{len(train_loader)}: loss={loss.item():.4f}'
    print(progress, end='')
  # Finish the progress line of this epoch.
  print()

  # with torch.no_grad():
  #   losses = []
  #   for i, (data_batch, labels_batch) in enumerate(train_loader):
  #     data_batch = data_batch.to(device)
  #     labels_batch = labels_batch.to(device)
      
  #     predictions = model(data_batch)
  #     loss = loss_function(predictions, labels_batch)
  #     losses.append(loss.item())
    
  #   print(f'Epoch {epoch + 1}: val_loss={torch.mean(torch.tensor(losses))}') 

Spróbujmy przeprowadzić ten sam trening z wykorzystaniem GPU:

In [None]:
# Train on the first CUDA device when one is available; fall back to the CPU
# so the cell still runs on a runtime without a GPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

model = model.to(device)
# The optimizer has to be (re)built after the model was moved: an Adam
# instance that already took steps on the CPU keeps its moment estimates on
# the CPU, and stepping it with GPU gradients fails with a device mismatch.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

for epoch in range(30):
  # Training mode (a no-op for the current layers, but required as soon as
  # dropout or batch normalisation is added to the model).
  model.train()
  running_loss = 0.0
  for i, (data_batch, labels_batch) in enumerate(train_loader):
    # Inputs must live on the same device as the model.
    data_batch = data_batch.to(device)
    labels_batch = labels_batch.to(device)

    predictions = model(data_batch)
    loss = loss_function(predictions, labels_batch)

    # Gradients are accumulated across backward() calls by default, so they
    # have to be cleared before computing the ones for this batch.
    optimizer.zero_grad()

    # Backward pass: gradient of the loss with respect to the parameters.
    loss.backward()

    # Apply the parameter update.
    optimizer.step()

    # Running mean of the epoch's training loss, kept as a plain sum instead
    # of rebuilding a tensor from the whole history on every step.
    running_loss += loss.item()
    print(f'\rStep {i + 1}/{len(train_loader)}: loss={running_loss / (i + 1):.4f}', end='')
  print()

  # Validation pass: evaluation mode and no gradient tracking.
  model.eval()
  with torch.no_grad():
    losses = []
    for data_batch, labels_batch in valid_loader:
      data_batch = data_batch.to(device)
      labels_batch = labels_batch.to(device)

      predictions = model(data_batch)
      loss = loss_function(predictions, labels_batch)
      losses.append(loss.item())

    print(f'Epoch {epoch + 1}: val_loss={torch.mean(torch.tensor(losses)):.4f}')

## Segmentacja, własne wczytywanie danych, PyTorch Lightning

Pobierzmy i rozpakujmy zbiór danych:

In [None]:
# Mount Google Drive at /content/drive so that files stored there can be
# read from this runtime.
from google.colab import drive
drive.mount('/content/drive')

!unzip -q "/content/drive/MyDrive/Colab_Files/oxford_pets_dogs_only_binarized_split.zip"

Napiszmy własną klasę `Dataset`:

In [None]:
import numpy as np

from typing import List, Tuple
from pathlib import Path
from PIL import Image


class SegmentationDataset(torch.utils.data.Dataset):
    """Image/mask pairs for segmentation, read from disk on every access.

    The mask of an image is looked up in ``labels_dir`` under the same file
    name as the image itself.

    Args:
        paths: Paths of the input images.
        labels_dir: Directory that contains the mask files.
        augment: Stored on the instance; no augmentation is applied yet.
    """

    def __init__(self, paths: List[Path], labels_dir: Path, augment: bool = False):
        self._paths = paths
        self._labels_dir = labels_dir
        self._augment = augment

    def __len__(self) -> int:
        """Return the number of images in the dataset."""
        return len(self._paths)

    def __getitem__(self, index) -> Tuple[torch.Tensor, torch.Tensor]:
        """Load the image and mask at ``index`` as float32 tensors.

        Returns:
            A tuple ``(image, mask)``: the image is resized to 256x256,
            scaled by 1/255 and laid out channels-first; the mask is resized
            to 256x256 with nearest-neighbour sampling and gets a leading
            axis of size one.
        """
        path = self._paths[index]
        mask_path = self._labels_dir / path.name

        # Context managers close the underlying files right after decoding.
        with Image.open(str(path)) as image:
            # Force three channels so that grayscale or RGBA files do not
            # produce tensors with a different channel count.
            input_image = np.asarray(image.convert('RGB').resize((256, 256))) / 255
        with Image.open(str(mask_path)) as mask:
            # Nearest-neighbour resampling keeps the mask values discrete.
            input_mask = np.asarray(mask.resize((256, 256), resample=Image.NEAREST))

        # NOTE(review): mask values are passed through unscaled - confirm the
        # files store 0/1 if the mask is used as a binary target.
        image_tensor = torch.from_numpy(np.moveaxis(input_image.astype(np.float32), -1, 0))
        mask_tensor = torch.from_numpy(input_mask.squeeze().astype(np.float32))[None, :]
        return image_tensor, mask_tensor

Utwórzmy model PyTorch Lightning:

In [None]:
!pip install -U pytorch-lightning segmentation-models-pytorch

In [None]:
import pytorch_lightning as pl
from segmentation_models_pytorch import Unet
from argparse import Namespace
from sklearn.model_selection import train_test_split


class Segmenter(pl.LightningModule):
    """Lightning module that trains a U-Net on image/mask pairs.

    The module owns its data: image paths are listed from the training
    directory, split 85/15 into training and validation subsets, and served
    through ``train_dataloader`` / ``val_dataloader``.

    Args:
        hparams: Hyperparameters; ``lr`` is read when the optimizer is built.
    """

    def __init__(self, hparams: Namespace):
        super().__init__()

        # Current Lightning versions reject direct assignment to
        # `self.hparams`; `save_hyperparameters` fills it in instead (and
        # stores the values in checkpoints).
        self.save_hyperparameters(hparams)

        self.network = Unet(in_channels=3, encoder_name='efficientnet-b0')

        # Loss applied to the raw network outputs.
        self.loss = nn.BCEWithLogitsLoss()

        data_root = Path('oxford_pets_dogs_only_binarized_split/train')
        labels_dir = data_root / 'labels'

        paths = list((data_root / 'images').iterdir())
        train_paths, valid_paths = train_test_split(paths, test_size=0.15)

        self.train_dataset = SegmentationDataset(train_paths, labels_dir, augment=True)
        self.valid_dataset = SegmentationDataset(valid_paths, labels_dir)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the segmentation network on a batch of images."""
        return self.network(x)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch; return it."""
        x, y = batch
        y_pred = self.forward(x)
        loss = self.loss(y_pred, y)
        self.log('train_loss', loss, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss for one batch."""
        x, y = batch
        y_pred = self.forward(x)
        self.log('val_loss', self.loss(y_pred, y), on_epoch=True)

    def configure_optimizers(self):
        """Build Adam plus a plateau scheduler driven by ``train_loss``.

        The scheduler halves the learning rate (factor 0.5) after 3 epochs
        without improvement of the monitored value, down to at least 1e-6.
        """
        optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
        reduce_lr_on_plateau = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5,
                                                                          patience=3, min_lr=1e-6, verbose=True)
        return {
            'optimizer': optimizer,
            'lr_scheduler': reduce_lr_on_plateau,
            'monitor': 'train_loss'
        }

    def train_dataloader(self):
        """Return the shuffled training loader (incomplete last batch dropped)."""
        return torch.utils.data.DataLoader(
            self.train_dataset, batch_size=64, num_workers=2, pin_memory=True, drop_last=True, shuffle=True
        )

    def val_dataloader(self):
        """Return the validation loader."""
        return torch.utils.data.DataLoader(
            self.valid_dataset, batch_size=128, num_workers=2, pin_memory=True
        )

Rozpocznijmy trening:

In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint

hparams = Namespace(lr=1e-3)
model = Segmenter(hparams)

# `filepath` is no longer accepted by ModelCheckpoint: the target directory
# and the file-name template are passed separately. The monitored metric is
# named explicitly so the checkpoint with the lowest validation loss is kept.
checkpoint_callback = ModelCheckpoint(
    dirpath='.',
    filename='{epoch}-{val_loss:.5f}',
    monitor='val_loss',
    mode='min',
    verbose=True,
)

# `gpus=-1` became `accelerator='gpu', devices=-1` (use every visible GPU).
# `progress_bar_refresh_rate` was removed from the Trainer as well; it is
# dropped here and the progress bar keeps its default refresh rate.
trainer = pl.Trainer(
    callbacks=[checkpoint_callback],
    accelerator='gpu',
    devices=-1,
    max_epochs=50,
)
trainer.fit(model)