In [1]:
import os

# Math / numerical libraries
import numpy as np
import math
# PyTorch libraries
import torch
import torch.nn as nn 
import torch.nn.functional as F 

# sample input

In [2]:
# Token ids of one example, stored as a batch of size 1.
sample_input = torch.tensor([[  101,  2572,  3217,  5831,  5496,  2010,  2567,  1010,  3183,  2002,
         2170,  1000,  1996,  7409,  1000,  1010,  1997,  9969,  4487, 23809,
         3436,  2010,  3350,  1012,   102,  7727,  2000,  2032,  2004,  2069,
         1000,  1996,  7409,  1000,  1010,  2572,  3217,  5831,  5496,  2010,
         2567,  1997,  9969,  4487, 23809,  3436,  2010,  3350,  1012,   102,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0]])
# A single input sequence followed by zero padding.


# Label for the example above, as a 1-element tensor.
sample_label = torch.tensor([1])

# [input ids, label] pair; sample[0] is fed to the models below.
sample = [sample_input,sample_label]

# label : 1 
# sentence 1 : Amrozi accused his brother, whom he called "the witness", of deliberately distorting his evidence.
# sentence 2 : Referring to him as only "the witness", Amrozi accused his brother of deliberately distorting his evidence.


# Hyper-parameters consumed by every module below (read as cfg.<key>).
sample_config = {
    "dim": 768,
    "dim_ff": 3072,
    "n_layers": 12,
    "p_drop_attn": 0.1,
    "n_heads": 12,
    "p_drop_hidden": 0.1,
    "max_len": 512,
    "n_segments": 2,
    "vocab_size": 30522
}

class AttributeDict(dict):
    """A dict whose keys can also be read as attributes (``cfg.dim == cfg["dim"]``).

    Missing keys raise ``AttributeError`` (not ``KeyError``) on attribute
    access, so ``hasattr``, ``getattr(obj, name, default)``, ``copy`` and
    ``pickle`` behave as they do for ordinary objects.
    """
    def __getattr__(self, name):
        # __getattr__ is only called after normal attribute lookup fails,
        # so dict methods such as .keys() are never shadowed by this.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None
model_config = AttributeDict(sample_config)

In [3]:
# Show the shape of the sample input tensor.
sample_input.size()

torch.Size([1, 128])

# Activation function

In [4]:
def gelu(x):
    """Exact (erf-based) GELU activation: ``x * Phi(x)`` with ``Phi`` the standard normal CDF.

    Args:
        x: input tensor of any shape.

    Returns:
        Tensor of the same shape as ``x``.
    """
    erf_term = torch.erf(x / math.sqrt(2.0))
    half_x = x * 0.5
    return half_x * (1.0 + erf_term)

# Layer normalization

In [None]:
# Layer normalization is already implemented in PyTorch (torch.nn.LayerNorm).
# This is the normalization used in the "Add & Norm" step.

In [8]:
class LayerNorm(nn.Module):
    """Layer normalization over the last (feature) dimension.

    Each vector along the last axis is shifted to zero mean and scaled to
    unit (biased) variance, then transformed by a learned per-feature
    scale ``gamma`` and shift ``beta``.

    Args:
        cfg: config object exposing ``dim`` (size of the last dimension).
        variance_epsilon: constant added to the variance before the square
            root to avoid division by zero.
    """
    def __init__(self, cfg, variance_epsilon=1e-12):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(cfg.dim))
        self.beta  = nn.Parameter(torch.zeros(cfg.dim))
        self.variance_epsilon = variance_epsilon

    def forward(self, x):
        # statistics are taken over the last axis, one set per vector
        mean = x.mean(-1, keepdim=True)
        centered = x - mean
        variance = centered.pow(2).mean(-1, keepdim=True)

        # standardize: (x - mean) / sqrt(var + eps)
        normalized = centered / torch.sqrt(variance + self.variance_epsilon)

        # learned affine re-scaling
        return self.gamma * normalized + self.beta

# Embedding

