In [None]:
import csv
import random
import typing as tp
import re
from collections import defaultdict

import numpy as np
import keras
import keras.layers
import keras.optimizers
import keras.callbacks

from tqdm import tqdm
from keras.utils import pad_sequences, to_categorical
from keras.preprocessing.text import Tokenizer


In [None]:
# Characters deleted from every verse by load_data (punctuation, symbols, tab).
# The same string is also handed to the Tokenizer as its `filters` argument.
TO_EXCLUDE = '!"#$%&()*+-/:;<=>@[\\]^_`{|}~.,:;!?\t'
# Characters that load_data surrounds with spaces so they stand apart from the
# neighbouring words (here: the line break separating the lines of a verse).
TO_TOKENIZE = '\n'

In [None]:
def load_data(file, include: str, exclude: str, size: tp.Optional[int] = None,
              encoding: tp.Optional[str] = None):
    """Read verses from the first column of a CSV file and normalise them.

    For every verse, each character listed in ``include`` is surrounded by
    spaces (so it stands apart from adjacent words), each character listed in
    ``exclude`` is removed, the text is lower-cased, and a space followed by a
    newline is appended as terminator.

    Args:
        file: Path of the CSV file; only the first column of each row is used.
        include: Characters to isolate with surrounding spaces. May be empty,
            in which case no separation is performed.
        exclude: Characters to delete from the text.
        size: Maximum number of CSV rows to consider; ``None`` reads them all.
        encoding: Text encoding passed to ``open``; ``None`` keeps the
            platform default.

    Returns:
        A list with one normalised string per non-empty CSV row.
    """
    # Escape `include` so characters that are special inside a regex character
    # class (']', '^', '-', backslash) are matched literally. An empty
    # `include` would produce an invalid pattern, so the step is skipped.
    separator = re.compile('([' + re.escape(include) + '])') if include else None
    # Build the deletion table once instead of once per verse.
    removal_table = str.maketrans('', '', exclude)

    with open(file, encoding=encoding) as f:
        # Slicing with size=None keeps every row, so no special case is needed.
        rows = list(csv.reader(f))[:size]

    _text = []
    for row in tqdm(rows):
        if not row:
            # csv.reader yields an empty list for blank lines; nothing to load.
            continue
        verse = row[0]
        # Separate characters that we want to tokenize
        if separator is not None:
            verse = separator.sub(r' \1 ', verse)
        # Exclude characters that we do not want to tokenize
        verse = verse.translate(removal_table)
        _text.append(verse.lower() + ' \n')
    return _text

# Load and normalise the whole limerick corpus, then preview the first three verses.
TEXT = load_data(
    'data/poems/limericks.csv',
    include=TO_TOKENIZE,
    exclude=TO_EXCLUDE,
)
print(*TEXT[0:3])

In [None]:
def extract_rhymes(_text: tp.List[str]):
    """Collect rhyming words from limericks, assuming the AABBA rhyme scheme.

    The last word of each of the first five non-blank lines of a verse is
    taken; lines 1, 2 and 5 form one rhyme group and lines 3 and 4 another.
    Every word is mapped to the other, different words of its group.

    Args:
        _text: Verses whose lines are separated by newline characters.

    Returns:
        A dict mapping each word to a sorted list of distinct words it rhymes
        with. Verses with fewer than five non-blank lines are skipped, and a
        word never rhymes with itself.
    """
    rhymes = defaultdict(set)

    def add_rhymes(rhyme_words: tp.List[str]):
        # Link each word with the *different* words of its group. Filtering by
        # value (rather than removing one occurrence) keeps a word repeated in
        # the group from being recorded as its own rhyme.
        for word in rhyme_words:
            others = [other for other in rhyme_words if other != word]
            if others:
                rhymes[word].update(others)

    for verse in tqdm(_text):
        # Drop blank lines so they neither shift the scheme positions nor
        # contribute an empty string as a "last word".
        lines = [line for line in verse.strip().split('\n') if line.strip()]
        if len(lines) < 5:
            # Invalid limerick
            continue
        last_words = [line.strip().split(' ')[-1] for line in lines]
        # The rhyme scheme in the limerick is aabba
        add_rhymes([last_words[0], last_words[1], last_words[4]])
        add_rhymes([last_words[2], last_words[3]])

    # Sorting makes the output independent of set iteration order, which
    # varies between interpreter runs for strings.
    return {word: sorted(partners) for word, partners in rhymes.items()}

