## 트랜스포머 모델을 활용한 독일어→영어 번역 모델

In [16]:
!python -m spacy download de_core_news_sm
!python -m spacy download en_core_web_sm

Collecting de-core-news-sm==3.7.0
  Downloading https://github.com/explosion/spacy-models/releases/download/de_core_news_sm-3.7.0/de_core_news_sm-3.7.0-py3-none-any.whl (14.6 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.6/14.6 MB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m0m eta [36m0:00:01[0m[36m0:00:01[0m

[notice] A new release of pip is available: 23.3.2 -> 24.0
[notice] To update, run: python3.11 -m pip install --upgrade pip
✔ Download and installation successful
You can now load the package via spacy.load('de_core_news_sm')
Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m 

In [7]:
# If the Multi30k dataset fails to download with a timeout error, uncomment and run the lines below to use alternative download URLs.

# from torchtext.datasets import multi30k


# multi30k.URL["train"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/training.tar.gz"
# multi30k.URL["valid"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/validation.tar.gz"
# multi30k.URL["test"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/mmt16_task1_test.tar.gz"

In [25]:
### 데이터세트 다운로드 및 전처리

from torchtext.datasets import Multi30k
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# Text tokenization generator
def generate_tokens(text_iter, language):
    """Lazily tokenize one language side of every sample in ``text_iter``.

    Args:
        text_iter: Iterable of indexable samples; position 0 is read for
            ``SRC_LANGUAGE`` and position 1 for ``TGT_LANGUAGE``.
        language: Language key selecting both the sample position and the
            tokenizer stored in ``token_transform``.

    Yields:
        The result of ``token_transform[language]`` for each selected text.
    """
    position_of = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}

    # Lookups stay inside the generator expression so they happen per sample,
    # only when a sample is actually consumed.
    yield from (
        token_transform[language](sample[position_of[language]])
        for sample in text_iter
    )


SRC_LANGUAGE = "de"
TGT_LANGUAGE = "en"
special_symbols = ["<unk>", "<pad>", "<bos>", "<eos>"]
# Special-token ids follow the order of `special_symbols` (0, 1, 2, 3).
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = range(len(special_symbols))

# 1. Create one spaCy tokenizer per corpus: German (de) and English (en)
token_transform = {
    lang: get_tokenizer("spacy", language=spacy_model)
    for lang, spacy_model in (
        (SRC_LANGUAGE, "de_core_news_sm"),
        (TGT_LANGUAGE, "en_core_web_sm"),
    )
}
print("Token Transform:", token_transform, sep="\n")

