<a href="https://colab.research.google.com/github/roussanne/Athens/blob/master/(%EC%8B%A4%EC%8A%B5_%EB%AC%B8%EC%A0%9C)2_1_%ED%86%A0%ED%81%B0%ED%99%94%2C%EC%9E%84%EB%B2%A0%EB%94%A9_%EC%8B%A4%EC%8A%B5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Content License Agreement**

<font color='red'><b>**WARNING**</b></font> : 본 자료는 삼성청년SW·AI아카데미의 컨텐츠 자산으로, 보안서약서에 의거하여 어떠한 사유로도 임의로 복사, 촬영, 녹음, 복제, 보관, 전송하거나 허가 받지 않은 저장매체를 이용한 보관, 제3자에게 누설, 공개 또는 사용하는 등의 무단 사용 및 불법 배포 시 법적 조치를 받을 수 있습니다.

### **Objectives**

1. 실습명: 토큰화/임베딩 실습
2. 핵심 주제
    1) tokenizer를 이용하여 단어들을 토큰으로 변환하는 과정을 이해
    2) 토큰화된 토큰들을 임베딩 벡터로 변환하는 과정을 이해
    3) RNN부터 트랜스포머까지 모델의 발전사를 직접 체험하고 각 요소 기술의 역할을 이해
3. 학습 목표
    1) 토크나이저가 무엇이고 토큰화가 무엇인지에 대해서 설명할 수 있다.
    2) 토큰화를 왜 하는지에 대해서 설명할 수 있다.
    3) 토큰화된 토큰들을 임베딩 벡터로 변환하는 과정을 이해할 수 있다.
    4) 임베딩 벡터를 이용하여 어떤 식으로 활용할 수 있는지 설명할 수 있다.
    5) 다양한 모델의 발전사에 대해 직접 체험하고 각 아키텍쳐가 가지는 특징을 설명할 수 있다.

4. 학습 개념
    1) 토큰화:
    2) 임베딩 벡터:
    3) 인코더/디코더:
  
5. 학습 방향
    - 실습은 아래 내용들을 직접 체험하고 각 아키텍쳐가 가지는 특징을 이해하는 것이 목표입니다.
      - 토큰화
      - 임베딩
      - RNN
      - LSTM
      - 어텐션 메커니즘
      - 인코더
      - 디코더
    - 실습 코드는 조교가 직접 구현한 코드를 참고하며 학습합니다.
    - 자연스럽게 코드를 구현하면서 아키텍쳐의 발전사를 체험합니다.

6. 데이터셋 개요 및 저작권 정보
    - 데이터셋 명 : NSMC(Naver Sentiment Movie Corpus)
    - 데이터셋 개요 : 네이버 영화 감정분석 데이터셋
    - 데이터셋 저작권 : CC0 1.0

### **Prerequisites**
```
numpy==2.0.2
pandas==2.2.2
tokenizers==0.21.4
transformers==4.55.2
torch==2.8.0+cu126
```

- 만약, 기본 코랩과 버전이 다르다면 아래 명령어를 복사해서 실행시켜주세요.
```
%pip install numpy==2.0.2 pandas==2.2.2 tokenizers==0.21.4 transformers==4.55.2 torch==2.8.0+cu126 --index-url https://download.pytorch.org/whl
```

In [52]:
import torch
import torch.nn as nn
import numpy as np
from typing import (
    Generic,
    Tuple,
    TypeVar,
    List,
    Union,
    get_args
)
# 시드 설정
np.random.seed(1234)
torch.manual_seed(1234)
torch.cuda.manual_seed(1234)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

Batch = TypeVar("Batch", bound=int)
Token = TypeVar("Token", bound=int)
Sequence = TypeVar("Sequence", bound=int)
Layers = TypeVar("Layers", bound=int)
HiddenStates = TypeVar("HiddenStates", bound=int)
VocabSize = TypeVar("VocabSize", bound=int)
EmbeddingSize = TypeVar("EmbeddingSize", bound=int)
MaxLength = TypeVar("MaxLength", bound=int)

_1D = TypeVar("_1D")
_2D = TypeVar("_2D")
_3D = TypeVar("_3D")

def _label_str(self) -> str:
    """인스턴스의 제네릭 라벨 이름을 예쁘게 표시 (e.g., [Sequence])"""
    oc = getattr(self, "__orig_class__", None)
    if oc is None:
        return "[]"
    args = get_args(oc)
    names = [getattr(a, "__name__", str(a)) for a in args]
    return "[" + ", ".join(names) + "]"


class Tensor1D(Generic[_1D]):
    def __init__(self, tensor: torch.Tensor):
        assert tensor.dim() == 1, ValueError("Tensor must be 1-dimensional")
        self.tensor = tensor
        self.s: _1D = tensor.size(0)  # sequence length

    def size(self) -> Tuple[int, int]:
        return self.tensor.size()

    def __repr__(self) -> str:
        return f"Tensor(shape=({self.s}))"

class Tensor2D(Generic[_1D, _2D]):
    def __init__(self, tensor: torch.Tensor):
        assert tensor.dim() == 2, ValueError("Tensor must be 2-dimensional")
        self.tensor = tensor
        self.b: _1D = tensor.size(0)  # batch size
        self.s: _2D = tensor.size(1)  # sequence length
        assert self.b == tensor.size(0), ValueError(
            f"Expected batch {self.b}, but got {tensor.size(0)}"
        )
        assert self.s == tensor.size(1), ValueError(
            f"Expected Sequence {self.s}, but got {tensor.size(1)}"
        )

    def size(self) -> Tuple[int, int]:
        return self.tensor.size()

    def __repr__(self) -> str:
        return f"Tensor(shape=({self.b}, {self.s}))"


class Tensor3D(Generic[_1D, _2D, _3D]):
    def __init__(self, tensor: torch.Tensor):
        assert tensor.dim() == 3, ValueError("Tensor must be 3-dimensional")
        self.tensor = tensor
        self.b: _1D = tensor.size(0)  # batch size
        self.s: _2D = tensor.size(1)  # sequence length
        self.h: _3D = tensor.size(2)  # hidden state size
        assert self.b == tensor.size(0), ValueError(
            f"Expected batch {self.b}, but got {tensor.size(0)}"
        )
        assert self.s == tensor.size(1), ValueError(
            f"Expected Sequence {self.s}, but got {tensor.size(1)}"
        )
        assert self.h == tensor.size(2), ValueError(
            f"Expected Hidden State {self.h}, but got {tensor.size(2)}"
        )

    def size(self) -> Tuple[int, int]:
        return self.tensor.size()

    def __repr__(self) -> str:
        return f"Tensor(shape=({self.b}, {self.s}, {self.h}))"


# 1. 토크나이저 / 워드 임베딩

- 학습 목표
  1. 토크나이저를 학습할 수 있다.
  2. 토크나이저를 사용하여 텍스트를 토큰 ID 시퀀스로 변환하는 방법을 이해하고 구현할 수 있ㅏ.
- 학습 개념
  1. 토크나이저
  2. 토큰화
  3. 임베딩
- 진행하는 실습 요약
  1. 제공된 말뭉치로 WordPiece 토크나이저를 훈련시키는 코드 한 줄을 완성
  2. 훈련된 토크나이저를 사용해 특정 문장을 토큰 ID 시퀀스로 변환하는 코드
  3. nn.Embedding 레이어(혹은 간단한 dict lookup)를 사용하여 주어진 토큰 ID에 해당하는 임베딩 벡터를 조회하는 코드

### 1.1. Tokenizer 학습

<blockquote>
<b>🧠 토크나이저 학습</b><br>
언어 모델에서 토크나이저는 텍스트를 토큰으로 변환하는 역할을 합니다. 토크나이저를 학습하는 방법에 대해 알아봅니다.
</blockquote>

