# Text Generation With Recurrent Neural Networks

## David E. Weirich

# Goal:

To generate new episodes of Sabrina the Teenage Witch

https://sabrinatranscripts.wordpress.com/

# Tokenizer

In [None]:
# Tiny toy corpus used in the next cells to show what the Keras Tokenizer does
# before it is applied to the real episode text.
example_text_samples = [
    'Hello world!',
    'I love machine learning.',
    'Four score and seven years ago',
    'I love the whole world.'
]

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer

?Tokenizer

In [None]:
# num_words caps how many distinct words the tokenizer keeps when encoding
# (see the Keras Tokenizer documentation for the exact cut-off rule).
tkn = Tokenizer(num_words=32)

# Build the word -> integer index from the toy sentences.
tkn.fit_on_texts(example_text_samples)

# Encode every sentence as a list of integer word ids.
seqs = tkn.texts_to_sequences(example_text_samples)

In [None]:
seqs

In [None]:
tkn.index_word

# Download the episodes data


Use the Beautiful Soup library to parse out the text of each episode.

In [None]:
from requests import get
from requests.exceptions import RequestException
from contextlib import closing
from bs4 import BeautifulSoup

import os


def simple_get(url, timeout=30):
    """
    Download `url` with an HTTP GET and return the raw response body.
    :param url: address of the page to fetch.
    :param timeout: seconds to wait for the server before giving up; without it a
                    stalled connection would block the whole scrape indefinitely.
    :return: the response content when is_good_response accepts the response,
             otherwise None. None is also returned (after printing the error)
             when requests raises a RequestException, which includes timeouts.
    """
    try:
        # closing() releases the streamed connection even if reading the body fails.
        with closing(get(url, stream=True, timeout=timeout)) as resp:
            if is_good_response(resp):
                return resp.content
            return None

    except RequestException as e:
        print('Error during requests to {0} : {1}'.format(url, str(e)))
        return None


def is_good_response(resp):
    """
    Tell whether a response looks like a successfully fetched HTML page.
    :param resp: response object exposing `status_code` and a `headers` mapping.
    :return: True when the status code is 200 and the Content-Type header
             mentions 'html' (case-insensitively); False otherwise, including
             when the Content-Type header is missing.
    """
    # A missing header is treated as "not HTML" rather than raising KeyError,
    # which would escape simple_get's RequestException handler.
    content_type = (resp.headers.get('Content-Type') or '').lower()
    return resp.status_code == 200 and 'html' in content_type

In [None]:
sabrina_url = 'https://sabrinatranscripts.wordpress.com/'

# Download the index page that links to every episode transcript.
sabrina_html = simple_get(sabrina_url)
if sabrina_html is None:
    # simple_get returns None for any failed download; stop with a clear
    # message instead of handing None on to BeautifulSoup.
    raise RuntimeError('Could not download the episode index from {}'.format(sabrina_url))

bs = BeautifulSoup(sabrina_html, 'html.parser')

entry = bs.find(class_='entry')

# Not looking for the links to the seasons.
seasons = [ep for ep in entry.findChildren('p') if not ep.text.strip().startswith('Season')]

# Collect a (url, title) pair for every episode link found in those paragraphs.
episodes = []

for season in seasons:
    episodes += [(a['href'], a.text) for a in season.findChildren('a')]

# Create the output directory once, up front, instead of checking on every iteration.
os.makedirs('episodes', exist_ok=True)

for i, (url, name) in enumerate(episodes):
    episode_html = simple_get(url)

    if episode_html is None:
        # One failed download should not abort the whole scrape.
        print('{}/{}: {} -- download failed, skipping'.format(i, len(episodes), name))
        continue

    # File name: zero-padded episode number plus the lower-cased letters of the title.
    filename = '{:03}_{}'.format(i, ''.join(c for c in name if c.isalpha()).lower())

    bs = BeautifulSoup(episode_html, 'html.parser')

    print('{}/{}: {}'.format(i, len(episodes), name))

    page_text = [p.text.strip() for p in bs.find_all('p')]

    # The transcript sits between the first paragraph starting with 'DISCLAIMER'
    # and the last paragraph starting with 'This entry was posted on'.
    header_end_index = next(
        (k for k, x in enumerate(page_text) if x.startswith('DISCLAIMER')),
        None)

    footer_begin_index = next(
        (k for k in reversed(range(len(page_text)))
         if page_text[k].startswith('This entry was posted on')),
        None)

    if header_end_index is None or footer_begin_index is None:
        # Page does not have the expected layout; report it and keep going.
        print('Skipping {}: could not locate the transcript body'.format(name))
        continue

    with open('episodes/' + filename + '.txt', 'w') as f:
        for p in page_text[header_end_index + 1:footer_begin_index]:
            f.write(p + '\n')

