In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
os.chdir('drive/MyDrive/DL2024_1/Attention')

%load_ext autoreload
%autoreload 2

In [None]:
import os
import shutil
import zipfile

import pandas as pd
import tensorflow as tf
import urllib3
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

In [None]:
lines = pd.read_csv('dataset/fra.txt', names=['src', 'tar', 'lic'], sep='\t')
del lines['lic']
print('전체 샘플의 개수 :',len(lines))

In [None]:
lines = lines.loc[:, 'src':'tar']
lines = lines[0:30000] # 3만개만 사용
lines.sample(10)

In [None]:
lines.tar = lines.tar.apply(lambda x : '\t '+ x + ' \n')
lines.sample(10)

In [None]:
# 문자 집합 구축
src_vocab = set()
for line in lines.src: # 1줄씩 읽음
    for char in line: # 1개의 문자씩 읽음
        src_vocab.add(char)

tar_vocab = set()
for line in lines.tar:
    for char in line:
        tar_vocab.add(char)
src_vocab_size = len(src_vocab)+1
tar_vocab_size = len(tar_vocab)+1
print('source 문장의 char 집합 :',src_vocab_size)
print('target 문장의 char 집합 :',tar_vocab_size)


In [None]:
src_vocab = sorted(list(src_vocab))
tar_vocab = sorted(list(tar_vocab))
print(src_vocab[45:75])
print(tar_vocab[45:75])


In [None]:
src_to_index = dict([(word, i+1) for i, word in enumerate(src_vocab)])
tar_to_index = dict([(word, i+1) for i, word in enumerate(tar_vocab)])
print(src_to_index)
print(tar_to_index)

In [None]:
encoder_input = []

# 1개의 문장
for line in lines.src:
  encoded_line = []
  # 각 줄에서 1개의 char
  for char in line:
    # 각 char을 정수로 변환
    encoded_line.append(src_to_index[char])
  encoder_input.append(encoded_line)
print('source 문장의 정수 인코딩 :',encoder_input[:5])


In [None]:
decoder_input = []
for line in lines.tar:
  encoded_line = []
  for char in line:
    encoded_line.append(tar_to_index[char])
  decoder_input.append(encoded_line)
print('target 문장의 정수 인코딩 :',decoder_input[:5])


In [None]:
decoder_target = []
for line in lines.tar:
  timestep = 0
  encoded_line = []
  for char in line:
    if timestep > 0:
      encoded_line.append(tar_to_index[char])
    timestep = timestep + 1
  decoder_target.append(encoded_line)
print('target 문장 레이블의 정수 인코딩 :',decoder_target[:5])


In [None]:
max_src_len = max([len(line) for line in lines.src])
max_tar_len = max([len(line) for line in lines.tar])
print('source 문장의 최대 길이 :',max_src_len)
print('target 문장의 최대 길이 :',max_tar_len)


In [None]:
encoder_input = pad_sequences(encoder_input, maxlen=max_src_len, padding='post')
decoder_input = pad_sequences(decoder_input, maxlen=max_tar_len, padding='post')
decoder_target = pad_sequences(decoder_target, maxlen=max_tar_len, padding='post')
encoder_input = to_categorical(encoder_input)
decoder_input = to_categorical(decoder_input)
decoder_target = to_categorical(decoder_target)


# Attention layer를 추가한 Seq2Seq 모델 학습해보기

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Concatenate, Dot, Activation, Lambda, Softmax
from tensorflow.keras.optimizers import RMSprop
import numpy as np
import tensorflow as tf

In [None]:
# 어텐션 레이어
class AttentionLayer(tf.keras.layers.Layer):
    def __init__(self):
        super(AttentionLayer, self).__init__()

    def call(self, query, key, value):
        scores = tf.matmul(query, key, transpose_b=True)
        attention_weights = Softmax(axis=-1)(scores)
        context_vector = tf.matmul(attention_weights, value)
        return context_vector, attention_weights

