# 내가 직접 설계해보는 모델 코드

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

## utils

### 마스크 생성 함수

In [None]:
def create_padding_mask(input, pad_token=1):
   """
   seq_len x seq_len 사이즈의 padding mask를 만드는 함수
   padding 토큰위치를 false, 그 외 토큰을 true로 생성
   input = [batch_size, seq_len(token_ids)]  dtype=int
   output = [batch_size, num_head(1), seq_len(1), seq_len]   dtype=bool
   
   input tensor에서 pad_token과 같은 위치는 0, 다른 위치는 1로 만듦
   batch=3, seq_len=6
   [[1,1,1,0,0,0]
    [1,1,1,1,1,1]
    [1,1,1,1,0,0]]
   """
   mask = (input != pad_token)

   # [batch, seq_len] -> (attention score 행렬)[batch, num_head(1), seq_len(1), masked_token]
   return mask.unsqueeze(1).unsqueeze(2)

def create_look_ahead_mask(seq_len):
   """
   look ahead 마스크를 생성하여 리턴
   토큰이 자기 뒤 토큰을 컨닝하는것을 방지하기 위해 자기 뒤에있는 토큰을 가리는 mask
   input = seq_len   dtype=int
   output = [batch_size(1), num_head(1), seq_len, seq_len]   dtype=bool
   """
   # 1로 채워진 (seq_len, seq_len) 크기의 행렬을 만듦
   # torch.tril() 함수는 행렬의 하삼각 부분만 남기고 나머지는 0으로 만듦
   mask = torch.tril(torch.ones(seq_len, seq_len))

   # [seq_len, seq_len] -> [batch(1), num_head(1), seq_len, masked_token]
   return mask.unsqueeze(0).unsqueeze(0)

def create_mask(input, pad_token=1):
   """
   [seq_len x seq_len]의 mask를 batch_size만큼 만들어 리턴
   padding, look_ahead mask를 합침
   input = [batch_size, seq_len(input_ids)]
   output = [batch_size, seq_len, seq_len]
   """
   pad_mask = create_padding_mask(input,pad_token)
   look_ahead_mask = create_look_ahead_mask(input.shape[-1])
   return pad_mask * look_ahead_mask


## Layer

### Embedding Layer

token_ids를 vector로 embedding하고 위치 vector를 더하는 layer

input = [batch_size, seq_len(input_ids)]  dtype=int     
output = [batch_size, seq_len, d_model]   dtype=float


init 알고리즘
1. Embedding 클래스를 생성하여 embedding 가중치 행렬 생성
2. max_seq_len 길이의 position vector를 생성

forward 알고리즘     
1. [vocab_size, d_model]의 가중치 행렬에서 [input_id,:]를 look up하여 token_vectors 생성
2. token_vectors의 크기를 키움
3. token_vectors와 position_vectors를 더함


In [None]:
class EmbeddingLayer(nn.Module):
   def __init__(self, vocab_size, d_model, max_seq_len, dropout_rate=0.1):
      super(EmbeddingLayer, self).__init__()
      self.d_model = d_model

      "토큰 임베딩"
      self.token_embedding = nn.Embedding(vocab_size, self.d_model)


      "위치 임베딩"
      pe = torch.zeros(max_seq_len, self.d_model)
      # 위치 인덱스 생성 : [0,1,2,...,max_seq_len-1]
      position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)

      # 분모 계산
      div_term = torch.exp(torch.arange(0, self.d_model, 2).float() * (-math.log(10000.0) / self.d_model))

      # 짝수 인덱스에는 sin 함수 적용
      pe[:, 0::2] = torch.sin(position * div_term)
      # 홀수 인덱스에는 cos 함수 적용
      pe[:, 1::2] = torch.cos(position * div_term)

      # pe 차원을 [max_seq_len, d_model] -> [1(batch), max_seq,len, d_model]로 변경
      pe = pe.unsqueeze(0)

      # register_buffer: 모델의 파라미터는 아니지만, state_dict에 저장되어야하는 텐서
      self.register_buffer('pe',pe)

      
      "dropout"
      self.dropout = nn.Dropout(p=dropout_rate)

   def forward(self, x):
      """
      순전파 함수
      """

      "토큰 임베딩"
      token_emb = self.token_embedding(x)

      # 토큰 임베딩값을 키워서 위치 임베딩값이 상대적으로 매우 작은 값이 되게함
      token_emb = token_emb * math.sqrt(self.d_model)


      "위치 인코딩 추가"
      # 입력 시퀀스 길이에 맞게 위치 인코딩 텐서를 잘라서 사용
      # self.pe 크기: [1, max_seq_len, d_model] -> pos_enc 크기: [1, seq_len, d_model]
      seq_len = x.size(1)
      pos_enc = self.pe[:,:seq_len,:]

      # 토큰 임베딩에 위치 인코딩을 더함
      final_emb = token_emb + pos_enc

      "drop out"
      return self.dropout(final_emb)


      

