<a href="https://colab.research.google.com/github/jimwu6/rnn-genius/blob/master/song_generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RNN Genius


Resources:
* [Lab Notebook Doc](https://docs.google.com/document/d/1yXhX0KPZ_Z4c9jY0r8-wTuBfnI-qHExfJMpQCCfy6zs/edit#)
* [Original inspiration](http://karpathy.github.io/2015/05/21/rnn-effectiveness/?fbclid=IwAR2jDIjSieoc9ZtG_7FLN03Q3LZcUUtkw7V_4mnW0pNrelXoi6PAQsO2ffQ)
* [Keras for RNNs/LSTMs](https://www.tensorflow.org/guide/keras/rnn)
* [Text Classification](https://www.tensorflow.org/tutorials/text/text_classification_rnn)
* [Text Generation](https://www.tensorflow.org/tutorials/text/text_generation) - Code was extrapolated from here.

### Future Plans:
* Move webapp to cloud


---




## Setup

We suggest enabling the GPU or TPU if you would like to train models.

### Import the libraries


In [None]:
import numpy as np
import os
import time

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing

import matplotlib.pyplot as plt

### Define constants
Some Resources:
* [Embedding](https://towardsdatascience.com/deep-learning-4-embedding-layers-f9a02d55ac12)

In [None]:
# Batches
SEQ_LENGTH = 500  # characters per training input; sequences are cut to SEQ_LENGTH + 1 before the input/target split
BATCH_SIZE = 64  # sequences per training batch
BUFFER_SIZE = 10000  # buffer size handed to Dataset.shuffle

# Model
EMBEDDING_DIM = 256  # output width of each model's Embedding layer
RNN_UNITS = 768  # units in every GRU/LSTM layer

# Train
EPOCHS = 30  # epoch count passed to the first model.fit call

### Get dataset


In [None]:
# get file path for input
# path_to_file = keras.utils.get_file('lyrics.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
path_to_file = "/content/DS_2.txt"

# Read the whole corpus and decode it as UTF-8. The context manager closes the
# file handle as soon as the read finishes instead of leaving it open.
# The file is deliberately opened in binary mode so that line endings
# (e.g. '\r\n') reach the vocabulary unchanged.
with open(path_to_file, 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8')

## Text Processing

To encode the text, we map each character to an integer ID, using the sorted list of unique characters that appear in the text as the vocabulary.



In [None]:
# CHECK TEXT
print(f'Length of text: {len(text)} characters')
#print(text[:1000])

# CHECK UNIQUE CHARS
vocab = sorted(set(text))
VOCAB_SIZE = len(vocab)
print(f'{VOCAB_SIZE} unique characters')
print(vocab)

# CHAR ENCODING TO ID
# The lookup layer is built from the corpus vocabulary, then applied to the
# corpus split into individual characters.
ids_from_chars = preprocessing.StringLookup(vocabulary=list(vocab))
chars = tf.strings.unicode_split(text, input_encoding='UTF-8')
ids = ids_from_chars(chars)

# INVERSION TO CHAR
# Reuse the forward layer's vocabulary so both directions agree on the IDs.
chars_from_ids = preprocessing.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(),
    invert=True,
)


def text_from_ids(ids):
    """Convert character IDs back to characters and join them along the last axis."""
    id_chars = chars_from_ids(ids)
    return tf.strings.reduce_join(id_chars, axis=-1)

Length of text: 20467159 characters
84 unique characters
['\n', '\r', ' ', '!', '"', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', ']', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '{', '}']


### Training examples & targets

In [None]:
def split_input_target(sequence):
    """Split a sequence into an (input, target) pair offset by one position.

    The input is everything except the last element and the target is
    everything except the first, so target[i] is the element that follows
    input[i] in the original sequence.
    """
    return sequence[:-1], sequence[1:]

# One dataset element per character ID
ids_dataset = tf.data.Dataset.from_tensor_slices(ids)

# Cut the ID stream into chunks of SEQ_LENGTH + 1 (the extra element lets each
# chunk yield an input and a one-step-shifted target of length SEQ_LENGTH),
# dropping the short tail, then split every chunk into its (input, target) pair.
sequences = ids_dataset.batch(SEQ_LENGTH + 1, drop_remainder=True)
dataset = sequences.map(split_input_target)

# Preview the first pair as text
for input_example, target_example in dataset.take(1):
    for label, example in (("Input :", input_example), ("Target:", target_example)):
        print(label, text_from_ids(example).numpy())

Input : b'[Intro]\r\nHmm\r\n\r\n[Verse 1]\r\nYou might think I\'m crazy\r\nThe way I\'ve been cravin\'\r\nIf I put it quite plainly\r\nJust gimme them babies\r\nSo what you doin\' tonight?\r\nBetter say, "Doin\' you right" (Yeah)\r\nWatchin\' movies, but we ain\'t seen a thing tonight (Yeah)\r\n\r\n[Pre-Chorus]\r\nI don\'t wanna keep you up (You up)\r\nBut show me, can you keep it up? (It up)\r\n\'Cause then I\'ll have to keep you up\r\nShit, maybe I\'ma keep you up, boy\r\nI\'ve been drinkin\' coffee (I\'ve been drinkin\' coffee; coffee)\r\nAnd I\'ve been'
Target: b'Intro]\r\nHmm\r\n\r\n[Verse 1]\r\nYou might think I\'m crazy\r\nThe way I\'ve been cravin\'\r\nIf I put it quite plainly\r\nJust gimme them babies\r\nSo what you doin\' tonight?\r\nBetter say, "Doin\' you right" (Yeah)\r\nWatchin\' movies, but we ain\'t seen a thing tonight (Yeah)\r\n\r\n[Pre-Chorus]\r\nI don\'t wanna keep you up (You up)\r\nBut show me, can you keep it up? (It up)\r\n\'Cause then I\'ll have to keep you up

### Batch dataset


In [None]:
# Shuffle the (input, target) pairs, group them into fixed-size batches
# (dropping the incomplete last batch), and prefetch with an autotuned buffer.
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)


## Building Model
We used four different model architectures (not counting variations in the number of units).

In [None]:
# Embedding -> GRU -> Dense

class ModelOne(keras.Model):
    """Character-level model: Embedding -> GRU -> Dense.

    Args:
        vocab_size: Number of token IDs; sizes both the embedding table and
            the Dense output.
        embedding_dim: Output width of the embedding layer.
        rnn_units: Number of units in the GRU layer.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # `super()` already binds `self`; the previous `super().__init__(self)`
        # handed keras.Model a stray positional argument.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            rnn_units,
            return_sequences=True,
            return_state=True,
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token IDs.

        Args:
            inputs: Token IDs fed to the embedding layer.
            states: Optional initial GRU state; when None, the GRU's own
                initial state for this input is used.
            return_state: When True, also return the final GRU state.
            training: Forwarded to every layer.

        Returns:
            The Dense layer output (one score per vocabulary entry at every
            timestep), or a ``(output, states)`` tuple when ``return_state``
            is True.
        """
        x = self.embedding(inputs, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, states
        return x

# Size the model from the lookup layer's vocabulary so the embedding and the
# output layer cover every ID the layer can emit.
lookup_vocabulary = ids_from_chars.get_vocabulary()
model = ModelOne(
    rnn_units=RNN_UNITS,
    embedding_dim=EMBEDDING_DIM,
    vocab_size=len(lookup_vocabulary),
)

# Push a single batch through the untrained model to check the output shape
for input_batch, target_batch in dataset.take(1):
    example_batch_predictions = model(input_batch)
    shape_note = "# (batch_size, sequence_length, vocab_size)"
    print(example_batch_predictions.shape, shape_note)

(64, 100, 86) # (batch_size, sequence_length, vocab_size)


In [None]:
# Embedding -> GRU -> GRU -> Dense

class ModelTwo(keras.Model):
    """Character-level model: Embedding -> GRU -> GRU -> Dense.

    Args:
        vocab_size: Number of token IDs; sizes both the embedding table and
            the Dense output.
        embedding_dim: Output width of the embedding layer.
        rnn_units: Number of units in each GRU layer.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # `super()` already binds `self`; the previous `super().__init__(self)`
        # handed keras.Model a stray positional argument.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru_one = tf.keras.layers.GRU(
            rnn_units,
            return_sequences=True,
            return_state=True,
        )
        self.gru_two = tf.keras.layers.GRU(
            rnn_units,
            return_sequences=True,
            return_state=True,
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token IDs.

        Args:
            inputs: Token IDs fed to the embedding layer.
            states: Optional two-element sequence holding the initial state
                of the first and second GRU; when None, each GRU's own
                initial state is used. The sequence is not modified.
            return_state: When True, also return the final GRU states.
            training: Forwarded to every layer.

        Returns:
            The Dense layer output, or ``(output, [state_one, state_two])``
            when ``return_state`` is True.
        """
        x = self.embedding(inputs, training=training)
        if states is None:
            state_one = self.gru_one.get_initial_state(x)
            state_two = self.gru_two.get_initial_state(x)
        else:
            state_one, state_two = states[0], states[1]

        # Work on local names and return a fresh list so the caller's
        # `states` list is never mutated in place.
        x, state_one = self.gru_one(x, initial_state=state_one, training=training)
        x, state_two = self.gru_two(x, initial_state=state_two, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, [state_one, state_two]
        return x

# Size the model from the lookup layer's vocabulary so the embedding and the
# output layer cover every ID the layer can emit.
lookup_vocabulary = ids_from_chars.get_vocabulary()
model = ModelTwo(
    rnn_units=RNN_UNITS,
    embedding_dim=EMBEDDING_DIM,
    vocab_size=len(lookup_vocabulary),
)

# Push a single batch through the untrained model to check the output shape
for input_batch, target_batch in dataset.take(1):
    example_batch_predictions = model(input_batch)
    shape_note = "# (batch_size, sequence_length, vocab_size)"
    print(example_batch_predictions.shape, shape_note)

(64, 300, 86) # (batch_size, sequence_length, vocab_size)


In [None]:
# Embedding -> LSTM -> Dense

class ModelThree(keras.Model):
    """Character-level model: Embedding -> LSTM -> Dense.

    Args:
        vocab_size: Number of token IDs; sizes both the embedding table and
            the Dense output.
        embedding_dim: Output width of the embedding layer.
        rnn_units: Number of units in the LSTM layer.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # `super()` already binds `self`; the previous `super().__init__(self)`
        # handed keras.Model a stray positional argument.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(
            rnn_units,
            return_sequences=True,
            return_state=True,
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token IDs.

        Args:
            inputs: Token IDs fed to the embedding layer.
            states: Optional initial LSTM state, passed straight through as
                the layer's ``initial_state`` (None is passed as-is).
            return_state: When True, also return the final LSTM state.
            training: Forwarded to every layer.

        Returns:
            The Dense layer output, or ``(output, [h, c])`` when
            ``return_state`` is True, where ``h`` and ``c`` are the two
            state tensors returned by the LSTM.
        """
        x = self.embedding(inputs, training=training)
        x, h, c = self.lstm(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, [h, c]
        return x

# Size the model from the lookup layer's vocabulary so the embedding and the
# output layer cover every ID the layer can emit.
lookup_vocabulary = ids_from_chars.get_vocabulary()
model = ModelThree(
    rnn_units=RNN_UNITS,
    embedding_dim=EMBEDDING_DIM,
    vocab_size=len(lookup_vocabulary),
)

# Push a single batch through the untrained model to check the output shape
for input_batch, target_batch in dataset.take(1):
    example_batch_predictions = model(input_batch)
    shape_note = "# (batch_size, sequence_length, vocab_size)"
    print(example_batch_predictions.shape, shape_note)

In [None]:
# Embedding -> LSTM -> LSTM -> Dense

class ModelFour(keras.Model):
    """Character-level model: Embedding -> LSTM -> LSTM -> Dense.

    Args:
        vocab_size: Number of token IDs; sizes both the embedding table and
            the Dense output.
        embedding_dim: Output width of the embedding layer.
        rnn_units: Number of units in each LSTM layer.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # `super()` already binds `self`; the previous `super().__init__(self)`
        # handed keras.Model a stray positional argument.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm_one = tf.keras.layers.LSTM(
            rnn_units,
            return_sequences=True,
            return_state=True,
        )
        self.lstm_two = tf.keras.layers.LSTM(
            rnn_units,
            return_sequences=True,
            return_state=True,
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token IDs.

        Args:
            inputs: Token IDs fed to the embedding layer.
            states: Optional two-element sequence holding the initial state
                of the first and second LSTM; None (or a None entry) is
                passed to the layer as-is. The sequence is not modified.
            return_state: When True, also return the final LSTM states.
            training: Forwarded to every layer.

        Returns:
            The Dense layer output, or ``(output, [[h1, c1], [h2, c2]])``
            when ``return_state`` is True.
        """
        x = self.embedding(inputs, training=training)
        if states is None:
            state_one, state_two = None, None
        else:
            state_one, state_two = states[0], states[1]

        # Work on local names and return a fresh list so the caller's
        # `states` list is never mutated in place.
        x, h, c = self.lstm_one(x, initial_state=state_one, training=training)
        state_one = [h, c]
        x, h, c = self.lstm_two(x, initial_state=state_two, training=training)
        state_two = [h, c]
        x = self.dense(x, training=training)

        if return_state:
            return x, [state_one, state_two]
        return x

# Size the model from the lookup layer's vocabulary so the embedding and the
# output layer cover every ID the layer can emit.
lookup_vocabulary = ids_from_chars.get_vocabulary()
model = ModelFour(
    rnn_units=RNN_UNITS,
    embedding_dim=EMBEDDING_DIM,
    vocab_size=len(lookup_vocabulary),
)

# Push a single batch through the untrained model to check the output shape
for input_batch, target_batch in dataset.take(1):
    example_batch_predictions = model(input_batch)
    shape_note = "# (batch_size, sequence_length, vocab_size)"
    print(example_batch_predictions.shape, shape_note)

(64, 500, 86) # (batch_size, sequence_length, vocab_size)


In [None]:
# Print the Keras summary for whichever model `model` currently refers to
model.summary()

### Testing stuff

In [None]:
# Sample one token ID per timestep from the first example's predictions
# (the model is untrained here, so the sampled text is expected to be noise).
first_example_logits = example_batch_predictions[0]
sampled_indices = tf.squeeze(
    tf.random.categorical(first_example_logits, num_samples=1),
    axis=-1,
).numpy()

print("Input:\n", text_from_ids(input_batch[0]).numpy())
print()
print("Predictions:\n", text_from_ids(sampled_indices).numpy())

## Training Model



In [None]:
# The models end in a Dense layer without activation, so the loss works on logits
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

# Sanity-check the loss on the example batch produced when the model was built
example_batch_loss = loss(target_batch, example_batch_predictions)
mean_loss = example_batch_loss.numpy().mean()
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", mean_loss)

model.compile(optimizer='adam', loss=loss)

# save_freq is counted in batches; one epoch is len(sequences) // BATCH_SIZE
# batches, so this writes a checkpoint every 5 epochs.
batches_per_epoch = len(sequences) // BATCH_SIZE
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True,
    save_freq=5 * batches_per_epoch,
    verbose=1,
)

# Train
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

## Generate Text

In [None]:
class OneStep(keras.Model):
    """Wraps a trained model to sample text one character at a time.

    Args:
        model: Model called as ``model(inputs=..., states=..., return_state=True)``
            and returning ``(logits, states)``.
        chars_from_ids: Lookup layer mapping token IDs back to characters.
        ids_from_chars: Lookup layer mapping characters to token IDs.
        temperature: Divisor applied to the logits before sampling.
    """

    def __init__(self, model, chars_from_ids, ids_from_chars, temperature=0.85):
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars

        # Create mask to prevent "" and "[UNK]": -inf at their IDs, 0 elsewhere,
        # so adding the mask to the logits makes those tokens impossible to sample.
        skip_ids = self.ids_from_chars(['', '[UNK]'])[:, None]
        sparse_mask = tf.SparseTensor(
            values=[-float('inf')] * len(skip_ids),
            indices=skip_ids,
            dense_shape=[len(ids_from_chars.get_vocabulary())],
        )
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        """Sample the next character for each input string.

        Args:
            inputs: String tensor; each entry is split into UTF-8 characters
                and fed to the wrapped model.
            states: Model state returned by the previous step, or None to
                start from the model's default state.

        Returns:
            ``(predicted_chars, states)``: the sampled character for each
            input string and the state to pass to the next call.
        """
        # Convert strings to tokens
        input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
        input_ids = self.ids_from_chars(input_chars).to_tensor()

        # Run model. List states are shallow-copied first so that a model
        # which assigns into the list cannot alter the caller's object.
        if isinstance(states, list):
            states = list(states)
        predicted_logits, states = self.model(
            inputs=input_ids, states=states, return_state=True)

        # Only use the last prediction
        predicted_logits = predicted_logits[:, -1, :]
        predicted_logits = predicted_logits / self.temperature
        # Apply prediction mask
        predicted_logits = predicted_logits + self.prediction_mask

        # Generate IDs
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # Convert to chars
        predicted_chars = self.chars_from_ids(predicted_ids)

        # Return chars, model state
        return predicted_chars, states

### Testing train

In [None]:
# Set the temperature (usually it is in (0, 1])
# Sampling temperature (usually it is in (0, 1])
temperature = 0.85

# Prompt that seeds the generation
next_char = tf.constant(['[Intro]'])

# Number of characters to sample after the prompt
length = 1000

# Wrap the current model for step-by-step sampling
one_step_model = OneStep(model, chars_from_ids, ids_from_chars, temperature=temperature)

start = time.time()
states = None
result = [next_char]

# Generate lyrics: feed each sampled character and the returned state back in
for _ in range(length):
    next_char, states = one_step_model.generate_one_step(next_char, states=states)
    result.append(next_char)

result = tf.strings.join(result)
end = time.time()

# Print generated lyrics, a separator rule, and the elapsed time
generated_lyrics = result[0].numpy().decode('utf-8')
print(generated_lyrics, '\n\n' + '_'*72)
print(f"\nRun time: {end - start}")
print()

### Visualizers

Only loss is being tracked, so we will plot it from the training history.

In [None]:
# Plot the 'loss' series recorded in the History object returned by model.fit
plt.plot(history.history['loss'])

# Trained Model

## Loading weights
If you have saved weights, you can load them in two different ways, but this only works if the weights were trained with the same vocabulary size as your current model. Otherwise, you can initialize a new model with the matching vocabulary size, if you know it.

Resources:
* [Save and load models](https://www.tensorflow.org/tutorials/keras/save_and_load)

In [None]:
# LOAD PREVIOUS WEIGHTS IF YOU HAVE THEM
# NOTE: this cell loads unconditionally; it fails if no "ckpt_30" checkpoint
# exists under checkpoint_dir.
checkpoint_dir = './' # change the directory as needed
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_30") # only if you want to load specific weights
# Alternative: load whichever checkpoint in the directory is the most recent
# model.load_weights(tf.train.latest_checkpoint(checkpoint_dir)) 
model.load_weights(checkpoint_prefix)

# LOAD FROM GCLOUD
# from google.colab import auth
# from tensorflow.python.lib.io import file_io

# auth.authenticate_user()

# model_file = file_io.FileIO('GCLOUD BUCKET DIRECTORY', mode='rb')

# temp_model_location = './model21_checkpoints/ckpt_30.index'
# temp_model_file = open(temp_model_location, 'wb')
# temp_model_file.write(model_file.read())
# temp_model_file.close()
# model_file.close()

# model_file = file_io.FileIO('gs://rnn-genius-models-bucket/model21/training_checkpoints/ckpt_30.data-00000-of-00001', mode='rb')

# temp_model_location = './model21_checkpoints/ckpt_30.data-00000-of-00001'
# temp_model_file = open(temp_model_location, 'wb')
# temp_model_file.write(model_file.read())
# temp_model_file.close()
# model_file.close()

### Train More Epochs

In [None]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss)

# save_freq is counted in batches; one epoch is len(sequences) // BATCH_SIZE
# batches, so this writes a checkpoint every 5 epochs.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True,
    save_freq=5 * (len(sequences) // BATCH_SIZE),
    verbose=1
)

# Epoch the loaded weights correspond to (ckpt_30 in the loading cell above;
# change it if you loaded a different checkpoint) and how many MORE epochs to run.
INITIAL_EPOCH = 30
EPOCHS = 30

# When initial_epoch is given, `epochs` is the index of the final epoch, not a
# count of additional epochs. Passing the same value for both trains nothing
# and leaves the history empty, so the target is INITIAL_EPOCH + EPOCHS.
history2 = model.fit(
    dataset,
    epochs=INITIAL_EPOCH + EPOCHS,
    initial_epoch=INITIAL_EPOCH,
    callbacks=[checkpoint_callback],
)
plt.plot(history2.history['loss'])

### Test Run

In [None]:
# Set the temperature (usually it is in (0, 1])
# Sampling temperature (usually it is in (0, 1])
temperature = 0.85

# Prompt that seeds the generation
next_char = tf.constant(['[Intro]'])

# Number of characters to sample after the prompt
length = 1000

# Wrap the current model for step-by-step sampling
one_step_model = OneStep(model, chars_from_ids, ids_from_chars, temperature=temperature)

start = time.time()
states = None
result = [next_char]

# Generate lyrics: feed each sampled character and the returned state back in
for _ in range(length):
    next_char, states = one_step_model.generate_one_step(next_char, states=states)
    result.append(next_char)

result = tf.strings.join(result)
end = time.time()

# Print generated lyrics, a separator rule, and the elapsed time
generated_lyrics = result[0].numpy().decode('utf-8')
print(generated_lyrics, '\n\n' + '_'*72)
print(f"\nRun time: {end - start}")
print()

## Finetuning
To finetune on a smaller dataset, we can either keep training the original model architecture or freeze some of its layers. As an example, below is Model Four with its two LSTM layers frozen.

In [None]:
# Freeze the LSTM Layers
class ModelFour_Freeze(keras.Model):
    """Embedding -> LSTM -> LSTM -> Dense with both LSTM layers frozen.

    The layer attribute names are `embedding`, `lstm_one`, `lstm_two` and
    `dense`; the LSTM layers are created with ``trainable=False`` so only the
    embedding and Dense layers are updated during finetuning.

    Args:
        vocab_size: Number of token IDs; sizes both the embedding table and
            the Dense output.
        embedding_dim: Output width of the embedding layer.
        rnn_units: Number of units in each LSTM layer.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # `super()` already binds `self`; the previous `super().__init__(self)`
        # handed keras.Model a stray positional argument.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm_one = tf.keras.layers.LSTM(
            rnn_units,
            return_sequences=True,
            return_state=True,
            trainable=False,  # freeze layer
        )
        self.lstm_two = tf.keras.layers.LSTM(
            rnn_units,
            return_sequences=True,
            return_state=True,
            trainable=False,  # freeze layer
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token IDs.

        Args:
            inputs: Token IDs fed to the embedding layer.
            states: Optional two-element sequence holding the initial state
                of the first and second LSTM; None (or a None entry) is
                passed to the layer as-is. The sequence is not modified.
            return_state: When True, also return the final LSTM states.
            training: Forwarded to every layer.

        Returns:
            The Dense layer output, or ``(output, [[h1, c1], [h2, c2]])``
            when ``return_state`` is True.
        """
        x = self.embedding(inputs, training=training)
        if states is None:
            state_one, state_two = None, None
        else:
            state_one, state_two = states[0], states[1]

        # Work on local names and return a fresh list so the caller's
        # `states` list is never mutated in place.
        x, h, c = self.lstm_one(x, initial_state=state_one, training=training)
        state_one = [h, c]
        x, h, c = self.lstm_two(x, initial_state=state_two, training=training)
        state_two = [h, c]
        x = self.dense(x, training=training)

        if return_state:
            return x, [state_one, state_two]
        return x

# Size the model from the lookup layer's vocabulary so the embedding and the
# output layer cover every ID the layer can emit.
lookup_vocabulary = ids_from_chars.get_vocabulary()
model = ModelFour_Freeze(
    rnn_units=RNN_UNITS,
    embedding_dim=EMBEDDING_DIM,
    vocab_size=len(lookup_vocabulary),
)

# Push a single batch through the model to check the output shape
for input_batch, target_batch in dataset.take(1):
    example_batch_predictions = model(input_batch)
    shape_note = "# (batch_size, sequence_length, vocab_size)"
    print(example_batch_predictions.shape, shape_note)

(64, 300, 86) # (batch_size, sequence_length, vocab_size)


### Get new text to finetune against

In [None]:
path_to_file = "/content/ShawnMendes.txt"

# Read the finetuning corpus and decode it as UTF-8. The context manager
# closes the file handle as soon as the read finishes. Binary mode keeps the
# line endings exactly as stored.
with open(path_to_file, 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8')

# Encode with the EXISTING lookup layer so the IDs match the trained model.
# NOTE(review): characters missing from the original vocabulary presumably
# map to the [UNK] token; confirm the new corpus does not rely on them.
chars = tf.strings.unicode_split(text, input_encoding='UTF-8')
ids = ids_from_chars(chars)


### Create new dataset


In [None]:
# One dataset element per character ID of the finetuning text
ids_dataset = tf.data.Dataset.from_tensor_slices(ids)

# Cut into chunks of SEQ_LENGTH + 1 and split each into an (input, target) pair
sequences = ids_dataset.batch(SEQ_LENGTH + 1, drop_remainder=True)
dataset = sequences.map(split_input_target)

# Shuffle, batch (dropping the incomplete last batch), and prefetch
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

### Load previous weights (if necessary)

In [None]:
# LOAD PREVIOUS WEIGHTS IF YOU HAVE THEM
# NOTE: this cell loads unconditionally; it fails if no "ckpt_30" checkpoint
# exists under checkpoint_dir.
checkpoint_dir = './'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_30") # only if you want to load specific weights
# Alternative: load whichever checkpoint in the directory is the most recent
# model.load_weights(tf.train.latest_checkpoint(checkpoint_dir)) 
model.load_weights(checkpoint_prefix)

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7fceff8e3048>

### Train on smaller dataset



In [None]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss)

# save_freq is counted in batches; one epoch is len(sequences) // BATCH_SIZE
# batches, so this writes a checkpoint every 5 epochs.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True,
    save_freq=5 * (len(sequences) // BATCH_SIZE),
    verbose=1
)

history_finetune = model.fit(dataset, epochs=10, callbacks=[checkpoint_callback])

# Plot the original training loss followed by the finetuning loss. The
# finetuning history lives in `history_finetune`; the name `finetune` used
# here before was never defined. `history` must exist from the initial
# training cell.
plt.plot(history.history['loss'] + history_finetune.history['loss'])

### Test again

In [None]:
# Set the temperature (usually it is in (0, 1])
# Sampling temperature (usually it is in (0, 1])
temperature = 0.85

# Prompt that seeds the generation
next_char = tf.constant(['[Intro]'])

# Number of characters to sample after the prompt
length = 1000

# Wrap the current model for step-by-step sampling
one_step_model = OneStep(model, chars_from_ids, ids_from_chars, temperature=temperature)

start = time.time()
states = None
result = [next_char]

# Generate lyrics: feed each sampled character and the returned state back in
for _ in range(length):
    next_char, states = one_step_model.generate_one_step(next_char, states=states)
    result.append(next_char)

result = tf.strings.join(result)
end = time.time()

# Print generated lyrics, a separator rule, and the elapsed time
generated_lyrics = result[0].numpy().decode('utf-8')
print(generated_lyrics, '\n\n' + '_'*72)
print(f"\nRun time: {end - start}")
print()

## Saving weights
We zip the weight files from every 5th epoch (checkpoints whose epoch number ends in 0 or 5), along with the `checkpoint` index file.


In [None]:
# Archive the checkpoint files whose epoch number ends in 0 or 5, plus the `checkpoint` file
!zip model.zip training_checkpoints/ckpt_*0.* training_checkpoints/ckpt_*5.* training_checkpoints/checkpoint

  adding: training_checkpoints/ckpt_20.data-00000-of-00001 (deflated 7%)
  adding: training_checkpoints/ckpt_20.index (deflated 66%)
  adding: training_checkpoints/ckpt_30.data-00000-of-00001 (deflated 7%)
  adding: training_checkpoints/ckpt_30.index (deflated 66%)
  adding: training_checkpoints/ckpt_25.data-00000-of-00001 (deflated 7%)
  adding: training_checkpoints/ckpt_25.index (deflated 66%)
  adding: training_checkpoints/checkpoint (deflated 38%)
