# <p style= "font-weight: bold; font-size: 40px;">"KO-EN Translator" by implementation of Transformer model</p>

## <p style= "font-weight: bold">1. Library & GPU setting(Mac)</p>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import nltk
from konlpy.tag import Mecab
from torch.nn.utils.rnn import pad_sequence
from collections import Counter # used for building vocabulary
from einops import rearrange, reduce, repeat
from icecream import ic
from tqdm import tqdm
import gc
import numpy as np
import wandb
import time
import copy
from collections import defaultdict



print("MPS is available:", torch.backends.mps.is_available())
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
FP16 = True



MPS is available: True


## <p style= "font-weight: bold">2. DataRead&Tokenizing</p>

In [2]:
def read_file(data_path):
  with open(data_path, 'r', encoding='utf-8') as file:
    content = file.readlines()

  content = [line.strip() for line in content]
  return content

train_src = read_file('./data/korean-english-park.train/korean-english-park.train.ko')
train_trg = read_file('./data/korean-english-park.train/korean-english-park.train.en')
valid_src = read_file('./data/korean-english-park.dev/korean-english-park.dev.ko')
valid_trg = read_file('./data/korean-english-park.dev/korean-english-park.dev.en')

mecab = Mecab()

def tokenizer_ko(ko_sentences):
  ko_sentences = [mecab.morphs(sentence) for sentence in ko_sentences]
  return ko_sentences

def tokenizer_en(en_sentences):
  en_sentences = [nltk.word_tokenize(sentence) for sentence in en_sentences]
  return en_sentences

train_src = tokenizer_ko(train_src)
train_trg = tokenizer_en(train_trg)
valid_src = tokenizer_ko(valid_src)
valid_trg = tokenizer_en(valid_trg)

In [3]:
print(train_src[0: 5])
print("#######################")
print(train_trg[0: 5])

[['개인', '용', '컴퓨터', '사용', '의', '상당', '부분', '은', '"', '이것', '보다', '뛰어날', '수', '있', '느냐', '?', '"'], ['모든', '광', '마우스', '와', '마찬가지', '로', '이', '광', '마우스', '도', '책상', '위', '에', '놓', '는', '마우스', '패드', '를', '필요', '로', '하', '지', '않', '는다', '.'], ['그러나', '이것', '은', '또한', '책상', '도', '필요', '로', '하', '지', '않', '는다', '.'], ['79', '.', '95', '달러', '하', '는', '이', '최첨단', '무선', '광', '마우스', '는', '허공', '에서', '팔목', ',', '팔', ',', '그', '외', '에', '어떤', '부분', '이', '든', '그', '움직임', '에', '따라', '커서', '의', '움직임', '을', '조절', '하', '는', '회전', '운동', '센서', '를', '사용', '하', '고', '있', '다', '.'], ['정보', '관리', '들', '은', '동남', '아시아', '에서', '의', '선박', '들', '에', '대한', '많', '은', '(', '테러', ')', '계획', '들', '이', '실패', '로', '돌아갔', '음', '을', '밝혔으며', ',', '세계', '해상', '교역', '량', '의', '거의', '3', '분', '의', '1', '을', '운송', '하', '는', '좁', '은', '해', '로', '인', '말라카', '해협', '이', '테러', '공격', '을', '당하', '기', '쉽', '다고', '경고', '하', '고', '있', '다', '.']]
#######################


## <p style= "font-weight: bold">3. Build vacabulary</p>

In [4]:
def build_vocab(tokenized_sentences, max_size = 50000, min_freq = 2):
  word_counts = Counter()

  for sentence in tokenized_sentences:
    word_counts.update(sentence)

  special_token = ['<pad>', '<sos>', '<eos>', '<unk>']
  vocab = {token: idx for idx, token in enumerate(special_token)}

  words = [word for word, count in word_counts.most_common(max_size - len(special_token)) if count >= min_freq]

  for word in words:
    vocab[word] = len(vocab)

  idx2word = {idx: token for token, idx in vocab.items()}

  return vocab, idx2word

In [5]:
vocab_src, idx2word_src = build_vocab(train_src)
vocab_trg, idx2word_trg = build_vocab(train_trg)
vocab_src_valid, idx2word_src_valid = build_vocab(valid_src)
vocab_trg_valid, idx2word_trg_valid = build_vocab(valid_trg)

In [6]:
PAD_TOKEN = '<pad>'
SOS_TOKEN = '<sos>'
EOS_TOKEN = '<eos>'
UNK_TOKEN = '<unk>'

N = 2
BATCH_SIZE = 32
HIDDEN_DIM = 64
INNER_DIM = 128
LEARNING_RATE = 1e-4
NUM_HEAD = 4 
WEIGHT_DECAY = 0


SEQ_LEN = 64