토크나이저를 학습하기 위해서는 다음 두가지가 필요합니다.
1. 토크나이저 객체(클래스)
2. 학습 데이터


그러면 우선 학습 데이터를 준비해보겠습니다.

학습할 텍스트 데이터가 들어있는 파일을 준비합니다.

여기서는 NSMC(Naver Sentiment Movie Corpus) 데이터셋을 사용하겠습니다.

아래 명령어를 실행하여 데이터셋을 다운로드 받습니다.

In [53]:
!wget https://github.com/e9t/nsmc/raw/master/ratings.txt

--2025-10-16 03:31:04--  https://github.com/e9t/nsmc/raw/master/ratings.txt
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/e9t/nsmc/master/ratings.txt [following]
--2025-10-16 03:31:04--  https://raw.githubusercontent.com/e9t/nsmc/master/ratings.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 19515078 (19M) [text/plain]
Saving to: ‘ratings.txt.2’


2025-10-16 03:31:04 (256 MB/s) - ‘ratings.txt.2’ saved [19515078/19515078]



데이터셋을 확인해봅니다.

In [54]:
import pandas as pd
import os

file_list = os.listdir()
for file in file_list:
    if "ratings.txt" == file:
        print('학습에 필요한 파일이 존재합니다!', file)
        df = pd.read_table( (os.getcwd() + '/' + file), encoding='utf-8') # 데이터 프레임으로 보기 편하게 바꿔줍시다!
        df = df.dropna(how = 'any') # 널값을 없애줍니다!
        print('리뷰 갯수 :', len(df))
        df.head()

학습에 필요한 파일이 존재합니다! ratings.txt
리뷰 갯수 : 199992


텍스트 데이터가 있는 'document'열만을 가져오고

해당 데이터를 txt 파일로 저장합니다.

In [55]:
# 텍스트 데이터가 있는 'document'열만을 가져오고
documents = df['document']

# 해당 데이터를 txt 파일로 저장합니다.
with open('naver_review.txt', 'w', encoding='utf8') as f:
    for doc in documents:
        f.write(str(doc) + '\n') # Convert to string before writing

# 저장된 파일을 확인해봅니다.
with open('naver_review.txt', 'r', encoding='utf8') as f:
    for i in range(5):
        print(f.readline().strip())

어릴때보고 지금다시봐도 재밌어요ㅋㅋ
디자인을 배우는 학생으로, 외국디자이너와 그들이 일군 전통을 통해 발전해가는 문화산업이 부러웠는데. 사실 우리나라에서도 그 어려운시절에 끝까지 열정을 지킨 노라노 같은 전통이있어 저와 같은 사람들이 꿈을 꾸고 이뤄나갈 수 있다는 것에 감사합니다.
폴리스스토리 시리즈는 1부터 뉴까지 버릴께 하나도 없음.. 최고.
와.. 연기가 진짜 개쩔구나.. 지루할거라고 생각했는데 몰입해서 봤다.. 그래 이런게 진짜 영화지
안개 자욱한 밤하늘에 떠 있는 초승달 같은 영화.


In [56]:
import os
import pandas as pd

# CSV 파일 읽기 - ratings_train.txt 파일이 현재 작업 디렉토리에 있어야 합니다
df = pd.read_csv(os.path.join(os.getcwd(), 'ratings.txt'), sep='\t')

In [57]:
import pandas as pd

# CSV 파일 읽기
df = pd.read_csv('ratings.txt', sep='\t')

학습이 되어 있지 않은 빈 tokenizer를 생성합니다.

여기서는 BertWordPieceTokenizer를 불러옵니다.

##### 파라미터:
- `strip_accents` : 입력 텍스트의 악센트(액센트)를 제거할지 여부를 결정하는 옵션입니다. 한국어를 학습할때에는 `False`로 설정합니다.
- `lowercase` : 영어를 모두 소문자로 바꿉니다. `False`로 설정하면 영어를 대문자로 유지합니다.

In [58]:
from tokenizers import BertWordPieceTokenizer

# 빈 tokenizer 생성 : vocabulary_size = 0 인 것을 확인하실 수 있습니다.
tokenizer = BertWordPieceTokenizer(
    lowercase=False,
    strip_accents=False,
)
tokenizer

