<a href="https://colab.research.google.com/github/yukyeongleee/CodeTemplate/blob/main/M06_tutorial_autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# U-Net 기반 표현학습

이번 튜토리얼에서는 수정된 U-Net 모델을 정의하고, 복원 손실 함수(Reconstruction loss)로 이를 학습합니다. 또한 학습한 인코더의 뒷단에 간단한 MLP 기반의 분류기를 결합하여 downstream 분류 과제를 해결하는데 사용해보겠습니다.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter

import torchvision.utils as vutils
from torchvision.datasets import Caltech101
from torchvision import transforms

import os
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from datetime import datetime
import numpy as np

### Stage 1을 위한 하이퍼파라미터

In [None]:
image_size = 128

num_epochs = 10
batch_size = 64

learning_rate = 1e-3

## Tensorboard 로깅

In [None]:
now = datetime.now()
formattedDate = now.strftime("%H%M%S")
print(formattedDate)

run_name = f'unet-{formattedDate}'

log_dir = os.path.join('./runs', run_name)
os.makedirs(log_dir, exist_ok=True)

writer1 = SummaryWriter(log_dir)

In [None]:
%load_ext tensorboard
# %reload_ext tensorboard
%tensorboard --logdir runs

## Step 1. Encoder 정의

### Step 1-1. EncoderBlock 정의

인코더(수축경로)의 기본 단위가 될 인코더 블록을 정의합니다.

*  인코더 블록은 **두 개의 Conv2d 레이어**와 **마지막 MaxPool2d 레이어**로 구성됩니다. 각 Conv2d의 출력은 **ReLU 활성화함수**를 통과합니다.
*  두 개의 Conv2d 레이어 모두 (kernel_size, stride, padding) = (3, 1, 1)로 설정합니다. 따라서 Conv2d 레이어의 출력은 입력과 동일한 모양을 갖습니다.
*  인코더 블록은 **두 가지 맵: MaxPool2d 레이어의 출력(`out`)과 입력(`feat`)**을 반환합니다. `feat`은 기록해뒀다가 이후 Skip Connection에 활용됩니다.

In [None]:
class EncoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.layers = None  # Hint. nn.Sequential
        self.down = None

    def forward(self, x):
        feat = self.layers(x)
        out = self.down(feat)
        return out, feat

### Step 1-2. Encoder 정의

인코더 블록을 조합하여 인코더를 정의합니다.  

*  인코더는 네 개의 인코더 블록으로 구성됩니다. 각 블록의 입출력 채널 수는 다음과 같습니다.
    * Block 1.  `in_channels` → `model_channels`
    * Block 2.  `model_channels` → `model_channels * 2`
    * Block 3.  `model_channels * 2` → `model_channels * 4`
    * Block 4.  `model_channels * 4` → `model_channels * 8`
*  각 블록의 출력 가운데 `out`은 다음 블록으로 전달하고, `feat`은 `features`라는 이름의 리스트에 모아두세요.
*  인코더는 최종 출력(`x`)와 중간 특성들을 모아둔 리스트(`features`)를 반환합니다.


In [None]:
class Encoder(nn.Module):
    def __init__(self, in_channels, model_channels):
        super().__init__()

        self.blocks = None # Hint. nn.ModuleList

    def forward(self, x):
        features = []
        for block in self.blocks:
            pass

        return (x, features)

## Step 2. DecoderBlock 정의

### Step 2-1. DecoderBlock 정의

디코더(확장경로)의 기본 단위가 될 디코더 블록을 정의합니다.

*  디코더 블록은 **첫번째 ConvTranspose 레이어**와 **두 개의 Conv2d 레이어**로 구성됩니다. 각 Conv2d의 출력은 **ReLU 활성화함수**를 통과합니다.
*  두 개의 Conv2d 레이어 모두 (kernel_size, stride, padding) = (3, 1, 1)로 설정합니다. 따라서 Conv2d 레이어의 출력은 입력과 동일한 모양을 갖습니다.
*  디코더 블록은 **두 가지 맵: 이전 블록의 출력(`x`)과 대응하는 인코더 블록의 중간 특성(`encoder_features`)**을 입력받습니다. `x`를 ConvTranspose 레이어에 통과시켜 나온 특성과 `encoder_features`를 채널 방향으로 이어붙이세요. 이어붙인 결과를 이후 두 개의 Conv2d 레이어로 전달합니다.

