In [1]:
import numpy as np
import copy
import torch
import torch.nn as nn
import torch.nn.functional as F

# Encoder

### 여기서 mask를 만드는 과정이 메모리를 먹어서 비효율적. 배운다는 느낌으로 하는것

In [2]:
# Q, K, V shape : (n_batch, seq_length, d_k), mask : (n_batch, seq_length, seq_length)
def calculate_attention(query, key, value, mask):
    d_k = key.shape[-1]
    attention_score = torch.matmul(query, key.transpose(-2, -1)) 
    # torch.transpose에서 -2 자리의 seq_length와 -1자리의 d_k만 transpose 하고 minibatch 안건드린다는 뜻. 결과는 (n_batch, d_k, seq_length)
    attention_score = attention_score / math.sqrt(d_k)
    if mask is not None:
        attention_score = attention_score.masked_fill(mask==0, -1e9)
    attention_prob = F.softmax(attention_score, dim = -1) # (n_batch, seq_len, seq_len), 마지막 차원끼리의 연산?
    out = torch.matmul(attention_prob, value)
    return out #(n_batch, seq_length, d_k)

In [3]:
def make_pad_mask(self, query, key, pad_idx=1):
    # query: (n_batch, query_seq_len)
    # key: (n_batch, key_seq_len)
    query_seq_len, key_seq_len = query.size(1), key.size(1)
    
    key_mask = key.ne(pad_idx).unsqueeze(1).unsqueeze(2) # (n_batch, 1, 1, key_seq_len)
    key_mask = key_mask.repeat(1,1, query_seq_len, 1)  # (n_batch, 1, query_seq_len, key_seq_len) 해당 모양을 1,1,len,1 로 쌓기
    
    query_mask = query.ne(pad_idx).unsqueeze(1).unsqueeze(3) # (n_batch, 1, query_seq_len, 1)
    query_mask = query_mask.repeat(1,1,1,key_seq_len) # (n_batch, 1, query_seq_len, key_seq_len)
    
    mask = key_mask & query_mask
    mask.requires_grad = False
    return mask

In [4]:
def make_src_mask(self, src):
    pad_mask = self.make_pad_mask(src, src)
    return pad_mask # encoder의 경우 Q 와 K가 항상 같으니까 이때는 이 mask를 쓰기

