# transformers
-  Hugging Face에서 제공하는 NLP 모델 라이브러리로, BERT, GPT, RoBERTa, T5 등 다양한 사전 학습된 모델을 지원한다.
- NLP와 트랜스포머 기반 딥러닝 연구를 간편하게 수행할 수 있도록 도와준다.

In [1]:
!sudo apt-get install -y fonts-nanum
!sudo fc-cache -fv
!rm ~/.cache/matplotlib -rf


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following NEW packages will be installed:
  fonts-nanum
0 upgraded, 1 newly installed, 0 to remove and 49 not upgraded.
Need to get 10.3 MB of archives.
After this operation, 34.1 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 fonts-nanum all 20200506-1 [10.3 MB]
Fetched 10.3 MB in 1s (14.4 MB/s)
debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 78, <> line 1.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin: 
Selecting previously unselected package fonts-nanum.
(Reading database ... 123634 files and direc

In [1]:
import matplotlib.pyplot as plt
plt.rc('font',family="NanumBarunGothic")
plt.rcParams["axes.unicode_minus"] = False



```
gdown : google drive에 저장된 파일 다운로드할 수 있게 도와주는 패키지  
(머신러닝 모델, 데이터셋)  
einops : 딥러닝 작업에서 텐서의 재배열을 간단하게 처리하도록 도와주는 패키지  
(transformers 모델처럼 복잡한 텐서 구조에 유용)  
sentencepiece : 텍스트 토큰화, 언어에 구애받지 않고 BPE(Byte Pair Encoding)나 Unigram 모델 기반으로 서브워드 토큰화 제공  
(Hugging Face와 같은 NLP 라이브러리와 함께 사용)  
sacremoses : 텍스트 전처리, 텍스트를 토큰화하거나 역토큰화할 때 사용되며, 번역 모델의 전처리 단계에서 자주 사용
```



In [2]:
!pip install gdown
!pip inatall einops
!pip install sentencepiece
!pip install transformers
!pip install sacremoses

ERROR: unknown command "inatall" - maybe you meant "install"
Collecting sacremoses
  Downloading sacremoses-0.1.1-py3-none-any.whl.metadata (8.3 kB)
Downloading sacremoses-0.1.1-py3-none-any.whl (897 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m897.5/897.5 kB[0m [31m23.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sacremoses
Successfully installed sacremoses-0.1.1


In [3]:
import torch

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

#
train_model = False

DEVICE

'cuda'

# 1. 모델 구현  
## 1. MHA ( Multi Head Attention )  
Encoder, Decoder의 Self Attention을 수행하는 모듈  
### 1.1 einops 사용하기  
- 장점 : 텐서의 변환을 매우 쉽게 이해하고 변환 가능  
- 단점 : 오류 잡기가 매우 힘들고 속도도 비교적 느리다

In [4]:
from einops import rearrange

# 차원 재배치
x = torch.randn(2, 3, 4) # Tensor shape : (2, 3, 4)
y = rearrange(x, 'a b c -> c a b')

y.shape


torch.Size([4, 2, 3])

In [5]:
# 차원 합치기
x = torch.randn(2, 3, 4)
y = rearrange(x, 'a b c -> (a b) c')

y.shape

torch.Size([6, 4])

In [6]:
# 차원 나누기
x = torch.randn(6, 4)
y = rearrange(x, '(a b) c -> a b c', a=2, b=3)

y.shape

torch.Size([2, 3, 4])

## 2. MHA 구현

In [7]:
from torch import nn
from einops import rearrange
import torch

class MHA(nn.Module):

  def __init__(self, d_model=512, n_heads=8):
    """
      d_model : 임베딩 벡터의 차원 (dimension of embedding vector)
      n_heads : 멀티 헤드 어텐션의 헤드 개수 (number of heads in multi_head attention)
    """
    super().__init__()

    self.n_heads = n_heads

    # Query, Key, Value 벡터를 위한 선형 레이어를 정의
    self.fc_q = nn.Linear(d_model, d_model)
    self.fc_k = nn.Linear(d_model, d_model)
    self.fc_v = nn.Linear(d_model, d_model)

    # 최종 출력을 위한 선형 레이어
    self.fc_o = nn.Linear(d_model, d_model)

    # 어텐션 스코어를 스케일링 하기 위한 값. (d_model / n_heads의 제곱근)
    self.scale = torch.sqrt(torch.tensor(d_model / n_heads)) # 512 / 8의 root

  def forward(self, Q, K, V, mask=None):
    """
      Q, K, V : 단어가 임베딩 레이어를 통과한 결과. : 임베딩 벡터
      shape : (N, t, D) : (batch_size, max_len, dimension)
      mask : 어텐션 스코어에 적용할 마스트 (optional)
    """

    # 임베딩 벡터인 Q, K, V에 대한 선형 변환 적용
    Q = self.fc_q(Q) # (N, t, D) -> (N, t, D)
    K = self.fc_k(K) # (N, t, D) -> (N, t, D)
    V = self.fc_v(V) # (N, t, D) -> (N, t, D)

    # 멀티 헤드를 위해 임베딩 차원 D를 헤드 개수 n_heads로 분할
    # 예를 들어 (32, 128, 512)로 들어왔을 때 8개의 head로 나누면 (32, 8, 128, 64)
    Q = rearrange(Q, 'N t (h dk) -> N h t dk', h=self.n_heads) # (N, t, D) -> (N, h, t, D//h)
    K = rearrange(K, 'N t (h dk) -> N h t dk', h=self.n_heads) # (N, t, D) -> (N, h, t, D//h)
    V = rearrange(V, 'N t (h dk) -> N h t dk', h=self.n_heads) # (N, t, D) -> (N, h, t, D//h)

    # 어텐션 스코어 구하기 (softmax 통과 전) QK^T / 루트d_k
    # Q와 K^T의 내적을 통해 어텐션 스코어를 계산하고 스케일링 적용

    # (N, h, t, dk) @ (N, h, dk, t) -> (N, h, t(query의 길이), t(Key의 길이))
    attention_score = Q @ K.transpose(-2, -1) / self.scale

    # 패딩의 위치에다가 굉장히 작은 값을 강제로 부여 -> 소프트맥스 적용 시에 0이 될 수 있도록
    if mask is not None: # 패딩 위치를 의미하는 인덱스들이 존재한다면
      attention_score[mask] = -1e10 # mask에 해당하는 위치에 굉장히 작은 값을 넣어준다

    # 에너지 값을 구하기 위해서는 키의 방향으로 softmax를 적용한다
    energy = torch.softmax(attention_score, dim=-1)

    # 에너지와 V를 곱해서 최종 어텐션 값을 구한다
    attention = energy @ V # (N, h, t, t) @ (N, h, t, dk) -> (N, h, t, dk)

    # 헤드 차원을 연결해서 원래의 차원으로 되돌리기
    x = rearrange(attention, 'N h t dk -> N t (h dk)') # (N, h, t, dk) -> (N, t, D)

    # 최종 출력값에 대해 선형 변환을 적용한다 각각의 헤드의 생각을 섞어준다.
    x = self.fx_o(x) # (N, t, D) -> (N, t, D)

    return x, energy

## 3. FNN (Feed Forward Network)  
- 인코더와 디코더의 MHA의 결과를 하나로 합쳐주는 역할

In [8]:
class FeedForward(nn.Module):

  def __init__(self, d_model=512, d_ff=2048, drop_p=0.1):
    super().__init__()

    self.linear == nn.Sequential(
        nn.Linear(d_model, d_ff),
        nn.ReLU(),
        nn.Dropout(drop_p),
        nn.Linear(d_ff, d_model)
    )

  def forward(self, mha_output):
    out = self.linear(mha_output)
    return out

## 4. Encoder 구현

In [9]:
# MHA - FFN 연결 과정 구현
# 추가적으로 skip connection, LN까지 구현

class EncoderLayer(nn.Module):
  def __init__(self, d_model, d_ff, n_heads, drop_p):
    """
      d_model : 임베딩 벡터의 차원
      d_ff : 피드 포워드 신경망의 은닉층의 차원
      n_heads : 멀티 헤드 어텐션의 헤드 개수
      drop_p : 드롭아웃 비율
    """
    super().__init__()

    # Multi Head Attention 레이어 정의 (self-attention)
    self.self_atten = MHA(d_model, n_heads)

    # MHA에 대한 Layer Normalization 정의
    self.self_atten_LN = nn.LayerNorm(d_model)

    # 피드 포워드 네트워크 정의
    self.FF = FeedForward(d_model, d_ff, drop_p)

    # 피드 포워드 네트워크의 출력에 대한 Layer Normailzation
    self.FF_LN = nn.LayerNorm(d_model)

    # 드롭아웃 레이어 정의
    self.dropout = nn.Dropout(drop_p)

  def forward(self, x, enc_mask):

    """
      x : 입력 텐서, Shape은 (batch_size, seq_len, d_model) -> Projection Layer
      enc_mask : 입력 마스크, Shape은 (batch_size, 1, seq_len)

      return : 인코더 레이어의 출력과 어텐션 가중치(에너지)
    """

    # Multi Head Attention 블록 구현
    # 리니어 레이어를 지나기 전에는 Q=K=V다. 이를 x로 받아오고 있음
    residual, atten_enc = self.self_atten(Q=x, K=x, V=x, mask=enc_mask)
    residual = self.dropout(residual)

    # Skip Connection(Add) & Layer Norm
    encoder_self_attention_output = self.self_atten_LN(x + residual)

    # FFN 블록
    residual = self.FF(encoder_self_attention_output)
    residual = self.dropout(residual)

    # Skip Connection & Layer Norm
    encoder_ffn_output = self.FF_LN(encoder_self_attention_output + residual)

    return encoder_ffn_output, atten_enc

In [10]:
class Encoder(nn.Module):

  def __init__(self, input_embedding, max_len, n_layers, d_model, d_ff, n_heads, drop_p):
    """
      input_embedding : 입력 임베딩 레이어 (nn.Embedding)
      max_len : 입력 시퀀스의 최대 길이 (int)
      n_layers : 인코더 레이어의 개수
      d_model : 임베딩 벡터의 차원
      d_ff : 피드 포워드 신경망의 은닉층의 차원
      n_heads : 멀티 헤드 어텐션의 헤드 개수
      drop_p : 드롭아웃 비율
    """

    super().__init__()

    # d_model의 제곱근 값으로 scale을 정의하여 임베딩 벡터의 크기를 조정
    self.scale = torch.sqrt(torch.tensor(d_model))

    # 입력 임베딩 레이어
    self.input_embeddin = input_embedding

    # 위치 임베딩 레이어 : 입력 시퀀스의 위치 정보를 학습하기 위한 레이어.
    # 최근에는 위치 정보도 학습의 대상으로 보기 위함
    self.pos_embedding = nn.Embedding(max_len, d_model)

    self.dropout = nn.Dropout(drop_p)

    # 여러 개의 인코더 레이어를 쌓기 위해 모듈 리스트를 활용
    self.layers = nn.ModuleList(
        [ EncoderLayer(d_model, d_ff, n_heads, drop_p) for _ in range(n_layers)]
    )
    self.device = DEVICE

  def forward(self, src, mask, atten_map_save=False):
    """
      src : 입력 시퀀스 (batch_size, seq_len)
      mask : 패딩을 마스킹하기 위한 마스크 텐서 (batch_size, 1, seq_len)
      atten_map_save : 어켄션 맵을 저장할지 여부 (기본값 : False)
    """

    # 위치 인덱스 텐서 생성 : 각 배치에서 시퀀스의 길이만큼 위치 인덱스 반복
    pos = torch.arrange(src.shape[1]).repeat(src.shape[0], 1).to(self.device)

    # 입력 임베딩과 위치 임베딩을 합산해서 입력 텐서 x 생성
    x_embedding = self.scale * self.input_embedding(src) + self.pos_embedding(pos)

    # 드롭아웃
    x_embedding = self.dropout(x_embedding)

    # 제일 처음 입력
    encoder_output = x_embedding

    # 인코더의 에너지 값들을 저장할 비어있는 텐서
    atten_encs = torch.tensor([]).to(self.device)

    # 각 인코더 레이어를 순차적으로 통과
    for layer in self.layers:
      # 인코더 레이어를 통과시키면 encoder_output과 에너지 값을 얻는다.
      encoder_output, atten_enc = layer(encoder_output, mask)

      # atten_map_save가 True면 에너지 저장
      if atten_map_save:

        # 현재 어텐션 맵을 기존의 atten_encs에 추가 (첫 번째 레이어의 어텐션 맵만 저장 )
        atten_encs = torch.cat([atten_encs, atten_enc[0].unsqueeze(0)], dim=0)

      # 최종 출력과 에너지 맵을 반환
      return encoder_output, atten_encs

## 5. Decoder 구현  
**Masking 실험**

In [11]:
import torch

# 예제 설정
batch_size = 3
seq_len = 10
padding = 3
n_heads = 8

# attention_score 텐서 생성 (무작위 값으로 초기화)
# Shape : (batch_size, n_heads, seq_len, seq_len)
attention_score = torch.randn(batch_size, n_heads, seq_len, seq_len)

# enc_mask 생성 : 패딩이 있는 위치를 마스킹
# 각 시퀀스의 마지막 3개 위치에 패딩이 있다고 가정
enc_mask = torch.tensor([
    [0, 0, 0, 0, 0, 0, 0, 1, 1, 1], # 첫 번째 시퀀스
    [0, 0, 0, 0, 0, 0, 0, 1, 1, 1], # 두 번째 시퀀스
    [0, 0, 0, 0, 0, 0, 0, 1, 1, 1], # 세 번째 시퀀스
], dtype = torch.bool).unsqueeze(1).unsqueeze(2) # Shape : (batch_size, 1, 1, seq_len)

# enc_mask의 shape을 (batch_size, n_heads, seq_len, seq_len)로 확장
enc_mask = enc_mask.expand(batch_size, n_heads, seq_len, seq_len)

print("="*25, "attention_score에 마스킹 적용 전", "="*25)
print("attention_score[0, 0] (첫 번째 배치, 첫 번째 헤드):\n", attention_score[0,0])

# attention_score에 enc_mask 적용
if enc_mask is not None:
  attention_score[enc_mask] = 0 # 마스크된 위치에 매우 작은 값을 넣어 softmax 결과에 영향 미치지 않게 함

print("="*25, "attention_score에 마스킹 적용 전", "="*25)
print("attention_score shape:", attention_score.shape)
print(enc_mask.shape)

# 특정 헤드에 대한 attention_score 확인 (첫 번재 헤드)
print("="*25, "attention_score에 마스킹 적용 후", "="*25)
print("attention_score[0, 0] (첫 번째 배치, 첫 번째 헤드):\n", attention_score[0, 0])

attention_score[0, 0] (첫 번째 배치, 첫 번째 헤드):
 tensor([[ 1.4086e+00,  7.0849e-01,  2.6769e-01, -2.7591e-02,  5.2874e-01,
          1.0179e+00,  4.3681e-01,  7.1985e-01,  4.3365e-01,  1.7712e+00],
        [-8.4898e-01,  3.5340e-01, -5.9011e-01, -1.9423e+00, -6.3491e-01,
         -1.3555e+00, -1.5007e+00,  3.3165e-01,  1.2865e+00, -7.0250e-02],
        [ 7.9057e-01,  1.7039e+00,  6.4460e-02, -7.8073e-01, -9.0764e-01,
         -7.6891e-01,  2.7758e-02, -5.7297e-01,  1.4036e+00,  4.9849e-01],
        [ 4.2384e-01, -5.5427e-01,  1.2881e-01, -1.1226e+00,  4.3744e-01,
         -2.5542e-05,  1.1006e+00, -2.2449e+00,  8.7916e-01, -1.7438e+00],
        [-7.3371e-02,  1.1193e+00, -2.0132e+00,  2.4533e-01,  1.1731e+00,
         -6.6114e-02,  1.5799e+00,  1.4119e+00, -8.6355e-01,  2.0156e-01],
        [-4.8999e-01,  1.9785e+00, -2.1782e+00,  2.2810e+00, -7.1590e-01,
          7.0295e-01,  1.9300e-01,  1.5347e-01, -1.1260e+00, -1.3306e+00],
        [-6.7815e-01,  2.7048e-01,  8.7626e-01,  1.3881e+00,  4

In [12]:
class DecoderLayer(nn.Module):

  def __init__(self, d_model, d_ff, n_heads, drop_p):
    super().__init__()

    # Decoder의 Self-Attention Layer -> Masked Multi Head Attention 정의
    self.self_atten = MHA(d_model, n_heads) # forward할 때 Decoder Mask가 따로 부여
    self.self_atten_LN = nn.LayerNorm(d_model)

    # Encoder - Decoder Attention Layer
    self.enc_dec_atten = MHA(d_model, n_heads)
    self.enc_dec_atten_LN = nn.LayerNorm(d_model)

    # Feed Forward
    self.FF = FeedForward(d_model, d_ff, drop_p)
    self.FF_LN = nn.LayerNorm(d_model)

    # Dropout
    self.dropout = nn.Dropout(drop_p)

  def forward(self, x, enc_out, dec_mask, enc_dec_mask):
    """
      x : 디코더의 입력 텐서 (batch_size, seq_len, d_model)
      enc_out : 인코더의 출력 텐서 (batch_size, seq_len, d_model)
      dec_mask : 디코더의 Self-Attention에 사용되는 마스크 (batch_size, 1, seq_len)
      enc_dec_mask : 인코더-디코더 Attention에 사용되는 마스크 (batch_size, 1, seq_len)

      return : 디코더 레이어의 출력 텐서, 디코더 Self-Attention 맵, 인코더-디코더 Attention 맵
    """
    # 1. 디코더의 Self Attention
    residual, atten_dec = self.self_atten(Q=x, K=x, V=x, mask=dec_mask)
    residual = self.dropout(residual)
    decoder_masked_self_attention_output = self.self_atten_LN(x + residual)

    # 2. Encoder - Decoder Attention
    # Q는 Masked Multi Head Attention, K, V는 인코더의 출력
    residual, atten_dec_enc = self.enc_dec_atten(
        Q=decoder_masked_self_attention_output,
        K=enc_out,
        V=enc_out,
        mask=enc_dec_mask
    )
    residual = self.dropout(residual)
    decoder_self_attention_output = self.enc_dec_atten_LN(x + residual)

    # 3. Feed Forward
    residual = self.FF(decoder_self_attention_output)
    residual = self.dropout(residual)
    decoder_output = self.FF_LN(decoder_self_attention_output + residual)

    # decoder_output : 디코더 출력값. 디코더가 N회 반복된 후 소프트맥스를 만나 단어를 출력하기 위한 값
    # atten_dec : Masked Self Attention의 에너지 맵
    # atten_dec_enc : Encoder에서 출력한 단어와 Decoder에서 출력할 단어의 에너지 맵
    return decoder_output, atten_dec, atten_dec_enc