해당 노트북은 [Link](https://github.com/ndb796/Deep-Learning-Paper-Review-and-Practice/blob/master/code_practices/Attention_is_All_You_Need_Tutorial_(German_English).ipynb)를 참고하여 제작되었습니다.

In [1]:
# Mount Google Drive at /content/drive (requires the Colab-only google.colab package).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Switch the working directory to this tutorial's folder on the mounted Drive.
cd /content/drive/MyDrive/AIKU/Github/AIKU-DL-Paper-Review/code_practices/Transformer

/content/drive/MyDrive/AIKU/Github/AIKU-DL-Paper-Review/code_practices/Transformer


## BLEU Score 계산을 위한 라이브러리

In [None]:
# Pin torchtext to 0.6.0; the cells below import Field, BucketIterator and Multi30k from it.
!pip install torchtext==0.6.0

Collecting torchtext==0.6.0
  Downloading torchtext-0.6.0-py3-none-any.whl (64 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.2/64.2 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: torchtext
  Attempting uninstall: torchtext
    Found existing installation: torchtext 0.16.0
    Uninstalling torchtext-0.16.0:
      Successfully uninstalled torchtext-0.16.0
Successfully installed torchtext-0.6.0


In [None]:
# Confirm which torchtext version is active after the install above.
import torchtext
print(torchtext.__version__)

0.6.0


## Preprocessing

In [None]:
%%capture
!python -m spacy download en # 영어 전처리 모듈 설치
!python -m spacy download de # 독일어 전처리 모듈 설치

### spaCy 라이브러리

In [None]:
import spacy

# Load the spaCy pipelines whose tokenizers are used for each language.
spacy_en=spacy.load('en_core_web_sm') # English tokenization
spacy_de=spacy.load('de_core_news_sm') # German tokenization

In [None]:
# Try out the tokenizer on a sample sentence
tokenized=spacy_en.tokenizer("I am a graduate student.")

# Print each token together with its position in the sentence
for i,token in enumerate(tokenized):
    print(f'index {i}: {token.text}')

index 0: I
index 1: am
index 2: a
index 3: graduate
index 4: student
index 5: .


In [None]:
# Tokenize an English sentence
def tokenize_en(text):
    """Return the token strings produced by the English spaCy tokenizer for ``text``."""
    english_doc = spacy_en.tokenizer(text)
    return [piece.text for piece in english_doc]
# Tokenize a German sentence
def tokenize_de(text):
    """Return the token strings produced by the German spaCy tokenizer for ``text``."""
    german_doc = spacy_de.tokenizer(text)
    return [piece.text for piece in german_doc]

### field 라이브러리
+ Source(SRC) : de
+ Target(TRG) : en

In [None]:
from torchtext.data import Field, BucketIterator

# Source (German) and target (English) fields. Each one tokenizes with its
# spaCy-based tokenizer, lowercases the text, wraps every sentence in
# <sos>/<eos>, and is configured with batch_first=True.
SRC = Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True, batch_first=True)
TRG = Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True, batch_first=True)

In [None]:
# Load the English-German translation dataset (Multi30k) from the local
# ./dataset/multi30k/ directory; each split pairs a .de file (SRC) with a .en file (TRG).
from torchtext.datasets import Multi30k

train_dataset = Multi30k(path='./dataset/multi30k/',exts=('train.de','train.en'), fields=(SRC,TRG))
valid_dataset = Multi30k(path='./dataset/multi30k/',exts=('val.de','val.en'), fields=(SRC,TRG))
test_dataset = Multi30k(path='./dataset/multi30k/',exts=('test_2016_flickr.de','test_2016_flickr.en'), fields=(SRC,TRG))

In [None]:
# Report the number of examples in the training, validation and test splits.
print(f"학습 데이터셋(training dataset) 크기: {len(train_dataset.examples)}개")
print(f"평가 데이터셋(validation dataset) 크기: {len(valid_dataset.examples)}개")
print(f"테스트 데이터셋(testing dataset) 크기: {len(test_dataset.examples)}개")

학습 데이터셋(training dataset) 크기: 29000개
평가 데이터셋(validation dataset) 크기: 1014개
테스트 데이터셋(testing dataset) 크기: 1000개


In [None]:
# Pick one training example and print its source and target token lists
print(vars(train_dataset.examples[30])['src'])
print(vars(train_dataset.examples[30])['trg'])

