In [1]:
import copy
import torch

from math import sqrt

from torch import nn
from torch.nn import functional as F

# Tokenization

## 공백 기준 토큰화

In [2]:
## Example sentence that the rest of the notebook tokenizes and embeds.
input_text = "나는 대한민국 서울에 사는 사람이다."
## Whitespace tokenization: str.split() with no argument splits on runs of whitespace,
## so punctuation stays attached to its word (see the last token in the printed list).
input_text_list = input_text.split()
print(input_text_list)

['나는', '대한민국', '서울에', '사는', '사람이다.']


## 단어집 생성

In [3]:
## Build the vocabulary.
## Tokens are de-duplicated (keeping first-seen order) before they are numbered, so the
## ids are contiguous in [0, len(str2idx)) even when a token occurs more than once.
## Numbering raw token positions instead would leave gaps, and the largest id could then
## exceed the size of an embedding table built with len(str2idx) rows.
unique_tokens = list(dict.fromkeys(input_text_list))
str2idx = {word: idx for idx, word in enumerate(unique_tokens)}
## Derive the inverse mapping from str2idx so the two tables can never disagree.
idx2str = {idx: word for word, idx in str2idx.items()}

print(str2idx)
print(idx2str)

{'나는': 0, '대한민국': 1, '서울에': 2, '사는': 3, '사람이다.': 4}
{0: '나는', 1: '대한민국', 2: '서울에', 3: '사는', 4: '사람이다.'}


## 토큰 리스트 -> id 리스트

In [4]:
## Look up the integer id of every token, in sentence order.
input_ids = [str2idx[text] for text in input_text_list]
print(input_ids)

[0, 1, 2, 3, 4]


## 토큰 임베딩

In [5]:
embed_dim = 512 ## d_model
## Lookup table with one learnable embed_dim-sized row per vocabulary entry.
token_embed_layer = nn.Embedding(len(str2idx), embed_dim) ## vocab_size, output_dim

input_tensor = torch.tensor(input_ids)
print(f"Input Tensor : {input_tensor.shape}")

## Each id selects its row of the table: (seq_len,) -> (seq_len, embed_dim).
token_embeddings = token_embed_layer(input_tensor)
print(f"Token Embedding : {token_embeddings.shape}")

## Insert a leading dimension of size 1 (used as the batch dimension below).
token_embeddings = token_embeddings.unsqueeze(0)
print(f"Unsqueeze Token Embedding : {token_embeddings.shape}")

Input Tensor : torch.Size([5])
Token Embedding : torch.Size([5, 512])
Unsqueeze Token Embedding : torch.Size([1, 5, 512])


## 위치 임베딩(Absolute Positional Encoding)

In [6]:
max_position = 1024 ## maximum number of tokens, i.e. maximum sentence length

ids_len = len(input_ids) ## number of tokens contained in the sequence
print(f"IDs Length : {ids_len}") ## 5

position_ids = torch.arange(ids_len, dtype=torch.long) ## assign an integer position value to each token.
print(f"Position IDs : {position_ids.shape} {position_ids}") ## (5,) [0, 1, 2, 3, 4]

## Insert a leading dimension of size 1 so the shape lines up with token_embeddings.
position_ids = position_ids.unsqueeze(0)
print(f"Unsqueeze Position IDs : {position_ids.shape}") ## (1, 5)

position_embed_layer = nn.Embedding(max_position, embed_dim) ## embed each token's position (integer id) into an embed_dim-dimensional vector.
position_embeddings = position_embed_layer(position_ids)
print(f"Position Embedding : {position_embeddings.shape}") ## (1, 5, 512)

IDs Length : 5
Position IDs : torch.Size([5]) tensor([0, 1, 2, 3, 4])
Unsqueeze Position IDs : torch.Size([1, 5])
Position Embedding : torch.Size([1, 5, 512])


## Input Embedding

In [7]:
## Element-wise sum of token and position embeddings; both are (1, 5, 512) at this point.
input_embeddings = token_embeddings + position_embeddings
print(f"Input Embeddings : {input_embeddings.shape}")

Input Embeddings : torch.Size([1, 5, 512])


## Self-Attention

### Query, Key, Value

