In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd
import os
import time
import nltk
import re
from collections import Counter
import pickle
import sys

try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=True)
    COLAB = True
    print("Note: using Google CoLab")
    %tensorflow_version 2.x
    # relative imports
    THIS_DIR = os.getcwd()
    THIS_DIR = os.path.join(THIS_DIR, 'drive/My Drive/poetry_phoneme_lstm')
    sys.path.append(f'{THIS_DIR}/g2p_en')
    from g2p import G2p
    import expand
    print(tf.test.gpu_device_name())
except:
    print("Note: not using Google CoLab")
    COLAB = False
    THIS_DIR = os.getcwd()
    sys.path.append(f'{THIS_DIR}/g2p_en')
    from g2p import G2p
    import expand

Note: not using Google CoLab


In [2]:
# Load the TensorBoard notebook extension so the `%tensorboard` magic used in the training cell is available
%load_ext tensorboard

## GENERATE SIMPLE RAW TEXT FILE FROM POETRY FILES 

In [3]:
# Sub-folder (relative to THIS_DIR) whose files are merged into one large text
POEMS_FOLDER = 'poems/'
# Full path of the poems folder; printed so the resolved location can be checked by eye
POEM_FULL_PATH = os.path.join(THIS_DIR, POEMS_FOLDER)
print(POEM_FULL_PATH)

/Users/samuelmignot/Desktop/hobbies/code/jupyter-notebooks/poetry_phoneme_lstm/poems/


In [4]:
# Collect the poem files in a stable order: os.listdir() returns entries in
# arbitrary order, so sorting keeps the merged corpus identical between runs.
# Hidden entries (e.g. macOS ".DS_Store") and sub-directories are skipped
# because they are not poems and cannot be read as text.
poem_files = sorted(
    poem_file for poem_file in os.listdir(POEM_FULL_PATH)
    if not poem_file.startswith('.')
    and os.path.isfile(os.path.join(POEM_FULL_PATH, poem_file))
)

# Read each poem with an explicit encoding so the result does not depend on
# the platform default (assumes the poem files are UTF-8 — confirm).
all_poems = []
for poem_file in poem_files:
    with open(os.path.join(POEM_FULL_PATH, poem_file), 'r', encoding='utf-8') as f:
        all_poems.append(f.read())

# Join once at the end instead of growing a string inside the loop
all_poems_text = "".join(all_poems)

## NORMALIZE AND CLEAN DATA

1. Remove rare characters (those that appear fewer than 5 times),
2. Substitute angled quotes with regular quotes,
3. Convert character data into phonemes (I hypothesize that phoneme data will better represent poetic language).

In [5]:
def clean_text(text, rare_chars):
    '''Normalize quotes, drop rare characters and flatten newlines.

    Args:
        text: the raw text to clean.
        rare_chars: iterable of strings to delete from the text. Every entry
            is matched literally, so regex metacharacters such as ".", "*"
            or "(" are safe to pass. Empty entries are ignored.

    Returns:
        The text with angled quotes replaced by straight quotes, every
        occurrence of a rare character removed, and each newline replaced
        by "~" (in that order).
    '''
    # Straighten angled quotes first, so a straight quote listed in rare_chars
    # also removes what used to be its angled variants.
    quote_table = str.maketrans({'“': '"', '”': '"', '‘': "'", '’': "'"})
    text = text.translate(quote_table)

    # Escape every entry: unescaped, "." would delete the whole text and
    # "*" or "(" would raise re.error.
    rare_pattern = "|".join(re.escape(char) for char in rare_chars if char)
    if rare_pattern:
        text = re.sub(rare_pattern, "", text)

    return text.replace("\n", "~")
        

In [6]:
# phonetic embedding
# Toggle: when True the merged text is run through g2p before the vocabulary is built
phonetic_embedding = True