In [None]:
class DecoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()

        self.up = None
        self.layers = None # Hint. nn.Sequential

    def forward(self, x, encoder_features):

        # Hint. torch.cat

        return x

### Step 2-2. Decoder 정의

디코더 블록을 조합하여 디코더를 정의합니다.  

*  디코더는 네 개의 디코더 블록으로 구성됩니다. 각 블록의 입출력 채널 수는 다음과 같습니다.
    * Block 1.  `in_channels` → `model_channels`
    * Block 2.  `model_channels` → `model_channels * 2`
    * Block 3.  `model_channels * 2` → `model_channels * 4`
    * Block 4.  `model_channels * 4` → `model_channels * 8`
*  전달받은 인코더의 중간 특성들(`encoder_features`) 가운데 각 디코더 블록으로 전달할 특성을 선택해야합니다.

In [None]:
class Decoder(nn.Module):
    def __init__(self, model_channels, out_channels):
        super().__init__()

        self.blocks = None # Hint. nn.ModuleList

        self.out = nn.Conv2d(model_channels, out_channels, kernel_size=1) # 1 x 1 conv

    def forward(self, x, encoder_features):
        for i, block in enumerate(self.blocks):
            pass
        x = self.out(x)
        return x

## Step 3. U-Net 정의

위에서 정의한 인코더와 디코더를 사용해서 U-Net을 완성하세요.

<br>

우선, 인코더와 디코더의 입출력 채널 수는 다음과 같습니다.
* Encoder.  `in_channels` → `hidden_channels`
* Decoder.  `hidden_channels * 16` → `out_channels`

<br>

이어서 인코더와 디코더를 연결하는 **브릿지 모듈**을 정의해줍니다.

* 브릿지 모듈은 두 개의 Conv2d 레이어로 이루어집니다. 마찬가지로 모든 Conv2d 레이어의 출력은 ReLU 활성화함수를 통과하도록 설계합니다.
* 두 개의 Conv2d 레이어 모두 (kernel_size, stride, padding) = (3, 1, 1)로 설정합니다. 따라서 Conv2d 레이어의 출력은 입력과 동일한 모양을 갖습니다.
* 첫번째 Conv2d 레이어에서는 입출력 채널 수가 `hidden_channels * 8` → `hidden_channels * 16`으로 변하도록, 두번째 Conv2d 레이어에서는 `hidden_channels * 16` → `hidden_channels * 16`이 되도록 정의합니다.


