<a href="https://colab.research.google.com/github/mvenouziou/Project-Text-Generation/blob/main/Mo_Text_Generator_CSV.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Text Generation RNN

This program constructs a character-level sequence model to generate text according to a character distribution learned from the dataset. 

- Try my web app implementation at www.communicatemission.com/ml-projects#text_generation 
- My trained model is available at https://raw.githubusercontent.com/mvenouziou/text_generator.

*The model is based on techniques from Imperial College London's Coursera course, "Customising your models with TensorFlow 2" (https://www.coursera.org/learn/customising-models-tensorflow2) and the TensorFlow documentation (https://www.tensorflow.org/tutorials/text/text_generation?hl=en). The web app is built on the Anvil platform and hosted on a Google Cloud server (CPU).* 

In [None]:
#### PACKAGE IMPORTS ####
# ML design
import tensorflow as tf
from tensorflow import keras
!pip install -q tensorflow-text
import tensorflow_text as text
from tensorflow.keras.callbacks import ModelCheckpoint 

# data handling
import numpy as np
import pandas as pd
import random
import re
import string
import random

# file management
import urllib.request
import os
import pickle

In [None]:
# integrations
# mount google drive:
GDRIVE_DIR = '/content/gdrive/'
from google.colab import drive
drive.mount(GDRIVE_DIR)

# Anvil's web app server
!pip install -q anvil-uplink
import anvil.server

USE_ANVIL = False  
if USE_ANVIL:
    anvil.server.connect('53NFXI7IX7IE233XQTVJDXUM-PUGRV2WON2LETWBG')

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).


##### GLOBAL VARIABLES
File directories and hyperparameters

In [None]:
# GLOBAL VARIABLES

# --- hyperparameters ---
BATCH_SIZE = 32               # sequences per training batch
PADDED_EXAMPLE_LENGTH = 500   # length every example is padded / truncated to
EMBEDDING_DIM = 256           # output size of the embedding layer

# name of the corpus / model; selects the Google Drive sub-folder below
# (alternatives: 'assorted', 'tests')
AUTHOR = 'Robert_Frost'

# --- file directory structure in Google Drive ---
FILEPATH = f'{GDRIVE_DIR}MyDrive/Colab_Notebooks/models/text_generation/{AUTHOR}'
CHECKPOINT_DIR = f'{FILEPATH}/checkpoints/'
CACHE_DIR = f'{FILEPATH}/cache/'
MODEL_DIR = f'{FILEPATH}/prediction_model/'

# --- online dataset repository ---
DATASETS_DIR = 'https://raw.githubusercontent.com/mvenouziou/text_generator/main/'

# --- dataset files (.txt and .csv allowed) ---
DATASET_PRIMARY = 'robert_frost_collection.csv'  # dataset from Kaggle assignment
DATASETS = [
    DATASET_PRIMARY,
    # add other CSV data sources here,
]

#### Load and inspect the dataset

In [None]:
# Function: loader for .csv files
def prepare_csv(filename, datasets_dir=DATASETS_DIR,
                content_columns=('Name', 'Content'), shuffle_rows=True):
    """Load a CSV dataset and return one merged text string per row.

    Parameters:
    filename - name of the CSV file; appended to `datasets_dir`
    datasets_dir - directory / URL prefix the file is read from
    content_columns - names of the text columns to concatenate, in order
    shuffle_rows - if True, the rows are returned in random order

    Returns:
    list of strings, one per row that has no missing values

    Raises:
    ValueError - if `content_columns` is empty
    KeyError - if a requested column is not in the file
    """
    content_columns = list(content_columns)
    if not content_columns:
        raise ValueError('content_columns must name at least one column')

    # load data into DataFrame (rows with any missing value are dropped)
    dataframe = pd.read_csv(datasets_dir + filename).dropna()

    # specific prep for Robert Frost set:
    # the title column may carry a trailing space. rename() returns a new
    # frame, so the result has to be assigned back.
    if 'Name ' in dataframe.columns:
        dataframe = dataframe.rename(columns={'Name ': 'Name'})

    # title becomes an upper-case heading, content ends with a line break
    # (applied only to the columns that exist in this file)
    if 'Name' in dataframe.columns:
        dataframe['Name'] = dataframe['Name'].astype(str).str.upper() + ':\n'
    if 'Content' in dataframe.columns:
        dataframe['Content'] = dataframe['Content'].astype(str) + '\n'

    # data cleanup: keep only the requested text columns
    dataframe = dataframe[content_columns]

    # shuffle entries (rows)
    if shuffle_rows:
        dataframe = dataframe.sample(frac=1)

    # merge desired text columns into a single string per row
    merged = dataframe[content_columns[0]]
    for column in content_columns[1:]:
        merged = merged + dataframe[column]

    # convert to list of strings
    return merged.tolist()