# 2. Build one vocabulary per language from the training split
# 3. Make <unk> the default index, so tokens missing from the vocabulary
#    are mapped to the <unk> id
vocab_transform = {}
for language in (SRC_LANGUAGE, TGT_LANGUAGE):
    # A fresh iterator is created for every language pass.
    train_iter = Multi30k(split="train", language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    vocab_transform[language] = build_vocab_from_iterator(
        generate_tokens(train_iter, language),
        min_freq=1,
        specials=special_symbols,   # special tokens
        special_first=True,         # special tokens come first
    )
    vocab_transform[language].set_default_index(UNK_IDX)

print("Vocab Transform:", vocab_transform, sep="\n")

Token Transform:
{'de': functools.partial(<function _spacy_tokenize at 0x13e426520>, spacy=<spacy.lang.de.German object at 0x2aa5e21d0>), 'en': functools.partial(<function _spacy_tokenize at 0x13e426520>, spacy=<spacy.lang.en.English object at 0x17be726d0>)}
Vocab Transform:
{'de': Vocab(), 'en': Vocab()}


In [26]:
### 트랜스포머 모델 구성

import math
import torch
from torch import nn

# Positional encoding module
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence of embeddings.

    The table is computed once for ``max_len`` positions and stored as a
    non-trainable buffer ``pe`` of shape ``[max_len, 1, d_model]``. Even
    feature columns hold sines and odd feature columns hold cosines.

    Args:
        d_model: Size of the last (feature) dimension. Both even and odd
            values are supported.
        max_len: Number of positions to precompute.
        dropout: Dropout probability applied after the encoding is added.
    """

    def __init__(self, d_model, max_len, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )

        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so the frequencies are trimmed to d_model // 2 entries.
        # For an even d_model the slice is a no-op.
        pe[:, 0, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Registered as a buffer: follows the module across devices and is
        # saved in the state dict, but is not a trainable parameter.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Add the encoding for the first ``x.size(0)`` positions, then dropout.

        Args:
            x: Tensor whose first axis indexes positions and whose last axis
                has size ``d_model``; ``x.size(0)`` is expected not to exceed
                ``max_len``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        x = x + self.pe[: x.size(0)]
        return self.dropout(x)

# Token embedding module
class TokenEmbedding(nn.Module):
    """Look up token embeddings and scale them by ``sqrt(emb_size)``.

    Args:
        vocab_size: Number of rows in the embedding table.
        emb_size: Size of each embedding vector.
    """

    def __init__(self, vocab_size, emb_size):
        super().__init__()
        self.emb_size = emb_size
        self.embedding = nn.Embedding(vocab_size, emb_size)

    def forward(self, tokens):
        """Return the scaled embeddings of ``tokens`` (cast to int64 first)."""
        scale = math.sqrt(self.emb_size)
        vectors = self.embedding(tokens.long())
        return vectors * scale

# Seq2seq model built around nn.Transformer.
# Source and target token ids are converted to input embeddings by two
# TokenEmbedding instances: src_tok_emb and tgt_tok_emb.
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder translation model wrapping ``nn.Transformer``.

    Args:
        num_encoder_layers: Number of encoder layers.
        num_decoder_layers: Number of decoder layers.
        emb_size: Embedding size, also used as the transformer ``d_model``.
        max_len: Number of positions precomputed by the positional encoding.
        nhead: Number of attention heads.
        src_vocab_size: Size of the source vocabulary.
        tgt_vocab_size: Size of the target vocabulary (output logits).
        dim_feedforward: Hidden dimension of the feed-forward sublayers.
        dropout: Dropout probability shared by the positional encoding and
            the transformer.
    """

    def __init__(
        self,
        num_encoder_layers,
        num_decoder_layers,
        emb_size,
        max_len,
        nhead,
        src_vocab_size,
        tgt_vocab_size,
        dim_feedforward,
        dropout=0.1,
    ):
        super().__init__()
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(
            d_model=emb_size, max_len=max_len, dropout=dropout
        )
        transformer_config = dict(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,  # feed-forward hidden dimension
            dropout=dropout,                  # dropout probability
        )
        self.transformer = nn.Transformer(**transformer_config)
        # Projects decoder outputs onto the target vocabulary.
        self.generator = nn.Linear(emb_size, tgt_vocab_size)

    def forward(
        self,
        src,
        trg,
        src_mask,
        tgt_mask,
        src_padding_mask,
        tgt_padding_mask,
        memory_key_padding_mask,
    ):
        """Run the full encoder-decoder pass and return target-vocabulary logits.

        nn.Transformer is created without ``batch_first``, so sequences are
        laid out as [sequence length, batch size, embedding size].

        Mask semantics per the PyTorch documentation: a float attention mask
        is added to the attention scores (0 keeps a position, -inf removes
        it), while in a boolean mask True marks positions that must not be
        attended to.
        """
        encoder_input = self.positional_encoding(self.src_tok_emb(src))
        decoder_input = self.positional_encoding(self.tgt_tok_emb(trg))
        hidden = self.transformer(
            encoder_input,                       # [src seq len, batch, emb]
            decoder_input,                       # [tgt seq len, batch, emb]
            src_mask=src_mask,                   # [src seq len, src seq len]
            tgt_mask=tgt_mask,                   # [tgt seq len, tgt seq len]
            memory_mask=None,                    # no mask on encoder output
            src_key_padding_mask=src_padding_mask,    # [batch, src seq len]
            tgt_key_padding_mask=tgt_padding_mask,    # [batch, tgt seq len]
            memory_key_padding_mask=memory_key_padding_mask,
        )
        return self.generator(hidden)

    def encode(self, src, src_mask):
        """Embed ``src`` and run only the transformer encoder."""
        embedded_src = self.positional_encoding(self.src_tok_emb(src))
        return self.transformer.encoder(embedded_src, src_mask)

    def decode(self, tgt, memory, tgt_mask):
        """Embed ``tgt`` and run only the transformer decoder over ``memory``."""
        embedded_tgt = self.positional_encoding(self.tgt_tok_emb(tgt))
        return self.transformer.decoder(embedded_tgt, memory, tgt_mask)

In [27]:
### 트랜스포머 모델 구조

from torch import optim


BATCH_SIZE = 128
# DEVICE     = "mps" if torch.backends.mps.is_available() and torch.backends.mps.is_built() else "cpu"

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

model = Seq2SeqTransformer(
    num_encoder_layers=3,  # three encoder layers
    num_decoder_layers=3,  # three decoder layers
    emb_size=512,
    max_len=512,
    nhead=8,
    src_vocab_size=len(vocab_transform[SRC_LANGUAGE]),
    tgt_vocab_size=len(vocab_transform[TGT_LANGUAGE]),
    dim_feedforward=512,
)
model = model.to(DEVICE)
# Targets equal to the padding index are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX).to(DEVICE)
optimizer = optim.Adam(model.parameters())

# Print the module hierarchy, four levels deep.
for top_name, top_module in model.named_children():
    print(top_name)
    for child_name, child_module in top_module.named_children():
        print(f"└ {child_name}")
        for grandchild_name, grandchild_module in child_module.named_children():
            print(f"│  └ {grandchild_name}")
            for leaf_name, _leaf_module in grandchild_module.named_children():
                print(f"│  │  └ {leaf_name}")

src_tok_emb
└ embedding
tgt_tok_emb
└ embedding
positional_encoding
└ dropout
transformer
└ encoder
│  └ layers
│  │  └ 0
│  │  └ 1
│  │  └ 2
│  └ norm
└ decoder
│  └ layers
│  │  └ 0
│  │  └ 1
│  │  └ 2
│  └ norm
generator


In [28]:
### 배치 데이터 생성

from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

# Compose preprocessing functions so they are applied one after another
def sequential_transforms(*transforms):
    """Return a function that pipes its input through ``transforms`` in order.

    Args:
        *transforms: Callables taking one argument; the output of each is the
            input of the next.

    Returns:
        A one-argument function. With no transforms it returns its input
        unchanged.
    """
    def func(txt_input):
        current = txt_input
        for stage in transforms:
            current = stage(current)
        return current

    return func

# Wrap indexed tokens with the <bos>/<eos> special tokens
def input_transform(token_ids):
    """Return a 1-D int64 tensor ``[BOS_IDX, *token_ids, EOS_IDX]``.

    Args:
        token_ids: Sequence of integer token indices; may be empty.

    Returns:
        A ``torch.long`` tensor of length ``len(token_ids) + 2``.
    """
    # The dtype is pinned explicitly: torch.tensor([]) would otherwise infer
    # float32, turning the result for an empty sentence into a float tensor.
    return torch.cat(
        (
            torch.tensor([BOS_IDX], dtype=torch.long),
            torch.tensor(token_ids, dtype=torch.long),
            torch.tensor([EOS_IDX], dtype=torch.long),
        )
    )

# Convert one batch of raw sentence pairs into padded index tensors
def collator(batch):
    """Encode and pad a batch of ``(source, target)`` sentence pairs.

    Trailing newlines are stripped from each sentence, which is then passed
    through the per-language pipeline in ``text_transform``. Sequences are
    padded with ``PAD_IDX`` to the longest one in the batch.

    Args:
        batch: Iterable of ``(source_sentence, target_sentence)`` strings.

    Returns:
        Tuple ``(src_batch, tgt_batch)`` of padded tensors.
    """
    encoded_pairs = [
        (
            text_transform[SRC_LANGUAGE](src_text.rstrip("\n")),
            text_transform[TGT_LANGUAGE](tgt_text.rstrip("\n")),
        )
        for src_text, tgt_text in batch
    ]
    src_sequences = [pair[0] for pair in encoded_pairs]
    tgt_sequences = [pair[1] for pair in encoded_pairs]
    # Padding
    return (
        pad_sequence(src_sequences, padding_value=PAD_IDX),
        pad_sequence(tgt_sequences, padding_value=PAD_IDX),
    )


########## Main

# Per-language pipeline:
#   tokenize the sentence -> map each token to its index -> add <bos>/<eos>
text_transform = {
    lang: sequential_transforms(
        token_transform[lang], vocab_transform[lang], input_transform
    )
    for lang in (SRC_LANGUAGE, TGT_LANGUAGE)
}

data_iter = Multi30k(split="valid", language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
dataloader = DataLoader(data_iter, batch_size=BATCH_SIZE, collate_fn=collator)
# Take only the first batch for inspection.
source_tensor, target_tensor = next(iter(dataloader))

print("(source, target):", next(iter(data_iter)), sep="\n")

# Batches are laid out as [sequence length, batch size]
print(f"source_batch: {source_tensor.shape}", source_tensor, sep="\n")
print(f"target_batch: {target_tensor.shape}", target_tensor, sep="\n")

(source, target):
('Eine Gruppe von Männern lädt Baumwolle auf einen Lastwagen', 'A group of men are loading cotton onto a truck')
source_batch: torch.Size([35, 128])
tensor([[   2,    2,    2,  ...,    2,    2,    2],
        [  14,    5,    5,  ...,    5,   21,    5],
        [  38,   12,   35,  ...,   12, 1750,   69],
        ...,
        [   1,    1,    1,  ...,    1,    1,    1],
        [   1,    1,    1,  ...,    1,    1,    1],
        [   1,    1,    1,  ...,    1,    1,    1]])
target_batch: torch.Size([30, 128])
tensor([[   2,    2,    2,  ...,    2,    2,    2],
        [   6,    6,    6,  ...,  250,   19,    6],
        [  39,   12,   35,  ...,   12, 3254,   61],
        ...,
        [   1,    1,    1,  ...,    1,    1,    1],
        [   1,    1,    1,  ...,    1,    1,    1],
        [   1,    1,    1,  ...,    1,    1,    1]])


In [29]:
### 어텐션 마스크 생성

# Build the causal (look-ahead) attention mask
def generate_square_subsequent_mask(s):
    """Return an ``s x s`` float mask that hides later positions.

    Entry ``[i, j]`` is ``0.0`` when ``j <= i`` and ``-inf`` when ``j > i``.
    The tensor is created on ``DEVICE``.
    """
    # Transposing an upper-triangular matrix of ones marks the lower triangle
    # (diagonal included) as the visible positions.
    visible = torch.triu(torch.ones((s, s), device=DEVICE)).transpose(0, 1) == 1
    # Start from zeros and put -inf on every position that is not visible.
    mask = torch.zeros((s, s), device=DEVICE)
    mask = mask.masked_fill(~visible, float("-inf"))
    return mask

# Build the attention masks and padding masks for one batch
def create_mask(src, tgt):
    """Create the four masks consumed by the transformer for ``src``/``tgt``.

    Args:
        src: Source index tensor; its first axis is taken as sequence length.
        tgt: Target index tensor; its first axis is taken as sequence length.

    Returns:
        Tuple ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)``:
        an all-False boolean square source mask, the causal target mask from
        ``generate_square_subsequent_mask``, and two boolean masks that are
        True where the input equals ``PAD_IDX`` (first two axes swapped).
    """
    src_len, tgt_len = src.shape[0], tgt.shape[0]

    # All False: no source position is hidden from any other.
    src_mask = torch.zeros((src_len, src_len), device=DEVICE).type(torch.bool)
    tgt_mask = generate_square_subsequent_mask(tgt_len)

    def padding_positions(batch):
        # True marks padding; the first two axes are swapped afterwards.
        return (batch == PAD_IDX).transpose(0, 1)

    return src_mask, tgt_mask, padding_positions(src), padding_positions(tgt)


# Decoder input drops the last position; the expected output drops the first.
target_input, target_out = target_tensor[:-1, :], target_tensor[1:, :]

(
    source_mask,
    target_mask,
    source_padding_mask,
    target_padding_mask,
) = create_mask(source_tensor, target_input)

# source_mask: which source positions self-attention may look at.
# False = token takes part in self-attention, True = token is excluded.
print(f"source_mask: {source_mask.shape}", source_mask, sep="\n")
# target_mask: [query sequence length, key sequence length]
print(f"target_mask: {target_mask.shape}", target_mask, sep="\n")
# Padding masks: whether a real text token exists at each position.
# False = a token is present, True = the position is padding.
print(f"source_padding_mask: {source_padding_mask.shape}", source_padding_mask, sep="\n")
print(f"target_padding_mask: {target_padding_mask.shape}", target_padding_mask, sep="\n")

source_mask: torch.Size([35, 35])
tensor([[False, False, False,  ..., False, False, False],
        [False, False, False,  ..., False, False, False],
        [False, False, False,  ..., False, False, False],
        ...,
        [False, False, False,  ..., False, False, False],
        [False, False, False,  ..., False, False, False],
        [False, False, False,  ..., False, False, False]])
target_mask: torch.Size([29, 29])
tensor([[0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf,
         -inf, -inf, -inf, -inf, -inf],
        [0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf,
         -inf, -inf, -inf, -inf, -inf],
        [0., 0., 0., -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf, -inf,
         -inf, -inf, -inf, -inf, -inf],
   

In [30]:
### 모델 학습 및 평가

def run(model, optimizer, criterion, split):
    """Run one pass over a Multi30k split and return the mean batch loss.

    With ``split == "train"`` the model is put in training mode and the
    optimizer updates the weights after every batch. For any other split the
    model is put in evaluation mode and gradient tracking is disabled.

    Args:
        model: Model called with the keyword arguments ``src``, ``trg``,
            ``src_mask``, ``tgt_mask``, ``src_padding_mask``,
            ``tgt_padding_mask`` and ``memory_key_padding_mask``.
        optimizer: Optimizer stepped once per batch when training.
        criterion: Loss applied to the flattened logits and targets.
        split: Multi30k split name, e.g. ``"train"`` or ``"valid"``.

    Returns:
        Sum of the per-batch losses divided by the number of batches.

    Raises:
        ZeroDivisionError: If the split yields no batches.
    """
    is_training = split == "train"
    model.train(is_training)
    data_iter = Multi30k(split=split, language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    dataloader = DataLoader(data_iter, batch_size=BATCH_SIZE, collate_fn=collator)

    total_loss = 0.0
    # Batches are counted while iterating. Calling len(list(dataloader))
    # afterwards would iterate (and re-tokenize) the whole split a second time.
    num_batches = 0

    # Autograd graphs are only needed when the weights will be updated.
    with torch.set_grad_enabled(is_training):
        for source_batch, target_batch in dataloader:
            source_batch = source_batch.to(DEVICE)
            target_batch = target_batch.to(DEVICE)

            # Teacher forcing: the decoder sees positions [0, n-1) and is
            # trained to predict positions [1, n).
            target_input = target_batch[:-1, :]
            target_output = target_batch[1:, :]

            # Attention and padding masks for this batch
            src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
                source_batch, target_input
            )

            logits = model(
                src=source_batch,
                trg=target_input,
                src_mask=src_mask,
                tgt_mask=tgt_mask,
                src_padding_mask=src_padding_mask,
                tgt_padding_mask=tgt_padding_mask,
                # Encoder outputs at padded source positions are hidden from
                # the decoder as well.
                memory_key_padding_mask=src_padding_mask,
            )

            loss = criterion(
                logits.reshape(-1, logits.shape[-1]), target_output.reshape(-1)
            )
            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            total_loss += loss.item()
            num_batches += 1

    return total_loss / num_batches


# Train for 5 epochs. Each epoch is one pass over the "train" split (weights
# are updated) followed by one pass over the "valid" split (loss only).
for epoch in range(5):
    train_loss = run(model, optimizer, criterion, "train")
    val_loss   = run(model, optimizer, criterion, "valid")
    # Report 1-based epoch numbers with the mean batch loss of each pass.
    print(f"Epoch: {epoch+1}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}")

Epoch: 1, Train loss: 4.702, Val loss: 3.913
Epoch: 2, Train loss: 3.710, Val loss: 3.626
Epoch: 3, Train loss: 3.444, Val loss: 3.520
Epoch: 4, Train loss: 3.312, Val loss: 3.540
Epoch: 5, Train loss: 3.238, Val loss: 3.532


In [32]:
### 트랜스포머 모델 번역 결과

# The model translates with greedy decoding.
# Greedy decoding: at every step, pick the single highest-scoring token from
# the scores produced for the current position.
def greedy_decode(model, source_tensor, source_mask, max_len, start_symbol):
    """Generate target token ids one at a time, always taking the best token.

    Args:
        model: Model exposing ``encode``, ``decode`` and ``generator``.
        source_tensor: Source token ids for a single sentence (the loop
            assumes a batch of one, because ``.item()`` is called on the
            chosen token).
        source_mask: Attention mask passed to ``model.encode``.
        max_len: Maximum length of the returned sequence, start symbol
            included.
        start_symbol: Token id used as the first decoder input.

    Returns:
        Long tensor of shape ``[generated length, 1]`` on ``DEVICE`` that
        starts with ``start_symbol``. It ends with ``EOS_IDX`` only if that
        token was produced before ``max_len`` was reached.
    """
    source_tensor = source_tensor.to(DEVICE)
    source_mask = source_mask.to(DEVICE)

    # Inference only: disabling autograd avoids building a graph that would
    # otherwise grow with every decoding step.
    with torch.no_grad():
        # The encoder output is computed and moved to the device once; it
        # does not change between decoding steps.
        memory = model.encode(source_tensor, source_mask).to(DEVICE)
        ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
        for _ in range(max_len - 1):
            target_mask = generate_square_subsequent_mask(ys.size(0))
            # As a boolean mask, the -inf entries become True (blocked) and
            # the 0 entries become False (visible).
            target_mask = target_mask.type(torch.bool).to(DEVICE)

            out = model.decode(ys, memory, target_mask)  # [tokens, batch, features]
            out = out.transpose(0, 1)                    # [batch, tokens, features]
            prob = model.generator(out[:, -1])           # scores for the last position
            _, next_word = torch.max(prob, dim=1)        # index of the best score
            next_word = next_word.item()

            ys = torch.cat(
                [ys, torch.ones(1, 1).type_as(source_tensor).fill_(next_word)], dim=0
            )
            if next_word == EOS_IDX:
                break
    return ys


def translate(model, source_sentence):
    """Translate one source-language sentence with greedy decoding.

    Args:
        model: Model passed to ``greedy_decode``; it is switched to
            evaluation mode.
        source_sentence: Raw sentence in ``SRC_LANGUAGE``.

    Returns:
        The generated target tokens joined by single spaces, without the
        ``<bos>`` and ``<eos>`` markers.
    """
    model.eval()
    # Column vector of token ids: [number of tokens, 1]
    source_tensor = text_transform[SRC_LANGUAGE](source_sentence).view(-1, 1)
    num_tokens = source_tensor.shape[0]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    tgt_tokens = greedy_decode(
        model, source_tensor, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX
    ).flatten()

    # Drop the leading <bos>. The trailing token is dropped only when it
    # really is <eos>: if decoding stopped at max_len, the last id is a real
    # token and must stay in the output.
    token_ids = tgt_tokens.tolist()[1:]
    if token_ids and token_ids[-1] == EOS_IDX:
        token_ids = token_ids[:-1]

    output = vocab_transform[TGT_LANGUAGE].lookup_tokens(token_ids)
    return " ".join(output)


# Translate two sentences that differ only in the final noun.
# NOTE(review): going by the `_oov` name, "Iglu" is presumably missing from
# the source vocabulary and therefore mapped to <unk> — confirm against the vocab.
output_oov = translate(model, "Eine Gruppe von Menschen steht vor einem Iglu .")
output     = translate(model, "Eine Gruppe von Menschen steht vor einem Gebäude .")
print(output_oov)
print(output)

A group of people are standing outside .
A group of people are standing outside .
