torch nn transformer
- https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html

In [None]:
from google.colab import drive
drive.mount('/content/drive')
import os
import sys
from datetime import datetime

drive_project_root = "/content/drive/MyDrive/#fastcampus"
sys.path.append(drive_project_root)

In [None]:
!pip install -r "/content/drive/MyDrive/#fastcampus/requirements.txt"

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
    print("Select the Runtime > 'Change runtime type' menu to enable a GPU accelerator, ")
    print('and then re-execute this cell.')
else:
    print(gpu_info)

현재 torchmetrics 사용하면 안 되고 강의에 맞추려면 downgrading 필요

In [None]:
pip uninstall torchmetrics

In [None]:
pip install torchmetrics==0.5

In [None]:
# pip install pytorch_lightning

In [None]:
# for data loading
from typing import List, Dict, Union, Any, Optional, Iterable, Callable
from abc import abstractmethod, ABC
from datetime import datetime
from functools import partial
from collections import Counter, OrderedDict
import random
import math
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
from torch import nn, optim
import torch.nn.functional as F

from torch.nn import Transformer

from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from pprint import pprint  # pretty print

# https://pytorch.org/text/stable/index.html
import torchtext
from torchtext import data
from torchtext import datasets
from torchtext.datasets import Multi30k
from torchtext.data.utils import get_tokenizer
from torchtext.data.functional import to_map_style_dataset
from torchtext.vocab import Vocab, build_vocab_from_iterator, vocab
import spacy

# For configuration
from omegaconf import DictConfig, OmegaConf
import hydra
from hydra.core.config_store import ConfigStore

# for logger
from torch.utils.tensorboard import SummaryWriter
import wandb
os.environ['WANDB_START_METHOD'] = 'thread'

In [None]:
print(torchtext.__version__)

In [None]:
from data_utils import dataset_split
from config_utils import flatten_dict, register_config, configure_optimizers_from_cfg, get_loggers, get_callbacks

In [None]:
!python -m spacy download en   # 영어
!python -m spacy download en_core_web_sm
!python -m spacy download de   # 독일어
!python -m spacy downlaod de_core_news_sm

독일어를 영어로 번역할 예정

In [None]:
# !git clone --recursive https://github.com/multi30k/dataset.git multi30k-dataset

In [None]:
# Multi30k를 미리 지정해 둔 data_root 경로에 download & 압축 해제
# train_data, valid_data, test_data = Multi30k(data_cfg.data_root)

test_data 형태는 기본적으로 iterable 형태
- iterable 형태란 처음에 아래 코드 돌리면 이 결과가 나오는데 다시 한 번 돌리면 결과 안 나옴
```python
for i in test_data:
    print(i)
```
- pytorch에서는 이걸 map style로 바꾸면 우리가 원하는 대로 조절 가능
  - 계속 실행해도 똑같이 결과 나옴
  - 단, data가 엄청 큰 경우 특별하게 처리 안 해 놨으면 깨질 수도 있음 (ram이 안 좋은 경우)
  - 그래서 data 엄청 큰 경우에는 map을 사용하지 말거나 똑똑하게 처리해서 사용해야 함

In [None]:
# Convert the iterable-style test split into a map-style dataset.
# NOTE(review): `test_data` must already exist when this cell runs; the cell
# above that would create it is commented out — confirm where it is defined.
test_data = to_map_style_dataset(test_data)

## 1. token_transform (token...)

In [None]:
def get_token_transform(data_cfg: DictConfig) -> dict:
    """Build one tokenizer per language.

    Args:
        data_cfg: data configuration; ``tokenizer``, ``src_lang`` and
            ``tgt_lang`` are read from it.

    Returns:
        Dict mapping each language code to the callable returned by
        ``get_tokenizer`` for that language.
    """
    return {
        lang: get_tokenizer(data_cfg.tokenizer, language=lang)
        for lang in (data_cfg.src_lang, data_cfg.tgt_lang)
    }


token_transform = get_token_transform(data_cfg)

## 2. vocab_transform

In [None]:
def yield_tokens(data_iter: Iterable, lang: str, lang2index: Dict[str, int]) -> Iterable[List[str]]:
    """Lazily tokenize one language side of every sample in ``data_iter``.

    Args:
        data_iter: iterable of samples; each sample is indexed with
            ``lang2index[lang]`` to pick the text of language ``lang``.
        lang: language code; also the key into the module-level
            ``token_transform`` dict.
        lang2index: maps a language code to its position inside a sample.

    Yields:
        The tokenizer output for each sample, in iteration order.
    """
    # Both lookups are loop-invariant, so resolve them once.
    tokenize = token_transform[lang]
    position = lang2index[lang]
    for data_sample in data_iter:
        yield tokenize(data_sample[position])

def get_vocab_transform(data_cfg: DictConfig) -> dict:
    """Build a torchtext vocabulary per language from the Multi30k train split.

    Args:
        data_cfg: data configuration; reads ``src_lang``, ``tgt_lang``,
            ``src_index``, ``tgt_index``, ``vocab.min_freq`` and
            ``vocab.special_symbol2index``.

    Returns:
        Dict mapping each language code to its vocabulary object. Every
        vocabulary falls back to the ``<unk>`` index for unknown tokens.
    """
    # Was `data_cfg.trg_lang`; every other access in this file uses `tgt_lang`.
    language_pair = (data_cfg.src_lang, data_cfg.tgt_lang)
    lang2index = {
        data_cfg.src_lang: data_cfg.src_index,
        data_cfg.tgt_lang: data_cfg.tgt_index,
    }
    special_symbol2index = data_cfg.vocab.special_symbol2index

    vocab_transform: dict = {}
    for ln in language_pair:
        # A fresh iterator per language: each vocabulary build consumes one
        # full pass over the training data.
        train_iter = Multi30k(split="train", language_pair=language_pair)

        vocab_transform[ln] = build_vocab_from_iterator(
            yield_tokens(train_iter, ln, lang2index),
            min_freq=data_cfg.vocab.min_freq,
            specials=list(special_symbol2index.keys()),
            special_first=True,  # put the special symbols at the front of the vocabulary
        )

        # Unknown tokens (e.g. present in test data but not in train data) map
        # to <unk> instead of raising a runtime error on lookup.
        vocab_transform[ln].set_default_index(special_symbol2index['<unk>'])

    return vocab_transform


vocab_transform = get_vocab_transform(data_cfg)

작성 코드 테스트해 보기

In [None]:
print(vocab_transform["de"]["<unk>"])  # 0
print(vocab_transform["en"]["<unk>"])  # 0
print(vocab_transform["de"]["<bos>"])  # 2
print(vocab_transform["en"]["hello"], vocab_transform["de"]["world"])  # 5466, 107  -> return index number for each token

## 3. integrated transforms
- 이 과정에서 text_transform 필요
  - 각각에 대해 token transform -> vocab transform -> torch.tensor transform

