    6. exnode AI writer creation
        1) library import
        2) data read
        3) data refine(정제)
        4) data tokenize
            - tensor 확인 : 156013
        5) train, test data 생성
            - source, tarket data 생성
            - train, validation data 생성 : validation data 20%
        6) 모델 생성 
        7) 모델 검증 

In [2]:
#1) library import
import numpy as np
import tensorflow as tf
import glob
import os


In [3]:
#2) data read

# 경로 지정
txt_file_path = os.getenv('HOME')+'/aiffel/lyricist/data/lyrics/*'

# 해당 경로 폴더의 모든 하위 폴더까지 탐색하여 txt_list에 넣음
txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담습니다.
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:3])

데이터 크기: 187088
Examples:
 ["Now I've heard there was a secret chord", 'That David played, and it pleased the Lord', "But you don't really care for music, do you?"]


In [4]:
#2) data read 확인
for idx, sentence in enumerate(raw_corpus):
    if len(sentence) == 0: continue   # 길이가 0인 문장은 건너뜁니다.
    if sentence[-1] == ":": continue  # 문장의 끝이 : 인 문장은 건너뜁니다.

    if idx > 10: break   # 일단 문장 10개만 확인해 볼 겁니다.
        
    print(sentence)

Now I've heard there was a secret chord
That David played, and it pleased the Lord
But you don't really care for music, do you?
It goes like this
The fourth, the fifth
The minor fall, the major lift
The baffled king composing Hallelujah Hallelujah
Hallelujah
Hallelujah
Hallelujah Your faith was strong but you needed proof
You saw her bathing on the roof


In [5]:
#3) data refine ... define refine 함수
import re 

def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() # 1
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 2
    sentence = re.sub(r'[" "]+', " ", sentence) # 3
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) # 4
    sentence = sentence.strip() # 5
    sentence = '<start> ' + sentence + ' <end>' # 6
    return sentence

In [6]:
#3) data refine
corpus = [] #정제된 문장 모으는 곳

# 문장 없는 레코드, 긴 문장, ':'문장 없애기
for sentence in raw_corpus:
    if len(sentence) == 0: continue
    if len(sentence) > 100: continue 
    if sentence[-1] == ":": continue
    
#refine 함수로 data 정제
    preprocessed_sentence = preprocess_sentence(sentence)

    
# 토큰의 개수가 15개를 넘어가는 문장은 제외
    if len(preprocessed_sentence.split()) > 15: continue        
    
#  정제된 문장 
    corpus.append(preprocessed_sentence)
    
# 정제된 결과 확인
corpus[:1]

['<start> now i ve heard there was a secret chord <end>']

In [7]:
#4) data tokenize
def tokenize(corpus):

    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=12000, 
        filters=' ',
        oov_token="<unk>"
    )
    # corpus를 이용해 tokenizer 내부의 단어장을 완성합니다
    tokenizer.fit_on_texts(corpus)
    
    # 준비한 tokenizer를 이용해 corpus를 Tensor로 변환합니다
    tensor = tokenizer.texts_to_sequences(corpus)   
    
    # 입력 데이터의 시퀀스 길이를 일정하게 맞춰줍니다
    # 만약 시퀀스가 짧다면 문장 뒤에 패딩을 붙여 길이를 맞춰줍니다.
    # 문장 앞에 패딩을 붙여 길이를 맞추고 싶다면 padding='pre'를 사용합니다
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post', maxlen=15)  
    
    print(tensor, tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

[[   2   50    4 ...    0    0    0]
 [   2   15 2967 ...    0    0    0]
 [   2   33    7 ...   46    3    0]
 ...
 [   2    4  118 ...    0    0    0]
 [   2  258  194 ...   12    3    0]
 [   2    7   34 ...    0    0    0]] <keras_preprocessing.text.Tokenizer object at 0x7fe92b92e910>


In [8]:
#4) data tokenize 확인
print(tensor[:5, :]) 

[[   2   50    4   95  303   62   53    9  946 6263    3    0    0    0
     0]
 [   2   15 2967  871    5    8   11 5739    6  374    3    0    0    0
     0]
 [   2   33    7   40   16  164  288   28  333    5   48    7   46    3
     0]
 [   2   11  335   23   41    3    0    0    0    0    0    0    0    0
     0]
 [   2    6 4486    5    6 2039    3    0    0    0    0    0    0    0
     0]]


In [9]:
#4) data tokenize 확인   .... tensor 확인 
print(len(tensor), len(corpus))

156013 156013


In [10]:
#4) data tokenize 확인   .... 단어장 확인 
for idx in tokenizer.index_word:
    print(idx, ":", tokenizer.index_word[idx])

    if idx >= 10: break

1 : <unk>
2 : <start>
3 : <end>
4 : i
5 : ,
6 : the
7 : you
8 : and
9 : a
10 : to


In [11]:
#5) train, test data 생성   .... source, tarket data 생성
# 마지막 토큰은 <end>가 아니라 <pad>일 가능성이 높습니다.
src_input = tensor[:, :-1]  
# tensor에서 <start>를 잘라내서 타겟 문장을 생성합니다.
tgt_input = tensor[:, 1:]    

print(src_input[0])
print(tgt_input[0])

[   2   50    4   95  303   62   53    9  946 6263    3    0    0    0]
[  50    4   95  303   62   53    9  946 6263    3    0    0    0    0]


In [12]:
#5) train, validation data 생성   .... train, validation data 생성  # 20%를 평가용

from sklearn.model_selection import train_test_split
enc_train, enc_val, dec_train, dec_val = train_test_split(src_input, 
                                                    tgt_input, 
                                                    test_size=0.2, 
                                                    random_state=42)
print("Source Train:", enc_train.shape)
print("Target Train:", dec_train.shape)

Source Train: (124810, 14)
Target Train: (124810, 14)


In [13]:
#5) train, validation data 생성   .... train, test data 생성  

BUFFER_SIZE = len(src_input)
BATCH_SIZE = 256
steps_per_epoch = len(src_input) // BATCH_SIZE

VOCAB_SIZE = tokenizer.num_words + 1   

# train 데이터셋
train_dataset = tf.data.Dataset.from_tensor_slices((enc_train, dec_train))
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE, drop_remainder=True)
print(train_dataset)