class TrainDataset(Dataset):
  def __init__(self, src_data, trg_data, src_vocab, trg_vocab):
    super().__init__()
    self.src_data = src_data
    self.trg_data = trg_data
    self.src_vocab = src_vocab
    self.trg_vocab = trg_vocab
  
  def __len__(self):
    return len(self.src_data)
  
  def __getitem__(self, idx):
    src = [SOS_TOKEN] + self.src_data[idx] + [EOS_TOKEN]
    trg_input = [SOS_TOKEN] + self.trg_data[idx]
    trg_output = self.trg_data[idx] + [EOS_TOKEN]
    
    # 토큰을 인덱스로 변환
    src_indices = [self.src_vocab.get(token, self.src_vocab[UNK_TOKEN]) for token in src]
    trg_input_indices = [self.trg_vocab.get(token, self.trg_vocab[UNK_TOKEN]) for token in trg_input]
    trg_output_indices = [self.trg_vocab.get(token, self.trg_vocab[UNK_TOKEN]) for token in trg_output]
    
    # 텐서로 변환
    src_tensor = torch.tensor(src_indices, dtype=torch.long)
    trg_input_tensor = torch.tensor(trg_input_indices, dtype=torch.long)
    trg_output_tensor = torch.tensor(trg_output_indices, dtype=torch.long)
    
    return {
        'src': src_tensor,
        'trg_input': trg_input_tensor,
        'trg_output': trg_output_tensor
    }
  
class ValidDataset(Dataset):
  def __init__(self, src_data, trg_data, src_vocab, trg_vocab):
    super().__init__()
    self.src_data = src_data
    self.trg_data = trg_data
    self.src_vocab = src_vocab
    self.trg_vocab = trg_vocab
  
  def __len__(self):
    return len(self.src_data)
  
  def __getitem__(self, idx):
    src = [SOS_TOKEN] + self.src_data[idx] + [EOS_TOKEN]
    trg_input = [SOS_TOKEN] + self.trg_data[idx]
    trg_output = self.trg_data[idx] + [EOS_TOKEN]
    
    # 토큰을 인덱스로 변환
    src_indices = [self.src_vocab.get(token, self.src_vocab[UNK_TOKEN]) for token in src]
    trg_input_indices = [self.trg_vocab.get(token, self.trg_vocab[UNK_TOKEN]) for token in trg_input]
    trg_output_indices = [self.trg_vocab.get(token, self.trg_vocab[UNK_TOKEN]) for token in trg_output]
    
    # 텐서로 변환
    src_tensor = torch.tensor(src_indices, dtype=torch.long)
    trg_input_tensor = torch.tensor(trg_input_indices, dtype=torch.long)
    trg_output_tensor = torch.tensor(trg_output_indices, dtype=torch.long)
    
    return {
        'src': src_tensor,
        'trg_input': trg_input_tensor,
        'trg_output': trg_output_tensor
    }    


In [7]:
def collate_fn(batch):
  src_tensors = [item['src'] for item in batch]
  trg_input_tensors = [item['trg_input'] for item in batch]
  trg_output_tensors = [item['trg_output'] for item in batch]

  src_padded = pad_sequence(src_tensors, batch_first=True, padding_value=vocab_src[PAD_TOKEN])
  trg_input_padded = pad_sequence(trg_input_tensors, batch_first=True, padding_value=vocab_trg[PAD_TOKEN])
  trg_output_padded = pad_sequence(trg_output_tensors, batch_first=True, padding_value=vocab_trg[PAD_TOKEN])

  return {
    'src': src_padded,
    'trg_input': trg_input_padded,
    'trg_output': trg_output_padded
  }

In [8]:

# 데이터로더 정의
train_dataset = TrainDataset(train_src, train_trg, vocab_src, vocab_trg)
train_dataloader = DataLoader(
    train_dataset, 
    batch_size=BATCH_SIZE, 
    collate_fn = collate_fn,
    shuffle=True, 
    pin_memory=True
)

valid_dataset = ValidDataset(valid_src, valid_trg, vocab_src_valid, vocab_trg_valid)
valid_dataloader = DataLoader(
    valid_dataset, 
    batch_size=BATCH_SIZE, 
    collate_fn = collate_fn,
    shuffle=False,
    pin_memory=True
)

In [9]:
# 첫 번째 배치 가져와서 확인하기
for batch in train_dataloader:
    print("배치 타입:", type(batch))
    print("배치 키:", batch.keys())
    
    # 소스 문장 확인
    print("\n소스 텐서 형태:", batch['src'].shape)
    print("소스 텐서 예시 (첫 번째 문장):", batch['src'][1])
    print("소스 입력:", [idx2word_src.get(idx.item(), "<unk>") for idx in batch['src'][0]])
    
    # 타겟 입력 확인
    print("\n타겟 입력 텐서 형태:", batch['trg_input'].shape)
    print("타겟 입력 텐서 예시 (첫 번째 문장):", batch['trg_input'][0])
    
    # 타겟 출력 확인
    print("\n타겟 출력 텐서 형태:", batch['trg_output'].shape) 
    print("타겟 출력 텐서 예시 (첫 번째 문장):", batch['trg_output'][0])
    
    # 실제 토큰으로 변환 (선택적)
    print("\n첫 번째 문장 해석:")
    print("소스:", [idx2word_src.get(idx.item(), "<unk>") for idx in batch['src'][0]])
    print("타겟 입력:", [idx2word_trg.get(idx.item(), "<unk>") for idx in batch['trg_input'][0]])
    print("타겟 출력:", [idx2word_trg.get(idx.item(), "<unk>") for idx in batch['trg_output'][0]])
    
    # 패딩 비율 계산
    src_pad_count = (batch['src'] == vocab_src[PAD_TOKEN]).sum().item()
    src_total = batch['src'].numel()
    print(f"\n패딩 비율: {src_pad_count/src_total:.2%}")
    
    # 한 배치만 확인하고 종료
    break

