# Transformer (Attention Is All You Need) 구현하기 (2/3)
- [code 참고 : transformer 구현하기(2/3)](https://paul-hyun.github.io/transformer-02/)
- [이론참고 : Attention is all you need 뽀개기](https://pozalabs.github.io/transformer/)

# 0. Settings

In [4]:
!pip install sentencepiece



In [2]:
import sentencepiece as spm
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F

# 1. Config
- Transformer 모델에는 많은 설정이 필요함
- 이 설정을 json 형태로 저장을 하고 이를 읽어서 처리하는 간단한 클래스
- [@ decorator ref](https://yeomko.tistory.com/12)
- [@classmethod and @staticmethod](https://wikidocs.net/16074)

In [3]:
"""configuration json을 읽어들이는 class"""
class Config(dict):
    """Dictionary whose entries can also be read and written as attributes.

    ``config.d_hidn`` is equivalent to ``config["d_hidn"]`` for both reading
    and assignment. Reading a missing key through attribute access raises
    ``KeyError`` because the lookup is delegated to ``dict.__getitem__``.
    """

    __getattr__ = dict.__getitem__
    # Attribute assignment must be routed into the mapping itself; otherwise
    # values set as ``config.x = v`` would not be visible as ``config["x"]``.
    __setattr__ = dict.__setitem__

    @classmethod
    def load(cls, file):
        """Create a config from a JSON file.

        Args:
            file: Path of a JSON file whose top-level value is an object.

        Returns:
            An instance of ``cls`` holding the parsed key/value pairs.
        """
        # Imported locally because the file's import cell does not bring in json.
        import json

        with open(file, "r", encoding="utf-8") as f:
            # Use ``cls`` so subclasses get instances of their own type.
            return cls(json.load(f))

In [4]:
# Load the SentencePiece model file that provides the vocabulary.
vocab_file = "./data/kowiki.model"
vocab = spm.SentencePieceProcessor()
vocab.load(vocab_file)

True

In [5]:
# Hyperparameters are kept small so the model also runs with limited resources.
# With spare GPU capacity, larger values can give better results.
config = Config(
    n_enc_vocab=len(vocab),
    n_dec_vocab=len(vocab),
    n_enc_seq=256,
    n_dec_seq=256,
    n_layer=6,
    d_hidn=256,
    i_pad=0,
    d_ff=1024,
    n_head=4,
    d_head=64,
    dropout=0.1,
    layer_norm_epsilon=1e-12,
)
print(config)


{'n_enc_vocab': 8007, 'n_dec_vocab': 8007, 'n_enc_seq': 256, 'n_dec_seq': 256, 'n_layer': 6, 'd_hidn': 256, 'i_pad': 0, 'd_ff': 1024, 'n_head': 4, 'd_head': 64, 'dropout': 0.1, 'layer_norm_epsilon': 1e-12}


# 2. Common Class
- Position Embedding
- Multi-Head Attention
- Feed Forward

## 2-1. Position Embedding
- line 8 : 각 position별 hidden index별 angle값 구하기
- line 9 : hidden even index의 angle값의 sin값 구하기
- line 10: hidden odd index의 angle값의 cos값 구하기

In [6]:
"""sinusoid position encoding"""
def get_sinusoid_encoding_table(n_seq, d_hidn):
    """Build the fixed sinusoidal position-encoding table.

    Entry ``[pos, i]`` holds ``sin(angle)`` for even ``i`` and ``cos(angle)``
    for odd ``i``, where ``angle = pos / 10000 ** (2 * (i // 2) / d_hidn)``.

    Args:
        n_seq: Number of positions (rows of the table).
        d_hidn: Hidden size (columns of the table).

    Returns:
        A float64 ``numpy.ndarray`` of shape ``(n_seq, d_hidn)``.
    """
    positions = np.arange(n_seq, dtype=np.float64)[:, np.newaxis]
    hidden_idx = np.arange(d_hidn)
    # Columns 2k and 2k + 1 share one frequency, hence the floor division.
    angle_rates = np.power(10000, 2 * (hidden_idx // 2) / d_hidn)
    sinusoid_table = positions / angle_rates

    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # even columns
    # Source and destination must both step by 2; any other step on the
    # right-hand side selects the wrong columns and mismatches the shape.
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # odd columns

    return sinusoid_table

## 2-2. Multi-Head Attention
### Attention Pad Mask
- Attention을 구할 때 패딩 부분을 제외하기 위해 mask를 구하는 함수
- line 5 : K의 값중에 Pad인 부분을 True로 변경
- line 6 : 구해진 값의 크기를 Q-len, K-len 되도록 변경

In [7]:
"""attention pad mask"""
def get_attn_pad_mask(seq_q, seq_k, i_pad):
    """Build a boolean mask marking padded key positions for every query.

    Args:
        seq_q: 2-D tensor of query token ids, ``(batch, len_q)``.
        seq_k: 2-D tensor of key token ids, ``(batch, len_k)``.
        i_pad: Token id used for padding.

    Returns:
        Boolean tensor of shape ``(batch, len_q, len_k)`` that is ``True``
        wherever the key token equals ``i_pad``.
    """
    _, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # (batch, len_k): True at padded key positions.
    pad_attn_mask = seq_k.eq(i_pad)
    # Repeat the key mask along a new query axis without copying memory.
    return pad_attn_mask.unsqueeze(1).expand(batch_size, len_q, len_k)

### Attention Decoder Mask
- Decoder의 masked multi head attention에서 사용할 mask를 구하는 함수
- 현재단어와 이전단어는 볼 수 있고 다음 단어는 볼 수 없도록 masking함
- line 3: 모든 값이 1인 Q-len, K-len 테이블 생성
- line 4 : 대각선을 기준으로 아래쪽을 0으로 만듦

In [15]:
# Scratch demo: ones_like() produces a tensor of ones with the shape of its
# argument, and unsqueeze(1) inserts a size-1 axis at index 1
# (shape (2, 3) -> (2, 1, 3)).
# NOTE(review): `input` shadows the Python builtin of the same name.
input = torch.empty(2, 3)
tmp = torch.ones_like(input)
tmp, tmp.unsqueeze(1)

(tensor([[1., 1., 1.],
         [1., 1., 1.]]),
 tensor([[[1., 1., 1.]],
 
         [[1., 1., 1.]]]))

In [None]:
"""attention decoder mask"""
def get_attn_decoder_mask(seq):
    """Build the look-ahead mask used by decoder self-attention.

    Args:
        seq: 2-D tensor of token ids, ``(batch, n_seq)``.

    Returns:
        Tensor of shape ``(batch, n_seq, n_seq)`` with the dtype of ``seq``;
        entries strictly above the main diagonal are 1 and all others are 0.
    """
    batch_size, seq_len = seq.size(0), seq.size(1)
    # All-ones square table per batch item, built as a broadcast view.
    all_ones = torch.ones_like(seq).unsqueeze(-1)
    square = all_ones.expand(batch_size, seq_len, seq_len)
    # Keep only the strictly upper triangle; the diagonal and below become 0.
    return torch.triu(square, diagonal=1)

### Scaled Dot product Attention
- line 11 : Q * K.transpose(내적값)를 구한다
- line 12 : K-dimension에 루트를 취한 값으로 나눠 줌(scaling)
- line 13 : Mask 적용
- line 15 : Softmax를 취해 각 단어의 가중치 확률분포 attn_prob을 구함
- attn_prob * V를 구함. 구한 값은 Q에 대한 V의 가중치 합 벡터

In [None]:
"""scale dot product attention"""
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_head)) V``.

    Reads ``config.d_head`` for the scaling factor and ``config.dropout`` for
    the dropout applied to the attention probabilities.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.dropout = nn.Dropout(config.dropout)
        self.scale = 1 / (self.config.d_head ** 0.5)

    def forward(self, Q, K, V, attn_mask):
        """Compute attention-weighted values.

        Args:
            Q: Queries, expected ``(batch, n_head, n_q_seq, d_head)``.
            K: Keys, expected ``(batch, n_head, n_k_seq, d_head)``.
            V: Values, expected ``(batch, n_head, n_k_seq, d_v)``.
            attn_mask: Boolean tensor broadcastable to the score shape;
                ``True`` marks positions that must receive no attention.

        Returns:
            Tuple ``(context, attn_prob)`` with shapes
            ``(batch, n_head, n_q_seq, d_v)`` and
            ``(batch, n_head, n_q_seq, n_k_seq)``.
        """
        # (batch, n_head, n_q_seq, n_k_seq)
        scores = torch.matmul(Q, K.transpose(-1, -2))
        scores.mul_(self.scale)
        # A large negative score drives the softmax weight of masked
        # positions to (numerically) zero.
        scores.masked_fill_(attn_mask, -1e9)
        attn_prob = torch.softmax(scores, dim=-1)
        attn_prob = self.dropout(attn_prob)
        # (batch, n_head, n_q_seq, d_v)
        context = torch.matmul(attn_prob, V)
        return context, attn_prob
        

### Multi-Head Attention
- line 17 : Q * W_Q / multi-head
- line 19 : K * W_K / multi-head
- line 21 : V * W_V / multi-head
- line 27 : ScaledDotProductAttention 클래스를 이용해 각 head별 Attention 구하기
- line 29 : 여러개의 head를 1개로 합침
- line 31 : Linear를 취해 최종 Multi-Head Attention값 구하기

In [16]:
"""multi head attention"""
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``ScaledDotProductAttention``.

    Inputs are linearly projected to ``n_head * d_head`` features, split into
    heads, attended per head, merged again and projected back to ``d_hidn``.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        model_dim = config.d_hidn
        proj_dim = config.n_head * config.d_head

        self.W_Q = nn.Linear(model_dim, proj_dim)
        self.W_K = nn.Linear(model_dim, proj_dim)
        self.W_V = nn.Linear(model_dim, proj_dim)
        self.scaled_dot_attn = ScaledDotProductAttention(config)
        self.linear = nn.Linear(proj_dim, model_dim)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, Q, K, V, attn_mask):
        """Run multi-head attention.

        Args:
            Q, K, V: Tensors whose last dimension is ``d_hidn`` and whose
                first dimension is the batch.
            attn_mask: Mask of shape ``(batch, n_q_seq, n_k_seq)``; it is
                repeated once per head before being applied.

        Returns:
            Tuple ``(output, attn_prob)``: ``(batch, n_q_seq, d_hidn)`` and
            ``(batch, n_head, n_q_seq, n_k_seq)``.
        """
        n_head, d_head = self.config.n_head, self.config.d_head
        batch_size = Q.size(0)

        def split_heads(projected):
            # (batch, n_seq, n_head * d_head) -> (batch, n_head, n_seq, d_head)
            return projected.view(batch_size, -1, n_head, d_head).transpose(1, 2)

        q_s = split_heads(self.W_Q(Q))
        k_s = split_heads(self.W_K(K))
        v_s = split_heads(self.W_V(V))

        # (batch, n_head, n_q_seq, n_k_seq): one copy of the mask per head.
        head_mask = attn_mask.unsqueeze(1).repeat(1, n_head, 1, 1)

        # (batch, n_head, n_q_seq, d_head), (batch, n_head, n_q_seq, n_k_seq)
        context, attn_prob = self.scaled_dot_attn(q_s, k_s, v_s, head_mask)

        # Merge heads: (batch, n_q_seq, n_head * d_head)
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(batch_size, -1, n_head * d_head)

        # (batch, n_q_seq, d_hidn)
        output = self.dropout(self.linear(merged))
        return output, attn_prob

## 2-3. FeedForward
- line 14 : Linear를 실행하여 shape을 d_ff(hidden * 4) 크기로 키움
- line 15 : 활성화 함수(relu/gelu)를 실행
- line 17 : Linear를 실행하여 shape을 hidden 크기로 줄임

In [17]:
"""feed forward"""
class PoswiseFeedForwardNet(nn.Module):
    """Position-wise feed-forward network built from two 1x1 convolutions.

    Expands ``d_hidn`` features to ``d_ff``, applies GELU, projects back to
    ``d_hidn`` and applies dropout.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        d_hidn, d_ff = config.d_hidn, config.d_ff
        self.conv1 = nn.Conv1d(d_hidn, d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(d_ff, d_hidn, kernel_size=1)
        self.active = F.gelu
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, inputs):
        """Apply the feed-forward block.

        Args:
            inputs: Tensor of shape ``(batch, n_seq, d_hidn)``.

        Returns:
            Tensor of shape ``(batch, n_seq, d_hidn)``.
        """
        # Conv1d expects channels before length: (batch, d_hidn, n_seq).
        channels_first = inputs.transpose(1, 2)
        # (batch, d_ff, n_seq)
        hidden = self.active(self.conv1(channels_first))
        # Back to (batch, n_seq, d_hidn).
        projected = self.conv2(hidden).transpose(1, 2)
        return self.dropout(projected)

# 3. Encoder
## 3-1. Encoder Layer
- Encoder에서 루프를 돌며 처리할 수 있도록 EncoderLayer를 정의하고 여러 개 만들어서 실행 합니다
1. Multi-head attention 수행
    - Q = K = V인 Self-Attention임
2. 1번의 결과와 input(residual)을 더한 후 LayerNorm을 실행한다
3. 2번의 결과를 입력으로 Feed Forward를 실행한다
4. 3번의 결과와 2번의 결과(residual)을 더한 후 LayerNorm을 실행한다

In [18]:
"""encoder layer"""
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer output is added to its input (residual connection) and
    then layer-normalised.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        d_hidn = config.d_hidn
        eps = config.layer_norm_epsilon

        self.self_attn = MultiHeadAttention(config)
        self.layer_norm1 = nn.LayerNorm(d_hidn, eps=eps)
        self.pos_ffn = PoswiseFeedForwardNet(config)
        self.layer_norm2 = nn.LayerNorm(d_hidn, eps=eps)

    def forward(self, inputs, attn_mask):
        """Run the layer.

        Args:
            inputs: Tensor of shape ``(batch, n_enc_seq, d_hidn)``.
            attn_mask: Attention mask handed to the self-attention module.

        Returns:
            Tuple ``(outputs, attn_prob)``: ``(batch, n_enc_seq, d_hidn)`` and
            ``(batch, n_head, n_enc_seq, n_enc_seq)``.
        """
        # Self-attention: queries, keys and values are all the layer input.
        attended, attn_prob = self.self_attn(inputs, inputs, inputs, attn_mask)
        hidden = self.layer_norm1(inputs + attended)

        # Feed-forward sub-layer with its own residual connection.
        fed_forward = self.pos_ffn(hidden)
        outputs = self.layer_norm2(fed_forward + hidden)
        return outputs, attn_prob

## 3-2. Encoder
1. 입력에 대한 position 값 구하기
2. Input Embedding과 position Embedding구하고 더하기
3. 입력에 대한 attention pad mask 구하기
4. for 루프를 돌며 각 layer를 실행 : layer의 입력은 이전 layer의 출력 값임

In [19]:
""" encoder """
class Encoder(nn.Module):
    """Stack of ``n_layer`` encoder layers on top of token + position embeddings.

    Position embeddings come from a frozen sinusoidal table with
    ``n_enc_seq + 1`` rows; row 0 is used for padding tokens.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        self.enc_emb = nn.Embedding(config.n_enc_vocab, config.d_hidn)
        position_table = get_sinusoid_encoding_table(config.n_enc_seq + 1, config.d_hidn)
        self.pos_emb = nn.Embedding.from_pretrained(
            torch.FloatTensor(position_table), freeze=True
        )

        self.layers = nn.ModuleList(
            [EncoderLayer(config) for _ in range(config.n_layer)]
        )

    def forward(self, inputs):
        """Encode a batch of token ids.

        Args:
            inputs: Integer tensor of token ids, ``(batch, n_enc_seq)``.

        Returns:
            Tuple ``(outputs, attn_probs)``: the final hidden states
            ``(batch, n_enc_seq, d_hidn)`` and a list with one attention
            probability tensor per layer.
        """
        batch_size, seq_len = inputs.size(0), inputs.size(1)

        # Positions are 1-based; padding tokens are assigned position 0.
        positions = torch.arange(seq_len, device=inputs.device, dtype=inputs.dtype)
        positions = positions.expand(batch_size, seq_len).contiguous() + 1
        positions.masked_fill_(inputs.eq(self.config.i_pad), 0)

        # (batch, n_enc_seq, d_hidn)
        outputs = self.enc_emb(inputs) + self.pos_emb(positions)

        # (batch, n_enc_seq, n_enc_seq)
        attn_mask = get_attn_pad_mask(inputs, inputs, self.config.i_pad)

        attn_probs = []
        for encoder_layer in self.layers:
            # Each layer consumes the previous layer's output.
            outputs, layer_prob = encoder_layer(outputs, attn_mask)
            attn_probs.append(layer_prob)
        return outputs, attn_probs

# 4. Decoder
## 4-1. Decoder Layer
- Decoder에서 루프를 돌며 처리할 수 있도록 DecoderLayer를 정의하고 여러개를 만들어서 실행
1. Multi-head attention 수행
    - Q = K = V인 Self-Attention임
2. 1번의 결과와 input(residual)을 더한 후 LayerNorm을 실행한다
3. Encoder-Decoder Multi-Head Attention을 수행
    - Q : 2번의 결과
    - K, V : Encoder 결과
4. 3번의 결과와 2번의 결과(residual)을 더한 후 LayerNorm을 실행한다
5. 4번의 결과를 입력으로 Feed Forward를 실행한다
6. 5번의 결과와 4번의 결과(residual)을 더한 후 Layer Norm을 실행한다

In [20]:
""" decoder layer """
class DecoderLayer(nn.Module):
    """One decoder layer: self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer output is added to its input (residual connection) and
    then layer-normalised.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        d_hidn = config.d_hidn
        eps = config.layer_norm_epsilon

        self.self_attn = MultiHeadAttention(config)
        self.layer_norm1 = nn.LayerNorm(d_hidn, eps=eps)
        self.dec_enc_attn = MultiHeadAttention(config)
        self.layer_norm2 = nn.LayerNorm(d_hidn, eps=eps)
        self.pos_ffn = PoswiseFeedForwardNet(config)
        self.layer_norm3 = nn.LayerNorm(d_hidn, eps=eps)

    def forward(self, dec_inputs, enc_outputs, self_attn_mask, dec_enc_attn_mask):
        """Run the layer.

        Args:
            dec_inputs: Decoder hidden states, ``(batch, n_dec_seq, d_hidn)``.
            enc_outputs: Encoder outputs used as keys and values of the
                encoder-decoder attention.
            self_attn_mask: Mask for the decoder self-attention.
            dec_enc_attn_mask: Mask for the encoder-decoder attention.

        Returns:
            Tuple ``(outputs, self_attn_prob, dec_enc_attn_prob)``:
            ``(batch, n_dec_seq, d_hidn)``,
            ``(batch, n_head, n_dec_seq, n_dec_seq)`` and
            ``(batch, n_head, n_dec_seq, n_enc_seq)``.
        """
        # 1) Self-attention over the decoder input.
        attended, self_attn_prob = self.self_attn(
            dec_inputs, dec_inputs, dec_inputs, self_attn_mask
        )
        self_hidden = self.layer_norm1(dec_inputs + attended)

        # 2) Queries from the decoder, keys and values from the encoder.
        cross, dec_enc_attn_prob = self.dec_enc_attn(
            self_hidden, enc_outputs, enc_outputs, dec_enc_attn_mask
        )
        cross_hidden = self.layer_norm2(self_hidden + cross)

        # 3) Position-wise feed-forward.
        fed_forward = self.pos_ffn(cross_hidden)
        outputs = self.layer_norm3(cross_hidden + fed_forward)
        return outputs, self_attn_prob, dec_enc_attn_prob

## 4-2. Decoder
1. 입력에 대한 position 값 구하기
2. Input Embedding과 position Embedding구하고 더하기
3. 입력에 대한 attention pad mask 구하기
4. 입력에 대한 decoder attention mask 구하기
5. attention pad mask와 decoder attention mask 중 1곳이라도 mask되어 있는 부분은 mask 되도록 attention mask를 구한다
6. Q(decoder input), K(encoder output)에 대한 attention mask구하기
7. for 루프를 돌며 각 layer를 실행 : layer의 입력은 이전 layer의 출력 값임

In [21]:
""" decoder """
class Decoder(nn.Module):
    """Stack of ``n_layer`` decoder layers on top of token + position embeddings.

    Position embeddings come from a frozen sinusoidal table with
    ``n_dec_seq + 1`` rows; row 0 is used for padding tokens.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        self.dec_emb = nn.Embedding(config.n_dec_vocab, config.d_hidn)
        position_table = get_sinusoid_encoding_table(config.n_dec_seq + 1, config.d_hidn)
        self.pos_emb = nn.Embedding.from_pretrained(
            torch.FloatTensor(position_table), freeze=True
        )

        self.layers = nn.ModuleList(
            [DecoderLayer(config) for _ in range(config.n_layer)]
        )

    def forward(self, dec_inputs, enc_inputs, enc_outputs):
        """Decode a batch of token ids against the encoder outputs.

        Args:
            dec_inputs: Integer tensor of decoder token ids,
                ``(batch, n_dec_seq)``.
            enc_inputs: Integer tensor of encoder token ids; only used to
                locate encoder padding for the encoder-decoder mask.
            enc_outputs: Encoder hidden states passed to every layer.

        Returns:
            Tuple ``(dec_outputs, self_attn_probs, dec_enc_attn_probs)``: the
            final hidden states ``(batch, n_dec_seq, d_hidn)`` and two lists
            holding one attention probability tensor per layer.
        """
        i_pad = self.config.i_pad
        batch_size, seq_len = dec_inputs.size(0), dec_inputs.size(1)

        # Positions are 1-based; padding tokens are assigned position 0.
        positions = torch.arange(seq_len, device=dec_inputs.device, dtype=dec_inputs.dtype)
        positions = positions.expand(batch_size, seq_len).contiguous() + 1
        positions.masked_fill_(dec_inputs.eq(i_pad), 0)

        # (batch, n_dec_seq, d_hidn)
        dec_outputs = self.dec_emb(dec_inputs) + self.pos_emb(positions)

        # Self-attention mask: a position is masked when it is padding OR
        # lies in the future. Both masks are (batch, n_dec_seq, n_dec_seq).
        pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs, i_pad)
        look_ahead_mask = get_attn_decoder_mask(dec_inputs)
        dec_self_attn_mask = (pad_mask + look_ahead_mask).gt(0)

        # (batch, n_dec_seq, n_enc_seq): hides encoder padding from the decoder.
        dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs, i_pad)

        self_attn_probs = []
        dec_enc_attn_probs = []
        for decoder_layer in self.layers:
            # Each layer consumes the previous layer's output.
            dec_outputs, self_prob, cross_prob = decoder_layer(
                dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask
            )
            self_attn_probs.append(self_prob)
            dec_enc_attn_probs.append(cross_prob)
        return dec_outputs, self_attn_probs, dec_enc_attn_probs

# 5. Transformer
1. Encoder Input을 입력으로 Encoder를 실행
2. Encoder Output과 Decoder Input을 입력으로 Decoder를 실행

In [22]:
""" transformer """
class Transformer(nn.Module):
    """Encoder-decoder Transformer without an output projection.

    The forward pass returns the decoder's hidden states as they are; no
    vocabulary projection is applied in this class.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        self.encoder = Encoder(config)
        self.decoder = Decoder(config)

    def forward(self, enc_inputs, dec_inputs):
        """Run the encoder, then the decoder on top of its outputs.

        Args:
            enc_inputs: Integer tensor of encoder token ids.
            dec_inputs: Integer tensor of decoder token ids.

        Returns:
            Tuple ``(dec_outputs, enc_self_attn_probs, dec_self_attn_probs,
            dec_enc_attn_probs)``: the decoder hidden states followed by
            three lists of per-layer attention probabilities.
        """
        enc_outputs, enc_self_attn_probs = self.encoder(enc_inputs)
        # The decoder also receives the raw encoder ids to mask encoder padding.
        decoded = self.decoder(dec_inputs, enc_inputs, enc_outputs)
        dec_outputs, dec_self_attn_probs, dec_enc_attn_probs = decoded
        return dec_outputs, enc_self_attn_probs, dec_self_attn_probs, dec_enc_attn_probs