# Text Summarization with Attention mechanism

- 추출적 요약(extractive summarization)
- 추상적 요약(abstractive summarization) ← 이 노트북에서 구현하는 방식 (seq2seq + attention)

In [None]:
import numpy as np
import pandas as pd
import re
import matplotlib.pyplot as plt
import urllib.request

import random
# Seed both random number generators used in this notebook: the standard
# library one (used by random.sample below) and NumPy's global generator.
random.seed(0)
np.random.seed(seed=0)

# Loading 단축어, 불용어
from contractions import contractions

# Data Loading
* Data Loading
    - 단축어, 불용어 로딩
    - 학습 데이터 로딩
* Data Preprocessing
    - Duplication
    - 불용어 제거
    - Remove Long Sentences

In [None]:
# Loading 단축어, 불용어
import nltk
from nltk.corpus import stopwords

# Only the stop-word corpus is used here, so download just that resource
# instead of every NLTK package.
nltk.download('stopwords')
stop_words_set = set(stopwords.words('english'))  # English stop words removed during preprocessing

# random.sample() requires a sequence (sampling directly from a set raises
# TypeError on Python 3.11+), so sample from a sorted list of the words.
len(stop_words_set), random.sample(sorted(stop_words_set), 10)

In [None]:
# Load Reviews.csv into the DataFrame `data`, keeping only the two columns
# that are used: X = Text (review body), Y = Summary (review headline).
# NOTE(review): the whole file is read; pass `nrows=100000` to read_csv to
# cap the sample size for faster experiments.
data = pd.read_csv("Reviews.csv", usecols=['Text', 'Summary'])[['Text', 'Summary']]
print('Reviews :', len(data))
print('Uniques :', data['Text'].nunique())

# Data Preprocessing1 : Remove Duplication
# Build a fresh frame instead of mutating a column-subset in place, which
# avoids pandas chained-assignment warnings when columns are added later.
data = data.drop_duplicates(subset=['Text']).dropna(axis=0).copy()
print('Samples :', len(data))

In [None]:
# Data Preprocessing2 : Remove Unnecessary Words 
#     - removed words : TAG, contractions, stopwords

from bs4 import BeautifulSoup 

def preprocess_sentence(sentence):
    """Normalize one raw review string into a space-separated token string.

    Steps, in order: lower-case and strip HTML markup, remove parenthesized
    text and double quotes, expand contractions, remove possessive 's,
    replace every non-letter with a space, collapse runs of 'm', then drop
    stop words and one-letter words.

    Returns the cleaned string, or None when no token survives.
    """
    # Strip HTML markup from the lower-cased text.
    text = BeautifulSoup(sentence.lower(), "lxml").text
    # Remove parenthesized fragments, then double quotes.
    text = re.sub(r'\([^)]*\)', '', text)
    text = re.sub('"', '', text)

    # Expand contractions token by token.
    text = ' '.join(
        contractions[token] if token in contractions else token
        for token in text.split()
    )

    text = re.sub(r"'s\b", "", text)        # possessive 's
    text = re.sub("[^a-zA-Z]", " ", text)   # digits and special characters
    text = re.sub('[m]{2,}', 'mm', text)    # collapse runs of 'm' to "mm"

    # Keep words longer than one character that are not stop words.
    kept_words = [
        word for word in text.split()
        if len(word) > 1 and word not in stop_words_set
    ]
    cleaned = ' '.join(kept_words).strip()
    if not cleaned:
        return None
    return cleaned

# Data Preprocessing2 : Remove Unnecessary Words
# Clean both columns, then drop every row where either column came back
# as None (nothing left after cleaning).
for column in ('Text', 'Summary'):
    data[column] = data[column].map(preprocess_sentence)
data.dropna(inplace=True)

In [None]:
# Data Preprocessing3 : Fix Word Size
# Word-count limits; they match the hard-coded text_max_len / summary_max_len
# values used later for padding.
max_text_words = 50
max_summary_words = 8

data['text_len'] = data['Text'].map(lambda m: len(m.split()))
data['summary_len'] = data['Summary'].map(lambda m: len(m.split()))