# Function: Load and standardize data files
def load_parse(data_list, display_samples=True):
    """Normalise whitespace in every text of `data_list`.

    Each text is split with a whitespace tokenizer and the pieces are
    re-joined with single spaces, which removes paragraph / line marks.

    Parameters:
    data_list - list of strings
    display_samples - if True, print up to 5 randomly chosen cleaned texts
                      and the total number of texts

    Returns:
    list of cleaned strings, in the same order as `data_list`
    """
    # remove paragraph / line marks and split up words
    tokenizer = text.WhitespaceTokenizer()

    # tokenize data (outputs bytestrings) and convert back to string format
    clean_list = [
        ' '.join(word.decode() for word in tokenizer.tokenize(data).numpy())
        for data in data_list
    ]

    # Display some text samples
    if display_samples:
        # never ask for more samples than there are texts: sampling without
        # replacement raises when the population is too small
        num_samples = min(5, len(clean_list))
        if num_samples > 0:
            inx = np.random.choice(len(clean_list), num_samples, replace=False)
            for i in inx:
                print(clean_list[i])
                print()

        print('len(text_chunks):', len(clean_list))

    return clean_list

In [None]:
def create_blocks(full_examples, max_len=PADDED_EXAMPLE_LENGTH):
    """Cut every example text into overlapping character blocks.

    For each example the result holds, in this order:
      1. growing prefixes of the text (starting at 5 characters),
      2. sliding windows of `max_len` characters, one character apart,
      3. shrinking suffixes of the text.

    Parameters:
    full_examples - list of strings
    max_len - number of characters in a full-length block

    Returns:
    list with one list of string blocks per example
    """
    # characters input into model before starting prediction
    leading_characters = 5

    blocks = []

    for example in full_examples:
        example_length = len(example)

        # number of full-length windows. Clamped at 0: for an example shorter
        # than max_len + 1 the unclamped value is negative, and using it as a
        # slice start below would wrap around from the end of the string and
        # emit many duplicated / out-of-order blocks.
        num_full_blocks = max(0, example_length - max_len - 1)

        # small blocks at start (will be zero-padded later)
        # NOTE(review): these prefixes grow up to num_full_blocks - 1
        # characters, which can exceed max_len for long examples -- confirm
        # whether the range was meant to stop at max_len.
        example_block = [example[: i]
                         for i in range(leading_characters, num_full_blocks)]

        # full length blocks (n-grams)
        example_block += [example[i: max_len + i]
                          for i in range(num_full_blocks)]

        # small blocks at end (will be zero-padded later)
        example_block += [example[i:]
                          for i in range(num_full_blocks, example_length - 1)]

        blocks.append(example_block)

    return blocks

#### Encode data for model

In [None]:
# Function: Create and fit tokenizer object
def create_character_tokenizer():
    """Build a character-level Keras tokenizer fitted on a fixed vocabulary.

    The vocabulary is ASCII punctuation, digits, letters and the space
    character, so the result does not depend on the training data.

    Returns:
    the fitted tf.keras.preprocessing.text.Tokenizer
    """
    # characters the tokenizer is fitted on
    vocab = ''.join([string.punctuation,
                     string.digits,
                     string.ascii_letters,
                     ' '])

    # settings for the standard keras tokenizer
    tokenizer_settings = dict(
        num_words=None,  # number of tokens is not limited
        filters='#$%&()*+-/<=>@[]^_`{|}~\t',
        lower=True,  # whether to convert to lowercase letters
        char_level=True,  # tokens created at character level
        oov_token=None,  # drop unknown characters
    )

    tokenizer = tf.keras.preprocessing.text.Tokenizer(**tokenizer_settings)
    tokenizer.fit_on_texts(vocab)

    return tokenizer

#### Create input and target Datasets for stateful RNN

