## Sztuczne sieci neuronowe - laboratorium 10

In [18]:
import torch
import time
import torch.nn as nn
import torchvision
from torchvision import models
from torchvision.datasets import ImageFolder
from torchvision.transforms import v2

In [19]:
# sprawdzenie, czy GPU jest widoczne
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda:0


## Transfer learning

Dzisiejsze zajęcia będą dotyczyły zagadnienia **transfer learning** - trenowania modeli polegającego na wykorzystaniu architektury i zestawu wag wytrenowanych wcześniej (np. przez kogoś innego - najczęściej badaczy z największych firm, na dużym zbiorze danych), celem wykorzystania "wiedzy" zgromadzonej w już wytrenowanym modelu i przeniesienia jej (stąd "transfer") do innego, zwykle węższego problemu (np. poprzez dotrenowanie na znacznie mniejszym zbiorze danych).

Fazy te nazywają się odpowiednio **pre-training** (tzw. modele pretrenowane, *pretrained models*) i **fine-tuning**.

Wiele z takich gotowych (pretrenowanych) modeli dostępnych jest w pakiecie `torchvision` - części PyTorcha związanej z przetwarzaniem obrazów.

#### Ćwiczenie
Uruchom poniższą komórkę, aby wypisać dostępne w `torchvision` modele. Nazwy modeli zaczynające się dużą literą oznaczają klasy implementujące poszczególne architektury sieci. Ich odpowiedniki pisane małymi literami to funkcje pozwalające zainicjalizować model (https://pytorch.org/vision/stable/models.html).

Funkcje te mają argument `pretrained` - gdy podamy wartość `True`, inicjalizujemy model pretrenowanymi wagami (dla `False` - losowymi).

Wczytaj po kolei wybrane modele (np. `resnet18`) do zmiennej. Sprawdź jej zawartość.

In [20]:
dir(models)

['AlexNet',
 'AlexNet_Weights',
 'ConvNeXt',
 'ConvNeXt_Base_Weights',
 'ConvNeXt_Large_Weights',
 'ConvNeXt_Small_Weights',
 'ConvNeXt_Tiny_Weights',
 'DenseNet',
 'DenseNet121_Weights',
 'DenseNet161_Weights',
 'DenseNet169_Weights',
 'DenseNet201_Weights',
 'EfficientNet',
 'EfficientNet_B0_Weights',
 'EfficientNet_B1_Weights',
 'EfficientNet_B2_Weights',
 'EfficientNet_B3_Weights',
 'EfficientNet_B4_Weights',
 'EfficientNet_B5_Weights',
 'EfficientNet_B6_Weights',
 'EfficientNet_B7_Weights',
 'EfficientNet_V2_L_Weights',
 'EfficientNet_V2_M_Weights',
 'EfficientNet_V2_S_Weights',
 'GoogLeNet',
 'GoogLeNetOutputs',
 'GoogLeNet_Weights',
 'Inception3',
 'InceptionOutputs',
 'Inception_V3_Weights',
 'MNASNet',
 'MNASNet0_5_Weights',
 'MNASNet0_75_Weights',
 'MNASNet1_0_Weights',
 'MNASNet1_3_Weights',
 'MaxVit',
 'MaxVit_T_Weights',
 'MobileNetV2',
 'MobileNetV3',
 'MobileNet_V2_Weights',
 'MobileNet_V3_Large_Weights',
 'MobileNet_V3_Small_Weights',
 'RegNet',
 'RegNet_X_16GF_Weights'

In [21]:
model = models.resnet18()
print(model)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

### Klasyfikacja binarna - przygotowanie danych

Będziemy trenować model do klasyfikacji binarnej zdjęć pszczół i mrówek z wykorzystaniem transfer learningu.

Zbiór ten jest dostępny do pobrania tutaj: https://download.pytorch.org/tutorial/hymenoptera_data.zip  
(*hymenoptera* - *błonoskrzydłe* https://pl.wikipedia.org/wiki/B%C5%82onkoskrzyd%C5%82e)

Zbiór ten należy rozpakować do katalogu `common/data` (w razie gdyby go jeszcze tam nie było).

In [22]:
import pathlib

DATA_PATH = pathlib.Path("data/hymenoptera_data")

#### Ćwiczenie

Wczytaj zbiór zdjęć (dwa podbiory - train i val) wykorzystując klasę **ImageFolder** dataset dostępną w PyTorch (https://pytorch.org/vision/stable/datasets.html)

Sprawdź rozmiar obu podzbiorów.  
Sprawdź wymiary kilku wybranych zdjęć w zbiorze (używając możliwości klasy `Dataset`, nie przeglądając obrazki w katalogu).

Sprawdź zawartość atrybutów klasy `ImageFolder` (https://pytorch.org/vision/stable/_modules/torchvision/datasets/folder.html#ImageFolder)

In [23]:
transform = v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)])
train_path = DATA_PATH / "train"
val_path = DATA_PATH / "val"
train_dataset = ImageFolder(train_path, transform=transform)
val_dataset = ImageFolder(val_path, transform=transform)

In [24]:
print(len(train_dataset))
print(len(val_dataset))
train_image, train_label = train_dataset[0]
print(train_image.size())
val_image, val_label = val_dataset[0]
print(val_image.size())

244
153
torch.Size([3, 512, 768])
torch.Size([3, 375, 500])


In [25]:
dir(ImageFolder)

['__add__',
 '__annotations__',
 '__class__',
 '__class_getitem__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__len__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__orig_bases__',
 '__parameters__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_format_transform_repr',
 '_repr_indent',
 'extra_repr',
 'find_classes',
 'make_dataset']

#### Ćwiczenie

Domyślnie `ImageFolder` dataset przechowuje obrazki jako `PIL.Image`. Należy je przekształcić do tensorów, aby ich użyć w treningu modeli.

Odszukaj odpowiednią funkcję z `torchvision.transforms` (https://pytorch.org/vision/stable/transforms.html) i ponownie wczytaj zbiory danych z jej wykorzystaniem.

In [26]:
transform = v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)])
train_path = DATA_PATH / "train"
val_path = DATA_PATH / "val"
train_dataset = ImageFolder(train_path, transform=transform)
val_dataset = ImageFolder(val_path, transform=transform)