if phonetic_embedding:
    # Uses a customized version of g2p that maintains newlines and other important punctuation characters
    g2p = G2p()
    # NOTE(review): this rebinds all_poems_text from the raw text to the g2p output, so
    # re-running this cell feeds already-converted data back into g2p — restart and run all instead.
    all_poems_text= g2p(all_poems_text)
    # NOTE(review): word_map is presumably a phoneme-string -> word lookup (key_or_closest indexes it
    # that way) — confirm. It is only defined when phonetic_embedding is True.
    phoneme_word_dict = g2p.word_map

## CREATE CHAR TO INT MAPS

In [7]:
# Vocabulary: every distinct token of the corpus, sorted so indices are stable
vocab = sorted(set(all_poems_text))
print(vocab)
print(f'{len(vocab)} unique characters')

# Lookup tables in both directions: token -> index (dict) and index -> token (array)
character_index_map = dict(zip(vocab, range(len(vocab))))
index_character_map = np.array(vocab)

# The whole corpus encoded as vocabulary indices
text_as_int_array = np.array([character_index_map[token] for token in all_poems_text])

# Show how the first 13 tokens of the text are mapped to integers
print(f'{repr(all_poems_text[:13])} -- mapped to int -- > {text_as_int_array[:13]}')

[' ', '!', '"', '#', "'", ',', '-', '.', '.   .   .', '.   ...', '.  .', '. .', '. . .', '. . .   . . .', '. . .  . . .', '. . . .', '. . . . .', '. . . . . . . . . . . .', '. . ..', '. . .. . .', '. ..', '. .. .', '..', '.. .', '.. . .', '...', '... ...', '?', 'AA0', 'AA1', 'AA2', 'AE0', 'AE1', 'AE2', 'AH0', 'AH1', 'AH2', 'AO0', 'AO1', 'AO2', 'AW0', 'AW1', 'AW2', 'AY0', 'AY1', 'AY2', 'B', 'CH', 'D', 'DH', 'EH0', 'EH1', 'EH2', 'ER0', 'ER1', 'ER2', 'EY0', 'EY1', 'EY2', 'F', 'G', 'HH', 'IH0', 'IH1', 'IH2', 'IY0', 'IY1', 'IY2', 'JH', 'K', 'L', 'M', 'N', 'NG', 'OW0', 'OW1', 'OW2', 'OY0', 'OY1', 'OY2', 'P', 'R', 'S', 'SH', 'T', 'TH', 'UH0', 'UH1', 'UH2', 'UW0', 'UW1', 'UW2', 'V', 'W', 'Y', 'Z', 'ZH', '_', '__', '___', '~']
101 unique characters
['#', ' ', 'L', 'EH1', 'T', 'ER0', ' ', 'S', 'EH1', 'V', 'AH0', 'N', ' '] -- mapped to int -- > [ 3  0 70 51 84 53  0 82 51 92 34 72  0]


In [29]:
# Number of tokens the model sees in a single input
seq_length = 100
# Every example holds seq_length + 1 tokens, so that the input and the
# shifted-by-one target cut from it both have seq_length tokens
examples_per_epoch = len(text_as_int_array) // (seq_length + 1)

# Trim the trailing tokens that do not fill a whole example
text_as_int_array = text_as_int_array[:examples_per_epoch * (seq_length + 1)]
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int_array)

# Cut the trimmed corpus into consecutive, non-overlapping windows
sequences = [
    np.array(text_as_int_array[start:start + seq_length + 1])
    for start in range(0, len(text_as_int_array), seq_length + 1)
]

In [30]:
# Number of sequences per training batch; also used as the fixed batch dimension of the training model
BATCH_SIZE = 64

# Buffer size to shuffle the dataset (currently disabled)
# BUFFER_SIZE = 10000

# Number of distinct tokens in the vocabulary (embedding input size and width of each Dense layer)
vocab_size = len(vocab)

# The embedding dimension
embedding_dim = 256

# Number of units in each GRU layer
rnn_units = 512 