In [None]:
def make_padded_array(text_blocks, tokenizer, max_len=PADDED_EXAMPLE_LENGTH):
    """Tokenize text blocks and bring them to a uniform length.

    Parameters:
    text_blocks - list of strings
    tokenizer - fitted tokenizer providing `texts_to_sequences`
    max_len - length of every row in the result

    Returns:
    int32 array with one row of token ids per block
    """
    pad_sequences = tf.keras.preprocessing.sequence.pad_sequences

    # characters -> integer token ids
    sequences = tokenizer.texts_to_sequences(text_blocks)

    # short rows get zeros in front; long rows lose their leading tokens
    return pad_sequences(sequences=sequences,
                         maxlen=max_len,
                         dtype='int32',
                         padding='pre',
                         truncating='pre',
                         value=0.0)

In [None]:
def create_inputs_and_targets(array_of_sequences):
    """Split a 2-D array of sequences into next-token training pairs.

    The inputs drop the last column and the targets drop the first one, so
    targets[:, t] is the token that follows inputs[:, t].

    Returns:
    tuple (inputs, targets)
    """
    all_but_last = slice(None, -1)
    all_but_first = slice(1, None)

    return (array_of_sequences[:, all_but_last],
            array_of_sequences[:, all_but_first])

Note: Our text data are not independent random samples. Content from earlier in the text can inform future predictions. In this case we can use a 'stateful' model to capture some of this dependence. (The data must be broken into contiguous blocks of text for this to work, and care must be taken with their order: each row of a batch must be the continuation of the corresponding row in the previous batch.)



In [None]:
# Function: data prep to create stateful RNN batches
# note: This will be applied separately on each example text, 
# so that RNN can reset internal state / distinguish between unrelated passages

def preprocess_stateful(input, target, batch_size=BATCH_SIZE):
    """Reorder consecutive examples so that batches line up for a stateful RNN.

    The examples are cropped to a multiple of `batch_size` and reordered so
    that, once split into consecutive batches of `batch_size` rows, row j of
    one batch is the example that directly follows row j of the previous
    batch in the original order.

    Parameters:
    input - array of input sequences, one example per row
    target - array of target sequences, aligned with `input`
    batch_size - number of rows per batch

    Returns:
    tuple (input_seq_stateful, target_seq_stateful); both are empty when
    there are fewer than `batch_size` examples
    """
    num_examples = input.shape[0]

    # 'steps' is the number of batches, and also the spacing between the
    # original positions of neighbouring rows within one batch
    steps = num_examples // batch_size

    # adjust for batch size to divide evenly into sample size
    num_processed_examples = steps * batch_size
    input_cropped = input[:num_processed_examples]
    target_cropped = target[:num_processed_examples]

    # define reordering: 0, steps, 2*steps, ..., 1, steps + 1, ...
    # Laying the indices out as (batch_size, steps) and reading them column
    # by column gives this order without growing an array inside a loop.
    inx = (np.arange(num_processed_examples)
             .reshape(batch_size, steps)
             .T
             .ravel())

    # reorder the data
    input_seq_stateful = input_cropped[inx]
    target_seq_stateful = target_cropped[inx]

    return input_seq_stateful, target_seq_stateful

#### Define RNN model

In [None]:
# Function: Model Definition
def get_model(batch_size=BATCH_SIZE, embedding_dim=EMBEDDING_DIM):
    """Define the stateful character-level RNN (the model is not compiled here).

    Architecture: embedding -> 2 x (GRU, dropout, batch norm) -> GRU -> dense
    layer producing one logit per vocabulary entry.

    Parameters:
    batch_size - fixed batch size of the model input
    embedding_dim - output size of the embedding layer

    Returns:
    an uncompiled keras.Sequential model
    """
    # use the layers of the `tensorflow.keras` package imported by this file,
    # the same package that provides `keras.Sequential` below
    layers = keras.layers

    # one extra entry because index 0 is the padding value (masked below)
    vocab_size = len(create_character_tokenizer().word_index) + 1

    # embedding layer
    model_layers = [
        layers.Embedding(input_dim=vocab_size,
                         output_dim=embedding_dim,
                         mask_zero=True,
                         batch_input_shape=(batch_size, None),
                         name='Embedding'),
    ]

    # RNN layers: two identical GRU / dropout / batch-norm stages
    for stage in (1, 2):
        model_layers += [
            layers.GRU(units=256,
                       stateful=True,
                       return_sequences=True,
                       name='GRU_{}'.format(stage)),
            layers.Dropout(rate=.10,
                           name='Dropout_{}'.format(stage)),
            layers.BatchNormalization(name='Batch_Norm_{}'.format(stage)),
        ]

    model_layers += [
        # final RNN layer; looked up by name when generating text
        layers.GRU(units=256,
                   stateful=True,
                   return_sequences=True,
                   name='GRU_OUTPUT'),

        # prediction layer (no activation: outputs are logits)
        layers.Dense(units=vocab_size,
                     activation=None,
                     name='Decoding'),
    ]

    model = keras.Sequential(model_layers)

    return model