In [None]:
# helper functions for collate_fn

# *transforms: any number of callables, applied left to right
def sequential_transforms(*transforms):
    """Compose ``transforms`` into a single callable.

    The returned function feeds its argument through every transform in the
    order given, passing each result to the next, and returns the final value.
    With no transforms it returns its argument unchanged.
    """
    def func(txt_input):
        result = txt_input
        for step in transforms:
            result = step(result)
        return result
    return func

# convert token ids to a torch.tensor wrapped in bos & eos
def tensor_transform(token_ids: List[int], bos_index: int, eos_index: int):
    """Return a 1-D long tensor ``[bos_index, *token_ids, eos_index]``.

    Building the tensor in one call with an explicit dtype keeps the result an
    integer tensor even when ``token_ids`` is empty (an empty list on its own
    becomes a float tensor, which would change the concatenated dtype).
    """
    return torch.tensor([bos_index, *token_ids, eos_index], dtype=torch.long)

# src & tgt language text transforms: raw string --> tensor of token indices
def get_text_transform(data_cfg: DictConfig):
    """Build the string-to-tensor pipeline for the source and target language.

    Each pipeline chains, in order: the language's entry in the module-level
    ``token_transform``, its entry in ``vocab_transform``, and
    ``tensor_transform`` with the ``<bos>``/``<eos>`` indices from
    ``data_cfg.vocab.special_symbol2index`` bound.

    Returns:
        Dict mapping each language code to its composed callable.
    """
    text_transform = {}
    for ln in (data_cfg.src_lang, data_cfg.tgt_lang):
        tokenizer = token_transform[ln]
        vocabulary = vocab_transform[ln]
        add_bos_eos = partial(
            tensor_transform,
            bos_index=data_cfg.vocab.special_symbol2index["<bos>"],
            eos_index=data_cfg.vocab.special_symbol2index["<eos>"],
        )
        text_transform[ln] = sequential_transforms(tokenizer, vocabulary, add_bos_eos)
    return text_transform


text_transform = get_text_transform(data_cfg)

잘 되었는지 test
- 2 : 맨 앞
- 3 : 맨 뒤
- 가운데 : 실제 내가 넣은 단어

In [None]:
print(text_transform['en']('hello'))
print(text_transform['en']('hello,'))
print(text_transform['en']('hello, how'))
print(text_transform['en']('hello, how are you?'))

## 4. collate_fn -> batch를 어떻게 전처리 할까?

In [None]:
def collate_fn(batch, data_cfg: DictConfig):
    """Turn a list of (source, target) string pairs into two padded id tensors.

    Args:
        batch: iterable of ``(src_sample, tgt_sample)`` string pairs.
        data_cfg: data configuration; reads ``src_lang``, ``tgt_lang`` and the
            ``<pad>`` index from ``vocab.special_symbol2index``.

    Returns:
        ``(src_batch, tgt_batch)``: the per-sample tensors produced by the
        module-level ``text_transform`` pipelines, padded with the ``<pad>``
        index by ``pad_sequence``.
    """
    src_transform = text_transform[data_cfg.src_lang]
    tgt_transform = text_transform[data_cfg.tgt_lang]
    pad_index = data_cfg.vocab.special_symbol2index["<pad>"]

    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        # rstrip("\n") removes the trailing newline of each raw line
        src_batch.append(src_transform(src_sample.rstrip("\n")))
        tgt_batch.append(tgt_transform(tgt_sample.rstrip("\n")))

    # Shorter sequences are filled with <pad> up to the longest one in the batch.
    src_batch = pad_sequence(src_batch, padding_value=pad_index)
    tgt_batch = pad_sequence(tgt_batch, padding_value=pad_index)
    return src_batch, tgt_batch

def get_collate_fn(cfg: DictConfig):
    """Return ``collate_fn`` with its ``data_cfg`` argument pre-bound to ``cfg.data``.

    The result takes only ``batch``, which is the call shape a DataLoader's
    ``collate_fn`` argument is given.
    """
    return partial(collate_fn, data_cfg=cfg.data)

## 5. data loader

In [None]:
def get_multi30k_dataloader(split_mode: str, language_pair: tuple, batch_size: int, collate_fn: Callable):
    """Create a DataLoader over one Multi30k split.

    Args:
        split_mode: split name forwarded to ``Multi30k(split=...)``.
        language_pair: language pair forwarded to ``Multi30k``.
        batch_size: number of samples per batch.
        collate_fn: callable that turns a list of samples into a batch.

    Returns:
        A ``torch.utils.data.DataLoader`` over the map-style version of the split.
    """
    raw_split = Multi30k(split=split_mode, language_pair=language_pair)
    # Convert to map style so the data can be iterated repeatedly; the dataset
    # is small, so holding it in memory is not a concern here.
    map_dataset = to_map_style_dataset(raw_split)
    return torch.utils.data.DataLoader(map_dataset, batch_size=batch_size, collate_fn=collate_fn)

잘 되나 확인

In [None]:
# 3 : batch size
# test_dataloader = get_multi30k_dataloader('test', (data_cfg.src_lang, data_cfg.tgt_lang), 3, collate_fn=partial(collate_fn, data_cfg=data_cfg))

In [None]:
# for i in test_dataloader:
#     print(i)

## seed 설정하기

- pytorch lightning을 쓰는 경우 `pl.seed_everything(SEED)`를 쓰면 아래와 같이 하나하나 모두 SEED 지정할 필요 없음

In [None]:
# Seed the Python, NumPy and PyTorch random number generators with one value.
SEED = 1234

random.seed(SEED)  # Python's built-in `random` module
np.random.seed(SEED)  # NumPy global RNG
torch.manual_seed(SEED)  # PyTorch RNG
torch.cuda.manual_seed(SEED)  # PyTorch CUDA RNG
torch.backends.cudnn.deterministic = True  # request deterministic cuDNN algorithms

In [None]:
def _text_postprocessing(res: List[str]) -> str:
    if "<eos>" in res:
        res = res[:res.index('<eos>')]  # eos index를 찾아서 그 앞까지만 가져오겠다
    if "<pad>" in res:
        res = res[:res.index["<pad>"]]
    res = " ".join(res).replace("<bos>", "")
    return res