# Recurrent stack: number of GRU layers, rate of the Dropout layer added after the
# stack (0 adds none), and whether each GRU is wrapped in Bidirectional
num_gru_layers = 1
gru_dropout = 0
is_bidirectional = False

# Dense head: number of Dense layers and rate of the Dropout layer added after each one (0 adds none)
num_dense_layers = 1
dense_dropout = 0

In [31]:
# Keep only as many sequences as fill whole batches: the training model is
# built with a fixed batch dimension of BATCH_SIZE.
print(len(sequences))
# Compute the count to keep rather than slicing with a negative remainder:
# `sequences[:-0]` is `sequences[:0]`, which would discard every sequence when
# the length is already a multiple of BATCH_SIZE (e.g. when this cell is re-run).
usable_sequences = len(sequences) - len(sequences) % BATCH_SIZE
sequences = np.array(sequences[:usable_sequences])
print(len(sequences))

# Inputs drop the last token of each sequence; targets drop the first, so
# y[i][t] is the token that follows X[i][t].
X = np.array([sequence[:-1] for sequence in sequences])
y = np.array([sequence[1:] for sequence in sequences])

81772
81728


In [32]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Embedding, LSTM, GRU, Bidirectional, Dropout

def conditional_bidirection(layer, is_birdirectional):
    '''Return `layer` wrapped in Bidirectional when requested, otherwise unchanged.

    Args:
        layer: the recurrent layer to wrap.
        is_birdirectional: truthy to wrap `layer` in Bidirectional. The
            parameter keeps its original (misspelled) name so that existing
            keyword callers continue to work.

    Returns:
        `Bidirectional(layer)` if the flag is truthy, else `layer` itself.
    '''
    # Use the argument itself, not the notebook-level `is_bidirectional`
    # setting, so the caller's choice is actually honoured.
    if is_birdirectional:
        return Bidirectional(layer)
    return layer
    
def build_model(vocab_size, embedding_dim, rnn_units, batch_size, num_gru_layers,
                bidirectional=None, gru_dropout_rate=None,
                dense_layers=None, dense_dropout_rate=None):
    '''Build the Embedding -> GRU stack -> Dense stack sequence model.

    Args:
        vocab_size: number of tokens; embedding input size and the number of
            units of every Dense layer.
        embedding_dim: size of the embedding vectors.
        rnn_units: number of units in each GRU layer.
        batch_size: fixed batch dimension of the model input.
        num_gru_layers: number of stacked GRU layers.
        bidirectional: wrap each GRU in Bidirectional. Defaults to the
            notebook-level `is_bidirectional` setting.
        gru_dropout_rate: rate of the single Dropout layer added after the
            whole GRU stack (none is added when 0). Defaults to the
            notebook-level `gru_dropout` setting.
        dense_layers: number of Dense layers. Defaults to the notebook-level
            `num_dense_layers` setting.
        dense_dropout_rate: rate of the Dropout layer added after each Dense
            layer (none is added when 0). Defaults to the notebook-level
            `dense_dropout` setting.

    Returns:
        An uncompiled tf.keras.Sequential model. The Dense layers have no
        activation, so the model outputs logits.
    '''
    # Settings not passed explicitly fall back to the notebook-level values,
    # which is what the original implementation always used.
    if bidirectional is None:
        bidirectional = is_bidirectional
    if gru_dropout_rate is None:
        gru_dropout_rate = gru_dropout
    if dense_layers is None:
        dense_layers = num_dense_layers
    if dense_dropout_rate is None:
        dense_dropout_rate = dense_dropout

    model = tf.keras.Sequential()
    model.add(Embedding(vocab_size, embedding_dim, batch_input_shape=[batch_size, None]))

    for _ in range(num_gru_layers):
        recurrent_layer = GRU(rnn_units,
                              return_sequences=True,
                              stateful=True,
                              recurrent_initializer='glorot_uniform')
        model.add(Bidirectional(recurrent_layer) if bidirectional else recurrent_layer)

    # Applied once, after the complete recurrent stack (not after every GRU)
    if gru_dropout_rate > 0:
        model.add(Dropout(gru_dropout_rate))

    for _ in range(dense_layers):
        model.add(Dense(vocab_size))
        # NOTE(review): this also puts dropout after the last Dense layer,
        # i.e. on the logits — confirm that is intended.
        if dense_dropout_rate > 0:
            model.add(Dropout(dense_dropout_rate))
    return model