In [None]:
def compile_model(model, learning_rate):
    """Compile `model` for next-character prediction and return it.

    Parameters:
    model - keras model whose outputs are logits
    learning_rate - learning rate of the Adamax optimizer

    Returns:
    the same model object, compiled
    """
    model.compile(
        optimizer=tf.keras.optimizers.Adamax(learning_rate=learning_rate),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[
            'sparse_categorical_accuracy',
            # The string shortcut for this metric expects probabilities.
            # The loss above is configured for logits, so the metric has to
            # be told the same, otherwise the reported value is wrong.
            tf.keras.metrics.SparseCategoricalCrossentropy(
                name='sparse_categorical_crossentropy',
                from_logits=True),
        ],
    )

    return model

In [None]:
# Function: Train model
def train_model(model, train_datasets_list, 
                checkpoint, checkpoint_manager,
                num_epochs=1, num_datasets_to_use=10,
                learning_rate=0.00001,
                batch_size=BATCH_SIZE, filepath=FILEPATH, 
                checkpoint_dir=CHECKPOINT_DIR):
    """Train `model` on a random selection of datasets, one dataset at a time.

    Parameters:
    model - model to train; it is (re)compiled here
    train_datasets_list - list of batched tf.data.Dataset objects
    checkpoint - not used by this function (kept for existing callers)
    checkpoint_manager - `save()` is called after every dataset
    num_epochs - number of passes over the selection of datasets
    num_datasets_to_use - datasets trained on per epoch (capped at the
                          number of datasets available)
    learning_rate - optimizer learning rate
    batch_size - not used by this function (kept for existing callers)
    filepath - not used by this function (kept for existing callers)
    checkpoint_dir - path prefix under which the full model is saved after
                     every epoch

    Returns:
    the trained model
    """
    # compile model
    model = compile_model(model, learning_rate=learning_rate)

    # never index past the end of the dataset list
    num_datasets_to_use = min(num_datasets_to_use, len(train_datasets_list))

    # begin training loop
    for epoch in range(num_epochs):

        print('Epoch {epoch:03d}'.format(epoch = epoch))

        # shuffle dataset order (on a copy, so the caller's list is untouched)
        epoch_datasets = random.sample(train_datasets_list,
                                       k=len(train_datasets_list))

        for i in range(num_datasets_to_use):

            # select dataset
            data = epoch_datasets[i]

            # choose random starting point (for every other dataset).
            # Dataset.skip returns a new dataset, so the result must be kept.
            # The datasets are expected to be batched already, so k counts
            # whole batches.
            if i % 2 == 0:
                k = np.random.randint(5, 10)
                num_batches = int(data.cardinality().numpy())
                # only skip when at least one batch is known to remain
                # (unknown / infinite cardinalities are negative)
                if num_batches > k:
                    data = data.skip(k)

            # train model
            model.fit(data,
                      shuffle=False,
                      epochs=1,
                      verbose=1)

            # reset RNN hidden states
            model.reset_states()

            # save checkpoint
            checkpoint_manager.save()

        # save full model at end of each epoch
        model.save(checkpoint_dir + 'saved_model_epoch_' + str(epoch))

    return model

### Create Text Generation Function
Adapt trained model

In [None]:
# Final Prediction Function #######