['ein', 'mann', ',', 'der', 'mit', 'einer', 'tasse', 'kaffee', 'an', 'einem', 'urinal', 'steht', '.']
['a', 'man', 'standing', 'at', 'a', 'urinal', 'with', 'a', 'coffee', 'cup', '.']


In [None]:
# Print the same example's source tokens separated by single spaces.
print(*train_dataset.examples[30].src,sep=' ')

ein mann , der mit einer tasse kaffee an einem urinal steht .


In [None]:
# Build the German/English vocabularies with build_vocab
## Keep only words that appear at least 2 times in the training set

SRC.build_vocab(train_dataset,min_freq=2)
TRG.build_vocab(train_dataset,min_freq=2)

print(f'len(SRC): {len(SRC.vocab)}')
print(f'len(TRG): {len(TRG.vocab)}')

len(SRC): 7853
len(TRG): 5893


In [None]:
## Use .stoi (string -> int) to print the unique integer
## that the vocabulary maps a given word to
print(TRG.vocab.stoi['abcabc']) # <unk>=0 (a word that is not in the vocabulary)
print(TRG.vocab.stoi[TRG.pad_token]) # <pad>=1
print(TRG.vocab.stoi["<sos>"]) # <sos>: 2
print(TRG.vocab.stoi["<eos>"]) # <eos>: 3
print(TRG.vocab.stoi["hello"])
print(TRG.vocab.stoi["world"])

0
1
2
3
4112
1752


In [None]:
# The words of a sentence must be fed into the network in order,
## so the sentences within one batch should contain a similar number of words.
## BucketIterator is used for this purpose.
## Batch size: 128
import torch

# Use the GPU when one is available, otherwise fall back to the CPU.
device=torch.device('cuda' if  torch.cuda.is_available() else 'cpu')

BATCH_SIZE=128

# One iterator per split; batches are created on `device`.
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_dataset,valid_dataset,test_dataset),
    batch_size=BATCH_SIZE,
    device=device
)

In [None]:
# Inspect only the first training batch (the loop breaks after one iteration).
for i,batch in enumerate(train_iterator):
    src=batch.src
    trg=batch.trg

    print(f'첫번째 배치 크기: {trg.shape}')

    # Print the first sentence of the current batch: index, token id, and token string
    for idx,word in enumerate(trg[0]):
        print(f'idx{idx}: {word} => {TRG.vocab.itos[word]}')
    break

첫번째 배치 크기: torch.Size([128, 26])
idx0: 2 => <sos>
idx1: 4 => a
idx2: 9 => man
idx3: 395 => selling
idx4: 893 => produce
idx5: 8 => on
idx6: 7 => the
idx7: 39 => street
idx8: 5 => .
idx9: 3 => <eos>
idx10: 1 => <pad>
idx11: 1 => <pad>
idx12: 1 => <pad>
idx13: 1 => <pad>
idx14: 1 => <pad>
idx15: 1 => <pad>
idx16: 1 => <pad>
idx17: 1 => <pad>
idx18: 1 => <pad>
idx19: 1 => <pad>
idx20: 1 => <pad>
idx21: 1 => <pad>
idx22: 1 => <pad>
idx23: 1 => <pad>
idx24: 1 => <pad>
idx25: 1 => <pad>


In [None]:
# Display the iterator's `batches` attribute (the recorded output shows a generator object).
train_iterator.batches

<generator object pool at 0x7aad4dd7cba0>

## Multi-Head Attention 아키텍처

In [None]:
import torch
import torch.nn as nn