In [8]:
d_model = embed_dim
num_heads = 8
## Width of one attention head when d_model is split evenly across num_heads heads.
head_dim = d_model // num_heads
print(f"Head dim : {head_dim}")

## Layers that transform input_embeddings into query, key and value
## (each projects embed_dim -> head_dim, i.e. the size of a single head).
weight_q = nn.Linear(embed_dim, head_dim)
weight_k = nn.Linear(embed_dim, head_dim)
weight_v = nn.Linear(embed_dim, head_dim)

## Apply the transformation
querys = weight_q(input_embeddings)
keys = weight_k(input_embeddings)
values = weight_v(input_embeddings)

print(f"Query : {querys.shape}")
print(f"Key : {keys.shape}")
print(f"Value : {values.shape}")

Head dim : 64
Query : torch.Size([1, 5, 64])
Key : torch.Size([1, 5, 64])
Value : torch.Size([1, 5, 64])


### Self-Attention

In [9]:
def compute_attention(querys, keys, values, is_casual=False):
    """Scaled dot-product attention.

    Args:
        querys: tensor of shape (..., query_len, dim_k).
        keys: tensor of shape (..., key_len, dim_k).
        values: tensor of shape (..., key_len, dim_v).
        is_casual: when True, apply a causal (lower-triangular) mask so that query
            position i only attends to key positions <= i. The parameter keeps its
            existing spelling because callers pass it by keyword.

    Returns:
        Tensor of shape (..., query_len, dim_v): the attention-weighted sum of values.
    """
    ## Attention Score
    dim_k = querys.size(-1)
    scores = querys @ keys.transpose(-2, -1) ## Q \cdot K^T
    scores = scores / sqrt(dim_k) ## scale by \sqrt{dim_k}

    if is_casual:
        ## Read the lengths from the trailing score dimensions so that both
        ## (batch, seq, dim) and (batch, heads, seq, dim) inputs are handled.
        query_length, key_length = scores.shape[-2:]
        allowed = torch.ones(
            query_length, key_length, dtype=torch.bool, device=scores.device
        ).tril(diagonal=0)
        ## -inf scores become exactly zero weight after the softmax.
        scores = scores.masked_fill(~allowed, float("-inf"))

    ## Attention Dist
    weights = F.softmax(scores, dim=-1)

    return weights @ values ## Attention Values

In [10]:
## Run the attention function on the query/key/value tensors projected in the earlier cell.
attention_embeddings = compute_attention(querys, keys, values)
print(f"Attention 적용 후 : {attention_embeddings.shape}")

Attention 적용 후 : torch.Size([1, 5, 64])


In [11]:
class AttentionHead(nn.Module):
    """A single attention head.

    Projects the incoming query/key/value tensors from ``embed_dim`` down to
    ``head_dim`` with three independent linear layers and then hands the
    projections to ``compute_attention``.
    """

    def __init__(self, embed_dim, head_dim, is_casual=False):
        super().__init__()

        ## Forwarded unchanged to compute_attention on every call.
        self.is_casual = is_casual

        ## One learnable projection each for query, key and value.
        self.weight_q = nn.Linear(embed_dim, head_dim)
        self.weight_k = nn.Linear(embed_dim, head_dim)
        self.weight_v = nn.Linear(embed_dim, head_dim)

    def forward(self, querys, keys, values):
        """Project the three inputs and return the attention output."""
        projected_q = self.weight_q(querys)
        projected_k = self.weight_k(keys)
        projected_v = self.weight_v(values)

        return compute_attention(
            projected_q, projected_k, projected_v, is_casual=self.is_casual
        )

In [12]:
## Self-attention: the same tensor is passed as query, key and value.
attention_head = AttentionHead(embed_dim, head_dim)
attention_embeddings = attention_head(input_embeddings, input_embeddings, input_embeddings)
print(f"Attention 적용 후 : {attention_embeddings.shape}")

Attention 적용 후 : torch.Size([1, 5, 64])


## Multi Head Attention