#### Ćwiczenie
W kolejnym kroku należy znormalizować wejściowe obrazki. Ponownie odszukaj odpowiednią transformację w `torchvision.transforms` i zbuduj listy transformacji `train_transforms` i `valid_transforms` z użyciem `transforms.Compose`. Wykorzystaj odpowiednie informacje z poniższej komórki.

In [27]:
# średnie i odchylenia standardowe dla kanałów RGB dla zbioru uczącego ImageNet
# ciekawostka: https://github.com/pytorch/vision/issues/1439
IMAGENET_MEANS = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

In [28]:
train_transforms = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=IMAGENET_MEANS, std=IMAGENET_STD)
])

val_transforms = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=IMAGENET_MEANS, std=IMAGENET_STD)
])

#### Ćwiczenie

Należy także doprowadzić oryginalne obrazki do odpowiednich wymiarów (224 na 224 piksele).

W przypadku sieci (pre)trenowanych na danych ImageNet przyjęło się robić to dwukrokowo:
- "resize" obrazka, aby krótszy wymiar miał długość 256
- przycięcie ("crop") obrazka do jego środkowej części 224x224

Rozszerz listę transformacji **podczas walidacji** zgodnie z powyższym opisem, wykorzystując odpowiednie funkcje z https://pytorch.org/vision/stable/transforms.html. Transformacjami treningowymi zajmiemy się w następnym ćwiczeniu.

In [29]:
IMAGENET_IMG_SIZE = 224
IMAGENET_RESIZE = 256

In [30]:
val_transforms = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Resize(IMAGENET_RESIZE),
    v2.CenterCrop(IMAGENET_IMG_SIZE),
    v2.Normalize(mean=IMAGENET_MEANS, std=IMAGENET_STD)
])

#### Ćwiczenie

Podczas treningu warto - zwłaszcza w przypadku posiadania niewielkiego zbioru danych - zastosować tzw. augmentację danych (więcej na kolejnych zajęciach).

Zamiast "sztywnego" resize'owania obrazka i przycinania go względem środka, w czasie treningu:
- dokonaj "resize" do losowej skali, a następnie przytnij do (losowego) fragmentu 224x224
- dodatkowo losowo (domyślnie: prawdopodobieństwo 50%) przerzuć obrazek względem osi pionowej

Znajdź odpowiednie funkcje w https://pytorch.org/vision/stable/transforms.html.
Stwórz w ten sposób listę transformacji `train_transforms`.

Wczytaj ponownie zbiór uczący i walidacyjny, podając odpowiednie listy transformacji do `ImageFolder`.

In [31]:
train_transforms = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.RandomResizedCrop(IMAGENET_IMG_SIZE),
    v2.RandomVerticalFlip(),
    v2.Normalize(mean=IMAGENET_MEANS, std=IMAGENET_STD)
])

