In [21]:
# Import the libraries.
import tensorflow as tf
from tensorflow.keras import losses
from tensorflow.keras import layers
from tensorflow.keras import preprocessing
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import regularizers

import numpy as np
import os
import time
import re
import string

print(tf.__version__)

2.1.0


In [22]:
# Download the dataset if not already
path = r'C:\Users\pmspr\Documents\Machine Learning\Courses\Tensorflow Cert\Data'
loadPath = r'C:\Users\pmspr\Documents\Machine Learning\Courses\Tensorflow Cert\Saved_Models\Models\2'
folder = 'nlp'
abs_path = os.path.join(path, folder)
abs_path = os.path.join(abs_path, 'shakespeare')
if not os.path.exists(os.path.join(abs_path, 'essay.txt')):
    sp_txt = tf.keras.utils.get_file('shakespeare.txt',
                                     cache_subdir=abs_path,
                                     origin='https://homl.info/shakespeare',
                                     )
    sp_dir = abs_path
else:
    sp_dir = abs_path

In [23]:
# Read the file
filepath = os.path.join(abs_path, 'essay.txt')
with open(filepath,'rb') as f:
    text = f.read().decode(encoding='utf-8') #use utf-8 to deocde special characters
print('Length of text: {} characters'.format(len(text)))


Length of text: 7055 characters


In [24]:
# Preprocess the corpus

# join the text, at next line, in to a list 
corpus = text.lower().split("\n")
tokenizer = preprocessing.text.Tokenizer(char_level=True)
tokenizer.fit_on_texts(corpus)

total_words = len(tokenizer.word_index)
dataset_size = tokenizer.document_count
print("Number of different characters:",total_words)
print('Total number of characters in the document', dataset_size)
print(tokenizer.word_index)

Number of different characters: 41
Total number of characters in the document 1
{' ': 1, 'e': 2, 'a': 3, 't': 4, 'n': 5, 'i': 6, 's': 7, 'o': 8, 'r': 9, 'h': 10, 'l': 11, 'c': 12, 'u': 13, 'g': 14, 'd': 15, 'm': 16, 'f': 17, 'p': 18, 'w': 19, 'b': 20, 'y': 21, ',': 22, '.': 23, 'v': 24, 'k': 25, 'x': 26, '’': 27, 'j': 28, '-': 29, 'z': 30, '0': 31, '3': 32, '7': 33, "'": 34, '(': 35, ')': 36, 'q': 37, '1': 38, '8': 39, '%': 40, '/': 41}


We set 'char_level= True' to get character-level encoding rather than the default word-level encoding. Note that this tokenizer converts the text to leovwercase by default. Now the tokenizer can encode a senetence or a list of sentencces to a a list of character IDs and back, and it tells us how many distinct characters there are and then total number of characters in the text:

In [25]:
# Test the tokenizer
print(tokenizer.texts_to_sequences(["Tensorflow"]))
print(tokenizer.sequences_to_texts(tokenizer.texts_to_sequences(["Tensorflow"])))

[[4, 2, 5, 7, 8, 9, 17, 11, 8, 19]]
['t e n s o r f l o w']


Our encoded sequence is a single list covering entire corpus. RNN works on a list of smaller sequences. We will chop the encoded dataset in to multiple windows and every instance in this will be a fairly short substring of the whole text and the RNN will be unrolled only over the lenght of these substrings. This is called trunated backpropogation through time. 

In [26]:
# convert corpus in to sequences
[encoded] = np.array(tokenizer.texts_to_sequences([text])) - 1

#Split the dataset in to Training and Test
train_size = len(encoded) * 90 // 100
dataset_train = tf.data.Dataset.from_tensor_slices(encoded[:train_size]) 

n_steps = 25
window_size = n_steps + 1

# Nested dataset, dataset of datasets, with window_size.
dataset_train = dataset_train.window(window_size, shift=1, drop_remainder = True) 

# Flat the nested dataset and create batches
dataset_train = dataset_train.flat_map(lambda window: window.batch(window_size))

# print sample input
test_sequence = []
for i in dataset_train.take(1):
    print({'Input sequence': np.array(i), 'Input sentence': tokenizer.sequences_to_texts([np.array(i)+1])})
    test_sequence = np.array(i)

