<a href="https://colab.research.google.com/github/minseong-oh/TransformerForTranslation/blob/main/Translator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
# Install the Nanum font package; the import cell later sets matplotlib's font family to 'NanumBarunGothic'.
!sudo apt-get install -y fonts-nanum
# Rebuild the system font cache (-f: force, -v: verbose) after the install.
!sudo fc-cache -fv
# Remove matplotlib's cache directory — presumably so its cached font list is rebuilt and sees the new fonts.
!rm ~/.cache/matplotlib -rf

In [2]:
%%capture
!pip install gdown
!pip install transformers
!pip install einops # Einstein operations => rearrange 함수를 위해

In [3]:
# --- Standard library ---
import math
import random

# --- Third-party ---
import matplotlib.pyplot as plt
import pandas as pd
import torch
from einops import rearrange
from torch import nn, optim
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from tqdm import tqdm
from transformers import MarianMTModel, MarianTokenizer

# --- Colab runtime ---
from google.colab import drive

# Mount Google Drive so that paths under /content/drive are available.
drive.mount('/content/drive')

# Use the Nanum font family for all matplotlib text.
plt.rc('font', family='NanumBarunGothic')

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(DEVICE)

Mounted at /content/drive
cuda


In [4]:
# Reproducibility: give every random number generator used here the same fixed seed.
random_seed = 0

# Python's built-in RNG.
random.seed(random_seed)

# PyTorch RNGs (default generator, current CUDA device, all CUDA devices).
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# cuDNN: turn the benchmark autotuner off and request deterministic algorithms.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

### Load the tokenizer

