In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence

# Sequence-to-Sequence (seq2seq)

기존 모델의 한계: 입력과 출력이 고정된 차원의 벡터로 인코딩될 수 있는 문제에만 적용 가능<br>
-> 음성 인식과 기계 번역 및 질문 응답 등은 sequence문제이며, sequence를 sequence로 mapping할 수 있는 도메인 독립적 방법 요구됨

In [2]:
class Seq2Seq(nn.Module):
    def __init__(
            self,
            input_dim,
            latent_dim,
            vocab_size,
            n_layers: int = 4,
            teacher_forcing_ratio: float = 0.5,
    ):
        super().__init__()
        self.input_dim = input_dim                                  # 입력 특징 차원
        self.latent_dim = latent_dim                                # 잠재 공간(은닉 상태)의 차원        
        self.vocab_size = vocab_size                                # 어휘 크기(출력 차원)
        self.n_layers = n_layers                                    # 인코더의 레이어 수
        self.teacher_forcing_ratio = teacher_forcing_ratio          # 교사 강요 사용 확률

        # 인코더: 입력 시퀀스를 처리하는 LSTM 네트워크
        self.encoder = nn.LSTM(
            self.input_dim, 
            self.latent_dim,
            num_layers=self.n_layers,
            batch_first=True,                       # 입력과 출력 텐서가 (배치, 시퀀스, 특징) 형태로 제공됨
        )

        # 디코더: 출력 시퀀스를 생성하는 일련의 LSTM 레이어
        self.decoder1 = nn.LSTM(                    
            self.latent_dim, 
            128,
            batch_first=True,
            num_layers=4
        )
        self.decoder2 = nn.LSTM(
            128,
            128,
            batch_first=True,
            )
        self.decoder3 = nn.LSTM(
            128,
            128,
            batch_first=True,
            )
        self.decoder4 = nn.LSTM(
            128,
            1000,                                   # 최종 레이어의 출력 차원
            batch_first=True,
        )

    def forward(self, source, target):
        # 소스 시퀀스를 인코딩하여 잠재 벡터를 얻음
        output, latent_vector = self.encoder(source)
        # 디코더의 초기 입력을 0으로 초기화
        x = torch.zeros_like(output)[:, :1, :]
        # 첫 번째 디코더 레이어를 통과
        output, (h_n, c_n) = self.decoder1(x, latent_vector)
        # 첫 번째 예측 토큰을 얻음
        next_token = output[:, 0, :].argmax(axis = -1)
        outputs = [next_token]                      # 모든 예측 토큰을 저장할 리스트

        for t in range(1, target.shape[1]):
            # 교사 강요 사요이 실제 타겟을 다음 입력으로 사용
            if np.random.random() > self.teacher_forcing_ratio:
                output = target[:, t, :]
            # 후속 디코더 레이어를 통과
            output, (h_n, c_n) = self.decoder(output, (h_n, c_n))

            next_token = output[:, 0, :].argmax(axis = -1)
            outputs.append(next_token)
        
        return outputs

**Encoder class**

1. Embedding layer:
- 입력 시퀀스의 각 단어를 고정된 크기의 벡터로 변환한다.<br>이는 단어를 수치적으로 표현하여 모델이 처리할 수 있도록 한다.<br>
- 입력 데이터를 LSTM 레이어에 전달하기 전에 필수적인 전처리 단계

2. LSTM layer:
- 시퀀스 데이터를 처리하여 각 타임스텝의 출력과 마지막 hidden state 및 cell state를 계산<br>
- LSTM은 시퀀스 데이터의 시간적 의존성을 학습하며, 인코더의 핵심 역할을 담당.<br>여러 레이어로 구성되어 더 복잡한 패턴을 학습 가능

3. Latent vector 생성:
- 마지막 hidden state와 cell state를 결합하여 latent vector 생성.<br>이는 인코더가 입력 시퀀스를 요약한 정보.
- latent vector는 디코더로 전달되어 디코더가 출력을 생성하는 데 필요한 컨텍스트 정보를 제공. 

In [9]:
class Encoder(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int, latent_dim: int, num_layers: int):
        super().__init__()
        self.vocab_size = vocab_size                # 입력 시퀀스의 단어 집합 크기. Embedding layer에서 각 단어를 고유한 벡터로 변환하기 위해 필요
        self.embedding_dim = embedding_dim              # 단어 embedding의 찬원. 각 단어를 고정된 크기의 벡터로 표현하는 데 사용.
        self.latent_dim = latent_dim                # LSTM의 hidden_state, cell_state의 차원
        self.num_layers = num_layers                # LSTM layer의 수

        # 단어를 벡터로 변환하는 임베딩 레이어
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim)
        # 입력 처리할 LSTM 레이어 정의
        self.lstm = nn.LSTM(
            self.embedding_dim,
            self.latent_dim,
            num_layers = self.num_layers,
            batch_first = True,
        )

        def forward(self, x):
            # 입력 시퀀스를 임베딩 벡터로 변환
            x = self.embedding(x)
            # LSTM를 통하여 각 time step 의 출력화 마지막 hidden state와 cell state 계산
            x, (h_n, c_n) = self.lstm(x)

            # 마지막 hidden state와 cell state를 결합하여 latent vector 생성
            latent_vector = torch.car([h_n[-1], c_n[-1]], axis = -1)
            latent_vector = latent_vector[:, np.newaxis, :] # 차원 추가

            return latent_vector
        
