# Language processing & Transformer

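This exercise covers natural language processing for speech AI and implements multi-head attention, the core building block of the Transformer.
1. Understand the text preprocessing pipeline
    - tokenizing
    - cleaning
2. Implement multi-head attention and self-attention.
3. Understand the operations performed at each step and the input/output shapes.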

### Install and import required packages

In [None]:
!pip install konlpy



In [None]:
from torch import nn
from torch.nn import functional as F
from tqdm import tqdm
import re
import torch
import math

from konlpy.tag import Okt
from tensorflow.keras.preprocessing.text import Tokenizer

In [None]:
torch.manual_seed(5)

<torch._C.Generator at 0x7f6d540e8f10>

## Req. 1-1 Text preprocessing

Clean and tokenize the five given sentences, then integer-encode them.  

You may use a different tokenizer if you prefer.

In [None]:
sentences = [["안녕하세요 음성 AI 실!@습에 오신 것을 환영#$^&@$&$합니다."], ["이네들은 7895435너무나 멀리 있습니다."], 
["계절이 지나가는 하늘에는가을로 가&^%@!$!^득 차 있습니다."], ["아직 나의 청!@$!%춘이 다하지!@% 않은 까닭입니다."], ["가슴 속에 하!@$나 둘 새겨지는 별을"]]

### Remove digits and special characters with a regular expression

In [None]:
preprocessed_texts = []

okt = Okt()

punctuation = ['.']

for sentence in sentences:
    
    s = sentence[0]

    # Strip the special characters and digits matched by the character class
    preprocessed_text = re.sub(r'[#$^&@!%0-9]', '', s)

    # Tokenize into morphemes with Okt and drop punctuation tokens
    tokenize_words = [word for word in okt.morphs(preprocessed_text) if word not in punctuation]

    preprocessed_texts.append(tokenize_words)

In [None]:
preprocessed_texts

[['안녕하세요', '음성', 'AI', '실습', '에', '오신', '것', '을', '환영', '합니다'],
 ['이', '네', '들', '은', '너무나', '멀리', '있습니다'],
 ['계절', '이', '지나가는', '하늘', '에는', '가을로', '가득', '차', '있습니다'],
 ['아직', '나', '의', '청춘', '이', '다', '하지', '않은', '까닭', '입니다'],
 ['가슴', '속', '에', '하나', '둘', '새겨지는', '별', '을']]
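
To see what the character class does in isolation, here is a minimal sketch that applies the same pattern without KoNLPy. The helper name `clean` is hypothetical, and the pattern only covers the noise characters that appear in the sample sentences; a different corpus would likely need a broader pattern.

```python
import re

# Same character class as in the cell above: a fixed set of symbols plus ASCII digits.
NOISE = re.compile(r"[#$^&@!%0-9]")

def clean(text: str) -> str:
    """Remove every character matched by NOISE (hypothetical helper)."""
    return NOISE.sub("", text)

samples = [
    "안녕하세요 음성 AI 실!@습에 오신 것을 환영#$^&@$&$합니다.",
    "이네들은 7895435너무나 멀리 있습니다.",
]
cleaned = [clean(s) for s in samples]
for s in cleaned:
    print(s)
```

The trailing period survives this step on purpose: in the notebook it is dropped afterwards by the `punctuation` filter applied to the Okt tokens.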

In [None]:
tokenizer = Tokenizer()

tokenizer.fit_on_texts(preprocessed_texts)

In [None]:
tokenizer.word_index

{'이': 1,
 '에': 2,
 '을': 3,
 '있습니다': 4,
 '안녕하세요': 5,
 '음성': 6,
 'ai': 7,
 '실습': 8,
 '오신': 9,
 '것': 10,
 '환영': 11,
 '합니다': 12,
 '네': 13,
 '들': 14,
 '은': 15,
 '너무나': 16,
 '멀리': 17,
 '계절': 18,
 '지나가는': 19,
 '하늘': 20,
 '에는': 21,
 '가을로': 22,
 '가득': 23,
 '차': 24,
 '아직': 25,
 '나': 26,
 '의': 27,
 '청춘': 28,
 '다': 29,
 '하지': 30,
 '않은': 31,
 '까닭': 32,
 '입니다': 33,
 '가슴': 34,
 '속': 35,
 '하나': 36,
 '둘': 37,
 '새겨지는': 38,
 '별': 39}

In [None]:
encoding_sentences = tokenizer.texts_to_sequences(preprocessed_texts)

In [None]:
encoding_sentences

[[5, 6, 7, 8, 2, 9, 10, 3, 11, 12],
 [1, 13, 14, 15, 16, 17, 4],
 [18, 1, 19, 20, 21, 22, 23, 24, 4],
 [25, 26, 27, 28, 1, 29, 30, 31, 32, 33],
 [34, 35, 2, 36, 37, 38, 39, 3]]
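
The mapping above appears to follow a simple rule: tokens are lowercased, ranked by frequency, and numbered from 1, with equally frequent tokens keeping their first-seen order. Index 0 is left unused, which is why it can serve as the padding id later. The following is a rough pure-Python sketch of that rule, not the Keras implementation itself; `build_word_index` and `encode` are hypothetical names.

```python
from collections import Counter

def build_word_index(token_lists):
    """Rank lowercased tokens by frequency; ids start at 1 (0 stays free for padding)."""
    counts = Counter()
    for tokens in token_lists:
        counts.update(tok.lower() for tok in tokens)
    # most_common() sorts stably, so equally frequent tokens keep first-seen order.
    return {tok: i for i, (tok, _) in enumerate(counts.most_common(), start=1)}

def encode(token_lists, word_index):
    """Replace each known token by its id; unknown tokens are skipped."""
    return [[word_index[t.lower()] for t in tokens if t.lower() in word_index]
            for tokens in token_lists]

token_lists = [
    ['안녕하세요', '음성', 'AI', '실습', '에', '오신', '것', '을', '환영', '합니다'],
    ['이', '네', '들', '은', '너무나', '멀리', '있습니다'],
    ['계절', '이', '지나가는', '하늘', '에는', '가을로', '가득', '차', '있습니다'],
    ['아직', '나', '의', '청춘', '이', '다', '하지', '않은', '까닭', '입니다'],
    ['가슴', '속', '에', '하나', '둘', '새겨지는', '별', '을'],
]

word_index = build_word_index(token_lists)
encoded = encode(token_lists, word_index)
print(encoded[0])
print(encoded[1])
```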

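The expected result is shown below. Exact indices depend on how the tokenizer segments each sentence, so the output above differs slightly (for example, the first sentence yields ten tokens above but nine in the reference).  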


[[5, 6, 7, 2, 8, 9, 3, 10, 11],  
 [1, 12, 13, 14, 15, 16, 4],  
 [17, 1, 18, 19, 20, 21, 22, 23, 4],  
 [24, 25, 26, 27, 1, 28, 29, 30, 31, 32],  
 [33, 34, 2, 35, 36, 37, 38, 3]]  
 

## Req. 1-2 Understanding the structure of multi-head self-attention

Using the data preprocessed above, run the steps below and trace how a sequence input is modeled by multi-head self-attention.

In [None]:
pad_id = 0
vocab_size = 40

data = encoding_sentences

In [None]:
# Pad every sequence with pad_id up to the longest length (modifies `data` in place).
def padding(data):
  max_len = len(max(data, key=len))
  print(f"Maximum sequence length: {max_len}")

  for i, seq in enumerate(tqdm(data)):
    if len(seq) < max_len:
      data[i] = seq + [pad_id] * (max_len - len(seq))

  return data, max_len

In [None]:
data, max_len = padding(data)

Maximum sequence length: 10


100%|██████████| 5/5 [00:00<00:00, 56375.05it/s]


In [None]:
data

[[5, 6, 7, 8, 2, 9, 10, 3, 11, 12],
 [1, 13, 14, 15, 16, 17, 4, 0, 0, 0],
 [18, 1, 19, 20, 21, 22, 23, 24, 4, 0],
 [25, 26, 27, 28, 1, 29, 30, 31, 32, 33],
 [34, 35, 2, 36, 37, 38, 39, 3, 0, 0]]

### Hyperparameter settings and embedding

In [None]:
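d_model = 512  # hidden size of the model
num_heads = 8  # number of attention heads

# d_model is the dimension of the embedding space the input is projected into.
# It is split evenly across the heads, so it must be divisible by num_heads.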
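
The divisibility requirement can be made explicit with a small check. This sketch assumes the same values as the cell above; `d_k` is the per-head dimension used in the later cells.

```python
d_model = 512   # hidden size of the model
num_heads = 8   # number of attention heads

# Each head receives an equal slice of the d_model features.
assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
d_k = d_model // num_heads
print(d_k)  # 64
```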

In [None]:
torch.manual_seed(5)

<torch._C.Generator at 0x7f6d540e8f10>

In [None]:
embedding = nn.Embedding(vocab_size, d_model)

# B: batch size, L: maximum sequence length
batch = torch.LongTensor(data)  # (B, L)
batch_emb = embedding(batch)  # (B, L, d_model)

In [None]:
print(batch_emb)
print(batch_emb.shape)

tensor([[[-1.6057e+00,  5.1626e-01,  8.7614e-01,  ..., -1.4336e+00,
          -3.4603e-01, -9.0669e-01],
         [ 1.9242e+00,  1.2260e-01, -8.2242e-01,  ..., -3.4325e-01,
          -1.5216e+00,  8.2277e-02],
         [-9.7788e-01, -9.2131e-02, -4.7637e-01,  ...,  1.6527e+00,
          -1.3805e+00, -1.5641e+00],
         ...,
         [ 2.2973e-01, -9.2456e-01, -5.6650e-01,  ...,  2.5386e+00,
           1.0674e+00, -7.1239e-01],
         [-1.7765e+00,  8.1844e-01,  9.6409e-01,  ..., -2.5248e-01,
          -1.4807e-03, -9.0503e-01],
         [-6.4328e-01, -7.1078e-01, -1.5838e-01,  ..., -6.2596e-01,
           1.1199e+00, -1.0839e+00]],

        [[ 1.2055e+00, -1.3480e+00, -9.8994e-02,  ..., -9.5853e-01,
          -2.6443e+00,  2.4948e-01],
         [ 5.3607e-02,  4.4246e-01,  8.0602e-01,  ..., -9.7368e-01,
           2.3132e-02, -3.7473e-01],
         [ 1.2890e+00,  8.4212e-01,  5.1969e-01,  ...,  1.5287e-01,
           1.2314e+00, -9.0307e-01],
         ...,
         [ 1.8423e+00,  5

### Linear projection & splitting into multiple heads

Define the linear projection matrices used inside multi-head attention.

In [None]:
w_q = nn.Linear(d_model, d_model)
w_k = nn.Linear(d_model, d_model)
w_v = nn.Linear(d_model, d_model)

In [None]:
w_0 = nn.Linear(d_model, d_model)

In [None]:
torch.manual_seed(5)

<torch._C.Generator at 0x7f6d540e8f10>

In [None]:
q = w_q(batch_emb)  # (B, L, d_model)
k = w_k(batch_emb)  # (B, L, d_model)
v = w_v(batch_emb)  # (B, L, d_model)

print(q.shape)
print(k.shape)
print(v.shape)

torch.Size([5, 10, 512])
torch.Size([5, 10, 512])
torch.Size([5, 10, 512])


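Split q, k, and v along the feature dimension into `num_heads` vectors of size `d_k` each.

- In theory, multi-head attention applies a separate Wq, Wk, and Wv to the input for each head, runs attention once per head, concatenates the results, and then applies a final linear transformation.
- In this implementation there is a single Wq, Wk, and Wv, each of shape (d_model, d_model), and the projected output is split into heads.
- The implementation example for the `Attention Is All You Need` paper likewise proceeds by splitting a single query vector along its feature dimension.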
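
For reference, the per-head formulation in the first bullet is usually written as follows. The notation is assumed here, following the paper: $h$ stands for `num_heads`, $X$ for the input embeddings, $W_i^Q, W_i^K, W_i^V \in \mathbb{R}^{d_{model} \times d_k}$ for the per-head projections, and $\mathrm{Attention}$ for the scaled dot-product attention covered in the next section.

$$
\mathrm{head}_i = \mathrm{Attention}(X W_i^Q,\; X W_i^K,\; X W_i^V), \qquad
\mathrm{MultiHead}(X) = \mathrm{Concat}(\mathrm{head}_1, \dots, \mathrm{head}_h)\, W^O
$$

Stacking the per-head matrices side by side, $W^Q = [\,W_1^Q \;\cdots\; W_h^Q\,] \in \mathbb{R}^{d_{model} \times d_{model}}$, gives the single projection used in the implementation: columns $(i-1)d_k + 1$ through $i\,d_k$ of $X W^Q$ are exactly $X W_i^Q$, and the bias is sliced the same way. One large projection followed by a split therefore computes the same thing as $h$ separate projections.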

In [None]:
batch_size = q.shape[0]
d_k = d_model // num_heads

# Split d_model into num_heads * d_k
q = q.view(batch_size, -1, num_heads, d_k)  # (B, L, num_heads, d_k)
k = k.view(batch_size, -1, num_heads, d_k)  # (B, L, num_heads, d_k)
v = v.view(batch_size, -1, num_heads, d_k)  # (B, L, num_heads, d_k)

print(q.shape)
print(k.shape)
print(v.shape)

torch.Size([5, 10, 8, 64])
torch.Size([5, 10, 8, 64])
torch.Size([5, 10, 8, 64])


In [None]:
# Move num_heads in front of L so that
# each head runs self-attention on its own (L, d_k) matrix

q = q.transpose(1, 2)  # (B, num_heads, L, d_k)
k = k.transpose(1, 2)  # (B, num_heads, L, d_k)
v = v.transpose(1, 2)  # (B, num_heads, L, d_k)

print(q.shape)
print(k.shape)
print(v.shape)

torch.Size([5, 8, 10, 64])
torch.Size([5, 8, 10, 64])
torch.Size([5, 8, 10, 64])
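
One way to convince yourself that `view` followed by `transpose` hands each head a contiguous block of `d_k` features is to compare the result against plain slicing. Below is a minimal sketch with small, arbitrary sizes; the toy dimensions and the variable names `x` and `heads` are assumptions for illustration.

```python
import torch

torch.manual_seed(0)
B, L, num_heads, d_k = 2, 3, 4, 5
d_model = num_heads * d_k

x = torch.randn(B, L, d_model)                          # stands in for a projected q, k, or v
heads = x.view(B, -1, num_heads, d_k).transpose(1, 2)   # (B, num_heads, L, d_k)

# Head h sees features h*d_k .. (h+1)*d_k - 1 of every token.
for h in range(num_heads):
    assert torch.equal(heads[:, h], x[:, :, h * d_k:(h + 1) * d_k])

print(heads.shape)  # torch.Size([2, 4, 3, 5])
```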


### Implementing scaled dot-product self-attention

This is the self-attention computation carried out within each head.
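
In formula form, with $Q, K, V \in \mathbb{R}^{L \times d_k}$ denoting the query, key, and value matrices of a single head (symbols assumed here to match the shape comments in the code):

$$
\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{Q K^\top}{\sqrt{d_k}}\right) V
$$

The softmax is taken over each row of the $L \times L$ score matrix, so row $i$ holds the weights that token $i$ assigns to every token in the sequence. Dividing by $\sqrt{d_k}$ keeps the scores from growing with the head dimension, which would otherwise push the softmax toward regions with very small gradients.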

In [None]:
torch.manual_seed(5)

<torch._C.Generator at 0x7f6d540e8f10>

In [None]:
# Per head, the score matrix has shape (L, L):
# how much weight each token places on every token in the same sequence
attn_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)  # (B, num_heads, L, L)
# softmax is applied row-wise, so dim=-1
attn_dists = F.softmax(attn_scores, dim=-1)  # (B, num_heads, L, L)

print(attn_dists)
print(attn_dists.shape)

tensor([[[[0.0550, 0.0868, 0.0645,  ..., 0.0749, 0.1359, 0.1409],
          [0.0809, 0.0409, 0.0938,  ..., 0.1158, 0.0655, 0.1383],
          [0.1304, 0.0951, 0.1111,  ..., 0.0791, 0.1291, 0.1318],
          ...,
          [0.0800, 0.0730, 0.1012,  ..., 0.1482, 0.0904, 0.0911],
          [0.0676, 0.0691, 0.0932,  ..., 0.1479, 0.1008, 0.1347],
          [0.0861, 0.0869, 0.1291,  ..., 0.0701, 0.1701, 0.1060]],

         [[0.0919, 0.1111, 0.1576,  ..., 0.0720, 0.1390, 0.0932],
          [0.1772, 0.0910, 0.1112,  ..., 0.1000, 0.0853, 0.1614],
          [0.1266, 0.0877, 0.0814,  ..., 0.1188, 0.0829, 0.0969],
          ...,
          [0.1040, 0.0949, 0.1150,  ..., 0.1032, 0.1228, 0.1003],
          [0.1372, 0.0652, 0.0749,  ..., 0.1390, 0.1136, 0.0868],
          [0.1504, 0.0721, 0.0714,  ..., 0.0510, 0.0655, 0.0594]],

         [[0.1033, 0.1156, 0.0730,  ..., 0.1512, 0.1451, 0.0434],
          [0.1258, 0.1393, 0.0647,  ..., 0.0929, 0.0809, 0.0704],
          [0.1134, 0.1157, 0.0913,  ..., 0

In [None]:
torch.manual_seed(5)

In [None]:
attn_values = torch.matmul(attn_dists, v)  # (B, num_heads, L, d_k)

print(attn_values.shape)

torch.Size([5, 8, 10, 64])
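
The two cells above can be folded into one small function. The sketch below uses toy sizes and a hypothetical function name, `scaled_dot_product_attention`, and returns the attention distribution alongside the values so that the row sums can be inspected.

```python
import math
import torch
from torch.nn import functional as F

def scaled_dot_product_attention(q, k, v):
    """q, k, v: (B, num_heads, L, d_k). Returns (values, distributions)."""
    d_k = q.size(-1)
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)  # (B, num_heads, L, L)
    dists = F.softmax(scores, dim=-1)                               # each row sums to 1
    return torch.matmul(dists, v), dists                            # values: (B, num_heads, L, d_k)

torch.manual_seed(0)
q = torch.randn(2, 4, 3, 5)
k = torch.randn(2, 4, 3, 5)
v = torch.randn(2, 4, 3, 5)
values, dists = scaled_dot_product_attention(q, k, v)

print(values.shape)  # torch.Size([2, 4, 3, 5])
print(dists.shape)   # torch.Size([2, 4, 3, 3])
```

Reading `d_k` from `q.size(-1)` keeps the function independent of any notebook-level variable.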


### Merging the outputs of the heads

Concatenate the outputs of all heads and apply a linear projection that keeps the dimension at d_model.

In [None]:
attn_values = attn_values.transpose(1, 2)  # (B, L, num_heads, d_k)
attn_values = attn_values.contiguous().view(batch_size, -1, d_model)  # (B, L, d_model)

print(attn_values.shape)

torch.Size([5, 10, 512])
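
The `transpose` + `contiguous` + `view` sequence is equivalent to concatenating the per-head outputs along the feature dimension. A small sketch with toy sizes follows; names such as `per_head` are illustrative only.

```python
import torch

torch.manual_seed(0)
B, num_heads, L, d_k = 2, 4, 3, 5
d_model = num_heads * d_k

per_head = torch.randn(B, num_heads, L, d_k)   # stands in for attn_values

# transpose makes the tensor non-contiguous, so contiguous() is needed before view().
merged = per_head.transpose(1, 2).contiguous().view(B, -1, d_model)   # (B, L, d_model)

# Same result as concatenating head 0, head 1, ... along the last dimension.
concatenated = torch.cat([per_head[:, h] for h in range(num_heads)], dim=-1)
assert torch.equal(merged, concatenated)

print(merged.shape)  # torch.Size([2, 3, 20])
```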


In [None]:
torch.manual_seed(5)

<torch._C.Generator at 0x7f6d540e8f10>

In [None]:
# w_0: (d_model, d_model)
# Mixes the self-attention outputs of the heads, each of which focuses on different aspects
outputs = w_0(attn_values)

print(outputs)
print(outputs.shape)

tensor([[[ 1.4145e-02,  9.0591e-02, -3.0949e-02,  ...,  1.9440e-01,
          -1.5152e-02, -1.9763e-01],
         [ 2.7790e-02,  1.3647e-01, -8.7589e-02,  ...,  1.5087e-01,
          -5.9431e-02, -2.0513e-01],
         [ 1.5193e-03,  5.1733e-02, -3.4423e-02,  ...,  1.6774e-01,
          -8.2296e-02, -2.1104e-01],
         ...,
         [-6.9484e-05,  7.8142e-02, -4.7687e-02,  ...,  1.6356e-01,
          -5.4219e-02, -1.8753e-01],
         [-9.4693e-03,  7.9399e-02, -2.6237e-02,  ...,  1.3948e-01,
           1.9279e-02, -1.9136e-01],
         [ 2.6785e-02,  8.8562e-02, -2.0943e-02,  ...,  1.5453e-01,
          -8.5891e-02, -1.8358e-01]],

        [[ 1.1532e-01, -1.3643e-01, -1.5151e-01,  ...,  9.1063e-02,
           1.0467e-01,  5.7146e-03],
         [ 1.4090e-01, -1.0668e-01, -1.5049e-01,  ...,  1.2853e-01,
           1.2191e-01, -2.5358e-02],
         [ 1.3648e-01, -1.0209e-01, -7.6098e-02,  ...,  1.4596e-01,
           1.0709e-01, -5.4055e-02],
         ...,
         [ 1.7824e-01, -1

## Req. 1-3 Implementing a multi-head self-attention module class

We now combine all of the steps above into a single multi-head attention module class.

Fill in the TODO sections in the code below.

In [None]:
class MultiheadAttention(nn.Module):
  def __init__(self):
    super(MultiheadAttention, self).__init__()

    # Q, K, V learnable matrices
    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)

    # Linear projection for concatenated outputs
    self.w_0 = nn.Linear(d_model, d_model)

  # scaled-dot product attention
  def self_attention(self, q, k, v):
    # Scale by the per-head dimension read from q itself rather than a notebook global
    attn_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(q.size(-1))  # (B, num_heads, L, L)
    attn_dists = F.softmax(attn_scores, dim=-1)  # (B, num_heads, L, L)

    attn_values = torch.matmul(attn_dists, v)  # (B, num_heads, L, d_k)

    return attn_values

  def forward(self, q, k, v):
    batch_size = q.shape[0]
    d_k = d_model // num_heads

    # linear projection
    ################################################################################
    # TODO 1: Implement the forward pass for linear projection.                #
    ################################################################################
    # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

    # Use the module's own layers (self.*), not the notebook-level w_q / w_k / w_v
    q = self.w_q(q)  # (B, L, d_model)
    k = self.w_k(k)  # (B, L, d_model)
    v = self.w_v(v)  # (B, L, d_model)


    # Split into heads
    ################################################################################
    # TODO 2: Implement the forward pass for split head.                #
    ################################################################################
    # *****START OF YOUR CODE (DO NOT DELETE/MODIFY THIS LINE)*****

    # Split d_model into num_heads * d_k
    q = q.view(batch_size, -1, num_heads, d_k)  # (B, L, num_heads, d_k)
    k = k.view(batch_size, -1, num_heads, d_k)  # (B, L, num_heads, d_k)
    v = v.view(batch_size, -1, num_heads, d_k)  # (B, L, num_heads, d_k)

    # Arrange the tensors so that each head handles its own (L, d_k) matrix
    q = q.transpose(1, 2)  # (B, num_heads, L, d_k)
    k = k.transpose(1, 2)  # (B, num_heads, L, d_k)
    v = v.transpose(1, 2)  # (B, num_heads, L, d_k)

    attn_values = self.self_attention(q, k, v)  # (B, num_heads, L, d_k)
    attn_values = attn_values.transpose(1, 2).contiguous().view(batch_size, -1, d_model)  # (B, L, num_heads, d_k) => (B, L, d_model)

    return self.w_0(attn_values)
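
As a point of comparison, here is a hedged sketch of a self-contained variant that receives `d_model` and `num_heads` through the constructor rather than reading notebook globals. It is one possible way to organize the module, not the reference solution for the TODOs; the class name `MultiheadSelfAttention` and the helper `_split_heads` are hypothetical.

```python
import math
import torch
from torch import nn
from torch.nn import functional as F

class MultiheadSelfAttention(nn.Module):
    def __init__(self, d_model: int, num_heads: int):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        # Q, K, V projections and the output projection for the concatenated heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_0 = nn.Linear(d_model, d_model)

    def _split_heads(self, x):
        # (B, L, d_model) -> (B, num_heads, L, d_k)
        return x.view(x.size(0), -1, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, q, k, v):
        batch_size = q.size(0)
        q = self._split_heads(self.w_q(q))
        k = self._split_heads(self.w_k(k))
        v = self._split_heads(self.w_v(v))

        # Scaled dot-product attention within each head
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)  # (B, num_heads, L, L)
        dists = F.softmax(scores, dim=-1)
        values = torch.matmul(dists, v)                                      # (B, num_heads, L, d_k)

        # Merge heads back: (B, num_heads, L, d_k) -> (B, L, d_model)
        values = values.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.w_0(values)

torch.manual_seed(0)
attn = MultiheadSelfAttention(d_model=16, num_heads=4)
x = torch.randn(2, 5, 16)
out = attn(x, x, x)
print(out.shape)  # torch.Size([2, 5, 16])
```

Passing the same tensor as `q`, `k`, and `v` makes this self-attention; the output keeps the input shape `(B, L, d_model)`.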

In [None]:
torch.manual_seed(5)

<torch._C.Generator at 0x7f6d540e8f10>

In [None]:
multihead_attn = MultiheadAttention()

outputs = multihead_attn(batch_emb, batch_emb, batch_emb)  # (B, L, d_model)

In [None]:
print(outputs)
print(outputs.shape)  # (batch_size, length, d_model)

tensor([[[ 2.5881e-02,  4.7264e-02,  7.3918e-02,  ..., -1.4889e-01,
          -7.8450e-02, -8.2003e-02],
         [ 7.7830e-02,  5.0064e-02,  1.5045e-01,  ..., -6.5740e-02,
          -1.0189e-01, -7.6289e-03],
         [ 6.7338e-02,  9.0291e-02,  1.0462e-01,  ..., -1.1704e-01,
          -5.6486e-02, -2.3669e-02],
         ...,
         [ 4.9953e-02,  1.2863e-02,  9.6843e-02,  ..., -1.0637e-01,
          -1.0702e-01, -7.6089e-02],
         [ 4.7319e-02,  5.2183e-02,  8.2475e-02,  ..., -7.8222e-02,
          -1.1873e-01, -3.5306e-02],
         [ 5.6760e-02,  3.6103e-02,  1.0054e-01,  ..., -7.7613e-02,
          -5.8231e-02, -6.0732e-02]],

        [[ 2.3406e-01, -9.6137e-02, -6.8675e-02,  ..., -7.3635e-02,
          -8.1034e-02,  5.7708e-02],
         [ 1.5537e-01, -3.6611e-02, -6.2355e-02,  ..., -8.5579e-03,
          -4.5835e-02,  2.2010e-02],
         [ 6.6599e-02, -6.3743e-02,  1.8539e-02,  ..., -3.4993e-02,
          -1.1480e-01, -1.7960e-02],
         ...,
         [ 9.2126e-02, -5