In [None]:
import os
import numpy as np
import pandas as pd
import json

data_dir = "data"

# Load the parallel Korean-English corpus and display it.
csv_file = os.path.join(data_dir, "kor_eng.csv")
df = pd.read_csv(csv_file)
df

In [None]:
# Pull the two language columns out as separate Series.
korean, english = df["korean"], df["english"]

In [None]:
korean[:5]

In [None]:
english[:5]

## 한글 정규화


In [None]:
import re

# Remove every character except Hangul syllables, ASCII letters, digits,
# spaces and the punctuation marks ? , . !
# NOTE(review): the trailing '+' sits inside the character class, so a literal
# '+' is preserved as well — confirm this is intended and not a misplaced quantifier.
korean_pattern = r"[^ ?,.!A-Za-z0-9가-힣+]"

# Compile the pattern once so it can be reused by later cells
normalizer = re.compile(korean_pattern)
normalizer

In [None]:
# Show one Korean sentence before and after stripping disallowed characters.
sample_ko = korean[10]
print(f"수정 전: {sample_ko}")
print(f'수정 후: {normalizer.sub("", sample_ko)}')

In [None]:
# Show one English sentence before and after stripping disallowed characters.
sample_en = english[10]
print(f"수정 전: {sample_en}")
print(f'수정 후: {normalizer.sub("", sample_en)}')

In [None]:
def normalize(sentence):
    """Return ``sentence`` with every match of the module-level ``normalizer`` removed."""
    cleaned = re.sub(normalizer, "", sentence)
    return cleaned


normalize(korean[10])

## 한글 형태소 분석기


In [None]:
from konlpy.tag import Mecab, Okt

# Instantiate both morphological analyzers so their output can be compared
mecab, okt = Mecab(), Okt()

In [None]:
# mecab
mecab.morphs(normalize(korean[10]))

In [None]:
# okt
okt.morphs(normalize(korean[10]))

In [None]:
def clean_text(sentence, tagger, korean=True):
    """Normalize a sentence and return it lower-cased.

    Args:
        sentence: raw text to clean.
        tagger: object exposing ``morphs(text)``; only used when ``korean`` is true.
        korean: when true, the normalized text is split into morphemes by
            ``tagger`` and re-joined with single spaces before lower-casing.

    Returns:
        The cleaned, lower-cased string.
    """
    cleaned = normalize(sentence)
    if not korean:
        return cleaned.lower()
    morphemes = tagger.morphs(cleaned)
    return " ".join(morphemes).lower()

In [None]:
# 한글
clean_text(korean[10], okt)

In [None]:
# 영어
clean_text(english[10], None, korean=False)

In [None]:
len(korean), len(english)

In [None]:
# Clean the first 1000 sentence pairs: Korean goes through the Okt tagger,
# English is only normalized and lower-cased.
koreans = list(map(lambda sent: clean_text(sent, okt, korean=True),
                   korean.values[:1000]))
englishes = list(map(lambda sent: clean_text(sent, None, korean=False),
                     english.values[:1000]))

In [None]:
koreans[:5]

In [None]:
englishes[:5]

## 단어 사전 생성


In [None]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
# Reserved token ids shared by every vocabulary in this notebook.
PAD_TOKEN = 0
SOS_TOKEN = 1
EOS_TOKEN = 2