# Shuffle the dataset (not applicable for timeseries)
batch_size = 10
dataset_train = dataset_train.shuffle(10000).batch(batch_size)

# Seperate sequence and label. 
dataset_train = dataset_train.map(lambda window: (window[:,:-1], window[:,-1]))

# dataset_train2 = dataset_train.map(
#     lambda X_batch, y_batch : (tf.one_hot(X_batch, depth=total_words), y_batch)
# )

# for item in dataset_train2.as_numpy_iterator():
#     print(item[0].shape)
#     print(item[1].shape)
#     break

# create batches and prefetch 
#AUTOTUNE = tf.data.AUTOTUNE
dataset_train = dataset_train.cache().prefetch(1)
# dataset_train2 = dataset_train2.prefetch(1)

{'Input sequence': array([14, 12,  8,  5,  4, 13,  0,  3,  9,  1,  0, 17,  2,  6,  3,  0, 11,
        1,  4,  3, 12,  8, 20, 21,  0,  9]), 'Input sentence': ['d u r i n g   t h e   p a s t   c e n t u r y ,   h']}


In [27]:
# Create the RNN layers
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Embedding(total_words, 25, input_length=n_steps))
model.add(tf.keras.layers.GRU(128, return_sequences=True, dropout=0.2, recurrent_dropout=0.2))
model.add(tf.keras.layers.GRU(128, return_sequences=True, dropout=0.2, recurrent_dropout=0.2))
model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(150, return_sequences=True)))
model.add(tf.keras.layers.LSTM(100))
model.add(tf.keras.layers.Dense(total_words/2, activation='relu',kernel_regularizer=regularizers.l2(0.01)))
model.add(
                tf.keras.layers.Dense(total_words, activation='softmax')
         )
model.summary()

Model: "sequential_4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_4 (Embedding)      (None, 25, 25)            1025      
_________________________________________________________________
gru_8 (GRU)                  (None, 25, 128)           59520     
_________________________________________________________________
gru_9 (GRU)                  (None, 25, 128)           99072     
_________________________________________________________________
bidirectional_4 (Bidirection (None, 25, 300)           334800    
_________________________________________________________________
lstm_9 (LSTM)                (None, 100)               160400    
_________________________________________________________________
dense_8 (Dense)              (None, 20)                2020      
_________________________________________________________________
dense_9 (Dense)              (None, 41)               

In [29]:
model.compile(optimizer = tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy']
             )

earlystop = tf.keras.callbacks.EarlyStopping(monitor='accuracy', baseline=0.98, mode="auto", patience=100)

history = model.fit(dataset_train, epochs=500, verbose=1, callbacks=[earlystop])

mpath = os.path.join(loadPath,'rnn.h5')
model.save(mpath)

Epoch 1/500
Epoch 2/500

KeyboardInterrupt: 

In [None]:
# Reload the module 
mpath = os.path.join(loadPath,'rnn.h5')
load_model = tf.keras.models.load_model(mpath)

# Check its architecture
load_model.summary()

test = np.expand_dims(test_sequence[:-1], axis=0)
predict_seq = load_model.predict(test)

print('Log probabilities:')
print(predict_seq)
print(' ')

print('Index of high probability - predicted char:', np.argmax(predict_seq))

In [20]:
# Predict the next characters for a sample text
seed_text = "the last ice age which ended sever"
next_words = 100
process_text = np.array(tokenizer.texts_to_sequences(seed_text)) - 1
process_text = pad_sequences([np.ravel(process_text)], maxlen=n_steps, padding='pre')
 
predict_seq = load_model.predict([test_sequence.tolist()[:-1]])
predict_classes=list(np.argmax(predict_seq,axis=1) + 1)
print(tokenizer.sequences_to_texts([predict_classes]))

['h']


In [None]:
for _ in range(next_words):
    token_list = np.array(tokenizer.texts_to_sequences(seed_text)) - 1
    token_list = pad_sequences([token_list], maxlen=max_sequence_len-1, padding='pre')
    predicted = model.predict_classes(token_list, verbose=0)
    output_word = ""
    for word, index in tokenizer.word_index.items():
        if index == predicted:
            output_word = word
            break
    seed_text += " " + output_word
print(seed_text)