In [5]:
class MultiHeadAttentionLayer(nn.Module):
    
    def __init__(self, d_model, h, qkv_weight, out_weight):
        super(MultiHeadAttentionLayer, self).__init__()
        self.d_model = d_model
        self.h = h
        self.q_weight = copy.deepcopy(qkv_weight)
        self.k_weight = copy.deepcopy(qkv_weight)
        self.v_weight = copy.deepcopy(qkv_weight)
        self.out_weight = out_weight
        
    def forward(self, query, key, value, mask = None):
        # query, key, value: (n_batch, seq_len, d_embed) input임
        # mask: (n_batch, seq_len, seq_len)
        # return value: (n_batch, h, seq_len, d_k)
        n_batch = query.size(0)
        
        def transform(x, fc):
            out = fc(x) # (n_batch, seq_len, d_model)
            out = out.view(n_batch, -1, self.h, self.d_model//self.h)  # (n_batch, seq_len, h, d_k)
            out = out.transpose(1, 2) # (n_batch, h, seq_len, d_k)
            return out
    
        query = transform(query, self.q_weight) # (n_batch, h, seq_len, d_k)
        key = transform(key, self.k_weight) # (n_batch, h, seq_len, d_k)
        value = transform(value, self.v_weight) # (n_batch, h, seq_len, d_k)
        
        out = self.calculate_attention(query, key, value, mask) # (n_batch, h, seq_len, d_k)
        out = out.transpose(1, 2) # (n_batch, seq_len, h, d_k)
        out = out.contiguous().view(n_batchm -1, self.d_model) # (n_batch, seq_len, d_model) 메모리가 row방향으로 저장되는것
        out = self.out_weight(out) # (n_batch, seq_len, d_embed)
        return out

In [6]:
class PositionWiseFeedForwardLayer(nn.Module):
    
    def __init__(self, fc1, fc2):
        super(PositionWiseFeedForwardLayer, self).__init__()
        self.fc1 = fc1
        self.relu = nn.ReLU()
        self.fc2 = fc2
        
    def forward(self, x):
        out = x
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        return out

In [7]:
class ResidualConnectionLayer(nn.Module):
    
    def __init__(self, fc1, fc2):
        super(ResidualConnectionLayer, self).__init__()
        
    def forward(self, x, sub_layer):
        out = x
        out = sub_layer(out)
        out = out + x
        return out

In [8]:
class EncoderBlock(nn.Module):
    
    def __init__(self, self_attention, position_ff):
        super(EncoderBlock, self).__init__()
        self.self_attention = self_attention
        self.position_ff = position_ff
        self.residuals = [ResidualConnectionLayer() for _ in range(2)]
        
    def forward(self, src, src_mask):
        out = src
        out = self.residuals[0](out, lambda out: self.self_attention(query=out, key=out, value=out, mask=src_mask))
        out = self.residuals[1](out, self.position_ff)
        return out

In [9]:
class Encoder(nn.Module):
    
    def __init__(self, encoder_block, n_layer): ## n_layer : Encoder Block의 갯수(논문에서는 6)
        super(Encoder, self).__init__()
        self.layers = []
        for i in range(n_layer):
            self.layers.append(copy.deepcopy(encoder_block))
        
    def forward(self, src, src_mask):
        out = src
        for layer in self.layers:
            out = layer(out, src_mask)
        return out

# Decoder

In [13]:
def make_subsequent_mask(query, key): # teacher forcing mask
    # query: (n_batch, query_seq_len)
    # key: (n_batch, key_seq_len)
    query_seq_len, key_seq_len = query.size(1), key.size(1)
    
    tril = np.tril(np.ones((query_seq_len, key_seq_len)), k = 0).astype('unit8') # np.tril : lower triangle without diagonal
    mask = torch.tensor(tril, dtype = torch.bool, requires_grad = False, device = query.device)
    return mask

In [14]:
def make_tgt_mask(self, tgt): # teacher forcing mask랑 padding mask는 같이 적용되어야하므로 합쳐주기
    pad_mask = self.make_pad_mask(tgt, tgt)
    seq_mask = self.make_subsequent_mask(tgt, tgt)
    mask = pad_mask & seq_mask
    return pad_mask & seq_mask

In [15]:
def make_src_tgt_mask(self, src, tgt): # decoder의 q와 encoder에서 넘어온 k,v에 쓰는 마스크
    pad_mask = self.make_pad_mask(tgt, src)
    return pad_mask

In [16]:
class DecoderBlock(nn.Module):

    def __init__(self, self_attention, cross_attention, position_ff):
        super(DecoderBlock, self).__init__()
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.position_ff = position_ff
        self.residuals = [ResidualConnectionLayer() for _ in range(3)]


    def forward(self, tgt, encoder_out, tgt_mask, src_tgt_mask):
        out = tgt
        out = self.residuals[0](out, lambda out: self.self_attention(query=out, key=out, value=out, mask=tgt_mask))
        out = self.residuals[1](out, lambda out: self.cross_attention(query=out, key=encoder_out, value=encoder_out, mask=src_tgt_mask))
        out = self.residuals[2](out, self.position_ff)
        return out

In [17]:
class Decoder(nn.Module):

    def __init__(self, decoder_block, n_layer):
        super(Decoder, self).__init__()
        self.n_layer = n_layer
        self.layers = nn.ModuleList([copy.deepcopy(decoder_block) for _ in range(self.n_layer)])


    def forward(self, tgt, encoder_out, tgt_mask, src_tgt_mask):
        out = tgt
        for layer in self.layers:
            out = layer(out, encoder_out, tgt_mask, src_tgt_mask)
        return out

# Positional Encoding

In [18]:
class TokenEmbedding(nn.Module):

    def __init__(self, d_embed, vocab_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_embed)
        self.d_embed = d_embed

    def forward(self, x):
        out = self.embedding(x) * math.sqrt(self.d_embed) # scaling 필요 논문에 하라고 나와있음
        return out

In [20]:
class PositionalEncoding(nn.Module):

    def __init__(self, d_embed, max_len=256, device=torch.device("cpu")):
        super(PositionalEncoding, self).__init__()
        encoding = torch.zeros(max_len, d_embed)
        encoding.requires_grad = False
        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_embed, 2) * -(math.log(10000.0) / d_embed))
        encoding[:, 0::2] = torch.sin(position * div_term)
        encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = encoding.unsqueeze(0).to(device)


    def forward(self, x):
        _, seq_len, _ = x.size()
        pos_embed = self.encoding[:, :seq_len, :]
        out = x + pos_embed
        return out