class WordVocab():
    """Bidirectional word <-> index mapping with per-word frequency counts.

    Indices 0, 1 and 2 are reserved for ``<PAD>``, ``<SOS>`` and ``<EOS>``;
    real words are numbered from 3 in order of first appearance.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self):
        # word -> index, pre-seeded with the reserved tokens
        self.word2index = {
            '<PAD>': PAD_TOKEN,
            '<SOS>': SOS_TOKEN,
            '<EOS>': EOS_TOKEN,
        }
        # word -> number of occurrences (reserved tokens are not counted)
        self.word2count = {}
        # index -> word, the inverse of ``word2index``
        self.index2word = {
            PAD_TOKEN: '<PAD>',
            SOS_TOKEN: '<SOS>',
            EOS_TOKEN: '<EOS>'
        }

        self.n_words = 3  # includes PAD, SOS, EOS

    def add_sentence(self, sentence):
        """Register every whitespace-separated word of ``sentence``.

        ``str.split()`` (no argument) is used so that repeated, leading or
        trailing spaces never register an empty-string "word"; this matches
        the tokenisation used when sentences are later converted to ids.
        """
        for word in sentence.split():
            self.add_word(word)

    def add_word(self, word):
        """Add ``word`` to the vocabulary, or bump its count if already known."""
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

In [None]:
koreans[10]

In [None]:
# Build a vocabulary from a single cleaned sentence and print the mapping.
lang = WordVocab()
print(f"원문: {koreans[10]}")
lang.add_sentence(koreans[10])
print("===" * 10, "단어사전", lang.word2index, sep="\n")

## Padding to sequences 에 대한 이해


In [None]:
max_length = 10
sentence_length = 6

# Random stand-in token ids (3..99, i.e. above the reserved ids 0-2)
sentence_tokens = np.random.randint(
    low=3, high=100, size=(sentence_length,)).tolist()
print(f"Generated Sentence: {sentence_tokens}")

# Truncate so that one slot is always left for the <EOS> token
sentence_tokens = sentence_tokens[: (max_length - 1)]
token_length = len(sentence_tokens)

# Close the sentence with <EOS> (id 2) ...
sentence_tokens.append(2)
# ... then fill the remaining slots with <PAD> (id 0)
sentence_tokens.extend([0] * (max_length - 1 - token_length))

print(f"Output: {sentence_tokens}")
print(f"Total Length: {len(sentence_tokens)}")

## 전처리 모듈 클래스화


In [None]:
# Peek at the first row only, to see what iterrows() yields.
for idx, row in df.head(1).iterrows():
    print(idx)
    print(row["korean"])
    print(row["english"])

In [None]:
from konlpy.tag import Mecab, Okt


class TextDataset(Dataset):
    """Parallel Korean -> English sentence dataset yielding padded id tensors.

    Each CSV row must provide a ``korean`` and an ``english`` column. Sentences
    are normalized, the Korean side is split into morphemes, and pairs in which
    either side has ``min_length`` tokens or fewer are discarded. One vocabulary
    is built per language from the pairs that are kept.

    Args:
        csv_path: CSV file name, resolved relative to ``data_dir``.
        max_length: length of every returned sequence, ``<EOS>`` and padding included.
        data_dir: directory that contains ``csv_path``.
        tagger: object exposing ``morphs(text)``; a ``Mecab`` instance is created
            when omitted.
        min_length: a pair is kept only when both sides have more tokens than this.
    """

    PAD_TOKEN = 0
    SOS_TOKEN = 1
    EOS_TOKEN = 2

    # Keeps Hangul syllables, ASCII letters, digits, spaces and ? , . ! (plus a
    # literal '+'). Stored on the class so the static helpers below do not rely
    # on a notebook-level global being defined.
    _NORMALIZER = re.compile(r'[^ ?,.!A-Za-z0-9가-힣+]')

    def __init__(self, csv_path, max_length=32, data_dir='data', tagger=None,
                 min_length=5):
        super(TextDataset, self).__init__()
        self.max_length = max_length

        if tagger is None:
            tagger = Mecab()

        # Load the CSV; rows with a missing sentence cannot be normalized.
        df = pd.read_csv(os.path.join(data_dir, csv_path))
        df = df.dropna(subset=['korean', 'english'])

        self.koreans = []
        self.englishes = []
        self.koreans_wordvocab = WordVocab()
        self.englishes_wordvocab = WordVocab()

        for src, tgt in zip(df['korean'], df['english']):
            src = self.clean_text(src, tagger, korean=True)
            tgt = self.clean_text(tgt, None, korean=False)

            # Keep only pairs where both sides are long enough.
            if len(src.split()) > min_length and len(tgt.split()) > min_length:
                self.koreans_wordvocab.add_sentence(src)
                self.koreans.append(src)
                self.englishes_wordvocab.add_sentence(tgt)
                self.englishes.append(tgt)

    @staticmethod
    def normalize(sentence):
        """Strip every character not allowed by the class-level pattern."""
        return TextDataset._NORMALIZER.sub("", sentence)

    @staticmethod
    def clean_text(sentence, tagger, korean=True):
        """Normalize ``sentence``, morpheme-split it when ``korean``, lower-case it."""
        sentence = TextDataset.normalize(sentence)
        if korean:
            sentence = ' '.join(tagger.morphs(sentence))
        return sentence.lower()

    def texts_to_sequences(self, sentence, korean=True):
        """Map a cleaned sentence to vocabulary ids.

        Raises:
            KeyError: if a word is missing from the selected vocabulary.
        """
        vocab = self.koreans_wordvocab if korean else self.englishes_wordvocab
        return [vocab.word2index[w] for w in sentence.split()]

    def pad_sequence(self, sentence_tokens):
        """Return a new list: tokens cut to ``max_length - 1``, then ``<EOS>``, then ``<PAD>``.

        The input list is not modified.
        """
        tokens = list(sentence_tokens[:(self.max_length - 1)])
        tokens.append(self.EOS_TOKEN)
        tokens.extend([self.PAD_TOKEN] * (self.max_length - len(tokens)))
        return tokens

    def __getitem__(self, idx):
        """Return ``(korean_ids, english_ids)`` as two integer tensors."""
        inputs_sequences = self.texts_to_sequences(self.koreans[idx], korean=True)
        inputs_padded = self.pad_sequence(inputs_sequences)

        outputs_sequences = self.texts_to_sequences(self.englishes[idx], korean=False)
        outputs_padded = self.pad_sequence(outputs_sequences)

        return torch.tensor(inputs_padded), torch.tensor(outputs_padded)

    def __len__(self):
        """Number of sentence pairs that passed the length filter."""
        return len(self.koreans)

In [None]:
MAX_LENGTH = 32

# Build the full dataset; every sequence is padded/truncated to MAX_LENGTH ids.
dataset = TextDataset(csv_path="kor_eng.csv", max_length=MAX_LENGTH)

In [None]:
x, y = dataset[10]

In [None]:
x

In [None]:
y

In [None]:
x.numpy()

In [None]:
# Decode inline: `sequence_to_sentence` is only defined near the end of the
# notebook, so calling it here raises NameError on a fresh Restart & Run All.
" ".join(
    dataset.koreans_wordvocab.index2word[int(token)]
    for token in x.numpy()
    if int(token) not in (PAD_TOKEN, SOS_TOKEN, EOS_TOKEN)
)

In [None]:
# Decode inline: `sequence_to_sentence` is only defined near the end of the
# notebook, so calling it here raises NameError on a fresh Restart & Run All.
" ".join(
    dataset.englishes_wordvocab.index2word[int(token)]
    for token in y.numpy()
    if int(token) not in (PAD_TOKEN, SOS_TOKEN, EOS_TOKEN)
)

## train / test 데이터셋 분할


In [None]:
train_size = int(len(dataset) * 0.8)
train_size

In [None]:
test_size = len(dataset) - train_size
test_size

In [None]:
from torch.utils.data import random_split

# Seed the split so train/test membership is identical on every run.
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42),
)

## DataLoader 생성

- 배치 구성


In [None]:
from torch.utils.data import DataLoader

# Training batches: 16 sentence pairs each, reshuffled every epoch.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Test batches: one pair at a time, in random order.
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=True)

In [None]:
x, y = next(iter(train_loader))

In [None]:
x.shape

## Encoder


In [None]:
class Encoder(nn.Module):
    """Embeds source token ids and runs them through a batch-first, unidirectional GRU.

    Args:
        num_vocabs: size of the source vocabulary (rows of the embedding table).
        hidden_size: GRU hidden-state size.
        embedding_dim: size of each token embedding.
        num_layers: number of stacked GRU layers.
    """

    def __init__(self, num_vocabs, hidden_size, embedding_dim, num_layers):
        super().__init__()

        self.num_vocabs = num_vocabs
        self.embedding = nn.Embedding(num_embeddings=num_vocabs,
                                      embedding_dim=embedding_dim)
        self.gru = nn.GRU(input_size=embedding_dim,
                          hidden_size=hidden_size,
                          num_layers=num_layers,
                          batch_first=True,
                          bidirectional=False)

    def forward(self, x):
        """Encode ``x`` of shape (batch, seq) and return ``(output, hidden)`` from the GRU."""
        embedded = self.embedding(x)
        return self.gru(embedded)

In [None]:
x, y = next(iter(train_loader))
x.shape

### Embedding Layer의 입/출력 shape에 대한 이해


In [None]:
embedding_dim = 20  # embedding dimension
embedding = nn.Embedding(num_embeddings=dataset.koreans_wordvocab.n_words,
                         embedding_dim=embedding_dim)

embedded = embedding(x)

# input:  (batch_size, sequence_length)
# output: (batch_size, sequence_length, embedding_dim)
print(x.shape, embedded.shape, sep="\n")

### GRU Layer의 입/출력 shape에 대한 이해


In [None]:
embedding_dim = 20  # embedding dimension
hidden_size = 32

gru = nn.GRU(input_size=embedding_dim,
             hidden_size=hidden_size,
             num_layers=1,
             batch_first=True,
             bidirectional=False)

o, h = gru(embedded)

# output      : (batch_size, sequence_length, hidden_size(32) x bidirectional(1))
# hidden_state: (Bidirectional(1) x number of layers(1), batch_size, hidden_size(32))
print(o.shape, h.shape, sep="\n")

### Encoder의 입/출력 shape에 대한 이해


In [None]:
x, y = next(iter(train_loader))
x.shape

In [None]:
NUM_VOCABS = dataset.koreans_wordvocab.n_words
print(f"number of vocabs: {NUM_VOCABS}")

In [None]:
# Build a small encoder over the Korean vocabulary
encoder = Encoder(
    num_vocabs=dataset.koreans_wordvocab.n_words,
    hidden_size=32,
    embedding_dim=20,
    num_layers=1,
)

In [None]:
# Pass x through the encoder and keep its output and hidden state so their
# shapes can be inspected in the next cell
o, h = encoder(x)

In [None]:
print(o.shape)
# output      : (batch_size, sequence_length, hidden_size(32) x bidirectional(1))
print(h.shape)
# hidden_state: (Bidirectional(1) x number of layers(1), batch_size, hidden_size(32))

## Decoder


In [None]:
class Decoder(nn.Module):
    """Single-step GRU decoder: one token id per example in, vocabulary logits out.

    The GRU is sequence-first (``batch_first`` is left at its default), so
    ``forward`` adds a length-1 leading axis to the incoming ids.

    Args:
        num_vocabs: size of the target vocabulary (also the logits width).
        hidden_size: GRU hidden-state size.
        embedding_dim: size of each token embedding.
        num_layers: number of stacked GRU layers.
    """

    def __init__(self, num_vocabs, hidden_size, embedding_dim, num_layers=1):
        super().__init__()
        self.num_vocabs = num_vocabs
        self.embedding = nn.Embedding(num_embeddings=num_vocabs,
                                      embedding_dim=embedding_dim)
        self.gru = nn.GRU(input_size=embedding_dim,
                          hidden_size=hidden_size,
                          num_layers=num_layers,
                          bidirectional=False)
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_vocabs)

    def forward(self, x, hidden_state):
        """Decode one step.

        Args:
            x: token ids; a (batch_size,) tensor becomes (1, batch_size) below.
            hidden_state: GRU hidden state to continue from.

        Returns:
            ``(logits, hidden)`` where logits come from the final linear layer
            applied to the GRU output with its leading axis squeezed away.
        """
        step_input = x[None]  # prepend the length-1 sequence axis
        embedded = F.relu(self.embedding(step_input))
        gru_out, hidden = self.gru(embedded, hidden_state)
        logits = self.fc(gru_out.squeeze(0))
        return logits, hidden

In [None]:
x = torch.abs(torch.randn(size=(1, 16)).long())
print(x)
x.shape
# batch_size = 16 이라 가정했을 때,
# (1, batch_size)
# 여기서 batch_size => (1, batch_size) 로 shape 변환을 선행

In [None]:
embedding_dim = 20  # 임베딩 차원
embedding = nn.Embedding(dataset.koreans_wordvocab.n_words, embedding_dim)

embedded = embedding(x.long())
embedded.shape
# (1, batch_size, embedding_dim)

In [None]:
embedding_dim = 20  # embedding dimension
hidden_size = 32

gru = nn.GRU(input_size=embedding_dim,
             hidden_size=hidden_size,
             num_layers=1,
             batch_first=False,  # sequence-first layout on purpose
             bidirectional=False)

o, h = gru(embedded)

# output shape: (sequence_length, batch_size, hidden_size(32) x bidirectional(1))
# hidden_state shape: (Bidirectional(1) x number of layers(1), batch_size, hidden_size(32))
print(o.shape, h.shape, sep="\n")

In [None]:
fc = nn.Linear(in_features=32, out_features=1024)  # pretend the output dimension is 1024

# input : (batch_size, output from GRU)
first_step = o[0]
print(first_step.shape)

# output: (batch_size, output dimension)
output = fc(first_step)
output.shape

In [None]:
decoder = Decoder(num_vocabs=dataset.englishes_wordvocab.n_words,
                  hidden_size=32,
                  embedding_dim=20,
                  num_layers=1)

In [None]:
x, y = next(iter(train_loader))

o, h = encoder(x)

In [None]:
o.shape, h.shape

인코더(Encoder)로부터 생성된 hidden_state(h)와 SOS 토큰을 디코더(Decoder)의 입력으로 넣어줍니다


In [None]:
x = torch.abs(torch.randn(size=(16,)).long())
print(x)
x.shape
# batch_size = 16 이라 가정

In [None]:
decoder_output, decoder_hidden = decoder(x, h)
decoder_output.shape, decoder_hidden.shape
# (batch_size, num_vocabs), (1, batch_size, hidden_size)

- `decoder_output`은 `(batch_size, num_vocabs)` shape로 출력
- `decoder_hidden`의 shape는 입력으로 넣어준 shape와 동일함을 확인


## Seq2Seq


In [None]:
import random

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes the target one position at a time.

    The decoder is started from a batch of ``SOS_TOKEN`` ids created here, so
    the target tensor is expected to hold the sentence itself with no leading
    start token: position 0 of ``outputs`` is the first word to predict.

    Args:
        encoder: module returning ``(output, hidden)`` for a batch of source ids.
        decoder: module with a ``num_vocabs`` attribute whose call
            ``decoder(ids, hidden)`` returns ``(logits, hidden)`` for one step.
        device: device on which the prediction buffer and start tokens are created.
    """

    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, inputs, outputs, teacher_forcing_ratio=0.5):
        """Predict logits for every target position.

        Args:
            inputs: source ids, (batch_size, sequence_length).
            outputs: target ids, (batch_size, sequence_length).
            teacher_forcing_ratio: probability, drawn independently at each
                step, of feeding the ground-truth token instead of the model's
                own top-1 prediction as the next decoder input.

        Returns:
            Tensor of shape (batch_size, sequence_length, num_vocabs).
        """
        batch_size, output_length = outputs.shape
        output_num_vocabs = self.decoder.num_vocabs

        # Buffer for the per-step logits, filled sequence-first.
        predicted_outputs = torch.zeros(
            output_length, batch_size, output_num_vocabs, device=self.device)

        # The encoder's final hidden state seeds the decoder.
        _, decoder_hidden = self.encoder(inputs)

        decoder_input = torch.full(
            (batch_size,), SOS_TOKEN, dtype=torch.long, device=self.device)

        # Start at t = 0: the <SOS> step must predict the first target word.
        # Starting at 1 left position 0 as all-zero logits and shifted every
        # prediction one word ahead of the token that had been fed in.
        for t in range(output_length):
            decoder_output, decoder_hidden = self.decoder(
                decoder_input, decoder_hidden)

            predicted_outputs[t] = decoder_output

            teacher_force = random.random() < teacher_forcing_ratio
            top1 = decoder_output.argmax(1)

            # Next input: the true token at t when teacher forcing, otherwise
            # the model's own prediction.
            decoder_input = outputs[:, t] if teacher_force else top1

        # (batch_size, sequence_length, num_vocabs)
        return predicted_outputs.permute(1, 0, 2)