배치 타입: <class 'dict'>
배치 키: dict_keys(['src', 'trg_input', 'trg_output'])

소스 텐서 형태: torch.Size([32, 61])
소스 텐서 예시 (첫 번째 문장): tensor([    1, 34183,  9572, 15519, 10159,   375,  8574, 19174,  6795,  4578,
         2611,    10,  8271,    21,  2709,  6071,    19, 17608,  2163,  3183,
           10,  7279,   408,     4,   188,     2,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0])
소스 입력: ['<sos>', '미시시피', '외', '에', '앨라배마', '와', '빌', '클린턴', '전', '대통령', '이', '주지사', '를', '지낸', '아칸소', '도', '인종', '에', '따라', '선호', '하', '는', '후보', '가', '극명', '하', '게', '갈렸', '다', '.', '<eos>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', '<pad>', 

In [10]:
class PositionalEncodeing(nn.Module):
  def __init__(self, hidden_dim, max_len = 5000):
    super().__init__()

    pe = torch.zeros(max_len, hidden_dim)
    pos = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
    _2i = torch.arange(0, hidden_dim, step=2, dtype=torch.float)

    pe[:, 0::2] = torch.sin(pos/(10000**(_2i/hidden_dim)))
    pe[:, 1::2] = torch.cos(pos/(10000**(_2i/hidden_dim)))

    pe = pe.unsqueeze(0)
    self.register_buffer('pe', pe)

  def forward(self, x):
    x = x + self.pe[:, :x.size(1), :]

    return x
  

In [11]:
def makeMask(tensor: torch.Tensor, option: str):
    # tensor -> bs, seq_len
    if option == 'padding':
        # 패딩 토큰(PAD_TOKEN)이 있는 위치에 1, 그 외에는 0
        mask = (tensor == vocab_src[PAD_TOKEN]).to(device).unsqueeze(1).unsqueeze(2)
        # print('마스크 함수 :', mask.size())
        
    elif option == 'look_ahead':
        # 패딩 마스크: 패딩 토큰이 있는 위치에 1
        padding_mask = (tensor == vocab_src[PAD_TOKEN]).to(device).unsqueeze(1).unsqueeze(2)
        
        # 룩어헤드 마스크: 미래 토큰이 있는 위치에 1 (대각선 위)
        look_ahead_mask = torch.triu(torch.ones(tensor.size(1), tensor.size(1)), diagonal=1).to(device).bool()
        look_ahead_mask = look_ahead_mask.to(device).unsqueeze(0).unsqueeze(1)
        
        # 두 마스크 결합 (OR 연산): 패딩이거나 미래 토큰이면 1
        mask = padding_mask | look_ahead_mask
    
    return mask

In [12]:
def test_mask_function():
    # 테스트용 텐서 생성
    batch_size = 2
    num_head = 2
    seq_len = 5
    pad_idx = vocab_src[PAD_TOKEN]  # 실제 값으로 대체 필요
    
    # 샘플 시퀀스 생성 (마지막 위치에 패딩 포함)
    test_tensor = torch.ones(batch_size, seq_len, dtype=torch.long)
    test_tensor[0, -1] = pad_idx  # 첫 번째 샘플의 마지막 토큰을 패딩으로 설정
    test_tensor[1, -2:] = pad_idx  # 두 번째 샘플의 마지막 두 토큰을 패딩으로 설정
    
    print("Test tensor:")
    print(test_tensor)
    
    # 패딩 마스크 테스트
    padding_mask = makeMask(test_tensor, 'padding')
    print("\nPadding mask:")
    print(padding_mask)
    print(padding_mask.shape)
    
    # Look-ahead 마스크 테스트
    look_ahead_mask = makeMask(test_tensor, 'look_ahead')
    print(look_ahead_mask.shape)
    print("\nLook-ahead mask (combined with padding):")
    for i in range(batch_size):
        print(f"Sample {i}:")
        print(look_ahead_mask[i])

# 테스트 실행
test_mask_function()

Test tensor:
tensor([[1, 1, 1, 1, 0],
        [1, 1, 1, 0, 0]])

Padding mask:
tensor([[[[False, False, False, False,  True]]],


        [[[False, False, False,  True,  True]]]], device='mps:0')
torch.Size([2, 1, 1, 5])
torch.Size([2, 1, 5, 5])

Look-ahead mask (combined with padding):
Sample 0:
tensor([[[False,  True,  True,  True,  True],
         [False, False,  True,  True,  True],
         [False, False, False,  True,  True],
         [False, False, False, False,  True],
         [False, False, False, False,  True]]], device='mps:0')
Sample 1:
tensor([[[False,  True,  True,  True,  True],
         [False, False,  True,  True,  True],
         [False, False, False,  True,  True],
         [False, False, False,  True,  True],
         [False, False, False,  True,  True]]], device='mps:0')


In [13]:
class MultiHeadAttention(nn.Module):
  def __init__(self, num_head, hidden_dim, device, dropout = 0.1):
    super().__init__()

    assert hidden_dim % num_head == 0


    self.num_head = num_head
    self.hidden_dim = hidden_dim
    self.head_dim = hidden_dim//num_head
    
    self.fc_q = nn.Linear(hidden_dim, hidden_dim)
    self.fc_k = nn.Linear(hidden_dim, hidden_dim)
    self.fc_v = nn.Linear(hidden_dim, hidden_dim)

    self.fc_o = nn.Linear(hidden_dim, hidden_dim)

    self.dropout = nn.Dropout(dropout)

    self.scale = torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float)).to(device)


  def forward(self, query, key, value, mask = None):
    batch_size = query.size(0)
    Q = self.fc_q(query)
    K = self.fc_k(key)
    V = self.fc_v(value)

    # (bs, seq_len, hidden_dim)

    Q = Q.view(batch_size, int(self.num_head), -1, int(self.head_dim))
    K = K.view(batch_size, int(self.num_head), -1, int(self.head_dim))
    V = V.view(batch_size, int(self.num_head), -1, int(self.head_dim))


    # (bs, num_head, seq_len, head_dim)

    score = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

    if mask is not None:
      # print('마스크 size', mask.size())
      score = score.masked_fill(mask == 1, -1e10)

    score = torch.softmax(score, dim = -1)

    x = torch.matmul(self.dropout(score), V)
    
    x = x.permute(0, 2, 1, 3).contiguous()

    output = x.view(batch_size, -1, self.hidden_dim)

    output = self.fc_o(output)

    return output, score


