# CH04.5. **Attention**

#### [][][][][]

## 00. **작업 환경 설정**

#### 00.0. **사전 변수 설정**

In [None]:
# Seed handed to every random number generator configured in the options cell.
SEED_NUM = 2025
# Mini-batch size read by the data loaders.
# NOTE(review): a later cell reassigns BATCH_SIZE to 128 before the MNIST loaders are built — confirm which value is intended.
BATCH_SIZE = 32
# Exclusive upper bound of the epoch range used by the training loop.
EPOCH_NUM = 100
# 'Y' restores model/optimizer state from MODEL_PTH before training; any other value starts from scratch.
USE_PRETRAIN_YN = 'N'
# Checkpoint file written by the training loop and read back for evaluation.
MODEL_PTH = '../../model/mnistBiSeq.pt'

#### 00.1. **라이브러리 호출 및 옵션 설정**

In [None]:
#(1) Import libraries
import os
import random
import tqdm
import collections
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import sklearn
import torch
# import torchtext
import torchinfo

#(2) Seed every random number generator with the shared seed and request deterministic kernels
os.environ['PYTHONHASHSEED'] = str(SEED_NUM)
random.seed(a=SEED_NUM)
np.random.seed(seed=SEED_NUM)
torch.use_deterministic_algorithms(mode=True)
torch.manual_seed(seed=SEED_NUM)
torch.mps.manual_seed(seed=SEED_NUM)

#(3) Pick the compute device: Apple MPS when the backend reports it as available, CPU otherwise
device = torch.device(device='mps' if torch.backends.mps.is_available() else 'cpu')
print(f'>> Device : {device}')

#### 00.2. **사용자정의함수 정의**

In [None]:
#(1) Define `compute_metrics()` function
def compute_metrics(model:torch.nn.Module, loader:torch.utils.data.DataLoader) -> pd.DataFrame :
    """Evaluate a classifier on `loader` and return its summary metrics.

    Every batch is moved to the notebook-level `device`, passed to the model as
    `model(x=inputs)`, and the arg-max over dim 1 is taken as the predicted class.

    Args:
        model: Module that is called as `model(x=inputs)`.
        loader: Iterable yielding `(inputs, targets)` batches.

    Returns:
        DataFrame with columns `metricName` and `value` holding accuracy plus
        weighted-average precision, recall and F1.
    """
    # `import sklearn` alone does not guarantee that the `metrics` submodule is loaded.
    import sklearn.metrics

    y_pred = []
    y_true = []
    # Remember the caller's mode so evaluation does not silently switch the model to training.
    was_training = model.training
    model.eval()
    with torch.no_grad() :
        for inputs, targets in loader :
            logits = model(x=inputs.to(device=device))
            batch_pred = torch.argmax(input=logits, dim=1)
            y_pred.extend(batch_pred.cpu().numpy())
            y_true.extend(targets.cpu().numpy())
    model.train(mode=was_training)
    accuracy = sklearn.metrics.accuracy_score(y_true=y_true, y_pred=y_pred)
    precision = sklearn.metrics.precision_score(y_true=y_true, y_pred=y_pred, average='weighted')
    recall = sklearn.metrics.recall_score(y_true=y_true, y_pred=y_pred, average='weighted')
    f1 = sklearn.metrics.f1_score(y_true=y_true, y_pred=y_pred, average='weighted')
    output = pd.DataFrame(data={
        'metricName' : ['accuracy', 'precision', 'recall', 'f1'],
        'value'      : [accuracy, precision, recall, f1]
    })
    return output

#### 00.3. **클래스 정의**