def make_prediction(init_string, num_generation_steps, 
                    prediction_tokenizer, prediction_model, 
                    precision_reduction=0,
                    model_name=AUTHOR, print_result=False):
    """Generate text by repeatedly sampling the next character from the model.

    Parameters:
    init_string - text the generation starts from
    num_generation_steps - number of characters to generate
    prediction_tokenizer - tokenizer used to encode / decode characters
    prediction_model - trained model (should use batch size = 1); must
                       contain a layer named 'GRU_OUTPUT'
    precision_reduction - passed on to `sample_token`
    model_name - not used by this function (kept for existing callers)
    print_result - if True, the generated text is also printed

    Returns:
    the decoded text: the tokens of `init_string` followed by the generated
    characters

    Raises:
    ValueError - if `init_string` contains no character known to the tokenizer
    """
    # get final GRU layer for logits
    GRU_layer = prediction_model.get_layer('GRU_OUTPUT')

    # Start from a clean state. Only the top GRU layer is reset explicitly
    # during generation, so without this any other stateful layer would
    # still hold the state left behind by the previous call.
    prediction_model.reset_states()

    # convert user input to model input
    token_sequence = prediction_tokenizer.texts_to_sequences([init_string])
    if not token_sequence[0]:
        raise ValueError(
            'init_string must contain at least one character that the '
            'tokenizer knows')

    initial_state = None
    input_sequence = token_sequence

    for i in range(num_generation_steps):

        # get next character predictions (as logits)
        logits = get_logits(prediction_model, input_sequence, 
                            initial_state=initial_state)

        # choose next character
        sampled_token = sample_token(logits, precision_reduction)

        # add character to generated text
        token_sequence[0].append(sampled_token)

        # pass forward GRU state; from now on only the new token is fed in
        input_sequence = [[sampled_token]]
        initial_state = GRU_layer.states[0].numpy()

    # decode tokens to text. The decoded characters come back separated by
    # single spaces, so every second character is kept.
    predicted_text = \
        prediction_tokenizer.sequences_to_texts(token_sequence)[0][::2]

    if print_result:
        print(predicted_text)

    return predicted_text

*************************************
*************************************

### Implement Functions to Create Models

*************************************
*************************************

##### Preprocess Data / Create list of TF Datasets

In [None]:
# preprocess data
def preprocess(filename=DATASET_PRIMARY, 
               batch_size=BATCH_SIZE):
    """Load a CSV dataset and turn every text into a batched tf.data.Dataset.

    Parameters:
    filename - CSV file name inside DATASETS_DIR
    batch_size - number of sequences per batch

    Returns:
    tuple (all_datasets, tokenizer): one dataset per text that yields at
    least one full batch, and the tokenizer used for the encoding
    """
    # get tokenizer
    tokenizer = create_character_tokenizer()

    # load /clean csv data
    data_list = prepare_csv(filename, 
                            datasets_dir=DATASETS_DIR, 
                            content_columns=['Name', 'Content'], 
                            shuffle_rows=True)

    # preprocessing step
    full_examples = load_parse(data_list, 
                               display_samples=False)

    # reshape data to appropriate size for stateful RNN
    # ## tokenize and break up into blocks
    text_blocks = create_blocks(full_examples, 
                                max_len=PADDED_EXAMPLE_LENGTH)

    # ## pad to uniform size
    num_padded_blocks = 0  # counted only; the arrays themselves are not kept
    all_datasets = []
    input_arr = target_arr = None

    for block in text_blocks:
        padded_arr = make_padded_array(block, 
                                       tokenizer, 
                                       max_len=PADDED_EXAMPLE_LENGTH) 
        num_padded_blocks += 1

        # create input / targets
        input_arr, target_arr = \
            create_inputs_and_targets(padded_arr)

        # rearrange order as stateful RNN batches
        # (must use the same batch size as the batching below)
        input_seq_stateful, target_seq_stateful = \
            preprocess_stateful(input_arr, target_arr, 
                                batch_size=batch_size)

        # texts too short to fill a single batch produce no training data
        if input_seq_stateful.shape[0] == 0:
            continue

        # convert to Dataset
        dataset = tf.data.Dataset.from_tensor_slices((input_seq_stateful, 
                                                      target_seq_stateful)
                                                    )
        dataset = dataset.batch(batch_size, 
                                drop_remainder=True)
        dataset = dataset.prefetch(tf.data.AUTOTUNE)

        # update dataset list
        all_datasets.append(dataset)

    # report summary
    print('full_examples:', len(full_examples))
    print('text_blocks:', len(text_blocks))
    print('padded_blocks:', num_padded_blocks)
    if input_arr is not None:
        print('example input_arr :', input_arr.shape)
        print('example target_arr:', target_arr.shape)
    print('all_datasets:', len(all_datasets))
    if all_datasets:
        print('all_datasets[0]:', all_datasets[0])

    return all_datasets, tokenizer