In [None]:
x, y = next(iter(train_loader))
x.shape

In [None]:
# Encoder 정의
encoder = Encoder(num_vocabs=dataset.koreans_wordvocab.n_words,
                  hidden_size=32,
                  embedding_dim=20,
                  num_layers=1)
# Decoder 정의
decoder = Decoder(num_vocabs=dataset.englishes_wordvocab.n_words,
                  hidden_size=32,
                  embedding_dim=20,
                  num_layers=1)

In [None]:
seq2seq = Seq2Seq(encoder, decoder, "cpu")

In [None]:
output = seq2seq(x, y)

In [None]:
output.shape

## 훈련


In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Model hyper-parameters for the real training run
INPUT_NUM_VOCABS = dataset.koreans_wordvocab.n_words
OUTPUT_NUM_VOCABS = dataset.englishes_wordvocab.n_words
HIDDEN_SIZE = 512
EMBEDDING_DIM = 256
EMBEDDIMG_DIM = EMBEDDING_DIM  # misspelled original name, kept as an alias

print(
    f'input_num_vocabs: {INPUT_NUM_VOCABS}, output_num_vocabs: {OUTPUT_NUM_VOCABS}')

# Encoder over the Korean vocabulary
encoder = Encoder(num_vocabs=INPUT_NUM_VOCABS,
                  hidden_size=HIDDEN_SIZE,
                  embedding_dim=EMBEDDING_DIM,
                  num_layers=1)