In [None]:
# ---------------------------
# Encoder Module
# ---------------------------
class Encoder(torch.nn.Module):
    """Bidirectional GRU encoder over embedded source tokens.

    Args:
        input_dim: Number of rows in the source embedding table (vocabulary size).
        emb_dim: Width of each token embedding.
        hid_dim: Hidden size of each GRU direction.
        n_layers: Number of stacked GRU layers.
        dropout: Dropout probability applied to the embeddings and between
            stacked GRU layers.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.embedding = torch.nn.Embedding(input_dim, emb_dim)
        # Inter-layer GRU dropout has no effect with a single layer and only
        # triggers a warning, so it is switched off in that case.
        rnn_dropout = dropout if n_layers > 1 else 0.0
        # Bidirectional GRU: note that the output feature size doubles.
        self.rnn = torch.nn.GRU(emb_dim, hid_dim, n_layers, dropout=rnn_dropout, bidirectional=True)
        self.dropout = torch.nn.Dropout(dropout)
        self.hid_dim = hid_dim

    def forward(self, src):
        """Encode a batch of token ids.

        Args:
            src: Integer tensor of shape [src_len, batch_size].

        Returns:
            Tuple `(outputs, hidden)` where `outputs` is
            [src_len, batch_size, hid_dim * 2] and `hidden` is
            [n_layers * 2, batch_size, hid_dim].
        """
        embedded = self.dropout(self.embedding(src))  # [src_len, batch_size, emb_dim]
        outputs, hidden = self.rnn(embedded)
        return outputs, hidden

# ---------------------------
# Attention Module
# ---------------------------
class Attention(torch.nn.Module):
    """Additive attention that scores every encoder position against a decoder state.

    Args:
        hid_dim: Hidden size of the decoder state; encoder outputs are expected
            to carry `hid_dim * 2` features.
    """

    def __init__(self, hid_dim):
        super().__init__()
        # Input: decoder state (hid_dim) concatenated with an encoder output
        # (hid_dim * 2), projected down to hid_dim.
        self.attn = torch.nn.Linear(hid_dim + hid_dim * 2, hid_dim)
        self.v = torch.nn.Linear(hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over the source positions.

        Args:
            hidden: [batch_size, hid_dim] decoder state.
            encoder_outputs: [src_len, batch_size, hid_dim * 2].

        Returns:
            Tensor of shape [batch_size, src_len]; each row is a softmax
            distribution over the source positions.
        """
        src_len = encoder_outputs.shape[0]
        # Repeat the decoder state once per source position.
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)  # [batch_size, src_len, hid_dim]
        encoder_outputs = encoder_outputs.permute(1, 0, 2)   # [batch_size, src_len, hid_dim * 2]
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))  # [batch_size, src_len, hid_dim]
        attention = self.v(energy).squeeze(2)  # [batch_size, src_len]
        return torch.softmax(attention, dim=1)

# ---------------------------
# Decoder Module
# ---------------------------
class Decoder(torch.nn.Module):
    """Single-step GRU decoder that attends over the encoder outputs.

    Args:
        output_dim: Number of rows in the target embedding table and width of
            the output logits.
        emb_dim: Width of each target token embedding.
        hid_dim: Hidden size of the decoder GRU; encoder outputs are expected
            to carry `hid_dim * 2` features.
        n_layers: Number of stacked GRU layers.
        dropout: Dropout probability applied to the embeddings and between
            stacked GRU layers.
        attention: Callable invoked as `attention(hidden[-1], encoder_outputs)`
            that returns weights of shape [batch_size, src_len].
    """

    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = torch.nn.Embedding(output_dim, emb_dim)
        # Inter-layer GRU dropout has no effect with a single layer and only
        # triggers a warning, so it is switched off in that case.
        rnn_dropout = dropout if n_layers > 1 else 0.0
        # GRU input is [emb_dim + hid_dim * 2]: the token embedding plus the
        # attention-weighted sum of the encoder outputs.
        self.rnn = torch.nn.GRU((hid_dim * 2) + emb_dim, hid_dim, n_layers, dropout=rnn_dropout)
        self.fc_out = torch.nn.Linear((hid_dim * 2) + hid_dim + emb_dim, output_dim)
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, input, hidden, encoder_outputs):
        """Decode one target step.

        Args:
            input: [batch_size] ids of the current input tokens.
            hidden: [n_layers, batch_size, hid_dim] decoder state.
            encoder_outputs: [src_len, batch_size, hid_dim * 2].

        Returns:
            Tuple `(prediction, hidden)` where `prediction` is
            [batch_size, output_dim] and `hidden` is the updated
            [n_layers, batch_size, hid_dim] state.
        """
        input = input.unsqueeze(0)  # [1, batch_size]
        embedded = self.dropout(self.embedding(input))  # [1, batch_size, emb_dim]
        # Attention weights are computed from the state of the last decoder layer.
        a = self.attention(hidden[-1], encoder_outputs)  # [batch_size, src_len]
        a = a.unsqueeze(1)  # [batch_size, 1, src_len]
        encoder_outputs = encoder_outputs.permute(1, 0, 2)  # [batch_size, src_len, hid_dim * 2]
        weighted = torch.bmm(a, encoder_outputs)  # [batch_size, 1, hid_dim * 2]
        weighted = weighted.permute(1, 0, 2)  # [1, batch_size, hid_dim * 2]
        rnn_input = torch.cat((embedded, weighted), dim=2)  # [1, batch_size, emb_dim + hid_dim * 2]
        output, hidden = self.rnn(rnn_input, hidden)  # output: [1, batch_size, hid_dim]
        # The projection sees the GRU output, the context vector and the
        # embedding, matching the input width declared for `fc_out`.
        features = torch.cat((output.squeeze(0), weighted.squeeze(0), embedded.squeeze(0)), dim=1)
        prediction = self.fc_out(features)  # [batch_size, output_dim]
        return prediction, hidden

