State imports

In [1]:
import os
import re
import string
import requests
import numpy as np
import collections
import random
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
# from tensorflow.python.framework import ops
# ops.reset_default_graph()

Start session

In [2]:
# Create a TensorFlow 1.x-style session; called with no arguments, so it is bound to the default graph.
sess = tf.Session()

Define hyperparameters - these come straight from the example code of "TensorFlow Cookbook"

In [3]:
# --- Model / training hyperparameters ---
min_word_freq = 5 # Trim the less frequent words off
rnn_size = 128 # RNN Model size
# Word embedding size. It is tied to the RNN size; a standalone value of 100 used to be
# assigned first but was always overwritten before being read, so only the effective
# assignment is kept.
embedding_size = rnn_size
epochs = 10 # Number of epochs to cycle through data
batch_size = 100 # Train on this many examples at once
learning_rate = 0.001 # Learning rate
training_seq_len = 50 # how long of a word group to consider
save_every = 500 # How often to save model checkpoints
eval_every = 50 # How often to evaluate the test sentences
# Seed phrases used when generating sample text
prime_texts = ['thou art more', 'to be or not to', 'wherefore art thou']

# --- Paths: Shakespeare data file and model directory ---
data_dir = 'temp'
data_file = 'shakespeare.txt'
model_path = 'shakespeare_model'
full_model_dir = os.path.join(data_dir, model_path)

# Declare punctuation to remove, everything except hyphens and apostrophes
punctuation = ''.join(ch for ch in string.punctuation if ch not in ('-', "'"))

# Make model and data directories. exist_ok=True makes the cell safe to re-run and avoids
# the check-then-create race of a separate os.path.exists() test.
os.makedirs(full_model_dir, exist_ok=True)
os.makedirs(data_dir, exist_ok=True)

Download the data if not already saved

In [4]:
print('Loading Shakespeare data')
data_path = os.path.join(data_dir, data_file)
# Check if file is downloaded
if not os.path.isfile(data_path):
    print('Not found, downloading Shakespeare text from www.gutenberg.org')
    shakespeare_url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt'
    # Get Shakespeare text; fail loudly on an HTTP error instead of caching an error page
    response = requests.get(shakespeare_url)
    response.raise_for_status()
    shakespeare_file = response.content
    # Decode binary into string
    s_text = shakespeare_file.decode('utf-8')
    # Drop first few descriptive paragraphs (fixed character offset into the download)
    s_text = s_text[7675:]
    # Remove newlines (both Windows-style '\r\n' and bare '\n')
    s_text = s_text.replace('\r\n', '')
    s_text = s_text.replace('\n', '')

    # Write to file; the encoding is explicit so it matches the decode above on every platform
    with open(data_path, 'w', encoding='utf-8') as out_conn:
        out_conn.write(s_text)

else:
    # Reuse the cached copy, stripping any newlines it may still contain
    with open(data_path, 'r', encoding='utf-8') as file_conn:
        s_text = file_conn.read().replace('\n', '')

# Clean text
print('Cleaning Text')
# re.escape keeps characters such as ']', '\' and '^' literal inside the character class
s_text = re.sub(r'[{}]'.format(re.escape(punctuation)), ' ', s_text)
# Collapse runs of whitespace to single spaces, then trim and lowercase
s_text = re.sub(r'\s+', ' ', s_text).strip().lower()
print('Done loading/cleaning.')

Loading Shakespeare data
Cleaning Text
Done loading/cleaning.


Build word processing dictionary

In [5]:
def build_vocab(text, min_word_freq):
    """Build word <-> index lookup tables from a space-separated text.

    Words are obtained with ``text.split(' ')``. Only words occurring strictly
    more than ``min_word_freq`` times are kept; they receive indices 1, 2, ...
    in order of first appearance. Index 0 is reserved for the key 'unknown'.

    A literal word 'unknown' in the text is never given a separate index.
    Without that guard it would first receive a regular index and then be
    overwritten with 0, leaving a hole in the index -> word table.

    Args:
        text: Cleaned corpus as a single string with words separated by ' '.
        min_word_freq: Frequency threshold; a word is kept when its count is
            greater than this value.

    Returns:
        Tuple ``(ix_to_vocab_dict, vocab_to_ix_dict)`` of index -> word and
        word -> index dictionaries covering the contiguous range 0..len-1.
    """
    unknown_token = 'unknown'
    word_counts = collections.Counter(text.split(' '))
    frequent_words = [word for word, count in word_counts.items()
                      if count > min_word_freq and word != unknown_token]
    # Create vocab to index mapping, with the unknown key pinned at index 0
    vocab_to_ix_dict = {unknown_token: 0}
    for ix, word in enumerate(frequent_words, start=1):
        vocab_to_ix_dict[word] = ix
    # Create index to vocab mapping
    ix_to_vocab_dict = {ix: word for word, ix in vocab_to_ix_dict.items()}

    return (ix_to_vocab_dict, vocab_to_ix_dict)