class Decoder(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int, latent_dim: int):
        super().__init__()
        self.vocab_size = vocab_size                # 출력 시퀀스의 단어 집합 크기.최종
        self.embedding_dim = embedding_dim          # 입력 단어를 벡터로 변환하는 데 사용됨. 인코더와 동일한 차원을 사용
        self.latent_dim = latent_dim                # LSTM의 hidden state, cell state의 차원. Encoder와 동일한 차원을 사용하여 latent vector와 결합

        # 단어를 벡터로 변환하는 embedding layer
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim)

        # 여러 개의 LSTM layer 정의하여 입력을 처리
        self.lstm1 = nn.LSTM(self.embedding_dim + self.latent_dim*2, self.latent_dim, batch_size = True)               
        # self.embedding_dim: 현재 입력 단어의 embedding 벡터 차원
        # self.latent_dim*2: encoder에서 생성된 latent vector(마지막 hidden state, cell state결합)의 차원
        self.lstm2 = nn.LSTM(self.latent_dim, self.latent_dim, batch_first=True)
        self.lstm3 = nn.LSTM(self.latent_dim, self.latent_dim, batch_first=True)
        self.lstm4 = nn.LSTM(self.latent_dim, self.latent_dim, batch_first=True)
        # 최종 출력 레이어로 단어 확률 분호 생성
        self.fc_out = nn.Linear(self.latent_dim, self.vocab_size)

    def forward(self, x, hidden_state, cell_state, latent_vector):
        x = x[:, np.newaxis]                        # 입력 차원 추가 (배치 크기, 1)
        # 입력을 embedding vector로 변환
        x = self.embedding(x)

        # embedding과 latent vector를 결합하여 LSTM에 전달할 입력 생성
        x = torch.cat([x, latent_vector], axis=-1)

        # 순차적으로 여러 LSTM layer를 통과하며 출력 계산
        x, _ = self.lstm1(x, (hidden_state, cell_state))
        x, _ = self.lstm2(x)
        x, _ = self.lstm3(x)

        # 마지막 LSTM layer의 출력과 상태 반환
        x, (h_n, c_n)= self.lstm4(x)
        # 최종 출력 생성 (단어 확률 분호)
        x = self.fc_out(x)

        return x, (h_n, c_n)                        # 출력 및 마지막 hidden state와 cell state 반환
    
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, teacher_forcing_ratio: float = 0.5):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.teacher_forcing_ratio = teacher_forcing_ratio        
    
    def forward(self, source, target):
        batch_size = len(source)                    
        target_length = target.shaper[1]
        target_vocab_size = self.decoder.vocab_size         # 디코더의 어휘 크기
        # 출력을 저장할 텐서 초기화 (배치 크기 x 목표 시퀀스 길이 x 어휘 크기)
        outputs = torch.zeros(batch_size, target_length, target_vocab_size)

        # encoder를 통해 latent vector 생성
        latent_vector = self.encoder(source)
        # 첫 번째 decoder 입력은 <SOS> token 사용
        x = target[:, 0]
        h_n = torch.zeros(1, batch_size, self.encoder.latent_dim)           # 초기 hidden state 설정
        c_n = torch.zeros(1, batch_size, self.encoder.latent_dim)           # 초기 cell state 설정

        # 목표 시퀀스의 각 타입스텝에 대한 반복
        for t in range(1, target_length):
            output, (h_n, c_n) = self.decoder(x, h_n, c_n, latent_vector)   # decoder를 통해 출력 및 다음 hidden state, cell state 얻기
            outputs[:, t-1, :] = output[:, 0, :]                            # 현재 타입스텝의 출력 저장

            if np.random.random() < self.teacher_forcing_ratio:
                x = output[:, 0, :].argmax(axis=-1)                         # 모델의 출력을 다음 입력으로 사용
            else: 
                x = target[:, t]                                            # 실제 타겟을 다음 입력으로 사용

        return outputs

In [10]:
# 예시 데이터 생성
source = torch.randint(0, 1000, (32, 20))   # 0 ~ 999의 정수 값을 갖는 (batch_size, seq_len) 행렬 생성
target = torch.randint(0, 1000, (32, 20))   # 0 ~ 999의 정수 값을 갖는 (batch_size, seq_len) 행렬 생성

In [11]:
# 인코더 초기화
encoder = Encoder(1000, 256, 256, 4)

# latent vector 생성
latent_vector = Encoder(1000, 256, 256, 4)(source)

NotImplementedError: Module [Encoder] is missing the required "forward" function

In [12]:
hidden = torch.zeros(1, 32, 256)    # (num_layers, batch_size, latent_dim)
cell = torch.zeros(1, 32, 256)      # (num_layers, batch_size, latent_dim)

# 디코더 초기화
decoder = Decoder(1000, 256, 256)

# 디코더의 첫 번째 입력으로 target의 첫 번째 요소 전달
decoder(target[:, 0], hidden, cell, latent_vector)

TypeError: RNNBase.__init__() got an unexpected keyword argument 'batch_size'

In [13]:
# seq2seq 초기화
seq2seq = Seq2Seq(encoder, decoder)

NameError: name 'decoder' is not defined

In [14]:
# cross entropy 사용 방법에 맞게 축 변환
pred = seq2seq(source, target).permute(0, 2, 1) # (batch_size, seq_len, dim) -> (batch_size, dim, seq_len)

# cross entropy loss 계산
F.cross_entropy(pred, target)

NameError: name 'seq2seq' is not defined