# Report how many samples exceed each limit.
print(len(data), len(data.loc[data['text_len'] > max_text_words]))
print(len(data), len(data.loc[data['summary_len'] > max_summary_words]))

# Texts longer than 50 words are under 25% of the data: drop them (and
# over-long summaries) to keep training sequences short.
# .copy() makes the result an independent frame so that the columns added
# to it afterwards do not trigger pandas chained-assignment warnings.
within_limits = (data['text_len'] <= max_text_words) & (data['summary_len'] <= max_summary_words)
data = data.loc[within_limits].copy()
data.head()

# 학습/검증 데이터 분리

In [None]:
from sklearn.model_selection import train_test_split

# The decoder input is the summary prefixed with a start token and the
# decoder target is the summary suffixed with an end token.
data['encoder_input'] = data['Text']
data['decoder_input'] = 'sostoken ' + data['Summary']
data['decoder_target'] = data['Summary'] + ' eostoken'

# 80/20 shuffled split with a fixed random state.
train, test = train_test_split(data, test_size=0.2, shuffle=True, random_state=34)

print('Train Data :', len(train))
print('Test Data  :', len(test))

_split_columns = ('Text', 'decoder_input', 'decoder_target')
encoder_input_train, decoder_input_train, decoder_target_train = (
    train[column] for column in _split_columns
)
encoder_input_test, decoder_input_test, decoder_target_test = (
    test[column] for column in _split_columns
)

# Embedding & Padding


In [None]:
# Hyper Parameter

# Fixed sequence lengths used when padding the encoder / decoder sequences.
text_max_len, summary_max_len = 50, 8

# Vocabulary sizes passed to the tokenizers and the embedding layers.
src_vocab, tar_vocab = 8000, 2000

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer 

# Fit the source-side tokenizer once on the training texts only. A single
# fit is enough; fitting an extra throw-away Tokenizer first only repeats
# the pass over the corpus.
# `num_words` limits the vocabulary used by texts_to_sequences to the most
# frequent words (see the Keras Tokenizer documentation).
src_tokenizer = Tokenizer(num_words=src_vocab)
src_tokenizer.fit_on_texts(encoder_input_train)

# Convert the text sequences to integer sequences.
encoder_input_train = src_tokenizer.texts_to_sequences(encoder_input_train)
encoder_input_test = src_tokenizer.texts_to_sequences(encoder_input_test)

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer 

# Fit the target-side tokenizer on both decoder views of the training
# summaries so that 'sostoken' and 'eostoken' are both in its vocabulary.
# A single tokenizer is enough; fitting an extra throw-away Tokenizer first
# only repeats the pass over the corpus.
tar_tokenizer = Tokenizer(num_words=tar_vocab)
tar_tokenizer.fit_on_texts(decoder_input_train)
tar_tokenizer.fit_on_texts(decoder_target_train)

# Convert the text sequences to integer sequences.
decoder_input_train = tar_tokenizer.texts_to_sequences(decoder_input_train)
decoder_target_train = tar_tokenizer.texts_to_sequences(decoder_target_train)
decoder_input_test = tar_tokenizer.texts_to_sequences(decoder_input_test)
decoder_target_test = tar_tokenizer.texts_to_sequences(decoder_target_test)

In [None]:
# Drop samples whose summary became empty after tokenization: a decoder
# input of length 1 holds nothing but the start token.
drop_train = [index for index, sentence in enumerate(decoder_input_train) if len(sentence) == 1]
drop_test = [index for index, sentence in enumerate(decoder_input_test) if len(sentence) == 1]

# The sequences are ragged (variable-length lists), so filter them with
# plain list comprehensions; np.delete would first have to convert them to
# an array, which NumPy >= 1.24 rejects for inhomogeneous shapes.
_drop_train = set(drop_train)
_drop_test = set(drop_test)

encoder_input_train = [seq for i, seq in enumerate(encoder_input_train) if i not in _drop_train]
decoder_input_train = [seq for i, seq in enumerate(decoder_input_train) if i not in _drop_train]
decoder_target_train = [seq for i, seq in enumerate(decoder_target_train) if i not in _drop_train]