### Self Attention Layer

토큰간에 관계 수치를 계산하여 수치만큼 값을 섞어주는 layer

input = [batch_size, seq_len, d_model]    dtype=float              
output = [batch_size, seq_len, d_model]   dtype=float


init 알고리즘
1. [d_model, d_model] 크기의 QKV 가중치 행렬 생성(nn.Linear())

2. [d_model, d_model] 크기의 out 가중치 행렬 생성

forward 알고리즘      
1. QKV 벡터를 input 벡터와 행렬곱을 함     

2. n_head 만큼 QKV 벡터를 나눔     
   [batch_size, seq_len, d_model] -> [batch_size, n_head, seq_len, d_model]

3. QK를 행렬곱(output)하여 attention 값을 계산함      

4. 내적값(output)을 softmax 함수를 통과하여 모든 요소의 합이 1이 되게 만듦         

5. output과 V 벡터를 행렬곱 하여 내적값 비율에 따른 V를 섞어줌          

6. n_head 수만큼의 V 벡터를 하나의 행렬로 연결해줌    
   [batch_size, n_head, seq_len, d_head] -> [batch_size, seq_len, d_model]

6. out_linear의 가중치 행렬과 행렬곱을 하여 각 헤드에서 계산된 값들을 섞어줌

In [None]:
class MultiHeadSelfAttention(nn.Module):
   def __init__(self, d_model, num_head, dropout=0.1):
      super().__init__()
      # 혹시 d_model가 nhead로 나누어떨어지지 않는다면 오류 발생
      assert d_model % num_head == 0, "d_model must be divisible by nhead"

      # 기본적인 파라미터 설정
      self.d_model = d_model  # 임베딩 차원
      self.num_head = num_head    # 어텐션 헤드 수
      self.head_dim = d_model // num_head # 어텐션 차원

      # QKV 선형 변환 레이어
      # input을 Q, K, V로 변환하는 역할
      # 각각 (d_model, d_model) 크기의 가중치 행렬을 사용
      self.q_linear = nn.Linear(self.d_model, self.d_model)
      self.k_linear = nn.Linear(self.d_model, self.d_model)
      self.v_linear = nn.Linear(self.d_model, self.d_model)

      # 출력 선형 변환 레이어
      self.out_linear = nn.Linear(self.d_model, self.d_model)

      # 드롭아웃 레이어
      self.dropout = nn.Dropout(dropout)

   
   def scaled_dot_product_attention(self, Q, K, V, mask=None):
      """
      계산된 QKV를 받아, 어텐션 계산을 한 후 
      최종 계산된 output과 attention weights를 반환
      """
      # 어텐션 점수 계산 (Q, K 내적)
      scores = torch.matmul(Q, K.transpose(-2,-1) / math.sqrt(self.head_dim))

      # 마스크 적용
      if mask is not None:
         scores = scores.masked_fill(mask == 0, -1e9)

      # softmax를 통해 어텐션 가중치 계산
      attention_weights = F.softmax(scores, dim=-1)

      # 특정 토큰 관계에 과도하게 의존하는것을 방지하기 위한 dropout
      attention_weights = self.dropout(attention_weights)
      
      # 어텐션 가중치를 바탕으로 V를 섞음
      output = torch.matmul(attention_weights, V)

      return output, attention_weights

   def forward(self, x, mask=None):
      """
      순전파 함수
      """
      batch_size = x.size(0)

      # QKV Linear 통과 후
      # QKV[batch_size, seq_len, d_model] -> QKV[batch_size, n_head, seq_len, d_head]로 변경
      q = self.q_linear(x).view(batch_size, -1, self.num_head, self.head_dim).transpose(1,2)
      k = self.k_linear(x).view(batch_size, -1, self.num_head, self.head_dim).transpose(1,2)
      v = self.v_linear(x).view(batch_size, -1, self.num_head, self.head_dim).transpose(1,2)

      # 각 헤드에서 어텐션 계산
      x, attention_weights = self.scaled_dot_product_attention(q,k,v,mask)

      # 각 헤드가 해석한 내용을 통합하는 linear
      output = self.out_linear(x.transpose(1,2).contiguous().view(batch_size,-1,self.d_model))

      return output, attention_weights