In [6]:
# Sinusoidal positional encoding table
def get_sinusoid_encoding_table(n_position, d_model):
    """Build the fixed sinusoidal positional-encoding table.

    Entry ``[pos, j]`` is ``sin(pos / 10000**(2*(j//2)/d_model))`` for even
    ``j`` and ``cos(...)`` of the same angle for odd ``j``.

    The table is computed with NumPy broadcasting instead of one Python-level
    call per cell, and an empty table (``n_position == 0``) is returned as a
    well-formed ``(0, d_model)`` tensor.

    Args:
        n_position: number of positions (rows).
        d_model: embedding width (columns).

    Returns:
        ``torch.FloatTensor`` of shape ``(n_position, d_model)``.
    """
    positions = np.arange(n_position, dtype=np.float64)[:, None]   # (n_position, 1)
    hid_idx = np.arange(d_model)                                   # (d_model,)
    # columns 2i and 2i+1 share the same wavelength, hence hid_idx // 2
    denominators = np.power(10000.0, 2 * (hid_idx // 2) / d_model)

    sinusoid_table = positions / denominators                      # (n_position, d_model)
    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2i
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1
    return torch.FloatTensor(sinusoid_table)

# Token + position embedding layer
class Embeddings(nn.Module):
    """Sum of learned token embeddings and fixed sinusoidal position embeddings,
    followed by layer normalization and dropout.

    Args:
        cfg: config exposing ``vocab_size``, ``dim``, ``max_len`` and
            ``p_drop_hidden``.
    """
    def __init__(self, cfg):
        super().__init__()

        # learned lookup: token id -> vector of size cfg.dim
        self.tok_embed = nn.Embedding(cfg.vocab_size, cfg.dim)
        # frozen lookup holding the sinusoidal table for positions 0..max_len-1
        sinusoid_weights = get_sinusoid_encoding_table(cfg.max_len, cfg.dim)
        self.pos_embed = nn.Embedding.from_pretrained(sinusoid_weights, freeze=True)

        self.norm = LayerNorm(cfg)
        self.drop = nn.Dropout(cfg.p_drop_hidden)

    def forward(self, x):
        """Embed token ids ``x`` of shape (B, S); returns the embedded tensor."""
        n_tokens = x.size(1)
        # position ids 0..S-1, repeated for every row of the batch: (S,) -> (B, S)
        positions = torch.arange(n_tokens, dtype=torch.long, device=x.device)
        positions = positions.unsqueeze(0).expand_as(x)

        summed = self.tok_embed(x) + self.pos_embed(positions)
        normed = self.norm(summed)
        return self.drop(normed)

In [9]:
# Instantiate the embedding layer with the sample configuration.
model = Embeddings(model_config)

In [10]:
# Run the embedding layer on the sample input ids and show the output shape.
out = model(sample[0])
out.size()


torch.Size([1, 128, 768])

In [11]:
# Display the embedding output tensor.
out

tensor([[[-1.1387, -0.5771,  0.2106,  ...,  0.0000, -0.5380,  0.0000],
         [ 0.8485,  0.9378,  0.5239,  ..., -0.6106, -2.1513,  0.8872],
         [ 1.8757, -0.4736, -0.0880,  ..., -0.9354, -0.9027,  0.4668],
         ...,
         [-1.4791,  0.8505, -0.1391,  ...,  0.1763,  1.2225,  1.2567],
         [-0.6083,  0.0000, -1.0105,  ...,  0.1768,  1.2280,  1.2622],
         [-0.0116,  0.3409, -1.5030,  ...,  0.1781,  1.2350,  1.2693]]],
       grad_fn=<MulBackward0>)

#  Transformer encoder

In [None]:
class Attention(nn.Module):
    """Scaled dot-product attention.

    Computes ``softmax(Q @ K^T / sqrt(d)) @ V`` where ``d`` is the size of the
    last dimension of ``query``. ``torch.matmul`` batches over all leading
    dimensions, so inputs may be (B, S, D) or (B, H, S, D).
    """

    def forward(self, query, key, value, mask=None, dropout=None):
        """
        Args:
            query, key, value: tensors whose last two dims are (seq_len, dim).
            mask: optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.
            dropout: optional callable applied to the attention weights.

        Returns:
            Tuple ``(attended values, attention weights)``.
        """
        # Q @ K^T, scaled by sqrt(d). No debug printing here: the previous
        # print dereferenced `mask` and crashed when mask was None.
        scores = torch.matmul(query, key.transpose(-2, -1)) \
                 / math.sqrt(query.size(-1))

        if mask is not None:
            # a large negative score becomes ~0 probability after softmax
            scores = scores.masked_fill(mask == 0, -1e9)

        p_attn = F.softmax(scores, dim=-1)

        if dropout is not None:
            p_attn = dropout(p_attn)

        return torch.matmul(p_attn, value), p_attn

In [None]:
def split_last(x, shape):
    """Split the last dimension of ``x`` into the given ``shape``.

    Used to break (B, S, D) into per-head chunks (B, S, H, D/H). At most one
    entry of ``shape`` may be -1; it is inferred from the size of the last
    dimension of ``x``.
    """
    dims = list(shape)
    assert dims.count(-1) <= 1
    if -1 in dims:
        # the product still contains the -1 placeholder, so negating it gives
        # the product of the explicitly given sizes
        inferred = int(x.size(-1) / -np.prod(dims))
        dims[dims.index(-1)] = inferred
    leading = x.size()[:-1]
    return x.view(*leading, *dims)

def merge_last(x, n_dims):
    """Flatten the trailing ``n_dims`` dimensions of ``x`` into a single one.

    ``n_dims`` must be greater than 1 and smaller than the rank of ``x``.
    """
    size = x.size()
    assert 1 < n_dims < len(size)
    kept = size[:-n_dims]
    return x.view(*kept, -1)


class MultiHeadAttention(nn.Module):
    """Multi-headed scaled dot-product attention.

    Args:
        cfg: config exposing ``dim`` (model width), ``n_heads`` and
            ``p_drop_attn`` (dropout on the attention weights).
    """
    def __init__(self, cfg):
        super().__init__()
        # linear maps producing queries, keys and values from the input embeddings
        self.proj_q = nn.Linear(cfg.dim, cfg.dim)
        self.proj_k = nn.Linear(cfg.dim, cfg.dim)
        self.proj_v = nn.Linear(cfg.dim, cfg.dim)
        self.drop = nn.Dropout(cfg.p_drop_attn)
        self.scores = None # for visualization
        self.n_heads = cfg.n_heads

    def forward(self, x, mask, x_q=None):
        """
        Args:
            x: (B, S_k, D) source of keys and values (and of queries when
                ``x_q`` is None, i.e. self-attention).
            mask: None, or a tensor in which 1/True marks positions that MAY
                be attended and 0/False marks positions to suppress. Accepted
                shapes are (B, S_k), one flag per key shared by all queries,
                and (B, S_q, S_k), one flag per query/key pair. Tensors of
                any other rank are used as-is and must broadcast against
                (B, H, S_q, S_k).
            x_q: optional (B, S_q, D) source of queries (cross-attention).

        Returns:
            (B, S_q, D) tensor. The attention weights (after dropout) are kept
            in ``self.scores`` with shape (B, H, S_q, S_k).

        D is split into (H, W) with D = H * W.
        """
        # self-attention reads queries from x, cross-attention from x_q
        query_source = x if x_q is None else x_q
        q, k, v = self.proj_q(query_source), self.proj_k(x), self.proj_v(x)

        # (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)
        q, k, v = (split_last(t, (self.n_heads, -1)).transpose(1, 2)
                   for t in (q, k, v))

        # (B, H, S_q, W) @ (B, H, W, S_k) -> (B, H, S_q, S_k)
        scores = q @ k.transpose(-2, -1) / math.sqrt(k.size(-1))

        if mask is not None:
            mask = mask.float()
            if mask.dim() == 2:
                # (B, S_k) -> (B, 1, 1, S_k): the inserted axes broadcast over
                # heads and queries. Inserting a single axis would align the
                # batch axis with the head axis instead.
                mask = mask[:, None, None, :]
            elif mask.dim() == 3:
                # (B, S_q, S_k) -> (B, 1, S_q, S_k): broadcast over heads
                mask = mask[:, None, :, :]
            # large negative bias where mask == 0 -> ~0 weight after softmax
            scores = scores - 10000.0 * (1.0 - mask)

        scores = self.drop(F.softmax(scores, dim=-1))

        # (B, H, S_q, S_k) @ (B, H, S_k, W) -> (B, H, S_q, W) -trans-> (B, S_q, H, W)
        h = (scores @ v).transpose(1, 2).contiguous()
        # merge the heads back together: (B, S_q, H, W) -> (B, S_q, D)
        h = merge_last(h, 2)
        self.scores = scores
        return h

# Base feedforward network

In [12]:
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to every position independently.

    Args:
        cfg: config exposing ``dim`` (model width) and ``dim_ff`` (hidden width).
    """
    def __init__(self, cfg):
        super().__init__()
        self.fc1 = nn.Linear(cfg.dim, cfg.dim_ff)
        self.fc2 = nn.Linear(cfg.dim_ff, cfg.dim)

    def forward(self, x):
        # expand: (B, S, D) -> (B, S, D_ff)
        hidden = self.fc1(x)
        activated = gelu(hidden)
        # contract: (B, S, D_ff) -> (B, S, D)
        return self.fc2(activated)

# Transformer

In [None]:
class Encoder_Block(nn.Module):
    """One Transformer encoder layer: self-attention and a feed-forward
    network, each followed by dropout, a residual connection and layer norm.

    Args:
        cfg: config exposing ``dim`` and ``p_drop_hidden`` (plus whatever the
            attention and feed-forward sub-modules read).
    """
    def __init__(self, cfg):
        super().__init__()
        self.attn = MultiHeadAttention(cfg)
        self.proj = nn.Linear(cfg.dim, cfg.dim)
        self.norm1 = LayerNorm(cfg)
        self.pwff = PositionWiseFeedForward(cfg)
        self.norm2 = LayerNorm(cfg)
        self.drop = nn.Dropout(cfg.p_drop_hidden)

    def forward(self, x, mask):
        """Apply the layer to ``x``; ``mask`` is forwarded to the attention module."""
        # self-attention -> output projection -> dropout -> add & norm
        attended = self.attn(x, mask)
        attn_branch = self.drop(self.proj(attended))
        h = self.norm1(x + attn_branch)

        # feed-forward -> dropout -> add & norm
        ff_branch = self.drop(self.pwff(h))
        return self.norm2(h + ff_branch)
    
def get_attn_pad_mask(seq_q, seq_k):
    """Build a padding mask for attention from token-id tensors.

    Args:
        seq_q: (B, len_q) token ids on the query side (only its length is used).
        seq_k: (B, len_k) token ids on the key side; id 0 is treated as PAD.

    Returns:
        (B, len_q, len_k) tensor that is True where the key token is PAD,
        i.e. True marks positions to be masked out.
    """
    _, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # (B, len_k) -> (B, 1, len_k): one PAD flag per key position
    key_is_pad = seq_k.data.eq(0)[:, None, :]
    # repeat the same key flags for every query position
    return key_is_pad.expand(batch_size, len_q, len_k)
    
def get_attn_subsequent_mask(seq):
    """Build a look-ahead mask hiding future positions.

    Args:
        seq: (B, S) tensor; only its shape and device are used.

    Returns:
        (B, S, S) uint8 tensor with 1 strictly above the diagonal (key
        position later than query position) and 0 elsewhere.
    """
    batch_size, seq_len = seq.size(0), seq.size(1)
    all_ones = torch.ones(batch_size, seq_len, seq_len, device=seq.device)
    # keep only the strict upper triangle of every (S, S) slice
    return torch.triu(all_ones, diagonal=1).byte()
    
    
class Decoder_Block(nn.Module):
    """One Transformer decoder layer: self-attention over the decoder input,
    attention over the encoder output, then a feed-forward network. Each
    sub-layer is followed by dropout, a residual connection and layer norm.

    Args:
        cfg: config exposing ``dim`` and ``p_drop_hidden`` (plus whatever the
            attention and feed-forward sub-modules read).
    """
    def __init__(self, cfg):
        super().__init__()
        self.self_attention = MultiHeadAttention(cfg)    # attends over the decoder input
        self.encoder_attention = MultiHeadAttention(cfg) # attends over the encoder output

        self.norm1 = LayerNorm(cfg)
        self.proj1 = nn.Linear(cfg.dim, cfg.dim)
        self.norm2 = LayerNorm(cfg)
        self.proj2 = nn.Linear(cfg.dim, cfg.dim)

        self.pwff = PositionWiseFeedForward(cfg)
        self.norm3 = LayerNorm(cfg)

        self.drop = nn.Dropout(cfg.p_drop_hidden)

    def forward(self,x , enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        """Apply the layer to decoder states ``x`` given ``enc_outputs``.

        ``dec_self_attn_mask`` is passed to the self-attention module and
        ``dec_enc_attn_mask`` to the encoder-attention module.
        """
        # 1) self-attention -> add & norm
        self_attended = self.self_attention(x, dec_self_attn_mask)
        self_branch = self.drop(self.proj1(self_attended))
        h = self.norm1(x + self_branch)

        # 2) encoder attention (queries from the decoder state, keys/values
        #    from the encoder output) -> add & norm
        cross_attended = self.encoder_attention(enc_outputs, dec_enc_attn_mask, x_q=h)
        cross_branch = self.drop(self.proj2(cross_attended))
        h = self.norm2(h + cross_branch)

        # 3) feed-forward network -> add & norm
        ff_branch = self.drop(self.pwff(h))
        return self.norm3(h + ff_branch)

class Transformer(nn.Module):
    """Encoder-decoder Transformer producing per-position vocabulary logits.

    Args:
        cfg: config exposing ``n_layers``, ``dim`` and ``vocab_size`` (plus
            whatever the embedding and block sub-modules read).
    """
    def __init__(self, cfg):
        super().__init__()
        #====================encoder===========================
        self.encoder_embed = Embeddings(cfg)
        self.encoder_blocks = nn.ModuleList([Encoder_Block(cfg) for _ in range(cfg.n_layers)])

        #====================decoder============================
        self.decoder_embed = Embeddings(cfg)
        self.decoder_blocks = nn.ModuleList([Decoder_Block(cfg) for _ in range(cfg.n_layers)])

        #=========================================================
        self.projection = nn.Linear(cfg.dim, cfg.vocab_size)


    def forward(self, enc_inputs, dec_inputs):
        """
        Args:
            enc_inputs: (B, S_enc) encoder token ids; id 0 is treated as PAD.
            dec_inputs: (B, S_dec) decoder token ids; id 0 is treated as PAD.

        Returns:
            (B, S_dec, vocab_size) logits.
        """
        # The mask helpers return True/1 at positions that must be BLOCKED
        # (PAD keys, future tokens), whereas MultiHeadAttention subtracts
        # 10000 * (1 - mask), i.e. it expects 1 at positions that may be
        # attended. Every mask is therefore inverted before it is handed to
        # the blocks; passing it unchanged would attend only to PAD/future
        # positions.

        #============encoder============
        h = self.encoder_embed(enc_inputs)
        enc_pad_blocked = get_attn_pad_mask(enc_inputs, enc_inputs)
        enc_self_attn_mask = torch.logical_not(enc_pad_blocked)
        for block in self.encoder_blocks:
            h = block(h, enc_self_attn_mask)

        enc_outputs = h


        #============decoder============

        # self attention mask: block PAD keys and any key later than the query
        dec_pad_blocked = get_attn_pad_mask(dec_inputs, dec_inputs).float()
        dec_future_blocked = get_attn_subsequent_mask(dec_inputs).float()
        dec_self_blocked = torch.gt((dec_pad_blocked + dec_future_blocked), 0)
        dec_self_attn_mask = torch.logical_not(dec_self_blocked)

        # encoder attention mask: block PAD keys of the encoder input
        dec_enc_blocked = get_attn_pad_mask(dec_inputs, enc_inputs)
        dec_enc_attn_mask = torch.logical_not(dec_enc_blocked)


        # embedding
        h = self.decoder_embed(dec_inputs)


        for block in self.decoder_blocks:
            h = block(h, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
        #============projection==========
        out = self.projection(h)

        return out

In [None]:
# Build the full Transformer and run it with the sample ids used as both the
# encoder input and the decoder input, then show the output shape.
model = Transformer(model_config)
out = model(sample[0],sample[0])
out.size()

torch.Size([1, 128, 30522])