# Build the rhyme dictionary and show a small alphabetical slice of it.
RHYMES = extract_rhymes(TEXT)
print("...")
print(*(f"{k}: {v}\n" for k, v in sorted(RHYMES.items())[100:110]))
print("...")
# The entry with the largest number of recorded rhymes.
most_common = max(RHYMES, key=lambda word: len(RHYMES[word]))
print(f"Most common rhyme: '{most_common}' | Length: {len(RHYMES[most_common])} | Rhymes: {RHYMES[most_common][:10]} ...")

In [None]:
# Character-level vocabulary fitted on the cleaned corpus.
TOKENIZER = Tokenizer(char_level=True, filters=TO_EXCLUDE)
TOKENIZER.fit_on_texts(TEXT)
# One extra class on top of the fitted vocabulary — presumably for index 0,
# which the tokenizer does not assign and padding uses by default (confirm).
TOTAL_CHARS = 1 + len(TOKENIZER.word_index)
print(TOTAL_CHARS)
print(TOKENIZER.word_index)

In [None]:
def create_sequences(rhymes: tp.Dict[str, tp.List[str]], tokenizer: "Tokenizer", total_chars: int):
    """Turn rhyme pairs into next-character training sequences.

    Each pair is written as ``"<base> <rhyme> "`` and encoded with
    ``tokenizer``. One prefix of the encoded text is emitted for every
    position from the first character of the rhyme up to and including the
    trailing space, so the last element of each prefix is a character of the
    rhyme (or its terminating space) and everything before it is context.

    Args:
        rhymes: Mapping from a base word to the words it rhymes with.
        tokenizer: Object whose ``texts_to_sequences`` takes a list of texts
            and returns one list of integer ids per text.
        total_chars: Unused; kept so existing callers keep working.

    Returns:
        A list of flat integer lists of varying length.
    """
    sequences = []
    for base_word, rhymes_words in tqdm(rhymes.items()):
        # The base word and the space after it are context only, so the
        # shortest prefix ends on the first character of the rhyme word.
        first_end = len(base_word) + 2
        for rhyme_word in rhymes_words:
            # texts_to_sequences expects a *list* of texts. Given a bare
            # string it treats every character as its own text and returns a
            # list of one-element lists instead of a flat id sequence.
            encoded = tokenizer.texts_to_sequences([f"{base_word} {rhyme_word} "])[0]
            sequences.extend(encoded[:end] for end in range(first_end, len(encoded) + 1))
    return sequences

# Expand every rhyme pair into its next-character training prefixes.
SEQUENCES = create_sequences(
    rhymes=RHYMES,
    tokenizer=TOKENIZER,
    total_chars=TOTAL_CHARS,
)
print(f"Size of training set {len(SEQUENCES)}")

In [None]:
# Length of the longest training sequence (context plus the target position).
SEQUENCE_LENGTH = max(map(len, SEQUENCES))
# Left-pad every sequence to that length so the target is always the last column.
ENCODED_SEQUENCES = pad_sequences(SEQUENCES, padding='pre', maxlen=SEQUENCE_LENGTH)
# Inputs are all positions but the last; targets are the one-hot encoded last position.
X = ENCODED_SEQUENCES[:, :-1]
y = to_categorical(ENCODED_SEQUENCES[:, -1], num_classes=TOTAL_CHARS)
print(f'Max Sequence Length: {SEQUENCE_LENGTH}')

In [None]:
def create_model(total_chars: int, sequence_length: tp.Optional[int] = None):
    """Build and compile the character-level next-character model.

    The network is an embedding layer, four stacked LSTM layers and a softmax
    layer over the character vocabulary, compiled with categorical
    cross-entropy and the Adam optimizer.

    Args:
        total_chars: Number of character classes; used both as the embedding
            input dimension and as the size of the softmax output.
        sequence_length: Number of time steps in each input sequence. ``None``
            (the default) leaves the length unspecified, so the model accepts
            inputs of any length.

    Returns:
        The compiled ``keras.models.Sequential`` model.
    """
    # The previous hard-coded input_length=1 declared a one-step input while
    # the model is actually fed sequences of SEQUENCE_LENGTH - 1 steps.
    # NOTE(review): activation='relu' on LSTM layers is a non-default choice;
    # confirm it is intended before changing it.
    model = keras.models.Sequential([
        keras.layers.Embedding(total_chars, 100, input_length=sequence_length),
        keras.layers.LSTM(64, return_sequences=True, activation='relu'),
        keras.layers.LSTM(64, return_sequences=True, activation='relu'),
        keras.layers.LSTM(64, return_sequences=True, activation='relu'),
        # Only the final state of the last LSTM feeds the classifier.
        keras.layers.LSTM(64, activation='relu'),
        keras.layers.Dense(total_chars, activation='softmax'),
    ])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