print('Done! :)')

In [None]:
!cat episodes/000_pilot.txt

In [None]:
%%time

import numpy as np

import os


def clean_episode_text(text, header_length=0):
    """
    Normalise the raw text of one episode so that punctuation marks and line
    breaks end up as separate, space-delimited tokens.
    :param text: raw text of the episode.
    :param header_length: accepted for compatibility; the cleaning does not use it.
    :return: the stripped text in which each of . ! ? ) , is preceded by a space,
             every newline is surrounded by spaces, and runs of spaces are
             collapsed to a single space.
    """

    # One pass over the stripped text: put a space in front of each punctuation
    # mark and pad every newline on both sides.
    padded = {ord(mark): ' ' + mark for mark in '.!?),'}
    padded[ord('\n')] = ' \n '
    cleaned = text.strip().translate(padded)

    # The padding can leave several spaces in a row; squeeze them until nothing changes.
    squeezed = cleaned.replace('  ', ' ')
    while squeezed != cleaned:
        cleaned = squeezed
        squeezed = cleaned.replace('  ', ' ')

    return cleaned


def load_data(episodes_path='episodes', sequence_length=32):
    """
    Build next-word training data from the episode text files.
    :param episodes_path: directory holding one text file per episode.
    :param sequence_length: number of consecutive tokens in each input window.
    :return: a tuple (X, y, tkn) where X holds one row of `sequence_length`
             token ids per window, y holds the id of the token that follows
             each window, and tkn is the fitted Tokenizer.
    """
    # Read and clean every episode. Files are visited in sorted order so the
    # result is reproducible, and each handle is closed as soon as it is read.
    texts = []
    for filename in sorted(os.listdir(episodes_path)):
        with open(os.path.join(episodes_path, filename)) as f:
            texts.append(clean_episode_text(f.read()))

    # Encode using the tokenizer. The custom filter list leaves out . ! ? ( ) ,
    # and the newline, so those characters are not stripped before tokenizing.
    tkn = Tokenizer(num_words=50000, filters='"#$%&*+-/:;<=>@[\\]^_`{|}~\t')
    tkn.fit_on_texts(texts)
    seqs = tkn.texts_to_sequences(texts)

    # Slide a window over each episode: the window is the input and the token
    # right after it is the target. Windows never span two episodes.
    windows = []
    targets = []
    for seq in seqs:
        for start in range(len(seq) - sequence_length):
            end = start + sequence_length
            windows.append(seq[start:end])
            targets.append(seq[end])

    X = np.array(windows)
    y = np.array(targets)

    return X, y, tkn


# Build the training arrays and keep the fitted tokenizer, which is needed
# later to turn generated token ids back into text.
X, y, tokenizer = load_data()
# One more than the number of known words -- presumably because the Keras
# word index starts at 1 and id 0 is reserved; confirm against the Tokenizer docs.
vocab_size = len(tokenizer.word_index) + 1

In [None]:
X.shape

In [None]:
X

In [None]:
y

In [None]:
# Sanity check: decode the first three input windows back into text.
tokenizer.sequences_to_texts(X[:3])

# Build the Model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
from tensorflow.keras.callbacks import ModelCheckpoint, LambdaCallback
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.layers import Embedding, Activation, Softmax


# Build an RNN that consumes a window of token ids and predicts the next token.
def build_model(vocab_size, sequence_length=32, embedding_dim=128, hidden_units=128):
    """
    Build and compile the next-word prediction network.
    :param vocab_size: number of distinct token ids; sizes the embedding table
                       and the softmax output layer.
    :param sequence_length: number of token ids in each input window; should
                            match the window length used to build the training data.
    :param embedding_dim: size of each token embedding vector.
    :param hidden_units: width of the LSTM layer and of the hidden Dense layer.
    :return: a compiled Sequential model that outputs a probability for every token id.
    """
    model = Sequential()
    model.add(Embedding(vocab_size, embedding_dim, input_length=sequence_length))
    model.add(LSTM(hidden_units))
    model.add(Dense(hidden_units))
    model.add(Activation('relu'))
    model.add(Dense(vocab_size))
    model.add(Softmax())

    # Sparse loss: the targets are integer token ids, not one-hot vectors.
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

    return model