In [33]:
# Build the training model; its batch dimension is fixed to BATCH_SIZE
model = build_model(
  vocab_size = len(vocab),
  embedding_dim=embedding_dim,
  rnn_units=rnn_units,
  batch_size=BATCH_SIZE, 
  num_gru_layers=num_gru_layers
)

In [34]:
# Sanity check: push the first batch through the untrained model and confirm
# that it returns one score per vocabulary entry for every input position.
input_example_batch, target_example_batch = X[:BATCH_SIZE], y[:BATCH_SIZE]
print(input_example_batch)
example_batch_predictions = model(input_example_batch)
# Legend completed so it matches the one printed with the example loss
print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

[[ 3  0 70 ... 32 84  0]
 [82  0 12 ... 44 53  0]
 [72 48  0 ...  0 63 84]
 ...
 [49 57  0 ... 72 48  0]
 [63 69 70 ...  0 93 63]
 [ 0 71 36 ...  0 46 32]]
(64, 100, 101) # (batch_size, sequence_length, vocab


In [35]:
# Print the layer stack with output shapes and parameter counts
model.summary()

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_2 (Embedding)      (64, None, 256)           25856     
_________________________________________________________________
gru_2 (GRU)                  (64, None, 512)           1182720   
_________________________________________________________________
dense_2 (Dense)              (64, None, 101)           51813     
Total params: 1,260,389
Trainable params: 1,260,389
Non-trainable params: 0
_________________________________________________________________


In [36]:
def loss(labels, logits):
  '''Sparse categorical cross-entropy between integer `labels` and raw `logits`.

  `from_logits=True` is passed because the model's Dense layers are built
  without an activation, so its outputs are unnormalized scores.
  The result is returned as given by Keras, without any extra reduction here.
  '''
  return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

# Loss of the still-untrained model on the example batch, averaged to one number.
# With near-uniform predictions this should be roughly ln(vocab_size).
example_batch_loss  = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss:      ", example_batch_loss.numpy().mean())

Prediction shape:  (64, 100, 101)  # (batch_size, sequence_length, vocab_size)
scalar_loss:       4.6149554


In [37]:
# Train with Adam on the logits-based `loss` function and also report sparse categorical accuracy
model.compile(optimizer='adam', loss=loss, metrics=['sparse_categorical_accuracy'])

In [38]:
# Directory where the checkpoints will be saved (relative to the current working directory, not THIS_DIR)
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files; "{epoch}" is a placeholder left for the callback to fill in
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Only the weights are saved; the inference cell rebuilds the architecture and loads them back
checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [41]:
EPOCHS = 10

# The run name encodes every hyper-parameter plus a timestamp, so each
# training run gets its own TensorBoard log folder. It is assembled from
# parts so each component stays readable.
model_name = "_".join([
    f"{'phonetic_' if phonetic_embedding else ''}char",  # prefix spelling fixed (was 'phoentic_')
    f"{'b' if is_bidirectional else ''}gru",
    f"{num_gru_layers}l",
    f"{BATCH_SIZE}b",
    f"{rnn_units}u",
    f"{embedding_dim}e",
    f"{gru_dropout}d",
    "dense",
    f"{num_dense_layers}l",
    f"{dense_dropout}d",
    f"{EPOCHS}epochs",
    str(int(time.time())),  # whole seconds, without the trailing ".0" a float would add
])
log_dir = f"logs/fit/{model_name}"
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [42]:
%tensorboard --logdir logs/fit
validation_sample = len(X)/10
validation_sample -= validation_sample%64
validation = 64*40/len(X)
history = model.fit(X, y, validation_split=(64*40/len(X)), epochs=EPOCHS, callbacks=[checkpoint_callback, tensorboard_callback])