In [14]:
x = 2
y = 2
x//y

1

In [15]:
class FeedForwardLayer(nn.Module):
  def __init__(self, hidden_dim, inner_dim, dropout = 0.1):
    super().__init__()

    self.linear1 = nn.Linear(hidden_dim, inner_dim)
    self.linear2 = nn.Linear(inner_dim, hidden_dim)
    self.dropout = nn.Dropout(dropout)

  
  def forward(self, x):
    x = torch.relu(self.linear1(x))
    x = self.dropout(x)
    x = self.linear2(x)

    return x



In [16]:
class EncoderLayer(nn.Module):
  def __init__(self, hidden_dim, num_head, inner_dim, device, dropout = 0.1):
    super().__init__()

    self.multiHeadAttention = MultiHeadAttention(num_head, hidden_dim, device)
    self.layerNorm1 = nn.LayerNorm(hidden_dim)
    self.ffn = FeedForwardLayer(hidden_dim, inner_dim)
    self.layerNorm2 = nn.LayerNorm(hidden_dim)

    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)

  def forward(self, input, mask=None):
    x, attention = self.multiHeadAttention(query=input, key=input, value=input, mask=mask)
    x = self.dropout1(x)
    x = input + x
    output = self.layerNorm1(x)
    output = self.ffn(output)
    output = self.dropout1(output)
    output = input + output
    output = self.layerNorm2(output)

    # bs seq_len hidden_dim

    return output


In [17]:
class Encoder(nn.Module):
  def __init__(self, N, hidden_dim, num_head, inner_dim, device):
    super().__init__()
    vocab_size = len(vocab_src)
    self.embedding = nn.Embedding(vocab_size, hidden_dim, padding_idx=0)
    self.positionalEncoding = PositionalEncodeing(hidden_dim)
    self.encoder_layers = nn.ModuleList([EncoderLayer(hidden_dim, num_head, inner_dim, device) for _ in range(N)])
    self.dropout = nn.Dropout(0.1)
    self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)
  
  def forward(self, input):
    embeded_input = self.embedding(input)*self.scale
    encoder_input = self.dropout(self.positionalEncoding(embeded_input))
    mask = makeMask(input, option='padding')
    for enc_layer in self.encoder_layers:
      output = enc_layer(encoder_input, mask)
      encoder_input = output

    return output


In [18]:
class DecoderLayers(nn.Module):
  def __init__(self, hidden_dim, num_head, inner_dim, device, dropout = 0.1):
    super().__init__()
    self.multiHeadAttention1 = MultiHeadAttention(num_head, hidden_dim, device)
    self.LayerNorm1 = nn.LayerNorm(hidden_dim)
    self.multiHeadAttention2 = MultiHeadAttention(num_head, hidden_dim, device)
    self.LayerNorm2 = nn.LayerNorm(hidden_dim)
    self.ffn = FeedForwardLayer(hidden_dim, inner_dim)
    self.LayerNorm3 = nn.LayerNorm(hidden_dim)

    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)
    self.dropout3 = nn.Dropout(dropout)

  def forward(self, input, encoder_output, paddingMask, lookaheadMask):
    x, attention = self.multiHeadAttention1(query=input, key=input, value=input, mask=lookaheadMask)
    x = self.dropout1(x)
    x = input + x
    x = self.LayerNorm1(x)

    output_, attention = self.multiHeadAttention2(query=x, key=encoder_output, value=encoder_output, mask=paddingMask)
    output_ = self.dropout1(output_)
    output_ = x + output_
    output_ = self.LayerNorm2(output_)

    output = self.ffn(output_)
    output = self.dropout1(output)
    output = output_ + output
    output = self.LayerNorm3(output)

    return output