In [None]:
class UNet(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()

        self.encoder = None
        self.bridge = None # Hint. nn.Sequential
        self.decoder = None

    def forward(self, x):
        x, encoder_features = self.encoder(x)
        x = self.bridge(x)
        x = self.decoder(x, encoder_features)
        return x

모델이 제대로 정의되었는지 확인합니다. 잘 정의되었다면 다음과 같이 출력됩니다.

> input shape: torch.Size([1, 3, 256, 256])  
output shape: torch.Size([1, 3, 256, 256])

In [None]:
unet = UNet(3, 16, 3)

random_input = torch.randn(1, 3, 256, 256)
random_output = unet(random_input)

print('input shape:', random_input.shape)
print('output shape:', random_output.shape)

## Step 3. 데이터 준비

이번 튜토리얼에서는 Caltech101 데이터셋을 사용합니다. torchvision.datasets의 Caltech101 객체를 정의하세요. 이때 다음과 같이 정의된 전처리를 `transfrom`의 값으로 전달해줘야합니다.  

*  PIL Image를 텐서로 변환 (`ToTensor`)
*  크기가 `image_size` x `image_size`가 되도록 조정 (`Resize`)
*  이미지를 흑백, 단일 채널로로 변경 (`Grayscale`)


In [None]:
data_root = './data'

transform = None # Hint. transforms.Compose

dataset = None

데이터셋을 3개의 subset: train, validation, test (8:1:1)로 구분해줍니다. 이때 torch.utils.data 패키지의 `random_split` 함수를 사용하세요.

In [None]:
print(len(dataset)) # 8677

train_dataset, validation_dataset, test_dataset = None, None, None

각 데이터셋에 대한 데이터 로더를 정의하세요.

In [None]:
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

샘플 배치의 모양을 확인합니다. 올바르게 정의되었다면 아래와 같이 출력됩니다.

> torch.Size([64, 1, 128, 128])   
torch.Size([64])

또한 아래 코드는 일부 샘플 이미지들을 보여줍니다.

In [None]:
sample_data, sample_label = next(iter(train_loader))

print(sample_data.shape, sample_label.shape)

plt.figure(figsize=(10, 10))

print("Sample Labels")
print(sample_label)
plt.imshow(np.transpose(vutils.make_grid(sample_data[:64], nrow=8, padding=2, normalize=True), (1, 2, 0)))
plt.title(f'Sample Images')
plt.show()

## Step 4. 모델 학습

학습에 사용할 장치(GPU 또는 CPU)를 설정합니다. `device`의 값이 GPU가 사용가능한 환경이라면 'cuda', 그렇지 않으면 'cpu'가 되도록 하세요.

In [None]:
device = None
print(device)

모델과 옵티마이저를 정의하세요. 그리고 모델을 위에서 설정한 장치로 옮겨주세요.

In [None]:
unet = UNet(1, 16, 1).to(device) # (in channels) = (out channels) = 1, since the images are in grayscale

optimizer1 = optim.Adam(unet.parameters(), lr=learning_rate)
criterion1 = nn.MSELoss()

모델 학습을 시작합니다.

In [None]:
global_step = 0

In [None]:
unet.train()

for epoch in tqdm(range(num_epochs), desc='Epoch'):
    for x, _ in tqdm(train_loader, desc='Iteration', leave=False):

        # TODO

        if global_step % 30 == 0:
            writer1.add_scalar("stage1_train/loss", loss, global_step)

        global_step += 1

writer1.close()

## Step 5. 모델 평가

학습이 제대로 진행되었는지 원본 이미지와 재구성한 이미지를 비교하세요. 두 개의 이미지 격자에서 동일한 위치에 있는 이미지들은 서로 대응되어야 합니다.

In [None]:
sample_batch, _ = next(iter(test_loader))

plt.figure(figsize=(12, 18))

# 실제 이미지
plt.subplot(1, 2, 1)
plt.axis("off")
plt.title("Original")
plt.imshow(np.transpose(vutils.make_grid(sample_batch[:64], nrow=8, padding=2, normalize=True), (1, 2, 0)))

# 재구성 이미지
unet.eval()
with torch.no_grad():
    recon_batch = unet(sample_batch.to(device))

plt.subplot(1, 2, 2)
plt.axis("off")
plt.title("Reconstructed")
plt.imshow(np.transpose(vutils.make_grid(recon_batch[:64].cpu(), nrow=8, padding=2).cpu(), (1, 2, 0)))
plt.show()

## Step 6. 학습된 표현을 사용해서 downstream 분류 과제 수행

### Stage 2을 위한 하이퍼파라미터

In [None]:
num_epochs = 20
batch_size = 64

learning_rate = 1e-3

### Step 6-1. 간단한 Classification Head 정의

ClassificationHead는 U-Net Encoder의 마지막 feature를 입력받아 클래스 확률을 출력하는 모듈입니다. 이 모듈은 세 개의 레이어: Conv2d, Flatten, Linear로 이루어져 있습니다.

*  Conv2d는 (kernel_size, stride) = (1, 1)로 정의하여 입출력 맵의 크기는 같게 유지하되 채널 수만 바꿔줍니다. 이때, 출력 채널 수가 hidden_channels가 되어야 합니다.
*  Linear는 클래스 확률을 출력해야하므로 출력 채널 수가 num_classes가 되도록 설정합니다.

In [None]:
class ClassificationHead(nn.Module):
    def __init__(self, hidden_channels, num_classes):
        super().__init__()
        self.classifier = None # Hint. nn.Sequential

    def forward(self, x):
        return self.classifier(x)

### Step 6-2. 전체 Classification 모델 정의


UNetClassifier는 U-Net Encoder와 ClassificationHead로 이루어져 있습니다. 이때, 표현을 추출하는 U-Net Encoder 부분은 더 이상 업데이트하지 않습니다.

In [None]:
class UNetClassifier(nn.Module):
    def __init__(self, encoder, hidden_channels, num_classes):
        super().__init__()
        self.encoder = encoder
        self.classifier = ClassificationHead(hidden_channels, num_classes)

    def forward(self, x):
        with torch.no_grad():
            x, _ = self.encoder(x) # (B, 128, 8, 8), 인코더는 업데이트 되지 않기를 원함
        x = self.classifier(x)
        return x

### Step 6-3. 모델 학습

In [None]:
now = datetime.now()
formattedDate = now.strftime("%H%M%S")
print(formattedDate)

run_name = f'classifier-{formattedDate}'

log_dir = os.path.join('./runs', run_name)
os.makedirs(log_dir, exist_ok=True)

writer2 = SummaryWriter(log_dir)

In [None]:
global_step2 = 0

모델과 손실함수, 옵티마이저를 정의합니다. 손실함수로는 CrossEntropyLoss를, 옵티마이저로는 Adam을 사용합니다.   

Hint. 옵티마이저가 어떤 파라미터를 업데이트해야하나요?

In [None]:
model = UNetClassifier(None, 16, 101).to(device) # Fill this None

criterion2 = nn.CrossEntropyLoss()
optimizer2 = optim.Adam(None, lr=0.001) # Fill this None
scheduler2 = optim.lr_scheduler.StepLR(optimizer2, step_size=5, gamma=0.1)

모델을 학습합니다.

In [None]:
for epoch in tqdm(range(num_epochs), desc='Epochs'):

    for image, label in tqdm(train_loader, desc='Iteration', leave=False):

        # TODO:

        if global_step2 % 10 == 0:
            writer2.add_scalar("stage2_train/loss", loss.item(), global_step)

            # 정확도 평가
            correct = None # Hint. torch.argmax
            total = len(label)
            acc = correct / total * 100
            writer2.add_scalar("stage2_train/acc", acc.item(), global_step)

        global_step2 += 1

    scheduler2.step()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item():.6f}, Acc: {acc:.2f}")

