<a href="https://colab.research.google.com/github/yejikwon7/DeepLearning/blob/main/seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

RNN을 이용한 인코더-디코더

In [None]:
import os
import re
import shutil
import zipfile

import numpy as np
import pandas as pd
import tensorflow as tf
import unicodedata
import urllib3
from tensorflow.keras.layers import Embedding, GRU, Dense
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

In [None]:
import requests

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWeKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}

def download_zip(url, output_path):
    response = requests.get(url, headers=headers, stream=True)
    if response.status_code == 200:
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"ZIP file downloaded to {output_path}")
    else:
        print(f"Failed to download. HTTP REsponse code: {response.status_code}")

# zip 파일 풀기
url = "http://www.manythings.org/anki/fra-eng.zip" # 단어 mapping된 구조: corpus로 만들어짐, 사용할 내용
output_path = "fra-eng.zip"
download_zip(url, output_path)

path = os.getcwd()
zipfilename = os.path.join(path, output_path)

with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
    zip_ref.extractall(path)

ZIP file downloaded to fra-eng.zip


In [None]:
# sample 수 제한
num_samples = 33000

전처리: 문장에서 불필요한 부분 제거

In [None]:
def to_ascii(s):
    # 프랑스어 악센트(accent) 삭제
    return ''.join(c for c in unicodedata.normalize('NFD', s)
                    if unicodedata.category(c) !='Mn')

def preprocess_sentence(sent):
    # 악센트 제거 함수 호출
    sent = to_ascii(sent.lower())

    # 단어와 구두점 사이에 공백 추가
    # ex) "I am a student." -> "I am a student ."
    sent = re.sub(r"([?.!,¿])", r" \1", sent)

    # (a-z, A-Z, ".", "?", "!", ",") 이들을 제외하고는 전부 공백으로 변환
    sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent) # 문장은 단어별로 구성되므로 단어에 붙어 있는 마침표 뗌

    # 다수 개의 공백을 하나의 공백으로 치환
    sent = re.sub(r"\s+", " ", sent)
    return sent

In [None]:
# 전처리 테스트
en_sent = u"Have you had dinner?"
fr_sent = u"Avez-vous déjà diné?" # '-' 사라지고 공백으로 변경

print('전처리 전 영어 문장: ', en_sent)
print('전처리 후 영어 문장: ', preprocess_sentence(en_sent))
print('전처리 전 프랑스어 문장: ', fr_sent)
print('전처리 후 프랑스어 문장: ', preprocess_sentence(fr_sent))

전처리 전 영어 문장:  Have you had dinner?
전처리 후 영어 문장:  have you had dinner ?
전처리 전 프랑스어 문장:  Avez-vous déjà diné?
전처리 후 프랑스어 문장:  avez vous deja dine ?


In [None]:
# 데이터 만들기
def load_preprocessed_data():
  encoder_input, decoder_input, decoder_target = [], [], [] # 세 개 데이터 생성

  with open("fra.txt", "r") as lines: # line by line으로 읽음
    for i, line in enumerate(lines):
      # source 데이터와 target 데이터 분리
      src_line, tar_line, _ = line.strip().split('\t') # src_line: 영어 문장, tar_line: 프랑스

      # source 데이터 전처리
      src_line = [w for w in preprocess_sentence(src_line).split()]

      # target 데이터 전처리
      tar_line = preprocess_sentence(tar_line)
      tar_line_in = [w for w in ("<sos> " + tar_line).split()] # 디코더 input
      tar_line_out = [w for w in (tar_line + " <eos>").split()] # 디코더 output

      encoder_input.append(src_line)
      decoder_input.append(tar_line_in)
      decoder_target.append(tar_line_out)

      if i == num_samples - 1:
        break

  return encoder_input, decoder_input, decoder_target

In [None]:
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data() # sents_en_in: 입력, sents_fra_in: 디코더 입력 데이터, sents_fra_out: 디코더 출력 결과
print('인코더의 입력:', sents_en_in[:5])
print('디코더의 입력:', sents_fra_in[:5]) # 교사 강요
print('디코더의 레이블:', sents_fra_out[:5])