class MyTranslationModel(torch.nn.Module) :
    """Placeholder for the notebook's model; no layers or forward pass are defined yet.

    NOTE(review): a later cell constructs this class with keyword arguments
    (`input_size`, `hidden_size`, `seq_len`, `layers_num`, `class_num`,
    `device`) and calls it as `model(x=...)`; neither works until the class
    is implemented.
    """
    pass

In [None]:
import torchtext
from torchtext.data import Field, BucketIterator
import spacy
import numpy as np

# NOTE(review): this cell re-seeds the generators with its own constant instead of the SEED_NUM used earlier — confirm that is intended.
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# NOTE(review): rebinds the global `device` chosen earlier (MPS or CPU) to CUDA or CPU; every later cell reads this value — confirm.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the spaCy pipelines (they must be installed beforehand)
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')

def tokenize_de(text):
    """Return the token strings produced by the `spacy_de` tokenizer for `text`."""
    tokens = []
    for token in spacy_de.tokenizer(text):
        tokens.append(token.text)
    return tokens

def tokenize_en(text):
    """Return the token strings produced by the `spacy_en` tokenizer for `text`."""
    tokens = []
    for token in spacy_en.tokenizer(text):
        tokens.append(token.text)
    return tokens

# Source/target fields: tokenize, lowercase, and wrap every sentence in <sos>/<eos>.
# NOTE(review): `Field` and `BucketIterator` belong to the legacy torchtext API — confirm the installed torchtext still ships them.
SRC = Field(tokenize=tokenize_de, init_token='<sos>', eos_token='<eos>', lower=True)
TRG = Field(tokenize=tokenize_en, init_token='<sos>', eos_token='<eos>', lower=True)

# Download and split the IWSLT2016 dataset (German source, English target)
train_data, valid_data, test_data = torchtext.datasets.IWSLT2016.splits(
    exts=('.de', '.en'),
    fields=(SRC, TRG)
)

# Vocabularies are built from the training split only, with min_freq=2.
SRC.build_vocab(train_data, min_freq=2)
TRG.build_vocab(train_data, min_freq=2)

# NOTE(review): this rebinds the notebook-wide BATCH_SIZE, so later cells that read it see 128 — confirm.
BATCH_SIZE = 128

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device
)

# Model hyper-parameters
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 1
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5

attn = Attention(HID_DIM)
encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT, attn)

# NOTE(review): `Seq2Seq` is not defined anywhere in the visible file — it must be provided before this line can run.
model = Seq2Seq(encoder, decoder, device).to(device)

# `optim` was never imported as a bare name; the optimizer lives under `torch.optim`.
optimizer = torch.optim.Adam(model.parameters())
# Target positions holding the padding token are excluded from the loss.
criterion = torch.nn.CrossEntropyLoss(ignore_index=TRG.vocab.stoi[TRG.pad_token])