encoder_input_test = [seq for i, seq in enumerate(encoder_input_test) if i not in _drop_test]
decoder_input_test = [seq for i, seq in enumerate(decoder_input_test) if i not in _drop_test]
decoder_target_test = [seq for i, seq in enumerate(decoder_target_test) if i not in _drop_test]

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Padding: append zeros so every sequence has a fixed length.
encoder_input_train = pad_sequences(encoder_input_train, maxlen=text_max_len, padding='post')
encoder_input_test = pad_sequences(encoder_input_test, maxlen=text_max_len, padding='post')

# Summaries were limited to 8 words before 'sostoken' / 'eostoken' were
# added, so a decoder sequence can hold summary_max_len + 1 tokens.
# pad_sequences truncates from the FRONT by default, which would cut
# 'sostoken' from the decoder input and the first word from the target,
# shifting the two out of alignment. Truncating at the end keeps
# input[t] -> target[t] aligned for those samples.
decoder_input_train = pad_sequences(decoder_input_train, maxlen=summary_max_len,
                                    padding='post', truncating='post')
decoder_target_train = pad_sequences(decoder_target_train, maxlen=summary_max_len,
                                     padding='post', truncating='post')
decoder_input_test = pad_sequences(decoder_input_test, maxlen=summary_max_len,
                                   padding='post', truncating='post')
decoder_target_test = pad_sequences(decoder_target_test, maxlen=summary_max_len,
                                    padding='post', truncating='post')

# seq2seq + attention으로 요약 모델 테스트하기

In [None]:
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
embedding_dim = 128
hidden_size = 256

# Encoder input: one padded integer sequence of length text_max_len.
encoder_inputs = Input(shape=(text_max_len,))

# Encoder embedding layer.
enc_emb = Embedding(src_vocab, embedding_dim)(encoder_inputs)

# The three stacked encoder LSTMs share the same configuration: each one
# returns its full output sequence plus its final hidden and cell states.
_encoder_lstm_config = dict(
    return_sequences=True,
    return_state=True,
    dropout=0.4,
    recurrent_dropout=0.4,
)

# Encoder LSTM 1
encoder_lstm1 = LSTM(hidden_size, **_encoder_lstm_config)
encoder_output1, state_h1, state_c1 = encoder_lstm1(enc_emb)

# Encoder LSTM 2
encoder_lstm2 = LSTM(hidden_size, **_encoder_lstm_config)
encoder_output2, state_h2, state_c2 = encoder_lstm2(encoder_output1)

# Encoder LSTM 3: its outputs and final states feed the decoder.
encoder_lstm3 = LSTM(hidden_size, **_encoder_lstm_config)
encoder_outputs, state_h, state_c = encoder_lstm3(encoder_output2)

In [None]:
# Decoder input: an integer sequence of unspecified length.
decoder_inputs = Input(shape=(None,))

# Decoder embedding layer (kept as a named layer so it can be reused when
# the inference decoder is built).
dec_emb_layer = Embedding(tar_vocab, embedding_dim)
dec_emb = dec_emb_layer(decoder_inputs)

# Decoder LSTM, initialised with the final states of the last encoder LSTM.
# Its returned states are not needed during training.
decoder_lstm = LSTM(
    hidden_size,
    return_sequences=True,
    return_state=True,
    dropout=0.4,
    recurrent_dropout=0.2,
)
decoder_outputs, _, _ = decoder_lstm(dec_emb, initial_state=[state_h, state_c])

In [None]:
# Baseline seq2seq model WITHOUT attention: the softmax output layer is
# applied directly to the decoder LSTM outputs.
# The model is only summarized here (it is not compiled in this cell).
# NOTE(review): `decoder_softmax_layer`, `decoder_softmax_outputs` and
# `model` are all reassigned when the attention model is defined.

# Decoder output layer
decoder_softmax_layer = Dense(tar_vocab, activation = 'softmax')
decoder_softmax_outputs = decoder_softmax_layer(decoder_outputs) 

# Model definition
model = Model([encoder_inputs, decoder_inputs], decoder_softmax_outputs)
model.summary()

In [None]:
from attention import AttentionLayer