# Decoder over the English vocabulary
decoder = Decoder(num_vocabs=OUTPUT_NUM_VOCABS,
                  hidden_size=HIDDEN_SIZE,
                  embedding_dim=EMBEDDING_DIM,
                  num_layers=1)

# Assemble the Seq2Seq model with both halves moved to the target device
model = Seq2Seq(encoder.to(device), decoder.to(device), device)

In [None]:
model

In [None]:
LR = 1e-3

# Cross-entropy over the vocabulary logits, optimized with Adam
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LR)

In [None]:
def train(model, data_loader, optimizer, loss_fn, device):
    """Run one optimisation pass over ``data_loader``.

    Args:
        model: callable as ``model(x, y)`` returning logits of shape
            (batch_size, sequence_length, num_vocabs).
        data_loader: iterable of ``(x, y)`` id-tensor batches, batch-first.
        optimizer: optimizer over the model's parameters.
        loss_fn: criterion taking ``(logits_2d, targets_1d)``.
        device: device the batches are moved to.

    Returns:
        The mean loss per sample (0.0 when the loader yields nothing).
    """
    running_loss = 0.0
    num_samples = 0
    for x, y in data_loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()

        output = model(x, y)
        output_dim = output.size(2)

        # Flatten batch and time together. Both tensors are batch-first, so the
        # former `[1:]` slice removed the first *example* of each batch (and
        # left nothing at all for a single-example batch, giving a NaN loss)
        # rather than a time step. Every time step is scored because the
        # target sequences contain no leading start token to skip.
        loss = loss_fn(output.reshape(-1, output_dim), y.reshape(-1))

        loss.backward()
        optimizer.step()

        # Weight by batch size so the result is a true per-sample mean even
        # when the last batch is smaller.
        batch_size = x.size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size
    return running_loss / num_samples if num_samples else 0.0