In [5]:
# Load the pretrained Marian tokenizer published under "Helsinki-NLP/opus-mt-ko-en".
# On first use the tokenizer files (source.spm, target.spm, vocab.json, ...) are downloaded from the Hugging Face Hub.
tokenizer = MarianTokenizer.from_pretrained("Helsinki-NLP/opus-mt-ko-en")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/44.0 [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/842k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/813k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.72M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.39k [00:00<?, ?B/s]



In [6]:
# Look up the ids of the end-of-sequence and padding tokens in one step and
# echo them; pad_idx is later handed to the loss as its ignore_index.
eos_idx, pad_idx = tokenizer.eos_token_id, tokenizer.pad_token_id
print("eos_idx = ", eos_idx)
print("pad_idx = ", pad_idx)

eos_idx =  0
pad_idx =  65000


In [66]:
# Training hyperparameters.
BATCH_SIZE = 64  # batch size used by the train / val / test DataLoaders
LAMBDA = 0  # NOTE(review): presumably a regularization (weight-decay) coefficient — not used in the visible cells, confirm
EPOCH = 15  # presumably the number of training epochs — confirm against the training loop
max_len = 100  # presumably the maximum sequence length in tokens — confirm where it is consumed
criterion = nn.CrossEntropyLoss(ignore_index = pad_idx) # ignore the loss at pad-token positions

# NOTE(review): presumably selects the learning-rate schedule by name — confirm against the scheduler setup.
scheduler_name = 'Noam'

# NOTE(review): presumably parameters of that schedule (warm-up length and a learning-rate multiplier) — confirm.
warmup_steps = 1000
LR_scale = 0.5

In [8]:
# Output locations on the mounted Google Drive (under /content/drive).
# NOTE(review): presumably the model checkpoint and the training history, judging by the names — confirm where they are written.
save_model_path = '/content/drive/MyDrive/Colab Notebooks/results/Transformer_small2.pt'
save_history_path = '/content/drive/MyDrive/Colab Notebooks/results/Transformer_small2_history.pt'

In [9]:
# Model size reduced compared with the paper.
n_layers = 3  # presumably the number of stacked encoder layers — confirm at the model construction site
d_model = 256  # presumably the embedding / hidden width (d_model) — confirm at the model construction site
d_ff = 512  # presumably the hidden width of the feed-forward sub-layer — confirm at the model construction site
n_heads = 8  # presumably the number of attention heads — confirm at the model construction site
drop_p = 0.1  # dropout probability; `FeedForward` picks up this module-level value for its dropout layer

In [55]:
!gdown https://drive.google.com/uc?id=1Yvsp6Nv4VUWEg0qOVU2TVFwYOXoKUJgm\
data = pd.read_excel('대화체.xlsx')

Downloading...
From: https://drive.google.com/uc?id=1Yvsp6Nv4VUWEg0qOVU2TVFwYOXoKUJgm
To: /content/대화체.xlsx
  0% 0.00/8.17M [00:00<?, ?B/s]100% 8.17M/8.17M [00:00<00:00, 120MB/s]


In [58]:
class CustomDataset(torch.utils.data.Dataset):
    """Map-style dataset that yields (source text, translated text) pairs from a DataFrame.

    Args:
        data: DataFrame holding one sentence pair per row.
        src_col: Name of the column with the source text (default '원문').
        tgt_col: Name of the column with the translated text (default '번역문').

    Raises:
        KeyError: If either column is missing from ``data``.
    """

    def __init__(self, data, src_col='원문', tgt_col='번역문'):
        self.data = data
        # Resolve the column positions once (fails fast on a missing column) so
        # that __getitem__ can use purely positional lookups.
        self._src_pos = data.columns.get_loc(src_col)
        self._tgt_pos = data.columns.get_loc(tgt_col)

    def __len__(self):
        """Return the number of rows (sentence pairs)."""
        return self.data.shape[0]

    def __getitem__(self, idx):
        """Return the ``(source, translation)`` pair stored in the ``idx``-th row.

        Samplers and ``random_split`` hand out positions in ``0 .. len-1``, so the
        lookup is positional (``iloc``). A label-based ``.loc`` lookup only works
        while the DataFrame still has its default 0..n-1 index and breaks, for
        example, after rows have been filtered out.
        """
        return self.data.iloc[idx, self._src_pos], self.data.iloc[idx, self._tgt_pos]

# Wrap the loaded DataFrame so that it can be indexed sample by sample.
custom_DS = CustomDataset(data)

# Randomly partition the samples into train / validation / test subsets of fixed sizes.
train_DS, val_DS, test_DS = torch.utils.data.random_split(
    custom_DS,
    [97000, 2000, 1000],
)

# One shuffling DataLoader per subset, all with the same batch size.
train_DL, val_DL, test_DL = (
    torch.utils.data.DataLoader(subset, batch_size=BATCH_SIZE, shuffle=True)
    for subset in (train_DS, val_DS, test_DS)
)

# Report the subset sizes, one per line.
print(len(train_DS), len(val_DS), len(test_DS), sep="\n")

97000
2000
1000


## Transformer 모델 구현

### Multi-Head Attention



In [72]:
class MHA(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Feature size of the query/key/value/output projections.
        n_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, d_model, n_heads):
        super().__init__()

        if d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_heads ({n_heads})"
            )

        self.n_heads = n_heads

        self.fc_q = nn.Linear(d_model, d_model)
        self.fc_k = nn.Linear(d_model, d_model)
        self.fc_v = nn.Linear(d_model, d_model)
        self.fc_o = nn.Linear(d_model, d_model)

        # sqrt(d_k) with d_k = d_model / n_heads: the divisor of the dot-product scores.
        self.scale = torch.sqrt(torch.tensor(d_model / n_heads))

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k`` / ``v``.

        Args:
            q: Query tensor laid out as (batch, n_query, d_model).
            k: Key tensor laid out as (batch, n_key, d_model).
            v: Value tensor laid out as (batch, n_key, d_model).
            mask: Optional boolean tensor broadcastable to
                (batch, n_heads, n_query, n_key); positions that are ``True``
                are excluded from attention.

        Returns:
            Tensor laid out as (batch, n_query, d_model).
        """
        # Project, then split the feature axis into heads: (b, n, h*d) -> (b, h, n, d).
        q = rearrange(self.fc_q(q), 'b n (h d) -> b h n d', h = self.n_heads)
        k = rearrange(self.fc_k(k), 'b n (h d) -> b h n d', h = self.n_heads)
        v = rearrange(self.fc_v(v), 'b n (h d) -> b h n d', h = self.n_heads)

        # Scaled dot-product scores: (b, h, n_query, n_key).
        attention_score = q @ k.transpose(-1, -2) / self.scale

        if mask is not None:
            # A large negative score becomes (numerically) zero weight after softmax.
            # masked_fill broadcasts the mask, whereas boolean indexing would
            # require the mask shape to match the score tensor exactly.
            attention_score = attention_score.masked_fill(mask, -1e10)

        attention_weights = torch.softmax(attention_score, dim = -1)
        attention = attention_weights @ v
        # Merge the heads back into the feature axis: (b, h, n, d) -> (b, n, h*d).
        attention = rearrange(attention, 'b h n d -> b n (h d)')
        output = self.fc_o(attention)  # final d_model -> d_model projection

        return output

class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
        drop_p: Dropout probability applied after the ReLU. When omitted
            (``None``) the module-level ``drop_p`` hyperparameter is used, which
            is what this class did before the parameter existed.
    """

    def __init__(self, d_model, d_ff, drop_p=None):
        super().__init__()

        if drop_p is None:
            # Backward-compatible fallback to the notebook-level setting. It is
            # read through globals() because the parameter shadows the global name.
            drop_p = globals()["drop_p"]

        self.fc = nn.Sequential(nn.Linear(d_model, d_ff), # expand, then shrink again, around the non-linearity
                                nn.ReLU(),
                                nn.Dropout(drop_p),
                                nn.Linear(d_ff, d_model))

    def forward(self, x):
        """Apply the network to the last axis of ``x`` and return the result."""
        return self.fc(x)

