In [None]:
# 12/5(금) 9:20

# Encoder–Decoder 구조

- Encoder–Decoder 구조는 어떤 형태의 입력 시퀀스를 받아 **의미를 해석**한 뒤, 새로운 **출력 시퀀스를 생성**해야 하는 거의 모든 AI 문제를 해결하는 딥러닝 모델 구조다.
- 이 구조는 Encoder와 Decoder 두개의 딥러닝 모델을 연결한 구조로 **입력 데이터를 하나의 표현으로 압축한 뒤, 이를 다시 출력 데이터로 변환하는 방식**으로 동작한다.

- **Encoder Network**
  - 입력 데이터를 해석(이해)하는 역할을 수행.
  - 입력 시퀀스에 담긴 의미적 정보를 하나의 고정된 벡터 형태로 요약.

- **Decoder Network**
  - Encoder가 생성한 요약 정보를 바탕으로 최종 출력을 생성.
  - 즉, Encoder의 “이해 결과”를 이용해 새로운 시퀀스를 만들어낸다.

## Seq2Seq (Sequence-to-Sequence)

Seq2Seq 모델: **Encoder–Decoder 구조를 RNN(Recurrent Neural Network) 계열에 적용한 대표적인 시퀀스 변환 모델**.  
입력과 출력이 모두 “시퀀스(sequence)” 형태라는 점에서 *Sequence-to-Sequence*라는 이름이 붙었다.

### Encoder의 역할: 입력 시퀀스 이해 및 Context Vector 생성

Encoder는 입력으로 들어온 **전체 시퀀스**(sequence)를 순차적으로 처리한 뒤,  그 의미를 **하나의 고정 길이 벡터**(Vector)로 압축하여 출력한다.  
이 벡터를 **Context Vector**(컨텍스트 벡터)라고 한다.
- **Context Vector란?**  
  - 입력 시퀀스 전체의 의미, 문맥, 핵심 정보를 요약해 담고 있는 벡터 표현이다.
  - **기계 번역**(Machine Translation)의 경우  
    - 번역할 원문 문장에서 **번역 결과를 생성하는 데 필요한 핵심 의미 정보**(feature)
  - **챗봇**(Chatbot)의 경우  
    - 사용자가 입력한 질문에서 **적절한 답변을 생성하는 데 필요한 의미 정보**(feature)

### Decoder의 역할: Context Vector를 바탕으로 출력 시퀀스 생성

Decoder는 Encoder가 출력한 **Context Vector를 입력으로 받아**, 이를 바탕으로 **목표 출력 시퀀스**를 한 토큰(token)씩 순차적으로 생성.

- **기계 번역**(Machine Translation)의 경우  
  - 입력 문장의 의미를 반영한 **번역 문장** 생성.
- **챗봇**(Chatbot)  
  - 질문에 대한 **자연스러운 답변 문장** 생성.

Decoder는 매 시점(time step)마다
  - 이전에 생성한 단어
  - 그리고 Context Vector에 담긴 입력 문맥
을 함께 고려해 다음 단어를 예측.


![seq2seq](figures/seq2seq.png)

# Seq2Seq 를 이용한 Chatbot 모델 구현
- Encoder를 이용해 질문의 특성을 추출하고 Decoder를 이용해 답변을 생성한다.

# Chatbot Dataset

- https://github.com/songys/Chatbot_data
- columns
    - Q: 질문
    - A: 답
    - label: 일상다반사 0, 이별(부정) 1, 사랑(긍정) 2
- **Download**

![dataset](figures/chatbot.png)

# Chatbot Dataset Loading 및 확인

## 데이터셋 다운로드 및 확인

In [None]:
import requests
import os

url = "https://raw.githubusercontent.com/songys/Chatbot_data/refs/heads/master/ChatbotData.csv"
res = requests.get(url)
if res.status_code == 200:
    with open("data/chatbot_data.csv", "wt", encoding="utf-8") as fw:
        fw.write(res.text)
else:
    print("다운 실패:", res.status_code)

In [None]:
import pandas as pd

df = pd.read_csv("data/chatbot_data.csv", encoding="utf-8")
df.shape

(11823, 3)

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11823 entries, 0 to 11822
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Q       11823 non-null  object
 1   A       11823 non-null  object
 2   label   11823 non-null  int64 
dtypes: int64(1), object(2)
memory usage: 277.2+ KB


In [None]:
df.drop(columns='label', inplace=True)

In [None]:
df.head(10)

Unnamed: 0,Q,A
0,12시 땡!,하루가 또 가네요.
1,1지망 학교 떨어졌어,위로해 드립니다.
2,3박4일 놀러가고 싶다,여행은 언제나 좋죠.
3,3박4일 정도 놀러가고 싶다,여행은 언제나 좋죠.
4,PPL 심하네,눈살이 찌푸려지죠.
5,SD카드 망가졌어,다시 새로 사는 게 마음 편해요.
6,SD카드 안돼,다시 새로 사는 게 마음 편해요.
7,SNS 맞팔 왜 안하지ㅠㅠ,잘 모르고 있을 수도 있어요.
8,SNS 시간낭비인 거 아는데 매일 하는 중,시간을 정하고 해보세요.
9,SNS 시간낭비인데 자꾸 보게됨,시간을 정하고 해보세요.


