In [1]:
import os, shutil, zipfile

import pandas as pd
import tensorflow as tf
import urllib3
from tensorflow.keras.utils import to_categorical, pad_sequences

## [1] 데이터 준비
---

In [2]:
http = urllib3.PoolManager()
url ='http://www.manythings.org/anki/fra-eng.zip'
filename = 'fra-eng.zip'
path = os.getcwd()
zipfilename = os.path.join(path, filename)
with http.request('GET', url, preload_content=False) as r, open(zipfilename, 'wb') as out_file:       
    shutil.copyfileobj(r, out_file)

with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
    zip_ref.extractall(path)

BadZipFile: File is not a zip file

In [None]:
lines = pd.read_csv('fra.txt', names=['src', 'tar', 'lic'], sep='\t')
del lines['lic']
print('전체 샘플의 개수 :',len(lines))

In [None]:
# 6만개 데이터만 저장 
lines = lines.loc[:, 'src':'tar']
lines = lines[0:60000] 
lines.sample(10)

In [None]:
# 번역 문장에 해당되는 프랑스어 데이터=>시작 의미 심볼 <sos>, 종료 의미 심볼 <eos>추가
# <sos>와 <eos> 대신 '\t'를 시작 심볼, '\n'을 종료 심볼로 간주
lines.tar = lines.tar.apply(lambda x : '\t '+ x + ' \n')
lines.sample(10)

In [None]:
# 문자 집합 구축 ----------------------------------------
src_vocab = set()
for line in lines.src: # 1줄씩 읽음
    for char in line: # 1개의 문자씩 읽음
        src_vocab.add(char)

tar_vocab = set()
for line in lines.tar:
    for char in line:
        tar_vocab.add(char)

In [None]:
src_vocab_size = len(src_vocab)+1
tar_vocab_size = len(tar_vocab)+1
print('source 문장의 char 집합 :',src_vocab_size)
print('target 문장의 char 집합 :',tar_vocab_size)

In [None]:
src_vocab = sorted(list(src_vocab))
tar_vocab = sorted(list(tar_vocab))
print(src_vocab[45:75])
print(tar_vocab[45:75])

In [None]:
# 각 문자에 인덱스 부여
src_to_index = dict([(word, i+1) for i, word in enumerate(src_vocab)])
tar_to_index = dict([(word, i+1) for i, word in enumerate(tar_vocab)])
print(src_to_index)
print(tar_to_index)

In [None]:
# 인덱스 부여된 문자 집합으로부터 갖고있는 훈련 데이터에 정수 인코딩을 수행
encoder_input = []

# 1개의 문장
for line in lines.src:
  encoded_line = []
  # 각 줄에서 1개의 char
  for char in line:
    # 각 char을 정수로 변환
    encoded_line.append(src_to_index[char])
  encoder_input.append(encoded_line)
  
print('source 문장의 정수 인코딩 :',encoder_input[:5])

In [None]:
# 디코더의 입력이 될 프랑스어 데이터에 대해서 정수 인코딩을 수행
decoder_input = []
for line in lines.tar:
  encoded_line = []
  for char in line:
    encoded_line.append(tar_to_index[char])
  decoder_input.append(encoded_line)
print('target 문장의 정수 인코딩 :',decoder_input[:5])

In [None]:
# 디코더의 예측값과 비교하기 위한 실제값
# - 이 실제값에는 시작 심볼에 해당되는 <sos> 필요 없음
# - 모든 프랑스어 문장의 맨 앞에 붙어있는 '\t' 제거
decoder_target = []
for line in lines.tar:
  timestep = 0
  encoded_line = []
  for char in line:
    if timestep > 0:
      encoded_line.append(tar_to_index[char])
    timestep = timestep + 1
  decoder_target.append(encoded_line)
print('target 문장 레이블의 정수 인코딩 :',decoder_target[:5])

In [None]:
# 패딩을 위해서 영어 문장과 프랑스어 문장 각각에 대해서 
# 가장 길이가 긴 샘플의 길이를 확인
max_src_len = max([len(line) for line in lines.src])
max_tar_len = max([len(line) for line in lines.tar])
print('source 문장의 최대 길이 :',max_src_len)
print('target 문장의 최대 길이 :',max_tar_len)

