<a href="https://colab.research.google.com/github/mc-friday/hanghaeAI/blob/main/%5B2%EC%A3%BC%EC%B0%A8%5D%EC%8B%AC%ED%99%94%EA%B3%BC%EC%A0%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# [2주차] 심화과제: Multi-head Attention으로 감정 분석 모델 구현하기

In [26]:
!pip install datasets sacremoses



In [27]:
import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import BertTokenizerFast
from tokenizers import (
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
    Tokenizer,
)
from torch.nn.utils.rnn import pad_sequence
from torch.optim import Adam
import numpy as np
from math import sqrt
import matplotlib.pyplot as plt
from torch import nn
import time

## 1. 데이터셋 (기존의 IMDB dataset을 그대로 활용)

In [28]:
# Load the IMDB dataset; its 'train' and 'test' splits are consumed by the
# DataLoaders created later in this notebook.
ds = load_dataset("stanfordnlp/imdb")
# Fetch the 'bert-base-uncased' tokenizer through torch.hub. It is used as a
# module-level global by collate_fn (tokenization) and TextClassifier (pad id).
tokenizer = torch.hub.load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')

Using cache found in /root/.cache/torch/hub/huggingface_pytorch-transformers_main


### 1.1. [MY_CODE] 감정 분석 dataset 준비를 위한 collate 함수

In [29]:
def collate_fn(batch):
    """Turn a batch of dataset rows into ``(token_ids, labels)`` tensors.

    Each row is indexed with the keys ``'text'`` and ``'label'``. The texts
    are tokenized together by the module-level ``tokenizer`` with padding
    enabled and truncation to at most 400 tokens.

    Args:
        batch: Rows supplied by the DataLoader (iterated twice here, so it
            is assumed to be a re-iterable sequence such as a list).

    Returns:
        A tuple ``(texts, labels)`` of ``torch.LongTensor``: the token ids
        produced by the tokenizer and the per-row labels.
    """
    max_len = 400

    raw_texts = [row['text'] for row in batch]
    raw_labels = [row['label'] for row in batch]

    encoded = tokenizer(raw_texts, padding=True, truncation=True, max_length=max_len)

    return torch.LongTensor(encoded.input_ids), torch.LongTensor(raw_labels)

보시다시피 self-attention의 구현이 어렵지, Transformer layer 하나 구현하는 것은 수업 때 다룬 그림과 크게 구분되지 않는다는 점을 알 수 있습니다.

### 1.2. DataLoader 생성

In [30]:
# Training batches: 64 rows each, reshuffled by the DataLoader, tokenized and
# padded per batch by collate_fn.
train_loader = DataLoader(
    ds['train'], batch_size=64, shuffle=True, collate_fn=collate_fn
)
# Evaluation batches: same batch size and collation, but kept in dataset order.
test_loader = DataLoader(
    ds['test'], batch_size=64, shuffle=False, collate_fn=collate_fn
)

## 2. 모델 구현

### 2.1. [MY_CODE] MHA Model

Multi-Head Attention

In [31]:
class MHA(nn.Module):
    """Multi-head scaled dot-product self-attention.

    The input is linearly projected to queries, keys and values, split into
    ``n_heads`` heads of size ``d_model // n_heads``, attended independently
    per head, re-concatenated and passed through a final linear layer.

    Args:
        d_model: Feature size of the input and output.
        n_heads: Number of attention heads; must divide ``d_model`` evenly.
    """

    def __init__(self, d_model, n_heads):
        super().__init__()
        assert d_model % n_heads == 0, "n_heads는 d_model로 나눠 떨어지는 값이어야 합니다."

        self.n_heads = n_heads
        self.depth = d_model // n_heads  # feature size of a single head

        # Query / key / value projections.
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)

        # Output projection applied to the concatenated heads.
        self.dense = nn.Linear(d_model, d_model)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.
            mask: ``None``, or a tensor broadcastable to
                ``(batch_size, n_heads, seq_len, seq_len)`` that is True /
                non-zero at positions that must NOT be attended to (for
                example padding tokens on the key axis).

        Returns:
            Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        batch_size, seq_len, _ = x.size()

        # Project the input to queries, keys and values.
        Q = self.wq(x)
        K = self.wk(x)
        V = self.wv(x)

        # Split the feature axis into heads:
        # (batch_size, seq_len, n_heads, depth).
        Q = Q.view(batch_size, seq_len, self.n_heads, self.depth)
        K = K.view(batch_size, seq_len, self.n_heads, self.depth)
        V = V.view(batch_size, seq_len, self.n_heads, self.depth)

        # Move the head axis forward: (batch_size, n_heads, seq_len, depth).
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        # Scaled dot-product scores: (batch_size, n_heads, seq_len, seq_len).
        scores = torch.matmul(Q, K.transpose(-2, -1)) / sqrt(self.depth)

        if mask is not None:
            # Suppress the positions flagged by the mask. The previous
            # `mask == 0` test did the opposite: it hid every real token and
            # kept only the flagged ones, which turned un-padded sequences
            # into all -inf rows and therefore NaN after softmax.
            # The dtype's finite minimum is used instead of -inf so that a
            # fully masked row still yields a valid (uniform) distribution.
            scores = scores.masked_fill(mask != 0, torch.finfo(scores.dtype).min)

        # Normalise over the key axis.
        attention_weights = self.softmax(scores)

        # Weighted sum of values: (batch_size, n_heads, seq_len, depth).
        context = torch.matmul(attention_weights, V)

        # Merge the heads back: (batch_size, seq_len, d_model).
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, -1)

        # Final linear projection of the concatenated heads.
        output = self.dense(context)

        return output

