In [1]:
# from google.colab import drive
# drive.mount('/content/drive')
# %cd /content/drive/MyDrive/SMU_MITB_NLP/Group Project/NLP-Lyric-Generator/src/bin

In [2]:
### Standard Imports
import numpy as np
import re
import sys
import os
from collections import Counter

import tensorflow as tf
from tensorflow.keras import layers

In [3]:
### Custom Imports
sys.path.append('../')
import lib.utilities as utils
import lib.autoencoder_utilities as ae_utils

In [4]:
### Special tokens (handed to the tokenizer; the start/pad/unk tokens are reused when encoding the prompt)
start_token, end_token = '<cls>', '<eos>'
pad_token, unk_token = '<pad>', '<unk>'
newline_token = '<new>'

### Reproducibility and output location
random_seed = 2022
model_folder = '../../models/autoencoder/lstm/v1'

### Data and training hyper-parameters
val_split = 0.2            # fraction of examples held out for validation
window_len = 15            # tokens per input window (also the model's sequence length)
batch_size = 64
enc_dim = dec_dim = 256    # dec_dim is the LSTM width used by the model below
learn_rate = 0.001
epochs = 5
dropout = recurrent_dropout = 0.05

In [5]:
# Make sure the checkpoint directory exists; exist_ok keeps re-runs of this cell from failing.
os.makedirs(name = model_folder, exist_ok = True)

In [6]:
### Load Data
# Read the corpus through the project helper; it is called without arguments,
# so what gets loaded is decided by the helper's own defaults.
corpus = utils.load_corpus()

In [7]:
### Pre-Processing Text
# Tokenise the corpus with the notebook's special tokens and unpack the helper's six results.
(words, word_count,
 index_to_vocab, vocab_to_index,
 songs, songs_token_ind) = utils.tokenize_corpus(
    corpus,
    window_length = window_len,
    start_token = start_token,
    end_token = end_token,
    pad_token = pad_token,
    unk_token = unk_token,
    newline_token = newline_token,
)
# Vocabulary size: used later as the one-hot depth and as the width of the model's output layer.
vocab_size = len(word_count)

In [8]:
# Build the windowed model data from the token-index songs, using windows of `window_len` tokens.
# NOTE(review): judging by the sample printed in the following cell, `x_decoder` looks like
# `x_encoder` shifted back by one token and `y` like the token that follows `x_decoder` —
# confirm against ae_utils.construct_seq_data.
x_encoder, x_decoder, y = ae_utils.construct_seq_data(songs_token_ind, window_len)

In [9]:
# Sanity check: pick one example at random and decode its token ids back into words.
rand_int = np.random.randint(0, len(x_encoder), 1)[0]
print(list(map(index_to_vocab.get, x_encoder[rand_int])))
print(list(map(index_to_vocab.get, x_decoder[rand_int])))
print(index_to_vocab.get(y[rand_int]))

['much', 'more', '<new>', 'one', 'singapore', '<new>', '<new>', '<chorus>', '<new>', 'whoa', 'oh', 'oh', 'oh', '<new>', 'wooh']
['so', 'much', 'more', '<new>', 'one', 'singapore', '<new>', '<new>', '<chorus>', '<new>', 'whoa', 'oh', 'oh', 'oh', '<new>']
wooh


In [10]:
def construct_datasets(x_decoder, y, validation_split, batch_size, buffer = 10000, random_seed = 2022, one_hot = True, vocab_size = None):
    """Build batched training and validation `tf.data` pipelines from (input, target) pairs.

    The pairs are shuffled once with a fixed seed, optionally one-hot encoded,
    and then split: the first ``int((1 - validation_split) * n)`` shuffled
    examples form the training set and the remaining ones the validation set.

    Args:
        x_decoder: Model inputs; sliced along the first axis into examples.
        y: Targets aligned with ``x_decoder`` along the first axis.
        validation_split: Fraction of the examples held out for validation.
        batch_size: Number of examples per batch in both returned datasets.
        buffer: Buffer size passed to ``Dataset.shuffle``.
        random_seed: Seed passed to ``Dataset.shuffle``.
        one_hot: If True, one-hot encode both inputs and targets.
        vocab_size: One-hot depth; required when ``one_hot`` is True.

    Returns:
        A tuple ``(train_dataset, val_dataset)`` of batched, cached and
        prefetched datasets.

    Raises:
        ValueError: If ``one_hot`` is True but ``vocab_size`` is None.
    """
    # Validate before building anything: returning None here would only surface
    # later as an unrelated unpacking error at the call site.
    if one_hot and vocab_size is None:
        raise ValueError('Error: Please provide vocab size for one hot encoding')

    dataset = tf.data.Dataset.from_tensor_slices((x_decoder, y))
    # The order must be frozen before take()/skip(): if the shuffle were re-drawn
    # on every iteration, the two pipelines could see different orderings and the
    # train and validation sets could overlap.
    dataset = dataset.shuffle(buffer_size = buffer,
                              seed = random_seed,
                              reshuffle_each_iteration = False)
    if one_hot:
        dataset = dataset.map(lambda features, target: (tf.one_hot(features, depth = vocab_size),
                                                        tf.one_hot(target, depth = vocab_size)))

    # Compute the split point once so take() and skip() are guaranteed to agree.
    train_size = int((1 - validation_split) * len(dataset))
    train_dataset = dataset.take(train_size)
    val_dataset = dataset.skip(train_size)

    train_dataset_final = train_dataset.batch(batch_size).cache().prefetch(buffer_size = tf.data.AUTOTUNE)
    val_dataset_final = val_dataset.batch(batch_size).cache().prefetch(buffer_size = tf.data.AUTOTUNE)

    return train_dataset_final, val_dataset_final