In [None]:
# 병렬 데이터 => 영어와 프랑스어의 길이는 하나의 쌍이라고 하더라도 전부 다르므로 
# 패딩을 할 때도 이 두 개의 데이터의 길이를 전부 동일하게 맞춰줄 필요는 없습니다. 
# 영어 데이터는 영어 샘플들끼리, 프랑스어는 프랑스어 샘플들끼리 길이 맞추어서 패딩
# 가장 긴 샘플의 길이에 맞춰서 영어 데이터의 샘플은 전부 길이가 23이 되도록 패딩
# 프랑스어 데이터의 샘플은 전부 길이가 76이 되도록 패딩
encoder_input = pad_sequences(encoder_input, maxlen=max_src_len, padding='post')
decoder_input = pad_sequences(decoder_input, maxlen=max_tar_len, padding='post')
decoder_target = pad_sequences(decoder_target, maxlen=max_tar_len, padding='post')

In [None]:
# 모든 값에 대해서 원-핫 인코딩을 수행 
# 문자 단위 번역기므로 워드 임베딩은 별도로 사용되지 않으며, 
# 예측값과의 오차 측정에 사용되는 실제값뿐만 아니라 
# 입력값도 원-핫 벡터를 사용
from tensorflow.keras.utils import to_categorical

encoder_input = to_categorical(encoder_input)
decoder_input = to_categorical(decoder_input)
decoder_target = to_categorical(decoder_target)

## 교사 강요(Teacher forcing)
- RNN의 모든 시점에 대해서 이전 시점의 예측값 대신 실제값을 입력으로 주는 방법
---
- decoder_input이 왜 필요할까요?
    * 훈련 과정에서는 이전 시점의 디코더 셀의 출력을 현재 시점의 디코더 셀의
      입력으로 넣어주지 않고, 이전 시점의 실제값을 현재 시점의 디코더 셀의 
      입력값으로 하는 방법을 사용
    * 이유는 이전 시점의 디코더 셀의 예측이 틀렸는데 이를 현재 시점의 디코더
     셀의 입력으로 사용하면 현재 시점의 디코더 셀의 예측도 잘못될 가능성이 높고
      이는 연쇄 작용으로 디코더 전체의 예측을 어렵게 합니다. 
    * 이런 상황이 반복되면 훈련 시간이 느려집니다. 
    * 이 상황을 원하지 않는다면 이전 시점의 디코더 셀의 예측값 대신 실제값을 
      현재 시점의 디코더 셀의 입력으로 사용하는 방법을 사용

In [None]:
from keras.layers import Input, LSTM, Embedding, Dense
from keras.models import Model
import numpy as np

## [3] 모델 구성
---

#### [3-1] Encoder
---
- LSTM의 은닉 상태 크기는 256
- 인코더의 내부 상태를 디코더로 넘겨주어야 하기 때문에 return_state=True 설정
- 인코더에 입력을 넣으면 내부 상태 리턴

- LSTM에서 state_h, state_c를 리턴받음
- 이는 각각 이전 LSTM의 은닉 상태와 셀 상태 ==> encoder_states 저장
- encoder_states를 디코더에 전달 ==> 컨텍스트 벡터


In [None]:
encoder_inputs = Input(shape=(None, src_vocab_size))
encoder_lstm = LSTM(units=256, return_state=True)

# encoder_outputs은 여기서는 불필요
encoder_outputs, state_h, state_c = encoder_lstm(encoder_inputs)

# LSTM은 바닐라 RNN과는 달리 상태가 두 개. 은닉 상태와 셀 상태.
encoder_states = [state_h, state_c]

### [3-2] DECODER - INPUT
---
- 디코더는 인코더의 마지막 은닉 상태를 초기 은닉 상태로 사용
- initial_state의 인자값으로 encoder_states를 주는 코드가 이에 해당
- 또한 동일하게 디코더의 은닉 상태 크기도 256
- 디코더도 은닉 상태, 셀 상태 리턴하기는 하지만 훈련 과정에서는 사용하지 않습니다. 
- 그 후 출력층에 프랑스어의 단어 집합의 크기만큼 뉴런을 배치
- 소프트맥스 함수를 사용하여 실제값과의 오차 구함

In [None]:
# 입력층
decoder_inputs = Input(shape=(None, tar_vocab_size))
decoder_lstm = LSTM(units=256, return_sequences=True, return_state=True)

# 디코더에게 인코더의 은닉 상태, 셀 상태를 전달.
decoder_outputs, _, _= decoder_lstm(decoder_inputs, initial_state=encoder_states)

# 출력층
decoder_softmax_layer = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_softmax_layer(decoder_outputs)

model = Model([encoder_inputs, decoder_inputs], decoder_outputs)


In [None]:
model.summary()

In [None]:
from tensorflow.keras.utils import plot_model

plot_model(model, show_shapes=True)

In [None]:
model.compile(optimizer="rmsprop", loss="categorical_crossentropy")