### Encoder

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to the sub-layer input
    (residual connection) and is then layer-normalized.

    Args:
        d_model: Feature size of the layer input and output.
        d_ff: Hidden size of the feed-forward sub-layer.
        n_heads: Number of attention heads.
        drop_p: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, d_ff, n_heads, drop_p):
        super().__init__()

        self.self_atten = MHA(d_model, n_heads)
        self.self_atten_LN = nn.LayerNorm(d_model)

        # FeedForward is built with its two-argument signature, so the dropout
        # inside it is not controlled by this layer's `drop_p`.
        self.FF = FeedForward(d_model, d_ff)
        self.FF_LN = nn.LayerNorm(d_model)

        # forward() uses `self.dropout` for both sub-layers; the attribute was
        # previously registered as `self_atten_dropout`, which made forward()
        # fail with AttributeError.
        self.dropout = nn.Dropout(drop_p)

    def forward(self, x, enc_mask):
        """Run the block.

        Args:
            x: Input tensor; it is used as query, key and value of the self-attention.
            enc_mask: Mask forwarded unchanged to the attention module.

        Returns:
            Tensor with the same layout as ``x``.
        """
        # Self-attention sub-layer: dropout -> residual add -> LayerNorm.
        residual = self.self_atten(x, x, x, enc_mask)
        residual = self.dropout(residual)
        x = self.self_atten_LN(x + residual)

        # Feed-forward sub-layer: dropout -> residual add -> LayerNorm.
        residual = self.FF(x)
        residual = self.dropout(residual)
        x = self.FF_LN(x + residual)

        return x

class Encoder(nn.Module):
    """Transformer encoder: scaled token embedding + learned positional embedding, then a stack of EncoderLayer blocks.

    Args:
        input_imbedding: Module mapping token ids to ``d_model``-sized vectors.
        max_len: Number of positions in the learned positional-embedding table.
        n_layers: Number of stacked ``EncoderLayer`` blocks.
        d_model: Feature size used throughout the encoder.
        d_ff: Hidden size of each feed-forward sub-layer.
        n_heads: Number of attention heads per layer.
        drop_p: Dropout probability for the embedding sum and for each layer.
    """

    def __init__(self, input_imbedding, max_len, n_layers, d_model, d_ff, n_heads, drop_p):
        super().__init__()
        # sqrt(d_model) multiplier for the token embeddings; float() keeps the
        # sqrt on a floating-point tensor.
        self.scale = torch.sqrt(torch.tensor(float(d_model)))
        self.input_imbedding = input_imbedding
        self.pos_embedding = nn.Embedding(max_len, d_model)

        self.dropout = nn.Dropout(drop_p)

        # `drop_p` (previously misspelled `drop_d`, a NameError) is passed to every layer.
        self.layers = nn.ModuleList([EncoderLayer(d_model, d_ff, n_heads, drop_p) for _ in range(n_layers)])

    def forward(self, src, mask, atten_map_save=False):
        """Encode a batch of token ids.

        Args:
            src: Integer tensor of token ids laid out as (batch, seq_len).
            mask: Attention mask passed unchanged to every layer.
            atten_map_save: Reserved for collecting per-layer attention maps.

        Returns:
            Tensor laid out as (batch, seq_len, d_model).

        Raises:
            NotImplementedError: If ``atten_map_save`` is true. The layers return
                only their output tensor, so there is no attention map to
                collect; the previous code failed on undefined names here.
            ValueError: If ``seq_len`` exceeds the positional-embedding table.
        """
        if atten_map_save:
            raise NotImplementedError(
                "atten_map_save is not supported: the encoder layers do not return attention maps"
            )

        seq_len = src.shape[1]
        if seq_len > self.pos_embedding.num_embeddings:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pos_embedding.num_embeddings}"
            )

        # Position ids 0..seq_len-1, repeated for every row of the batch and
        # created on the same device as the input.
        pos = torch.arange(seq_len, device=src.device).expand_as(src)
        x = self.scale * self.input_imbedding(src) + self.pos_embedding(pos)
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(x, mask)

        return x