# Name of the directory under data/models/ that the trained model is saved to.
MODEL_NAME = "RhymeModel2"
MODEL = create_model(TOTAL_CHARS)
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line to the output.
MODEL.summary()

In [None]:
# Inverse of the tokenizer vocabulary: integer id -> character.
REVERSE_TOKEN_INDEX = {index: char for char, index in TOKENIZER.word_index.items()}

class PredictionCallback(keras.callbacks.Callback):
    """Print a sample rhyme generated by the model being trained.

    At the end of every ``frequency``-th epoch the callback greedily extends
    ``seed_word`` one character at a time (taking the arg-max prediction)
    until a space or an unknown id is predicted, or ``max_chars`` characters
    have been added, and prints the result.

    Args:
        seed_word: Text the generation starts from.
        max_chars: Upper bound on the number of generated characters.
        frequency: Generate a sample every this many epochs (must be >= 1).
    """

    def __init__(self, seed_word: str = "before ", max_chars: int = 10, frequency: int = 1):
        super().__init__()
        if frequency < 1:
            raise ValueError("frequency must be a positive integer")
        self.seed_word = seed_word
        self.max_chars = max_chars
        self.frequency = frequency

    def on_epoch_end(self, epoch, logs=None):
        if epoch % self.frequency:
            return

        in_word = self.seed_word
        for _ in range(self.max_chars):
            encoded = TOKENIZER.texts_to_sequences([in_word])[0]
            padded = pad_sequences([encoded], maxlen=SEQUENCE_LENGTH - 1, padding='pre')
            # Use the model this callback is attached to rather than the
            # global MODEL, so the sample always reflects the model in training.
            predicted = int(np.argmax(self.model.predict(padded, verbose=0)))
            out_char = REVERSE_TOKEN_INDEX.get(predicted, None)
            # A space ends the word; an id without a character (e.g. the
            # padding id) cannot be rendered, so generation stops as well.
            if out_char == " " or out_char is None:
                break
            in_word += out_char
        print(f"\n{in_word}")

# Train the model; a sample rhyme is printed by the callback while training.
HISTORY = MODEL.fit(
    x=X,
    y=y,
    batch_size=512,
    epochs=10,
    shuffle=True,
    verbose=1,
    callbacks=[PredictionCallback()],
)

# Persist the trained model under the directory named after MODEL_NAME.
MODEL.save(f"data/models/{MODEL_NAME}/weights.h5")

In [None]:
def generate_rhyme(model: "keras.Model", sequence_length: int, in_word: str, max_chars: int = 20):
    """Greedily generate a rhyme for ``in_word`` and print it.

    A space is appended to ``in_word`` if it does not already end with one.
    The text is then extended one character at a time with the model's
    arg-max prediction until a space or an unknown id is predicted, or
    ``max_chars`` characters have been added.

    Args:
        model: Model whose ``predict`` returns per-character scores for a
            padded id sequence.
        sequence_length: Length the training sequences were padded to; the
            model input is padded to ``sequence_length - 1``.
        in_word: Word to find a rhyme for. May be empty.
        max_chars: Upper bound on the number of generated characters.

    Returns:
        The seed word, a space and the generated characters — the same text
        that is printed.
    """
    # endswith() also copes with an empty seed, where in_word[-1] would raise
    # IndexError.
    if not in_word.endswith(" "):
        in_word += " "
    for _ in range(max_chars):
        encoded = TOKENIZER.texts_to_sequences([in_word])[0]
        padded = pad_sequences([encoded], maxlen=sequence_length - 1, padding='pre')
        predicted = int(np.argmax(model.predict(padded, verbose=0)))
        out_char = REVERSE_TOKEN_INDEX.get(predicted)
        # A space ends the word; an id without a character (e.g. the padding
        # id) cannot be rendered, so generation stops as well.
        if out_char is None or out_char == " ":
            break
        in_word += out_char
    print(f"{in_word}")
    return in_word

# Print generated rhymes for twenty randomly chosen words from the rhyme dictionary.
for _ in range(20):
    generate_rhyme(
        MODEL,
        SEQUENCE_LENGTH,
        random.choice(list(RHYMES)),
    )