In [13]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on top of ``compute_attention``.

    Args:
        embed_dim: size of the last dimension of the incoming query/key/value tensors.
        d_model: total width of the projected queries/keys/values; it is split evenly
            across the heads and is also the width of the returned tensor.
        num_heads: number of attention heads. Must divide ``d_model``.
        is_casual: forwarded to ``compute_attention`` (causal masking flag).
        debug: when True, print intermediate tensor shapes on every forward call.

    Raises:
        ValueError: if ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, d_model, num_heads, is_casual=False, debug=False):
        super().__init__()
        ## Fail at construction time instead of inside a reshape during forward().
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )

        self.debug = debug
        self.num_heads = num_heads
        self.is_casual = is_casual
        self.d_model = d_model
        self.head_dim = d_model // num_heads

        self.weight_q = nn.Linear(embed_dim, d_model)
        self.weight_k = nn.Linear(embed_dim, d_model)
        self.weight_v = nn.Linear(embed_dim, d_model)

        self.concat_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, projected):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, head_dim)."""
        ## The sequence length is read from the tensor itself, so keys/values may be
        ## longer or shorter than the queries (cross-attention).
        batch_size, seq_len, _ = projected.size()
        return projected.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, querys, keys, values):
        """Return a (batch, query_len, d_model) tensor of attended values."""
        batch_size, query_len, _ = querys.size()

        querys = self._split_heads(self.weight_q(querys))
        keys = self._split_heads(self.weight_k(keys))
        values = self._split_heads(self.weight_v(values))

        attention = compute_attention(querys, keys, values, self.is_casual)
        ## Merge the heads back: (batch, heads, query_len, head_dim) -> (batch, query_len, d_model).
        output = attention.transpose(1, 2).contiguous().view(batch_size, query_len, self.d_model)
        output = self.concat_linear(output)

        if self.debug:
            print(f"Querys : {querys.shape}")
            print(f"Keys : {keys.shape}")
            print(f"Values : {values.shape}")
            print(f"Attention_Value : {attention.shape}")
            print(f"Output : {output.shape}")

        return output

In [14]:
num_heads = 8

## debug=True makes the layer print the intermediate tensor shapes shown in the output below.
multi_head_attention_layer = MultiHeadAttention(embed_dim, d_model, num_heads, is_casual=False, debug=True)
## Self-attention: the same tensor is passed as query, key and value.
multi_head_attn_output = multi_head_attention_layer(input_embeddings, input_embeddings, input_embeddings)
print(f"Multi Head Attention 적용 후 : {multi_head_attn_output.shape}")

Querys : torch.Size([1, 8, 5, 64])
Keys : torch.Size([1, 8, 5, 64])
Values : torch.Size([1, 8, 5, 64])
Attention_Value : torch.Size([1, 8, 5, 64])
Output : torch.Size([1, 5, 512])
Multi Head Attention 적용 후 : torch.Size([1, 5, 512])


## LayerNorm

In [15]:
## Normalize every token vector over its last (embed_dim) dimension.
norm_layer = nn.LayerNorm(embed_dim)
norm_x = norm_layer(input_embeddings)
print(f"Layer Norm : {norm_x.shape}")
## The printed std is ~1.0010 rather than exactly 1: Tensor.std() defaults to the unbiased
## (N-1) estimator while LayerNorm normalizes with the biased variance, and
## sqrt(512 / 511) ~= 1.0010.
print(f"mean : {norm_x.mean(dim=-1).data}, std : {norm_x.std(dim=-1).data}")

Layer Norm : torch.Size([1, 5, 512])
mean : tensor([[ 5.5879e-09, -7.2177e-09, -5.5879e-09,  1.4901e-08, -9.3132e-10]]), std : tensor([[1.0010, 1.0010, 1.0010, 1.0010, 1.0010]])


## FeedForward

In [16]:
class PreLayerNormFeedForward(nn.Module):
    """Pre-LayerNorm position-wise feed-forward sublayer, including its residual.

    Computes ``src + dropout2(linear2(dropout1(GELU(linear1(norm(src))))))``.

    Args:
        d_model: width of the input and output tensors.
        d_ffn: width of the hidden layer.
        dropout: dropout probability used after the activation and after the
            second linear layer.
    """

    def __init__(self, d_model, d_ffn, dropout):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ffn)
        self.linear2 = nn.Linear(d_ffn, d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = nn.GELU()
        self.norm = nn.LayerNorm(d_model)

    def forward(self, src):
        """Apply the sublayer to ``src`` and return a tensor of the same shape."""
        x = self.norm(src)
        x = self.dropout1(self.activation(self.linear1(x)))
        ## Dropout is applied to the sublayer output only, never to the skip path.
        x = self.dropout2(self.linear2(x))

        ## Pre-LN residual: add back the un-normalized input, not norm(src).
        return src + x

## Encoder Layer

In [17]:
class TransformerEncoderLayer(nn.Module):
    """Pre-LayerNorm encoder layer: self-attention followed by a feed-forward sublayer.

    Args:
        d_model: width of the input and output tensors.
        num_heads: number of attention heads.
        d_ffn: hidden width of the feed-forward sublayer.
        dropout: dropout probability for the attention output and the feed-forward sublayer.
    """

    def __init__(self, d_model, num_heads, d_ffn, dropout):
        super().__init__()

        self.attn = MultiHeadAttention(d_model, d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.ffn = PreLayerNormFeedForward(d_model, d_ffn, dropout)

    def forward(self, src):
        """Return the encoded representation of ``src`` (same shape as ``src``)."""
        norm_x = self.norm1(src)
        ## Self-attention: MultiHeadAttention.forward takes (querys, keys, values),
        ## so the normalized input is passed for all three.
        attn_output = self.attn(norm_x, norm_x, norm_x)
        x = src + self.dropout1(attn_output)

        ## The feed-forward module applies its own LayerNorm internally.
        x = self.ffn(x)

        return x

## Encoder

In [18]:
def get_clones(module, N):
    """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` deep copies of ``encoder_layer`` with an optional final norm.

    Args:
        encoder_layer: module that is deep-copied ``num_layers`` times.
        num_layers: number of stacked layers.
        norm: optional module applied to the output of the last layer
            (e.g. ``nn.LayerNorm``); pass ``None`` to skip it.
    """

    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        self.norm = norm
        self.num_layers = num_layers
        self.layer_list = get_clones(encoder_layer, num_layers)

    def forward(self, src):
        """Run ``src`` through every layer in order, then through ``norm`` if one was given."""
        output = src
        for layer in self.layer_list:
            output = layer(output)

        ## The norm handed to the constructor is applied once, after the last layer.
        if self.norm is not None:
            output = self.norm(output)

        return output

## Masked Multi Head Attention

In [19]:
def compute_attention(querys, keys, values, is_casual=False):
    """Scaled dot-product attention with an optional causal mask.

    Args:
        querys: tensor of shape (..., query_len, dim_k).
        keys: tensor of shape (..., key_len, dim_k).
        values: tensor of shape (..., key_len, dim_v).
        is_casual: when True, query position i may only attend to key positions <= i.
            The parameter keeps its existing spelling because callers pass it by keyword.

    Returns:
        Tensor of shape (..., query_len, dim_v): the attention-weighted sum of values.
    """
    ## Attention Score
    dim_k = querys.size(-1)
    scores = querys @ keys.transpose(-2, -1) ## Q \cdot K^T
    scores = scores / sqrt(dim_k) ## scale by \sqrt{dim_k}

    if is_casual:
        ## Take the lengths from the trailing score dimensions rather than a fixed
        ## axis index, so 3-D (batch, seq, dim) and 4-D (batch, heads, seq, dim)
        ## inputs are both masked correctly.
        query_length, key_length = scores.shape[-2:]
        ## Build the mask on the same device as the scores.
        allowed = torch.ones(
            query_length, key_length, dtype=torch.bool, device=scores.device
        ).tril(diagonal=0)
        ## -inf scores become exactly zero weight after the softmax.
        scores = scores.masked_fill(~allowed, float("-inf"))

    ## Attention Dist
    weights = F.softmax(scores, dim=-1)

    return weights @ values ## Attention Values

In [20]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on top of ``compute_attention``.

    ``compute_attention`` is looked up when ``forward`` runs, so instances use
    whichever definition of it is current in the notebook at call time.

    Args:
        embed_dim: size of the last dimension of the incoming query/key/value tensors.
        d_model: total width of the projected queries/keys/values; it is split evenly
            across the heads and is also the width of the returned tensor.
        num_heads: number of attention heads. Must divide ``d_model``.
        is_casual: forwarded to ``compute_attention`` (causal masking flag).
        debug: when True, print intermediate tensor shapes on every forward call.

    Raises:
        ValueError: if ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, d_model, num_heads, is_casual=False, debug=False):
        super().__init__()
        ## Fail at construction time instead of inside a reshape during forward().
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )

        self.debug = debug
        self.num_heads = num_heads
        self.is_casual = is_casual
        self.d_model = d_model
        self.head_dim = d_model // num_heads

        self.weight_q = nn.Linear(embed_dim, d_model)
        self.weight_k = nn.Linear(embed_dim, d_model)
        self.weight_v = nn.Linear(embed_dim, d_model)

        self.concat_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, projected):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, head_dim)."""
        ## Each tensor supplies its own sequence length, so in cross-attention the
        ## keys/values may have a different length than the queries.
        batch_size, seq_len, _ = projected.size()
        return projected.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, querys, keys, values):
        """Return a (batch, query_len, d_model) tensor of attended values."""
        batch_size, query_len, _ = querys.size()

        querys = self._split_heads(self.weight_q(querys))
        keys = self._split_heads(self.weight_k(keys))
        values = self._split_heads(self.weight_v(values))

        attention = compute_attention(querys, keys, values, self.is_casual)
        ## Merge the heads back: (batch, heads, query_len, head_dim) -> (batch, query_len, d_model).
        output = attention.transpose(1, 2).contiguous().view(batch_size, query_len, self.d_model)
        output = self.concat_linear(output)

        if self.debug:
            print(f"Querys : {querys.shape}")
            print(f"Keys : {keys.shape}")
            print(f"Values : {values.shape}")
            print(f"Attention_Value : {attention.shape}")
            print(f"Output : {output.shape}")

        return output

## Decoder Layer

In [None]:
class TransformerDecoderLayer(nn.Module):
    """Pre-LayerNorm decoder layer.

    Sublayers, in order: causally masked self-attention over the target, cross-attention
    from the target to the encoder output, and a feed-forward sublayer.

    Args:
        d_model: width of the target and encoder tensors.
        num_heads: number of attention heads in both attention sublayers.
        d_ffn: hidden width of the feed-forward sublayer.
        dropout: dropout probability applied to each sublayer output.
        debug: when True, both attention sublayers print their intermediate shapes
            on every forward call.
    """

    def __init__(self, d_model=512, num_heads=8, d_ffn=2048, dropout=0.1, debug=False):
        super().__init__()

        ## Self-attention is causal: a target position must not attend to later positions.
        self.self_attn = MultiHeadAttention(d_model, d_model, num_heads, is_casual=True, debug=debug)
        ## Cross-attention is unmasked: every target position may read the whole encoder output.
        self.multihead_attn = MultiHeadAttention(d_model, d_model, num_heads, is_casual=False, debug=debug)
        self.feed_forward = PreLayerNormFeedForward(d_model, d_ffn, dropout)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, tgt, encoder_output):
        """Return the decoded representation of ``tgt`` (same shape as ``tgt``)."""
        ## Self Attention (residual adds the un-normalized input)
        norm_tgt = self.norm1(tgt)
        x = tgt + self.dropout1(self.self_attn(norm_tgt, norm_tgt, norm_tgt))

        ## Cross Attention: queries come from the target, keys/values from the encoder
        norm_x = self.norm2(x)
        x = x + self.dropout2(self.multihead_attn(norm_x, encoder_output, encoder_output))

        ## FeedForward
        x = self.feed_forward(x)

        return x

## Decoder

In [22]:
def get_clones(module, N):
    """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class TransformerDecoder(nn.Module):
    """Stack of ``num_layers`` deep copies of ``decoder_layer``.

    Args:
        decoder_layer: module called as ``layer(tgt, src)``; deep-copied ``num_layers`` times.
        num_layers: number of stacked layers.
    """

    def __init__(self, decoder_layer, num_layers):
        super().__init__()
        self.num_layers = num_layers
        self.layers = get_clones(decoder_layer, num_layers)

    def forward(self, tgt, src):
        """Decode ``tgt`` against ``src`` by running every layer in order."""
        output = tgt
        for layer in self.layers:
            ## Feed the previous layer's output forward; passing ``tgt`` again would
            ## discard the work of every layer except the last.
            output = layer(output, src)

        return output