# [E-04] 작사가 인공지능 만들기

이번 프로젝트는 순환신경망(RNN)을 이용하여 단어를 생성하고 문장을 만들어내는 모델을 만드는 것을 목표로 한다.

<hr>

## 1. 데이터 불러오기

In [1]:
# 라이브러리 불러오기
import tensorflow as tf
import os ,re
import glob

txt_file_path = os.getenv('HOME')+'/aiffel/lyricist/data/lyrics/*'

txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus에 담기
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])

데이터 크기: 187088
Examples:
 ["Now I've heard there was a secret chord", 'That David played, and it pleased the Lord', "But you don't really care for music, do you?"]


<hr>

## 2. 데이터 정제하기

preprocess_sentence() 함수를 사용하여 데이터를 정제하는 작업을 진행할 예정이다. 정제하는 과정에서 사용된 코드의 순서는 다음과 같다.

1. 소문자로 바꾸고, 양쪽 공백 지우기
2. 특수문자 양쪽에 공백 넣기
3. 여러개의 공백은 하나의 공백으로 바꾸기
4. a-zA-Z?.!,¿가 아닌 모든 문자를 하나의 공백으로 바꾸기
5. 다시 양쪽 공백을 지우기
6. 문장 시작에는 <start>, 끝에는 <end>를 추가하기
    
토큰의 개수가 15개를 넘어가는 문장은 학습 데이터에서 제외시킨다.

In [2]:
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() 
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) 
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) 
    sentence = sentence.strip() 
    sentence = '<start> ' + sentence + ' <end>'
    return sentence

# 필터링이 제대로 작동하는지 확인
print(preprocess_sentence("This @_is ;;;sample        sentence."))

<start> this is sample sentence . <end>


In [3]:
# 원하지 않는 문장은 건너뛰고 정제하여 corpus에 담기
corpus = []

for sentence in raw_corpus:
    if len(sentence) == 0: continue 
    if sentence[-1] == ":": continue
    if sentence[0] == "[": continue
    if len(sentence) > 15 : continue
    
    preprocessed_sentence = preprocess_sentence(sentence)
    corpus.append(preprocessed_sentence)
        
len(corpus)

13665

<hr>

## 3. 토큰화하기

tensorflow의 Tokenizer와 pad_sequences를 사용하여 토큰화를 한다.

In [4]:
def tokenize(corpus):

    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=15000,    # tokenizer가 기억할 수 있는 단어 수
        filters=' ',
        oov_token="<unk>"   # 10000개의 단어에 포함되지 못하면 <unk>로 표시
    )

    tokenizer.fit_on_texts(corpus)                      # corpus를 이용해 tokenizer 내부의 단어장을 완성

    tensor = tokenizer.texts_to_sequences(corpus)       # 준비한 tokenizer를 이용해 corpus를 Tensor로 변환
    
    # 문장 뒤에 패딩을 넣어 maxlen에 시퀀스 길이 맞춤
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')
    
    print(tensor,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

[[  2 159   3 ...   0   0   0]
 [  2 159   3 ...   0   0   0]
 [  2  54 864 ...   0   0   0]
 ...
 [  2 213 543 ...   0   0   0]
 [  2  59  17 ...   0   0   0]
 [  2  35  21 ...   0   0   0]] <keras_preprocessing.text.Tokenizer object at 0x7f9d143d4850>


In [5]:
# 저장된 단어의 index 10개까지 확인

for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])

    if idx >= 10: break

1 : <unk>
2 : <start>
3 : <end>
4 : i
5 : ,
6 : .
7 : you
8 : oh
9 : it
10 : me


<hr>

## 4. 데이터셋 분리하기

tensor에서 마지막 토큰을 잘라낸 것을 소스 문장으로, tensor에서 <start>를 잘라낸 것을 타겟 문장으로 한다.<br>


In [6]:
src_input = tensor[:, :-1]  
tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])

[  2 159   3   0   0   0   0   0   0   0   0   0]
[159   3   0   0   0   0   0   0   0   0   0   0]


In [7]:
from sklearn.model_selection import train_test_split

enc_train, enc_val, dec_train, dec_val = train_test_split(src_input,
                                                         tgt_input,
                                                         test_size=0.2,
                                                         shuffle=True)
print("Source Train:", enc_train.shape, "Source Validation:", enc_val.shape)
print("Target Train:", dec_train.shape, "Target Validation:", dec_val.shape)

Source Train: (10932, 12) Source Validation: (2733, 12)
Target Train: (10932, 12) Target Validation: (2733, 12)


tf.data.Dataset.from_tensor_slices()를 이용해 corpus 텐서를 tf.data.Dataset객체로 변환하는 작업을 해준다.

In [8]:
BUFFER_SIZE = len(src_input)
BATCH_SIZE = 256
steps_per_epoch = len(src_input) // BATCH_SIZE

 # tokenizer가 구축한 단어 10000개에 <pad>를 포함하여 10001개
VOCAB_SIZE = tokenizer.num_words + 1   

dataset = tf.data.Dataset.from_tensor_slices((src_input, tgt_input))
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset

