In [2]:
import os
import re

import numpy as np
import pandas as pd
import tensorflow as tf
import unicodedata
import urllib3
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

import nltk.translate.bleu_score as bleu
from collections import Counter
from nltk import ngrams

##학습을 위한 병렬 corpus 가져오기
source와 target을 각각 병렬적으로 매치한 데이터셋

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
!unzip -uq "/content/drive/MyDrive/kor-eng.zip" -d "/content" 

#영어와 프랑스어를 매치한 데이터 셋
#영어가 source 프랑스어가 target

In [4]:
def preprocess_sentence_kor(sent):
  # 악센트 제거 함수 호출

  # 단어와 구두점 사이에 공백 추가해서 구두점을 구분
  # ex) "I am a student." => "I am a student ."
  sent = re.sub(r"([?.!,¿])", r" \1", sent)

  # (a-z, A-Z, ".", "?", "!", ",")  영어랑 . ? ! , 제외하고 모두 지움
  sent = re.sub(r"[^a-zA-Zㄱ-ㅎㅏ-ㅣ가-힣 !.?]+", r" ", sent)
  # 다수 개의 공백을 하나의 공백으로 치환
  sent = re.sub(r"\s+", " ", sent)
  return sent


def preprocess_sentence_eng(sent):
  # 악센트 제거 함수 호출

  # 단어와 구두점 사이에 공백 추가해서 구두점을 구분
  # ex) "I am a student." => "I am a student ."
  sent = re.sub(r"([?.!,¿])", r" \1", sent)

  # (a-z, A-Z, ".", "?", "!", ",")  영어랑 . ? ! , 제외하고 모두 지움
  sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent)
  
  # 다수 개의 공백을 하나의 공백으로 치환
  sent = re.sub(r"\s+", " ", sent)
  return sent

In [5]:
!pip install konlpy
from konlpy.tag import Okt 





In [6]:
num_samples =3000 
okt = Okt()

def load_preprocessed_data():
  encoder_input, decoder_input, decoder_target = [], [], [] 
  #input을 3개 만듦 encoder는 output이 없으므로 encoder target은 만들 필요 x
  with open("kor.txt", "r") as lines:
    for i, line in enumerate(lines): #line 하나 안에 tab을 기준으로 source와 target을 구분하고 있음
      # source 데이터와 target 데이터를 tab을 기준으로 분리
      tar_line, src_line, _ = line.strip().split('\t')

      # source 데이터 전처리
      src_line = [w for w in okt.morphs(preprocess_sentence_kor(src_line))]
     # target 데이터 전처리
      tar_line = preprocess_sentence_eng(tar_line)
      tar_line_in = [w for w in ("<sos> " + tar_line).split()] #line을 받아와서 sos 토큰을 넣어준다
      tar_line_out = [w for w in (tar_line + " <eos>").split()]

      encoder_input.append(src_line)
      decoder_input.append(tar_line_in)
      decoder_target.append(tar_line_out)

      if i == num_samples - 1:
        break
  lines.close()
  return encoder_input, decoder_input, decoder_target

sents_kor_in , sents_en_in, sents_en_out = load_preprocessed_data()

In [7]:
print('인코더의 입력 :',sents_kor_in[15])
print('디코더의 입력 :',sents_en_in[25])
print('디코더의 레이블 :',sents_en_out[25])

인코더의 입력 : ['알았어', '.']
디코더의 입력 : ['<sos>', 'Get', 'up', '.']
디코더의 레이블 : ['Get', 'up', '.', '<eos>']


##Seq2Seq에 넣기 위해서 tokenize

영어와 한국어 각각에 대해서 tokenize함


In [8]:
#tokenizer 안에 filters는 문장 내에서 ""안에 것을 filtering함 우리는 이미 전처리 다해놔서 거를 것 없음 lower도 이미 해놓음

tokenizer_kor = Tokenizer(filters="", lower=False)
tokenizer_kor.fit_on_texts(sents_kor_in)

tokenizer_en = Tokenizer(filters="", lower=False)
tokenizer_en.fit_on_texts(sents_en_in)
tokenizer_en.fit_on_texts(sents_en_out)


encoder_input = tokenizer_kor.texts_to_sequences(sents_kor_in)
encoder_input = pad_sequences(encoder_input, padding="post")
#padding에는 2가지 pre, post 존재 post는 0들을 뒤에 채우는 것

decoder_input = tokenizer_en.texts_to_sequences(sents_en_in)
decoder_input = pad_sequences(decoder_input, padding="post")