Tokenizer(vocabulary_size=0, model=BertWordPiece, unk_token=[UNK], sep_token=[SEP], cls_token=[CLS], pad_token=[PAD], mask_token=[MASK], clean_text=True, handle_chinese_chars=True, strip_accents=False, lowercase=False, wordpieces_prefix=##)

아래 코드를 실행하여 토크나이저를 학습합니다.
#### 파라미터 설명:
- `data_file` : 데이터 경로를 지정해줍니다. list 형태로 여러개의 파일을 지정해줄수도 있습니다.
- `vocab_size (default: 30000)` : 단어사전 크기를 지정할 수 있습니다. 어떠한 값이 가장 좋다는 것은 없지만, 값이 클수록 많은 단어의 의미를 담을 수 있습니다.
- `initial_alphabet` : 꼭 포함됐으면 하는 initial alphabet을 학습 전에 추가해줍니다.
    - initial은 학습하기 이전에 미리 단어를 vocab에 넣는 것을 의미합니다.
    - special token들도 initial에 vocab에 추가됩니다.
- `limit_alphabet (default: 1000)` : initial tokens의 갯수를 제한합니다.
- `min_frequency (default: 2)` : 최소 빈도수를 의미합니다. 만약 어떤 단어가 1번 나오면 vocab에 추가하지 않습니다.
- `special_tokens` : 특수 토큰을 넣을 수 있습니다.. BERT에는 다음과 같은 토큰이 들어가야 합니다.
    - `[PAD]` : 패딩을 위한 토큰
    - `[UNK]` : OOV 단어를 위한 토큰
    - `[CLS]` : 문장의 시작을 알리고 분류 문제에 사용되는 토큰
    - `[SEP]` : 문장 사이사이를 구별해주는 토큰
    - `[MASK]` : MLM 태스크를 위한 마스크 토큰
- `wordpiece_prefix(default: '##')` : sub-word라는 것을 알려주는 표시입니다.
    - BERT는 기본적으로 '##'을 씁니다.
    - 예를 들어, `SS, ##AF, ##Y` 처럼 sub-word를 구분하기 위해 '##'을 사용합니다.
- `show_progress` : 학습 과정을 보여줍니다.

In [59]:
data_file = 'naver_review.txt'
vocab_size = 30000
min_frequency = 2
initial_alphabet = []
limit_alphabet = 6000
special_tokens = ["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]
wordpieces_prefix = "##"
show_progress=True

tokenizer.train(
    files = data_file,
    vocab_size = vocab_size,
    min_frequency = min_frequency,
    initial_alphabet = initial_alphabet,
    limit_alphabet = limit_alphabet,
    special_tokens = special_tokens,
    wordpieces_prefix = wordpieces_prefix,
    show_progress = True,
)

vocab = tokenizer.get_vocab()
print("vocab size : ", len(vocab))
print(sorted(vocab, key=lambda x: vocab[x])[:20])

vocab size :  30000
['[PAD]', '[UNK]', '[CLS]', '[SEP]', '[MASK]', '!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/']


### 1.2. 토크나이저를 이용한 토큰 ID 시퀀스 반환

<blockquote>
<b>🧠 토크나이저를 이용한 토큰 ID 시퀀스 반환</b><br>
모델이 토큰을 이해하기 위해서는 정수값으로 반환하는 과정이 필요합니다. 토크나이저를 이용하여 텍스트 토큰을 ID 시퀀스로 변환합니다.
</blockquote>

아래 코드를 실행하여 토크나이저를 이용하여 텍스트 토큰을 ID 시퀀스로 변환합니다.

In [60]:
text = "I'm a student of SSAFY!"

encoded = tokenizer.encode(text)
print('🌱토큰화 결과 :',encoded.tokens)
print('🌱정수 인코딩 :',encoded.ids)
print('🌈디코딩 :',tokenizer.decode(encoded.ids))

🌱토큰화 결과 : ['I', "'", 'm', 'a', 'st', '##ud', '##ent', 'of', 'S', '##S', '##A', '##F', '##Y', '!']
🌱정수 인코딩 : [45, 11, 81, 69, 15444, 24892, 16071, 10280, 55, 3678, 4338, 3859, 4272, 5]
🌈디코딩 : I ' m a student of SSAFY!


<blockquote>
<b>🧠 토크나이저를 이용한 모델 입력 만들기</b><br>
그렇다면 모델의 입력으로 넣기 위해서는 어떤 방식으로 토크나이징을 해야 할까요?
</blockquote>

위에 대한 답변은 앞으로 실습 코드를 진행하면서 나오기 때문에 이 점을 잊지 말고 계속 따라가시기 바랍니다.

### 1.3. 임베딩 벡터

<blockquote>
<b>🧠 토큰 ID에 따라 어떤 방식으로 벡터화가 될까요?</b><br>
토큰 ID에 해당하는 임베딩 벡터를 확인해보겠습니다.
</blockquote>

아래 코드를 실행하여 특정 토큰 ID에 따른 임베딩 벡터를 확인해보겠습니다.

임베딩 벡터는 torch의 nn.Embedding 모듈을 사용하여 생성됩니다. 해당 임베딩 벡터는 모두 임의의 값으로 초기화됩니다.

In [61]:
embedding_vector = nn.Embedding(vocab_size, 768)

임베딩 벡터를 초기화하려고 하니 다음 두가지 파라미터를 반드시 넣으라고 합니다.

1. `num_embeddings`: 임베딩 사전의 크기 (size of the dictionary of embeddings)
2. `embedding_dim`: 각 임베딩 벡터의 차원 (the size of each embedding vector)

<blockquote>
<b>🧠 num_embeddings </b><br>
임베딩 사전의 크기는 무슨 의미일까요?
</blockquote>

여기서 `num_embeddings`는 고유한 토큰(단어, 문자 등)의 총 개수를 의미합니다. 즉, 어떤 `인덱스 → 벡터` 매핑 테이블을 만들 건데, 그 테이블에 몇 개의 항목이 들어가야 하는지를 정의하는 값입니다. tokenizer를 만들때 `vocab_size`와 동일한 값을 의미합니다.

<blockquote>
<b>🧠 embedding_dim </b><br>
각 임베딩 벡터의 차원은 무슨 의미일까요?
</blockquote>

`embedding_dim`은 각 단어(또는 토큰)가 표현되는 벡터의 길이입니다. 즉, 하나의 단어를 어떤 숫자 벡터로 나타낼 때 그 벡터가 몇 차원인지 정하는 값입니다. 보통의 embedding은 `768`, `1024` 등 2의 제곱수 차원을 사용합니다. ("어떤 값이 정답이다" 하는 값이 있는 건 아닙니다.)

여기서는 vocab_size와 embedding_dim을 768로 정의해보겠습니다.

In [62]:
embedding_vector: Tensor2D[VocabSize, EmbeddingSize] = nn.Embedding(vocab_size, 768)
embedding_vector.weight.shape

torch.Size([30000, 768])

그러면 특정 토큰의 임베딩 벡터를 확인해보겠습니다.

In [63]:
token_id = tokenizer.token_to_id("I")
print("token_id:", token_id)
input_id = torch.tensor([token_id], dtype=torch.long)
print("input_id 차원:", input_id.shape)

vector = embedding_vector(input_id)
print("vector 차원:", vector.shape)
print("vector:", vector)

token_id: 45
input_id 차원: torch.Size([1])
vector 차원: torch.Size([1, 768])
vector: tensor([[ 2.0713e-04, -1.0795e-01, -1.0739e+00, -9.6976e-01, -2.9640e-01,
          7.6254e-01, -1.5894e+00,  4.7854e-01, -1.2304e+00,  2.3961e-01,
         -3.1191e-01, -8.4522e-01,  1.4816e+00,  3.8595e-01, -1.1367e+00,
         -8.6317e-01,  1.5040e+00, -9.3336e-02,  2.9453e-01, -4.7926e-02,
         -6.6803e-01, -1.2190e+00, -8.7034e-02, -2.4750e-01,  6.8342e-01,
         -8.3985e-01, -8.5719e-01,  9.4026e-01,  6.8466e-01,  2.1210e+00,
         -6.0650e-01,  4.0380e-01, -9.0871e-01,  6.9575e-01,  2.4129e+00,
         -1.5025e+00, -2.8435e-01,  3.1574e-01,  2.8680e-01, -6.1668e-01,
         -1.9330e+00, -4.4828e-02, -2.4540e-01,  2.1067e+00, -1.0078e+00,
         -7.6053e-02, -8.8781e-01,  1.3249e+00,  6.8391e-01, -9.7641e-01,
         -3.6913e-01,  6.9565e-01, -5.8386e-02, -1.0664e+00, -1.0686e+00,
         -9.5853e-01,  4.2346e-01, -2.3637e-01, -4.4244e-02, -1.9801e-01,
          9.3793e-01,  1.2641e

# 2. RNN/LSTM

- 학습 목표
  1. RNN/LSTM을 이용하여 문장 전체의 정보를 압축한 문맥 벡터에 대한 이해를 할 수 있다.
  2. Encoder Decoder 구조를 통해 문맥 벡터를 이용하여 특정 task를 수행할 수 있다.
- 학습 개념
  1. RNN/LSTM
  2. Encoder/Decoder
- 진행하는 실습 요약
  1. 간단한 RNN/LSTM을 구현한다.
  2. 번역 task와 관련된 encoder decoder 구조를 구현한다.


<blockquote>
<b>🧠 Recurrent Neural Network(RNN)이란? </b><br>
순차적(Sequential) 이전의 정보를 기억하여 현재의 정보를 처리하는 신경망 구조를 의미합니다.
</blockquote>

RNN이 갖는 특징은 다음과 같습니다.

- 입력을 순차적으로 처리합니다.
- RNN은 같은 가중치를 반복적으로 사용합니다.
- 재귀적인 구조를 가집니다.

그러면 이제부터 입력 텍스트를 RNN에 입력으로 넣어서 출력층의 결과값을 받아봅시다!

텍스트를 입력으로 넣기 위해서는 위에서 보았듯, 워드 임베딩으로 변환해야 합니다.
워드 임베딩을 만듭니다.

In [64]:
word_embeddings: Tensor2D[VocabSize, EmbeddingSize] = nn.Embedding(vocab_size, 768)
print("워드 임베딩 차원 :", word_embeddings.weight.shape)

워드 임베딩 차원 : torch.Size([30000, 768])


워드 임베딩 차원에 맞게 RNN을 구현합니다.

In [65]:
input_size: int = word_embeddings.weight.size()[1] # RNN의 input size는 임베딩 벡터의 차원과 일치해야 합니다.
hidden_size: int = 1024  # RNN의 hidden size
num_layers: int = 1  # 쌓을 RNN layer의 개수
bidirectional: bool = False  # 단방향 RNN

rnn = nn.RNN(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=bidirectional
)

# 초기 hidden state 초기화

hidden_state_shape: int = (num_layers * (2 if bidirectional else 1), hidden_size)

h_0: Tensor2D[Sequence, HiddenStates] = torch.zeros(hidden_state_shape)  # (num_layers * num_dirs, hidden_size)
print("h_0의 차원 :",h_0.shape)

h_0의 차원 : torch.Size([1, 1024])


입력 텍스트 데이터를 토크나이저를 사용하여 토큰화한 후, ids만 꺼냅니다.

In [66]:
text: str = "나는 학교에 간다."

# 토큰화를 진행합니다.
encoded = tokenizer.encode(text)
# 토큰의 ids만 꺼냅니다.
input_ids: List[int] = encoded.ids

# 텐서화를 합니다.
input_ids: Tensor1D[Sequence] = torch.tensor(input_ids, dtype=torch.long)
input_ids

tensor([6228, 7125, 3308, 9046,   18])

변환된 input_ids를 워드 임베딩으로 넣고
워드 임베딩을 RNN의 입력으로 넣어 두 output을 얻습니다.

1. `hidden_states`: 각 time step에 해당하는 hidden state들의 묶음.
2. `h_n`: 모든 sequence를 거치고 나온 마지막 hidden state(`last hidden state`). hidden_states의 마지막과 동일.

In [67]:
input_embeds: Tensor2D[Sequence, EmbeddingSize] = word_embeddings(input_ids)
print("워드 임베딩 차원 : ", input_embeds.shape)  # (vocab_size, embedding_dim)
outputs = rnn(input_embeds, h_0)
hidden_states: Tensor2D[Sequence, HiddenStates] = outputs[0]
h_n: Tensor2D[Layers, HiddenStates] = outputs[1]

# sequence_length: input_token의 길이(length), hidden size: hidden state 차원 수, num_layers: layer 개수, num_dirs: 방향의 개수
print("hidden_states 차원 : ", hidden_states.shape)  # (sequence_length, d_h)
print("h_n 차원 : ", h_n.shape)  # (num_layers * num_dirs, d_h) = (1, d_h)

if torch.equal(hidden_states[-1].unsqueeze(0), h_n):
    print("hidden_states의 마지막과 h_n이 같습니다.")

워드 임베딩 차원 :  torch.Size([5, 768])
hidden_states 차원 :  torch.Size([5, 1024])
h_n 차원 :  torch.Size([1, 1024])
hidden_states의 마지막과 h_n이 같습니다.


그러면 이러한 은닉 상태(hidden state)를 얻어서 어떠한 작업을 할 수 있을까요?

<blockquote>
<b>🧠 은닉 상태(hidden state)는 문장의 정보들을 압축적으로 저장합니다.</b><br>
RNN layer를 통과하면서 문장 전체의 정보를 압축하게 되고 이러한 정보들은 hidden state에 담기게 됩니다. 이러한 hidden state는 문맥 벡터(context vector)로 사용됩니다.
</blockquote>

문맥 벡터(context vector)는 입력 문장의 정보들을 벡터상에 압축하여 저장한 것으로, 이를 통해 다양한 task를 수행할 수 있게 됩니다.

여기서는 번역(translation) task를 수행하기 위해 hidden state를 사용하겠습니다.

번역을 하기 위해서는 last hidden state를 다시 저희의 입력 데이터와 유사한 형태인 텍스트(토큰) id로 변환하는 layer가 필요합니다. 이를 저희는 Decoder라고 부릅니다.

![image](https://raw.githubusercontent.com/Ssunbell/TIL/refs/heads/master/assets/Seq2SeqRNN.png)

그러면 아래에서 Encoder와 Decoder를 연결하여 번역을 수행하는 모델을 구현하겠습니다.

먼저 인코더를 구현하겠습니다. 위에서 구현한 rnn을 그대로 이용하여 클래스화를 진행하는 것과 동일합니다.

In [68]:
from abc import ABC, abstractmethod

# 인코더 모델은 RNN을 사용합니다. 아래는 추상화 클래스입니다.
class Encoder(nn.Module, ABC):
    def __init__(self: "Encoder") -> None:
        super().__init__()
        pass

    @abstractmethod
    def forward(self: "Encoder", input_ids: torch.Tensor) -> torch.Tensor:
        # forward에서 실제로 인코딩을 수행하기 위한 레이어를 쌓습니다.
        pass

class RNNEncoder(Encoder):
    def __init__(
        self: "RNNEncoder",
        vocab_size: int,
        embedding_dim: int,
        hidden_size: int,
        num_layers: int,
        bidirectional: bool,
    ) -> None:
        super().__init__()
        # word embedding layer
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        # rnn layer
        self.rnn = nn.RNN(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
        )

    def forward(
        self: "RNNEncoder",
        input_ids: Tensor1D[Sequence]
    ) -> Tuple[Tensor2D[Sequence, HiddenStates], Tensor2D[Layers, HiddenStates]]:
        """입력 토큰을 워드 임베딩을 통해 임베딩 변환을 합니다."""
        input_embeds = self.word_embeddings(input_ids)

        """RNN을 통해 입력 임베딩을 문맥 벡터(context vector)화 합니다."""
        outputs = self.rnn(input_embeds)
        # TODO: 직접 구현해보세요!
        # hidden_states: Tensor2D[Sequence, HiddenStates] = FIXME
        # h_n: Tensor2D[Layers, HiddenStates] = FIXME

        return hidden_states, h_n

vocab_size = 30000
embedding_dim = 768
hidden_size = 1024  # RNN의 hidden size
num_layers = 1  # 쌓을 RNN layer의 개수
bidirectional = False  # 단방향 RNN

rnn_encoder = RNNEncoder(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=bidirectional
)

outputs = rnn_encoder(input_ids)
hidden_states: Tensor2D[Sequence, HiddenStates] = outputs[0]
h_n: Tensor2D[Layers, HiddenStates] = outputs[1]
print("hidden_states 차원 : ", hidden_states.shape)  # (L, B, d_h)
print("h_n 차원 : ", h_n.shape)  # (num_layers*num_dirs, B, d_h) = (1, B, d_h)


hidden_states 차원 :  torch.Size([5, 1024])
h_n 차원 :  torch.Size([1, 1024])


다음 디코더 부분을 구현하겠습니다.

In [69]:
# 디코더 모델 또한 RNN을 사용합니다.
class Decoder(nn.Module, ABC):
    def __init__(self: "Decoder") -> None:
        super().__init__()

    @abstractmethod
    def forward(self, input_ids: torch.Tensor, init_hidden_state: torch.Tensor) -> torch.Tensor:
        pass

class RNNDecoder(Decoder):
    def __init__(
        self: "RNNDecoder",
        vocab_size: int,
        embedding_dim: int,
        hidden_size: int,
        num_layers: int,
        bidirectional: bool,
        start_token_id: int,
        end_token_id: int,
    ) -> None:
        super().__init__()
        self.start_token_id = start_token_id
        self.end_token_id = end_token_id
        # word embedding layer
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        # rnn layer
        self.rnn = nn.RNN(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
        )
        # fully connected layer
        self.fully_connected_layer = nn.Linear(hidden_size, vocab_size)

    def forward(
        self: "RNNDecoder",
        init_hidden_state: Tensor2D[Layers, HiddenStates],
        max_len: int = 10
    ) -> Tuple[Tensor2D[MaxLength, VocabSize], List[int]]:
        logits: List[Tensor1D[VocabSize]] = []
        input_token: Tensor1D[Token] = torch.tensor([self.start_token_id], dtype=torch.long)
        output_token_ids: List[int] = [input_token.item()] # tensor에서 item()을 사용하여 int로 변환합니다.
        h_n = init_hidden_state # h_n은 encoder의 h_0와 동일한 역할을 합니다.

        for _ in range(max_len):
            if input_token == self.end_token_id:
                # 문장의 종료를 의미하는 special token([SEP])이 나왔다면 추론(생성)을 종료합니다.
                break

            """직전 토큰만 입력으로 넣고 생성한 context vector는 logits에 저장합니다."""
            # TODO: 직접 구현해보세요!
            # embedded: Tensor2D[Token, EmbeddingSize] = FIXME
            # outputs = FIXME
            # h_n: Tensor2D[Layers, HiddenStates] = FIXME
            concat_h_n: Tensor1D[HiddenStates] = h_n.squeeze(0) # 여기서는 layer 갯수가 1이고, bidirectional이 False이므로 squeeze를 사용해도 무방합니다. (원래는 torch.cat으로 h_n을 합치는 작업이 필요합니다.)

            """fully connected layer를 통해 [VocabSize]의 logit을 생성합니다."""
            logit: Tensor1D[VocabSize] = self.fully_connected_layer(concat_h_n)
            logits.append(logit)

            """logit 내에서 가장 높은 점수값을 가진 토큰을 선택합니다."""
            input_token: Tensor1D[Token] = torch.argmax(logit, dim=-1).unsqueeze(0)
            output_token_ids.append(input_token.item())

        """리스트의 logits를 torch의 Tensor로 변경합니다."""
        logits = torch.stack(logits, dim=0)  # [max_len, vocab_size]

        return logits, output_token_ids

start_token_id: int = tokenizer.encode("[CLS]").ids[0]
end_token_id: int = tokenizer.encode("[SEP]").ids[0]

vocab_size: int = 30000
embedding_dim: int = 768
hidden_size: int = 1024  # RNN의 hidden size
num_layers: int = 1  # 쌓을 RNN layer의 개수
bidirectional: bool = False  # 단방향 RNN

rnn_decoder = RNNDecoder(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=bidirectional,
    start_token_id=start_token_id,
    end_token_id=end_token_id,
)
logits, output_token_ids = rnn_decoder(h_n)
output_texts = tokenizer.decode(output_token_ids)
print(output_texts)

##사람의사람의사람의사람의사람의사람의사람의사람의사람의사람의


이제 구현한 encoder와 decoder를 연결하여 seq2seq 모델을 구현해보겠습니다.

In [70]:
class RNNSeq2Seq(nn.Module):
    def __init__(self: "RNNSeq2Seq", encoder: nn.Module, decoder: nn.Module) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self: "RNNSeq2Seq", input_ids: Tensor1D[Sequence]):
        hidden_states, context_vector = self.encoder(input_ids) # encoder에서 생성한 context_vector(h_n)을 decoder layer로 전달
        logits, output_tokens = self.decoder(context_vector)

        return logits, output_tokens

seq2seq = RNNSeq2Seq(rnn_encoder, rnn_decoder)
logits, output_tokens = seq2seq(input_ids)
output_token_ids = logits.argmax(dim=-1)
output_texts = tokenizer.decode(output_token_ids.tolist())
print(output_texts)

##사람의사람의사람의사람의사람의사람의사람의사람의사람의사람의


<blockquote>
<b>🤔 결과값이 이상해요</b><br>
데이터로 충분히 학습을 하지 않아서 그렇습니다. 여기서는 모델의 구조에 대해서 집중하고 추후에 모델을 학습하는 과정을 경험해보겠습니다.
</blockquote>

저희는 Sequence to Sequence(Encoder - Decoder) 구조를 이용하여 텍스트를 생성해보았습니다.

Seq2Seq 구조 내에서 실제 워드 임베딩을 컨텍스트 벡터로 변환하고, 그 변환된 컨텍스트 벡터를 텍스트(토큰)으로 변환하는 과정에서 쓰인 모델은 RNN이였습니다.

RNN뿐만 아니라 LSTM, 어텐션 등을 사용하여 Seq2Seq 구조를 구현할 수 있습니다.

전체적인 큰 틀은 그대로 유지한 채, RNN 모듈만 바꿔주기만 하면 됩니다.

그러면 이제부터 LSTM으로 다시 한번 구현해보겠습니다.

RNN과 LSTM의 가장 큰 차이점은 LSTM에는 cell state가 추가된다는 점입니다.

장기 기억을 담당하는 cell state를 통해 좀더 성능을 높일 수 있습니다.

<blockquote>
<b>🧠 Key point!</b><br>
모델의 아키텍쳐마다 모델의 입출력이 달라집니다. 모델의 입력과 출력이 어떻게 나오는지에 대해서 이해하는 것이 중요합니다.
</blockquote>

그러면 Encoder에서 LSTM을 적용해보겠습니다.

In [71]:
class LSTMEncoder(Encoder):
    def __init__(
        self,
        vocab_size: int,
        embedding_dim: int,
        hidden_size: int,
        num_layers: int,
        bidirectional: bool,
    ) -> None:
        super().__init__()
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
        )

    def forward(
        self: "LSTMEncoder",
        input_ids: Tensor1D[Sequence]
    )-> Tuple[
        Tensor2D[Sequence, HiddenStates], # hidden states
        Tuple[
            Tensor2D[Layers, HiddenStates], # h_n
            Tensor2D[Layers, HiddenStates] # c_n
        ]
    ]:
        # Embed -> same leading dims + embedding_dim
        input_embeds = self.word_embeddings(input_ids)  # [S,B,E] or [B,S,E]
        outputs = self.lstm(input_embeds)   # outputs: [S,B,D*H] or [B,S,D*H]
        # TODO: 직접 구현해보세요!
        hidden_states: Tensor2D[Sequence, HiddenStates] = outputs[0]
        h_n: Tensor2D[Layers, HiddenStates] = outputs[1][0]
        c_n: Tensor2D[Layers, HiddenStates] = outputs[1][1]

        return hidden_states, (h_n, c_n)

vocab_size = 30000
embedding_dim = 768
hidden_size = 1024  # RNN의 hidden size
num_layers = 1  # 쌓을 RNN layer의 개수
bidirectional = False  # 단방향 RNN

lstm_encoder = LSTMEncoder(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=bidirectional
)

outputs = lstm_encoder(input_ids)
hidden_states: Tensor2D[Sequence, HiddenStates] = outputs[0]
h_n: Tensor2D[Layers, HiddenStates] = outputs[1][0]
c_n: Tensor2D[Layers, HiddenStates] = outputs[1][1]
print("hidden_states 차원 : ", hidden_states.shape)  # (L, B, d_h)
print("h_n 차원 : ", h_n.shape)  # (num_layers*num_dirs, B, d_h) = (1, d_h)
print("c_n 차원 : ", c_n.shape)  # (num_layers*num_dirs, B, d_h) = (1, d_h)

hidden_states 차원 :  torch.Size([5, 1024])
h_n 차원 :  torch.Size([1, 1024])
c_n 차원 :  torch.Size([1, 1024])


이번에는 LSTM을 사용하여 Decoder Layer를 구현해보겠습니다.

In [72]:
class LSTMDecoder(Decoder):
    def __init__(
        self: "LSTMDecoder",
        vocab_size: int,
        embedding_dim: int,
        hidden_size: int,
        num_layers: int,
        bidirectional: bool,
        start_token_id: int,
        end_token_id: int,
    ) -> None:
        super().__init__()
        self.start_token_id = start_token_id
        self.end_token_id = end_token_id
        # word embedding layer
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        # rnn layer
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
        )
        # fully connected layer
        self.fully_connected_layer = nn.Linear(hidden_size, vocab_size)

    def forward(
        self: "LSTMDecoder",
        init_hidden_state: Tensor2D[Layers, HiddenStates],
        init_cell_state: Tensor2D[Layers, HiddenStates],
        max_len: int = 10
    ) -> Tuple[Tensor2D[MaxLength, VocabSize], List[int]]:
        logits: List[Tensor1D[VocabSize]] = []
        input_token: Tensor1D[Token] = torch.tensor([self.start_token_id], dtype=torch.long)
        output_token_ids: List[int] = [input_token.item()] # tensor에서 item()을 사용하여 int로 변환합니다.
        h_n = init_hidden_state # h_n은 encoder의 h_0와 동일한 역할을 합니다.
        c_n = init_cell_state

        for _ in range(max_len):
            if input_token == self.end_token_id:
                # 문장의 종료를 의미하는 special token([SEP])이 나왔다면 추론(생성)을 종료합니다.
                break

            """직전 토큰만 입력으로 넣고 생성한 context vector는 logits에 저장합니다."""
            embedded: Tensor2D[Token, EmbeddingSize] = self.word_embeddings(input_token)  # 직전 입력 토큰만 사용 [1, embedding_dim]
            outputs = self.lstm(embedded, (h_n, c_n))   # outputs: [S,B,D*H] or [B,S,D*H]
            h_n: Tensor2D[Layers, HiddenStates] = outputs[1][0]
            c_n: Tensor2D[Layers, HiddenStates] = outputs[1][1]

            concat_h_n: Tensor1D[HiddenStates] = h_n.squeeze(0) # 여기서는 layer 갯수가 1이고, bidirectional이 False이므로 squeeze를 사용해도 무방합니다. (원래는 torch.cat으로 h_n을 합치는 작업이 필요합니다.)

            """fully connected layer를 통해 [VocabSize]의 logit을 생성합니다."""
            logit: Tensor1D[VocabSize] = self.fully_connected_layer(concat_h_n)
            logits.append(logit)

            """가장 높은 점수값을 가진 토큰을 선택합니다."""
            input_token: Tensor1D[Token] = torch.argmax(logit, dim=-1).unsqueeze(0)
            output_token_ids.append(input_token.item())

        """리스트의 logits를 torch의 Tensor로 변경합니다."""
        logits = torch.stack(logits, dim=0)  # [max_len, vocab_size]

        return logits, output_token_ids


start_token_id: int = tokenizer.encode("[CLS]").ids[0]
end_token_id: int = tokenizer.encode("[SEP]").ids[0]

vocab_size: int = 30000
embedding_dim: int = 768
hidden_size: int = 1024 # RNN의 hidden size
num_layers: int = 1 # 쌓을 RNN layer의 개수
bidirectional: bool = False # 단방향 RNN

lstm_decoder = LSTMDecoder(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=bidirectional,
    start_token_id=start_token_id,
    end_token_id=end_token_id,
)

logits, output_tokens = lstm_decoder(h_n, c_n)
output_token_ids = logits.argmax(dim=-1)
output_texts = tokenizer.decode(output_token_ids.tolist())
print(output_texts)

##스피스피 이영화보고 좃 정신줄 고정관념 추모34귯 연출로


Encoder와 Decoder를 사용하여 Seq2Seq 모델을 구현해보겠습니다.

In [73]:
class LSTMSeq2Seq(nn.Module):
    def __init__(self: "LSTMSeq2Seq", encoder: nn.Module, decoder: nn.Module) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self: "LSTMSeq2Seq", input_ids: Tensor1D[Sequence]):
        hidden_states, (context_vector, cell_states) = self.encoder(input_ids) # encoder에서 생성한 context_vector(h_n)을 decoder layer로 전달
        logits, output_tokens = self.decoder(context_vector, cell_states)

        return logits, output_tokens

seq2seq = LSTMSeq2Seq(lstm_encoder, lstm_decoder)
logits, output_tokens = seq2seq(input_ids)
output_token_ids = logits.argmax(dim=-1)
output_texts = tokenizer.decode(output_token_ids.tolist())
print(output_texts)

##스피스피 이영화보고 좃 정신줄 고정관념 추모34귯 연출로


# 3. Attention Mechanism

- 학습 목표
  1. Luong Attention(Dot Attention)을 구현할 수 있다.
  2. Attention을 이용하여 Decoder를 구현할 수 있다.
- 학습 개념
  1. Luong Attention
- 진행하는 실습 요약
  1. Luong Attention을 구현한다.
  2. Seq2Seq 구조에 들어갈 Decoder를 구현한다.


이번에는 Attention을 사용한 seq2seq 모델을 구현해보겠습니다.

<blockquote>
<b>🧠 Attention Mechanism</b><br>
현재 구현할 seq2seq 모델에서의 Attention은 최근 사용하는 attention은 아닙니다. 최근의 Transformers 모델들은 Multi-Head Scaled Dot-Product Attention을 사용합니다. 해당 내용은 과제에서 다룰 예정입니다.
</blockquote>

1. 전체적인 Seq2Seq 모델의 구조는 동일합니다.
2. Encoder에서 context vector를 얻을 때, LSTM을 사용하는 Encoder 모듈을 그대로 사용합니다.
3. Decoder에서 output token을 생성할 때, attention mechanism을 추가합니다.

그러면 우선 Dot Attention(Luong attention)을 먼저 구현합니다.

In [74]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LuongAttention(nn.Module):
    def __init__(self: "LuongAttention", hidden_size: int):
        super().__init__()
        self.W_a = nn.Linear(hidden_size, hidden_size, bias=False)

    @torch.no_grad()  # 학습 시 제거하세요
    def forward(
        self:"LuongAttention",
        h_t: Tensor1D[HiddenStates],
        encoder_outputs: Tensor2D[Sequence, HiddenStates],
    ) -> Tuple[Tensor1D[HiddenStates], Tensor1D[Sequence]]:
        """hidden state를 W_a에 projection하여 Wa_ht를 구합니다."""
        Wa_ht: Tensor1D[HiddenStates] = self.W_a(h_t)

        """encoder_outputs와 Wa_ht를 내적하여 attention score를 구합니다."""
        # TODO: 직접 구현해보세요!
        attention_score: Tensor1D[Sequence] = torch.matmul(encoder_outputs, Wa_ht)

        """attention score를 softmax layer에 통과시켜 attention weights(attention distribution)을 구합니다."""
        # TODO: 직접 구현해보세요!
        attention_weights: Tensor1D[Sequence] = F.softmax(attention_score, dim=-1)

        """각 encoder의 attention weights와 encoder의 hidden state를 내적하여 context vector(attention value)를 구합니다."""
        # TODO: 직접 구현해보세요!
        context_vector: Tensor1D[HiddenStates] = torch.matmul(attention_weights.unsqueeze(0), encoder_outputs).squeeze(0)

        return context_vector, attention_weights

<blockquote>
<b>🤔 엇 여기서도 context vector가 나오네요?</b><br>
네 그렇습니다. 과거에는 encoder의 마지막 hidden state(h_n)을 context vector라고 불렀습니다. 하지만, attention이 나오면서 context vector는 각 디코딩 시점마다 인코더의 모든 hidden states에 대한 어텐션 가중합이라고 생각해주시면 됩니다.
</blockquote>

구현한 attention mechanism을 이용하여 Decoder layer에 적용합니다.

In [75]:
class AttentionDecoder(nn.Module):
    def __init__(
        self: "AttentionDecoder",
        vocab_size: int,
        embedding_dim: int,
        hidden_size: int,
        num_layers: int,
        bidirectional: bool,
        start_token_id: int,
        end_token_id: int,
    ):
        super().__init__()
        self.start_token_id = start_token_id
        self.end_token_id = end_token_id
        # word embedding layer
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        # rnn layer
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bidirectional=bidirectional,
        )

        """attention을 추가합니다."""
        self.attn = LuongAttention(hidden_size)
        """context vector을 입력으로 받는 trainable weights"""
        self.W_c = nn.Linear(hidden_size * 2, hidden_size)
        # fully connected layer
        self.fully_connected_layer = nn.Linear(hidden_size, vocab_size)

    @torch.no_grad()  # 학습 시 제거
    def forward(
        self:"AttentionDecoder",
        init_hidden_state: Tensor1D[HiddenStates],
        init_cell_state: Tensor1D[HiddenStates],
        encoder_outputs: Tensor2D[Sequence, HiddenStates],
        max_len: int = 10,
    ):
        logits: List[Tensor1D[VocabSize]] = []
        input_token: Tensor1D[Token] = torch.tensor([self.start_token_id], dtype=torch.long)
        output_token_ids: List[int] = [input_token.item()] # tensor에서 item()을 사용하여 int로 변환합니다.
        h_n = init_hidden_state # h_n은 encoder의 h_0와 동일한 역할을 합니다.
        c_n = init_cell_state

        for _ in range(max_len):
            if input_token == self.end_token_id:
                # 문장의 종료를 의미하는 special token([SEP])이 나왔다면 추론(생성)을 종료합니다.
                break

            """직전 토큰만 입력으로 넣고 생성한 context vector는 logits에 저장합니다."""
            embedded: Tensor2D[Token, EmbeddingSize] = self.word_embeddings(input_token)  # 직전 입력 토큰만 사용 [1, embedding_dim]
            outputs = self.lstm(embedded, (h_n, c_n))   # outputs: [S,B,D*H] or [B,S,D*H]
            h_n: Tensor2D[Layers, HiddenStates] = outputs[1][0]
            c_n: Tensor2D[Layers, HiddenStates] = outputs[1][1]

            concat_h_n: Tensor1D[HiddenStates] = h_n.squeeze(0) # 여기서는 layer 갯수가 1이고, bidirectional이 False이므로 squeeze를 사용해도 무방합니다. (원래는 torch.cat으로 h_n을 합치는 작업이 필요합니다.)

            # 어텐션
            context_vector, attention_weights = self.attn(concat_h_n, encoder_outputs)

            """h_n(은닉 상태)와 context_vector를 연결합니다. (Concatenate)"""
            v_t: Tensor1D[HiddenStates * 2] = torch.cat([concat_h_n, context_vector], dim=-1)

            """v_t를 trainable weights를 통과시키고 tanh를 적용합니다."""
            # TODO: 직접 구현해보세요!
            attentional_hidden_state: Tensor1D[HiddenStates] = torch.tanh(self.W_c(v_t))

            """fully connected layer를 통해 [VocabSize]의 logit을 생성합니다."""
            logit: Tensor1D[VocabSize] = self.fully_connected_layer(attentional_hidden_state)
            logits.append(logit)

            """가장 높은 점수값을 가진 토큰을 선택합니다."""
            input_token: Tensor1D[Token] = torch.argmax(logit, dim=-1).unsqueeze(0)
            output_token_ids.append(input_token.item())

        logits = torch.stack(logits, dim=0) if logits else torch.empty(0, self.fully_connected_layer.out_features)

        return logits, output_token_ids

start_token_id: int = tokenizer.encode("[CLS]").ids[0]
end_token_id: int = tokenizer.encode("[SEP]").ids[0]

vocab_size: int = 30000
embedding_dim: int = 768
hidden_size: int = 1024 # RNN의 hidden size
num_layers: int = 1 # 쌓을 RNN layer의 개수
bidirectional: bool = False # 단방향 RNN

attention_decoder = AttentionDecoder(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    bidirectional=bidirectional,
    start_token_id=start_token_id,
    end_token_id=end_token_id,
)

logits, output_tokens = attention_decoder(h_n, c_n, hidden_states)
output_token_ids = logits.argmax(dim=-1)
output_texts = tokenizer.decode(output_token_ids.tolist())
print(output_texts)

##빨로 뮁 날리고 北 섬 녀석들구성슈 황홀 렀


Decoder layer를 구현했으니 이제 Seq2Seq 모델에 적용해봅니다.

In [76]:
class AttentionSeq2Seq(nn.Module):
    def __init__(self: "AttentionSeq2Seq", encoder: nn.Module, decoder: nn.Module) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self: "AttentionSeq2Seq", input_ids: Tensor1D[Sequence]):
        hidden_states, (last_hidden_state, cell_states) = self.encoder(input_ids) # encoder에서 생성한 h_n을 decoder layer로 전달
        logits, output_tokens = self.decoder(last_hidden_state, cell_states, hidden_states)

        return logits, output_tokens