In [11]:
# Shuffle, one-hot encode, split and batch the (x_decoder, y) pairs.
train_dataset, val_dataset = construct_datasets(
    x_decoder, y,
    validation_split = val_split,
    batch_size = batch_size,
    random_seed = random_seed,
    vocab_size = vocab_size,
)

In [12]:
# temp = ['morning', 'is', 'gone']
# for i, song in enumerate(songs):
#     for j in range(len(song)):
#         sub = song[j:(j+len(temp))]
#         if sub == temp:
#             print(i,j)
#             print(sub)
#             print(song)

In [13]:
# for x,y in train_dataset.take(1):
#     s1 = [index_to_vocab[np.where(v)[0][0]] for v in x[0][0,:,:]]
#     print(s1)
#     s2 = [index_to_vocab[np.where(v)[0][0]] for v in x[1][0,:,:]]
#     print(s2)
#     t = index_to_vocab[np.where(y[0])[0][0]]
#     print(t)
#     #print(np.where(x[1]))
#     #print(np.where(y[0]))

In [15]:
# Single-LSTM next-token model: a window of one-hot tokens in, a distribution over the vocabulary out.
decoder_input = layers.Input(shape = (window_len, vocab_size), name = 'decoder_input')

# The LSTM starts from its default initial state and emits one vector for the whole window
# (sequences are not returned); the dense softmax layer maps that vector to token probabilities.
decoder_output = layers.LSTM(
    units = dec_dim,
    dropout = dropout,
    recurrent_dropout = recurrent_dropout,
    name = 'decoder_lstm',
)(decoder_input)
output = layers.Dense(units = vocab_size, activation = 'softmax', name = 'output')(decoder_output)

model = tf.keras.Model(inputs = decoder_input, outputs = output)
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 15, 1041)]        0         
                                                                 
 decoder_lstm (LSTM)         (None, 256)               1329152   
                                                                 
 output (Dense)              (None, 1041)              267537    
                                                                 
Total params: 1,596,689
Trainable params: 1,596,689
Non-trainable params: 0
_________________________________________________________________


In [16]:
# Targets are one-hot vectors, hence categorical (not sparse) cross-entropy.
model.compile(
    optimizer = tf.keras.optimizers.Adam(learning_rate = learn_rate),
    loss = 'categorical_crossentropy',
    metrics = ['accuracy'],
)

In [17]:
### Callbacks
# Stop once val_loss has gone `patience` epochs without improving, and roll back to the best weights.
# NOTE(review): patience (5) equals the configured number of epochs (5), so this
# probably never triggers with the current settings — confirm the intended values.
callback_es = tf.keras.callbacks.EarlyStopping(
    monitor = 'val_loss',
    mode = 'min',
    patience = 5,
    min_delta = 0,
    baseline = None,
    restore_best_weights = True,
    verbose = 0,
)

# Write a weights-only file after every epoch (save_best_only is off); the file name
# records the epoch, val_loss and val_accuracy.
callback_mc = tf.keras.callbacks.ModelCheckpoint(
    filepath = model_folder + '/weights.{epoch:02d}-{val_loss:.2f}-{val_accuracy:.2f}.hdf5',
    monitor = 'val_loss',
    mode = 'min',
    save_best_only = False,
    save_weights_only = True,
)