Reusing TensorBoard on port 6006 (pid 9693), started 1 day, 2:22:09 ago. (Use '!kill 9693' to kill it.)

Train on 79168 samples, validate on 2560 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Locate the most recent checkpoint before touching `model`, so a missing
# checkpoint fails with a clear message and leaves the trained model intact.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint is None:
    raise FileNotFoundError(
        f"No checkpoint found in {checkpoint_dir!r}; train the model before generating text."
    )

# Rebuild the same architecture with a batch size of 1 for generation and
# load the trained weights into it.
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1, num_gru_layers=num_gru_layers)
model.load_weights(latest_checkpoint)
model.build(tf.TensorShape([1, None]))

def key_or_closest(word):
    '''Look up `word` in phoneme_word_dict, falling back to a known prefix.

    If `word` is not a key, characters are removed from its end one at a
    time and the first (longest) remaining prefix of at least two characters
    that is a key is used. If no such prefix exists, the entry for '#' is
    returned.

    Args:
        word: the string to look up.

    Returns:
        The phoneme_word_dict value for `word`, for its longest known
        prefix, or for '#'.

    Raises:
        KeyError: if the fallback key '#' is missing from phoneme_word_dict.
    '''
    try:
        return phoneme_word_dict[word]
    except KeyError:
        pass

    # Try prefixes from longest to shortest, stopping at length 2. A bounded
    # range also terminates for empty and one-character words, which used to
    # loop forever.
    for end in range(len(word) - 1, 1, -1):
        prefix = word[:end]
        if prefix in phoneme_word_dict:
            return phoneme_word_dict[prefix]
    return phoneme_word_dict['#']
    
def generate_text(model, start_string, num_generate=1000, temperature=1.0):
    '''Sample tokens from the model and map the result back to words.

    Args:
        model: the stateful model built with a batch size of 1.
        start_string: seed for the generation; either a string whose
            characters are all vocabulary entries, or a sequence of
            vocabulary tokens. Must not be empty.
        num_generate: number of tokens to sample.
        temperature: divisor applied to the logits before sampling. Values
            below 1 make the sampling more conservative, values above 1 more
            random. Must be positive.

    Returns:
        The seed plus the generated tokens, split on spaces, with every piece
        translated through key_or_closest and joined by single spaces. The
        untranslated token text is printed as a side effect.

    Raises:
        ValueError: if start_string is empty or temperature is not positive.
        KeyError: if an element of start_string is not in the vocabulary.
    '''
    if len(start_string) == 0:
        raise ValueError("start_string must contain at least one vocabulary token")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # Converting our start string to numbers (vectorizing), with a batch dimension of 1
    input_eval = tf.expand_dims([character_index_map[s] for s in start_string], 0)

    generated_tokens = []

    # Here batch size == 1; start from a clean recurrent state
    model.reset_states()
    for _ in range(num_generate):
        # remove the batch dimension and rescale the logits
        logits = tf.squeeze(model(input_eval), 0) / temperature

        # sample from the categorical distribution of the last position
        predicted_id = tf.random.categorical(logits, num_samples=1)[-1, 0].numpy()

        # We pass the predicted token as the next input to the model;
        # the previous context is carried by the model's hidden state
        input_eval = tf.expand_dims([predicted_id], 0)

        generated_tokens.append(index_character_map[predicted_id])

    # ''.join() equals the seed itself for a string and also handles a token sequence
    ph_text = ''.join(start_string) + ''.join(generated_tokens)
    print(ph_text)
    return " ".join(key_or_closest(word) for word in ph_text.split(" "))

In [None]:
# Generate a sample seeded with '# ' (the two tokens the encoded corpus starts with) and print the decoded words
print(generate_text(model, start_string='# '))