In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
from konlpy.tag import Mecab
from sklearn.model_selection import train_test_split

import matplotlib.ticker as ticker
import matplotlib.pyplot as plt
import matplotlib as mpl

from tqdm import tqdm    # tqdm
import random

import time
import re
import os
import io

from utils import *
from s2s import Encoder, Decoder
from loss import loss_function

In [2]:

%config InlineBackend.figure_format = 'retina'
 
import matplotlib.font_manager as fm
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic') 
mpl.font_manager.findfont(font)

'/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'

In [3]:
path_to_dir = os.getenv('HOME') + "/aiffel/s2s_translation/"

path_to_train_kor = path_to_dir + "korean-english-park.train.ko"
path_to_train_eng = path_to_dir + "korean-english-park.train.en"

path_to_test_kor = path_to_dir + "korean-english-park.test.ko"
path_to_test_eng = path_to_dir + "korean-english-park.test.en"

In [4]:
src_train = read_data(path_to_train_kor)
tgt_train = read_data(path_to_train_eng)

src_test = read_data(path_to_test_kor)
tgt_test = read_data(path_to_test_eng)

In [5]:
print(len(src_train))
print(len(src_test))

94123
2000


In [6]:
# 데이터 중복 제거
unique_pairs = set(zip(src_train, tgt_train))
corpus_train = list(unique_pairs)

In [7]:
enc_corpus_train = []
dec_corpus_train = []

for pair in corpus_train:
    kor, eng = pair[0], pair[1]
    kor_preprocessed = preprocess_kor(kor)
    eng_preprocessed = preprocess_eng(eng)
    
    if len(kor_preprocessed) <= 80 and len(eng_preprocessed) <= 80 and len(kor_preprocessed) > 2 and len(eng_preprocessed) > 2:
        enc_corpus_train.append(kor_preprocessed)
        dec_corpus_train.append(eng_preprocessed)

print("한국어:", enc_corpus_train[100])
print("영어:", dec_corpus_train[100])

print(len(enc_corpus_train))

한국어: 에드워드 7세가 왈리스 심슨 부인때문에 왕관을 포기했는가 ?
영어: <start> why did edward viii give up the throne for wallis simpson ? <end>
11152


In [8]:
# 토큰화하기
enc_input, enc_tokenizer = tokenize_kor(enc_corpus_train)
dec_input, dec_tokenizer = tokenize_eng(dec_corpus_train)

# train_test_split을 활용해서 훈련 데이터와 검증 데이터로 분리하기
enc_train, enc_val = train_test_split(enc_input, test_size=0.2)
dec_train, dec_val = train_test_split(dec_input, test_size=0.2)

In [9]:
BATCH_SIZE = 128
SRC_VOCAB_SIZE = len(enc_tokenizer.index_word) + 1
TGT_VOCAB_SIZE = len(dec_tokenizer.index_word) + 1

units = 512
embedding_dim = 512

encoder = Encoder(SRC_VOCAB_SIZE, embedding_dim, units)
decoder = Decoder(TGT_VOCAB_SIZE, embedding_dim, units)

# sample input
sequence_len = 30

sample_enc = tf.random.uniform((BATCH_SIZE, sequence_len))
sample_output = encoder(sample_enc)

print ('Encoder Output:', sample_output.shape)

sample_state = tf.random.uniform((BATCH_SIZE, units))

sample_logits, h_dec, attn = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                     sample_state, sample_output)

print ('Decoder Output:', sample_logits.shape)
print ('Decoder Hidden State:', h_dec.shape)
print ('Attention:', attn.shape)

Encoder Output: (128, 30, 512)
Decoder Output: (128, 1, 12394)
Decoder Hidden State: (128, 512)
Attention: (128, 30, 1)


In [10]:
optimizer = tf.keras.optimizers.Adam(0.001)

In [11]:
@tf.function
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    batch_size = src.shape[0]
    loss = 0

    with tf.GradientTape() as tape:
        enc_out = encoder(src)
        h_dec = enc_out[:, -1]
        
        dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * batch_size, 1)

        for t in range(1, tgt.shape[1]):
            pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

            loss += loss_function(tgt[:, t], pred)
            dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    
    return batch_loss



In [12]:
# Define eval_step
@tf.function
def eval_step(src, tgt, encoder, decoder, dec_tok):
    bsz = src.shape[0]
    loss = 0

    enc_out = encoder(src)

    h_dec = enc_out[:, -1]
    
    dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)

    for t in range(1, tgt.shape[1]):
        pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

        loss += loss_function(tgt[:, t], pred)
        dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))
    
    return batch_loss


# Training Process with validation

EPOCHS = 10