class BaseTranslateLightningModule(pl.LightningModule):
    """Shared train/validation/test logic for the translation models.

    Subclasses implement ``forward``. This class computes the cross-entropy
    loss against the shifted target, logs it, and for validation/test also
    decodes source, target and predicted ids back to text.
    """

    def __init__(self, cfg: DictConfig):
        super().__init__()
        self.cfg = cfg
        self.loss_function = torch.nn.CrossEntropyLoss(
            # positions holding <pad> do not contribute to the loss
            ignore_index=cfg.data.vocab.special_symbol2index['<pad>']
        )

    def configure_optimizers(self):
        """Build optimizers and schedulers via ``configure_optimizers_from_cfg``."""
        self._optimizers, self._schedulers = configure_optimizers_from_cfg(
            self.cfg, self
        )
        return self._optimizers, self._schedulers

    @abstractmethod
    def forward(self, src, tgt, teacher_forcing_ratio: float):
        raise NotImplementedError()

    def _ids_to_texts(self, token_ids, lang: str) -> List[str]:
        """Decode an id tensor into one post-processed string per sample.

        ``token_ids`` is transposed on its first two axes so that each row is
        one sample, looked up in ``vocab_transform[lang]`` and cleaned with
        ``_text_postprocessing``.
        """
        vocab = vocab_transform[lang]
        rows = torch.transpose(token_ids, 0, 1).detach().cpu().numpy().tolist()
        # lookup_tokens converts a list of ids back into token strings
        return [_text_postprocessing(vocab.lookup_tokens(row)) for row in rows]

    def _forward(self, src, tgt, mode: str, teacher_forcing_ratio: float = 0.5):
        """Run the model on one batch and compute the loss.

        Args:
            src: source ids; the first axis is sliced/transposed as sequence.
            tgt: target ids, same layout as ``src``.
            mode: one of ``"train"``, ``"val"``, ``"test"``; used as key prefix.
            teacher_forcing_ratio: forwarded to ``forward``. It is the
                probability of feeding the ground-truth token (rather than the
                model's own prediction) as the next decoder input.

        Returns:
            ``({f"{mode}_loss": loss}, logs_detail)`` where ``logs_detail``
            holds the raw tensors and, for val/test, the decoded texts.
        """
        assert mode in ["train", "val", "test"]

        tgt_inputs = tgt[:-1, :]  # drop the last step: decoder inputs
        outputs = self(src, tgt_inputs, teacher_forcing_ratio=teacher_forcing_ratio)
        tgt_outputs = tgt[1:, :]  # drop the first step (start tokens): labels

        loss = self.loss_function(
            # flatten every leading axis; keep the last (per-class scores) axis
            outputs.reshape(-1, outputs.shape[-1]),
            tgt_outputs.reshape(-1),
        )

        logs_detail = {
            f"{mode}_src": src,
            f"{mode}_tgt": tgt,
            f"{mode}_results": outputs,
        }

        if mode in ["val", "test"]:
            # index of the highest-scoring class at every position
            _, tgt_results = torch.max(outputs, dim=2)

            src_texts = self._ids_to_texts(src, self.cfg.data.src_lang)  # inputs
            tgt_texts = self._ids_to_texts(tgt, self.cfg.data.tgt_lang)  # references
            res_texts = self._ids_to_texts(tgt_results, self.cfg.data.tgt_lang)  # predictions

            text_result_summary = {
                f"{mode}_src_text": src_texts,
                f"{mode}_tgt_text": tgt_texts,
                f"{mode}_results_text": res_texts,
            }
            print(f"{self.global_step} step: \n src_text: {src_texts[0]}, \n tgt_text: {tgt_texts[0]}, \n result_text: {res_texts[0]}")
            logs_detail.update(text_result_summary)

        return {f"{mode}_loss": loss}, logs_detail

    def training_step(self, batch, batch_idx):
        src, tgt = batch[0], batch[1]
        logs, _ = self._forward(src, tgt, "train", self.cfg.model.teacher_forcing_ratio)
        self.log_dict(logs)
        logs["loss"] = logs["train_loss"]
        return logs

    def validation_step(self, batch, batch_idx):
        src, tgt = batch[0], batch[1]
        # teacher forcing is disabled for evaluation
        logs, logs_detail = self._forward(src, tgt, "val", 0.0)
        self.log_dict(logs)
        logs["loss"] = logs["val_loss"]
        logs.update(logs_detail)
        return logs

    def test_step(self, batch, batch_idx):
        src, tgt = batch[0], batch[1]
        # teacher forcing is disabled for evaluation
        logs, logs_detail = self._forward(src, tgt, "test", 0.0)
        self.log_dict(logs)
        logs["loss"] = logs["test_loss"]
        logs.update(logs_detail)
        return logs

class TransformerTranslateLightningModule(BaseTranslateLightningModule):
    """Base module for models whose ``forward`` takes only ``(src, tgt)``.

    Unlike the parent class, no ``teacher_forcing_ratio`` is involved: the
    whole shifted target is handed to ``forward`` in one call.
    """

    def __init__(self, cfg: DictConfig):
        super().__init__(cfg)

    @abstractmethod
    def forward(self, src, tgt):
        raise NotImplementedError()

    def _ids_to_texts(self, token_ids, lang: str) -> List[str]:
        """Decode an id tensor into one post-processed string per sample.

        ``token_ids`` is transposed on its first two axes so that each row is
        one sample, looked up in ``vocab_transform[lang]`` and cleaned with
        ``_text_postprocessing``.
        """
        vocab = vocab_transform[lang]
        rows = torch.transpose(token_ids, 0, 1).detach().cpu().numpy().tolist()
        # lookup_tokens converts a list of ids back into token strings
        return [_text_postprocessing(vocab.lookup_tokens(row)) for row in rows]

    def _forward(self, src, tgt, mode: str):
        """Run the model on one batch and compute the loss.

        Args:
            src: source ids; the first axis is sliced/transposed as sequence.
            tgt: target ids, same layout as ``src``.
            mode: one of ``"train"``, ``"val"``, ``"test"``; used as key prefix.

        Returns:
            ``({f"{mode}_loss": loss}, logs_detail)`` where ``logs_detail``
            holds the raw tensors and, for val/test, the decoded texts.
        """
        assert mode in ["train", "val", "test"]

        tgt_inputs = tgt[:-1, :]  # drop the last step: decoder inputs
        # forward() here accepts only (src, tgt); the previous call passed an
        # undefined `teacher_forcing_ratio` keyword.
        outputs = self(src, tgt_inputs)
        tgt_outputs = tgt[1:, :]  # drop the first step (start tokens): labels

        loss = self.loss_function(
            # flatten every leading axis; keep the last (per-class scores) axis
            outputs.reshape(-1, outputs.shape[-1]),
            tgt_outputs.reshape(-1),
        )

        logs_detail = {
            f"{mode}_src": src,
            f"{mode}_tgt": tgt,
            f"{mode}_results": outputs,
        }

        if mode in ["val", "test"]:
            # index of the highest-scoring class at every position
            _, tgt_results = torch.max(outputs, dim=2)

            src_texts = self._ids_to_texts(src, self.cfg.data.src_lang)  # inputs
            tgt_texts = self._ids_to_texts(tgt, self.cfg.data.tgt_lang)  # references
            res_texts = self._ids_to_texts(tgt_results, self.cfg.data.tgt_lang)  # predictions

            text_result_summary = {
                f"{mode}_src_text": src_texts,
                f"{mode}_tgt_text": tgt_texts,
                f"{mode}_results_text": res_texts,
            }
            print(f"{self.global_step} step: \n src_text: {src_texts[0]}, \n tgt_text: {tgt_texts[0]}, \n result_text: {res_texts[0]}")
            logs_detail.update(text_result_summary)

        return {f"{mode}_loss": loss}, logs_detail

    def training_step(self, batch, batch_idx):
        src, tgt = batch[0], batch[1]
        logs, _ = self._forward(src, tgt, "train")
        self.log_dict(logs)
        logs["loss"] = logs["train_loss"]
        return logs

    def validation_step(self, batch, batch_idx):
        src, tgt = batch[0], batch[1]
        logs, logs_detail = self._forward(src, tgt, "val")
        self.log_dict(logs)
        logs["loss"] = logs["val_loss"]
        logs.update(logs_detail)
        return logs

    def test_step(self, batch, batch_idx):
        src, tgt = batch[0], batch[1]
        logs, logs_detail = self._forward(src, tgt, "test")
        self.log_dict(logs)
        logs["loss"] = logs["test_loss"]
        logs.update(logs_detail)
        return logs

