In [1]:
import glob
import os
import re
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt 
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

___
# 데이터 읽어오기

In [2]:
txt_file_path = "./lyrics/*"

txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])

데이터 크기: 187088
Examples:
 [' There must be some kind of way outta here', 'Said the joker to the thief', "There's too much confusion"]


___
# 데이터 정제

In [3]:
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() # 1
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 2
    sentence = re.sub(r'[" "]+', " ", sentence) # 3
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) # 4
    sentence = sentence.strip() # 5
    sentence = '<start> ' + sentence + ' <end>' # 6
    return sentence

# 문장 필터링 예시 출력
print(preprocess_sentence("This @_is ;;;sample        sentence."))

<start> this is sample sentence . <end>


## preprocess_sentence(sentence)
     1. 소문자로 바꾸고, 양쪽 공백을 지웁니다
     2. 특수문자 양쪽에 공백을 넣고
     3. 여러개의 공백은 하나의 공백으로 바꿉니다
     4. a-zA-Z?.!,¿가 아닌 모든 문자를 하나의 공백으로 바꿉니다
     5. 다시 양쪽 공백을 지웁니다
     6. 문장 시작에는 <start>, 끝에는 <end>를 추가합니다
 이 순서로 처리해주어 문제가 되는 상황을 방지

In [4]:
#문장 정제하기
corpus = []

for sentence in raw_corpus:
    if len(sentence) == 0: continue
    if sentence[-1] == ":": continue

    preprocessed_sentence = preprocess_sentence(sentence)
    corpus.append(preprocessed_sentence)
        
corpus[:10]

['<start> there must be some kind of way outta here <end>',
 '<start> said the joker to the thief <end>',
 '<start> there s too much confusion <end>',
 '<start> i can t get no relief business men , they drink my wine <end>',
 '<start> plowman dig my earth <end>',
 '<start> none were level on the mind <end>',
 '<start> nobody up at his word <end>',
 '<start> hey , hey no reason to get excited <end>',
 '<start> the thief he kindly spoke <end>',
 '<start> there are many here among us <end>']

In [5]:
def tokenize(corpus):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=12000, 
        filters=' ',
        oov_token="<unk>"
    )
    # corpus를 이용해 tokenizer 내부의 단어장을 완성합니다
    tokenizer.fit_on_texts(corpus)
    # 준비한 tokenizer를 이용해 corpus를 Tensor로 변환합니다
    tensor = tokenizer.texts_to_sequences(corpus)   
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, maxlen=15, truncating='post', padding='post')  
    
    indexes = [] 
    for i,t in enumerate(tensor):
        if 3 not in t:
            indexes.append(i)
    
    tensor = np.delete(tensor, indexes, axis=0) #15토큰보다 긴 열 제거
    
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

## tf.keras.preprocessing.text.Tokenizer()
>```py
tf.keras.preprocessing.text.Tokenizer(
    num_words=None,
    filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n',
    lower=True, split=' ', char_level=False, oov_token=None,
    document_count=0, **kwargs
)

- 텐서플로우 [공식문서](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/text/Tokenizer)에서 Tokenizer에 대하여 자세히 알 수 있습니다. 
- 12000단어를 기억할 수 있는 `tokenizer`를 만들고, 12000단어에 포함되지 못한 단어는 `'<unk>'`로 변환
- 이미 `preprocess_sentence`를 사용하여 문장을 정제하여 `filters=' '`으로 처리

## tf.keras.preprocessing.sequence.pad_sequences()
>```py
tf.keras.preprocessing.sequence.pad_sequences(
    sequences, maxlen=None, dtype='int32', padding='pre',
    truncating='pre', value=0.0
)

- 텐서플로우 [공식문서](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/sequence/pad_sequences)에서 pad_sequences에 대하여 자세히 알 수 있습니다. 
- `padding='post'`를 사용하여 문장 뒤에 패딩을 붙여 길이를 맞추었습니다.
- `maxlen=15, truncating='post'`를 인자로 주어 15글자가 넘는 문장은 뒷부분을 잘랐습니다.
- pad_sequences의 결과에서 `<end>`인`3`이 없는 행들을 모두 제거하여 토큰의 개수가 15개를 넘어가는 문장을 제외하였습니다.

In [6]:
for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])

    if idx >= 10: break

1 : <unk>
2 : <start>
3 : <end>
4 : ,
5 : i
6 : the
7 : you
8 : and
9 : a
10 : to