### Feed Forward Network Layer

비선형 함수를 추가하고, 임베딩 차원을 확장 축소하는 과정을 거쳐 각 토큰의 정보를 독립적으로 더 깊이 처리하여 표현력을 높임


init 알고리즘          
1. [d_model, d_ffn] 크기의 가중치 행렬 생성 (차원 확장 linear)        

2. [d_ffn, d_model] 크기의 가중치 행렬 생성 (차원 축소 linear)           

3. 활성화(비선형) 함수 정의          


forward 알고리즘
1. [batch_size, seq_len, d_model] 크기의 input을 linear 가중치와 행렬곱하여 [batch_size, seq_len, d_ffn] 크기로 차원 확장

2. 활성화 함수를 통과하여 비선형적 변환

3. linear 가중치와 행렬곱하여 [batch_size, seq_len, d_model]로 차원 축소

In [139]:
class FeedForwardNetwork(nn.Module):
   def __init__(self, d_model, d_ffn, dropout=0.1, activate=None):
      super(FeedForwardNetwork, self).__init__()

      # 차원 확장 리니어
      self.up_scale = nn.Linear(d_model, d_ffn)
      # 차원 축소 리니어
      self.down_scale = nn.Linear(d_ffn, d_model)

      # 활성화 함수
      if activate is None:
         self.activate = nn.ReLU()
      else:
         self.activate = activate

      self.dropout = nn.Dropout(dropout)
   
   def forward(self, x):
      # 차원 up scale
      x = self.up_scale(x)
      # 활성함수 적용
      x = self.activate(x)
      # dropout
      x = self.dropout(x)
      # 차원 down scale
      x = self.down_scale(x)
      return x



### Transformer Layer
Attention Layer + Normalization + FFN Layer

In [149]:
class TransformerDecoder(nn.Module):
   def __init__(self, d_model, d_ffn, n_head, dropout_rate=0.1, activate_fn = None, pre_norm=True):
      super().__init__()

      self.d_model = d_model
      self.d_ffn = d_ffn
      self.n_head = n_head
      self.d_head = d_model / n_head
      self.dropout_rate = dropout_rate
      self.activate_fn = activate_fn
      self.pre_norm = pre_norm

      self.attn_layer = MultiHeadSelfAttention(
         d_model=self.d_model,
         num_head=self.n_head,
         dropout=self.dropout_rate
      )

      self.ffn_layer = FeedForwardNetwork(
         d_model=self.d_model,
         d_ffn=self.d_ffn,
         dropout=self.dropout_rate,
         activate=self.activate_fn
      )

      self.norm_layer = nn.LayerNorm(self.d_model)

      self.dropout = nn.Dropout(self.dropout_rate)

   def forward(self, x, mask=None):
      # pre 정규화
      if self.pre_norm:
         output = self.norm_layer(x)
      else:
         output = x
      
      # attn layer 통과
      attn_output, _ = self.attn_layer(output, mask)
      # 잔차 연결
      linked_output = self.dropout(attn_output) + x

      # post 정규화
      if not self.pre_norm:
         output = self.norm_layer(linked_output)
      else:
         output = linked_output
      
      # ffn layer 통과
      output = self.ffn_layer(output)

      return output


### LM Head Layer

In [None]:
class LMHead(nn.Module):
   def __init__(self, vocab_size, d_model):
      super(LMHead, self).__init__()
      self.head = nn.Linear(d_model, vocab_size)

   def forward(self, x):
      token = self.head(x)
      return token

# 모델 조립

### custom model