In [19]:
class Decoder(nn.Module):
  def __init__(self, N, hidden_dim, num_head, inner_dim, device):
    super().__init__()
    vocab_size = len(vocab_trg)
    self.embedding = nn.Embedding(vocab_size, hidden_dim)
    self.positionalEmbedding = PositionalEncodeing(hidden_dim)
    self.decoder_layers = nn.ModuleList([DecoderLayers(hidden_dim, num_head, inner_dim, device) for _ in range(N)])
    self.dropout = nn.Dropout(0.1)
    self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)
    self.finalFc = nn.Linear(hidden_dim, vocab_size)


  def forward(self, input, enc_src, encoder_output):
    embedding_input = self.embedding(input) * self.scale
    decoder_input = self.positionalEmbedding(embedding_input)
    decoder_input = self.dropout(decoder_input)
    lookaheadMask = makeMask(input, option='look_ahead')
    paddingMask = makeMask(enc_src, option='padding')
    for decoder_layer in self.decoder_layers:
      output = decoder_layer(decoder_input, encoder_output, paddingMask, lookaheadMask)
      decoder_input = output

    logits = self.finalFc(output)
    output = torch.softmax(logits, dim = -1)
    output = torch.argmax(output, dim = -1)
    return logits, output
  

In [20]:
class Transformer(nn.Module):
  def __init__(self, N, hidden_dim, num_head, inner_dim, device):
    super().__init__()
    vocab_size = len(valid_trg)

    self.encoder = Encoder(N, hidden_dim, num_head, inner_dim, device)
    self.decoder = Decoder(N, hidden_dim, num_head, inner_dim, device)

    self.output_layer = nn.Linear(hidden_dim, vocab_size)

  def forward(self, enc_src, dec_src):
    # print('enc_src:', enc_src.size())
    # print('dec_src:', dec_src.size())
    encoder_output = self.encoder(enc_src)
    # print('enc_output:', encoder_output.size())
    logits, output = self.decoder(dec_src, enc_src, encoder_output)
    # print('dec_output:', output.size())

    # print('output size:', output.size())

    return logits, output

In [21]:
model = Transformer(N, HIDDEN_DIM, NUM_HEAD, INNER_DIM, device).to(device)
ic.disable()

In [22]:
for batch in train_dataloader:
    src = batch['src']
    trg_input = batch['trg_input']
    break

from torchinfo import summary

summary(model, 
        input_data1=src, input_data2=trg_input,  # 두 개의 입력 텐서 전달
        )

Layer (type:depth-idx)                        Param #
Transformer                                   --
├─Encoder: 1-1                                --
│    └─Embedding: 2-1                         2,280,256
│    └─PositionalEncodeing: 2-2               --
│    └─ModuleList: 2-3                        --
│    │    └─EncoderLayer: 3-1                 33,472
│    │    └─EncoderLayer: 3-2                 33,472
│    └─Dropout: 2-4                           --
├─Decoder: 1-2                                --
│    └─Embedding: 2-5                         2,522,304
│    └─PositionalEncodeing: 2-6               --
│    └─ModuleList: 2-7                        --
│    │    └─DecoderLayers: 3-3                50,240
│    │    └─DecoderLayers: 3-4                50,240
│    └─Dropout: 2-8                           --
│    └─Linear: 2-9                            2,561,715
├─Linear: 1-3                                 65,000
Total params: 7,596,699
Trainable params: 7,596,699
Non-trainable params

In [23]:
for param in model.named_parameters():
    if 'weight' in param[0] and 'layerNorm' not in param[0]:
        # 텐서가 최소 2차원 이상인지 확인
        if param[1].dim() >= 2:
            torch.nn.init.xavier_uniform_(param[1])

In [24]:
optimizer = torch.optim.Adam(params = model.parameters(), lr = LEARNING_RATE, weight_decay = WEIGHT_DECAY)

In [25]:
def criterion(logits: torch.tensor, targets: torch.tensor):
    return nn.CrossEntropyLoss(ignore_index=0)(logits.view(-1, len(vocab_trg)), targets.view(-1))

In [26]:
def train_one_epoch(model, optimizer, scheduler, dataloader, device, epoch):
    # train 모드로 변경
    model.train()
    
    dataset_size = 0
    running_loss = 0
    running_accuracy = 0
    accuracy = 0
    
    bar = tqdm(enumerate(dataloader), total=len(dataloader))
    
    for step, batch in bar:
        # 데이터 로더가 딕셔너리를 반환하는 경우
        if isinstance(batch, dict):
            src = batch['src'].to(device)
            trg_input = batch['trg_input'].to(device) 
            trg_output = batch['trg_output'].to(device)
        # 데이터 로더가 튜플이나 리스트를 반환하는 경우
        elif isinstance(batch, (tuple, list)):
            src, trg_input, trg_output = batch
            src = src.to(device)
            trg_input = trg_input.to(device)
            trg_output = trg_output.to(device)
        else:
            raise TypeError(f"Unexpected batch type: {type(batch)}")
        
        batch_size = src.shape[0]
        
        # 그래디언트 초기화
        optimizer.zero_grad()
        
        # 순전파
        logits, output = model(enc_src=src, dec_src=trg_input)
        
        # 손실 계산
        loss = criterion(logits, trg_output)
        
        # 역전파
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # 그래디언트 클리핑
        optimizer.step()
        
        # 스케줄러 업데이트
        if scheduler is not None:
            scheduler.step()
        
        # 손실 및 정확도 업데이트
        running_loss += loss.item() * batch_size
        running_accuracy = np.mean(
            output.view(-1).detach().cpu().numpy() == trg_output.view(-1).detach().cpu().numpy())
        
        accuracy += running_accuracy
        dataset_size += batch_size
        
        # 현재 에포크의 평균 손실 계산
        epoch_loss = running_loss / dataset_size
        
        # 진행 바 업데이트
        bar.set_postfix(
            Epoch=epoch, 
            Train_Loss=epoch_loss, 
            LR=optimizer.param_groups[0]["lr"], 
            Accuracy=accuracy / np.float64(step+1)
        )
    
    # 전체 에포크의 평균 정확도 계산
    accuracy /= len(dataloader)
    
    # 메모리 정리
    gc.collect()
    
    # 손실과 정확도 반환
    return epoch_loss, accuracy

