# 과제1
코드의 빈칸을 채우고, 셀로 나눈 후 주석을 자유롭게 달아주세요.

## CIFAR-10

RGB 컬러 이미지, channel 수 : 3, class 수 : 10  
train data : 50000개, test data : 10000개

In [None]:
# Import Libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np

# Data preparation: every image goes through the same preprocessing pipeline.
transform = transforms.Compose(
    [
        # Force a 32x32 spatial size, so each image tensor is (3, 32, 32).
        transforms.Resize((32, 32)),
        # PIL image -> float tensor.
        transforms.ToTensor(),
        # Per-channel (x - 0.5) / 0.5 normalisation.
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

# Build the train split first, then the test split (downloaded on first use).
train_dataset, test_dataset = (
    datasets.CIFAR10(root='./data', train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Only the training loader shuffles; the test loader keeps a fixed order.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

## Patch Embedding

In [None]:
class PatchEmbedding(nn.Module):
    """Turn an image batch into a sequence of patch tokens plus a class token.

    A strided convolution (kernel == stride == ``patch_size``) projects each
    patch to ``emb_size`` features. A learnable class token is prepended and a
    learnable position embedding is added to every token.

    Args:
        in_channels: Number of input image channels (3 for RGB).
        patch_size: Side length of each square patch.
        emb_size: Feature size of every token.
        img_size: Side length of the input image, used only to size the
            position embedding.
    """

    def __init__(self, in_channels=3, patch_size=4, emb_size=128, img_size=32):
        super().__init__()
        num_patches = (img_size // patch_size) ** 2
        self.patch_size = patch_size
        self.projection = nn.Conv2d(
            in_channels,
            emb_size,
            kernel_size=patch_size,
            stride=patch_size,
        )
        # Learnable class token, shape (1, 1, emb_size).
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size))
        # One position vector per patch plus one for the class token.
        self.position_embedding = nn.Parameter(torch.randn(num_patches + 1, emb_size))

    def forward(self, x):
        """Return tokens of shape (batch, num_patches + 1, emb_size)."""
        num_images = x.size(0)
        # (batch, emb, H', W') -> (batch, emb, H'*W') -> (batch, H'*W', emb)
        patches = self.projection(x)
        patches = patches.flatten(2)
        patches = patches.transpose(1, 2)
        # Share the single class token across the batch without copying it.
        cls_tokens = self.cls_token.expand(num_images, -1, -1)
        tokens = torch.cat([cls_tokens, patches], dim=1)
        # Broadcasts the (tokens, emb) position table over the batch axis.
        return tokens + self.position_embedding

## Transformer Encoder

In [None]:
# MHA (multi-head attention)
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention over a batch-first token sequence.

    The input is projected to query/key/value, passed through
    ``nn.MultiheadAttention`` and then through a final linear layer.
    ``nn.MultiheadAttention`` already carries its own input/output
    projections, so ``qkv`` and ``linear`` act as extra learned projections
    around it; they are kept so the parameter names stay unchanged.

    Args:
        emb_size: Feature size of every token (must be divisible by
            ``num_heads``, as required by ``nn.MultiheadAttention``).
        num_heads: Number of attention heads.
    """

    def __init__(self, emb_size=128, num_heads=8):
        super().__init__()
        self.emb_size = emb_size
        self.num_heads = num_heads
        # A single linear layer produces q, k and v side by side (3 * emb_size).
        self.qkv = nn.Linear(emb_size, emb_size * 3)
        # batch_first=True is essential: the tokens arrive as
        # (batch, tokens, emb). With the default (batch_first=False) dim 0 is
        # treated as the sequence axis, so attention would mix different
        # samples of the batch instead of the patches of one image.
        self.attention = nn.MultiheadAttention(
            embed_dim=emb_size,
            num_heads=num_heads,
            batch_first=True,
        )
        self.linear = nn.Linear(emb_size, emb_size)

    def forward(self, x):
        """Apply self-attention.

        Args:
            x: Tensor of shape (batch, tokens, emb_size).

        Returns:
            Tensor of the same shape as ``x``.
        """
        # Split the fused projection into query, key and value.
        query, key, value = self.qkv(x).chunk(3, dim=-1)
        # The attention weights are not used, so skip computing them.
        x, _ = self.attention(query, key, value, need_weights=False)
        # Final projection; the feature size stays emb_size throughout.
        x = self.linear(x)
        return x

# Encoder block: LayerNorm -> MHA -> residual, then LayerNorm -> MLP -> residual
class TransformerBlock(nn.Module):
    """One pre-norm Transformer encoder block.

    Args:
        emb_size: Feature size of every token.
        num_heads: Number of attention heads.
        ff_hidden_mult: The MLP hidden size is ``ff_hidden_mult * emb_size``.
        dropout: Dropout probability applied to both sub-layer outputs.
    """

    def __init__(self, emb_size=128, num_heads=8, ff_hidden_mult=4, dropout=0.1):
        super().__init__()
        hidden_size = ff_hidden_mult * emb_size
        self.norm1 = nn.LayerNorm(emb_size)  # normalisation before attention
        self.mha = MultiHeadAttention(emb_size, num_heads)
        self.norm2 = nn.LayerNorm(emb_size)  # normalisation before the MLP
        # Position-wise MLP: expand, apply ReLU, project back.
        self.ff = nn.Sequential(
            nn.Linear(emb_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, emb_size),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the attention and MLP sub-layers, each with a residual add.

        ``x`` is assumed to be (batch, tokens, emb_size), as produced by the
        patch embedding in this file; the output has the same shape.
        """
        # Attention sub-layer: normalise, attend, drop out, add the input back.
        attended = self.mha(self.norm1(x))
        x = x + self.dropout(attended)
        # MLP sub-layer: same pattern on the updated tokens.
        transformed = self.ff(self.norm2(x))
        return x + self.dropout(transformed)

## ViT Modeling 

In [None]:
class ViT(nn.Module):
    """Vision Transformer classifier.

    Pipeline: patch embedding -> ``depth`` encoder blocks -> LayerNorm and a
    linear head applied to the class token.

    Args:
        img_size: Side length of the (square) input image.
        patch_size: Side length of each square patch.
        emb_size: Feature size of every token.
        num_heads: Number of attention heads per encoder block.
        depth: Number of stacked encoder blocks.
        num_classes: Number of output classes.
        in_channels: Number of input image channels. Added last so existing
            positional calls keep working; the default of 3 matches RGB input.
    """

    def __init__(self, img_size=32, patch_size=4, emb_size=128, num_heads=8, depth=6,
                 num_classes=10, in_channels=3):
        super().__init__()
        # Pass the channel count through explicitly so the model can be reused
        # for non-RGB data without redefining PatchEmbedding.
        self.patch_embedding = PatchEmbedding(
            in_channels=in_channels,
            patch_size=patch_size,
            emb_size=emb_size,
            img_size=img_size,
        )
        # Stack of `depth` identical encoder blocks.
        self.transformer = nn.Sequential(
            *[TransformerBlock(emb_size, num_heads) for _ in range(depth)]
        )
        # Classification head applied to the class token only.
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(emb_size),
            nn.Linear(emb_size, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        x = self.patch_embedding(x)  # image -> token sequence
        x = self.transformer(x)  # run the encoder stack
        cls_token = x[:, 0]  # the class token sits at position 0
        return self.mlp_head(cls_token)

## Train & Test

In [None]:
# Train / Test func
def train(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module to optimise; it is switched to training mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The mean loss per sample over the samples actually processed.

    Raises:
        ZeroDivisionError: If the dataloader yields no samples.
    """
    model.train()
    running_loss = 0.0
    num_samples = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        batch_size = inputs.size(0)
        # Re-weight by batch size; assumes the criterion averages over the batch.
        running_loss += loss.item() * batch_size
        num_samples += batch_size
    # Divide by the samples seen rather than len(dataloader.dataset): the two
    # differ when the loader uses drop_last=True or a sampler over a subset.
    return running_loss / num_samples

def evaluate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without computing gradients.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        A tuple ``(mean_loss, accuracy)``: the mean loss per sample as a float
        and the fraction of correct predictions as a 0-dim float64 tensor,
        both over the samples actually processed.

    Raises:
        ZeroDivisionError: If the dataloader yields no samples.
    """
    model.eval()
    running_loss = 0.0
    num_samples = 0
    # Start from a tensor so `.double()` below is valid on every path.
    correct = torch.zeros((), dtype=torch.long, device=device)
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            batch_size = inputs.size(0)
            # Re-weight by batch size; assumes the criterion averages over the batch.
            running_loss += loss.item() * batch_size
            num_samples += batch_size
            # The predicted class is the index of the largest logit.
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum()
    # Divide by the samples seen rather than len(dataloader.dataset): the two
    # differ when the loader uses drop_last=True or a sampler over a subset.
    return running_loss / num_samples, correct.double() / num_samples

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = ViT()
model.to(device)  # nn.Module.to moves the parameters in place
criterion = nn.CrossEntropyLoss()  # loss function: cross-entropy
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)  # optimizer: Adam

# Step 5 : Train & Test

num_epochs = 10  # number of passes over the training set
for epoch in range(num_epochs):
    # One training pass, then an evaluation on the test split.
    train_loss = train(
        model=model,
        dataloader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
    )
    test_loss, test_accuracy = evaluate(
        model=model,
        dataloader=test_loader,
        criterion=criterion,
        device=device,
    )
    print(
        f'Epoch {epoch+1}/{num_epochs}.. '
        f'Train loss: {train_loss:.4f}.. '
        f'Test loss: {test_loss:.4f}.. '
        f'Test accuracy: {test_accuracy:.4f}'
    )

# Persist only the learned weights (state dict), not the whole module.
torch.save(obj=model.state_dict(), f='vit_cifar10.pth')

ViT를 large-scale dataset에서 충분히 pre-train하지 않고 학습했기 때문에 정확도가 낮다. 즉, CIFAR-10 데이터셋만으로는 좋은 성능을 얻기 어렵다.

# 과제2
다른 Dataset으로 Classification을 다시 진행(학습, 평가)해보고 위의 결과와 비교해보세요

## FashionMNIST

grayscale 이미지, 채널 수 : 1, 클래스 수 : 10  
train data : 60000개, test data : 10000개

In [None]:
# Step 1: Import Libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np

# Step 2: Data preparation - every image goes through the same pipeline.
transform = transforms.Compose(
    [
        # Resize to 32x32, so each image tensor is (1, 32, 32).
        transforms.Resize((32, 32)),
        # PIL image -> float tensor.
        transforms.ToTensor(),
        # Single-channel (x - 0.5) / 0.5 normalisation.
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# Build the train split first, then the test split (downloaded on first use).
train_dataset, test_dataset = (
    datasets.FashionMNIST(root='./data', train=is_train, download=True, transform=transform)
    for is_train in (True, False)
)

# Only the training loader shuffles; the test loader keeps a fixed order.
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

# Step 3: Define Vision Transformer (ViT) Architecture
class PatchEmbedding(nn.Module):
    """Turn an image batch into a sequence of patch tokens plus a class token.

    A strided convolution (kernel == stride == ``patch_size``) projects each
    patch to ``emb_size`` features. A learnable class token is prepended and a
    learnable position embedding is added to every token.

    Args:
        in_channels: Number of input image channels (1 for grayscale).
        patch_size: Side length of each square patch.
        emb_size: Feature size of every token.
        img_size: Side length of the input image, used only to size the
            position embedding.
    """

    def __init__(self, in_channels=1, patch_size=4, emb_size=128, img_size=32):
        super().__init__()
        num_patches = (img_size // patch_size) ** 2
        self.patch_size = patch_size
        self.projection = nn.Conv2d(
            in_channels,
            emb_size,
            kernel_size=patch_size,
            stride=patch_size,
        )
        # Learnable class token, shape (1, 1, emb_size).
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size))
        # One position vector per patch plus one for the class token.
        self.position_embedding = nn.Parameter(torch.randn(num_patches + 1, emb_size))

    def forward(self, x):
        """Return tokens of shape (batch, num_patches + 1, emb_size)."""
        num_images = x.size(0)
        # (batch, emb, H', W') -> (batch, emb, H'*W') -> (batch, H'*W', emb)
        patches = self.projection(x)
        patches = patches.flatten(2)
        patches = patches.transpose(1, 2)
        # Share the single class token across the batch without copying it.
        cls_tokens = self.cls_token.expand(num_images, -1, -1)
        tokens = torch.cat([cls_tokens, patches], dim=1)
        # Broadcasts the (tokens, emb) position table over the batch axis.
        return tokens + self.position_embedding

class MultiHeadAttention(nn.Module):
    """Multi-head self-attention over a batch-first token sequence.

    The input is projected to query/key/value, passed through
    ``nn.MultiheadAttention`` and then through a final linear layer.
    ``nn.MultiheadAttention`` already carries its own input/output
    projections, so ``qkv`` and ``linear`` act as extra learned projections
    around it; they are kept so the parameter names stay unchanged.

    Args:
        emb_size: Feature size of every token (must be divisible by
            ``num_heads``, as required by ``nn.MultiheadAttention``).
        num_heads: Number of attention heads.
    """

    def __init__(self, emb_size=128, num_heads=8):
        super().__init__()
        self.emb_size = emb_size
        self.num_heads = num_heads
        # A single linear layer produces q, k and v side by side (3 * emb_size).
        self.qkv = nn.Linear(emb_size, emb_size * 3)
        # batch_first=True is essential: the tokens arrive as
        # (batch, tokens, emb). With the default (batch_first=False) dim 0 is
        # treated as the sequence axis, so attention would mix different
        # samples of the batch instead of the patches of one image.
        self.attention = nn.MultiheadAttention(
            embed_dim=emb_size,
            num_heads=num_heads,
            batch_first=True,
        )
        self.linear = nn.Linear(emb_size, emb_size)

    def forward(self, x):
        """Apply self-attention.

        Args:
            x: Tensor of shape (batch, tokens, emb_size).

        Returns:
            Tensor of the same shape as ``x``.
        """
        # Split the fused projection into query, key and value.
        query, key, value = self.qkv(x).chunk(3, dim=-1)
        # The attention weights are not used, so skip computing them.
        x, _ = self.attention(query, key, value, need_weights=False)
        # Final projection; the feature size stays emb_size throughout.
        x = self.linear(x)
        return x

class TransformerBlock(nn.Module):
    """One pre-norm Transformer encoder block.

    Layout: LayerNorm -> multi-head attention -> residual add, followed by
    LayerNorm -> MLP -> residual add.

    Args:
        emb_size: Feature size of every token.
        num_heads: Number of attention heads.
        ff_hidden_mult: The MLP hidden size is ``ff_hidden_mult * emb_size``.
        dropout: Dropout probability applied to both sub-layer outputs.
    """

    def __init__(self, emb_size=128, num_heads=8, ff_hidden_mult=4, dropout=0.1):
        super().__init__()
        hidden_size = ff_hidden_mult * emb_size
        self.norm1 = nn.LayerNorm(emb_size)  # normalisation before attention
        self.mha = MultiHeadAttention(emb_size, num_heads)
        self.norm2 = nn.LayerNorm(emb_size)  # normalisation before the MLP
        # Position-wise MLP: expand, apply ReLU, project back.
        self.ff = nn.Sequential(
            nn.Linear(emb_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, emb_size),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply the attention and MLP sub-layers, each with a residual add.

        ``x`` is assumed to be (batch, tokens, emb_size), as produced by the
        patch embedding in this file; the output has the same shape.
        """
        # Attention sub-layer: normalise, attend, drop out, add the input back.
        attended = self.mha(self.norm1(x))
        x = x + self.dropout(attended)
        # MLP sub-layer: same pattern on the updated tokens.
        transformed = self.ff(self.norm2(x))
        return x + self.dropout(transformed)

class ViT(nn.Module):
    """Vision Transformer classifier.

    Pipeline: patch embedding -> ``depth`` encoder blocks -> LayerNorm and a
    linear head applied to the class token.

    Args:
        img_size: Side length of the (square) input image.
        patch_size: Side length of each square patch.
        emb_size: Feature size of every token.
        num_heads: Number of attention heads per encoder block.
        depth: Number of stacked encoder blocks.
        num_classes: Number of output classes.
        in_channels: Number of input image channels. Added last so existing
            positional calls keep working; the default of 1 matches the
            grayscale input used in this section.
    """

    def __init__(self, img_size=32, patch_size=4, emb_size=128, num_heads=8, depth=6,
                 num_classes=10, in_channels=1):
        super().__init__()
        # Pass the channel count through explicitly so the model does not
        # depend on PatchEmbedding's default.
        self.patch_embedding = PatchEmbedding(
            in_channels=in_channels,
            patch_size=patch_size,
            emb_size=emb_size,
            img_size=img_size,
        )
        # Stack of `depth` identical encoder blocks.
        self.transformer = nn.Sequential(
            *[TransformerBlock(emb_size, num_heads) for _ in range(depth)]
        )
        # Classification head applied to the class token only.
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(emb_size),
            nn.Linear(emb_size, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        x = self.patch_embedding(x)  # image -> token sequence
        x = self.transformer(x)  # run the encoder stack
        cls_token = x[:, 0]  # the class token sits at position 0
        return self.mlp_head(cls_token)

# Step 4: Train / Test func
def train(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module to optimise; it is switched to training mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The mean loss per sample over the samples actually processed.

    Raises:
        ZeroDivisionError: If the dataloader yields no samples.
    """
    model.train()
    running_loss = 0.0
    num_samples = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        batch_size = inputs.size(0)
        # Re-weight by batch size; assumes the criterion averages over the batch.
        running_loss += loss.item() * batch_size
        num_samples += batch_size
    # Divide by the samples seen rather than len(dataloader.dataset): the two
    # differ when the loader uses drop_last=True or a sampler over a subset.
    return running_loss / num_samples

def evaluate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without computing gradients.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        A tuple ``(mean_loss, accuracy)``: the mean loss per sample as a float
        and the fraction of correct predictions as a 0-dim float64 tensor,
        both over the samples actually processed.

    Raises:
        ZeroDivisionError: If the dataloader yields no samples.
    """
    model.eval()
    running_loss = 0.0
    num_samples = 0
    # Start from a tensor so `.double()` below is valid on every path.
    correct = torch.zeros((), dtype=torch.long, device=device)
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            batch_size = inputs.size(0)
            # Re-weight by batch size; assumes the criterion averages over the batch.
            running_loss += loss.item() * batch_size
            num_samples += batch_size
            # The predicted class is the index of the largest logit.
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum()
    # Divide by the samples seen rather than len(dataloader.dataset): the two
    # differ when the loader uses drop_last=True or a sampler over a subset.
    return running_loss / num_samples, correct.double() / num_samples

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = ViT()
model.to(device)  # nn.Module.to moves the parameters in place
criterion = nn.CrossEntropyLoss()  # loss function: cross-entropy
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)  # optimizer: Adam

# Step 5 : Train & Test

num_epochs = 10  # number of passes over the training set
for epoch in range(num_epochs):
    # One training pass, then an evaluation on the test split.
    train_loss = train(
        model=model,
        dataloader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
    )
    test_loss, test_accuracy = evaluate(
        model=model,
        dataloader=test_loader,
        criterion=criterion,
        device=device,
    )
    print(
        f'Epoch {epoch+1}/{num_epochs}.. '
        f'Train loss: {train_loss:.4f}.. '
        f'Test loss: {test_loss:.4f}.. '
        f'Test accuracy: {test_accuracy:.4f}'
    )

# Persist only the learned weights (state dict), not the whole module.
torch.save(obj=model.state_dict(), f='vit_fashionmnist.pth')


CIFAR-10에 비해 loss가 미세하게 감소하였다. 더 많은 데이터(train data 60000개 vs 50000개)로 학습시킬수록 ViT 모델의 성능이 향상된다는 것을 알 수 있다.