# test 데이터셋
test_dataset = tf.data.Dataset.from_tensor_slices((enc_val, dec_val))
test_dataset = test_dataset.shuffle(BUFFER_SIZE)
test_dataset = test_dataset.batch(BATCH_SIZE, drop_remainder=True)
print(test_dataset)

<BatchDataset shapes: ((256, 14), (256, 14)), types: (tf.int32, tf.int32)>
<BatchDataset shapes: ((256, 14), (256, 14)), types: (tf.int32, tf.int32)>


In [14]:
#6) 모델 생성  

class TextGenerator(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    
embedding_size = 256  # 값이 커질수록 단어의 추상적인 특징들을 더 잡아낼 수 있지만, 데이터 양이 충분해야함
hidden_size = 1024   # 모델에 얼마나 많은 일꾼을 둘 것인가, 충분한 데이터가 주어져야 배가 산으로 가지 않음
model = TextGenerator(tokenizer.num_words + 1, embedding_size , hidden_size)

In [15]:
#6) 모델 생성  ... 모델 생성  test
for src_sample, tgt_sample in train_dataset.take(1): break

# 한 배치만 불러온 데이터를 모델에 넣어봅니다
model(src_sample)

<tf.Tensor: shape=(256, 14, 12001), dtype=float32, numpy=
array([[[-2.74995509e-05, -1.80320494e-04, -1.21953279e-04, ...,
         -8.82767126e-05, -1.37558178e-04, -3.42023013e-05],
        [-9.57825341e-06, -2.39125555e-04,  1.19950542e-04, ...,
          1.78039227e-05, -3.67643457e-04,  1.09745515e-05],
        [ 1.01028993e-04, -2.29073790e-04,  3.81587975e-04, ...,
         -1.08420361e-04, -6.72303198e-04,  3.16068763e-04],
        ...,
        [-1.66981801e-04,  1.18584570e-03,  7.74725981e-04, ...,
         -9.09806869e-04,  2.77662533e-03,  6.24085602e-04],
        [-1.77620706e-04,  1.17154233e-03,  8.64443486e-04, ...,
         -1.00749207e-03,  3.09734861e-03,  7.01257610e-04],
        [-1.86291232e-04,  1.16351177e-03,  9.60359466e-04, ...,
         -1.11131941e-03,  3.33919842e-03,  7.87424331e-04]],

       [[-2.74995509e-05, -1.80320494e-04, -1.21953279e-04, ...,
         -8.82767126e-05, -1.37558178e-04, -3.42023013e-05],
        [-2.47373042e-04, -3.62511870e-04,  9

In [16]:
#6) 모델 생성  ...    모델 확인 
model.summary()

Model: "text_generator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  3072256   
_________________________________________________________________
lstm (LSTM)                  multiple                  5246976   
_________________________________________________________________
lstm_1 (LSTM)                multiple                  8392704   
_________________________________________________________________
dense (Dense)                multiple                  12301025  
Total params: 29,012,961
Trainable params: 29,012,961
Non-trainable params: 0
_________________________________________________________________


In [17]:
#6) 모델 생성   .. 모델 학습
optimizer = tf.keras.optimizers.Adam()

#Loss
loss = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,
    reduction='none'
)

# 모델 compile  
model.compile(loss=loss, 
                     optimizer=optimizer, 
                     metrics=['accuracy']) 

# 모델 fit  
model_history = model.fit(train_dataset, 
                                  validation_data=test_dataset, # 평가 데이터
                                  epochs=20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [18]:
#7) 모델 검증  
def generate_text(lyricist, tokenizer, init_sentence="<start>", max_len=20):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 단어 하나씩 예측해 문장을 만듭니다
    #    1. 입력받은 문장의 텐서를 입력합니다
    #    2. 예측된 값 중 가장 높은 확률인 word index를 뽑아냅니다
    #    3. 2에서 예측된 word index를 문장 뒤에 붙입니다
    #    4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성을 마칩니다
    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환합니다 
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

In [19]:
#7) 모델 검증 
generate_text(model, tokenizer, init_sentence="<start> you ", max_len=20)

'<start> you know i m bad , i m bad you know it <end> '