In [27]:
@torch.no_grad()
def valid_one_epoch(model, dataloader, device, epoch):
    model.eval()

    dataset_size = 0
    running_loss = 0
    accuracy = 0

    bar = tqdm(enumerate(dataloader), total=len(dataloader))

    for step, batch in bar:
        # 데이터 로더가 딕셔너리를 반환하는 경우
        if isinstance(batch, dict):
            src = batch['src'].to(device)
            trg_input = batch['trg_input'].to(device) 
            trg_output = batch['trg_output'].to(device)
        # 데이터 로더가 튜플이나 리스트를 반환하는 경우
        elif isinstance(batch, (tuple, list)):
            src, trg_input, trg_output = batch
            src = src.to(device)
            trg_input = trg_input.to(device)
            trg_output = trg_output.to(device)
        else:
            raise TypeError(f"Unexpected batch type: {type(batch)}")
        
        batch_size = src.shape[0]

        # MPS는 일부 연산에서 문제가 발생할 수 있으므로, 필요 시 CPU로 폴백
        try:
            logits, output = model(enc_src=src, dec_src=trg_input)
            loss = criterion(logits, trg_output)
        except RuntimeError as e:
            # MPS에서 지원하지 않는 연산이 있을 경우
            if "MPS" in str(e):
                print(f"MPS 오류 발생, CPU로 폴백: {e}")
                # 데이터와 모델을 CPU로 임시 이동
                model_cpu = model.to("cpu")
                src_cpu = src.to("cpu")
                trg_input_cpu = trg_input.to("cpu")
                trg_output_cpu = trg_output.to("cpu")
                
                logits, output = model_cpu(enc_src=src_cpu, dec_src=trg_input_cpu)
                loss = criterion(logits, trg_output_cpu)
                
                # 모델을 다시 원래 장치로 복원
                model.to(device)
            else:
                raise e

        running_loss += loss.item() * batch_size
        dataset_size += batch_size

        # 실시간으로 정보를 표시하기 위한 epoch loss
        val_loss = running_loss / dataset_size
        
        # 출력을 CPU로 이동하여 NumPy 배열로 변환
        output_cpu = output.view(-1).detach().cpu().numpy()
        trg_output_cpu = trg_output.view(-1).detach().cpu().numpy()
        running_accuracy = np.mean(output_cpu == trg_output_cpu)
        
        accuracy += running_accuracy

        bar.set_postfix(
            Epoch=epoch, Valid_Loss=val_loss, LR=optimizer.param_groups[0]["lr"], 
            accuracy=accuracy / float(step + 1)  # np.float64 대신 float 사용
        )

    accuracy /= len(dataloader)

    gc.collect()

    return val_loss, accuracy

In [28]:
def run_training(
    model,
    optimizer,
    scheduler,
    device,
    num_epochs,
    metric_prefix="",
    file_prefix="",
    early_stopping=True,
    early_stopping_step=10,
):
    # To automatically log graidents
    # wandb.watch(model, log_freq=100)

    if torch.backends.mps.is_available():
        print("[INFO] Using Apple Silicon GPU with MPS")
    elif torch.cuda.is_available():
        print("[INFO] Using GPU: {}".format(torch.cuda.get_device_name()))
    else:
        print("[INFO] Using CPU")

    start = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = np.inf
    history = defaultdict(list)
    early_stop_counter = 0

    # num_epochs만큼, train과 val을 실행한다
    for epoch in range(1, num_epochs + 1):
        gc.collect()

        train_epoch_loss, train_accuracy = train_one_epoch(
            model,
            optimizer,
            scheduler,
            dataloader=train_dataloader,
            device=device,
            epoch=epoch,
        )

        val_loss, val_accuracy = valid_one_epoch(
            model, valid_dataloader, device=device, epoch=epoch
        )

        history[f"{metric_prefix}Train Loss"].append(train_epoch_loss)
        history[f"{metric_prefix}Train Accuracy"].append(train_accuracy)
        history[f"{metric_prefix}Valid Loss"].append(val_loss)
        history[f"{metric_prefix}Valid Accuracy"].append(val_accuracy)

        # Log the metrics
        # wandb.log(
        #     {
        #         f"{metric_prefix}Train Loss": train_epoch_loss,
        #         f"{metric_prefix}Valid Loss": val_loss,
        #         f"{metric_prefix}Train Accuracy": train_accuracy,
        #         f"{metric_prefix}Valid Accuracy": val_accuracy,
        #     }
        # )

        print(f"Valid Loss: {val_loss}")

        # deep copy the model
        if val_loss <= best_loss:
            early_stop_counter = 0

            print(
                f"Validation Loss improved ({best_loss} ---> {val_loss})"
            )

            # Update Best Loss
            best_loss = val_loss
            
            # MacOS에서는 모델 저장 전에 CPU로 이동 (권장)
            if 'mps' in str(device):
                cpu_model = copy.deepcopy(model).to('cpu')
                best_model_wts = copy.deepcopy(cpu_model.state_dict())
                
                PATH = "{}epoch{:.0f}_Loss{:.4f}.bin".format(file_prefix, epoch, best_loss)
                torch.save(cpu_model.state_dict(), PATH)
                torch.save(cpu_model.state_dict(), f"{file_prefix}best_{epoch}epoch.bin")
            else:
                # 기존 방식대로 저장
                best_model_wts = copy.deepcopy(model.state_dict())
                
                PATH = "{}epoch{:.0f}_Loss{:.4f}.bin".format(file_prefix, epoch, best_loss)
                torch.save(model.state_dict(), PATH)
                torch.save(model.state_dict(), f"{file_prefix}best_{epoch}epoch.bin")
            
            # Save a model file from the current directory
            # wandb.save(PATH)
            print(f"Model Saved")

        elif early_stopping:
            early_stop_counter += 1
            if early_stop_counter > early_stopping_step:
                break

    end = time.time()
    time_elapsed = end - start
    print(
        "Training complete in {:.0f}h {:.0f}m {:.0f}s".format(
            time_elapsed // 3600,
            (time_elapsed % 3600) // 60,
            (time_elapsed % 3600) % 60,
        )
    )
    print("Best Loss: {:.4f}".format(best_loss))

    # load best model weights
    model.load_state_dict(best_model_wts)

    return model, history