In [32]:
train_dataset = ImageFolder(train_path, transform=train_transforms)
val_dataset = ImageFolder(val_path, transform=val_transforms)

### Transfer learning

Fine-tuningu modeli można dokonać na dwa główne sposoby:
- dotrenować (optymalizować) wszystkie parametry (we wszystkich warstwach) pretrenowanego modelu
- "zamrozić" pretrenowaną część modelu i dotrenować

Na początek zajmiemy się pierwszym z wymienionych sposobów transfer learningu.

#### Ćwiczenie

Załaduj pretrenowaną sieć `resnet18` do zmiennej `model` i dostosuj ją do rozważanego problemu (co musisz zrobić?).
Następnie uruchom trening modelu.

In [33]:
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import matplotlib.pyplot as plt

# Create a TensorBoard writer
writer = SummaryWriter('runs/transfer_learning_experiment')

# Add sample dataset images to TensorBoard
def log_dataset_samples():
    # Get a batch of training data
    dataiter = iter(train_loader)
    images, labels = next(dataiter)
    img_grid = torchvision.utils.make_grid(images[:4])
    writer.add_image('hymenoptera_images', img_grid)

    # Log class names
    writer.add_text('classes', str(train_dataset.classes))

# Modify training loop to log metrics
def training_loop(n_epochs, optimizer, model, loss_fn, train_loader):
    start_time = time.time()

    # Log model graph
    sample_input = next(iter(train_loader))[0][:1].to(device)
    writer.add_graph(model, sample_input)

    model.train()
    for epoch in range(1, n_epochs + 1):
        loss_train = 0.0
        for i, (imgs, labels) in enumerate(train_loader):
            imgs = imgs.to(device=device)
            labels = labels.to(device=device)
            outputs = model(imgs)
            loss = loss_fn(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_train += loss.item()

        epoch_loss = loss_train / len(train_loader)
        # Log loss to TensorBoard
        writer.add_scalar('Loss/train', epoch_loss, epoch)

        if epoch == 1 or epoch % 5 == 0:
            print(f"Epoch {epoch}, Training loss {epoch_loss}")

            # Log model predictions
            if epoch % 5 == 0:
                log_predictions(model, imgs, labels, epoch)

    time_elapsed = time.time() - start_time
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))

# Log model predictions
def log_predictions(model, images, labels, epoch):
    # Get model predictions
    model.eval()
    with torch.no_grad():
        outputs = model(images)
        _, preds = torch.max(outputs, 1)

    # Create a figure with predictions
    fig = plt.figure(figsize=(12, 8))
    for i in range(min(4, len(images))):
        ax = fig.add_subplot(1, 4, i+1)
        # Convert tensor to numpy for plotting
        img = images[i].cpu().numpy().transpose((1, 2, 0))
        # Denormalize image
        img = img * torch.tensor(IMAGENET_STD).numpy() + torch.tensor(IMAGENET_MEANS).numpy()
        img = np.clip(img, 0, 1)
        ax.imshow(img)
        ax.set_title(f"Pred: {train_dataset.classes[preds[i]]}\nTrue: {train_dataset.classes[labels[i]]}")
        ax.axis('off')

    writer.add_figure(f'predictions/epoch_{epoch}', fig, epoch)
    model.train()

In [34]:
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT).to(device)

In [35]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
log_dataset_samples()

# SGD with momentum
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)
loss_fn = nn.CrossEntropyLoss()

training_loop(
    n_epochs = 25,
    optimizer = optimizer,
    model = model,
    loss_fn = loss_fn,
    train_loader = train_loader
)

Epoch 1, Training loss 5.471064269542694
Epoch 5, Training loss 0.33816876262426376
Epoch 10, Training loss 0.18064148351550102
Epoch 15, Training loss 0.08908897079527378
Epoch 20, Training loss 0.05639964737929404
Epoch 25, Training loss 0.05368376523256302
Training complete in 0m 54s


#### Ćwiczenie

Sprawdź jakość wytrenowanego modelu uruchamiając poniższe komórki.

In [36]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=False)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=64, shuffle=False)