utils for initialization

In [None]:
def init_weights(model: Union[nn.Module, pl.LightningModule]):
    """Overwrite every parameter of ``model`` in place with U(-0.08, 0.08) samples."""
    low, high = -0.08, 0.08  # hard-coded bounds; ideally these would come from the config
    for weight in model.parameters():
        nn.init.uniform_(weight.data, low, high)

## model definition

### 1. encoder

In [None]:
class LSTMEncoder(nn.Module):
    """Embedding + multi-layer LSTM encoder that returns only its final state.

    Args:
        input_dim: number of rows of the embedding table.
        embed_dim: embedding size, which is also the LSTM input size.
        hidden_dim: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
        dropout: probability used for the embedding dropout and passed to the LSTM.
    """

    def __init__(
        self,
        input_dim: int,
        embed_dim: int,
        hidden_dim: int,
        n_layers: int,
        dropout: float,
    ):
        super().__init__()
        self.input_dim, self.hidden_dim, self.n_layers = input_dim, hidden_dim, n_layers

        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=embed_dim)
        self.rnn = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

        # re-initialise all weights
        self.apply(init_weights)

    def forward(self, src):
        """Encode ``src`` and return the LSTM's final ``(hidden, cell)`` pair.

        The per-step outputs of the LSTM are discarded.
        """
        # src: [seq_len, batch_size] -> token_vectors: [seq_len, batch_size, embed_dim]
        token_vectors = self.embedding(src)
        # final_state holds (hidden, cell), each [n_layers * n_directions, batch_size, hidden_dim]
        _, final_state = self.rnn(self.dropout(token_vectors))
        hidden, cell = final_state
        return hidden, cell

### 2. decoder