decoder_target = tokenizer_en.texts_to_sequences(sents_en_out)
decoder_target = pad_sequences(decoder_target, padding="post")

In [9]:
encoder_input[0]

array([8, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], dtype=int32)

In [10]:
#영어는 문장 최대 길이가 12, 한국어는 16

print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)

인코더의 입력의 크기(shape) : (3000, 16)
디코더의 입력의 크기(shape) : (3000, 12)
디코더의 레이블의 크기(shape) : (3000, 12)


In [11]:
#word_index는 각 단어에 대한 index를 매칭해서 dictionary로 반환
src_vocab_size = len(tokenizer_kor.word_index) + 1
tar_vocab_size = len(tokenizer_en.word_index) + 1
print("한국어 단어 집합의 크기 : {:d}, 영어 단어 집합의 크기 : {:d}".format(src_vocab_size, tar_vocab_size))

한국어 단어 집합의 크기 : 3157, 영어 단어 집합의 크기 : 2203


In [12]:
#word_index는 단어 - 인덱스 순의 dictionary
#index_word는 그 반대 아래의 딕셔너리는 이후 예측값과 실제값 예측에 사용

src_to_index = tokenizer_kor.word_index
index_to_src = tokenizer_kor.index_word
tar_to_index = tokenizer_en.word_index
index_to_tar = tokenizer_en.index_word

In [13]:

indices = np.arange(num_samples)
np.random.shuffle(indices)
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

In [14]:
val_num=int(0.1*num_samples) #10% 만큼 test에 사용 split 해준다
encoder_input_train = encoder_input[:-val_num]
decoder_input_train = decoder_input[:-val_num]
decoder_target_train = decoder_target[:-val_num]

encoder_input_test = encoder_input[-val_num:]
decoder_input_test = decoder_input[-val_num:]
decoder_target_test = decoder_target[-val_num:]

In [15]:
print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)

훈련 source 데이터의 크기 : (2700, 16)
훈련 target 데이터의 크기 : (2700, 12)
훈련 target 레이블의 크기 : (2700, 12)
테스트 source 데이터의 크기 : (300, 16)
테스트 target 데이터의 크기 : (300, 12)
테스트 target 레이블의 크기 : (300, 12)


##본격적인 lstm Seq2Seq 모델링

functional API로 쌓음  아래에 functional API에 대한 설명 있음

https://www.tensorflow.org/guide/keras/functional?hl=ko

In [16]:
embedding_dim = 64
hidden_units = 64

In [17]:
# 인코더
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(src_vocab_size, embedding_dim)(encoder_inputs) # 임베딩 층
enc_masking = Masking(mask_value=0.0)(enc_emb) # 패딩 0은 연산에서 제외 #masgking이라는게 있는데 패딩시 발생한 0을 연산에서 아예 무시
encoder_lstm = LSTM(hidden_units, return_state=True) # encoder의 상태를 decoder로 보내려면 return state가 true여야한다.
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking) # 은닉 상태와 셀 상태를 리턴
encoder_states = [state_h, state_c] # 인코더의 은닉 상태와 셀 상태를 저장

In [18]:
# 디코더
decoder_inputs = Input(shape=(None,))
dec_emb_layer = Embedding(tar_vocab_size, hidden_units) # 임베딩 층
dec_emb = dec_emb_layer(decoder_inputs) # 패딩 0은 연산에서 제외
dec_masking = Masking(mask_value=0.0)(dec_emb)

# 상태값 리턴을 위해 return_state는 True, 모든 시점에 대해서 단어를 예측하기 위해 return_sequences는 True
decoder_lstm = LSTM(hidden_units, return_sequences=True, return_state=True)  

# 인코더의 은닉 상태를 초기 은닉 상태(initial_state)로 사용
decoder_outputs, _, _ = decoder_lstm(dec_masking,
                                     initial_state=encoder_states)

# 모든 시점의 결과에 대해서 소프트맥스 함수를 사용한 출력층을 통해 단어 예측
decoder_dense = Dense(tar_vocab_size, activation='softmax') #vocab size만큼의 단어 분포 나오고 거기서 softmax
decoder_outputs = decoder_dense(decoder_outputs) #output을 dense에 넣어줌

# 모델의 입력과 출력을 정의.
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])
#sparse cross entropy는 결과 값을 one-hot coding하지 않았을 때 cross entropy를 구해주는 loss function

In [19]:
decoder_outputs.shape