### Step 6-4. 모델 평가

평가 데이터셋에 대한 **정확도(맞춘 샘플 수/전체 샘플 수 * 100)**를 계산하세요.

In [None]:
model.eval()

with torch.no_grad():
    correct = 0
    total = 0

    for input, label in tqdm(test_loader, desc='Iteration', leave=False):
        input = input.to(device)
        label = label.to(device)
        pred = model(input)

        correct += None # Hint. torch.argmax
        total += len(label)

    # 정확도 계산
    acc = correct / total * 100
    print(f"Test Accuracy: {acc:.2f}")

아래 코드는 평가 이미지 12장과 그에 대한 (정답, 예측) 클래스를 보여줍니다.

In [None]:
plt.subplots(3, 4, figsize=(10, 8))

indices = torch.randint(0, len(test_dataset), (12,))
print(indices)

with torch.no_grad():
    for i, idx in enumerate(indices):
        img, label = test_dataset[idx]
        img = img.to(device)
        pred = model(img.unsqueeze(0)) # 배치 차원 추가

        pred_class = torch.argmax(pred, dim=1).item() # tensor([v]) -> v
        pred_prob = F.softmax(pred, dim=1)

        plt.subplot(3, 4, i+1)
        plt.imshow(img.squeeze().cpu(), cmap='gray') # 배치 차원 제거
        plt.title(f"True: {label} / Pred: {pred_class} ({pred_prob[0][pred_class].item():.2f})")
        plt.axis('off')

plt.tight_layout()
plt.show()