In [1]:
# 모듈 불러오기

import glob
import re 
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

print('done')

done


In [2]:
# 가사 파일 불러오기 

txt_file_path = '/content/drive/MyDrive/Colab/lyricist/data/lyrics/*'

txt_list = glob.glob(txt_file_path)

raw_corpus = []

# 여러개의 txt 파일을 모두 읽어서 raw_corpus 에 담기
for txt_file in txt_list:
    with open(txt_file, "r") as f:
        raw = f.read().splitlines()
        raw_corpus.extend(raw)

# 중복된 문장을 지우고 싶어서 set에 한번 넣었다가 꺼냄
raw_corpus = list(set(raw_corpus)) 
print("데이터 크기:", len(raw_corpus))
print("Examples:\n", raw_corpus[:10])

데이터 크기: 118168
Examples:
 ['', 'Let your love lift me up When the morning bright', 'Gucci designer frames, say it (make love to me)', '(Long, long time)', 'Him or the broken pieces of your heart sweet baby', 'Window pain to the soul say they niggas happy', 'So, what you wanna do?', 'My niggas smoke like coffee shops', 'Whatever you wanna drink girl pick right now', "'Cause Love For Us Was Meant To Be (Well)"]


In [3]:
# 문장을 정제하는 함수 만들기

def preprocess_sentence(sentence):
    sentence = sentence.lower().strip() # 1
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence) # 2
    sentence = re.sub(r'[" "]+', " ", sentence) # 3
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence) # 4
    sentence = sentence.strip() # 5
    sentence = '<start> ' + sentence + ' <end>' # 6
    return sentence

print('done')

done


In [4]:
# 정제된 문장 담기
corpus = []

for sentence in raw_corpus:
        # 정제한 문장을 체크하자
    preprocessed_sentence = preprocess_sentence(sentence)
        # 빈 문장이면 건너뛰기
    if len(sentence) == 0: continue
        # 토큰의 길이가 15 초과이면 건너뛰기
    if len(preprocessed_sentence.split()) > 15: continue
        # 아니라면 corpus에 담기
    corpus.append(preprocessed_sentence)

# 정제된 결과 확인
print(len(corpus))
corpus[:10]

102257


['<start> let your love lift me up when the morning bright <end>',
 '<start> gucci designer frames , say it make love to me <end>',
 '<start> long , long time <end>',
 '<start> him or the broken pieces of your heart sweet baby <end>',
 '<start> window pain to the soul say they niggas happy <end>',
 '<start> so , what you wanna do ? <end>',
 '<start> my niggas smoke like coffee shops <end>',
 '<start> whatever you wanna drink girl pick right now <end>',
 '<start> cause love for us was meant to be well <end>',
 '<start> aw shucks <end>']

In [5]:
# 토큰화 함수 만들기

def tokenize(corpus):
    # 12000단어를 기억할 수 있는 tokenizer
    # 12000단어에 포함되지 못한 단어는 '<unk>'로 변환
    tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=12000, 
                                                      filters=' ',
                                                      oov_token="<unk>")
    # corpus를 이용해 tokenizer 내부의 단어장을 만들기
    tokenizer.fit_on_texts(corpus)
    # 준비한 tokenizer를 이용해 corpus를 Tensor로 변환
    tensor = tokenizer.texts_to_sequences(corpus)   
    # 입력 데이터의 시퀀스 길이를 일정하게 맞춰주기 (문장 뒤에 패딩을 붙여줌)
    # 문장 앞에 패딩을 붙여 길이를 맞추고 싶다면 padding='pre'를 사용
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')  
    
    print(tensor,tokenizer)
    return tensor, tokenizer

tensor, tokenizer = tokenize(corpus)

[[    2    69    21 ...     0     0     0]
 [    2  1444  3090 ...     0     0     0]
 [    2   167     4 ...     0     0     0]
 ...
 [    2     5    27 ...     0     0     0]
 [    2 11696  5025 ...     0     0     0]
 [    2   497    10 ...     1    50     3]] <keras_preprocessing.text.Tokenizer object at 0x7f94b50426d0>


In [6]:
print(tensor[:3])
tensor.shape # 최대 15개의 단어로 구성된 문장이 102257개
# 각 숫자는 tokenizer에 구축된 단어 사전의 인덱스

[[   2   69   21   38  832   12   32   45    6  312  613    3    0    0
     0]
 [   2 1444 3090 4888    4   67   11   73   38   10   12    3    0    0
     0]
 [   2  167    4  167   76    3    0    0    0    0    0    0    0    0
     0]]