인코더의 입력: [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더의 입력: [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더의 레이블: [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]


In [None]:
# 단어 -> 정수화
tokenizer_en = Tokenizer(filters="", lower=False)
tokenizer_en.fit_on_texts(sents_en_in)
encoder_input = tokenizer_en.texts_to_sequences(sents_en_in) # text -> sequence 변환
encoder_input = pad_sequences(encoder_input, padding="post") # padding: 문장 길이 맞추기 위함

tokenizer_fra = Tokenizer(filters="", lower=False)
# in, out 똑같이 fitting
tokenizer_fra.fit_on_texts(sents_fra_in)
tokenizer_fra.fit_on_texts(sents_fra_out)

# sequence 변환
decoder_input = tokenizer_fra.texts_to_sequences(sents_fra_in)
decoder_input = pad_sequences(decoder_input, padding="post")

decoder_target = tokenizer_fra.texts_to_sequences(sents_fra_out)
decoder_target = pad_sequences(decoder_target, padding="post")

In [None]:
src_vocab_size = len(tokenizer_en.word_index) + 1
tar_vocab_size = len(tokenizer_fra.word_index) + 1
print("영어 단어 집합의 크기: {:d}, 프랑스어 단어 집합의 크기: {:d}".format(src_vocab_size, tar_vocab_size))

영어 단어 집합의 크기: 4485, 프랑스어 단어 집합의 크기: 7878


In [None]:
# 인코더, 디코더 크기 지정
print('인코더의 입력의 크기(shape):', encoder_input.shape) # encoder_input.shape: 하나의 문장 내 단어 개수
print('디코더의 입력의 크기(shape):', decoder_input.shape)
print('디코더의 레이블의 크기(shape):', decoder_target.shape)

인코더의 입력의 크기(shape): (33000, 7)
디코더의 입력의 크기(shape): (33000, 16)
디코더의 레이블의 크기(shape): (33000, 16)


In [None]:
# 단어 자체를 하나의 숫자로 바꿈
src_to_index = tokenizer_en.word_index # word -> 숫자
index_to_src = tokenizer_en.index_word # 숫자 -> word
tar_to_index = tokenizer_fra.word_index
index_to_tar = tokenizer_fra.index_word

In [None]:
# index 변경
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices) # shuffle된 index에 맞게 데이터 변경
print('랜덤 시퀀스:', indices)

랜덤 시퀀스: [17047 10695  1407 ...  8354 23697 13168]


In [None]:
# index에 따라 인코더, 디코더 타켓 구성
# 데이터 변경
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

In [None]:
print('encoder input 샘플 출력: ', encoder_input[30997])
print('decoder input 샘플 출력: ', decoder_input[30997])
print('decoder target 샘플 출력: ', decoder_target[30997])

encoder input 샘플 출력:  [   2   20 2178    1    0    0    0]
decoder input 샘플 출력:  [   2   12   77 2334    1    0    0    0    0    0    0    0    0    0
    0    0]
decoder target 샘플 출력:  [  12   77 2334    1    3    0    0    0    0    0    0    0    0    0
    0    0]


In [None]:
n_of_val = int(33000*0.1)

encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

In [None]:
print('훈련 source 데이터의 크기:', encoder_input_train.shape)
print('훈련 target 데이터의 크기:', decoder_input_train.shape)
print('훈련 target 레이블의 크기:', decoder_target_train.shape)
print('테스트 source 데ㅓ이터의 크기:', encoder_input_test.shape)
print('테스트 target 데이터의 크기:', decoder_input_test.shape)
print('테스트 target 레이블의 크기:', decoder_target_test.shape)

훈련 source 데이터의 크기: (29700, 7)
훈련 target 데이터의 크기: (29700, 16)
훈련 target 레이블의 크기: (29700, 16)
테스트 source 데ㅓ이터의 크기: (3300, 7)
테스트 target 데이터의 크기: (3300, 16)
테스트 target 레이블의 크기: (3300, 16)


In [None]:
# 모델 구성: 모델 간단함
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking
from tensorflow.keras.models import Model

embedding_dim = 64 # 한 단어를 64개의 vector로 만듦(글->숫자->벡터): 연관성 만들어주는 단계
hidden_units = 64 # node 개수

In [None]:
# 인코더
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(src_vocab_size, embedding_dim)(encoder_inputs) # 임베딩 층
enc_masking = Masking(mask_value=0.0)(enc_emb) # 패딩 0은 연산에서 제외 # layer
encoder_lstm = LSTM(hidden_units, return_state=True) # 상태값 리턴을 위해 return_state는 True # LSTM layer 정의
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking) # 은닉 상태와 셀 상태를 리턴 # 실제 사용
encoder_states = [state_h, state_c] # 인코더의 은닉 상태와 셀 상태를 저장

In [None]:
# 디코더
decoder_inputs = Input(shape=(None,))
dec_emb_layer = Embedding(tar_vocab_size, hidden_units) # 임베딩 층 # 임베딩 정의
dec_emb = dec_emb_layer(decoder_inputs) # 임베딩 실제 실행
dec_masking = Masking(mask_value=0.0)(dec_emb) # 패딩 0은 연산에서 제외