In [19]:
class TransformerEmbedding(nn.Module):

    def __init__(self, token_embed, pos_embed):
        super(TransformerEmbedding, self).__init__()
        self.embedding = nn.Sequential(token_embed, pos_embed) #순서대로 함수가 들어오면 순서대로 연산해주는 함수

    def forward(self, x):
        out = self.embedding(x)
        return out

In [21]:
class Transformer(nn.Module): ## nn.Module을 참고하는 함수 정의
    
    def __init__(self, src_embed, tgt_embed, encoder, decoder, generator):
        super(Transformer, self).__init__()
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.encoder = encoder
        self.decoder = decoder
        self.generator = generator
    
    def encode(self, src, src_mask):
        return self.encoder(self.src_embed(src), src_mask)
    
    def decode(self, tgt, encoder_out, tgt_mask, src_tgt_mask):
        return self.decoder(self.tgt_embed(tgt), encoder_out, tgt_mask, src_tgt_mask)
    
    def forward(self, src, tgt):
        src_mask = self.make_src_mask(src)
        tgt_mask = self.make_tgt_mask(tgt)
        src_tgt_mask = self.make_src_tgt_mask(src, tgt)
        encoder_out = self.encode(src, src_mask)
        decoder_out = self.decode(tgt, encoder_out, tgt_mask, src_tgt_mask)
        out = self.generator(decoder_out) # 마지막 dimension이 embedding dimension인데 이걸 len(vocab)으로 만들어주면 각 vocab에 따라 softmax사용하여 확률값으로 변환. 
        out = F.log_softmax(out, dim=-1) # 마지막 dimension인 len(vocab)에 대한 확률값
        return out, decoder_out

# Pytorch의 Transformer 사용 함수

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, dim_model, dropout_p, max_len):
        super().__init__()
        
        self.dropout = nn.Dropout(dropout_p)
 
        # Encoding - From formula
        pos_encoding = torch.zeros(max_len, dim_model)
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1) # 0, 1, 2, 3, 4, 5
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model) # 1000^(2i/dim_model)
 
        pos_encoding[:, 0::2] = torch.sin(positions_list * division_term)
        pos_encoding[:, 1::2] = torch.cos(positions_list * division_term)
 
        # Saving buffer (same as parameter without gradients needed)
        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pos_encoding", pos_encoding)
 
    def forward(self, token_embedding: torch.tensor) -> torch.tensor:
        # Residual connection + pos encoding
        return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(0), :])

In [None]:
criterion = nn.CrossEntropyLoss()
lr = 5.0 # 학습률
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