seq2seq = AttentionSeq2Seq(lstm_encoder, attention_decoder)
logits, output_tokens = seq2seq(input_ids)
output_token_ids = logits.argmax(dim=-1)
output_texts = tokenizer.decode(output_token_ids.tolist())
print(output_texts)

##빨로 뮁 날리고 北 섬 녀석들구성슈 황홀 렀


# 4. Huggingface 라이브러리 활용

- 학습 목표
  1. huggingface 라이브러리를 이용하여 기학습된 모델을 불러올 수 있다.
  2. 기학습된 모델을 이용하여 추론을 할 수 있다.
- 학습 개념
  1. huggingface
- 진행하는 실습 요약
  1. HuggingFace Hub에서 한국어-영어 번역을 위해 사전학습된 모델과 토크나이저를 불러오는 코드(from_pretrained)를 완성
  2. 불러온 토크나이저로 입력 문장을 인코딩하고, model.generate() 함수를 사용해 번역 결과를 생성하는 코드를 완성
  3. 과제 2에서 사용한 번역 모델이 실제로 인코더와 디코더를 모두 가지고 있는지 코드로 확인

huggingface는 글로벌 최대 AI 모델 오픈소스 커뮤니티입니다. 과거에는 자연어처리 모델만 있었지만, 최근에는 비전, 로봇 등 다양한 오픈소스 모델들을 지원합니다.