In [None]:
df.tail(10)

Unnamed: 0,Q,A
11813,회사에 좋아하는 남자가 생겼어 어떡하지?,사랑하기 힘든 관계인가봐요.
11814,회사에서 어떤 사람이랑 자꾸 눈 마추쳐.,눈 마주치는 게 우연인지 잘 살펴 보세요.
11815,회식 중이라고 하는데 연락이 안돼.,정신 없이 바쁠지도 몰라요. 조금만 더 기다려보고 물어보는게 좋을 것 같아요.
11816,회식하는데 나만 챙겨줘. 썸임?,호감이 있을 수도 있어요. 그렇지만 조금 더 상황을 지켜보세요.
11817,후회 없이 사랑하고 싶어,진심으로 다가가 보세요.
11818,훔쳐보는 것도 눈치 보임.,티가 나니까 눈치가 보이는 거죠!
11819,훔쳐보는 것도 눈치 보임.,훔쳐보는 거 티나나봐요.
11820,흑기사 해주는 짝남.,설렜겠어요.
11821,힘든 연애 좋은 연애라는게 무슨 차이일까?,잘 헤어질 수 있는 사이 여부인 거 같아요.
11822,힘들어서 결혼할까봐,도피성 결혼은 하지 않길 바라요.


In [None]:
df.isnull().sum()

Q    0
A    0
dtype: int64

# Dataset, DataLoader 정의

## Tokenization

### Subword방식

In [None]:
# 토큰화를 위해서 문장을 q + a 형식으로 만든다. 
# 어휘사전을 만들 때 Q와 A에 있는 모든 단어들이 다 들어가게 하기 위해.
question_texts = df['Q']
answer_texts = df['A']

all_texts = list(question_texts+" "+answer_texts)  # series + 문자열 + series (원소 단위 연산)

In [None]:
all_texts[:10]

['12시 땡! 하루가 또 가네요.',
 '1지망 학교 떨어졌어 위로해 드립니다.',
 '3박4일 놀러가고 싶다 여행은 언제나 좋죠.',
 '3박4일 정도 놀러가고 싶다 여행은 언제나 좋죠.',
 'PPL 심하네 눈살이 찌푸려지죠.',
 'SD카드 망가졌어 다시 새로 사는 게 마음 편해요.',
 'SD카드 안돼 다시 새로 사는 게 마음 편해요.',
 'SNS 맞팔 왜 안하지ㅠㅠ 잘 모르고 있을 수도 있어요.',
 'SNS 시간낭비인 거 아는데 매일 하는 중 시간을 정하고 해보세요.',
 'SNS 시간낭비인데 자꾸 보게됨 시간을 정하고 해보세요.']

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.trainers import BpeTrainer

tokenizer = Tokenizer(
    BPE(unk_token="<unk>")
)
tokenizer.pre_tokenizer = Whitespace()
trainer = BpeTrainer(
    vocab_size=10_000,    # 최대 어휘 수
    min_frequency=5,      # 어휘사전에 등록할 단어의 최소 빈도 수 (5회 이상은 나와야 등록)
    continuing_subword_prefix='##',    # 연결 subword 앞에 붙일 접두어를 ##로 지정. cowork: co + ##work
    special_tokens=["<pad>", "<unk>", "<sos>"]   # <sos>는 문장의 시작을 의미하는 특수 토큰
)
tokenizer.train_from_iterator(all_texts, trainer=trainer)






In [None]:
print("총 어휘 수:", tokenizer.get_vocab_size())

총 어휘 수: 7040


In [None]:
encode = tokenizer.encode("오늘 날씨가 너무 좋습니다. 이런 날씨에 뭘 하면 좋을까요?")

In [None]:
encode.tokens

['오늘', '날씨가', '너무', '좋습니다', '.', '이런', '날씨', '##에', '뭘', '하면', '좋을까요', '?']

In [None]:
encode.ids

[2290, 3852, 2258, 5913, 8, 2752, 2841, 1256, 527, 2530, 5533, 20]

In [None]:
tokenizer.id_to_token(1500)  # id로 토큰 문자열 조회
tokenizer.token_to_id("하면") # 토큰 문자열로 id (정수)를 조회

2530

### Tokenizer 저장

In [None]:
tokenizer.save("saved_models/chatbot_bpe.json")

In [None]:
load_tokenizer = Tokenizer.from_file("saved_models/chatbot_bpe.json")

## Dataset, DataLoader 정의


### Dataset 정의 및 생성
- 모든 문장의 토큰 수는 동일하게 맞춰준다.
    - DataLoader는 batch 를 구성할 때 batch에 포함되는 데이터들의 shape이 같아야 한다. 그래야 하나로 묶을 수 있다.
    - 문장의 최대 길이를 정해주고 **최대 길이보다 짧은 문장은 `<PAD>` 토큰을 추가**하고 **최대길이보다 긴 문장은 최대 길이에 맞춰 짤라준다.**

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split

device = "cuda" if torch.cuda.is_available() else "cpu"
# device = "mps"  # 맥 M1 이상 쓰는 사람들 

