In [1]:
import math
import numpy as np
from copy import copy
import torch
import torch.nn.functional as F
from torch.autograd import Variable
from torch import nn

In [2]:
B = 64    # 배치 사이즈
M = 10    # 토큰의 최대 길이
V = 1024    # 토큰의 개수
N = 8    # 멀티헤드 개수
H = 512    # 토큰의 임베딩 사이즈
EXP = 2048    # 확장 사이즈 (FeedForward 클래스 참고)
L = 6    # 인코더/디코더 레이어 개수

In [3]:
def attention(query, key, value):
    scale = query.shape[-1]
    score = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(scale)
    prob = F.softmax(score, dim=-1)
    attn = torch.matmul(prob, value)
    return attn

In [4]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_head, hidden_size):
        super(MultiHeadAttention, self).__init__()
        self.num_head = num_head
        self.dk = hidden_size // self.num_head
        
    def forward(self, query, key, value):
        '''
        x = torch.rand((B, M, H))
        m = MultiHeadAttention(N, H)
        v = m(x, x, x)
        v.shape  # torch.Size([64, 10, 512)])
        '''
        n_batch = query.shape[0]
        query = query.view(n_batch, -1, self.num_head, self.dk).transpose(1, 2)
        key = key.view(n_batch, -1, self.num_head, self.dk).transpose(1, 2)
        value = value.view(n_batch, -1, self.num_head, self.dk).transpose(1, 2)
        
        x = attention(query, key, value)
        x = x.transpose(1, 2).contiguous().view(n_batch, -1, self.dk * self.num_head)
        return x

MultiHeadAttention 클래스의 내부를 보면 query, key, value의 값을 만드는데 view 함수와 transpose 함수를 사용한 것을 확인할 수 있다. 이 과정을 통해서 (B, M, H)였던 입력값이 (B, N, M, dk)로 변형됐고, 이 값과 attention 함수를 이용해서 셀프어텐션을 수행한다. 참고로 query, key, value의 값이 모두 같을 경우 어텐션은 셀프어텐션을 수행하게 되고, query, key, value의 값이 다를 경우 일반적인 어텐션을 수행하게 된다. 디코더에서의 구조도 인코더와 비슷하다.

In [5]:
class FeedForward(nn.Module):
    def __init__(self, hidden_size, expand_size):
        super(FeedForward, self).__init__()
        self.linear_1 = nn.Linear(hidden_size, expand_size)
        self.linear_2 = nn.Linear(expand_size, hidden_size)
        
    def forward(self, x):
        '''
        x = torch.rand((B, M, H))
        m = FeedForward(H, EXP)
        v = m(x)
        v.shape    # torch.Size([64, 10, 512])
        '''
        x = self.linear_1(x)
        x = self.linear_2(x)
        return x

In [6]:
class Embedding(nn.Module):
    def __init__(self, n_vocab, hidden_size):
        super(Embedding, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(n_vocab, hidden_size)
        
    def forward(self, x):
        '''
        data = np.random.randint(0, V, (B, M))
        x = torch.from_numpy(data)
        m = Embedding(V, H)
        v = m(x)
        v.shape    # torch.Size([64, 10 512])
        '''
        return self.embedding(x)

위 코드를 보면 Embedding 레이어에서 H 사이즈만큼으로 임베딩을 한 후 EncoderLayer를 몇 번 반복해서 돌리는 구조를 띠고 있다. EncoderLayer 내부를 살펴보려면 아래 EncoderLayer 클래스를 참고하라.

In [7]:
class PositionalEncoding(nn.Module):
    def __init__(self, hidden_size):
        super(PositionalEncoding, self).__init__()
        pos_encoding = torch.zeros(M, hidden_size)
        position = torch.arange(0, M).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_size, 2) *
                             -(math.log(10000.0) / hidden_size))
        pos_encoding[:, 0::2] = torch.sin(position * div_term)
        pos_encoding[:, 1::2] = torch.cos(position * div_term)
        self.pos_encoding = pos_encoding.unsqueeze(0)
        
    def forward(self, x):
        '''
        x = torch.rand((B, M, H))
        m = PositionalEncoding(H)
        v = m(x)
        v.shape    # torch.Size([64, 10, 512])
        '''
        x = x + Variable(self.pos_encoding[:, :x.size(1)],
                         requires_grad=False)
        return x

트랜스포머에서는 RNN과 같은 순환 구조를 사용하지 않는다. 따라서 토큰 간의 순서 정보를 학습할 경우 약간의 추가적인 순서 정보를 넣어준다. 포지셔널 인코딩은 위와 같이 구현할 수 있다.
포지셔널 인코딩은 Embedding 레이어를 통해 임베딩된 벡터에 대해서 실행된다. 이 포지셔널 인코딩은 인코더와 디코더의 입력 모두 필요하다.