### 2.2.  [MY_CODE] TransformerLayer Model

multihead-attention과 feed-forward layer를 구현한 모습

In [32]:
class TransformerLayer(nn.Module):
    """One Transformer block: self-attention followed by a feed-forward net.

    Each of the two sub-layers is wrapped as
    ``LayerNorm(input + Dropout(sublayer(input)))``.

    Args:
        d_model: Feature size of the block's input and output.
        dff: Hidden size of the feed-forward network.
        n_heads: Number of attention heads handed to ``MHA``.
        dropout_rate: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, dff, n_heads, dropout_rate=0.1):
        super().__init__()

        # Attention sub-layer.
        self.mha = MHA(d_model, n_heads)

        # Position-wise feed-forward sub-layer: expand to dff, ReLU, project back.
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),
            nn.ReLU(),
            nn.Linear(dff, d_model),
        )

        # One LayerNorm / Dropout pair per sub-layer.
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)

    def forward(self, x, mask):
        """Run the block on ``x``; ``mask`` is forwarded unchanged to ``MHA``."""
        # Attention sub-layer with residual connection.
        attended = self.dropout1(self.mha(x, mask))
        hidden = self.layernorm1(x + attended)

        # Feed-forward sub-layer with residual connection.
        transformed = self.dropout2(self.ffn(hidden))
        return self.layernorm2(hidden + transformed)

### 2.3. [MY_CODE] TextClassifier Model

5-layer 4-head Transformer

In [33]:
class TextClassifier(nn.Module):
    """Transformer encoder stack with a single-logit classification head.

    Token ids are embedded, scaled by ``sqrt(d_model)``, summed with a fixed
    positional encoding and passed through ``n_layers`` ``TransformerLayer``
    blocks. The hidden state at sequence position 0 is mapped to one logit.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Embedding / hidden feature size.
        n_layers: Number of stacked ``TransformerLayer`` blocks.
        dff: Hidden size of each block's feed-forward network.
        n_heads: Number of attention heads per block.
        max_len: Number of positions in the precomputed positional encoding.
        dropout_rate: Dropout probability passed to every block.
    """

    def __init__(self, vocab_size, d_model, n_layers, dff, n_heads, max_len, dropout_rate=0.1):
        super().__init__()

        self.vocab_size = vocab_size
        self.d_model = d_model
        self.n_layers = n_layers
        self.dff = dff

        self.embedding = nn.Embedding(vocab_size, d_model)
        # Stored as a non-trainable Parameter so it follows the module across
        # devices but receives no gradient updates.
        self.pos_encoding = nn.Parameter(
            positional_encoding(max_len, d_model), requires_grad=False
        )

        encoder_blocks = [
            TransformerLayer(d_model, dff, n_heads, dropout_rate)
            for _ in range(n_layers)
        ]
        self.layers = nn.ModuleList(encoder_blocks)

        self.classification = nn.Linear(d_model, 1)

    def forward(self, x):
        """Return one logit per sequence.

        Args:
            x: Integer tensor of token ids, indexed as ``(batch, seq_len)``.

        Returns:
            Tensor of shape ``(batch, 1)`` holding the classification logit.
        """
        # Boolean mask, True where the token is the tokenizer's pad token;
        # two singleton axes are inserted so it has shape (batch, 1, 1, seq_len).
        pad_mask = x.eq(tokenizer.pad_token_id)[:, None, None, :]

        n_tokens = x.shape[1]
        hidden = self.embedding(x) * sqrt(self.d_model)
        hidden = hidden + self.pos_encoding[:, :n_tokens]

        for block in self.layers:
            hidden = block(hidden, pad_mask)

        # Classify from the hidden state of the first token of each sequence.
        return self.classification(hidden[:, 0])

## 3. Positional encoding

In [34]:
def get_angles(pos, i, d_model):
    """Compute sinusoidal positional-encoding angles.

    Args:
        pos: Position index or array of position indices.
        i: Feature-dimension index or array of indices; indices ``2k`` and
            ``2k + 1`` share the same rate because of the ``i // 2`` term.
        d_model: Total feature size used to normalise the exponent.

    Returns:
        ``pos / 10000 ** (2 * (i // 2) / d_model)``, broadcast over ``pos``
        and ``i``.
    """
    exponent = (2 * (i // 2)) / np.float32(d_model)
    rates = 1 / np.power(10000, exponent)
    return pos * rates

def positional_encoding(position, d_model):
    """Build a sinusoidal positional-encoding table.

    Args:
        position: Number of positions (rows) to generate.
        d_model: Feature size (columns) of the encoding.

    Returns:
        ``torch.FloatTensor`` of shape ``(1, position, d_model)`` with sine
        applied to the even feature columns and cosine to the odd ones.
    """
    row_idx = np.arange(position)[:, None]
    col_idx = np.arange(d_model)[None, :]
    table = get_angles(row_idx, col_idx, d_model)

    # Even columns carry sin, odd columns carry cos of the same angles.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Add a leading axis of size 1 for broadcasting over a batch.
    return torch.FloatTensor(table[None, ...])


# Same value as the truncation length used inside collate_fn.
max_len = 400
# Sanity check: print the shape of a positional-encoding table built for
# max_len positions and 256 features.
print(f"[LOG] {positional_encoding(max_len, 256).shape}")

[LOG] torch.Size([1, 400, 256])


## 4. [MY_CODE] 모델, 손실 함수, 옵티마이저 설정

In [35]:
# Model hyperparameters
vocab_size = len(tokenizer)  # size of the embedding table
d_model = 32                 # embedding / hidden feature size
n_layers = 5                 # number of stacked TransformerLayer blocks
dff = 32                     # hidden size of each feed-forward network
n_heads = 4                  # attention heads per block (must divide d_model)
max_len = 400                # number of positions in the positional encoding
dropout_rate = 0.1
lr = 0.001                   # learning rate passed to Adam

# Device selection: use CUDA when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = TextClassifier(vocab_size, d_model, n_layers, dff, n_heads, max_len, dropout_rate).to(device)
optimizer = Adam(model.parameters(), lr=lr)
# The model emits a single raw logit per example, hence the logits-based
# binary cross-entropy loss.
loss_fn = nn.BCEWithLogitsLoss()


### 4.1. model의 정확도를 측정하는 함수

In [36]:
def accuracy(model, dataloader):
    """Return the fraction of examples in ``dataloader`` classified correctly.

    A prediction is the positive class (1) when the model's logit is greater
    than 0, otherwise the negative class (0). Batches are moved to the
    module-level ``device`` before the forward pass. The model's train/eval
    mode is left untouched; the caller is responsible for setting it.

    Args:
        model: Callable mapping an input batch to logits whose last axis has
            size 1.
        dataloader: Iterable yielding ``(inputs, labels)`` tensor pairs.

    Returns:
        Accuracy as a float in ``[0, 1]``; ``0.0`` when the dataloader yields
        no examples.
    """
    total = 0
    correct = 0

    # Evaluation never needs gradients; disabling them here keeps the function
    # cheap even when the caller forgets to wrap it in torch.no_grad().
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            logits = model(inputs)
            # Threshold the single logit at 0 and drop the trailing axis.
            preds = (logits > 0).long()[..., 0]

            total += labels.size(0)
            correct += (labels == preds).sum().item()

    # Guard against an empty dataloader instead of dividing by zero.
    if total == 0:
        return 0.0

    return correct / total

### 4.2. [MY_CODE] plot 함수

In [37]:
def plot_acc(train_accs, test_accs, title, label1='train', label2='test'):
    """Plot training and test accuracy curves on one figure and show it.

    Args:
        train_accs: Per-epoch training accuracies; its length defines the
            x axis.
        test_accs: Per-epoch test accuracies.
        title: Figure title.
        label1: Legend label for the training curve.
        label2: Legend label for the test curve.
    """
    plt.figure(figsize=(10, 5))

    # Both curves share one x axis derived from the training series.
    epochs = np.arange(len(train_accs))
    for series, legend_label in ((train_accs, label1), (test_accs, label2)):
        plt.plot(epochs, series, label=legend_label)

    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.title(title)
    plt.show()

## 5. [MY_CODE] 학습

In [None]:
train_loss_list = []  # summed batch loss of each epoch
train_acc_list = []   # training accuracy after each epoch
test_acc_list = []    # test accuracy after each epoch

n_epochs = 15  # total number of training epochs

for epoch in range(n_epochs):
    model.train()     # training mode (enables dropout)
    total_loss = 0.0  # running sum of batch losses for this epoch

    # Training pass
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        # Targets are cast to float to match the logits passed to loss_fn.
        labels = labels.to(device).float()

        model.zero_grad()
        preds = model(inputs)[..., 0]  # drop the trailing size-1 logit axis
        loss = loss_fn(preds, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    train_loss_list.append(total_loss)

    # Evaluation pass: eval mode and no gradient tracking
    model.eval()
    with torch.no_grad():
        train_acc = accuracy(model, train_loader)
        test_acc = accuracy(model, test_loader)

    train_acc_list.append(train_acc)
    test_acc_list.append(test_acc)

    # Per-epoch summary
    print(f"[LOG] Epoch {epoch+1}/{n_epochs} | Train Loss: {total_loss:.4f} | Train Acc: {train_acc:.3f} | Test Acc: {test_acc:.3f}")

### 5.1. [MY_CODE] 그래프 출력

In [None]:
# [MYCODE] After training finishes, plot the per-epoch train and test accuracy.
plot_acc(train_acc_list, test_acc_list, title="Train vs Test Accuracy")