for epoch in range(EPOCHS):
    total_loss = 0
    
    idx_list = list(range(0, enc_train.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (batch, idx) in enumerate(t):
        batch_loss = train_step(enc_train[idx:idx+BATCH_SIZE],
                                dec_train[idx:idx+BATCH_SIZE],
                                encoder,
                                decoder,
                                optimizer,
                                dec_tokenizer)
    
        total_loss += batch_loss
        
        t.set_description_str('Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))
    
    test_loss = 0
    
    idx_list = list(range(0, enc_val.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (test_batch, idx) in enumerate(t):
        test_batch_loss = eval_step(enc_val[idx:idx+BATCH_SIZE],
                                    dec_val[idx:idx+BATCH_SIZE],
                                    encoder,
                                    decoder,
                                    dec_tokenizer)
    
        test_loss += test_batch_loss

        t.set_description_str('Test Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Test Loss %.4f' % (test_loss.numpy() / (test_batch + 1)))

Epoch  1: 100%|██████████| 70/70 [00:46<00:00,  1.49it/s, Loss 3.5891]
Test Epoch  1: 100%|██████████| 18/18 [00:13<00:00,  1.29it/s, Test Loss 3.4296]
Epoch  2: 100%|██████████| 70/70 [00:09<00:00,  7.13it/s, Loss 3.3491]
Test Epoch  2: 100%|██████████| 18/18 [00:00<00:00, 20.38it/s, Test Loss 3.4620]
Epoch  3: 100%|██████████| 70/70 [00:09<00:00,  7.08it/s, Loss 3.3529]
Test Epoch  3: 100%|██████████| 18/18 [00:00<00:00, 20.19it/s, Test Loss 3.4736]
Epoch  4: 100%|██████████| 70/70 [00:09<00:00,  7.07it/s, Loss 3.3465]
Test Epoch  4: 100%|██████████| 18/18 [00:00<00:00, 19.86it/s, Test Loss 3.4833]
Epoch  5: 100%|██████████| 70/70 [00:10<00:00,  6.99it/s, Loss 3.3514]
Test Epoch  5: 100%|██████████| 18/18 [00:00<00:00, 19.99it/s, Test Loss 3.4941]
Epoch  6: 100%|██████████| 70/70 [00:10<00:00,  6.91it/s, Loss 3.3515]
Test Epoch  6: 100%|██████████| 18/18 [00:00<00:00, 19.62it/s, Test Loss 3.5012]
Epoch  7: 100%|██████████| 70/70 [00:10<00:00,  6.84it/s, Loss 3.3541]
Test Epoch  7: 10

In [13]:
def evaluate(sentence, encoder, decoder):
    attention = np.zeros((dec_train.shape[-1], enc_train.shape[-1]))
    
    sentence = preprocess_kor(sentence)
    inputs = enc_tokenizer.texts_to_sequences([sentence.split()])
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs,
                                                           maxlen=enc_train.shape[-1],
                                                           padding='post')
    
    result = ''

    enc_out = encoder(inputs)

    dec_hidden = enc_out[:, -1]
    dec_input = tf.expand_dims([dec_tokenizer.word_index['<start>']], 0)

    for t in range(dec_train.shape[-1]):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        attention_weights = tf.reshape(attention_weights, (-1,))
        attention[t] = attention_weights.numpy()

        # Ensure to take the argmax for batch_size=1 and handle it correctly for larger batches
        predicted_id = tf.argmax(tf.math.softmax(predictions, axis=-1)[0], axis=-1).numpy()  # Shape: (batch_size,)
        predicted_id = predicted_id[0]

        result += dec_tokenizer.index_word[predicted_id] + ' '

        if dec_tokenizer.index_word[predicted_id] == '<end>':
            return result, sentence, attention

        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention


def plot_attention(attention, sentence, predicted_sentence):
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')

    fontdict = {'fontsize': 14}

    ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90)
    
    ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict)

    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    plt.show()


def translate(sentence, encoder, decoder):
    result, sentence, attention = evaluate(sentence, encoder, decoder)

    print('Input: %s' % (sentence))
    print('Predicted translation: {}'.format(result))
    
    attention = attention[:len(result.split()), :len(sentence.split())]
    plot_attention(attention, sentence.split(), result.split(' '))

In [14]:
# 번역할 문장 리스트
input_sentences = [
    "오바마는 대통령이다.",
    "시민들은 도시 속에 산다.",
    "커피는 필요 없다.",
    "일곱 명의 사망자가 발생했다."
]

# 각 문장에 대해 번역 실행
for sentence in input_sentences:
    translated_sentence, processed_input, attention = evaluate(sentence, encoder, decoder)

    # 번역 결과 출력
    print(f"input: {processed_input}")
    print(f"output: {translated_sentence}")

    # Attention Map 시각화
    plot_attention(attention, processed_input, translated_sentence)

KeyError: 0

## 회고


---

 * utils, loss, s2s 모듈을 만들어 코드를 간략 하게 하였음.

 * 최초 아래와 같이 실행 된 후 계속 번역에 실패 하고 있음.

![f_f](./error_the_us.png)

 * 계속 원인을 추적 하고 있음.
 
 