In [6]:
print('Buidling Shakespeare Vocab')
ix2vocab, vocab2ix = build_vocab(s_text, min_word_freq)
assert(len(ix2vocab) == len(vocab2ix))
# build_vocab numbers real words from 1 and reserves 0 for 'unknown'; the +1 keeps
# vocab_size strictly greater than the largest index it can hand out.
vocab_size = len(ix2vocab) + 1
print('Vocabulary Length = {}'.format(vocab_size))

# Convert words to indexes; words missing from the vocabulary map to 0 ('unknown').
# dict.get replaces a bare `except:` that would also have hidden unrelated errors.
s_text_words = s_text.split(' ')
s_text_ix = np.array([vocab2ix.get(word, 0) for word in s_text_words])

Buidling Shakespeare Vocab
Vocabulary Length = 8009


In [7]:
class LSTM_Model():
    """Word-level LSTM language model built with the TensorFlow 1.x graph API.

    Constructing an instance adds the embedding lookup, a single
    ``BasicLSTMCell`` unrolled by ``legacy_seq2seq.rnn_decoder``, a softmax
    output layer, the loss and an Adam training op to the current graph.

    With ``infer_sample=True`` the graph is built for batch size 1 and
    sequence length 1, which is the shape ``sample`` feeds.
    """

    def __init__(self, embedding_size, rnn_size, batch_size, learning_rate,
                 training_seq_len, vocab_size, infer_sample=False):
        """Build the model graph.

        Args:
            embedding_size: Width of each word embedding vector.
            rnn_size: Number of units in the LSTM cell.
            batch_size: Examples per batch (ignored when ``infer_sample``).
            learning_rate: Learning rate handed to the Adam optimizer.
            training_seq_len: Words per example (ignored when ``infer_sample``).
            vocab_size: Number of rows of the embedding matrix and number of
                output logits.
            infer_sample: When True, build the batch-1 / length-1 graph and
                pass ``inferred_loop`` to the decoder as its loop function.
        """
        self.embedding_size = embedding_size
        self.rnn_size = rnn_size
        self.vocab_size = vocab_size
        self.infer_sample = infer_sample
        self.learning_rate = learning_rate

        # When inferring (generating text) one word is fed at a time, so both
        # the batch size and the sequence length collapse to 1.
        if infer_sample:
            self.batch_size = 1
            self.training_seq_len = 1
        else:
            self.batch_size = batch_size
            self.training_seq_len = training_seq_len

        self.lstm_cell = tf.contrib.rnn.BasicLSTMCell(self.rnn_size)
        # Zero-filled state tensor for a batch of self.batch_size
        self.initial_state = self.lstm_cell.zero_state(self.batch_size, tf.float32)

        # Word-index inputs and targets, both [batch_size, training_seq_len]
        self.x_data = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])
        self.y_output = tf.placeholder(tf.int32, [self.batch_size, self.training_seq_len])

        with tf.variable_scope('lstm_vars'):
            # Softmax output weights and bias
            W = tf.get_variable('W', [self.rnn_size, self.vocab_size], tf.float32,
                                tf.random_normal_initializer())
            b = tf.get_variable('b', [self.vocab_size], tf.float32, tf.constant_initializer(0.0))

            # Define embedding
            embedding_mat = tf.get_variable('embedding_mat', [self.vocab_size, self.embedding_size],
                                            tf.float32, tf.random_normal_initializer())

            embedding_output = tf.nn.embedding_lookup(embedding_mat, self.x_data)
            # Split along the time axis into training_seq_len tensors and drop
            # that axis, giving the list of per-step inputs the decoder expects.
            rnn_inputs = tf.split(axis=1, num_or_size_splits=self.training_seq_len,
                                  value=embedding_output)
            rnn_inputs_trimmed = [tf.squeeze(x, [1]) for x in rnn_inputs]

        # 'Loop' function used to generate text when inferring: maps the
        # previous decoder output to the embedding of its most likely word.
        def inferred_loop(prev, count):
            # Apply hidden layer
            prev_transformed = tf.matmul(prev, W) + b
            # Get the index of the output (don't run the gradient)
            prev_symbol = tf.stop_gradient(tf.argmax(prev_transformed, 1))
            # Get embedded vector
            output = tf.nn.embedding_lookup(embedding_mat, prev_symbol)
            return (output)

        decoder = tf.contrib.legacy_seq2seq.rnn_decoder
        outputs, last_state = decoder(rnn_inputs_trimmed,
                                      self.initial_state,
                                      self.lstm_cell,
                                      loop_function=inferred_loop if infer_sample else None)

        # Concatenate the per-step outputs and flatten to rows of rnn_size
        output = tf.reshape(tf.concat(axis=1, values=outputs), [-1, self.rnn_size])
        # Logits and output
        self.logit_output = tf.matmul(output, W) + b
        self.model_output = tf.nn.softmax(self.logit_output)

        loss_fun = tf.contrib.legacy_seq2seq.sequence_loss_by_example
        # The fourth positional parameter of this function is
        # average_across_timesteps. vocab_size used to be passed there and only
        # worked because it is truthy; the flag is now spelled out explicitly.
        loss = loss_fun([self.logit_output], [tf.reshape(self.y_output, [-1])],
                        [tf.ones([self.batch_size * self.training_seq_len])],
                        average_across_timesteps=True)
        self.cost = tf.reduce_sum(loss) / (self.batch_size * self.training_seq_len)
        self.final_state = last_state
        # Clip gradients by global norm (4.5) before applying them with Adam
        gradients, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tf.trainable_variables()), 4.5)
        optimizer = tf.train.AdamOptimizer(self.learning_rate)
        self.train_op = optimizer.apply_gradients(zip(gradients, tf.trainable_variables()))

    # Generate text sample
    def sample(self, sess, words=ix2vocab, vocab=vocab2ix, num=10, prime_text='thou art'):
        """Generate text greedily, starting from ``prime_text``.

        The placeholders must have shape [1, 1], i.e. the instance must have
        been built with ``infer_sample=True``.

        Note that the ``words`` / ``vocab`` defaults are bound to the global
        ``ix2vocab`` / ``vocab2ix`` objects when the class is defined.

        Args:
            sess: TensorFlow session used to run the graph.
            words: Index -> word mapping.
            vocab: Word -> index mapping.
            num: Maximum number of words to append.
            prime_text: Whitespace-separated seed text; must contain at least
                one word.

        Returns:
            ``prime_text`` followed by the generated words, space-separated.
            Generation stops early when the predicted index is 0.
        """
        # Initialise the cell state for a single example
        # (fixes `tf,float32`, which raised NameError on `float32`).
        state = sess.run(self.lstm_cell.zero_state(1, tf.float32))
        word_list = prime_text.split()
        # Warm up the state on every prime word except the last one
        for word in word_list[:-1]:
            # Prime words outside the vocabulary fall back to index 0 instead of KeyError
            x = np.array([[vocab.get(word, 0)]], dtype=np.int32)
            feed_dict = {self.x_data: x, self.initial_state: state}
            [state] = sess.run([self.final_state], feed_dict=feed_dict)

        out_sentence = prime_text
        word = word_list[-1]
        for n in range(num):
            x = np.array([[vocab.get(word, 0)]], dtype=np.int32)
            feed_dict = {self.x_data: x, self.initial_state: state}
            [model_output, state] = sess.run([self.model_output, self.final_state],
                                             feed_dict=feed_dict)
            # Greedy choice: the most probable next word index
            next_ix = np.argmax(model_output[0])
            if next_ix == 0:
                break
            word = words[next_ix]
            out_sentence = out_sentence + ' ' + word
        return (out_sentence)

Share the variable scope between the training model and the test (sampling) model so that both use the same set of weights

In [8]:
# Training model: full batch size and sequence length
lstm_model = LSTM_Model(
    embedding_size=embedding_size,
    rnn_size=rnn_size,
    batch_size=batch_size,
    learning_rate=learning_rate,
    training_seq_len=training_seq_len,
    vocab_size=vocab_size,
)

# Test model: built inside the current variable scope with reuse=True, in inference mode
with tf.variable_scope(tf.get_variable_scope(), reuse=True):
    test_lstm_model = LSTM_Model(
        embedding_size=embedding_size,
        rnn_size=rnn_size,
        batch_size=batch_size,
        learning_rate=learning_rate,
        training_seq_len=training_seq_len,
        vocab_size=vocab_size,
        infer_sample=True,
    )

In [9]:
# Create a model saving operation: a tf.train.Saver over the variables
# returned by tf.global_variables() at this point in the notebook.
saver = tf.train.Saver(tf.global_variables())