##### Create / Train Model

In [None]:
def build_load_model(checkpoint_dir=CHECKPOINT_DIR, batch_size=BATCH_SIZE):

    """ Builds the training model and restores the latest checkpoint 
    found in `checkpoint_dir`, if there is one.

    Returns:
    tuple (model, tokenizer, checkpoint, checkpoint_manager)
    """

    # initialize model structure
    tokenizer = create_character_tokenizer()
    model = get_model(batch_size=batch_size)

    # initialize checkpoint managers
    checkpoint = tf.train.Checkpoint(model=model)

    checkpoint_manager = tf.train.CheckpointManager(
                            checkpoint=checkpoint, 
                            directory=checkpoint_dir, 
                            max_to_keep=4, 
                            keep_checkpoint_every_n_hours=None,
                            checkpoint_name='ckpt', 
                            step_counter=None, 
                            checkpoint_interval=None,
                            init_fn=None
                            )

    # load any saved weights.
    # restore_or_initialize() does not raise when there is nothing to
    # restore; it returns the restored checkpoint path, or None.
    try:
        restored_path = checkpoint_manager.restore_or_initialize()
    except Exception as err:
        # keep going with fresh weights, but say why the restore failed
        print('could not restore checkpoint:', err)
        restored_path = None

    if restored_path:
        print('loaded checkpoint')
    else:
        print('new model initialized')

    model.summary()

    return model, tokenizer, checkpoint, checkpoint_manager

Create the prediction model: adapt the trained model to accept single-line inputs (batch size = 1).

In [None]:
# Function: produce modified model for making text predictions
def get_prediction_model(pretrained_model,
                         model_dir=MODEL_DIR, 
                         checkpoint_dir=CHECKPOINT_DIR):
    """Copy the trained weights into a model that takes one sequence at a time.

    Parameters:
    pretrained_model - trained model whose weights are copied
    model_dir - not used by this function
    checkpoint_dir - not used by this function

    Returns:
    tuple (prediction_model, tokenizer)
    """
    # same architecture, rebuilt with batch size = 1
    single_input_model = get_model(batch_size=1)

    # transfer the weights of the pre-trained model
    single_input_model.set_weights(pretrained_model.get_weights())

    # matching character tokenizer
    char_tokenizer = create_character_tokenizer()

    return single_input_model, char_tokenizer


# Store trained model separate from checkpoints
def save_prediction_model(prediction_model,
                          model_dir=MODEL_DIR, 
                          checkpoint_dir=CHECKPOINT_DIR):
    """Save the prediction model and its tokenizer to `model_dir`.

    Parameters:
    prediction_model - model to save
    model_dir - target directory (with or without a trailing slash)
    checkpoint_dir - not used by this function

    Returns:
    None
    """
    # save model
    prediction_model.save(model_dir)

    # get tokenizer
    prediction_tokenizer = create_character_tokenizer()

    # save tokenizer next to the model; os.path.join places the file inside
    # model_dir whether or not the path ends with a separator
    tokenizer_path = os.path.join(model_dir, 'tokenizer.pickle')
    with open(tokenizer_path, 'wb') as file:
        pickle.dump(prediction_tokenizer, file, pickle.HIGHEST_PROTOCOL)

    return None

Define text generation function

In [None]:
# Function: Outputs weighted predictions for next token 
# (as logits / log-odds ratio)
def get_logits(model, token_sequence, initial_state=None):
    """
    Parameters:
    model - our prediction model (batch size = 1)
    token_sequence - nested list of token ids fed to the model
    initial_state - value handed to `reset_states` of the 'GRU_OUTPUT' layer

    Returns:
    numpy array holding the model output at the last time step
    """
    # set the state of the top GRU layer before running the model
    model.get_layer('GRU_OUTPUT').reset_states(initial_state)

    # run the model on the tokens
    model_input = tf.constant(token_sequence)
    all_step_logits = model(model_input)

    # only the last time step predicts the next token
    return all_step_logits[:, -1, :].numpy()