In [29]:
# wandb 초기화
# wandb.init(project="korean-english-translator", name="transformer-training")

# 학습 실행
run_training(
    model=model,
    optimizer=optimizer,
    scheduler=torch.optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=100, eta_min=1e-5),
    device=device,
    num_epochs=6,
    metric_prefix="",
    file_prefix="",
    early_stopping=True,
    early_stopping_step=10,
)

[INFO] Using Apple Silicon GPU with MPS


100%|██████████| 2942/2942 [05:48<00:00,  8.44it/s, Accuracy=0.0269, Epoch=1, LR=4.38e-5, Train_Loss=7.7] 
100%|██████████| 32/32 [00:02<00:00, 13.52it/s, Epoch=1, LR=4.38e-5, Valid_Loss=6.09, accuracy=0.045] 


Valid Loss: 6.090151660919189
Validation Loss improved (inf ---> 6.090151660919189)
Model Saved


100%|██████████| 2942/2942 [06:08<00:00,  7.98it/s, Accuracy=0.0673, Epoch=2, LR=1.56e-5, Train_Loss=6.56]
100%|██████████| 32/32 [00:01<00:00, 19.71it/s, Epoch=2, LR=1.56e-5, Valid_Loss=6.2, accuracy=0.072]  


Valid Loss: 6.202142143249512


100%|██████████| 2942/2942 [05:49<00:00,  8.43it/s, Accuracy=0.0878, Epoch=3, LR=8.58e-5, Train_Loss=6.14]
100%|██████████| 32/32 [00:01<00:00, 17.80it/s, Epoch=3, LR=8.58e-5, Valid_Loss=6.46, accuracy=0.0678]


Valid Loss: 6.45847811126709


100%|██████████| 2942/2942 [05:52<00:00,  8.35it/s, Accuracy=0.0989, Epoch=4, LR=7.91e-5, Train_Loss=5.89]
100%|██████████| 32/32 [00:02<00:00, 14.53it/s, Epoch=4, LR=7.91e-5, Valid_Loss=6.6, accuracy=0.0661] 


Valid Loss: 6.604831245422363


100%|██████████| 2942/2942 [05:45<00:00,  8.51it/s, Accuracy=0.106, Epoch=5, LR=1.22e-5, Train_Loss=5.71]
100%|██████████| 32/32 [00:01<00:00, 18.79it/s, Epoch=5, LR=1.22e-5, Valid_Loss=6.77, accuracy=0.0626]


Valid Loss: 6.76709508895874


100%|██████████| 2942/2942 [05:47<00:00,  8.46it/s, Accuracy=0.11, Epoch=6, LR=5.22e-5, Train_Loss=5.57] 
100%|██████████| 32/32 [00:01<00:00, 20.25it/s, Epoch=6, LR=5.22e-5, Valid_Loss=6.86, accuracy=0.0605]


Valid Loss: 6.857190994262695
Training complete in 0h 35m 28s
Best Loss: 6.0902