In [None]:
class ChatbotDataset(Dataset):

    def __init__(self, question_texts, answer_texts, max_length, tokenizer):
        # __init__ 함수는 ChatbotDataset 객체를 만들 때 가장 먼저 실행. 필요한 모든 준비물 생성. 
        """
        Args:
            question_texts (list[str]): 질문 text 리스트. ["질문1", "질문2", ..]
            answer_texts (list[str]): 답변 text 리스트. ["답변1", "답변2", ...]
            max_length (int): 개별 문장의 최대 토큰 수
            tokenizer (Tokenzier): 위에서 훈련시킨 토크나이저 도구 (텍스트 -> 숫자 ID 변환)
        """
        self.max_length = max_length
        self.tokenizer = tokenizer
        # "질문" -> Tensor(토큰 id)
        self.question_texts = [self.__process_sequence(q) for q in question_texts]
        self.answer_texts = [self.__process_sequence(a) for a in answer_texts]

    def __pad_token_sequence(self, token_sequence):
        """
        token_sequence를 self.max_length 길이에 맞추는 메소드.
        max_length보다 적으면 <pad>를 추가, 크면 잘라낸다.
        Args:
            token_sequence (list[int]): 한 문장의 토큰 id 리스트. [2334, 7100, 257, ..]
        Returns:
            list[int]: 길이를 max_length에 맞춘 토큰 id 리스트
        """
        pad_token = self.tokenizer.token_to_id('<pad>')
        seq_length = len(token_sequence)
        if seq_length > self.max_length: #잘라내기
            result = token_sequence[:self.max_length]
        else: # <pad> 추가 (padding 처리)
            result = token_sequence + [pad_token] * (self.max_length - seq_length)
    
        return result
    
    def __process_sequence(self, text):
        # 텍스트를 숫자로 변환하고 길이 맞추기 
        # 하나의 텍스트(str)를 받아서 모델이 이해할 수 있는 숫자 형태의 텐서로 변환하는 메인 처리 과정.
        """
        한 문장(text-str)을 받아서 token화 한 뒤 max_length에 개수를 맞춰서 반환.
        max_length에 맞추는 작업은 __pad_token_sequence() 를 이용
        Args:
            text (str): 토큰화할 문장
        Returns:
            torch.Tensor[int64]: 토큰화한 토큰 id 리스트
        """
        encode = self.tokenizer.encode(text)
        token_ids = encode.ids # "나는 학생이다." -> [4020, 1003, 3932]
        # [4020, 1003, 3932] -> [4020, 1003, 3932, 0, 0, 0 ] 패딩 처리.
        return torch.tensor(self.__pad_token_sequence(token_ids), dtype=torch.int64)
        #torch.tensor(...): 파이토치 모델이 학습할 수 있는 자료형인 텐서(torch.Tensor)로 최종 변환하여 반환

    def __len__(self):    # 총 몇 쌍의 질문-답변 데이터가 있는지 반환. 
        return len(self.question_texts)

    def __getitem__(self, index):
        """
        index의 (question, answer) 쌍을 반환.
        Args:
            index (int) : 몇번 질문-답변 쌍인지 index
        Return:
            tuple[Tensor(int64), Tensor(int64)]
        """
        q = self.question_texts[index]
        a = self.answer_texts[index]
        return q, a

In [None]:
# max_length. 가장 긴 문장의 토큰 수 
max([len(tokenizer.encode(sent).ids) for sent in question_texts]) # 결과: (제일 긴 토큰 수) 21.

21

In [None]:
max([len(tokenizer.encode(sent).ids) for sent in answer_texts]) # 결과: 29. 제일 긴 토큰 수 29.

29

In [None]:
####### 블로그 여기서부터 ########

In [None]:
max_length = 29   # 단순히 함수나 메소드의 입력값 (매개변수)이거나 내부에서만 생성되고 사용되는 지역변수 (local variable)
# self.max_length는 인스턴스 변수. 클래스로 만들어진 객체 (메소드)에 영구적으로 소속되는 변수
#    - 객체의 모든 메소드 (__init__, __pad_token_sequence, __process_sequence에서 self.접두사를 통해 접근하고 사용 가능.
dataset = ChatbotDataset(
    list(question_texts),
    list(answer_texts),
    max_length,
    tokenizer
)

In [None]:
len(dataset)

11823

In [None]:
dataset[0]

(tensor([  10, 1464, 1294,  368,    3,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0]),
 tensor([6119,  378,   47, 2252,    8,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
            0,    0,    0,    0,    0]))

### Trainset / Testset 나누기
train : test = 0.95 : 0.05

In [None]:
train_size = int(len(dataset) * 0.95)
test_size = len(dataset) - train_size
train_size, test_size

(11231, 592)

In [None]:
train_set, test_set = random_split(dataset, [train_size, test_size])

In [None]:
type(train_set), type(test_set)

(torch.utils.data.dataset.Subset, torch.utils.data.dataset.Subset)

In [None]:
len(train_set), len(test_set)

(11231, 592)

### DataLoader 생성

In [None]:
train_loader = DataLoader(train_set, batch_size=64, shuffle=True, drop_last=True)
test_loader = DataLoader(test_set, batch_size=64)

In [None]:
len(train_loader), len(test_loader)

(175, 10)

# 모델 정의