- 위 같은 형태로 `tokenizer`에 단어들이 저장되어 있는 것을 확인할 수 있습니다.

In [7]:
# tensor에서 마지막 토큰을 잘라내서 소스 문장을 생성합니다
# 마지막 토큰은 <end>가 아니라 <pad>일 가능성이 높습니다.
src_input = tensor[:, :-1]  
# tensor에서 <start>를 잘라내서 타겟 문장을 생성합니다.
tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])

[  2  65 280  27  99 528  19  85 778  93   3   0   0   0]
[ 65 280  27  99 528  19  85 778  93   3   0   0   0   0]


In [8]:
#트레인데이터와 테스트데이터를 분리해 주었습니다.
enc_train, enc_val, dec_train, dec_val = train_test_split(src_input, 
                                                    tgt_input, 
                                                    test_size=0.2, 
                                                    random_state=7)

print("enc_train:", enc_train.shape)
print("dec_train:", dec_train.shape)
print("enc_val:", enc_val.shape)
print("dec_val:", dec_val.shape)

enc_train: (124810, 14)
dec_train: (124810, 14)
enc_val: (31203, 14)
dec_val: (31203, 14)


In [9]:
#학습을 위한 트레인데이터셋 생성
BUFFER_SIZE = len(enc_train)
BATCH_SIZE = 256

dataset = tf.data.Dataset.from_tensor_slices((enc_train,dec_train))
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

print(dataset)

<BatchDataset shapes: ((256, 14), (256, 14)), types: (tf.int32, tf.int32)>


- 준비한 데이터로부터 데이터셋을 만듭니다
- 텐서플로우 [공식문서](https://www.tensorflow.org/api_docs/python/tf/data/Dataset)에서 Dataset에 대하여 자세히 알 수 있습니다.

___
# 모델 생성

In [10]:
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
 # tokenizer가 구축한 단어사전 내 12000개와, 여기 포함되지 않은 0:<pad>를 포함하여 12001개
VOCAB_SIZE = tokenizer.num_words + 1   
embedding_size = 1000
hidden_size = 1500
model = TextGenerator(VOCAB_SIZE, embedding_size , hidden_size)

---
# 모델 구조 확인

In [11]:
for src_sample, tgt_sample in dataset.take(1): break

model(src_sample) # summary확인을 위해 한 배치만 불러온 데이터를 모델에 넣습니다.
model.summary()

Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  12001000  
_________________________________________________________________
lstm (LSTM)                  multiple                  15006000  
_________________________________________________________________
lstm_1 (LSTM)                multiple                  18006000  
_________________________________________________________________
dense (Dense)                multiple                  18013501  
Total params: 63,026,501
Trainable params: 63,026,501
Non-trainable params: 0
_________________________________________________________________


___
# 모델 학습

In [12]:
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

model.compile(loss=loss, optimizer=optimizer)
model.fit(dataset, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7fea012e2880>

___
# 학습 결과

In [13]:
train_loss = model.evaluate(enc_train, dec_train, verbose=1)
val_loss = model.evaluate(enc_val, dec_val, verbose=1)
 
print('train_loss : ' + str(train_loss))
print('val_loss : ' + str(val_loss))

train_loss : 1.0566670894622803
val_loss : 2.1559181213378906


- `val_loss`값이 2.2이하인것을 확인할 수 있습니다.

___
# 모델을 사용하여 문장 만들기

In [14]:
def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환합니다 
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

   ## generate_text()
   단어를 하나씩 예측해 문장을 만들어 줍니다
   1. 입력받은 문장의 텐서를 입력합니다
   2. 예측된 값 중 가장 높은 확률인 word index를 뽑아냅니다
   3. 2에서 예측된 word index를 문장 뒤에 붙입니다
   4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성을 마칩니다

In [15]:
generate_text(model, tokenizer, init_sentence="<start> i live", max_len=20)

'<start> i live for the funk ill die for the funk <end> '

- 학습된 모델을 통하여 새로 생성된 문장을 확인할 수 있습니다.

___
# 회고
ai작사가를 만들어 rnn신경망에 대하여 알아보았고 `re`모듈과 `tokenizer` 등을 사용하여 자연어를 데이터셋으로 사용할 수 있게 전처리하는 과정을 실습해보았습니다. 하이퍼 파라미터를 수정해보며 모델을 학습시켜보았는데, 생각보다 시간이 많이 걸렸습니다. 적절한 하이퍼 파라미터값을 빠르게 구하기 위해서 공부를 더 해야겠다는 생각이 들었습니다.