(Transformer(
   (encoder): Encoder(
     (embedding): Embedding(35629, 64, padding_idx=0)
     (positionalEncoding): PositionalEncodeing()
     (encoder_layers): ModuleList(
       (0-1): 2 x EncoderLayer(
         (multiHeadAttention): MultiHeadAttention(
           (fc_q): Linear(in_features=64, out_features=64, bias=True)
           (fc_k): Linear(in_features=64, out_features=64, bias=True)
           (fc_v): Linear(in_features=64, out_features=64, bias=True)
           (fc_o): Linear(in_features=64, out_features=64, bias=True)
           (dropout): Dropout(p=0.1, inplace=False)
         )
         (layerNorm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
         (ffn): FeedForwardLayer(
           (linear1): Linear(in_features=64, out_features=128, bias=True)
           (linear2): Linear(in_features=128, out_features=64, bias=True)
           (dropout): Dropout(p=0.1, inplace=False)
         )
         (layerNorm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
  

In [30]:
torch.save(model.state_dict(), 'final.bin')
# wandb.save('final.bin')

In [31]:
# wandb.finish()

In [32]:
import torch
from konlpy.tag import Mecab

# 필요한 상수 정의
PAD_TOKEN = '<pad>'
SOS_TOKEN = '<sos>'
EOS_TOKEN = '<eos>'
UNK_TOKEN = '<unk>'

# MPS 사용 가능하면 사용
device = torch.device('cpu')
print(f"사용 장치: {device} (MPS 호환성 문제로 CPU 사용)")


# 이미 생성된 어휘 사전 사용
# vocab_src, vocab_trg는 이미 정의되어 있다고 가정
print("어휘 사전 사용 준비 완료")

# 모델 클래스 정의가 필요합니다. (이미 정의되어 있다고 가정)
# 실제 환경에 맞게 import 구문 수정 필요
# Transformer 클래스는 이미 정의되어 있다고 가정

# 모델 초기화 (학습 시 사용한 동일한 파라미터 필요)

# 모델 생성
model = Transformer(
    N=N, 
    hidden_dim=HIDDEN_DIM, 
    num_head=NUM_HEAD, 
    inner_dim=INNER_DIM, 
    device=device
)

# 모델 로드
model_path = 'final.bin'  # 저장된 모델 경로
try:
    # 로드된 객체 확인
    loaded_obj = torch.load(model_path, map_location=device)
    
    # 객체 유형에 따라 처리
    if isinstance(loaded_obj, dict) and 'state_dict' in loaded_obj:
        model.load_state_dict(loaded_obj['state_dict'])
    elif isinstance(loaded_obj, dict):
        model.load_state_dict(loaded_obj)
    else:
        model = loaded_obj
    
    model.to(device)
    model.eval()  # 평가 모드 설정
    print(f"모델 로드 완료: {model_path}")
except Exception as e:
    print(f"모델 로드 오류: {e}")
    exit(1)

# Mecab 초기화
mecab = Mecab()

# 번역 함수
def translate(sentence, max_length=50):
    """
    한국어 문장을 영어로 번역
    """
    # 형태소 분석
    tokens = mecab.morphs(sentence)
    
    # 특수 토큰 추가
    tokens = [SOS_TOKEN] + tokens + [EOS_TOKEN]
    
    # 토큰을 인덱스로 변환
    src_indices = [vocab_src.get(token, vocab_src[UNK_TOKEN]) for token in tokens]
    
    # 텐서 변환
    src_tensor = torch.LongTensor(src_indices).unsqueeze(0).to(device)
    
    # 번역 수행
    with torch.no_grad():
        encoder_output = model.encoder(src_tensor)
        
        # 디코더 입력 초기화 (시작 토큰)
        trg_indices = [vocab_trg[SOS_TOKEN]]
        trg_tensor = torch.LongTensor([trg_indices]).to(device)
        
        # 토큰 생성
        for _ in range(max_length):
            # 다음 토큰 예측
            logits, output = model(src_tensor, trg_tensor)
            
            # 가장 확률 높은 토큰 선택
            pred_token = output[0, -1].item()
            
            # 예측 토큰 추가
            trg_indices.append(pred_token)
            trg_tensor = torch.LongTensor([trg_indices]).to(device)
            
            # 종료 토큰이 나오면 중단
            if pred_token == vocab_trg[EOS_TOKEN]:
                break
    
    # 토큰을 단어로 변환
    translated_tokens = [idx2word_trg.get(idx, UNK_TOKEN) for idx in trg_indices]
    
    # 시작 및 종료 토큰 제외
    if translated_tokens[-1] == EOS_TOKEN:
        translated_tokens = translated_tokens[1:-1]
    else:
        translated_tokens = translated_tokens[1:]
    
    # 결과 문장 반환
    return ' '.join(translated_tokens)

# 테스트 및 대화형 인터페이스
def main():
    # 테스트 문장
    test_sentences = [
        "안녕하세요. 만나서 반갑습니다.",
        "오늘 날씨가 정말 좋네요.",
        "인공지능 번역기술은 매우 흥미롭습니다.",
        "저는 한국어를 공부하고 있습니다.",
        "이 모델은 한국어를 영어로 번역합니다."
    ]
    
    print("\n=== 테스트 번역 ===")
    for sentence in test_sentences:
        print(f"\n한국어: {sentence}")
        translated = translate(sentence)
        print(f"영어: {translated}")
    
    print("\n=== 대화형 번역 (종료: q) ===")
    while True:
        user_input = input("\n한국어 문장 입력: ")
        if user_input.lower() == 'q':
            break
        
        translated = translate(user_input)
        print(f"영어 번역: {translated}")

if __name__ == "__main__":
    main()

사용 장치: cpu (MPS 호환성 문제로 CPU 사용)
어휘 사전 사용 준비 완료
모델 로드 완료: final.bin

=== 테스트 번역 ===

한국어: 안녕하세요. 만나서 반갑습니다.


  loaded_obj = torch.load(model_path, map_location=device)


영어: The the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the

한국어: 오늘 날씨가 정말 좋네요.
영어: The the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the

한국어: 인공지능 번역기술은 매우 흥미롭습니다.
영어: The the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the

한국어: 저는 한국어를 공부하고 있습니다.
영어: The the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the the

한국어: 이 모델은 한국어를 영어로 번역합니다.
영어: The the the the the the the the the the the the the the the the the the the th

KeyboardInterrupt: Interrupted by user