# Attention layer: called with the encoder output sequence and the decoder
# output sequence.
attn_layer = AttentionLayer(name='attention_layer')
attn_out, attn_states = attn_layer([encoder_outputs, decoder_outputs])

# Concatenate the attention result with the decoder hidden states along
# the last axis.
_concat_layer = Concatenate(axis=-1, name='concat_layer')
decoder_concat_input = _concat_layer([decoder_outputs, attn_out])

# Decoder output layer: softmax over the target vocabulary.
decoder_softmax_layer = Dense(tar_vocab, activation='softmax')
decoder_softmax_outputs = decoder_softmax_layer(decoder_concat_input)

# Model definition: (encoder input, decoder input) -> per-step distribution.
model = Model(
    inputs=[encoder_inputs, decoder_inputs],
    outputs=decoder_softmax_outputs,
)
model.compile(optimizer='rmsprop', loss='sparse_categorical_crossentropy')

model.summary()

In [None]:
import tensorflow as tf
# Write a diagram of the model graph, including tensor shapes, to model.png.
tf.keras.utils.plot_model(
    model,
    to_file='model.png',
    show_shapes=True,
)

# 모델 학습

In [None]:
# Define Callback : EarlyStopping on the validation loss.
early_stop = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=2)

# Fit Model on the first 30,000 training samples, validating on the first
# 6,000 test samples.
n_train, n_val = 30000, 6000
train_inputs = [encoder_input_train[:n_train], decoder_input_train[:n_train]]
val_inputs = [encoder_input_test[:n_val], decoder_input_test[:n_val]]

history = model.fit(
    x=train_inputs,
    y=decoder_target_train[:n_train],
    validation_data=(val_inputs, decoder_target_test[:n_val]),
    batch_size=256,
    callbacks=[early_stop],
    epochs=1,
)

In [None]:
# Plot the training and validation loss curves recorded by model.fit.
for metric_key, curve_label in (('loss', 'train'), ('val_loss', 'test')):
    plt.plot(history.history[metric_key], label=curve_label)
plt.legend()
plt.show()

# Test Model : seq2seq + attention

In [None]:
# Build the three lookup dictionaries needed for testing.

# Source vocabulary: integer -> word.
src_index_to_word = src_tokenizer.index_word

# Summary vocabulary: word -> integer, and integer -> word.
tar_word_to_index, tar_index_to_word = tar_tokenizer.word_index, tar_tokenizer.index_word

In [None]:
# Inference encoder: maps a source sequence to the encoder output sequence
# and the final hidden / cell states of the last encoder LSTM.
encoder_model = Model(inputs=encoder_inputs, outputs=[encoder_outputs, state_h, state_c])

# Tensors that carry the decoder states from the previous time step
decoder_state_input_h = Input(shape=(hidden_size,))
decoder_state_input_c = Input(shape=(hidden_size,))

# Reuse the trained embedding and LSTM layers, but start the LSTM from the
# externally supplied states instead of the encoder states.
# NOTE: state_h2 / state_c2 rebind names first assigned by the second
# encoder LSTM; from here on they hold the decoder's new states.
dec_emb2 = dec_emb_layer(decoder_inputs)
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=[decoder_state_input_h, decoder_state_input_c])

In [None]:
# Attention at inference time: the encoder output sequence is fed in
# through its own Input so the decoder model can run independently of the
# encoder graph. The trained attention layer is reused.
decoder_hidden_state_input = Input(shape=(text_max_len, hidden_size))
attn_out_inf, attn_states_inf = attn_layer([decoder_hidden_state_input, decoder_outputs2])
decoder_inf_concat = Concatenate(axis=-1, name='concat')([decoder_outputs2, attn_out_inf])

# Decoder output layer (the trained softmax layer is reused).
# NOTE: decoder_outputs2 is rebound here from the LSTM outputs to the
# softmax distribution, so statement order in this cell matters.
decoder_outputs2 = decoder_softmax_layer(decoder_inf_concat) 

# Final decoder model:
#   inputs  - previous token, encoder output sequence, previous h and c states
#   outputs - token distribution, new h and c states
decoder_model = Model(
    [decoder_inputs] + [decoder_hidden_state_input,decoder_state_input_h, decoder_state_input_c],
    [decoder_outputs2] + [state_h2, state_c2])