### 학습
---
- 입력으로는 인코더 입력과 디코더 입력이 들어감
- 디코더의 실제값인 decoder_target도 필요
- 배치 크기는 64
- 총 40 에포크

- 위에서 설정한 은닉 상태의 크기와 에포크 수는 실제로는 훈련 데이터에 과적합 상태를 불러옵니다. 
- 중간부터 검증 데이터에 대한 오차인 val_loss의 값이 올라가는데, 
- 사실 이번 실습에서는 주어진 데이터의 양과 태스크의 특성으로 인해 훈련 과정에서 훈련 데이터의 정확도와 과적합 방지라는 두 마리 토끼를 동시에 잡기에는 쉽지 않습니다. 
- seq2seq의 메커니즘과 짧은 문장과 긴 문장에 대한 성능 차이에 대한 확인

In [None]:
model.fit(x=[encoder_input, decoder_input], 
          y=decoder_target, 
          batch_size=64, 
          epochs=40, 
          validation_split=0.2)

In [None]:
###

### 동작
---
- 훈련할 때와 동작할 때의 방식이 다르다
- 입력한 문장에 대해서 기계 번역을 하도록 모델을 조정하고 동작

- 전체적인 번역 동작 단계
    *1. 번역하고자 하는 입력 문장 인코더에 들어가서 은닉 상태,셀 상태를 얻음
    *2. 상태와 <SOS>에 해당하는 '\t'를 디코더로 보냄
    *3. 디코더가 <EOS>에 해당하는 '\n'이 나올 때까지 다음 문자 예측 행동 반복

In [None]:
# 우선 인코더를 정의
# encoder_inputs와 encoder_states는 훈련 과정에서 이미 정의한 것 재사용

encoder_model = Model(inputs=encoder_inputs, outputs=encoder_states)

### DECODER
---



In [None]:
# 이전 시점의 상태들을 저장하는 텐서
decoder_state_input_h = Input(shape=(256,))
decoder_state_input_c = Input(shape=(256,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# 문장의 다음 단어를 예측하기 위해서 초기 상태(initial_state)를 이전 시점의 상태로 사용.
# 뒤의 함수 decode_sequence()에 동작을 구현 예정
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)

# 훈련 과정에서와 달리 LSTM의 리턴하는 은닉 상태와 셀 상태를 버리지 않음.
decoder_states = [state_h, state_c]
decoder_outputs = decoder_softmax_layer(decoder_outputs)

decoder_model = Model(inputs=[decoder_inputs] + decoder_states_inputs, outputs=[decoder_outputs] + decoder_states)

In [None]:
# 단어로부터 인덱스를 얻는 것이 아니라 
# 인덱스로부터 단어를 얻을 수 있는 index_to_src와 index_to_tar
index_to_src = dict((i, char) for char, i in src_to_index.items())
index_to_tar = dict((i, char) for char, i in tar_to_index.items())

In [None]:

def decode_sequence(input_seq):
  # 입력으로부터 인코더의 상태를 얻음
  states_value = encoder_model.predict(input_seq)

  # <SOS>에 해당하는 원-핫 벡터 생성
  target_seq = np.zeros((1, 1, tar_vocab_size))
  target_seq[0, 0, tar_to_index['\t']] = 1.

  stop_condition = False
  decoded_sentence = ""

  # stop_condition이 True가 될 때까지 루프 반복
  while not stop_condition:
    # 이점 시점의 상태 states_value를 현 시점의 초기 상태로 사용
    output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

    # 예측 결과를 문자로 변환
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_char = index_to_tar[sampled_token_index]

    # 현재 시점의 예측 문자를 예측 문장에 추가
    decoded_sentence += sampled_char

    # <eos>에 도달하거나 최대 길이를 넘으면 중단.
    if (sampled_char == '\n' or
        len(decoded_sentence) > max_tar_len):
        stop_condition = True

    # 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
    target_seq = np.zeros((1, 1, tar_vocab_size))
    target_seq[0, 0, sampled_token_index] = 1.

    # 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
    states_value = [h, c]

  return decoded_sentence

In [None]:
for seq_index in [3,50,100,300,1001]: # 입력 문장의 인덱스
  input_seq = encoder_input[seq_index:seq_index+1]
  decoded_sentence = decode_sequence(input_seq)
  print(35 * "-")
  print('입력 문장:', lines.src[seq_index])
  print('정답 문장:', lines.tar[seq_index][2:len(lines.tar[seq_index])-1]) # '\t'와 '\n'을 빼고 출력
  print('번역 문장:', decoded_sentence[1:len(decoded_sentence)-1]) # '\n'을 빼고 출력