# Library

In [247]:
import pandas as pd
import numpy as np
import re
import unicodedata
import itertools
import random
from tqdm import tqdm
from sklearn.model_selection import train_test_split

import nltk
from nltk.stem import WordNetLemmatizer

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence

In [226]:
seed = 0
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

<torch._C.Generator at 0x1696d071790>

In [77]:
tqdm.pandas()

# Attention

reference: [Attention](https://wikidocs.net/22893)

paper:
- [Encoder-Decoder](https://arxiv.org/pdf/1406.1078)
- [Attention (Loung et al.)](https://courses.grainger.illinois.edu/cs546/sp2018/Slides/Mar15_Luong.pdf)
- [Attention (Bahdanau et al.)](https://formacion.actuarios.org/wp-content/uploads/2024/05/1409.0473-Neural-Machine-Translation-By-Jointly-Learning-To-Align-And-Translate.pdf)

## Implementation

In [9]:
class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        
        self.w_a = nn.Linear(self.hidden_dim, 1)
        self.w_b = nn.Linear(self.hidden_dim, self.hidden_dim)
        self.w_c = nn.Linear(self.hidden_dim, self.hidden_dim)

    def forward(self, query, key, value):
        if query.dim() == 2:                # batch, dim
            query = query[:, np.newaxis, :] # batch, 1, dim

        temp = F.tanh(self.w_b(query) + self.w_c(key))      # batch, seq_len, dim
        score = self.w_a(temp)                              # batch, seq_len, 1
        attention_distribution = F.softmax(score, dim=1)    # batch, seq_len, 1

        context_vector = (attention_distribution*value).sum(axis=1) # batch, dim

        return context_vector, attention_distribution

In [10]:
query = torch.randn((32, 128))
key = torch.randn((32, 20, 128))
value = torch.randn((32, 20, 128))

Attention(128)(query, key, value)

(tensor([[ 0.2325,  0.3472,  0.1849,  ...,  0.0278,  0.4824,  0.4273],
         [ 0.0584,  0.0685,  0.3575,  ..., -0.0513, -0.6138,  0.1850],
         [-0.0502, -0.1747, -0.1399,  ..., -0.2725,  0.0506, -0.2972],
         ...,
         [-0.1104, -0.2626,  0.0837,  ..., -0.4097,  0.0690, -0.1299],
         [ 0.0584, -0.0553, -0.3293,  ...,  0.2993,  0.0803, -0.0207],
         [-0.2430,  0.1390,  0.0496,  ...,  0.3994, -0.2173, -0.2490]],
        grad_fn=<SumBackward1>),
 tensor([[[0.0437],
          [0.0418],
          [0.0413],
          [0.0529],
          [0.0458],
          [0.0605],
          [0.0435],
          [0.0595],
          [0.0403],
          [0.0638],
          [0.0392],
          [0.0488],
          [0.0623],
          [0.0447],
          [0.0469],
          [0.0784],
          [0.0461],
          [0.0455],
          [0.0496],
          [0.0454]],
 
         [[0.0396],
          [0.0611],
          [0.0482],
          [0.0634],
          [0.0361],
          [0.0441],
   

In [33]:
# 인코더 클래스 정의
class Encoder(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int, latent_dim: int, num_layers: int):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.latent_dim = latent_dim
        self.num_layers = num_layers

        # 단어 임베딩 레이어
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim)
        # LSTM 레이어 정의
        self.lstm = nn.LSTM(
            self.embedding_dim,
            self.latent_dim,
            num_layers=self.num_layers,
            batch_first=True,
        )
    
    def forward(self, x):
        # 입력을 임베딩
        x = self.embedding(x)
        # LSTM을 통해 hidden state 얻기
        x, _ = self.lstm(x)

        return x


# 디코더 클래스 정의
class Decoder(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int, latent_dim: int):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.latent_dim = latent_dim

        # 단어 임베딩 레이어
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim)

        # attention layer
        self.attention = Attention(self.latent_dim)

        # 여러 개의 LSTM 레이어 정의
        self.lstm1 = nn.LSTM(self.embedding_dim+self.latent_dim, self.latent_dim, batch_first=True)
        self.lstm2 = nn.LSTM(self.latent_dim, self.latent_dim, batch_first=True)
        self.lstm3 = nn.LSTM(self.latent_dim, self.latent_dim, batch_first=True)
        self.lstm4 = nn.LSTM(self.latent_dim, self.latent_dim, batch_first=True)
        # 최종 출력 레이어
        self.fc_out = nn.Linear(self.latent_dim, self.vocab_size)

    def forward(
        self,
        x,
        hidden_state_decoder,
        hidden_state_encoder,
        hidden_state,
        cell_state,
        ):
        x = x[:, np.newaxis]   # 차원 추가 (배치, 1)
        x = self.embedding(x)  # 입력 임베딩

        # attention을 통한 context vector
        context_vector, _ = self.attention(
            hidden_state_decoder,
            hidden_state_encoder,
            hidden_state_encoder,
        )
        context_vector = context_vector[:, np.newaxis, :]

        # context vector와 x의 embedding 결합
        x = torch.concat([x, context_vector], axis=-1)

        # 여러 LSTM 레이어를 순차적으로 통과
        x, _ = self.lstm1(x, (hidden_state, cell_state))
        x, _ = self.lstm2(x)
        x, _ = self.lstm3(x)
        x, (h_n, c_n) = self.lstm4(x)  # 마지막 LSTM 레이어의 출력과 상태 반환
        x = self.fc_out(x)  # 최종 출력 생성 (단어 확률 분포)

        return x, (h_n, c_n)  # 출력 및 마지막 hidden state, cell state 반환


# Seq2Seq 모델 정의
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, teacher_forcing_ratio: float = 0.5):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.teacher_forcing_ratio = teacher_forcing_ratio  # teacher forcing 비율 설정
    
    def forward(self, source, target):
        batch_size = len(source)  # 배치 크기
        target_length = target.shape[1]  # 목표 시퀀스의 길이
        target_vocab_size = self.decoder.vocab_size  # 출력 어휘 크기
        outputs = torch.zeros(batch_size, target_length, target_vocab_size)  # 출력을 저장할 텐서 초기화

        # 인코더를 통해 잠재 벡터 생성
        hidden_state_encoder = self.encoder(source)

        # <SOS> token의 hidden state 구하기
        x = target[:, 0]                # batch
        x = x[:, np.newaxis]            # batch, 1
        x = self.decoder.embedding(x)   # batch_size, 1, dim
        context_vector = torch.zeros_like(x)
        x = torch.concat([x, context_vector], axis=-1)
        x, _ = self.decoder.lstm1(x)    # batch_size, 1, dim
        x, _ = self.decoder.lstm2(x)    # batch_size, 1, dim
        x, _ = self.decoder.lstm3(x)    # batch_size, 1, dim
        x, (h_n, c_n) = self.decoder.lstm4(x)
        hidden_state_decoder = h_n[0]

        # 첫 번째 token (sos제외) 입력
        x = target[:, 1]

        # 목표 시퀀스의 각 타임스텝에 대해 반복
        for t in range(1, target_length):
            # 디코더를 통해 출력 및 다음 hidden/cell state 얻기
            output, (h_n, c_n) = self.decoder(
                x,
                hidden_state_decoder,
                hidden_state_encoder,
                h_n,
                c_n,
            )
            hidden_state_decoder = h_n[0]
            outputs[:, t - 1, :] = output[:, 0, :]  # 현재 타임스텝의 출력 저장

            # teacher forcing 사용 여부 결정
            if (np.random.random() < self.teacher_forcing_ratio):
                x = output[:, 0, :].argmax(axis=-1)  # 모델의 출력을 다음 입력으로 사용
            else:
                if t < target_length-2:
                    x = target[:, t+1]  # 실제 타겟을 다음 입력으로 사용
        
        return outputs  # 모든 출력 반환

In [41]:
# 예시 데이터 생성
source = torch.randint(0, 1000, (32, 20))   # 0 ~ 999의 정수 값을 갖는 (batch_size, seq_len) 행렬 생성
target = torch.randint(0, 1000, (32, 20))   # 0 ~ 999의 정수 값을 갖는 (batch_size, seq_len) 행렬 생성

# 인코더 초기화
encoder = Encoder(1000, 256, 256, 4)

# 디코더 초기화
decoder = Decoder(1000, 256, 256)

# seq2seq 초기화
seq2seq = Seq2Seq(encoder, decoder)

# cross entropy 사용 방법에 맞게 축 변환
pred = seq2seq(source, target).permute(0, 2, 1) # (batch_size, seq_len, dim) -> (batch_size, dim, seq_len)

# cross entropy loss 계산
F.cross_entropy(pred, target)

tensor(6.9094, grad_fn=<NllLoss2DBackward0>)

## Practice

In [92]:
# pandas 라이브러리를 사용하여 'fra.txt' 파일을 읽어온다.
# 파일은 탭으로 구분되어 있으며, 헤더는 없음.
data = pd.read_csv('./data/fra.txt', sep='\t', header=None).iloc[:, :2]

# 읽어온 데이터의 열 이름을 'eng'와 'fra'로 설정
data.columns = ['eng', 'fra']

# 데이터의 처음 5개 행을 출력
data.head()

Unnamed: 0,eng,fra
0,Go.,Va !
1,Go.,Marche.
2,Go.,En route !
3,Go.,Bouge !
4,Hi.,Salut !


In [93]:
def remove_accent(sentence):
    '''주어진 문장에서 악센트(발음 기호)를 제거하는 함수'''
    
    # 'NFD' 형식으로 정규화하여 악센트를 분리
    return ''.join(
        char
        for char in unicodedata.normalize('NFD', sentence)  # 문자를 NFD 형식으로 변환
        if unicodedata.category(char) != 'Mn'  # 'Mn' (발음 기호) 범주에 속하지 않는 문자만 선택
    )

In [94]:
def preprocessing(sentence):
    '''문장을 전처리하는 함수'''
    
    # 1. 문장을 소문자로 변환
    sentence = sentence.lower()
    
    # 2. 악센트를 제거
    sentence = remove_accent(sentence)
    
    # 3. 문장 부호(!, ?, .) 앞에 공백 추가
    sentence = re.sub('([!,?.])', r' \1', sentence)
    
    # 4. 여러 개의 공백을 하나의 공백으로 치환
    sentence = re.sub('\s+', ' ', sentence)

    # 전처리된 문장을 반환
    return sentence

In [95]:
# 영어 문장에 대해 전처리 함수를 적용하고 진행 상황을 표시
data.eng = data.eng.progress_apply(lambda x: preprocessing(x))

# 프랑스어 문장에 대해 전처리 함수를 적용하고 진행 상황을 표시
data.fra = data.fra.progress_apply(lambda x: preprocessing(x))

# task: 영어 -> 프랑스어
# encoder input: eng
# decoder input: fra

100%|██████████| 232736/232736 [00:23<00:00, 10055.00it/s]
100%|██████████| 232736/232736 [00:26<00:00, 8746.53it/s]


In [96]:
# 영어 문장을 공백을 기준으로 분리하여 토큰화한 결과를 새로운 열에 저장
data['token_eng'] = data.eng.str.split()

# 프랑스어 문장을 공백을 기준으로 분리하여 토큰화한 결과를 새로운 열에 저장
data['token_fra'] = data.fra.str.split()

In [98]:
# 영어 토큰 리스트의 시작에 <SOS> (시작 토큰)과 끝에 <EOS> (종료 토큰)을 추가
data.token_eng = data.token_eng.apply(lambda x: ['<SOS>'] + x + ['<EOS>'])

# 프랑스어 토큰 리스트의 시작에 <SOS> (시작 토큰)과 끝에 <EOS> (종료 토큰)을 추가
data.token_fra = data.token_fra.apply(lambda x: ['<SOS>'] + x + ['<EOS>'])

In [100]:
# 전처리된 데이터를 피클 형식으로 저장
data.to_pickle('./preprocess_data.pickle')

In [213]:
# 영어 토큰 리스트를 source 변수에 저장
source = data.token_eng

# 프랑스어 토큰 리스트를 target 변수에 저장
target = data.token_fra

In [214]:
# 영어 단어 목록을 생성하기 위해 모든 토큰을 1차원으로 만들고 중복 제거
eng_vocab = list(set(itertools.chain(*source.tolist())))

# <PAD> 토큰을 어휘 목록의 첫 번째 항목으로 추가
eng_vocab = ['<PAD>'] + eng_vocab

# 각 단어에 고유한 인덱스를 부여하여 사전 생성
eng_vocab = dict(zip(eng_vocab, range(len(eng_vocab))))

# 인덱스를 키로, 단어를 값으로 하는 반전된 사전 생성
eng_inverse_vocab = {value: key for key, value in eng_vocab.items()}

In [215]:
def char_to_idx(tokens: list, vocab: dict):
    '''주어진 토큰 리스트를 인덱스 리스트로 변환'''
    return [vocab.get(word) for word in tokens]

def idx_to_char(tokens: torch.Tensor, inverse_vocab: dict):
    '''주어진 인덱스 텐서를 원래의 토큰 리스트로 변환'''
    return [inverse_vocab.get(token.item()) for token in tokens]

def list_to_tensor(tokens: list):
    '''주어진 리스트를 PyTorch 텐서로 변환하고 정수형으로 변환'''
    return torch.Tensor(tokens).long()

In [216]:
# 영어 토큰을 인덱스로 변환하여 새로운 열에 저장
data['encoded_token_eng'] = data.token_eng.apply(
    lambda x: char_to_idx(x, eng_vocab)
)

# 인덱스 리스트를 PyTorch 텐서로 변환
data.encoded_token_eng = data.encoded_token_eng.apply(
    lambda x: list_to_tensor(x)
)

# 변환된 텐서 리스트를 패딩하여 동일한 길이로 맞춤
source = pad_sequence(
    data.encoded_token_eng.tolist(),
    batch_first=True,
)

In [220]:
# 프랑스어 어휘 사전을 초기화하고 <PAD>, <SOS>, <EOS>에 인덱스 부여
fra_vocab = ['<PAD>', '<SOS>', '<EOS>']
fra_vocab = dict(zip(fra_vocab, range(len(fra_vocab))))

# 프랑스어 토큰 목록을 1차원으로 만들어 중복 제거한 후, <SOS>와 <EOS>를 제거
_fra_vocab = list(set(itertools.chain(*target.tolist())))
_fra_vocab.remove('<SOS>')
_fra_vocab.remove('<EOS>')

# 나머지 단어들에 인덱스를 부여 (3부터 시작)
_fra_vocab = dict(zip(_fra_vocab, range(3, len(_fra_vocab)+3)))

# 초기화한 프랑스어 어휘 사전에 나머지 단어들을 업데이트
fra_vocab.update(_fra_vocab)

In [221]:
# 프랑스어 토큰을 인덱스로 변환하여 새로운 열에 저장
data['encoded_token_fra'] = data.token_fra.apply(
    lambda x: char_to_idx(x, fra_vocab)
)

# 인덱스 리스트를 PyTorch 텐서로 변환
data.encoded_token_fra = data.encoded_token_fra.apply(
    lambda x: list_to_tensor(x)
)

# 변환된 텐서 리스트를 패딩하여 동일한 길이로 맞춤
target = pad_sequence(
    data.encoded_token_fra.tolist(),
    batch_first=True,
)

In [241]:
# 데이터셋을 학습용과 임시 데이터로 분할
# temp 데이터를 검증용과 테스트용 데이터로 분할
train_source, source_temp, train_target, target_temp = train_test_split(
    source,
    target,
    test_size=0.2,
    random_state=seed,
)
valid_source, test_source, valid_target, test_target = train_test_split(
    source_temp,
    target_temp,
    test_size=0.5,
    random_state=seed,
)

In [249]:
class TranslationDataset(Dataset):
    def __init__(self, source, target):  # 생성자 메서드
        super().__init__()  # 부모 클래스의 생성자를 호출하여 초기화
        self.source = source
        self.target = target

    def __len__(self):
        # 데이터셋의 전체 샘플 수를 반환
        return len(self.source)

    def __getitem__(self, idx):
        # 인덱스 `idx`에 해당하는 데이터 샘플을 반환
        source = self.source[idx]
        target = self.target[idx]
        
        # 입력 데이터와 레이블을 딕셔너리 형태로 반환
        return {
            'X': source,
            'y': target,
        }

In [250]:
# 데이터셋 객체로 변환
train_dataset = TranslationDataset(train_source, train_target)
valid_dataset = TranslationDataset(valid_source, valid_target)
test_dataset = TranslationDataset(test_source, test_target)

In [251]:
# 데이터셋 객체 확인
train_dataset[0]

{'X': tensor([ 6316, 11514, 15359,  3189,  6327, 14539,  9984, 15993,  3881,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0]),
 'y': tensor([    1,  9842,  4419, 27415,  6163, 18521, 15066, 29336,  9990,     2,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,    