In [None]:
from tensorflow import keras
import numpy as np

path = keras.utils.get_file(
    'rnn_short_toji.txt',
    origin='https://raw.githubusercontent.com/pykwon/etc/master/rnn_test_toji.txt')
text = open(path, 'rb').read().decode(encoding='utf-8')

print('전체 글자 수 : {}'.format(len(text)))
print(text[:100])


import re

def clean_str_func(string): #데이터 정제
    string = re.sub(r"[^가-힣0-9(),?!]",' ',string)
    string = re.sub(r"!",'! ',string)
    string = re.sub(r"\(",'',string)
    string = re.sub(r"\)",'',string)
    string = re.sub(r"\?",'? ',string)
    string = re.sub(r"\s{2,}",' ',string)
    string = re.sub(r"\'",'',string)
    return string
# 데이터가공은 정규표현식을 사용해서 구현하면 아주 효과적임

# print(clean_str_func('abc?!  12 나는" (안준수)" 34, 다 ANJUNSU')) ? ! 12 나는 안준수 34, 다

train_text =text.split('\n')
train_text= [clean_str_func(sentence) for sentence in train_text]

print(train_text[:5]) #깔끔하게 나온다

train_text_x=[]
for sen in train_text:
    train_text_x.extend(sen.split(' '))
    train_text_x.append('\n')

train_text_x = [word for word in train_text_x if word !=''] # 단어1 단어2 단어3 - 공백을 기준으로 단어를 나눔
print(train_text_x[:5])

#한 셀에 내용을 다같이 쓰는 게 좋다. 다음셀에서 이전 셀의 내용을 이해하지 못할 수 있기 때문
#그래서 비슷한 내용의 경우 다같이 쓰고 내용이 바뀌었을때는 다음 셀로


#단어 사전

In [None]:
vocab = sorted(set(train_text_x))
vocab.append('UNK') # 단어 사전에 존재하지 않는 토큰은 'UNK'로 처리함
# 특수 토큰 <unk> 토크나이저가 모르는 단어를 만나면 unknown으로 처리하기 위한 , 처리용 토큰임
# <eos>, <sep>, <mask>
print('{} unique words '.format(len(vocab)))

# 단어에 대한 인덱싱
word2idx = {w:i for i,w in enumerate(vocab)}
# print(word2idx) {'\n': 0, '!': 1, ',': 2, ',,?': 3, ',그년이,': 4, '00': 5
idx2word= np.array(vocab)
print(idx2word) #['\n' '!' ',' ... '힘찼으며' '힝' 'UNK']
text_as_int = np.array([word2idx[c] for c in train_text_x])

print(train_text_x[:20]) #['제', '1', '편', '어둠의', '발소리', '\n'
print(text_as_int[:20]) #[43572     7 50084 34364 21922     0


In [None]:
# 데이터 가공 - dataset 작성
seq_length = 25 # 25개의 단어가 주어질 경우 다음 단어를 예측
example_per_epoch = len(text_as_int) // seq_length
print(example_per_epoch) #\7093

import tensorflow as tf
sentence_dataset = tf.data.Dataset.from_tensor_slices(text_as_int) # 전체가 아니라 부분 데이터 읽기
sentence_dataset = sentence_dataset.batch(seq_length + 1, drop_remainder =True)
# seq_length + 1 : 처음 25개의 단어 feature와 그 뒤에 나오는 정답이 될 1단어를 합쳐 반환하기 위함
# drop_remainder = True 마지막 배치 크기를 무시

for item in sentence_dataset.take(1): #batch를 한번씩 불러
    print(item.numpy())
    print(idx2word[item.numpy()])

print()
def split_input_target(chunk):
    return [chunk[:-1],chunk[-1]] #[25단어], 1단어

train_dataset = sentence_dataset.map(split_input_target)

for x,y in train_dataset.take(1):
    print(idx2word[x.numpy()])
    print(x.numpy())
    print(idx2word[y.numpy()])
    print(y.numpy())