## Seq2Seq 모델 정의
- Seq2Seq 모델은 Encoder와 Decoder의 입력 Sequence의 길이와 순서가 자유롭기 때문에 챗봇이나 번역에 이상적인 구조다.
    - 단일 RNN은 각 timestep 마다 입력과 출력이 있기 때문에 입/출력 sequence의 개수가 같아야 한다.
    - 챗봇의 질문/답변이나 번역의 대상/결과 문장의 경우는 사용하는 어절 수가 다른 경우가 많기 때문에 단일 RNN 모델은 좋은 성능을 내기 어렵다.
    - Seq2Seq는 **입력처리(질문,번역대상)처리 RNN과 출력 처리(답변, 번역결과) RNN 이 각각 만들고 그 둘을 연결한 형태로 길이가 다르더라도 상관없다.**

## Encoder
Encoder는 하나의 Vector를 생성하며 그 Vector는 **입력 문장의 의미**를 N 차원 공간 저장하고 있다. 이 Vector를 **Context Vector** 라고 한다.    
![encoder](figures/seq2seq_encoder.png)

In [None]:
a = nn.Embedding(100, 5, padding_idx=2)
a.weight

Parameter containing:
tensor([[-0.3878, -0.2407, -0.7084, -0.5376,  0.4590],
        [ 0.7614, -2.3023,  0.4863,  1.4747,  0.1179],
        [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
        [-0.4192, -0.8022, -0.6512,  0.6792,  0.4380],
        [-1.4338, -0.0480, -0.2202,  0.3836,  0.1374],
        [ 0.7502, -0.1611,  0.9635, -1.0538, -0.3368],
        [-0.8508,  1.4465,  0.1885,  0.6573,  0.7363],
        [ 0.9614, -0.0447, -0.9787, -1.4607,  0.1769],
        [-0.2596,  0.1841, -0.7803,  0.1845, -0.0351],
        [-0.1488, -1.3017,  0.3861, -0.0372,  0.6094],
        [-1.6507, -0.6879,  1.9730, -0.0060, -0.1055],
        [ 0.9414,  1.1964,  0.4580, -1.1133,  1.9118],
        [-1.4245, -0.2467, -0.7848, -0.0780, -1.2669],
        [-0.7771, -0.0900, -1.1654, -1.5151,  0.9880],
        [-0.4361, -0.0627, -0.7369, -0.2241, -0.3267],
        [-0.7403,  0.6048, -0.8557,  0.1613,  2.4707],
        [ 0.2431,  0.5341,  0.3026,  2.4493,  0.7642],
        [ 0.0070,  0.7105, -1.4244,  1.1010

In [None]:
class Encoder(nn.Module):

    def __init__(
            self,
            vocab_size: int,   # 총 어휘 수
            embedding_dim: int,   # Embedding Vector의 차원
            hidden_size: int, # GRU의 hidden 개수
            bidirectional: bool = True,   # GRU의 양방향 여부
            num_layers: int = 1,   # GRU의 layer stack 수 
            dropout: float = 0.2   # dropout의 비율
        ):
            super().__init__()
            self.vocab_size = vocab_size
            # X -> (Embedding Model) -> (GRU) -> Context Vector -> (Decoder)
            # Encoder의 목적은 (질문의) Context Vector를 추출하는 것이 목적. 
            self.embedding = nn.Embedding(
                 vocab_size, 
                 embedding_dim,  # (vocab_size X embedding_dim)
                 padding_idx=0
            )

            self.gru = nn.GRU(
                 input_size=embedding_dim,
                 hidden_size=hidden_size,
                 num_layers=num_layers,
                 bidirectional=bidirectional,
                 dropout=dropout if num_layers > 1 else 0.0
            )

    def forward(self, X):
        # X.shape [batch, seq_length]
        embedding_vector = self.embedding(X)  # [batch, seq_length]
        embedding_vector = embedding_vector.transpose(1, 0) # [seq_length, batch, emb_dim]
        out, hidden = self.gru(embedding_vector)  
        # out: 모든 time step의 hidden_state, hidden: 마지막 time step의 hidden_state
        return out, hidden

In [None]:
train_set[0][0]

tensor([ 203, 5872, 3585, 2291,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0])

In [None]:
from torchinfo import summary
dummy_input = torch.randint(10, size=(64, 20), dtype=torch.int64)
dummy_input

tensor([[1, 5, 6,  ..., 6, 2, 8],
        [1, 4, 9,  ..., 3, 0, 7],
        [0, 6, 3,  ..., 4, 9, 8],
        ...,
        [8, 8, 8,  ..., 6, 2, 5],
        [0, 2, 7,  ..., 0, 8, 1],
        [5, 7, 8,  ..., 9, 4, 8]])

In [None]:
summary(Encoder(1000, 100, 20), input_data=dummy_input)

Layer (type:depth-idx)                   Output Shape              Param #
Encoder                                  [20, 64, 40]              --
├─Embedding: 1-1                         [64, 20, 100]             100,000
├─GRU: 1-2                               [20, 64, 40]              14,640
Total params: 114,640
Trainable params: 114,640
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 25.14
Input size (MB): 0.01
Forward/backward pass size (MB): 1.43
Params size (MB): 0.46
Estimated Total Size (MB): 1.90

## Decoder
- Encoder의 출력(context vector)를 받아서 번역 결과 sequence를 출력한다.
- Decoder는 매 time step의 입력으로 **이전 time step에서 예상한 단어와 hidden state값이** 입력된다.
- Decoder의 처리결과 hidden state를 Estimator(Linear+Softmax)로 입력하여 **입력 단어에 대한 번역 단어가 출력된다.** (이 출력단어가 다음 step의 입력이 된다.)
    - Decoder의 첫 time step 입력은 문장의 시작을 의미하는 <SOS>(start of string) 토큰이고 hidden state는 context vector(encoder 마지막 hidden state) 이다.

![decoder](figures/seq2seq_decoder.png)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split

In [None]:
class Decoder(nn.Module):

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers=1, bidirectional=False, # Decoder는 생성. 뒤에 토큰들을 알지 못하기 때문에 단방향 처리. 
                          dropout=0.2
                         ):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        self.gru = nn.GRU(
            embedding_dim, 
            hidden_size, 
            num_layers=num_layers,
            dropout= dropout if num_layers > 1 else 0.0
        )

        self.classifier = nn.Linear(
            hidden_size,   # 입력 - gru의 마지막 hidden state값.
            vocab_size     # 출력 - 다중분류: 어휘사전의 단어들 중 다음 단어 한 개를 찾는 다중분류.
        )
        
    def forward(self, X, hidden):
        """
        X: 한 개 토큰. shape: [batch] []
        hidden: 이전 처리 hidden state. 첫 번째 time step: Encoder의 context vector
                [seq_length: 1, batch, hidden_size]
        """
        # [batch] -> [batch, 1] 1: seq_length
        X = X.unsqueeze(1)
        embedding_vector = self.embedding(X)   # 입력: int64, [batch, seq_length(1)]
        # [batch, seq_length, embedding_dim] -> [seq_length, batch, embedding_dim]
        embedding_vector = embedding_vector.transpose(1, 0)

        #[seq_length(1), batch, embedding_dim -> out, hidden
        # out (모든 time step의 hidden state 모음): [seq_length(1), batch_size, hidden_size]
        # hidden (마지막 time step의 hidden state): [num_layers, batch_size, hidden_size]
        out, hidden = self.gru(embedding_vector, hidden)

        # Linear (분류기) 넣어서 다음 단어를 예측
        last_out = self.classifier(out[-1])

        # last_out: 다음 단어일 확률, [batch, vocab_size]
        # hidden: 다음 단어를 예측할 때 넣어줄 context vector (hidden state)
        return last_out, hidden

In [None]:
from torchinfo import summary

In [None]:
dummy_input = torch.ones([64], dtype=torch.int64)   # 64: batch
dummy_hidden = torch.ones((1, 64, 200), dtype=torch.float32)
# hidden shape: [1-gru layer 개수, 64: batch, 200: hidden_size]
# gru layer 개수: num_layers * 2 if bidirectional else 1

dummy_decoder = Decoder(10000, 200, 256, num_layers=1)
summary(dummy_decoder, input_data=(dummy_input, dummy_hidden))

RuntimeError: Failed to run torchinfo. See above stack traces for more details. Executed layers up to: [Embedding: 1]

In [None]:
next_word, hidden = dummy_decoder(dummy_input, dummy_hidden)

In [None]:
print(next_word.shape)
print(hidden.shape)

torch.Size([64, 10000])
torch.Size([1, 64, 256])


In [None]:
next_word.max(dim=-1).indices

tensor([7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274,
        7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274,
        7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274,
        7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274,
        7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274, 7274,
        7274, 7274, 7274, 7274])

## Seq2Seq 모델

- Encoder - Decoder 를 Layer로 가지며 Encoder로 질문의 feature를 추출하고 Decoder로 답변을 생성.

### Teacher Forcing
- **Teacher forcing** 기법은, RNN계열 모델이 다음 단어를 예측할 때, 이전 timestep에서 예측된 단어를 입력으로 사용하는 대신 **실제 정답 단어(ground truth) 단어를** 입력으로 사용하는 방법.
    - 모델은 이전 시점의 출력 단어를 다음 시점의 입력으로 사용한다. 그러나 모델이 학습할 때 초반에는 정답과 많이 다른 단어가 생성되어 엉뚱한 입력이 들어가 학습이 빠르게 되지 않는 문제가 있다.
- **장점**
    - **수렴 속도 증가**: 정답 단어를 사용하기 때문에 모델이 더 빨리 학습 가능.
    - **안정적인 학습**: 초기 학습 단계에서 모델의 예측이 불안정할 때, 잘못된 예측으로 인한 오류가 다음 단계로 전파되는 것을 막아줍니다.
- **단점**
    - **노출 편향(Exposure Bias) 문제:** 실제 예측 시에는 정답을 제공할 수 없으므로 모델은 전 단계의 출력값을 기반으로 예측해 나가야 한다. 학습 과정과 추론과정의 이러한 차이 때문에 모델의 성능이 떨어질 수있다.
        - 이런 문제를 해결하기 학습 할 때 **Teacher forcing을 random하게 적용해 학습시킨다.**
![seq2seq](figures/seq2seq.png)

In [None]:
####### colab 에서 실행하기 ###########

In [None]:
import random
SOS_TOKEN = tokenizer.token_to_id("<sos>")

class Seq2Seq(nn.Module):

    def __init__(self, encoder, decoder, device):
        super.__init__()
        self.encoder = encoder.to(device)
        self.decoder = decoder.to(device)
        self.device = device

    def forward(self, inputs, outputs, teacher_forcing_rate=0.99):
        """
        Args: 
            inputs: 질문 (batch, seq_length)
            outputs: 답변(정답) (batch, seq_length) -> teacher forcing 때 사용.
            teacher_forcing_rate: teacher forcing은 random하게 적용. 적용될 확률. 
        """
        # 질문과 답변이 1차원일 경우 2차원으로 reshape
        # if [batch] -> [batch(1), seq_length]
        if inputs.dim() ==1:
            inputs = inputs.unsqueeze(0)
        if outputs.dim() == 1:
            outputs = outputs.unsqueeze(0)

        batch_size, output_length = outputs.shape   # output_length: output의 max_length. 답변 문장의 토큰 수를 여기에 맞출 것. 
        output_vocab_size = self.encoder.vocab_size


        ##############################################################
        # 생성된 문장을 저장할 tensor 생성 (모델이 생성한 예측 문장)
        # (seq_length, batch_size, vocab_size)
        # [나는, 학생, 이다.]
        # [
        # vocab_size의 각 단어가 "나는"일 확률
        # vocab_size의 각 단어가 "학생"일 확률
        # vocab_size의 각 단어가 "이다"일 확률
        # ]
        ##################################

        predicted_outputs = torch.zeros(output_length, batch_size, output_vocab_size).to(self.device)

        ##############################################################
        # 추론
        # 1. encoder를 이용해서 context vector 추출 (한번에 처리)
        # 2. decoder를 이용해서 답변 문장을 생성 (개별 토큰 별로 생성) 반복.
        ##############################################################
        # encode를 이용해 context vector 추출
        
        encoder_out, _ = self.encoder(inputs)   # encoder_out: 전체 hidden state 모음

        # context vector == encoder_out[-1] => Decoder의 첫 번째 time step의 hidden으로 입력.
        decoder_hidden = encoder_out[-1].unsqueeze(0)
        # decoder에 입력할 첫 번째 time step값: <sos> 토큰 ID
        decoder_input = torch.full([batch_size], fill_value=SOS_TOKEN, device=self.device)

        ####################################
        # Decoder를 이용해서 한 단어 (토큰)씩 생성
        ####################################
        for t in range(output_length):
            decoder_out, decoder_hidden = self.decoder(decoder_input, decoder_hidden)

            predicted_outputs = decoder_out  # t번째 예측 단어.

            # 다음 time step의 input을 생성 (decoder_input 값을 생성)
            # teacher forcing 적용 -> t번째 정답 토큰 
            #                 적용 안 함 -> Decoder가 생성한 decoder_out에서 token id값.
            # teacher_forcing_rate 비율로 teacher forcing을 적용
            teacher_forcing = teacher_forcing_rate > random.random()
            teacher_forcing_rate *= 0.99  # 반복하면서 teacher forcing 적용 확률을 줄여나간다. 

            top1 = decoder_out.argmax(dim=-1)     # 다음 단어일 확률이 가장 높은 단어의 토큰 ID
            decoder_input = outputs[:, t] if teacher_forcing else top1


        return predicted_outputs.transpose(1, 0)   # [seq_length <-> batch, vocab_size]

In [None]:
# tokenizer.encode("안녕하세요. 반가워요.").ids
# b, o = [64, 29]
# b, o

(64, 29)

In [None]:
# a = torch.tensor([[1, 2], [3, 4]])
# a.shape
# a[-1].unsqueeze(0).shape

torch.Size([1, 2])

In [None]:
# random.random()  # 0~1 사이의 실수 반환. 모든 실수는 같은 확률로 나온다. 

# 학습

## 모델생성

In [None]:
# hyper parameter 정의
vocab_size = tokenizer.get_vocab_size()
encoder_bidirectional = True  # encoder는 양방향
encoder_hidden_size = 200

# encoder의 hidden(context vector)과 decoder hidden을 맞춰준다.
decoder_hidden_size = encoder_hidden_size * 2 if encoder_bidirectional else encoder_hidden_size

embedding_dim = 256
teacher_forcing_rate = 0.9

In [None]:
# 모델 생성 
encoder = Encoder(
    vocab_size = vocab_size,
    embedding_dim = embedding_dim,
    hidden_size = encoder_hidden_size,
    num_layers = 1,
    bidirectional=encoder_bidirectional
)
decoder = Decoder(
    vocab_size = vocab_size,
    embedding_dim = embedding_dim,
    hidden_size = decoder_hidden_size,
    num_layers = 1
)
seq2seq = Seq2Seq(encoder, decoder, device)

NameError: name 'hidden_size' is not defined

In [None]:
d_input = torch.zeros((64, 30), dtype=torch.int64)

summary(seq2seq, input_data=(d_input, d_input))

## loss함수, optimizer

In [None]:
lr = 0.001
model = seq2seq(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.)

## train/evaluation 함수 정의

### train 함수정의

In [None]:
def train(model, dataloader, optimizer, loss_fn, device, teacher_forcing_rate=0.9):
    model.train()
    train_loss=0.0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        pred = model(X, y, teacher_forcing_rate)  # seq2seq
        # pred: [batch, seq_length, vocab_size]
        # pred: [batch, seq_length, vocab_size]
        y_hat = pred.reshape(-1, pred.shape[2])  # loss 계산을 위해서
        y = y.reshape(-1)   # [batch, seq_length] -> [batch * seq_length]
        # CrossEntropyLoss입력: 정답 - [batch,], 추론: [batch, class 개수 - vocab_size]
        loss = loss_fn(y_hat, y)
        loss.backward()   # gradient 계산.
        optimizer.step()  # update
        optimizer.sero_grad()
        train_loss += loss.item()

    return train_loss / len (dataloader)

### Test 함수

In [None]:
@torch.no_grad()
def eval(model, dataloader, loss_fn, device):
    model.eval()
    eval_loss = 0.0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        pred = model (X, y, teacher_forcing_rate=0.0)  # teacher_forcing 적용하면 안 됨
        y_hat = pred.reshape(-1, pred.shape[-1])
        y = y.reshape(-1)
        eval_loss += loss_fn(y_hat, y).item()

    return eval_loss / len(dataloader)

### Training

In [None]:
epochs = 10
model_save_path = "saved_models/chatbot_seq2seq.pth"
# 가장 validation loss가 좋은 모델을 저장.
best_loss = torch.inf

for epoch in range(epochs):
    train_loss = train(model, train_loader, optimizer, loss_fn, device, teacher_forcing_rate)
    eval_loss = eval(model, test_loader, loss_fn, device)

    if best_loss > eval_loss:  # 성능 개선
        torch.save(model, model_save_path)
        print(epoch+1, "에서 저장 -------------")
        best_loss = eval_loss

    print(epoch+1, train_loss, eval_loss)

In [None]:
## 저장 모델 Load
device = "cuda" if torch.cuda.is_available() else "cpu"
# map_location=device : 다른 device에 학습/저장한 모델을 읽어올 때 현재 device를 지정해서  현재 device에 맞춰 load하도록한다.
best_model = torch.load(model_save_path, weights_only=False, map_location=device)
best_model.device = device

# 결과확인

- Sampler:
    -  DataLoader가 Datatset의 값들을 읽어서 batch를 만들때 index 순서를 정해주는 객체.
    -  DataLoader의 기본 sampler는 SequentialSampler 이다. shuffle=True 일경우 RandomSampler: 랜덤한 순서로 제공.

In [None]:
# ds = Dataset(...)

# d_loader = DataLoader(ds, 200, shuffle=True)
# index = [0~999]
# 1. new_idx = shuffle(index) [920, 1, 87, 234, ...] 0~199, 200~399

In [None]:
def handle_special_tokens(decoded_string):
    """
    Subword 처리
    subword는 단어의 시작으로 쓰인 것과 중간 부분(연결)에 사용된 두가지 subword가 있다.  연결 subword는 `#`과 같은 특수문자로 시작 한다.
    tokenizer.decode() 결과 문자열은 subword의 특수문자('##')을 처리하지 않는다. 이것을 처리하는 함수
    ex) "이 기회 ##는 내 ##꺼 ##야" ==> "이 기회는 내꺼야"
    
    Parameter
        decoded_string: str - Tokenizer가 decode한 중간 subword의 특수문자 처리가 안된 문자열. 
    Return
        str: subword 특수문자 처리한 문자열
    """
    
    tokens = decoded_string.split() # 공백기준으로 토큰화.
    new_tokens = []
    for token in tokens:
        if token.startswith("##"): # 연결 토큰
            if new_tokens: # len(new_tokens) != 0 원소가 하나라도 있으면
                # 토큰에서 ##을 제거하고 리스트의 마지막 원소(문자열) 뒤에 붙인다.
                new_tokens[-1] += token[2:]
            else: # new_tokens가 빈 리스트. 현재 token이 첫번째 단어. ##을 지우고 append
                new_tokens.append(token[2:])
        else: # 단어의 시작인 토큰. (##이 없는 토큰) -> list에 추가.
            new_tokens.append(' '+token)
        
    return "".join(new_tokens) 


In [None]:
from torch.utils.data import SubsetRandomSampler
# sampler는 “Dataset에서 어떤 순서로 index를 뽑을지 결정하는 객체”이다. DataLoader는 sampler가 제공하는 index를 이용해 Dataset에서 데이터를 추출한다.
## shuffle=True 이면 RandomSampler가 사용된다. False이면 SequentialSampler가 사용된다.


#  dataset에서 일부 데이터들을 가지고 확인
def random_evaluation(model, dataset, device, n=10):
    """
    Dataset에서 일부 질문-답변 쌍들을 가져다 모델에 질문을 넣어 추론한 결과와 함께 확인.
    Parameter
        model: 학습된 seq2seq 모델
        dataset: 질문-답변 쌍울 추출할 dataset
        device
        n: int - 추출할 질문-답변 쌍 개수 default: 10
    """
    ## 평가할 데이터셋을 만들기
    n_samples = len(dataset)       # Dataset의 총 데이터개수
    # index = list(range(n_samples)) # Dataset의 index만들기.  [0, 1, 2, ...., dataset_length]
    # np.random.shuffle(index)       # 값들을 랜덤하게 섞어준다. [100, 23, 590, 10, ...] 
    # sample_index = index[ : n]     # 평가할 데이터 개수만큼 index 생성.


    sample_index = torch.randint(0, n_samples, size=[n])
    
    # Dataloader 생성
    # SubsetRandomSampler: 지정한 index들 안에서 random한 순서로 제공.
    # sample_index=[1, 20, 4, 5, 100], dataset에서 [1, 20, 4, 5, 100] index의 값만 추출
    sampler = SubsetRandomSampler(sample_index)
    sample_loader = DataLoader(dataset, batch_size=n, sampler=sampler)
    
    ## 추론 후 확인
    model.to(device)
    model.eval()
    with torch.no_grad():
        for X, y in sample_loader:
            X, y = X.to(device), y.to(device)
            output = model(X, y, 0.0) # [batch, seq_len, vocab_size]

            # torch.Tensor -> ndarray (tokenizer decode에 넣기 위해.)
            ## tensor를 cpu로 이동후 변환가능.
            ### tensor가 grad를 가지고 있으면(계산그래프에 포함돼 있으면)
            ####                               -> tensor.detach().cpu().ndarray()

            pred = output.cpu().numpy()  # X.to("cpu") # 모델추정 답변
            X = X.cpu().numpy()   # 정답-질문
            y = y.cpu().numpy()   # 정답-답변 (batch, seq_len, vocab)

            for i in range(n):
                q = handle_special_tokens(tokenizer.decode(X[i]))
                a = handle_special_tokens(tokenizer.decode(y[i]))
                p = handle_special_tokens(tokenizer.decode(pred[i].argmax(-1)))
                print(f"질문: {q}")
                print(f"정답: {a}")
                print(f"예측: {p}")
                print('==================================================')

In [None]:
random_evaluation(model, train_set, device)

# 학습모델을 이용한 대화

In [None]:
class ChatbotInputDataset(Dataset):
    """
    질문만 받아서 생성하는 Dataset
    - 새로운 데이터 추론용.
    """

    def __init__(self, question_texts, max_length, tokenizer):
        """
        parameter
            question_texts: list[str] - 질문 texts 목록. 리스트에 질문들을 담아서 받는다. ["질문1", "질문2", ...]
            max_length: 개별 문장의 token 개수. 모든 문장의 토큰수를 max_length에 맞춘다.
            tokenizer: Tokenizer
        """
        self.max_length = max_length
        self.tokenizer = tokenizer
        self.question_texts = [self.__process_sequence(q) for q in question_texts]
    
    def __pad_token_sequence(self, token_sequence): 
        """
        max_length 길이에 맞춰 token_id 리스트를 구성한다.
        max_length 보다 길면 뒤에를 자르고 max_length 보다 짧으면 [PAD] 토큰을 추가한다.
        
        Parameter
            token_sentence: list[int] - 길이를 맞출 한 문장 token_id 목록
        Return
            list[int] - length가 max_length인 token_id 목록
        """
        pad_token = self.tokenizer.token_to_id('<pad>')
        seq_len = len(token_sequence) # 입력 문장의 토큰수
        if seq_len > self.max_length: # 문장 최대 토큰수 보다 길다면.
            return token_sequence[:self.max_length]
        else:
            return token_sequence + ([pad_token] * (self.max_length - seq_len))
    
    def __process_sequence(self, text): 
        """
        한 문장(str)을 받아서 padding이 추가된 token_id 리스트로 변환 후 반환
        Parameter
            text: str - token_id 리스트로 변환할 한 문장
        Return
            list[int] - 입력받은 문장에 대한 token_id 리스트
        """
        # encoding
        encode = self.tokenizer.encode(text) # "........" => [. , . , .]
        # max_length 크기에 맞춘다.
        token_ids = self.__pad_token_sequence(encode.ids) #[3400, 20, 6, 0, 0, 0 ..]
        return token_ids
    
    def __len__(self):
        return len(self.question_texts)

    
    def __getitem__(self, index):
        # 질문만 반환.
        q = self.question_texts[index]  # List
        
        # List->LongTensor. nn.Embedding()의 입력(정수타입)으로 들어간다. 
        return torch.tensor(q, dtype=torch.int64) 
        

In [None]:
input_data = [
    "난 가족들과 주말에 여행갈 거야.", 
    "와! 내일 주말이다.",
    "너무 피곤하네요.",
    "지금 몇시에요?",
    "여자 친구와 데이트 약속했어."    
]
input_dataset = ChatbotInputDataset(input_data, max_length, tokenizer)

In [None]:
def predict(dataset, model, device):
    model.eval()
    model.to(device)
    with torch.no_grad():
        for X in dataset:  # Dataset에서 한 질문씩을 조회
            X = X.to(device)
            output = model(X.unsqueeze(0), X.unsqueeze(0), 0.0)
            pred = output.cpu().numpy()
            X = X.cpu().numpy()
            q = handle_special_tokens(tokenizer.decode(X))
            a = handle_special_tokens(tokenizer.decode(pred[0].argmax(-1)))
            print(f"질문: {q}")
            print(f"예상답: {a}")
            print("=========================================================")

In [None]:
predict(input_dataset, model, device)