# Save & Load Model

In [None]:
from tensorflow import keras
# Persist the inference encoder and decoder as HDF5 files.
# NOTE(review): the destination is a hard-coded absolute path in one
# user's home directory.
for _model, _path in (
    (encoder_model, '/home/jonghwanchae/my_enc_model.h5'),
    (decoder_model, '/home/jonghwanchae/my_dec_model.h5'),
):
    _model.save(_path)

In [None]:
from tensorflow import keras
from attention import AttentionLayer

# Reload the saved inference models. Neither model was compiled before it
# was saved, so skip restoring a training configuration with compile=False.
encoder_model2 = keras.models.load_model("/home/jonghwanchae/my_enc_model.h5", compile=False)

# The decoder contains the custom AttentionLayer; Keras can only
# deserialize a custom layer class when it is supplied via `custom_objects`.
decoder_model2 = keras.models.load_model(
    "/home/jonghwanchae/my_dec_model.h5",
    custom_objects={'AttentionLayer': AttentionLayer},
    compile=False,
)

# Model Test

In [None]:
def decode_sequence(input_seq):
    """Greedily decode a summary for one encoded source sequence.

    The encoder is run once on ``input_seq``; the decoder is then run one
    token at a time, starting from 'sostoken' and feeding back the most
    probable token together with the updated LSTM states.

    Decoding stops when 'eostoken' is predicted, when the predicted index
    has no entry in ``tar_index_to_word`` (for example the padding index
    0), or once ``summary_max_len - 1`` words have been produced.

    Args:
        input_seq: model-ready input for ``encoder_model.predict``; callers
            in this file pass one padded sequence reshaped to
            ``(1, text_max_len)``.

    Returns:
        The decoded words as one string, each word preceded by a single
        space (so a non-empty result starts with a space); '' if no word
        was produced.
    """
    # Encode the source once; verbose=0 suppresses the per-call progress
    # output, which would otherwise be printed at every decoding step.
    enc_out, state_h, state_c = encoder_model.predict(input_seq, verbose=0)

    # Length-1 target sequence holding the start token.
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = tar_word_to_index['sostoken']

    max_words = summary_max_len - 1
    words = []
    while True:
        output_tokens, state_h, state_c = decoder_model.predict(
            [target_seq, enc_out, state_h, state_c], verbose=0)

        # Greedy choice: the most probable token at the last time step.
        token_index = int(np.argmax(output_tokens[0, -1, :]))

        # Index 0 is reserved for padding and has no word, so a plain
        # dictionary lookup would raise KeyError; treat it as the end.
        token = tar_index_to_word.get(token_index)
        if token is None or token == 'eostoken':
            break

        words.append(token)
        if len(words) >= max_words:
            break

        # Feed the chosen token back in as the next decoder input.
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = token_index

    return ''.join(' ' + word for word in words)

In [None]:
# Convert an integer sequence of the source text back to text.
def seq2text(input_seq):
    """Return the source words for ``input_seq``, skipping padding zeros."""
    words = (src_index_to_word[index] for index in input_seq if index != 0)
    return ' '.join(words)

# Convert an integer sequence of a summary back to text.
def seq2summary(input_seq):
    """Return the summary words for ``input_seq``.

    Padding zeros and the 'sostoken' / 'eostoken' markers are skipped.
    """
    words = []
    for index in input_seq:
        if index == 0:
            continue
        if index == tar_word_to_index['sostoken'] or index == tar_word_to_index['eostoken']:
            continue
        words.append(tar_index_to_word[index])
    return ' '.join(words)

# Print the source text, the reference summary and the predicted summary
# for test samples 500..999.
for i in range(500, 1000):
    source_seq = encoder_input_test[i]
    reference_seq = decoder_input_test[i]
    print("원문 : ", seq2text(source_seq))
    print("실제 요약문 :", seq2summary(reference_seq))
    print("예측 요약문 :", decode_sequence(source_seq.reshape(1, text_max_len)))
    print("\n")