In [169]:
class TransformerDecoderOnlyModel(nn.Module):
   def __init__(self,n_blocks , vocab_size, d_model, d_ffn, n_head, max_seq_len, dropout_rate=0.1, pre_norm=True, activate_fn=None, pad_token_id=1):
      """
      Decoder Only Transformer Model 초기화

      Args:
         n_blocks (int): 쌓을 디코더 블록의 개수
         vocab_size (int): 단어 사전 크기
         d_model (int): 모델의 기본 차원
         d_ffn (int): FFN의 차원
         n_head (int): 어텐션 헤드의 개수
         max_seq_len (int): 최대 input token 크기
         dropout_rate (int): 드롭아웃 비율(default=0.1)
         pre_nrom (bool): Pre-norm 또는 Post-norm 선택(default=True)
         activate_fn (fn): FFN 내부 활성화 함수(default=GELU)
         pad_id (int): padding 토큰 id(default=1)
      """
      super(TransformerDecoderOnlyModel, self).__init__()

      self.n_blocks = n_blocks
      self.vocab_size = vocab_size
      self.d_model = d_model
      self.d_ffn = d_ffn
      self.n_head = n_head
      self.max_seq_len = max_seq_len
      self.dropout_rate = dropout_rate
      self.pre_norm = pre_norm
      self.activate_fn = activate_fn
      self.pad_token_id = pad_token_id

      self.embed_layer = EmbeddingLayer(self.vocab_size, self.d_model, self.max_seq_len, self.dropout_rate)

      self.decoder_blocks = nn.ModuleList(
         [
            TransformerDecoder(self.d_model, self.d_ffn, self.n_head, self.dropout_rate, self.activate_fn, self.pre_norm)
            for _ in range(n_blocks)
         ]
      )

      if self.pre_norm:
         self.final_norm = nn.LayerNorm(d_model)

      self.lm_head = LMHead(self.vocab_size, self.d_model)

      # token embedding 가중치와 LM Head 가중치를 공유
      self.lm_head.head.weight = self.embed_layer.token_embedding.weight

      # 파라미터 가중치 초기화
      self.apply(self._init_weights)

   def _init_weights(self, module):
      """
      모델의 파라미터를 초기화하는 함수
      """
      if isinstance(module, nn.Linear):
         # 선형 레이어의 가중치를 Xavier Uniform 방식으로 초기화
         nn.init.xavier_uniform_(module.weight)
         if module.bias is not None:
            # 편향(bias)은 0으로 초기화
            nn.init.constant_(module.bias, 0)
      elif isinstance(module, nn.Embedding):
         # 임베딩 레이어의 가중치를 정규분포로 초기화
         nn.init.normal_(module.weight, mean=0.0, std=0.02)
      elif isinstance(module, nn.LayerNorm):
         # LayerNorm의 편향은 0, 가중치는 1로 초기화
         nn.init.constant_(module.bias, 0)
         nn.init.constant_(module.weight, 1.0)


   
   def forward(self, x):
      """
      순전파 함수

      Args:
         x (torch.Tensor): 입력 토큰 ID 텐서 [batch_size, seq_len(token_ids)]
      """

      # mask 생성
      mask = create_mask(x, self.pad_token_id)

      # 임베딩 layer 통과
      output = self.embed_layer(x)

      # Transformer block 통과
      for decoder_block in self.decoder_blocks:
         output = decoder_block(output, mask)

      # 최종 정규화(Pre-Norm일 경우)
      if self.pre_norm:
         output = self.final_norm(output)
      
      # LM Head layer 통과
      logits = self.lm_head(output)

      return logits


# Test

In [183]:
vocab_size = 100
d_model = 10
d_ffn = d_model * 4
n_head = 2
seq_len = 5
batch_size = 1

model = TransformerDecoderOnlyModel(6, vocab_size,d_model,d_ffn,n_head,seq_len)
model.eval()

input_ids = torch.randint(0, vocab_size, (batch_size, seq_len))

with torch.no_grad():
   final_logits = model(input_ids)

print(final_logits.shape)
print(final_logits)

torch.Size([1, 5, 100])
tensor([[[ 0.0606,  0.8559, -0.2472,  0.0537, -0.9774, -0.3505,  0.0290,
          -0.3143,  0.8954,  0.1802,  0.0220,  0.0767,  0.3195, -0.2154,
           0.4784, -0.5134,  0.2779,  0.3530, -0.2026,  0.1862,  0.1317,
           0.0388,  0.4069, -0.4339,  0.4787,  0.5078, -0.0074,  0.1148,
           0.2016,  0.3495,  0.2245,  0.4202, -0.8704, -0.0047, -0.4065,
           0.5051, -0.2130,  0.1018,  0.6835, -0.3045,  0.2745, -0.3510,
           0.9970, -0.4728, -0.1624,  0.1991,  0.9027,  0.7680, -0.6944,
          -0.4747,  0.6820, -0.4628,  0.3635,  0.1804, -0.1201,  0.3186,
           0.7449, -0.6342,  0.1991, -0.0769, -0.2550,  0.3060, -0.3434,
          -0.2691, -0.4887,  0.7110, -0.4117,  0.2527, -0.0660, -0.9792,
          -1.0190,  0.4599,  0.2004,  0.1995,  0.2572,  0.3660,  0.2949,
           0.2403, -0.1207,  0.3486, -0.2585,  0.6885, -0.1083, -0.2716,
          -0.8608, -0.3914, -0.4543,  0.0225,  0.3685, -0.1982, -0.0809,
           0.3383,  0.4676,