여기서 Seq2Seq 아키텍쳐 구조에서 미리 학습한 모델을 불러와서 추론을 해보겠습니다.

아래 코드를 실행하여 모델과 토크나이저를 불러옵니다.

In [77]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

model_name = "Helsinki-NLP/opus-mt-ko-en"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)



불러온 모델이 Encoder와 Decoder 모듈을 가지고 있는지 확인하는 2가지 방법이 있습니다.

1. `print(model)`을 사용하여 모델의 구조를 확인합니다. 시각적으로 잘 정돈된 모델 구조를 확인할 수 있습니다.
2. `model.named_parameters()`를 사용하여 실제 클래스를 확인할 수 있습니다.

In [78]:
print(model)

MarianMTModel(
  (model): MarianModel(
    (shared): Embedding(65001, 512, padding_idx=65000)
    (encoder): MarianEncoder(
      (embed_tokens): Embedding(65001, 512, padding_idx=65000)
      (embed_positions): MarianSinusoidalPositionalEmbedding(512, 512)
      (layers): ModuleList(
        (0-5): 6 x MarianEncoderLayer(
          (self_attn): MarianAttention(
            (k_proj): Linear(in_features=512, out_features=512, bias=True)
            (v_proj): Linear(in_features=512, out_features=512, bias=True)
            (q_proj): Linear(in_features=512, out_features=512, bias=True)
            (out_proj): Linear(in_features=512, out_features=512, bias=True)
          )
          (self_attn_layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (activation_fn): SiLU()
          (fc1): Linear(in_features=512, out_features=2048, bias=True)
          (fc2): Linear(in_features=2048, out_features=512, bias=True)
          (final_layer_norm): LayerNorm((512,), eps=1e-05

In [79]:
for name, param in model.named_parameters():
    print(name)

model.shared.weight
model.encoder.embed_positions.weight
model.encoder.layers.0.self_attn.k_proj.weight
model.encoder.layers.0.self_attn.k_proj.bias
model.encoder.layers.0.self_attn.v_proj.weight
model.encoder.layers.0.self_attn.v_proj.bias
model.encoder.layers.0.self_attn.q_proj.weight
model.encoder.layers.0.self_attn.q_proj.bias
model.encoder.layers.0.self_attn.out_proj.weight
model.encoder.layers.0.self_attn.out_proj.bias
model.encoder.layers.0.self_attn_layer_norm.weight
model.encoder.layers.0.self_attn_layer_norm.bias
model.encoder.layers.0.fc1.weight
model.encoder.layers.0.fc1.bias
model.encoder.layers.0.fc2.weight
model.encoder.layers.0.fc2.bias
model.encoder.layers.0.final_layer_norm.weight
model.encoder.layers.0.final_layer_norm.bias
model.encoder.layers.1.self_attn.k_proj.weight
model.encoder.layers.1.self_attn.k_proj.bias
model.encoder.layers.1.self_attn.v_proj.weight
model.encoder.layers.1.self_attn.v_proj.bias
model.encoder.layers.1.self_attn.q_proj.weight
model.encoder.la

이미 학습된 모델을 통해 추론을 진행합니다.
위의 실습에서 추론했던 것과는 다르게 학습된 모델이므로 성능이 더 높게 나타납니다.

In [80]:
text = "나는 학교에 간다."
"""여기서는 batch로 입력을 처리하여 차원이 [seq_len]이 아닌 [batch_size, seq_len]입니다. 여기서는 입력이 한개이므로 [1, seq_len]입니다."""
encoded = tokenizer(text, return_tensors="pt")

generated_ids = model.generate(
    **encoded,
    max_new_tokens=64,
)

translation = tokenizer.decode(generated_ids.squeeze(), skip_special_tokens=True)
print("SRC:", text)
print("MT :", translation)

SRC: 나는 학교에 간다.
MT : I'm going to school.


# 5. 아키텍처별 모델 다뤄보기(Encoder model, Decoder model)

- 학습 목표
  1. huggingface 라이브러리를 이용하여 다양한 모델 구조의 모델을 다룰 수 있다.
- 학습 개념
  1. huggingface
- 학습 내용
  1. 문맥을 양방향으로 이해하는 데 강점이 있는 BERT 모델을 사용하여 문장의 빈칸([MASK])에 가장 적절한 단어를 추론
  2. 이전 텍스트를 바탕으로 다음 텍스트를 생성하는 데 특화된 GPT-2 모델을 사용하여 이야기의 뒷부분을 창작

지금까지는 Seq2Seq(Encoder - Decoder) 모델 구조를 다뤘습니다. 하지만, 현재 가장 많이 사용되는 모델은 Only Decoder 모델입니다.

1. Only Encoder 모델 : BERT 같은 모델. RAG등 문서 검색에 주로 사용
2. Only Decoder 모델 : Chat-GPT 같은 모델. 대화, 번역, 챗봇 등 현재 가장 많이 사용
3. Encoder - Decoder 모델 : 최근에는 잘 사용하지 않음

그러면 Only Encoder 모델과 Only Decoder 모델을 이용해 모델 추론을 해보겠습니다.

Encoder의 대표 모델인 BERT 모델을 불러옵니다.

In [81]:
from transformers import AutoTokenizer, AutoModelForMaskedLM

model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForMaskedLM.from_pretrained(model_name)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


BERT 모델을 이용하여 빈칸 맞추기(Masked Language Modeling)를 추론해봅니다.

예를 들어, I [MASK] to school. 이라는 문장에서 [MASK]에 들어갈 단어를 맞춘다고 하면 I go to school. 이 문장이 정답이 됩니다.

하지만, I went to school도 정답이 될 수 있습니다.

이처럼 [MASK]에 들어갈 단어는 여러가지가 될 수 있고, 모델의 학습에 따라 어떤 단어가 [MASK]에 들어갈지 결정됩니다.

이러한 특성을 이용하여 BERT 모델을 이용하여 빈칸 맞추기(`[MASK]`)를 추론해봅니다.

In [82]:
# 4. 우리가 맞출 문장 만들기. tokenizer.mask_token = "[MASK]" 이 부분이 빈칸이 됨
sentence = f"I {tokenizer.mask_token} to school."

top_k = 5  # 상위 5개 후보 단어를 보고 싶다

# 5. 문장을 숫자로 바꿔서 BERT가 읽을 수 있게 준비
encoded = tokenizer(sentence, return_tensors="pt", return_attention_mask=True)

# 6. 숫자로 된 문장 정보에서 '입력 토큰 ID' 꺼내기
input_ids = encoded.input_ids

# 7. [MASK]의 숫자 아이디 가져오기
mask_token_id = tokenizer.mask_token_id

# 8. 문장에서 [MASK]가 있는 위치(인덱스) 찾기 mask_positions는 (배치 번호, 문장 속 위치) 형태로 저장됨
# TODO: 직접 구현해보세요!
mask_positions = (input_ids == mask_token_id).nonzero(as_tuple=True)

# 9. BERT 모델에 문장(숫자형태)을 넣어서 예측 결과(logits) 얻기
outputs = model(**encoded)

# 10. logits: 각 단어 위치마다 '다음 단어일 가능성'을 모든 단어 사전 크기만큼 기록한 값
logits = outputs.logits.squeeze(0)  # (seq_len, vocab_size)

# 11. 모든 [MASK] 위치에 대해 예측하기
all_token_candidates: List[List[Tuple[str, float]]] = []
# Iterate through the mask positions correctly
for batch_index, pos in zip(mask_positions[0], mask_positions[1]):
    pos = pos.item()  # 위치 숫자 꺼내기
    logits_at_pos = logits[pos]  # 해당 위치의 예측 점수
    probs = torch.softmax(logits_at_pos, dim=-1)  # 점수를 확률로 변환
    topk = torch.topk(probs, k=top_k)  # 확률이 높은 상위 5개 선택

    ids = topk.indices.tolist()   # 단어 ID
    scores = topk.values.tolist() # 확률 값

    # 단어 ID를 실제 단어(토큰)로 변환
    tokens = [tokenizer.convert_ids_to_tokens(tid) for tid in ids]

    # (단어, 확률) 형태로 묶어서 저장
    candidates = list(zip(tokens, scores))
    all_token_candidates.append(candidates)

# 12. [MASK]에 들어갈 단어로 완성된 문장들을 저장할 리스트
restored_sentences: List[str] = []

# 13. 첫 번째 [MASK] 위치의 후보 단어들
token_candidates: List[Tuple[str, float]] = all_token_candidates[0]

# 14. 후보 단어들을 하나씩 넣어서 문장을 만들어 보기
for tok, _ in token_candidates:
    new_ids = input_ids.clone()  # 원래 문장의 숫자 복사
    tok_id = tokenizer.convert_tokens_to_ids(tok)  # 후보 단어를 숫자로 변환
    # Use the correct index for the position
    new_ids[0, mask_positions[0][0]] = tok_id      # [MASK] 위치에 후보 단어 ID 넣기
    text = tokenizer.decode(new_ids[0], skip_special_tokens=True)  # 다시 글자로 변환
    restored_sentences.append(text.strip())  # 앞뒤 공백 제거 후 저장

# 15. 결과 출력
print("원본 문장:", sentence)
print("BERT가 예측한 문장들:")
for idx, sent in enumerate(restored_sentences, start=1):
    print("{}순위: {}".format(idx, sent))

원본 문장: I [MASK] to school.
BERT가 예측한 문장들:
1순위: went I to school.
2순위: go I to school.
3순위: walked I to school.
4순위: ran I to school.
5순위: got I to school.


Only Decoder 모델의 대표인 GPT 모델을 이용하여 추론을 해보겠습니다.

GPT-2 모델을 불러옵니다.

In [83]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

GPT-2 모델은 입력으로 토큰화된 텍스트를 받고, 그 뒤에 올 단어들을 예측(Next token Prediction)하는 것이 목표입니다.

아래 코드를 이용하여 스토리(입력 텍스트)의 뒷 내용을 생성해보겠습니다.

In [84]:
prompt = "Once upon a time in a small village, a curious child found a mysterious key."
inputs = tokenizer(prompt, return_tensors="pt")

with torch.no_grad():
    generated_ids = model.generate(
        **inputs,
        max_new_tokens=64,
    )

output_tokens = tokenizer.decode(generated_ids.squeeze(), skip_special_tokens=True)
print(output_tokens)

Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Once upon a time in a small village, a curious child found a mysterious key. The child was a boy named Kiyoshi. He was a boy who had been born with a strange, mysterious, and mysterious voice. He was a boy who had been born with a strange, mysterious, and mysterious voice. He was a boy who had been born with a strange, mysterious, and mysterious voice.