# 상태값 리턴을 위해 return_state는 True, 모든 시점에 대해서 단어를 예측하기 위해 return_sequences는 True
decoder_lstm = LSTM(hidden_units, return_sequences=True, return_state=True) # 정의

# 인코더의 은닉 상태를 초기 은닉 상태(initial_state)로 사용
# 인코더 결과값 + 디코더 입력 -> 디코더 output 생성
decoder_outputs, _, _ = decoder_lstm(dec_masking,
                                     initial_state=encoder_states) # 실제 적용

# 모든 시점의 결과에 대해서 소프트맥스 함수를 사용한 출력층을 통해 단어 예측
decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs) # 최종 output 생성

# 모델의 입력과 출력을 정의.
# 함수형 api 형태: 입출력 명확히 나옴
# 모델 뒤에 구성: 입력, 결과를 전체 모델로 구성
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])

model.fit(x=[encoder_input_train, decoder_input_train], y=decoder_target_train, \
          validation_data=([encoder_input_test, decoder_input_test], decoder_target_test),
          batch_size=128, epochs=50)

Epoch 1/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 31ms/step - acc: 0.6048 - loss: 5.0273 - val_acc: 0.6205 - val_loss: 2.0016
Epoch 2/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 30ms/step - acc: 0.6473 - loss: 1.9074 - val_acc: 0.7437 - val_loss: 1.7214
Epoch 3/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 30ms/step - acc: 0.7453 - loss: 1.6615 - val_acc: 0.7497 - val_loss: 1.5745
Epoch 4/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 29ms/step - acc: 0.7560 - loss: 1.5238 - val_acc: 0.7637 - val_loss: 1.4573
Epoch 5/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 31ms/step - acc: 0.7683 - loss: 1.4014 - val_acc: 0.7814 - val_loss: 1.3522
Epoch 6/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 30ms/step - acc: 0.7850 - loss: 1.2990 - val_acc: 0.7978 - val_loss: 1.2684
Epoch 7/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 29

<keras.src.callbacks.history.History at 0x781075df4d60>

In [None]:
# 인코더
# 디코더 다시 설계: test 위한 설계, 인코더 output에 들어가지 않도록
# 인코더, 디코더 따로 -> 연결
encoder_model = Model(encoder_inputs, encoder_states)

# 디코더 설계 시작
# 이전 시점의 상태를 보관할 텐서
decoder_state_input_h = Input(shape=(hidden_units,))
decoder_state_input_c = Input(shape=(hidden_units,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c] # 실제 사용하는 값

# 훈련 때 사용했던 임베딩 층을 재사용
dec_emb2 = dec_emb_layer(decoder_inputs)

# 다음 단어 예측을 위해 이전 시점의 상태를 현 시점의 초기 상태로 사용
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]

# 모든 시점에 대해서 단어 예측
decoder_outputs2 = decoder_dense(decoder_outputs2)

# 수정된 디코더
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs2] + decoder_states2)

In [None]:
def decode_sequence(input_seq):
  # 입력으로부터 인코더의 마지막 시점의 상태(은닉 상태, 셀 상태)를 얻음
  states_value = encoder_model.predict(input_seq)

  # <SOS>에 해당하는 정수 생성
  target_seq = np.zeros((1,1))
  target_seq[0, 0] = tar_to_index['<sos>']

  stop_condition = False
  decoded_sentence = ''

  # stop_condition이 True가 될 때까지 루프 반복
  # 구현의 간소화를 위해서 이 함수는 배치 크기를 1로 가정합니다.
  while not stop_condition:
    # 이점 시점의 상태 states_value를 현 시점의 초기 상태로 사용
    output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

    # 예측 결과를 단어로 변환
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_char = index_to_tar[sampled_token_index]

    # 현재 시점의 예측 단어를 예측 문장에 추가
    decoded_sentence += ' '+sampled_char

    # <eos>에 도달하거나 정해진 길이를 넘으면 중단.
    if (sampled_char == '<eos>' or
        len(decoded_sentence) > 50):
        stop_condition = True

    # 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
    target_seq = np.zeros((1,1))
    target_seq[0, 0] = sampled_token_index

    # 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
    states_value = [h, c]

  return decoded_sentence

In [None]:
# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0):
      sentence = sentence + index_to_src[encoded_word] + ' '
  return sentence

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']):
      sentence = sentence + index_to_tar[encoded_word] + ' '
  return sentence

In [None]:
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_train[seq_index: seq_index + 1]
  decoded_sentence = decode_sequence(input_seq)

  print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
  print("번역문장 :",decoded_sentence[1:-5])
  print("-"*50)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 284ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 189ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 33ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
입력문장 : you broke the law . 
정답문장 : vous avez enfreint la loi . 
번역문장 : tu as enfreint la loi . 
--------------------------------------------------
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[