In [None]:
# model
BATCH_SIZE = 64
steps_per_epoch = example_per_epoch // BATCH_SIZE
BUFFER_SIZE = 5000

# shuffle을 사용하면 epoch 마다 Dataset을 섞을 수 있다. 과적합 방지에 효과적
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

total_words = len(vocab)
print(total_words)  # 53331

model = tf.keras.Sequential([
    tf.keras.layers.Embedding(total_words, 100, input_length=seq_length),
    tf.keras.layers.LSTM(units=256, return_sequences=True),
    tf.keras.layers.Dropout(rate=0.2),
    tf.keras.layers.LSTM(units=256),
    tf.keras.layers.Dense(units=256, activation='relu'),
    tf.keras.layers.Dropout(rate=0.2),
    tf.keras.layers.Dense(units=total_words, activation='softmax')
])

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
print(model.summary())

# 단어 단위 생성 모델 학습

In [None]:
from keras.preprocessing.sequence import pad_sequences

def testmodelFunc(epoch, logs):
    if epoch % 5 !=0 and epoch !=49: #5의 배수이거나 49이면 처리
        return

    test_sentence = train_text[0]

    next_words =100
    for _ in range(next_words):
        test_text_x = test_sentence.split(' ')[-seq_length:] #임의의 문장 뒤에서부터 seq_length(25)만큼 선택
        test_text_x =np.array([word2idx[c] if c in word2idx else word2idx['UNK'] for c in test_text_x])
        test_text_x = pad_sequences([test_text_x],maxlen=seq_length, padding='pre',value=word2idx['UNK'])
        output_idx= np.argmax(model.predict(test_text_x)[0]) # 출력값 중에서 가장 값이 큰 인덱스 반환
        test_sentence +=' '+ idx2word[output_idx] # 출력 단어는 test_sentence에 누적해 다음 작업의 입력으로 활
    print()
    print(test_sentence)
    print()

#epoch이 끝날때마다 testmodelFunc를 해 진행 결과를 출력
#fit할 때(학습 도중) 학습 데이터가 좋은 학습 모델을 만들어나가는 과정을 확인해가며, 작업하고 싶을 때 사용
testModelCb = tf.keras.callbacks.LambdaCallback(on_epoch_end=testmodelFunc)
#repeat() : input을 반복, 1개의 epoch의 끝과 다음 epoch의 시작에 상관없이 인자 만큼 반복
history=model.fit(train_dataset.repeat(), epochs=50, steps_per_epoch=steps_per_epoch, callbacks=[testModelCb],verbose=2)
#한 에폭에 사용할 step 수를 지정,ex 총 45개 sample이 ㅣㅇㅆ고 배치사이즈가 3이라면 15스텝


In [None]:
print(history.history['loss'][-1])
print(history.history['accuracy'][-1])
model.save('tf31model.hdf5')

In [None]:
from keras.models import load_model
model=load_model('tf31model.hdf5')

# 임의 문장을 사용해 생성된 새로운 문장 확인
test_sentence='이날은 수수개비를 꺾어도 아이들은 매를 맞지 않는다'

new_words=100

for _ in range(new_words):
    test_text_x = test_sentence.split(' ')[-seq_length:] #임의의 문장 뒤에서부터 seq_length(25)만큼 선택
    test_text_x =np.array([word2idx[c] if c in word2idx else word2idx['UNK'] for c in test_text_x])
    test_text_x = pad_sequences([test_text_x],maxlen=seq_length, padding='pre',value=word2idx['UNK'])
    output_idx= np.argmax(model.predict(test_text_x)[0]) # 출력값 중에서 가장 값이 큰 인덱스 반환
    test_sentence +=' '+ idx2word[output_idx] # 출력 단어는 test_sentence에 누적해 다음 작업의 입력으로 활

print(test_sentence)