def train(model, iterator, optimizer, criterion, clip):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module called as `model(src, trg)`; expected to return a tensor
            of shape [trg_len, batch_size, output_dim].
        iterator: Sized iterable of batches exposing `.src` and `.trg`.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as `criterion(output, trg)` on the flattened
            predictions and targets.
        clip: Maximum gradient norm passed to `clip_grad_norm_`.

    Returns:
        Sum of the per-batch losses divided by `len(iterator)`.
    """
    model.train()
    epoch_loss = 0
    for batch in iterator:
        src = batch.src
        trg = batch.trg
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        # The first time step is skipped on both sides before flattening;
        # `reshape` also copes with non-contiguous slices, unlike `view`.
        output = output[1:].reshape(-1, output_dim)
        trg = trg[1:].reshape(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

# Training schedule: number of epochs and the gradient-norm clipping threshold
N_EPOCHS = 10
CLIP = 1

# One pass over `train_iterator` per epoch, reporting the mean training loss
for epoch in range(N_EPOCHS):
    train_loss = train(
        model=model,
        iterator=train_iterator,
        optimizer=optimizer,
        criterion=criterion,
        clip=CLIP,
    )
    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f}')

<b></b>

## 01. **데이터셋 전처리 및 로드**

#### 01.1. **이미지 전처리 파이프라인 정의**

In [None]:
# `torchvision` is used by this and the following cells but is never imported in the visible file.
import torchvision

# Convert each image to a tensor, then squeeze dim 0.
# NOTE(review): presumably this drops a size-1 channel axis so each sample is [height, width] — confirm.
img_tf = torchvision.transforms.Compose(
    transforms=[
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Lambda(lambda x: x.squeeze(0))
    ]
)

#### 01.2. **데이터셋 로드**

In [None]:
# Training split, downloaded on demand and passed through `img_tf`
mnist_train = torchvision.datasets.MNIST(
    root='../../data',
    train=True,
    download=True,
    transform=img_tf,
)
# Test split with the same root and transform
mnist_test = torchvision.datasets.MNIST(
    root='../../data',
    train=False,
    download=True,
    transform=img_tf,
)

#### 01.3. **EDA**

In [None]:
#(1) Number of samples in the training split
len(mnist_train)

In [None]:
#(2) Shape of the first training image after `img_tf` has been applied
mnist_train[0][0].shape

In [None]:
#(3) Count how many training samples belong to each target class
target_freq = collections.Counter()
for sample_idx in range(len(mnist_train)):
    # The image is bound to `_image` rather than `input`: assigning `input` at
    # notebook scope would shadow the builtin for every later cell.
    _image, target = mnist_train[sample_idx]
    if isinstance(target, torch.Tensor) :
        target = target.item()
    target_freq[target] += 1
pd.DataFrame(data=list(target_freq.items()), columns=['class', 'count']).sort_values(by='class')

In [None]:
#(4) Display image
# NOTE(review): `show_img` is not defined or imported anywhere in the visible file — confirm where it comes from.
show_img(df=mnist_train, index=5)

#### 01.4. **데이터로더 변환**

In [None]:
# Shuffled mini-batch loader over the training split
mnist_train_loader = torch.utils.data.DataLoader(
    dataset=mnist_train,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
# Shuffled mini-batch loader over the test split
mnist_test_loader = torch.utils.data.DataLoader(
    dataset=mnist_test,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

<b></b>

## 02. **모델 구축 및 학습**

#### 02.1. **모델 정의**

In [None]:
#(1) Define hyper-parameters
# The first two axes of one transformed sample give the sequence length and the per-step input width
seq_len, input_size = mnist_train[0][0].shape[:2]
layers_num = 2
hidden_size = 12
class_num = 10

In [None]:
#(2) Define `model`
# NOTE(review): `MyTranslationModel` is still an empty placeholder in this file, so these keyword arguments are not accepted yet.
# The model is moved to `device` as well as cast to float32, so its parameters
# live on the same device as the batches the metrics helper sends to it.
model = MyTranslationModel(
    input_size=input_size, 
    hidden_size=hidden_size, 
    seq_len=seq_len, 
    layers_num=layers_num,
    class_num=class_num,
    device=device
).to(device=device, dtype=torch.float32)

#(3) Display `model`
torchinfo.summary(
    model=model, 
    input_size=[BATCH_SIZE]+list(mnist_train[0][0].shape),
    device=device
)

In [None]:
#(4) Define loss function
criterion = torch.nn.CrossEntropyLoss()

#(5) Define optimizer(optimization method)
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3, weight_decay=1e-7)

#(6) Define Scheduler
# scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=30, gamma=0.1)

#### 02.2. **학습 전 변수 정의**

In [None]:
# Number of mini-batches per epoch; used to average the epoch loss
batch_len = len(mnist_train_loader)
if USE_PRETRAIN_YN == 'Y' :
    # Resume from the saved checkpoint
    checkpoint = torch.load(f=MODEL_PTH)
    model.load_state_dict(state_dict=checkpoint['model'])
    optimizer.load_state_dict(state_dict=checkpoint['optimizer'])
    loss_hist = checkpoint['loss_hist']
    # The checkpoint is only written when an epoch improves the loss, so the
    # stored epoch is the best one and the last recorded loss is its loss.
    best_epoch = checkpoint['epoch']
    best_loss = loss_hist[-1]
    # The stored epoch has already been completed; continue with the next one
    # instead of repeating it and appending a duplicate loss entry.
    epoch = checkpoint['epoch'] + 1
else :
    epoch = 0
    loss_hist = []
    # No best epoch exists yet; defining the name keeps the progress-bar update
    # safe even when no epoch improves on `best_loss`.
    best_epoch = None
    best_loss = float('inf')
print(f">> Epoch={epoch}, Train Loss={best_loss}")

#### 02.3. **모델 학습**

In [None]:
# `best_epoch` is only assigned when an epoch improves on `best_loss`; make sure
# the name exists so the progress-bar update below cannot raise NameError.
if 'best_epoch' not in globals() :
    best_epoch = None

progress_bar = tqdm.trange(epoch, EPOCH_NUM)
for epoch in progress_bar : 
    last_loss = 0.0
    model.train()
    for inputs, targets in mnist_train_loader :
        # Move both halves of the batch to `device`, as the metrics helper does.
        inputs = inputs.to(device=device)
        targets = targets.to(device=device)
        optimizer.zero_grad() 
        preds = model(x=inputs)
        loss = criterion(input=preds, target=targets)
        loss.backward()
        optimizer.step()
        last_loss += loss.item()
    # Mean loss over the mini-batches of this epoch
    last_loss_avg = last_loss / batch_len
    loss_hist.append(last_loss_avg)
    # Checkpoint only when the epoch improves on the best loss seen so far
    if last_loss_avg < best_loss :
        best_epoch = epoch
        best_loss = last_loss_avg
        torch.save(
            obj={
                'epoch'     : epoch,
                'loss_hist' : loss_hist,
                'model'     : model.state_dict(),
                'optimizer' : optimizer.state_dict()
            }, 
            f=MODEL_PTH
        )
    # scheduler.step()
    progress_bar.set_postfix(ordered_dict={'last_epoch':epoch+1, 'last_loss':last_loss_avg, 'best_epoch':best_epoch, 'best_loss':best_loss}) 

<b></b>

## 03. **모델 평가**

#### 03.1. **최적 성능 모델 로드**

In [None]:
# Reload the checkpoint written by the training loop and restore the model weights from it
checkpoint = torch.load(f=MODEL_PTH)
model.load_state_dict(state_dict=checkpoint['model'])
# The reported loss is the last entry of the loss history stored in the checkpoint
print(f'>> Best Epoch : {checkpoint["epoch"]}, Best Loss : {checkpoint["loss_hist"][-1]}')

#### 03.2. **과소 적합 확인**

In [None]:
#(1) Plot training loss
plt.figure(figsize=(12, 6))
# Draw the per-epoch loss curve first, then label the axes and title
plt.plot(loss_hist)
plt.title(label='Training Loss')
plt.xlabel(xlabel='epoch')
plt.ylabel(ylabel='loss')
plt.show()

In [None]:
#(2) Check metrics on the training loader
compute_metrics(model=model, loader=mnist_train_loader)

#### 03.3. **일반화 성능 평가**

In [None]:
# Same metrics computed on the test loader
compute_metrics(model=model, loader=mnist_test_loader)