In [None]:
NUM_EPOCHS = 200
model.train()

# One full pass over the training loader per epoch; report the loss each time.
for epoch in range(NUM_EPOCHS):
    loss = train(model, train_loader, optimizer, loss_fn, device)
    print(f"epoch: {epoch+1}, loss: {loss:.4f}")

In [None]:
# Translate one random test pair with teacher forcing disabled.
model.eval()
x, y = next(iter(test_loader))
x, y = x.to(device), y.to(device)
# Inference only: skip building the autograd graph.
with torch.no_grad():
    prediction = model(x, y, teacher_forcing_ratio=0)

In [None]:
# Drop the batch axis, take the most likely token per step, move to NumPy.
preds = prediction.squeeze(0).argmax(dim=1).detach().cpu().numpy()
preds

In [None]:
# Convert source and target to NumPy and take the single example of the batch.
x, y = (batch.detach().cpu().numpy()[0] for batch in (x, y))

In [None]:
def sequence_to_sentence(sequences, korean):
    """Turn a sequence of token ids back into a space-joined sentence.

    Uses the module-level ``dataset`` for the vocabularies. Decoding stops at
    the first ``EOS_TOKEN``; ``SOS_TOKEN`` and ``PAD_TOKEN`` ids are skipped.

    Args:
        sequences: iterable of integer token ids (Python or NumPy ints).
        korean: pick the Korean vocabulary when true, the English one otherwise.

    Returns:
        The decoded words joined by single spaces.

    Raises:
        KeyError: if an id is not present in the selected vocabulary.
    """
    if korean:
        vocab = dataset.koreans_wordvocab
    else:
        vocab = dataset.englishes_wordvocab

    words = []
    for token in sequences:
        token = int(token)
        # Compare the id, not the looked-up word: the word is a string and
        # never equals the integer EOS_TOKEN, so the old check never fired.
        if token == EOS_TOKEN:
            break
        if token in (SOS_TOKEN, PAD_TOKEN):
            continue
        words.append(vocab.index2word[token])
    return ' '.join(words)

In [None]:
sequence_to_sentence(x, korean=True)

In [None]:
sequence_to_sentence(preds, korean=False)

In [None]:
sequence_to_sentence(y, korean=False)