<BatchDataset shapes: ((256, 12), (256, 12)), types: (tf.int32, tf.int32)>

<hr>

## 5. 모델 설계하기

In [9]:
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 512
hidden_size = 2048
model = TextGenerator(tokenizer.num_words + 1, embedding_size , hidden_size)

In [10]:
# 데이터셋에서 한 배치만 불러오기

for src_sample, tgt_sample in dataset.take(1): break

model(src_sample)

<tf.Tensor: shape=(256, 12, 15001), dtype=float32, numpy=
array([[[ 8.0148056e-06,  1.6628644e-04, -4.3495620e-05, ...,
          1.9002553e-04,  3.7800567e-04,  2.5659814e-04],
        [ 1.9439929e-06,  6.6383678e-04,  2.1686821e-05, ...,
          4.0540379e-04,  7.9255371e-04,  5.3634867e-04],
        [-4.6138381e-04,  1.0157702e-03,  1.2258209e-04, ...,
          3.4196037e-04,  6.4837979e-04,  5.5305817e-04],
        ...,
        [-2.1802976e-03,  2.4761588e-03, -7.2576036e-04, ...,
         -5.6108634e-04,  1.3960727e-03,  1.2777292e-03],
        [-2.2277515e-03,  2.4480612e-03, -9.3464321e-04, ...,
         -5.9112062e-04,  1.5744300e-03,  1.1794448e-03],
        [-2.2607862e-03,  2.3669524e-03, -1.0683488e-03, ...,
         -5.8430270e-04,  1.6991834e-03,  1.0422738e-03]],

       [[ 8.0148056e-06,  1.6628644e-04, -4.3495620e-05, ...,
          1.9002553e-04,  3.7800567e-04,  2.5659814e-04],
        [-1.0279367e-04,  2.2296897e-04, -1.6884742e-05, ...,
          3.8192031e-04, 

출력된 텐서의 shape를 살펴보면 shape=(256, 12, 15001)에서 배치 사이즈가 256으로 지정되어 있고 Dense 레이어의 차원수는 15001이다. 이는 15001개의 단어 중 확률이 가장 높은 단어를 찾아야 하기 때문이다. 여기서 12은 앞서 동일하게 맞춰 준 시퀀스의 길이인 max_len이 12이었다는 것을 의미한다.

In [11]:
model.summary()

Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  7680512   
_________________________________________________________________
lstm (LSTM)                  multiple                  20979712  
_________________________________________________________________
lstm_1 (LSTM)                multiple                  33562624  
_________________________________________________________________
dense (Dense)                multiple                  30737049  
Total params: 92,959,897
Trainable params: 92,959,897
Non-trainable params: 0
_________________________________________________________________


<hr>

## 6. 모델 학습시키기

In [12]:
optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none'
)

model.compile(loss=loss, optimizer=optimizer)
model.fit(dataset, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f9c701632b0>

<hr>

## 7. 작문시켜 평가하기

단어 하나씩 예측해 문장을 만든다.

1. 입력받은 문장의 텐서를 입력
2. 예측된 값 중 가장 높은 확률인 word index를 뽑아냄
3. 2에서 예측된 word index를 문장 뒤에 붙임
4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성을 마침

In [13]:
def generate_text(model, tokenizer, init_sentence="<start>", max_len=15):
    # 테스트를 위해 입력받은 init_sentence도 텐서로 변환
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]
    
    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환 
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

In [14]:
generate_text(model, tokenizer, init_sentence="<start> oh")

'<start> oh , oh , oh <end> '

In [15]:
generate_text(model, tokenizer, init_sentence="<start> it")

'<start> it s a monster <end> '

In [16]:
generate_text(model, tokenizer, init_sentence="<start> but")

'<start> but i m not <end> '

In [19]:
generate_text(model, tokenizer, init_sentence="<start> i")

'<start> i m a survivor <end> '

<hr>

# <프로젝트 회고>

이번 프로젝트는 좋은 모델을 만들기 위해 수정해가는 과정에서 시간이 정말 많이 걸렸다. epoch는 10으로 두고 loss를 최대한 낮추려고 노력을 했다. 많은 시도를 해 본 결과, embedding_size = 512, hidden_size = 2048 일 때 성능이 좋게 나왔다. loss는 1.12까지 줄어들었고 파라미터의 수는 약 9,300만개였다. 출력된 결과물을 보니 짧긴 하지만 문장이 만들어졌다. 문장이 서로 이어지는 것 같기도 해서 신기했다. 문장이 길어지는 것을 막고 <pad>의 갯수를 줄이기 위해 학습되는 문장들은 토큰이 15개 이하로 제한을 두었다. 하지만 막상 결과물을 확인하니 너무 짧은 문장들만 나온 것 같아 아쉬운 생각도 들었다. NLP에 대해 처음 알게 되었는데 NLP에서 기본이라 할 수 있는 RNN 모델에 대해 자세히 공부할 수 있어서 좋았다. 조금 더 공부해서 GPT-2를 활용해 더 긴 문장을 만들어보는 것도 도전해봐야겠다. 