In [None]:
# 인코더 정의
encoder_inputs = Input(shape=(None, src_vocab_size))
encoder_lstm = LSTM(256, return_sequences=True, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(encoder_inputs)

# 디코더 정의
decoder_inputs = Input(shape=(None, tar_vocab_size))
decoder_lstm = LSTM(256, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=[state_h, state_c])


In [None]:

# 어텐션 레이어 추가
# 1) AttentioLayer  선언
# 2) context_vector, attention_weights 에 출력 담기
'''


'''

# 컨텍스트 벡터와 디코더 출력을 연결
decoder_concat_input = Concatenate(axis=-1)([context_vector, decoder_outputs])

# 출력 레이어
decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_concat_input)

# 전체 모델 정의
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# 모델 컴파일
model.compile(optimizer=RMSprop(), loss='categorical_crossentropy', metrics=['accuracy'])

# 모델 요약
model.summary()

In [None]:
# 모델 학습
model.fit(
    [encoder_input, decoder_input],
    decoder_target,
    batch_size=64,
    epochs=40,
    validation_split=0.2
)



In [None]:
index_to_src = dict((i, char) for char, i in src_to_index.items())
index_to_tar = dict((i, char) for char, i in tar_to_index.items())


In [None]:
# 인코더 모델
encoder_model = Model(encoder_inputs, [encoder_outputs, state_h, state_c])

# 디코더
# 입력 정의
decoder_state_input_h = Input(shape=(256,), name="decoder_state_input_h")
decoder_state_input_c = Input(shape=(256,), name="decoder_state_input_c")

# 디코더 LSTM
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=[decoder_state_input_h, decoder_state_input_c])

# 어텐션 레이어 추가
context_vector, attention_weights = attention_layer(decoder_outputs, encoder_outputs, encoder_outputs)

# 컨텍스트 벡터와 디코더 출력을 결합
decoder_concat_input = Concatenate(name="concatenate_layer")([context_vector, decoder_outputs])

# 최종 출력 레이어
decoder_final_output = decoder_dense(decoder_concat_input)

# 디코더 모델 생성
decoder_model = Model(
    inputs=[decoder_inputs, encoder_outputs, decoder_state_input_h, decoder_state_input_c],
    outputs=[decoder_final_output, state_h, state_c, attention_weights]
)


# 오류 메시지에 나타난 문제를 디버깅하기 위해 모델의 개요를 출력
encoder_model.summary()
decoder_model.summary()



In [None]:

# 번역 결과를 디코딩하는 함수
def decode_sequence(input_seq):
    # 인코더의 상태를 얻음
    encoder_output, state_h, state_c = encoder_model.predict(input_seq)

    # 디코더의 초기 입력 (시작 심볼)
    target_seq = np.zeros((1, 1, tar_vocab_size))
    target_seq[0, 0, tar_to_index['\t']] = 1.

    # 디코딩 루프
    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c, a = decoder_model.predict([target_seq, encoder_output, state_h, state_c])

        # 샘플링
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = index_to_tar[sampled_token_index]
        decoded_sentence += sampled_char

        # 종료 조건: 최대 길이 초과 또는 종료 심볼
        if (sampled_char == '\n' or len(decoded_sentence) > max_tar_len):
            stop_condition = True

        # 다음 디코더 입력 업데이트
        target_seq = np.zeros((1, 1, tar_vocab_size))
        target_seq[0, 0, sampled_token_index] = 1.

        # 상태 업데이트
        state_h, state_c = h, c

    return decoded_sentence

In [None]:
# 테스트 데이터 사용 예시
for seq_index in [3,50,100,300,1001]: # 입력 문장의 인덱스
  input_seq = encoder_input[seq_index:seq_index+1]
  decoded_sentence = decode_sequence(input_seq)
  print(35 * "-")
  print('입력 문장:', lines.src[seq_index])
  print('정답 문장:', lines.tar[seq_index][2:len(lines.tar[seq_index])-1]) # '\t'와 '\n'을 빼고 출력
  print('번역 문장:', decoded_sentence[1:len(decoded_sentence)-1]) # '\n'을 빼고 출력