(102257, 15)

In [7]:
# tensor에서 마지막 토큰을 잘라내서 소스 문장을 생성
# 마지막 토큰은 <end>가 아니라 <pad>일 가능성이 높으므로 end가 잘릴 가능성은 적음
src_input = tensor[:, :-1]  
# tensor에서 <start>를 잘라내서 타겟 문장을 생성
tgt_input = tensor[:, 1:]    

print(src_input[0])

print(tgt_input[0])
# 2로 시작하지 않고 왼쪽으로 시프트

[  2  69  21  38 832  12  32  45   6 312 613   3   0   0]
[ 69  21  38 832  12  32  45   6 312 613   3   0   0   0]


In [8]:
# 테스트셋 분할

enc_train, enc_val, dec_train, dec_val = train_test_split(src_input, tgt_input, test_size=0.2, random_state=7)

print("Source Train:", enc_train.shape , enc_val.shape)
print("Target Train:", dec_train.shape , dec_val.shape)

Source Train: (81805, 14) (20452, 14)
Target Train: (81805, 14) (20452, 14)


In [9]:
# 하이퍼파라미터 
BATCH_SIZE = 256
embedding_size = 512
hidden_size = 2048

In [10]:
# 트레인 데이터셋 생성
BUFFER_SIZE = len(enc_train)
steps_per_epoch = len(enc_train) // BATCH_SIZE

 # tokenizer가 구축한 단어사전 내 12000개와,
 # 여기 포함되지 않은 0:<pad>를 포함하여 12001개

VOCAB_SIZE = tokenizer.num_words + 1   # +1은 <pad>

# 준비한 데이터 소스로부터 데이터셋을 생성

train_dataset = tf.data.Dataset.from_tensor_slices((enc_train, dec_train))
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE, drop_remainder=True)
train_dataset

<BatchDataset shapes: ((256, 14), (256, 14)), types: (tf.int32, tf.int32)>

In [11]:
# 테스트 데이터셋 생성
BUFFER_SIZE = len(enc_val)
steps_per_epoch = len(enc_val) // BATCH_SIZE

 # tokenizer가 구축한 단어사전 내 12000개와,
 # 여기 포함되지 않은 0:<pad>를 포함하여 12001개

VOCAB_SIZE = tokenizer.num_words + 1   # +1은 <pad>

# 준비한 데이터 소스로부터 데이터셋을 생성

test_dataset = tf.data.Dataset.from_tensor_slices((enc_val, dec_val))
test_dataset = test_dataset.shuffle(BUFFER_SIZE)
test_dataset = test_dataset.batch(BATCH_SIZE, drop_remainder=True)
test_dataset

<BatchDataset shapes: ((256, 14), (256, 14)), types: (tf.int32, tf.int32)>

In [12]:
# 단방향 모델 만들기