In [None]:
class LSTMDecoder(nn.Module):
    """Single-step LSTM decoder: embeds one token per sample and scores the next.

    Args:
        output_dim: target vocabulary size; size of both the embedding table
            and the output projection.
        embed_dim: embedding size, which is also the LSTM input size.
        hidden_dim: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
        dropout: probability used for the embedding dropout and passed to the LSTM.
    """

    def __init__(
        self,
        output_dim: int,
        embed_dim: int,
        hidden_dim: int,
        n_layers: int,
        dropout: float
    ):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.output_dim = output_dim
        # The decoder embeds target-side tokens, so the table is sized by
        # output_dim (the original referenced an undefined `input_dim`).
        self.embedding = nn.Embedding(output_dim, embed_dim)
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one time step.

        Args:
            input: token ids for the current step, ``[batch_size]``.
            hidden: LSTM hidden state, ``[n_layers, batch_size, hidden_dim]``.
            cell: LSTM cell state, same shape as ``hidden``.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` is
            ``[batch_size, output_dim]`` and the states are the updated ones.
        """
        # add a length-1 sequence axis: [1, batch_size]
        embedded = self.dropout(self.embedding(input.unsqueeze(0)))  # [1, batch_size, embed_dim]

        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))  # output: [1, batch_size, hidden_dim]

        prediction = self.fc_out(output.squeeze(0))  # [batch_size, output_dim]

        return prediction, hidden, cell

### 3. Seq2Seq (cfg <-- encoder, decoder)

In [None]:
class LSTMSeq2Seq(BaseTranslateLightningModule):
    """LSTM encoder-decoder translation model built from ``cfg.model.enc`` / ``cfg.model.dec``."""

    def __init__(self, cfg: DictConfig):
        super().__init__(cfg)

        self.encoder = LSTMEncoder(**cfg.model.enc)
        self.decoder = LSTMDecoder(**cfg.model.dec)

        # The encoder's final state is passed straight in as the decoder's
        # initial state, so both must have the same shape.
        assert self.encoder.hidden_dim == self.decoder.hidden_dim
        assert self.encoder.n_layers == self.decoder.n_layers

        # parameters init
        self.apply(init_weights)

    def forward(self, src, tgt, teacher_forcing_ratio: float = 0.5):
        """Decode step by step and return the scores for every target position.

        ``_forward`` calls this with ``tgt[:-1]`` as ``tgt`` and scores the
        result against ``tgt[1:]``, so ``outputs[t]`` must be the prediction
        made after consuming ``tgt[t]``. The loop therefore starts at step 0
        (previously it started at 1, leaving ``outputs[0]`` all zeros and every
        other row shifted one step against its label).

        Args:
            src: source ids, ``[src_len, batch_size]``.
            tgt: decoder input ids, ``[tgt_len, batch_size]``; row 0 holds the
                start tokens.
            teacher_forcing_ratio: probability of feeding the ground-truth
                token as the next input; use 0.0 for validation/test.

        Returns:
            Tensor ``[tgt_len, batch_size, tgt_vocab_size]`` of decoder scores.
        """
        batch_size = tgt.shape[1]
        tgt_len = tgt.shape[0]
        tgt_vocab_size = self.decoder.output_dim

        # tensor to store decoder outputs
        outputs = torch.zeros(tgt_len, batch_size, tgt_vocab_size, device=self.device)

        hidden, cell = self.encoder(src)

        # first decoder input: the start tokens
        decoder_input = tgt[0, :]

        for t in range(tgt_len):
            output, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[t] = output

            if t + 1 == tgt_len:
                break  # no further step needs an input

            # Either feed the ground-truth next token (teacher forcing) or the
            # model's own most likely token.
            teacher_force = random.random() < teacher_forcing_ratio
            decoder_input = tgt[t + 1] if teacher_force else output.argmax(1)

        return outputs

## Concat 기반 Additive Attention 기반의 모델 새로 정의
- 이번에는 위에 모델과 다르게 bidirection
- GRU encoder 사용 예정
- concat 기반 additive는 encoder, decoder rnn이 다를 수 있음
  - encoder, decoder 각각에 대한 hidden layer 생성 필요

In [None]:
class BidirectionalGRUEncoder(nn.Module):
    """Bidirectional GRU encoder that also produces the decoder's initial state.

    Args:
        input_dim: number of rows of the embedding table.
        embed_dim: embedding size, which is also the GRU input size.
        enc_hidden_dim: GRU hidden size per direction.
        dec_hidden_dim: size of the decoder hidden state this encoder emits.
        hidden_dim: accepted for configuration compatibility; not used.
        n_layers: number of stacked GRU layers.
        dropout: probability used for the embedding dropout and passed to the GRU.
    """

    def __init__(
        self,
        input_dim: int,
        embed_dim: int,
        enc_hidden_dim: int,
        dec_hidden_dim: int,
        hidden_dim: int,
        n_layers: int,
        dropout: float
    ):
        super().__init__()
        self.input_dim = input_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_dim, embed_dim)
        # A GRU returns a single hidden tensor, which forward() relies on;
        # an LSTM would return a (hidden, cell) tuple instead.
        self.rnn = nn.GRU(embed_dim, enc_hidden_dim, n_layers, bidirectional=True, dropout=dropout)
        # * 2: forward and backward states are concatenated
        self.fc = nn.Linear(enc_hidden_dim * 2, dec_hidden_dim)
        self.dropout = nn.Dropout(dropout)

        # initialization of weights
        self.apply(init_weights)

    def forward(self, src):
        """Encode ``src``.

        Returns:
            ``(outputs, hidden)``: the GRU's per-step outputs, and the last
            layer's forward/backward final states concatenated and projected
            through ``tanh(fc(...))`` to ``[batch_size, dec_hidden_dim]``.
        """
        embedded = self.dropout(self.embedding(src))

        outputs, hidden = self.rnn(embedded)

        # hidden is laid out as [forward_1, backward_1, forward_2, backward_2, ...],
        # so the last layer's forward state is hidden[-2] and its backward
        # state is hidden[-1].
        last_layer_states = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        hidden = torch.tanh(self.fc(last_layer_states))

        return outputs, hidden

class ConcatAttention(nn.Module):
    """Concat-based additive attention over bidirectional encoder outputs.

    Args:
        enc_hidden_dim: encoder hidden size per direction (outputs carry twice this).
        dec_hidden_dim: decoder hidden size.
    """

    def __init__(self, enc_hidden_dim: int, dec_hidden_dim: int):
        super().__init__()

        # scores the concatenation [decoder hidden ; encoder output]
        self.attn = nn.Linear((enc_hidden_dim * 2) + dec_hidden_dim, dec_hidden_dim)

        # reduces each energy vector to a single unnormalised score
        self.v = nn.Linear(dec_hidden_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over the source positions.

        Args:
            hidden: decoder state (the query), ``[batch_size, dec_hidden_dim]``.
            encoder_outputs: encoder outputs (keys/values),
                ``[src_len, batch_size, enc_hidden_dim * 2]``.

        Returns:
            Tensor ``[batch_size, src_len]``; each row sums to 1.
        """
        src_len = encoder_outputs.shape[0]

        # repeat the decoder state once per source position
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)  # [batch_size, src_len, dec_hidden_dim]
        encoder_outputs = encoder_outputs.permute(1, 0, 2)  # [batch_size, src_len, enc_hidden_dim * 2]

        # concatenate on the feature axis, keeping batch and position axes
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))  # [batch_size, src_len, dec_hidden_dim]

        attention = self.v(energy).squeeze(2)  # [batch_size, src_len]

        # normalise over the source positions
        return F.softmax(attention, dim=1)
    
class AttentionalRNNDecoder(nn.Module):
    """Single-step GRU decoder that attends over the encoder outputs.

    Args:
        output_dim: target vocabulary size (embedding rows and output scores).
        embed_dim: embedding size.
        enc_hidden_dim: encoder hidden size per direction.
        dec_hidden_dim: decoder GRU hidden size.
        n_layers: number of GRU layers passed to ``nn.GRU``.
        dropout: probability used for the embedding dropout and passed to the GRU.
        attention: module called as ``attention(hidden, encoder_outputs)``.
    """

    def __init__(
        self,
        output_dim: int,
        embed_dim: int,
        enc_hidden_dim: int,
        dec_hidden_dim: int,
        n_layers: int,
        dropout: float,
        attention: nn.Module
    ):
        super().__init__()

        context_dim = enc_hidden_dim * 2  # size of one encoder output vector

        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, embed_dim)
        self.rnn = nn.GRU(context_dim + embed_dim, dec_hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(context_dim + dec_hidden_dim + embed_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, encoder_outputs):
        """Advance the decoder by one time step.

        Args:
            input: token ids for the current step, ``[batch_size]``.
            hidden: decoder state, ``[batch_size, dec_hidden_dim]``.
            encoder_outputs: ``[src_len, batch_size, enc_hidden_dim * 2]``.

        Returns:
            ``(prediction, hidden)``: scores ``[batch_size, output_dim]`` and
            the updated decoder state ``[batch_size, dec_hidden_dim]``.

        Raises:
            AssertionError: if the GRU output and its final hidden state differ.
        """
        # [batch_size] -> [1, batch_size] -> [1, batch_size, embed_dim]
        token_vec = self.dropout(self.embedding(input.unsqueeze(0)))

        # attention weights with a singleton axis for bmm: [batch_size, 1, src_len]
        attn_weights = self.attention(hidden, encoder_outputs).unsqueeze(1)

        # batch-major encoder outputs: [batch_size, src_len, enc_hidden_dim * 2]
        batch_major_outputs = encoder_outputs.permute(1, 0, 2)

        # weighted sum of encoder outputs, back to [1, batch_size, enc_hidden_dim * 2]
        context = torch.bmm(attn_weights, batch_major_outputs).permute(1, 0, 2)

        step_input = torch.cat((token_vec, context), dim=2)
        step_out, new_hidden = self.rnn(step_input, hidden.unsqueeze(0))

        # For this one-step call the output and the final hidden state are
        # expected to be the same tensor of values.
        states_match = (step_out == new_hidden).all()
        if not states_match:
            raise AssertionError()

        features = torch.cat(
            (step_out.squeeze(0), context.squeeze(0), token_vec.squeeze(0)), dim=1
        )
        prediction = self.fc_out(features)  # [batch_size, output_dim]

        return prediction, new_hidden.squeeze(0)
    
class AttentionBasedSeq2Seq(BaseTranslateLightningModule):
    """Attention-based encoder-decoder built from ``cfg.model.enc`` / ``attention`` / ``dec``."""

    def __init__(self, cfg: DictConfig):
        super().__init__(cfg)

        self.encoder = BidirectionalGRUEncoder(**cfg.model.enc)
        self.attention = ConcatAttention(**cfg.model.attention)
        self.decoder = AttentionalRNNDecoder(
            attention=self.attention, **cfg.model.dec
        )

    def forward(self, src, tgt, teacher_forcing_ratio: float = 0.5):
        """Decode step by step and return the scores for every target position.

        ``_forward`` calls this with ``tgt[:-1]`` as ``tgt`` and scores the
        result against ``tgt[1:]``, so ``outputs[t]`` must be the prediction
        made after consuming ``tgt[t]``. The loop therefore starts at step 0
        (previously it started at 1, leaving ``outputs[0]`` all zeros and every
        other row shifted one step against its label).

        Args:
            src: source ids, ``[src_len, batch_size]``.
            tgt: decoder input ids, ``[tgt_len, batch_size]``; row 0 holds the
                start tokens.
            teacher_forcing_ratio: probability of feeding the ground-truth
                token as the next input; use 0.0 for validation/test.

        Returns:
            Tensor ``[tgt_len, batch_size, tgt_vocab_size]`` of decoder scores.
        """
        batch_size = tgt.shape[1]
        tgt_len = tgt.shape[0]
        tgt_vocab_size = self.decoder.output_dim

        # tensor to store decoder outputs
        outputs = torch.zeros(tgt_len, batch_size, tgt_vocab_size, device=self.device)

        encoder_outputs, hidden = self.encoder(src)

        # first decoder input: the start tokens
        decoder_input = tgt[0, :]

        for t in range(tgt_len):
            output, hidden = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[t] = output

            if t + 1 == tgt_len:
                break  # no further step needs an input

            # Either feed the ground-truth next token (teacher forcing) or the
            # model's own most likely token.
            teacher_force = random.random() < teacher_forcing_ratio
            decoder_input = tgt[t + 1] if teacher_force else output.argmax(1)

        return outputs

In [None]:
# 1. tokenembedding
# 2. positional encoding
# 3. nn.Transformer

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to token embeddings, then apply dropout.

    Args:
        emb_size: embedding size of the tokens.
        dropout: dropout probability applied to the sum.
        maxlen: number of positions for which encodings are precomputed.
    """

    def __init__(
        self,
        emb_size: int,
        dropout: float,
        maxlen: int = 5000
    ):
        super().__init__()
        # one frequency per (sin, cos) column pair
        den = torch.exp(-torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        angles = pos * den  # [maxlen, ceil(emb_size / 2)]

        pos_embedding = torch.zeros((maxlen, emb_size))
        # even columns (0, 2, 4, ...): sine
        pos_embedding[:, 0::2] = torch.sin(angles)
        # odd columns (1, 3, 5, ...): cosine; an odd emb_size has one fewer
        # odd column than frequencies, so the surplus one is dropped
        pos_embedding[:, 1::2] = torch.cos(angles[:, : emb_size // 2])

        # insert a singleton axis so the table broadcasts over the batch axis:
        # [maxlen, 1, emb_size]
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        # a buffer is stored on the module as self.pos_embedding without being a parameter
        self.register_buffer("pos_embedding", pos_embedding)

    def forward(self, token_embedding: torch.Tensor):
        """Return ``dropout(token_embedding + encoding)``.

        The encoding is cut to the first ``token_embedding.size(0)`` positions,
        so the first axis of ``token_embedding`` is the sequence axis.
        """
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

class TokenEmbedding(nn.Module):
    """Embedding lookup whose output is scaled by ``sqrt(embed_size)``.

    Args:
        vocab_size: number of rows of the embedding table.
        embed_size: embedding size.
    """

    def __init__(
        self,
        vocab_size: int,
        embed_size: int
    ):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.embed_size = embed_size

    def forward(self, tokens: torch.Tensor):
        """Return the embeddings of ``tokens`` (cast to long) times ``sqrt(embed_size)``."""
        return self.embedding(tokens.long()) * math.sqrt(self.embed_size)

class TransformerSeq2Seq(TransformerTranslateLightningModule):
    """Sequence-to-sequence translation model built on ``torch.nn.Transformer``.

    Source and target token ids are embedded, combined with sinusoidal
    positional encodings, passed through the transformer, and projected to
    target-vocabulary logits by a linear ``generator`` layer.

    Args:
        cfg: Config whose ``model`` node provides ``num_encoder_layers``,
            ``num_decoder_layers``, ``embed_size``, ``nhead``,
            ``src_vocab_size``, ``tgt_vocab_size``, ``dim_feedforward`` and
            ``dropout``, and whose ``data.vocab.special_symbol2index`` maps
            ``"<pad>"`` to the padding index.
    """

    def __init__(self, cfg: DictConfig):
        super().__init__(cfg)
        self.cfg = cfg
        model_cfg = self.cfg.model
        embed_size = model_cfg.embed_size
        dropout = model_cfg.dropout

        self.transformer = Transformer(
            d_model=embed_size,
            nhead=model_cfg.nhead,
            num_encoder_layers=model_cfg.num_encoder_layers,
            num_decoder_layers=model_cfg.num_decoder_layers,
            dim_feedforward=model_cfg.dim_feedforward,
            dropout=dropout,
        )

        # Projects decoder features to one logit per target-vocabulary entry.
        self.generator = nn.Linear(embed_size, model_cfg.tgt_vocab_size)
        self.src_token_embed = TokenEmbedding(model_cfg.src_vocab_size, embed_size)
        self.tgt_token_embed = TokenEmbedding(model_cfg.tgt_vocab_size, embed_size)
        self.positional_encoding = PositionalEncoding(embed_size, dropout=dropout)

    def generate_square_subsequent_mask(self, sz: int):
        """Return an ``(sz, sz)`` additive causal mask.

        Entries on and below the diagonal are ``0.0``; entries above it are
        ``-inf``, so a position cannot attend to later positions. The mask is
        created on ``self.device`` so it matches the device the module is on.
        """
        return torch.triu(
            torch.full((sz, sz), float("-inf"), device=self.device),
            diagonal=1,
        )

    # Backward-compatible alias: the method was originally defined under this
    # misspelled name while being called by the correctly spelled one.
    generate_squre_subsequent_mask = generate_square_subsequent_mask

    def create_mask(self, src, tgt):
        """Build the attention and padding masks for one batch.

        Args:
            src: Source token ids; ``shape[0]`` is used as the sequence length.
            tgt: Target token ids; ``shape[0]`` is used as the sequence length.

        Returns:
            Tuple ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)``.
            ``src_mask`` is an all-``False`` boolean square mask (nothing is
            hidden), ``tgt_mask`` is the causal mask, and the padding masks
            are ``True`` where the token equals the ``"<pad>"`` index, with
            dims 0 and 1 swapped relative to the inputs.
        """
        src_seq_len = src.shape[0]
        tgt_seq_len = tgt.shape[0]
        pad_index = self.cfg.data.vocab.special_symbol2index["<pad>"]

        tgt_mask = self.generate_square_subsequent_mask(tgt_seq_len)
        # Created on self.device so it matches the device the module is on.
        src_mask = torch.zeros((src_seq_len, src_seq_len), device=self.device).type(torch.bool)

        src_padding_mask = (src == pad_index).transpose(0, 1)
        tgt_padding_mask = (tgt == pad_index).transpose(0, 1)
        return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

    def forward(self, src: torch.Tensor, tgt: torch.Tensor):
        """Run the full encoder-decoder and return target-vocabulary logits.

        Args:
            src: Source token ids, sequence axis first
                (presumably ``(src_len, batch)`` -- confirm against the collate fn).
            tgt: Target token ids, sequence axis first.

        Returns:
            Output of ``self.generator`` applied to the transformer output;
            the last dimension has size ``tgt_vocab_size``.
        """
        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = self.create_mask(src, tgt)

        src_emb = self.positional_encoding(self.src_token_embed(src))
        tgt_emb = self.positional_encoding(self.tgt_token_embed(tgt))

        outs = self.transformer(
            src_emb,
            tgt_emb,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=None,
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            # The encoder memory has the same padding layout as the source.
            memory_key_padding_mask=src_padding_mask,
        )

        return self.generator(outs)

    def encode(self, src: torch.Tensor, src_mask: torch.Tensor):
        """Embed ``src`` and run only the transformer encoder."""
        return self.transformer.encoder(self.positional_encoding(self.src_token_embed(src)), src_mask)

    def decode(self, tgt: torch.Tensor, memory: torch.Tensor, tgt_mask: torch.Tensor):
        """Embed ``tgt`` and run only the transformer decoder against ``memory``."""
        return self.transformer.decoder(self.positional_encoding(self.tgt_token_embed(tgt)), memory, tgt_mask)

In [None]:
# Data configuration for German -> English translation with spaCy tokenizers.
data_spacy_de_en_cfg = {
    "name": "spacy_de_en",
    "data_root": os.path.join(os.getcwd(), "data"),
    "tokenizer": "spacy",
    "src_lang": "de",  # source
    "tgt_lang": "en",  # target
    "src_index": 0,
    "tgt_index": 1,
    "vocab": {
        # map each special token to its index
        "special_symbol2index": {
            "<unk>": 0,   # unknown token
            "<pad>": 1,   # pads the tail of sequences whose lengths do not match
            "<bos>": 2,   # beginning of sentence
            "<eos>": 3,   # end of sentence
        },
        "special_first": True,
        "min_freq": 2
    }
}

# Intermediate check that the config was built correctly
data_cfg = OmegaConf.create(data_spacy_de_en_cfg)
# Pretty-print option 1) use to_yaml
# print(OmegaConf.to_yaml(data_cfg))  # if this prints without error, the config is fine
# Option 2) use pprint (pretty print) - note that pprint needs a dictionary
pprint(dict(data_cfg))

# get_dataset: Multi30k yields the train / valid / test splits
train_data, valid_data, test_data = Multi30k(data_cfg.data_root)

# Helpers defined elsewhere in the notebook; presumably keyed by language code
# (vocab_transform is called with a language below) -- confirm there.
token_transform = get_token_transform(data_cfg)
vocab_transform = get_vocab_transform(data_cfg)

### model configs

In [None]:
# LSTM encoder-decoder config; vocabulary sizes are read from vocab_transform.
model_translate_lstm_seq2seq_cfg = {
    "name": "LSTMSeq2Seq",
    # "out_dim": len(vocab_transform(data_cfg))
    "enc" : {
        "input_dim": len(vocab_transform(data_cfg.src_lang)),
        "embed_dim": 256,
        "hidden_dim": 256,
        "n_layers": 2,
        "dropout": 0.5,
    },
    "dec": {
        "output_dim": len(vocab_transform(data_cfg.tgt_lang)),
        "embed_dim": 256,
        "hidden_dim": 256,
        "n_layers": 2,
        "dropout": 0.5,
    },
    "teacher_forcing_ratio": 0.5
}

# Attention-based seq2seq config; encoder, decoder and attention nodes repeat
# the same enc/dec hidden sizes.
model_translate_attention_based_seq2seq_cfg = {
    "name": "AttentionBasedSeq2Seq",
    # "out_dim": len(vocab_transform(data_cfg))
    "enc" : {
        "input_dim": len(vocab_transform(data_cfg.src_lang)),
        "embed_dim": 256,
        "enc_hidden_dim": 512,
        "dec_hidden_dim": 512,
        "n_layers": 1,
        "dropout": 0.5,
    },
    "dec": {
        "output_dim": len(vocab_transform(data_cfg.tgt_lang)),
        "embed_dim": 256,
        "enc_hidden_dim": 512,
        "dec_hidden_dim": 512,
        "n_layers": 1,
        "dropout": 0.5,
    },
    "attention": {
        "enc_hidden_dim": 512,
        "dec_hidden_dim": 512,
    },
    "teacher_forcing_ratio": 0.5
}

# Transformer config; these keys are the ones TransformerSeq2Seq.__init__
# reads from cfg.model.
model_translate_transformer_seq2seq_cfg = {
    "name": "TransformerSeq2Seq",
    "num_encoder_layers": 3,
    "num_decoder_layers": 3,
    "embed_size": 512,
    "nhead": 8,
    "src_vocab_size": len(vocab_transform(data_cfg.src_lang)),
    "tgt_vocab_size": len(vocab_transform(data_cfg.tgt_lang)),
    "dim_feedforward": 512,  # too large a value slows the model down
    "dropout": 0.5,
}

### opt config

In [None]:
# Optimizer / LR-scheduler configuration shared by every preset below.
opt_cfg = {
    "optimizers": [
                   {
                    # see config_utils.py
                    "name": "RAdam",
                    "kwargs": {
                        "lr": 1e-3,
                    }
                   }
    ],
    "lr_schedulers": [
                      {
                          # NOTE(review): name is None, presumably meaning no
                          # scheduler is built and the kwargs go unused --
                          # confirm in config_utils.py.
                          "name": None,
                       "kwargs": {
                           "warmup_end_steps": 1000
                           }
                       }
    ]
}

# One preset per model; each combines the same opt and data configs with a
# different model config.
_merged_cfg_presets = {
    "LSTM_seq2seq_de_en_translate": {
        "opt": opt_cfg,
        "data": data_spacy_de_en_cfg,
        "model": model_translate_lstm_seq2seq_cfg,
    },
    "attention_based_seq2seq_de_en_translate": {
        "opt": opt_cfg,
        "data": data_spacy_de_en_cfg,
        "model": model_translate_attention_based_seq2seq_cfg,
    },  
    "transformer_seq2seq_de_en_translate": {
        "opt": opt_cfg,
        "data": data_spacy_de_en_cfg,
        "model": model_translate_transformer_seq2seq_cfg,
    },     
}

# clear config hydra instance first
hydra.core.global_hydra.GlobalHydra.instance().clear()

# register preset configs (register_config is defined elsewhere; presumably it
# stores each preset under its key so hydra.compose can find it)
register_config(_merged_cfg_presets)

# initialization & compose configs; the name passed to compose selects which
# preset (and therefore which model) is used for this run
hydra.initialize(config_path=None)
cfg = hydra.compose("transformer_seq2seq_de_en_translate")

# override some cfg
# run_name = "<ISO timestamp, seconds precision>-<model name>-<data name>"
run_name = f"{datetime.now().isoformat(timespec='seconds')}-{cfg.model.name}-{cfg.data.name}"

project_root_dir = os.path.join(drive_project_root, "runs", "de_en_translate_tutorials")

In [None]:
# Per-run output directory. save_dir and run_root_dir are built from the same
# path components and are therefore identical.
save_dir = os.path.join(project_root_dir, run_name)
run_root_dir = os.path.join(project_root_dir, run_name)

# train configs
train_cfg = {
    "train_batch_size": 128,
    "val_batch_size": 32,
    "test_batch_size": 32,
    "train_val_split": [0.9, 0.1],
    "run_root_dir": run_root_dir,
    # Unpacked as keyword arguments into pl.Trainer(...)
    "trainer_kwargs": {
        "accelerator": "dp", # dp is used with a single GPU; for multiple GPUs, ddp etc. can be set instead
        # NOTE(review): how a string value for `gpus` is parsed depends on the
        # pytorch_lightning version -- confirm this selects GPU 0.
        "gpus": "0",         # "gpus": "0",  # use GPU 0
        "max_epochs": 50,    
        # 1.0 : run the validation check when a train epoch ends
        # 0.5 : run the validation check halfway through a train epoch
        # integer : run the validation check every that many steps
        "val_check_interval": 1.0,
        "log_every_n_steps": 100,   # log every 100 steps
        "flush_logs_every_n_steps": 100,
    },    
}


# logger configs
log_cfg = {
    # One entry per logger; each value holds that logger's settings.
    "loggers": {
        "WandbLogger": { 
            "project": "fastcampus_de_en_translate_tutorials",
            "name": run_name,
            "tags": ["fastcampus_de_en_translate_tutorials"],
            "save_dir": run_root_dir,
        },
        "TensorBoardLogger": {
            "save_dir": project_root_dir,
            "name": run_name,
            },
    },
    # One entry per callback; each value holds that callback's settings.
    "callbacks": {
        "ModelCheckpoint": {
            "save_top_k": 3,
            "monitor": "val_loss",
            "mode": "min",
            "verbose": True,
            "dirpath": os.path.join(run_root_dir, "weights"),
            "filename": "{epoch}-{val_loss:.3f}"
            },
        "EarlyStopping": {
            "monitor": "val_loss",
            "mode": "min",
            "patience": 3,
            "verbose": True,
            }
    }
}

# Struct mode is switched off so the new `train` and `log` keys can be added
# to the composed config.
OmegaConf.set_struct(cfg, False)
cfg.train = train_cfg
cfg.log = log_cfg

# lock config
OmegaConf.set_struct(cfg, True)
print(OmegaConf.to_yaml(cfg))

In [None]:
# dataloader def

# Build one dataloader per split. All three use the same (src, tgt) language
# pair and a collate function derived from cfg; only the split name and the
# batch size differ. get_multi30k_dataloader / get_collate_fn are defined
# elsewhere in the notebook.
train_dataloader = get_multi30k_dataloader(
    "train",
    (data_cfg.src_lang, data_cfg.tgt_lang),
    cfg.train.train_batch_size,
    collate_fn=get_collate_fn(cfg)
)

val_dataloader = get_multi30k_dataloader(
    "valid",
    (data_cfg.src_lang, data_cfg.tgt_lang),
    cfg.train.val_batch_size,
    collate_fn=get_collate_fn(cfg)
)

test_dataloader = get_multi30k_dataloader(
    "test",
    (data_cfg.src_lang, data_cfg.tgt_lang),
    cfg.train.test_batch_size,
    collate_fn=get_collate_fn(cfg)
)

### pl translator def & get model

In [None]:
def get_pl_model(cfg: DictConfig, checkpoint_path: Optional[str] = None):
    """Build the Lightning model named by ``cfg.model.name``.

    Args:
        cfg: Full run config; ``cfg.model.name`` selects the model class and
            ``cfg`` is passed to that class's constructor.
        checkpoint_path: Optional path to a Lightning checkpoint. When given,
            the model is restored from it instead of being freshly
            initialised.

    Returns:
        An instance of the selected model class.

    Raises:
        NotImplementedError: If ``cfg.model.name`` is not a known model name.
    """
    # Resolve the class lazily, branch by branch, so that only the selected
    # model class has to be defined in the notebook session.
    model_name = cfg.model.name
    if model_name == "LSTMSeq2Seq":
        model_cls = LSTMSeq2Seq
    elif model_name == "AttentionBasedSeq2Seq":
        model_cls = AttentionBasedSeq2Seq
    elif model_name == "TransformerSeq2Seq":
        model_cls = TransformerSeq2Seq
    else:
        raise NotImplementedError("Not Implemented model")

    if checkpoint_path is None:
        return model_cls(cfg)

    # load_from_checkpoint is a classmethod whose first positional parameter is
    # the checkpoint path; extra keyword arguments are forwarded to __init__.
    # The original call passed cfg positionally AND checkpoint_path by keyword,
    # which raises "got multiple values for argument 'checkpoint_path'".
    # NOTE(review): assumes every model's __init__ names its parameter `cfg`,
    # as TransformerSeq2Seq does -- confirm for the other two classes.
    return model_cls.load_from_checkpoint(checkpoint_path, cfg=cfg)

# Rebind the name to None first so any previously built model is no longer
# referenced (presumably so it can be freed when this cell is re-run), then
# build the model selected by cfg and print its module structure.
model = None
model = get_pl_model(cfg)
print(model)

### pytorch lightning trainer def

In [None]:
# Build loggers and callbacks from cfg (helpers defined elsewhere in the
# notebook; presumably driven by cfg.log.loggers / cfg.log.callbacks).
logger = get_loggers(cfg)
callbacks = get_callbacks(cfg)

# num_sanity_val_steps: validation batches run as a sanity check before
# training starts. The remaining Trainer options come from
# cfg.train.trainer_kwargs.
trainer = pl.Trainer(
    callbacks = callbacks,
    logger = logger,
    default_root_dir = cfg.train.run_root_dir,
    num_sanity_val_steps = 3,
    **cfg.train.trainer_kwargs
)

In [None]:
# Train the model on train_dataloader, validating on val_dataloader.
trainer.fit(model, train_dataloader, val_dataloader)