# 데이터 읽어오기

In [1]:
import glob
import os, re 
import numpy as np
import tensorflow as tf

txt_file_path = os.getenv('HOME')+'/aiffel/lyricist/data/lyrics/*'

txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:10])


데이터 크기: 187088
Examples:
 ['[Verse 1]', 'They come from everywhere', 'A longing to be free', 'They come to join us here', 'From sea to shining sea And they all have a dream', 'As people always will', 'To be safe and warm', 'In that shining city on the hill Some wanna slam the door', 'Instead of opening the gate', "Aw, let's turn this thing around"]


# 데이터 정제

In [2]:
def preprocess_sentence(sentence):
    sentence = sentence.lower().strip()
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence)
    sentence = sentence.strip() # 5
    sentence = '<start> ' + sentence + ' <end>'
    return sentence

In [3]:
print(preprocess_sentence("This @_is ;;;sample        sentence."))

<start> this is sample sentence . <end>


In [4]:
corpus = []

for sentence in raw_corpus:
    if len(sentence) == 0: continue
    if sentence[-1] == ":": continue

    preprocessed_sentence = preprocess_sentence(sentence)
    corpus.append(preprocessed_sentence)

print(np.array(corpus).shape)
corpus[:10]

(175749,)


['<start> verse <end>',
 '<start> they come from everywhere <end>',
 '<start> a longing to be free <end>',
 '<start> they come to join us here <end>',
 '<start> from sea to shining sea and they all have a dream <end>',
 '<start> as people always will <end>',
 '<start> to be safe and warm <end>',
 '<start> in that shining city on the hill some wanna slam the door <end>',
 '<start> instead of opening the gate <end>',
 '<start> aw , let s turn this thing around <end>']

In [5]:
def tokenize(corpus):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=12000, 
        filters=' ',
        oov_token="<unk>"
    )
    tokenizer.fit_on_texts(corpus)
    tensor_tmp = tokenizer.texts_to_sequences(corpus)
    tensor=[]
    for i in tensor_tmp:
        if len(i) > 15: 
            continue
        else:
            tensor.append(i)
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')  
    
    print(tensor,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

[[   2  520    3 ...    0    0    0]
 [   2   45   66 ...    0    0    0]
 [   2    9 3390 ...    0    0    0]
 ...
 [   2  561   21 ...    0    0    0]
 [   2  120   34 ...    0    0    0]
 [   2    5   22 ...    0    0    0]] <keras_preprocessing.text.Tokenizer object at 0x7f0381b9e7d0>


In [6]:
for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])

    if idx >= 10: break

1 : <unk>
2 : <start>
3 : <end>
4 : ,
5 : i
6 : the
7 : you
8 : and
9 : a
10 : to


In [7]:
print(tensor[:3, :10])

[[   2  520    3    0    0    0    0    0    0    0]
 [   2   45   66   74  808    3    0    0    0    0]
 [   2    9 3390   10   27  270    3    0    0    0]]


In [8]:
src_input = tensor[:, :-1]  

tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])

[  2 520   3   0   0   0   0   0   0   0   0   0   0   0]
[520   3   0   0   0   0   0   0   0   0   0   0   0   0]


# 평가 데이터셋 만들기

In [9]:
# 평가 데이터셋 만들기

from sklearn.model_selection import train_test_split

enc_train, enc_val, dec_train, dec_val = train_test_split(src_input, tgt_input, test_size=0.2)

In [11]:
BUFFER_SIZE = len(enc_train)
BATCH_SIZE = 256
steps_per_epoch = len(enc_train) // BATCH_SIZE

VOCAB_SIZE = tokenizer.num_words + 1   
dataset_train = tf.data.Dataset.from_tensor_slices((enc_train, dec_train)).shuffle(BUFFER_SIZE)
dataset_train = dataset_train.batch(BATCH_SIZE, drop_remainder=True)
dataset_train

BUFFER_SIZE = len(enc_val)
steps_per_epoch = len(enc_val) // BATCH_SIZE
dataset_val = tf.data.Dataset.from_tensor_slices((enc_val, dec_val)).shuffle(BUFFER_SIZE)
dataset_val = dataset_val.batch(BATCH_SIZE, drop_remainder=True)
dataset_val

<BatchDataset shapes: ((256, 14), (256, 14)), types: (tf.int32, tf.int32)>

# 학습시키기

In [12]:
class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 256
hidden_size = 1024
model = TextGenerator(tokenizer.num_words + 1, embedding_size , hidden_size)

In [14]:
for src_sample, tgt_sample in dataset_train.take(1): break

model(src_sample)
print(src_sample.shape)

(256, 14)


In [15]:
model.summary()

Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  3072256   
_________________________________________________________________
lstm (LSTM)                  multiple                  5246976   
_________________________________________________________________
lstm_1 (LSTM)                multiple                  8392704   
_________________________________________________________________
dense (Dense)                multiple                  12301025  
Total params: 29,012,961
Trainable params: 29,012,961
Non-trainable params: 0
_________________________________________________________________


In [16]:
# 이 과정에서 epoch 시간이 너무 오래걸려 30회에서 10회로 줄였다.
# 밑의 결과를 보면 알 수 있듯이 10회의 epoch으로 loss가 2.2410으로 줄어들었다.
# 30회의 epoch이면 loss의 값이 2.2 이하로 내려갔을 것임을 알 수 있다.

optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none'
)

model.compile(loss=loss, optimizer=optimizer)
model.fit(dataset_train, epochs=10, validation_data = dataset_val)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f037f7f8950>

# 인공지능 만들기

In [19]:
def generate_text(model, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    while True:
        predict = model(test_tensor) 
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

In [20]:
#Loss
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

In [22]:
generate_text(model, tokenizer, init_sentence="<start> i love", max_len=20)

'<start> i love you <end> '