TensorShape([None, None, 2203])

In [20]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 embedding (Embedding)          (None, None, 64)     202048      ['input_1[0][0]']                
                                                                                                  
 embedding_1 (Embedding)        (None, None, 64)     140992      ['input_2[0][0]']                
                                                                                              

In [21]:
model.fit(x=[encoder_input_train, decoder_input_train], y=decoder_target_train, \
          validation_data=([encoder_input_test, decoder_input_test], decoder_target_test),
          batch_size=64, epochs=25)

Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25
Epoch 17/25
Epoch 18/25
Epoch 19/25
Epoch 20/25
Epoch 21/25
Epoch 22/25
Epoch 23/25
Epoch 24/25
Epoch 25/25


<keras.callbacks.History at 0x7f1ebb9f6d50>

## 학습 이후 학습 된 모델로 machine translation 실행

In [22]:
# 인코더는 train에서 사용되고 이미 train된 애들을 그대로 가져옵니다
encoder_model = Model(encoder_inputs, encoder_states)

# 디코더 설계 시작
# 이전 시점의 상태를 보관할 텐서
decoder_state_input_h = Input(shape=(hidden_units,))
decoder_state_input_c = Input(shape=(hidden_units,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# 훈련 때 사용했던 임베딩 층을 재사용
dec_emb2 = dec_emb_layer(decoder_inputs)

# 다음 단어 예측을 위해 이전 시점의 상태를 현 시점의 초기 상태로 사용
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
#initial state에 decoder state input h decoder state input c를 넣어줌 
decoder_states2 = [state_h2, state_c2]

# 모든 시점에 대해서 단어 예측
decoder_outputs2 = decoder_dense(decoder_outputs2) #output을 desne에 넣어서 확률 처리

# 수정된 디코더
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs2] + decoder_states2)

In [23]:
#디코더를 컨트롤하기 위한 함수, test과정에서 사용할 함수

def decode_sequence(input_seq):
  # 입력으로부터 인코더의 마지막 시점의 상태(은닉 상태, 셀 상태)를 얻음
  states_value = encoder_model.predict(input_seq)

  # <SOS>에 해당하는 정수 생성
  target_seq = np.zeros((1,1))
  target_seq[0, 0] = tar_to_index['<sos>'] #시작토큰의 inex를 시작에 넣어줌

  stop_condition = False
  decoded_sentence = ''

  # stop_condition이 True가 될 때까지 루프 반복
  # 구현의 간소화를 위해서 이 함수는 배치 크기를 1로 가정합니다.
  while not stop_condition:
    # 이점 시점의 상태 states_value를 현 시점의 초기 상태로 사용
    output_tokens, h, c = decoder_model.predict([target_seq] + states_value) 

    # 예측 결과를 단어로 변환
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_char = index_to_tar[sampled_token_index]

    # 현재 시점의 예측 단어를 예측 문장에 추가
    decoded_sentence += ' '+sampled_char

    # <eos>에 도달하거나 정해진 길이를 넘으면 중단.
    if (sampled_char == '<eos>' or
        len(decoded_sentence) > 50):
        stop_condition = True

    # 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
    target_seq = np.zeros((1,1))
    target_seq[0, 0] = sampled_token_index

    # 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
    states_value = [h, c]

  return decoded_sentence

In [24]:
# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0):
      sentence = sentence + index_to_src[encoded_word] + ' '
  return sentence

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']):
      sentence = sentence + index_to_tar[encoded_word] + ' '
  return sentence

In [28]:
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_train[seq_index: seq_index + 1]
  decoded_sentence = decode_sequence(input_seq)

  print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
  print("번역문장 :",decoded_sentence[1:-5])
  print("-"*50)

입력문장 : 그 아빠 에 그 아들 . 
정답문장 : Like father like son . 
번역문장 : The re a a age . 
--------------------------------------------------
입력문장 : 우리 가 살아남았어 ! 
정답문장 : We survived ! 
번역문장 : They ! 
--------------------------------------------------
입력문장 : 지금 바빠 ? 
정답문장 : Are you busy now ? 
번역문장 : You can you ? 
--------------------------------------------------
입력문장 : 나 는 그것 을 버리고 싶지 않다 . 
정답문장 : I don t want to throw that away . 
번역문장 : I m a a a a ? 
--------------------------------------------------
입력문장 : 그건 네 책임 이야 . 
정답문장 : That s your funeral . 
번역문장 : You is a . 
--------------------------------------------------