In [8]:
class EncoderLayer(nn.Module):
    def __init__(self, hidden_size):
        super(EncoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(N, hidden_size)
        self.feedforward = FeedForward(hidden_size, EXP)
        
    def forward(self, x):
        '''
        x = torch.rand((B, M, H))
        m = FeedForward(H, EXP)
        v = m(x)
        v.shape    # torch.Size([64, 10, 512])
        '''
        x = self.self_attention(x, x, x)
        x = self.feedforward(x)
        return x

In [9]:
class Encoder(nn.Module):
    def __init__(self, n_layers):
        super(Encoder, self).__init__()
        self.n_layers = n_layers
        self.embedding = Embedding(V, H)
        self.position = PositionalEncoding(H)
        self.layers = [EncoderLayer(H) for i in range(n_layers)]
        
    def forward(self, x):
        '''
        data = np.random.randint(0, V, (B, M))
        x = torch.from_numpy(data)
        m = Encoder(L)
        v = m(x)
        v.shape    # torch.Size([64, 10, 512])
        '''
        x = self.embedding(x)
        x = self.position(x)
        for layer in self.layers:
            x = layer(x)
        return x

EncoderLayer 부분을 보면 MultiHeadAttention 부분이 있고 MultiHeadAttention 레이어에서 멀티헤드 개수만큼의 어텐션 연산이 실행된다.

In [10]:
class DecoderLayer(nn.Module):
    def __init__(self, n_head, hidden_size):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(n_head, hidden_size)
        self.encdec_attention = MultiHeadAttention(n_head, hidden_size)
        self.feedforward = FeedForward(hidden_size, 2048)
        
    def forward(self, x, memory):
        '''
        x = torch.rand((B, M, H))
        mem = copy(x)
        m = DecoderLayer(N, H)
        v = m(x, mem)
        v.shape    # torch.Size([64, 10, 512])
        '''
        x = self.self_attention(x, memory, memory)
        return x

In [11]:
class Decoder(nn.Module):
    def __init__(self, n_layers):
        super(Decoder, self).__init__()
        self.embedding = Embedding(V, H)
        self.layers = [DecoderLayer(N, H) for i in range(n_layers)]
        
    def forward(self, x, memory):
        '''
        data = np.random.randint(0, V, (B, M))
        x = torch.from_numpy(data)
        mem = torch.rand((B, M, H))
        m = Decoder(L)
        v = m(x, mem)
        v.shape    # torch.Size([64, 10, 512])
        '''
        x = self.embedding(x)
        for layer in self.layers:
            x = layer(x, memory)
        return x

위 두 코드를 보면 디코더의 구조도 DecoderLayer를 몇 번 반복해서 수행하는 형태인 것을 알 수 있다. 다만 DecoderLayer를 보면 MultiHeadAttention 클래스가 두 번 정의돼 있다. 하나는 셀프어텐션을 위함이고, 다른 하나는 인코더에서의 값과 어텐션 연산을 하기 위함이다. DecoderLayer의 입력값 중에서 memory가 인코더로부터 넘어온 인코더의 출력값이다.

In [12]:
class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.encoder = Encoder(L)
        self.decoder = Decoder(L)
        
    def forward(self, src, dst):
        '''
        data = np.random.randint(0, V, (B, M))
        src = torch.from_numpy(data)
        data = np.random.randint(0, V, (B, M))
        dst = torch.from_numpy(data)
        src.shape, dst.shape
        
        m = Transformer()
        v = m(src, dst)
        v.shape    # torch.Size([64, 10, 512])
        '''
        src_encoded = self.encoder(src)
        dst_decoded = self.decoder(dst, src_encoded)
        
        return dst_decoded

위 코드 주석을 보면 트랜스포머의 입력은 (B, M) 사이즈의 번역할 문장(src)과 (B, M) 사이즈의 번역된 문장(dst)으로 구성된다는 것을 알 수 있다. 트랜스포머의 출력이 (B, M, H)가 되는데 이 사이즈의 벡터는 로그 소프트맥스 함수 등을 통해서 각 (B, M) 사이즈로 변하게 된다.

In [13]:
        data = np.random.randint(0, V, (B, M))
        src = torch.from_numpy(data)
        data = np.random.randint(0, V, (B, M))
        dst = torch.from_numpy(data)
        src.shape, dst.shape
        
        m = Transformer()
        v = m(src, dst)
        v.shape    # torch.Size([64, 10, 512])

torch.Size([64, 10, 512])