class TextGenerator_one(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.drop1 = tf.keras.layers.Dropout(0.2)
        self.rnn_2 = tf.keras.layers.LSTM(hidden_size, return_sequences=True)
        self.linear = tf.keras.layers.Dense(vocab_size)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.drop1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    

model_one = TextGenerator_one(tokenizer.num_words + 1, embedding_size , hidden_size)

print('done')

done


In [13]:
# 한 배치만 불러온 데이터를 모델에 넣어보기

for src_sample, tgt_sample in train_dataset.take(1): break

model_one(src_sample)

<tf.Tensor: shape=(256, 14, 12001), dtype=float32, numpy=
array([[[-1.3257879e-04,  1.5640320e-04,  2.2742828e-05, ...,
          6.6029141e-05,  3.1790306e-04,  6.6688488e-05],
        [-1.4789985e-04,  1.4737253e-04, -2.2552063e-04, ...,
         -1.0870217e-04,  6.1087927e-04,  2.8689907e-04],
        [-7.5282960e-04, -9.4107891e-05, -2.3624265e-04, ...,
         -3.8111111e-04,  5.8584998e-04,  3.2111470e-04],
        ...,
        [-4.4806299e-04,  2.2944023e-03, -2.0680735e-03, ...,
         -6.3730002e-04, -1.8265512e-03,  2.6288274e-04],
        [-2.5344890e-04,  2.1808012e-03, -2.4649545e-03, ...,
         -5.9601350e-04, -1.9612745e-03,  4.2059436e-04],
        [-8.5993845e-05,  2.0299295e-03, -2.8089432e-03, ...,
         -5.1331887e-04, -2.0875549e-03,  6.2772259e-04]],

       [[-1.3257879e-04,  1.5640320e-04,  2.2742828e-05, ...,
          6.6029141e-05,  3.1790306e-04,  6.6688488e-05],
        [ 6.9538881e-05,  1.3427265e-05, -1.4874169e-04, ...,
          2.7278476e-04, 

In [14]:
model_one.summary() 

Model: "text_generator_one"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        multiple                  6144512   
_________________________________________________________________
lstm (LSTM)                  multiple                  20979712  
_________________________________________________________________
dropout (Dropout)            multiple                  0         
_________________________________________________________________
lstm_1 (LSTM)                multiple                  33562624  
_________________________________________________________________
dense (Dense)                multiple                  24590049  
Total params: 85,276,897
Trainable params: 85,276,897
Non-trainable params: 0
_________________________________________________________________


In [15]:
# 단방향 모델 학습하기

optimizer = tf.keras.optimizers.Adam()
loss = tf.keras.losses.SparseCategoricalCrossentropy( 
    from_logits=True, reduction='none')

model_one.compile(loss=loss, optimizer=optimizer)
model_one.fit(train_dataset, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f94b34de4d0>

In [16]:
# 단방향 모델 loss 확인

test_loss = model_one.evaluate(test_dataset, verbose=2)
print("단방향 test_loss: {} ".format(test_loss))
#print("test_accuracy: {}".format(test_accuracy))

79/79 - 15s - loss: 3.0220
단방향 test_loss: 3.021951198577881 


In [17]:
# 양방향 모델 만들기

class TextGenerator_two(tf.keras.Model):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super().__init__()
        
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_size)
        self.rnn_1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(hidden_size, return_sequences=True))
        self.drop1 = tf.keras.layers.Dropout(0.2)
        self.rnn_2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(hidden_size, return_sequences=True))
        self.linear = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(vocab_size))
        
    def call(self, x):
        out = self.embedding(x)
        out = self.rnn_1(out)
        out = self.drop1(out)
        out = self.rnn_2(out)
        out = self.linear(out)
        
        return out
    

model_two = TextGenerator_two(tokenizer.num_words + 1, embedding_size , hidden_size)

print('done')

done


In [18]:
# 한 배치만 불러온 데이터를 모델에 넣어보기

for src_sample, tgt_sample in train_dataset.take(1): break

model_two(src_sample)

<tf.Tensor: shape=(256, 14, 12001), dtype=float32, numpy=
array([[[-1.8796955e-03,  1.3406251e-03, -3.5697289e-04, ...,
         -6.8199402e-04, -1.6107429e-03, -1.4159721e-04],
        [-1.5458841e-03,  1.6436396e-03,  4.3054204e-04, ...,
         -5.0491205e-04, -1.4614059e-03, -3.8395790e-04],
        [-1.2117303e-03,  1.6956801e-03,  1.5517599e-03, ...,
         -2.9964332e-04, -1.5510807e-03, -3.5908111e-04],
        ...,
        [-1.5018384e-03, -3.7183140e-03,  4.4487781e-04, ...,
          3.4229866e-05, -4.6100104e-03,  2.8877899e-03],
        [-1.5464957e-03, -4.2830198e-03, -4.6912202e-04, ...,
          1.4989874e-04, -4.2230855e-03,  3.4733505e-03],
        [-1.7515675e-03, -4.6373815e-03, -1.4284441e-03, ...,
          1.8989193e-04, -3.6588272e-03,  3.8442514e-03]],

       [[-8.0383930e-04,  1.0459754e-03, -1.3711426e-03, ...,
          7.9448662e-05,  1.0067250e-04,  8.7826396e-04],
        [-8.5387117e-04,  1.3573063e-03, -1.4062320e-03, ...,
          1.1825136e-04, 

In [19]:
model_two.summary() 

Model: "text_generator_two"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_1 (Embedding)      multiple                  6144512   
_________________________________________________________________
bidirectional (Bidirectional multiple                  41959424  
_________________________________________________________________
dropout_1 (Dropout)          multiple                  0         
_________________________________________________________________
bidirectional_1 (Bidirection multiple                  100679680 
_________________________________________________________________
time_distributed (TimeDistri multiple                  49168097  
Total params: 197,951,713
Trainable params: 197,951,713
Non-trainable params: 0
_________________________________________________________________


In [20]:
# 양방향 모델 학습하기

model_two.compile(loss=loss, optimizer=optimizer)
model_two.fit(train_dataset, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f94b2864310>

In [21]:
# 양뱡향 모델 loss 확인

test_loss = model_two.evaluate(test_dataset, verbose=2)
print("양방향 test_loss: {} ".format(test_loss))
#print("test_accuracy: {}".format(test_accuracy))

79/79 - 33s - loss: 0.1334
양방향 test_loss: 0.133412167429924 


In [22]:
# 문장 생성 함수 만들기

def generate_text(model, tokenizer, init_sentence="<start>", max_len=14):
    # 테스트를 위해서 입력받은 init_sentence도 텐서로 변환합니다
    test_input = tokenizer.texts_to_sequences([init_sentence])
    test_tensor = tf.convert_to_tensor(test_input, dtype=tf.int64)
    end_token = tokenizer.word_index["<end>"]

    # 단어 하나씩 예측해 문장을 만듭니다
    #    1. 입력받은 문장의 텐서를 입력합니다
    #    2. 예측된 값 중 가장 높은 확률인 word index를 뽑아냅니다
    #    3. 2에서 예측된 word index를 문장 뒤에 붙입니다
    #    4. 모델이 <end>를 예측했거나, max_len에 도달했다면 문장 생성을 마칩니다
    while True:
        # 1
        predict = model(test_tensor) 
        # 2
        predict_word = tf.argmax(tf.nn.softmax(predict, axis=-1), axis=-1)[:, -1] 
        # 3 
        test_tensor = tf.concat([test_tensor, tf.expand_dims(predict_word, axis=0)], axis=-1)
        # 4
        if predict_word.numpy()[0] == end_token: break
        if test_tensor.shape[1] >= max_len: break

    generated = ""
    # tokenizer를 이용해 word index를 단어로 하나씩 변환합니다 
    for word_index in test_tensor[0].numpy():
        generated += tokenizer.index_word[word_index] + " "

    return generated

print('done')

done


In [34]:
# 단방향 학습모델 결과

print(generate_text(model_one, tokenizer, init_sentence="<start> i love", max_len=14))
print(generate_text(model_one, tokenizer, init_sentence="<start> you are", max_len=14))
print(generate_text(model_one, tokenizer, init_sentence="<start> my heart is", max_len=14))

<start> i love you , i love you <end> 
<start> you are the lady in my life let me touch you girl <end> 
<start> my heart is a dancer beating in my heart <end> 


In [39]:
# 양방향 학습모델 결과

print(generate_text(model_two, tokenizer, init_sentence="<start> i love", max_len=14))
print(generate_text(model_two, tokenizer, init_sentence="<start> you are", max_len=14))
print(generate_text(model_two, tokenizer, init_sentence="<start> my heart is", max_len=14))

<start> i love <end> 
<start> you are <end> 
<start> my heart is <end> 


# 결론

loss 를 줄이기 위해 양방향 학습으로 전환했으나

loss는 줄었지만 문장을 제대로 만들지 못하는 현상이 발생했다.

즉 two모델은 대부분의 단어의 뒤에 end가 붙는걸 적합하다고 여기고 있고,

이는 정뱡향만으로 학습할 때는 없던 현상이므로

반대방향에서의 학습에서 'end 앞에 어떤 단어가 있어도 이상하지 않다' 라는 걸 배운 것 같다.

의심가는 이유를 찾아보자면 LSTM이라는 건 

입력 받은 단어가 있을 때 그 뒤에 나올 수 있는 확률이 큰 단어를 찾는 작업이므로

구하고자 하는건 입력단어에 대한 조건부확률이 될 것이다

따라서 love 뒤에 자주 나왔던 단어가 스코어가 높아지는 방향으로 업데이트가 이루어질 것이다

즉 love 뒤에 end가 적당하다는 판단을 내리기에는 썩 자주 나오는 케이스가 아니므로,

다른 단어를 가져다가 문장을 이어갈 수 있다.

이걸 반대방향에서도 학습을 시키면

end 앞에 love가 붙는 것이 타당한지에 대한 판단을 할텐데

문제는 거의 모든 문장이 end를 가지고 있다보니,

end에 관해서는 계속 스코어 보정이 이루어지는게 아닐까..

제대로 된 문장이 아니라 가사라서 비문이 섞여있을 가능성 역시 무시할 수 없을 것 같다.

보통은 is 하고 문장을 끝낼 일은 없지만 노래가사에서야 얼마든지 있을 수 있는 문장이니까..

딥러닝이 왜 블랙박스라고 불리는지 너무 절실하게 느낀 노드였다....

너 무슨 생각하고있니 😂