In [37]:
def validate(model, train_loader, val_loader):
    model.eval()
    for name, loader in [("train", train_loader), ("val", val_loader)]:
        correct = 0
        total = 0

        with torch.no_grad():
            for imgs, labels in loader:
                imgs = imgs.to(device)
                labels = labels.to(device)
                outputs = model(imgs)
                preds = torch.argmax(outputs, dim=1)
                total += labels.shape[0]
                correct += int((preds == labels).sum())

        accuracy = correct/total
        # Log accuracy to TensorBoard
        writer.add_scalar(f'Accuracy/{name}', accuracy, 0)
        print(f"{name} accuracy: {accuracy}")

In [38]:
validate(model, train_loader, val_loader)

train accuracy: 0.9713114754098361
val accuracy: 0.9150326797385621


#### Ćwiczenie

Jeszcze raz załaduj i przygotuj model `resnet18` (np. do zmiennej `model_frozen`, tym razem "zamrażając" wszystkie pretrenowane warstwy modelu.
Wytrenuj model i sprawdź jego dokładność.

In [39]:
model_frozen = models.resnet18(weights=models.ResNet18_Weights.DEFAULT).to(device)
for param in model_frozen.parameters():
    param.requires_grad = False
for param in model_frozen.fc.parameters():
    param.requires_grad = True

In [40]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# SGD with momentum
optimizer = torch.optim.SGD(model_frozen.parameters(), lr=1e-2, momentum=0.9)
loss_fn = nn.CrossEntropyLoss()

training_loop(
    n_epochs = 25,
    optimizer = optimizer,
    model = model_frozen,
    loss_fn = loss_fn,
    train_loader = train_loader
)

Epoch 1, Training loss 5.782234370708466
Epoch 5, Training loss 0.20699457451701164
Epoch 10, Training loss 0.17769954353570938
Epoch 15, Training loss 0.16707564517855644
Epoch 20, Training loss 0.1356002502143383
Epoch 25, Training loss 0.12208767421543598
Training complete in 0m 39s


In [41]:
def log_pr_curves(model, val_loader):
    # Get model predictions
    model.eval()
    probs = []
    labels_list = []

    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs = imgs.to(device)
            outputs = model(imgs)
            probs_batch = torch.nn.functional.softmax(outputs, dim=1)
            probs.append(probs_batch.cpu())
            labels_list.append(labels)

    probs = torch.cat(probs)
    labels_tensor = torch.cat(labels_list)

    # For each class
    for i in range(len(train_dataset.classes)):
        writer.add_pr_curve(
            f'PR Curve/{train_dataset.classes[i]}',
            (labels_tensor == i),
            probs[:, i],
            global_step=0
        )

In [42]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=False)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=64, shuffle=False)

In [43]:
validate(model_frozen, train_loader, val_loader)

train accuracy: 0.9221311475409836
val accuracy: 0.9477124183006536


#### Ćwiczenie

Porównaj powyższe wyniki z uzyskanymi dla modelu `Net` stworzonego na wcześniejszych zajęciach (lekko zmodyfikowane wymiary dla warstw gęstych - inny rozmiar obrazków wejściowych).

In [44]:
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 8, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(8 * 56 * 56, 32)
        self.fc2 = nn.Linear(32, 2)

    def forward(self, x):
        out = F.max_pool2d(torch.tanh(self.conv1(x)), 2)
        out = F.max_pool2d(torch.tanh(self.conv2(out)), 2)
        out = out.view(-1, 8 * 56 * 56)
        out = torch.tanh(self.fc1(out))
        out = self.fc2(out)
        return out

In [45]:
net_model = Net()
net_model = net_model.to(device)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# SGD with momentum
optimizer = torch.optim.SGD(net_model.parameters(), lr=1e-2, momentum=0.9)
loss_fn = nn.CrossEntropyLoss()

training_loop(
    n_epochs = 25,
    optimizer = optimizer,
    model = net_model,
    loss_fn = loss_fn,
    train_loader = train_loader
)

Epoch 1, Training loss 0.6916071623563766
Epoch 5, Training loss 0.6447539627552032
Epoch 10, Training loss 0.6474007219076157
Epoch 15, Training loss 0.6238445490598679
Epoch 20, Training loss 0.6129355430603027
Epoch 25, Training loss 0.6064138114452362
Training complete in 0m 35s


In [46]:
validate(net_model, train_loader, val_loader)

train accuracy: 0.639344262295082
val accuracy: 0.6209150326797386


#### Wnioski
Dzięki zajęciom zrozumiałem czym jest transfer learning. Słyszałem już o nim wcześniej w internecie lecz nie wiedziałem, że tak się nazywa. Myślę, iż pozwala na zaoszczędzenie czasu na treningu modelu. Sam planowałem z niego skorzystać w mojej pracy inżynierskiej.