In [18]:
# Train with both callbacks attached; the returned History object is kept in `history`.
history = model.fit(
    train_dataset,
    validation_data = val_dataset,
    epochs = epochs,
    callbacks = [callback_es, callback_mc],
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [19]:
#model.save_weights('../../../autoencoder/lstm/v1/final_weights.hdf5')

In [20]:
#model.load_weights('../../../autoencoder/lstm/v1/final_weights.hdf5')

In [21]:
# Prompt that seeds the text generation below.
start_string = "Whenever I think back"

In [22]:
tokenized_str = utils.tokenize_text(start_string, newline_token)
# Keep at most window_len - 1 prompt tokens (one slot is reserved for the start token)
# and map each to its vocabulary index, falling back to the <unk> index for unknown words.
input_indices = [
    vocab_to_index.get(token, vocab_to_index.get(unk_token))
    for position, token in enumerate(tokenized_str)
    if position < window_len - 1
]
# Left-pad so that padding + start token + prompt fills exactly one window.
input_indices = (
    [vocab_to_index.get(pad_token)] * (window_len - 1 - len(input_indices))
    + [vocab_to_index.get(start_token)]
    + input_indices
)

In [23]:
# One-hot encode the prompt window, then add a leading batch axis of size 1 for the model.
input_oh = tf.one_hot(indices = input_indices, depth = vocab_size)
model_input = input_oh[tf.newaxis, ...]

In [24]:
# Accumulates the generated words; the generation loop appends to it.
text_generated = list()

In [25]:
# Sampling settings for this cell.
num_generate = 100   # number of tokens to sample
temperature = 1.0    # 1.0 samples from the model's distribution unchanged

for _ in range(num_generate):
    # Call the model directly instead of model.predict(): the input is a single
    # one-example batch, and predict() adds per-call overhead and progress output
    # on every iteration of the loop.
    prediction = model(model_input, training = False).numpy()

    # tf.random.categorical expects log-probabilities (logits), so take the log of
    # the softmax output and apply the temperature in log space; scaling the
    # probabilities themselves would only shift every logit by the same constant.
    logits = np.log(prediction) / temperature
    predicted_id = tf.random.categorical(logits, num_samples = 1, seed = random_seed)[-1, 0]
    pred_word = index_to_vocab[predicted_id.numpy()]

    # Slide the window: drop the oldest token and append the one just sampled.
    pred_oh = tf.one_hot(predicted_id, depth = vocab_size)
    input_index = tf.expand_dims(pred_oh, 0)
    model_input = tf.expand_dims(tf.concat([model_input[0][1:, :], input_index], axis = 0), 0)

    # Render the newline token as an actual line break in the output text.
    if pred_word == newline_token:
        text_generated.append('\n')
    else:
        text_generated.append(pred_word)

In [26]:
# Show the generated words separated by single spaces (print's default separator).
print(*text_generated)


 look a of unfurled standing hopes long end these said told struggled feel my moments hands in four colleagues shore not our stronger the changing the in 
 till 
 one out 
 stand 
 more 
 
 sing the cooking be 
 i are worth always wide to be here and 
 make i la seen aside five every special and 
 a grow together 
 ages must ahead experienced what favourite way and love 
 know may be awaits stars heart we the singaporean dating come me working will brings different take 
 i feel eiffel brave


In [None]:
# prompts = ['Whenever I think back', 'And so this I know',
#            'I am tired of being what you want me to be', 'Feeling so faithless, lost under the surface',
#            'Relight our fire, we will find our way', 'We will rise stronger together']
# result_strings = {}
# results = {}
# for prompt in prompts:
#     result_str, result = utils.generate_text(model,
#                                              ae_utils.ind_to_input_fun, ae_utils.update_input_fun,
#                                              start_string = prompt,
#                                              window_length = window_len,
#                                              vocab_to_index_dict = vocab_to_index, index_to_vocab_dict = index_to_vocab,
#                                              vocab_size = vocab_size,
#                                              num_generate = 100, temperature = 1.0,
#                                              random_seed = random_seed,
#                                              end_token = end_token, start_token = start_token,
#                                              pad_token = pad_token, unk_token = unk_token,
#                                              newline_token = newline_token,
#                                              depth = vocab_size)
#     result_strings[prompt] = result_str
#     results[prompt] = result

# final_str = f'\n\n{end_token}\n\n'.join([f'{k}:\n{v}' for k, v in result_strings.items()])

In [None]:
# print(final_str)

In [None]:
# with open(model_folder+'/generated_text.txt', 'w') as f:
#     f.write(final_str)