# Function: selects a value from logits prediction
def sample_token(logits, precision_reduction=0):
    """Draw one token id at random from a logits distribution.

    Parameters:
    logits - array of logits with one row
    precision_reduction - strength of the random perturbation applied to the
                          logits before sampling (0 leaves them unchanged)

    Returns:
    the sampled token id
    """
    # random factors around 1, one per logit, for extra variation
    noise = tf.random.normal(shape=logits.shape, mean=1, stddev=.2)
    perturbed_logits = logits * (1 + precision_reduction * noise)

    # draw a single sample from the resulting distribution
    drawn = tf.random.categorical(logits=perturbed_logits, num_samples=1)

    # convert to integer
    return drawn[0, 0].numpy()

Text Generator Function

In [None]:
# test model
@anvil.server.callable
def generate_text(starting_text, precision_reduction,
                  prediction_tokenizer, prediction_model,
                  author='assorted'):
    """Generate text from a prompt and format it into lines.

    Parameters:
    starting_text - prompt; upper-cased and followed by ': ' before use
    precision_reduction - passed on to `make_prediction`
    prediction_tokenizer - tokenizer matching `prediction_model`
    prediction_model - trained prediction model
    author - passed to `make_prediction` as `model_name`

    Returns:
    the formatted prompt followed by the generated text, with a line break
    in front of every text segment, punctuation attached to the segment
    before it, and a closing '... '
    """
    # set length of generated text
    num_generation_steps = 350

    # format user input
    starting_text = starting_text.upper() + ': '

    prediction = make_prediction(init_string=starting_text, 
                               num_generation_steps=num_generation_steps, 
                               prediction_tokenizer=prediction_tokenizer, 
                               prediction_model=prediction_model, 
                               precision_reduction=precision_reduction, 
                               model_name=author, 
                               print_result=True)

    output = starting_text + '\n'
    split_on = ['?', '.', ',', ';', '!', ':']

    # the capture group keeps the punctuation marks as separate pieces
    splits = '([' + ''.join(split_on) + '])'
    split_lines_prediction = re.split(splits, prediction)

    for line in split_lines_prediction:
        # re.split returns empty pieces between adjacent punctuation marks
        # and at the ends of the text; they have no first character
        if not line:
            continue

        # capitalize the first character of the piece
        line_update = line[0].upper() + line[1:]

        if line_update[-1] in split_on or line_update.endswith('\n'):
            # punctuation stays on the current line
            output= ''.join([output, line_update])
        else:
            # text segments start on a new line
            output= '\n'.join([output, line_update])

    return output + '... '

# When the Anvil integration is enabled, hand control to the Anvil server loop.
# NOTE(review): presumably this call does not return, which would stop the
# cells below (dataset preparation, model loading) from running -- confirm
# the intended cell order before setting USE_ANVIL = True.
if USE_ANVIL:
    anvil.server.wait_forever()

# Prepare Datasets

In [None]:
# one training dataset per text, plus the tokenizer used to encode them
(all_datasets, tokenizer) = preprocess(
    filename=DATASET_PRIMARY,
    batch_size=BATCH_SIZE,
)

# Load Model

In [None]:
# build the training model and restore the latest checkpoint, if any
(init_model,
 tokenizer,
 checkpoint,
 checkpoint_manager) = build_load_model(batch_size=BATCH_SIZE)

# Train Model

Suggested Techniques:
 - Reduce learning rate after initial training runs, as model can easily overfit to most recent data example
 - Allow some overfitting immediately before creating final text generation model. 

In [None]:
# set to False to skip training and use the restored weights as they are
train_more = True

if train_more:
    num_epochs = 1
    train_model(
        model=init_model,
        train_datasets_list=all_datasets,
        checkpoint=checkpoint,
        checkpoint_manager=checkpoint_manager,
        num_epochs=num_epochs,
        num_datasets_to_use=10,
        learning_rate=0.001,
    )

# Create Prediction Model

In [None]:
# Load Prediction Model
(prediction_model,
 prediction_tokenizer) = get_prediction_model(pretrained_model=init_model)
prediction_model.summary()

# Save the prediction model
save_prediction_mod = True
if save_prediction_mod:
    save_prediction_model(
        prediction_model,
        model_dir=MODEL_DIR,
    )

# Generate text from the model

In [None]:
# generate a sample from the trained model and show it
text_generated = generate_text(
    starting_text='She ',
    precision_reduction=0,
    prediction_tokenizer=prediction_tokenizer,
    prediction_model=prediction_model,
    author='Robert_Frost',
)
print(text_generated)

NameError: ignored