# Instantiate the network with an output layer sized to the tokenizer's vocabulary.
model = build_model(vocab_size)

In [None]:
model.summary()

In [None]:
# Use the model to generate a random sentence.


def random_sentence(sentence_len, temperature=1.0, *, sequence_length=32):
    """
    Generate text by repeatedly sampling the next token from the model.

    Uses the module-level `model`, `tokenizer` and `vocab_size`.
    :param sentence_len: number of tokens to generate.
    :param temperature: sampling temperature; values below 1 sharpen the
                        predicted distribution, values above 1 flatten it.
    :param sequence_length: length of the token window fed to the model; must
                            match the input length the model was built with.
    :return: the generated tokens joined into one string, with the space in
             front of . ! ? ( ) , removed.
    :raises ValueError: if `temperature` is not positive.
    """
    if temperature <= 0:
        raise ValueError('temperature must be positive, got {}'.format(temperature))

    # Seed the model with a window of random token ids.
    arr = np.random.randint(0, vocab_size, size=sequence_length)
    result = []

    for _ in range(sentence_len):
        predictions = model(np.expand_dims(arr, axis=0)).numpy().astype(np.float64)
        predictions = np.squeeze(predictions)

        # Rescale in log space; the epsilon keeps log() finite for zero probabilities.
        logits = np.log(predictions + 1e-32) / temperature

        # Shift by the maximum before exponentiating so the largest term is exactly 1.
        # This stops the whole distribution underflowing to zero at low temperatures.
        logits -= logits.max()
        predictions = np.exp(logits)
        predictions /= predictions.sum()

        # Draw one token id according to the rescaled distribution.
        next_word = int(np.argmax(np.random.multinomial(1, predictions)))

        result.append([next_word])
        # Slide the window: drop the oldest token and append the new one.
        arr = np.append(arr[1:], next_word)

    sentence = tokenizer.sequences_to_texts(result)
    sentence = ' '.join(sentence)

    # Re-attach punctuation to the preceding word.
    for c in '.!?(),':
        sentence = sentence.replace(' ' + c, c)

    # NOTE(review): this pads newlines with extra spaces; confirm whether the
    # intent was instead to strip the spaces around them.
    sentence = sentence.replace('\n', ' \n ')

    return sentence

In [None]:
import colorama


# Call this function at the end of each epoch to see what the model is generating.
def on_epoch_end():
    """
    Print one 50-token generated sample for each of the temperatures 0.1, 1.0
    and 10.0, with a bright red heading and blue sample text via colorama.
    """
    heading_style = colorama.Fore.RED + colorama.Style.BRIGHT
    sample_style = colorama.Style.RESET_ALL + colorama.Fore.BLUE
    reset_style = colorama.Style.RESET_ALL

    for temperature in (0.1, 1.0, 10.0):
        print(heading_style)
        print('Temp = {}'.format(temperature))
        print(sample_style)
        # The sample is generated only after the heading has been printed.
        print('\n', random_sentence(50, temperature), '\n')
        print(reset_style)

In [None]:
# Upper bound on the number of training windows passed to fit().
N_samples = 10**6


# Add a callback to print a new example every epoch.
# The lambda discards the two arguments Keras passes to on_epoch_end callbacks.
callbacks = [LambdaCallback(on_epoch_end=lambda _, __: on_epoch_end())]

# Train on (at most) the first N_samples windows.
model.fit(X[:N_samples], y[:N_samples],
          epochs=10,
          batch_size=100,
          callbacks=callbacks)

# Persist the trained model to disk.
model.save('sabrina.hdf5')

# Print a 1000-token sample from the trained model at the default temperature.
print(random_sentence(1000))

In [None]:
# Look up the word stored under the largest index of the tokenizer's vocabulary
# (assumes indices run from 1 to len(index_word) -- confirm against the Tokenizer docs).
tokenizer.index_word[len(tokenizer.index_word)]