class MultiHeadAttentionLayer(nn.Module):
    """Multi-head scaled dot-product attention.

    Query, key and value are each passed through a learned linear projection,
    split into ``n_heads`` heads of size ``head_dim``, attended per head, then
    merged back and passed through an output projection.

    Args:
        hidden_dim: Embedding dimension; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
        dropout_ratio: Dropout probability applied to the attention weights.
        device: Device on which the scaling constant is initially created.
    """

    def __init__(self,hidden_dim,n_heads,dropout_ratio,device):
        super().__init__()

        assert hidden_dim%n_heads==0, "hidden_dim must be divisible by n_heads"

        self.hidden_dim=hidden_dim  # embedding dimension
        self.n_heads=n_heads  # number of heads
        self.head_dim=hidden_dim//n_heads  # embedding dimension of each head

        self.fc_q=nn.Linear(hidden_dim, hidden_dim)  # W_Q
        self.fc_k=nn.Linear(hidden_dim, hidden_dim)  # W_K
        self.fc_v=nn.Linear(hidden_dim, hidden_dim)  # W_V

        self.fc_o=nn.Linear(hidden_dim,hidden_dim)  # W_O

        self.dropout=nn.Dropout(dropout_ratio)

        # sqrt(d_k). Registered as a buffer so that it follows the module when
        # it is moved with .to()/.cuda() or replicated across devices, instead
        # of staying pinned to the constructor's `device`. persistent=False
        # keeps it out of state_dict, so existing checkpoints still load.
        scale=torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
        self.register_buffer('scale',scale,persistent=False)

    def forward(self,query,key,value,mask=None):
        """Apply multi-head attention.

        Args:
            query: Tensor of shape [batch_size, query_len, hidden_dim].
            key: Tensor of shape [batch_size, key_len, hidden_dim].
            value: Tensor of shape [batch_size, key_len, hidden_dim].
            mask: Optional tensor broadcastable to
                [batch_size, n_heads, query_len, key_len]; positions where it
                equals 0 are excluded from attention.

        Returns:
            A tuple ``(x, attention)`` where ``x`` has shape
            [batch_size, query_len, hidden_dim] and ``attention`` holds the
            softmax weights (before dropout) with shape
            [batch_size, n_heads, query_len, key_len].
        """
        batch_size=query.shape[0]

        # Linear projections; each keeps the shape [batch_size, seq_len, hidden_dim]
        Q = self.fc_q(query)  # Q = X W_Q
        K = self.fc_k(key)  # K = X W_K
        V = self.fc_v(value)  # V = X W_V

        # Split hidden_dim into n_heads x head_dim so that each head can learn
        # a different attention pattern:
        # [batch_size, seq_len, hidden_dim] -> [batch_size, seq_len, n_heads, head_dim]
        #                                   -> [batch_size, n_heads, seq_len, head_dim]
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)

        # Attention energy: Q K^T / sqrt(d_k)
        # energy: [batch_size, n_heads, query_len, key_len]
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        # When a mask is given, fill the positions where mask == 0 with -1e10
        # so that they receive (numerically) zero weight after the softmax
        if mask is not None:
            energy=energy.masked_fill(mask==0,-1e10)

        # Attention weights; dim=-1 normalizes over the keys so that the
        # weights of each query sum to 1
        # attention: [batch_size, n_heads, query_len, key_len]
        attention = torch.softmax(energy, dim=-1)

        # Weighted sum of the values
        # x: [batch_size, n_heads, query_len, head_dim]
        x = torch.matmul(self.dropout(attention), V)

        # permute() returns a non-contiguous view; contiguous() is needed
        # before view() can merge the head dimensions
        x = x.permute(0, 2, 1, 3).contiguous()

        # [batch_size, query_len, n_heads, head_dim] -> [batch_size, query_len, hidden_dim]
        # (hidden_dim = n_heads * head_dim)
        x = x.view(batch_size, -1, self.hidden_dim)

        # Output projection, shape stays [batch_size, query_len, hidden_dim]
        x = self.fc_o(x)

        return x, attention

## Position-wise Feed-forward 아키텍처

In [None]:
class PositionwiseFeedforwardLayer(nn.Module):
    """Position-wise feed-forward network.

    Expands the last dimension from ``hidden_dim`` to ``pf_dim``, applies ReLU
    and dropout, then projects back to ``hidden_dim``.

    Args:
        hidden_dim: Size of the input and output features.
        pf_dim: Size of the inner (expanded) features.
        dropout_ratio: Dropout probability applied after the ReLU.
    """

    def __init__(self,hidden_dim,pf_dim,dropout_ratio):
        super().__init__()

        # Expansion and contraction projections
        self.fc_1=nn.Linear(in_features=hidden_dim,out_features=pf_dim)
        self.fc_2=nn.Linear(in_features=pf_dim,out_features=hidden_dim)

        self.dropout=nn.Dropout(p=dropout_ratio)

    def forward(self,x):
        """Return ``fc_2(dropout(relu(fc_1(x))))``; the last dimension stays ``hidden_dim``."""
        expanded=self.fc_1(x)
        activated=torch.relu(expanded)
        regularized=self.dropout(activated)
        return self.fc_2(regularized)