import time
def train():
    model.train() # 훈련 모드로 전환
    total_loss = 0.
    start_time = time.time()
    ntokens = len(vocab)

    # 스크립트 실행 시간을 짧게 유지하기 위해서 50 배치만 학습합니다.
    nbatches = min(50 * bptt, train_data.size(0) - 1)

    for batch, i in enumerate(range(0, nbatches, bptt)):
        data, targets = get_batch(train_data, i)
        optimizer.zero_grad()
        # Pipe는 단일 호스트 내에 있고
        # forward 메서드로 반환된 ``RRef`` 프로세스는 이 노드에 국한되어 있기 때문에
        # ``RRef.local_value()`` 를 통해 간단히 찾을 수 있습니다.
        output = model(data).local_value()
        # 타겟을 파이프라인 출력이 있는
        # 장치로 옮겨야합니다.
        loss = criterion(output.view(-1, ntokens), targets.cuda(1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        log_interval = 10
        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | '
                  'lr {:02.2f} | ms/batch {:5.2f} | '
                  'loss {:5.2f} | ppl {:8.2f}'.format(
                    epoch, batch, nbatches // bptt, scheduler.get_lr()[0],
                    elapsed * 1000 / log_interval,
                    cur_loss, math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()

def evaluate(eval_model, data_source):
    eval_model.eval() # 평가 모드로 전환
    total_loss = 0.
    ntokens = len(vocab)
    # 스크립트 실행 시간을 짧게 유지하기 위해 50 배치만 평가합니다.
    nbatches = min(50 * bptt, data_source.size(0) - 1)
    with torch.no_grad():
        for i in range(0, nbatches, bptt):
            data, targets = get_batch(data_source, i)
            output = eval_model(data).local_value()
            output_flat = output.view(-1, ntokens)
            # 타겟을 파이프라인 출력이 있는
            # 장치로 옮겨야합니다.
            total_loss += len(data) * criterion(output_flat, targets.cuda(1)).item()
    return total_loss / (len(data_source) - 1)

In [None]:
class Transformer(nn.Module):
    # Constructor
    def __init__(self, num_tokens, dim_model, num_heads, num_encoder_layers, num_decoder_layers, dropout_p):
        super().__init__()

        # INFO
        self.model_type = "Transformer"
        self.dim_model = dim_model

        # LAYERS
        self.positional_encoder = PositionalEncoding(dim_model=dim_model, dropout_p=dropout_p, max_len=5000)
        self.embedding = nn.Embedding(num_tokens, dim_model)
        self.transformer = nn.Transformer(
            d_model=dim_model,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dropout=dropout_p,
        )
        self.out = nn.Linear(dim_model, num_tokens)

    def forward(self, src, tgt, tgt_mask=None, src_pad_mask=None, tgt_pad_mask=None):
        # Src, Tgt size 는 반드시 (batch_size, src sequence length) 여야 합니다.

        # Embedding + positional encoding - Out size = (batch_size, sequence length, dim_model)
        src = self.embedding(src) * math.sqrt(self.dim_model)
        tgt = self.embedding(tgt) * math.sqrt(self.dim_model)
        src = self.positional_encoder(src)
        tgt = self.positional_encoder(tgt)

        src = src.permute(1,0,2) # transpose는 두 차원만 서로 교체할 수 있는데, permute은 여러 차원을 다 같이 바꿀 수 있음
        tgt = tgt.permute(1,0,2)

        # Transformer blocks - Out size = (sequence length, batch_size, num_tokens)
        transformer_out = self.transformer(src, tgt, tgt_mask=tgt_mask, src_key_padding_mask=src_pad_mask, tgt_key_padding_mask=tgt_pad_mask)
        out = self.out(transformer_out)

        return out

    def get_tgt_mask(self, size) -> torch.tensor:
        mask = torch.tril(torch.ones(size, size) == 1) # Lower triangular matrix
        mask = mask.float()
        mask = mask.masked_fill(mask == 0, float('-inf')) # Convert zeros to -inf
        mask = mask.masked_fill(mask == 1, float(0.0)) # Convert ones to 0

        return mask

    def create_pad_mask(self, matrix: torch.tensor, pad_token: int) -> torch.tensor:
        return (matrix == pad_token)


In [None]:
def predict(model, input_sequence, max_length=15, SOS_token=2, EOS_token=3):
    model.eval()
 
    y_input = torch.tensor([[SOS_token]], dtype=torch.long, device=device)
 
    num_tokens = len(input_sequence[0])
 
    for _ in range(max_length):
        # Get source mask
        tgt_mask = model.get_tgt_mask(y_input.size(1)).to(device)
 
        pred = model(input_sequence, y_input, tgt_mask)
 
        next_item = pred.topk(1)[1].view(-1)[-1].item() # num with highest probability
        next_item = torch.tensor([[next_item]], device=device)
 
        # Concatenate previous input with predicted best word
        y_input = torch.cat((y_input, next_item), dim=1)
 
        # Stop if model predicts end of sentence
        if next_item.view(-1).item() == EOS_token:
            break
 
    return y_input.view(-1).tolist()