# Big Data Content Analytics - AUEB

## Text Generation using RNNs

* Lab Assistant: George Perakis
* Email: gperakis[at]aueb.gr | perakisgeorgios[at]gmail.com

### Importing Modules

In [None]:
from __future__ import print_function

import random
import sys
from typing import List, Tuple

import numpy as np
from more_itertools import windowed
from tensorflow.python.keras.callbacks import ModelCheckpoint
from tensorflow.python.keras.layers import LSTM, Dense, Dropout
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.utils.data_utils import get_file
from tqdm import tqdm

### Text Generator Class

In [None]:
class TextGenerator:
    """
    Character-level text generator built on two stacked LSTM layers.

    By default the corpus is Nietzsche's writings. The intended call order
    is: ``get_data`` -> ``prepare_data`` -> ``vectorize_inputs`` ->
    ``build_model`` -> ``fit_model``.

    At least 20 epochs are required before the generated text starts sounding
    coherent.

    It is recommended to run this script on GPU, as recurrent networks are
    quite computationally intensive.

    If you try this script on new data, make sure your corpus
    has at least ~100k characters. ~1M is better.
    """

    def __init__(self, text_url: str = None, verbose: int = 0):
        """
        Store the corpus location and initialise every piece of state that
        the other methods fill in later.

        :param text_url: URL of the plain-text corpus. When None the
            Nietzsche corpus URL is used.
        :param verbose: Verbosity level; values above 0 enable the extra
            progress print-outs.
        """
        if text_url is None:
            text_url = "https://s3.amazonaws.com/text-datasets/nietzsche.txt"

        assert isinstance(text_url, str), 'text_url must be a string'
        assert isinstance(verbose, int), 'verbose must be an integer'

        self.verbose = verbose
        # The last URL segment doubles as the local file name.
        self.fname = text_url.split('/')[-1]
        self.text_url = text_url

        self.model = None

        # Vocabulary and training pairs, populated by prepare_data().
        self.characters = set()
        self.char2index = dict()
        self.index2char = dict()
        self.sentences = list()
        self.next_characters = list()

        self.max_len = None
        self.step = None

        # Lower-cased corpus, populated by get_data().
        self.text = None

    def get_data(self) -> str:
        """
        Fetch the corpus through Keras' ``get_file`` and load it lower-cased.

        :return: The lower-cased corpus text (also stored in ``self.text``).
        """
        path = get_file(fname=self.fname, origin=self.text_url)

        # The context manager closes the handle deterministically and the
        # explicit encoding keeps the result independent of the platform's
        # default locale.
        with open(path, encoding='utf-8') as corpus_file:
            text = corpus_file.read().lower()

        if self.verbose > 0:
            print(f'Corpus Length: {len(text)}')
            print('--------------------------------------------')
            print('Text snapshot')
            print(text[:150])
            print('--------------------------------------------')

        self.text = text

        return text

    @staticmethod
    def _iter_windows(text: str, max_len: int, step: int):
        """
        Yield ``(sequence, next_char)`` pairs cut from ``text``.

        Only complete windows are produced: a window is emitted when both
        the ``max_len`` characters and the character that follows them exist,
        so no padding value can ever leak into the training data.

        :param text: Corpus to cut.
        :param max_len: Number of characters per input sequence.
        :param step: Offset between the starts of consecutive sequences.
        """
        for start in range(0, len(text) - max_len, step):
            end = start + max_len
            yield text[start:end], text[end]

    def prepare_data(self,
                     text,
                     max_len: int = 40,
                     step: int = 3):
        """
        Cut the text into semi-redundant sequences of ``max_len`` characters
        and build the character vocabulary.

        The results are stored on the instance (``characters``,
        ``char2index``, ``index2char``, ``sentences``, ``next_characters``,
        ``max_len`` and ``step``); nothing is returned.

        :param text: Corpus string to cut.
        :param max_len: Number of characters per input sequence.
        :param step: Offset between the starts of consecutive sequences.
        :raises ValueError: If ``max_len`` or ``step`` is smaller than 1.
        """
        if max_len < 1 or step < 1:
            raise ValueError('max_len and step must both be >= 1')

        chars = set(text)
        print('total chars:', len(chars))

        # Index the vocabulary in sorted order: plain set iteration order for
        # strings varies between interpreter runs, which would make a saved
        # model unusable with the mapping of a later session.
        ordered_chars = sorted(chars)
        char2index = {c: i for i, c in enumerate(ordered_chars)}
        index2char = dict(enumerate(ordered_chars))

        sentences, next_chars = list(), list()

        for sent, next_char in tqdm(self._iter_windows(text, max_len, step)):
            sentences.append(sent)
            next_chars.append(next_char)

        if self.verbose > 0:
            print(f'Number of extracted Sequences: {len(sentences)}')
            print(f'Number of extracted next characters: {len(next_chars)}')
            print(f'Number of characters: {len(chars)}')
            print(f'Length of Char2Index: {len(char2index)}')
            print(f'Length of Index2Char: {len(index2char)}')

        self.characters = chars
        self.char2index = char2index
        self.index2char = index2char
        self.sentences = sentences
        self.next_characters = next_chars
        self.max_len = max_len
        self.step = step

    def vectorize_inputs(
            self, sentences: List[str],
            next_chars: List[str]) -> Tuple[np.ndarray, np.ndarray]:
        """
        One-hot encode the sequences and their target characters.

        :param sentences: Input sequences, each at most ``self.max_len``
            characters long.
        :param next_chars: Target character for each sequence, same length
            as ``sentences``.
        :return: ``(X, y)`` boolean arrays of shape
            ``(len(sentences), max_len, n_chars)`` and
            ``(len(sentences), n_chars)``.
        :raises ValueError: If the two lists differ in length.
        :raises KeyError: If a character is not part of ``self.char2index``.
        """
        if self.verbose > 0:
            print('Vectorizing inputs...')

        if len(sentences) != len(next_chars):
            raise ValueError(
                f'Got {len(sentences)} sentences but {len(next_chars)} '
                f'next characters; they must be paired one-to-one')

        n_chars = len(self.characters)

        # The builtin ``bool`` is used because the ``np.bool`` alias is not
        # available in every NumPy release.
        X = np.zeros((len(sentences), self.max_len, n_chars), dtype=bool)
        y = np.zeros((len(sentences), n_chars), dtype=bool)

        for i, (sentence, next_char) in enumerate(zip(sentences, next_chars)):
            try:
                for t, char in enumerate(sentence):
                    X[i, t, self.char2index[char]] = 1

                y[i, self.char2index[next_char]] = 1

            except KeyError as err:
                raise KeyError(
                    f'Sample {i} uses character {err.args[0]!r} which is '
                    f'missing from the vocabulary') from err

        if self.verbose > 0:
            print(f'X shape: {X.shape}')
            print(f'y shape: {y.shape}')

        return X, y

    @staticmethod
    def sample(a: np.ndarray, temperature: float = 1.0) -> int:
        """
        Sample an index from a probability array.

        The probabilities are re-weighted as ``p ** (1 / temperature)`` and
        re-normalised before a single multinomial draw is made, so values
        below 1 sharpen the distribution and values above 1 flatten it.

        :param a: Probabilities of each index (e.g. a softmax output).
        :param temperature: Re-weighting factor, must be non-zero.
        :return: The sampled index.
        """
        # Work in float64: float32 network outputs can sum to slightly more
        # than 1 once widened, which np.random.multinomial rejects.
        probs = np.asarray(a, dtype=np.float64)

        # Zero probabilities legitimately map to -inf (and back to 0 below).
        with np.errstate(divide='ignore'):
            logits = np.log(probs) / temperature

        # Shifting by the maximum leaves the normalised result unchanged but
        # keeps exp() from overflowing at low temperatures.
        weights = np.exp(logits - np.max(logits))
        probs = weights / np.sum(weights)

        return int(np.argmax(np.random.multinomial(1, probs, 1)))

    def build_model(self, lstm_size: int = 10, dr: float = 0.0) -> 'Sequential':
        """
        Build and compile the network: LSTM -> Dropout -> LSTM -> Dropout ->
        softmax Dense over the vocabulary.

        The input shape is derived from ``self.max_len`` and
        ``self.characters``. The model is also stored in ``self.model``.

        :param lstm_size: Number of units in each LSTM layer.
        :param dr: Dropout rate applied after each LSTM layer.
        :return: The compiled model.
        """
        if self.verbose > 0:
            print('Build model...')

        n_chars = len(self.characters)

        model = Sequential()

        model.add(LSTM(lstm_size,
                       return_sequences=True,
                       input_shape=(self.max_len, n_chars),
                       name='lstm_layer_1'))

        model.add(Dropout(dr, name='dropout_layer_1'))

        model.add(LSTM(lstm_size,
                       return_sequences=False,
                       name='lstm_layer_2'))

        model.add(Dropout(dr, name='dropout_layer_2'))

        model.add(Dense(n_chars,
                        name='output_layer',
                        activation='softmax'))

        model.compile(loss='categorical_crossentropy',
                      optimizer='adam')

        self.model = model

        # summary() prints by itself; wrapping it in print() only adds a
        # stray "None" line.
        model.summary()

        return model

    def _generate_text(self, seed: str, diversity: float,
                       n_chars: int = 150) -> str:
        """
        Stream ``n_chars`` generated characters to stdout, starting from
        ``seed``.

        :param seed: Text the generation starts from; it is echoed first.
        :param diversity: Temperature handed to ``sample``.
        :param n_chars: Number of characters to generate.
        :return: The seed followed by everything that was generated.
        """
        pieces = [seed]
        sentence = seed

        sys.stdout.write(seed)

        for _ in range(n_chars):
            x = np.zeros((1, self.max_len, len(self.characters)))

            for t, char in enumerate(sentence):
                x[0, t, self.char2index[char]] = 1.

            preds = self.model.predict(x, verbose=0)[0]

            try:
                next_char = self.index2char[self.sample(preds, diversity)]
            except ValueError:
                # Best effort: if the draw rejects the distribution, emit
                # nothing for this step instead of aborting the training run.
                next_char = ''

            pieces.append(next_char)
            # Slide the context window one character forward.
            sentence = sentence[1:] + next_char

            sys.stdout.write(next_char)
            sys.stdout.flush()

        return ''.join(pieces)

    def fit_model(self,
                  X,
                  y,
                  n_repeat=10,
                  bs: int = 128,
                  epochs: int = 100,
                  diversity_list=None,
                  run_examples: bool = True):
        """
        Train ``self.model`` and optionally print generated text after each
        training round.

        :param X: One-hot encoded input sequences.
        :param y: One-hot encoded target characters.
        :param n_repeat: Number of training rounds; each round runs
            ``epochs`` epochs.
        :param bs: Batch size.
        :param epochs: Epochs per training round.
        :param diversity_list: Sampling temperatures to demonstrate after
            each round. Defaults to ``[0.2, 0.5, 1.0, 1.2]``.
        :param run_examples: Whether to generate sample text after each
            round. Requires ``self.text`` to be loaded.
        :return: The trained model.
        """
        if diversity_list is None:
            diversity_list = [0.2, 0.5, 1.0, 1.2]

        fpath = 'text_generator.h5'

        # Created once and reused so the callback's best-loss bookkeeping is
        # not reset at the start of every training round.
        md = ModelCheckpoint(filepath=fpath, monitor='loss', verbose=1,
                             save_best_only=True)

        for iteration in range(1, n_repeat + 1):
            print()
            print('-' * 50)
            print('Iteration', iteration)

            self.model.fit(X, y, batch_size=bs, epochs=epochs, verbose=2,
                           callbacks=[md])

            if not run_examples:
                continue

            # Every diversity of this round starts from the same random seed
            # text so the outputs are comparable.
            start_index = random.randint(0,
                                         len(self.text) - self.max_len - 1)
            seed = self.text[start_index: start_index + self.max_len]

            for diversity in diversity_list:
                print()
                print(f'----- Diversity: {diversity}')
                print(
                    f'----- Generating with seed: ""{seed}"" ')

                self._generate_text(seed, diversity)

                print()

        return self.model