## EncoderLayer

In [None]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a position-wise feed-forward
    network, each followed by dropout, a residual connection and LayerNorm.

    Args:
        hidden_dim: Feature dimension of the input and output.
        n_heads: Number of attention heads for the self-attention sub-layer.
        pf_dim: Inner dimension of the feed-forward sub-layer.
        dropout_ratio: Dropout probability used in the sub-layers and on
            their outputs.
        device: Passed through to the self-attention sub-layer.
    """

    def __init__(self,hidden_dim, n_heads, pf_dim,dropout_ratio,device):
        super().__init__()

        # One LayerNorm per sub-layer
        self.self_attn_layer_norm=nn.LayerNorm(normalized_shape=hidden_dim)
        self.ff_layer_norm=nn.LayerNorm(normalized_shape=hidden_dim)

        # The two sub-layers
        self.self_attention=MultiHeadAttentionLayer(
            hidden_dim=hidden_dim,
            n_heads=n_heads,
            dropout_ratio=dropout_ratio,
            device=device,
        )
        self.positionwise_feedforward=PositionwiseFeedforwardLayer(
            hidden_dim=hidden_dim,
            pf_dim=pf_dim,
            dropout_ratio=dropout_ratio,
        )

        self.dropout=nn.Dropout(p=dropout_ratio)

    def forward(self,src,src_mask):
        """Run the block on ``src`` with ``src_mask`` forwarded to self-attention.

        Returns a tensor with the same shape as ``src``.
        """
        # Self-attention: src serves as query, key and value; the attention
        # weights are not needed here
        attended,_weights=self.self_attention(src,src,src,src_mask)

        # Dropout + residual, then LayerNorm
        after_attention=self.self_attn_layer_norm(src+self.dropout(attended))

        # Position-wise feed-forward
        transformed=self.positionwise_feedforward(after_attention)

        # Dropout + residual, then LayerNorm
        return self.ff_layer_norm(after_attention+self.dropout(transformed))

## Encoder

In [None]:
class Encoder(nn.Module):
    """Transformer encoder: token + learned positional embeddings followed by
    a stack of ``EncoderLayer`` blocks.

    Args:
        input_dim: Size of the source vocabulary (rows of the token embedding).
        hidden_dim: Embedding / model dimension.
        n_layers: Number of stacked encoder layers.
        n_heads: Number of attention heads in every layer.
        pf_dim: Inner dimension of each layer's feed-forward network.
        dropout_ratio: Dropout probability.
        device: Device on which the scaling constant is initially created;
            also passed through to the encoder layers.
        max_length: Number of rows in the positional embedding table; input
            sequences must not be longer than this.
    """

    def __init__(self,input_dim,hidden_dim,n_layers,n_heads,pf_dim,dropout_ratio,device,max_length=100):
        super().__init__()

        # Kept for backward compatibility; forward() takes the device from its input
        self.device=device

        self.tok_embedding=nn.Embedding(input_dim,hidden_dim)
        self.pos_embedding=nn.Embedding(max_length,hidden_dim)  # learned position embedding

        self.layers = nn.ModuleList([EncoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])

        self.dropout = nn.Dropout(dropout_ratio)

        # sqrt(hidden_dim), used to scale the token embeddings. Registered as a
        # buffer so that it follows the module when it is moved with
        # .to()/.cuda(); persistent=False keeps it out of state_dict, so
        # existing checkpoints still load.
        scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)
        self.register_buffer('scale', scale, persistent=False)

    def forward(self,src,src_mask):
        """Encode a batch of token-id sequences.

        Args:
            src: Integer tensor of token ids with shape [batch_size, src_len].
            src_mask: Mask forwarded unchanged to every encoder layer.

        Returns:
            Tensor of shape [batch_size, src_len, hidden_dim].
        """
        batch_size=src.shape[0]
        src_len=src.shape[1]  # sequence length

        # Position indices 0..src_len-1 for every sentence, created directly on
        # the input's device so they always match the embedding input even if
        # the module was moved after construction
        pos = torch.arange(0, src_len, device=src.device).unsqueeze(0).repeat(batch_size, 1)

        # Scaled token embedding + position embedding, then dropout
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))

        for layer in self.layers:
            src = layer(src, src_mask)

        return src


In [None]:
class Decoder(nn.Module):
    def __init__(sle,f)