<a href="https://colab.research.google.com/github/jongheonleee/LLM_study/blob/main/Normalization%26FeedForwardLayer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch.nn as nn
import torch


# > 토큰화 <
input_text = '나는 최근 파리 여행을 다녀왔다'
input_text_list = input_text.split()
print('input text list => ', input_text_list)

str2idx = {word:idx for idx, word in enumerate(input_text_list)}
idx2str = {idx:word for idx, word in enumerate(input_text_list)}
print('str2idx => ', str2idx)
print('idx2str => ', idx2str)

input_ids = [str2idx[word] for word in input_text_list]
print('input_ids => ', input_ids)

# > 토큰 임베딩으로 변환하기 <
embedding_dim = 16
embed_layer = nn.Embedding(len(str2idx), embedding_dim)

input_embeddings = embed_layer(torch.tensor(input_ids))
input_embeddings = input_embeddings.unsqueeze(0)
input_embeddings.shape


# > 절대적 위치 인코딩 <
embedding_dim = 16
max_position = 12
embed_layer = nn.Embedding(len(str2idx), embedding_dim)
position_embed_layer = nn.Embedding(max_position, embedding_dim)

position_ids = torch.arange(len(input_ids), dtype=torch.long).unsqueeze(0)
position_encodings = position_embed_layer(position_ids)
token_embeddings = embed_layer(torch.tensor(input_ids))
token_embeddings = token_embeddings.unsqueeze(0)
input_embeddings = token_embeddings + position_encodings
input_embeddings.shape

input text list =>  ['나는', '최근', '파리', '여행을', '다녀왔다']
str2idx =>  {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
idx2str =>  {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
input_ids =>  [0, 1, 2, 3, 4]


torch.Size([1, 5, 16])

In [2]:
# 층 정규화 코드
norm = nn.LayerNorm(embedding_dim)
norm_x = norm(input_embeddings)
norm_x.shape

norm_x.mean(dim=-1).data, norm_x.std(dim=1).data # 실제로 평균과 표준편차 확인하기

(tensor([[-1.4901e-08,  2.9802e-08, -3.7253e-08, -1.4901e-08,  3.7253e-09]]),
 tensor([[0.8306, 1.0567, 0.8631, 0.3138, 0.8157, 0.4767, 1.4511, 0.9057, 0.2182,
          1.1453, 0.8515, 0.8423, 1.1153, 1.4657, 1.9802, 0.7376]]))

In [8]:
class MultiheadAttention(nn.Module):

  def __init__(self, token_embed_dim, d_model, n_head, is_causal=False):
    super().__init__()
    self.n_head = n_head
    self.is_causal = is_causal

    self.weight_q = nn.Linear(token_embed_dim, d_model)
    self.weight_k = nn.Linear(token_embed_dim, d_model)
    self.weight_v = nn.Linear(token_embed_dim, d_model)
    self.concat_linear = nn.Linear(d_model, d_model)

  def forward(self, querys, keys, values):
    B, T, C = querys.size()
    querys = self.weight_q(querys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    keys = self.weight_k(keys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    values = self.weight_v(values).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    attention = compute_attention(querys, keys, values, self.is_causal)
    output = attention.transpose(1, 2).contiguous().view(B, T, C)
    output = self.concat_linear(output)
    return output


In [3]:
# > 피드 포워드 층 코드 <
class PreLayerNormFeedForward(nn.Module):
  def __init__(self, d_model, dim_feedforward, dropout):
    super().__init__()

    self.linear1 = nn.Linear(d_model, dim_feedforward) # 선형 층 1
    self.linear2 = nn.Linear(dim_feedforward, d_model) # 선형 층 2
    self.dropout1 = nn.Dropout(dropout) # 드랍아웃 층 1
    self.dropout2 = nn.Dropout(dropout) # 드랍아웃 층 2
    self.activation = nn.GELU() # 활성 함수
    self.norm = nn.LayerNorm(d_model) # 층 정규화

  def forward(self, src):
    x = self.norm(src)
    x = x + self.linear2(self.dropout1(self.activation(self.linear1(x))))
    x = self.dropout2(x)
    return x



In [5]:
# > 인코더 층 <

class TransformerEncoderLayer(nn.Module):
  def __init__(self, d_model, nhead, dim_feedforward, dropout):
    super.__init__()
    self.attn = nn.MultiheadAttention(d_model, d_model, nhead)
    self.norm1 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)


  def forward(self, src):
    norm_x = self.norm1(src)
    attn_output = self.attn(norm_x, norm_x, norm_x)
    x = src + self.dropout1(attn_output)

    # 피드 포워드
    x = self.feed_forward(x)
    return x


In [6]:
# > 인코더 구현 <
import copy

def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerEncoder(nn.Module):
  def __init__(self, encoder_layer, num_layers):
    super().__init__()
    self.layers = get_clones(encoder_layer, num_layers)
    self.num_layers = num_layers
    self.norm = norm

  def forward(self, src):
    output = src

    for mod in self.layers:
      output = mod(output)
    return self.norm(output)



In [7]:
# > 디코더에서 어텐션 연산(마스크 어텐션) <
from math import sqrt
import torch.nn.functional as F

def compute_attention(querys, keys, values, is_causal=False):
  dim_k = querys.size(-1)
  scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)

  if is_causal:
    query_length = querys.size(-2)
    key_length = keys.size(-2)
    temp_mask = torch.ones(query_length, key_length,
                           dtype=torch.bool).tril(diagonal=0)
    scores = scores.masked_fill(temp_mask == False, float('-inf'))

  weights = F.softmax(scores, dim=-1)
  return weights @ values

In [9]:
# > 크로스 어텐션이 포함된 디코더 층 <
class TransformerDecoderLayer(nn.Module):
  def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
    super().__init__()
    self.self_attn = MultiheadAttention(d_model, d_model, nhead)
    self.multihead_attn = MultiheadAttention(d_model, d_model, nhead)
    self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)

    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)

  def forward(self, tgt, encoder_output, is_causal=True):
    # 셀프 어텐션 연산
    x = self.norm1(tgt)
    x = x + self.dropout1(self.self_attn(x, x, x, is_causal=is_causal))

    # 크로스 어텐션 연산
    x = self.norm2(x)
    x = x + self.dropout2(self.multihead_attn(x, encoder_output, encoder_output))

    # 피드 포워드 연산
    x = self.feed_forward(x)
    return x



In [10]:
# > 디코더 구현 <

import copy

def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])


class TransformerDecoder(nn.Module):
  def __init__(self, decoder_layer, num_layer):
    super().__init__()
    self.layers = get_clones(decoder_layer, num_layer)
    self.num_layer = num_layer

  def forward(self, tgt, src):
    output = tgt

    for mod in self.layers:
      output = mod(tgt, src)

    return output