In [None]:
def run_example(max_len=40, step=3, lstm_size=100, dropout=0.0,
                n_samples=10_000, n_repeat=5, epochs=100, batch_size=512,
                div_list=None, run_examples=True):
    """
    Run the whole pipeline end to end: download the default corpus, cut it
    into training sequences, vectorise them, build the model and train it.

    :param max_len: Number of characters per input sequence.
    :param step: Offset between the starts of consecutive sequences.
    :param lstm_size: Number of units in each LSTM layer.
    :param dropout: Dropout rate applied after each LSTM layer.
    :param n_samples: Keep only the first ``n_samples`` training sequences.
        A falsy value (``None`` or ``0``) keeps all of them.
    :param n_repeat: Number of training rounds.
    :param epochs: Epochs per training round.
    :param batch_size: Training batch size.
    :param div_list: Sampling temperatures used for the text samples;
        ``None`` lets ``fit_model`` apply its own default list.
    :param run_examples: Whether to print generated text after each round.
    :return: The trained model.
    """

    tgo = TextGenerator()

    text = tgo.get_data()

    tgo.prepare_data(text=text, max_len=max_len, step=step)

    sentences, next_chars = tgo.sentences, tgo.next_characters

    if n_samples:
        # Truncate before vectorising: one-hot encoding the full corpus only
        # to throw most of it away costs a lot of memory for the same result.
        sentences = sentences[:n_samples]
        next_chars = next_chars[:n_samples]

    X, y = tgo.vectorize_inputs(sentences=sentences, next_chars=next_chars)

    tgo.build_model(lstm_size=lstm_size, dr=dropout)

    tgo.fit_model(X=X, y=y, n_repeat=n_repeat, epochs=epochs,
                  bs=batch_size, diversity_list=div_list,
                  run_examples=run_examples)

    return tgo.model

### Run Example

In [None]:
# Short demonstration run; n_repeat is not passed, so run_example's own
# default applies.
run_example(
    # how the corpus is cut into training sequences
    max_len=40, step=3, n_samples=10_000,
    # network size and regularisation
    lstm_size=100, dropout=0.0,
    # training schedule
    epochs=5, batch_size=1024,
    # text samples printed after each training round
    div_list=None, run_examples=True,
)