In [None]:
# inspired by https://github.com/lukas/ml-class/blob/master/videos/text-gen/char-gen.py
# but with binary encoding of characters
import keras
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout
from keras.layers import LSTM, SimpleRNN, GRU
from keras.optimizers import RMSprop
from keras.utils.data_utils import get_file
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
import numpy as np
import pandas as pd
import random
import sys
import io
import math
import matplotlib.pyplot as plt

In [None]:
# Settings shared by the cells below, gathered in one place so they are easy to tune.
config = {
    # number of samples per gradient update passed to model.fit
    'batch_size': 256,
    # CSV file the training words are read from
    'file': 'english_names.csv',
    # length of window of tokens preceding predicted token
    'maxlen': 3,
    # number of training epochs passed to model.fit
    'epochs': 250,
}

In [None]:
def sequences_prepared_data(sequence_list):
    """Pad token sequences to a common length and build the vocabulary lookups.

    Every sequence is post-padded with the empty string up to the length of
    the longest sequence. The vocabulary is taken from the padded array, so
    the padding token becomes part of the vocabulary whenever padding was
    actually added.

    Args:
        sequence_list: list of sequences, each a list of tokens.

    Returns:
        dict with the keys
        'vocab_size' (number of unique tokens),
        'max_tokens' (length of the longest sequence),
        'sequences_tokens_np' (padded sequences as a numpy object array),
        'tokens_vocab' (list of unique tokens as returned by np.unique),
        'tokens_to_idx' (token -> integer identifier) and
        'idx_to_tokens' (integer identifier -> token).
    """
    longest = max(len(sequence) for sequence in sequence_list)
    padded = pad_sequences(
        sequence_list,
        maxlen=longest,
        dtype=object,
        padding='post',
        value='',
    )
    vocabulary = list(np.unique(padded))

    # build both lookup directions in a single pass over the vocabulary
    index_by_token = {}
    token_by_index = {}
    for position, token in enumerate(vocabulary):
        index_by_token[token] = position
        token_by_index[position] = token

    prepared = {}
    # number of unique tokens
    prepared['vocab_size'] = len(vocabulary)
    # maximum length of sequence
    prepared['max_tokens'] = longest
    # sequences split into separate tokens as numpy array
    prepared['sequences_tokens_np'] = padded
    # list of tokens present in corpus
    prepared['tokens_vocab'] = vocabulary
    # mapping of tokens to integer identifiers
    prepared['tokens_to_idx'] = index_by_token
    # mapping of integer identifiers to tokens
    prepared['idx_to_tokens'] = token_by_index
    return prepared

def encode_tokens_to_idx(sequences_tokens_np, tokens_to_idx):
    """Encode an array of tokens to their integer identifiers.

    Args:
        sequences_tokens_np: numpy array of tokens (any shape).
        tokens_to_idx: mapping of token -> integer identifier.

    Returns:
        Integer numpy array of the same shape as ``sequences_tokens_np``.

    Raises:
        KeyError: if a token is missing from ``tokens_to_idx``. A plain
            ``dict.get`` would silently yield ``None`` here and fail later
            with an unrelated error.
    """
    def lookup(token):
        try:
            return tokens_to_idx[token]
        except KeyError:
            raise KeyError(f'token {token!r} is not in the vocabulary') from None

    # otypes pins the output dtype, so the result does not depend on the first
    # element and empty arrays can be encoded as well
    return np.vectorize(lookup, otypes=[int])(sequences_tokens_np)

def binary_system_encode(idx_np_array, num_bits):
    """Encode integer category ids as fixed-width binary vectors (5 -> 1, 0, 1).

    This is an alternative to one-hot encoding.

    Args:
        idx_np_array: 2-D integer array of category ids.
        num_bits: width of the binary code.

    Returns:
        Integer array with one extra trailing axis of length ``num_bits``;
        the least significant bit comes first.
    """
    # https://stackoverflow.com/a/22227898/9123190
    bit_masks = np.left_shift(1, np.arange(num_bits))
    selected_bits = np.bitwise_and(idx_np_array[:, :, np.newaxis], bit_masks)
    return np.greater(selected_bits, 0).astype(int)

def binary_system_decode(binary_encoded_array):
    """Turn binary-encoded vectors back into category identifiers.

    Reverse operation to the binary encoding: the last axis holds the bits,
    least significant bit first, and is collapsed into one integer per vector.
    """
    # https://stackoverflow.com/a/15506055/9123190
    bit_count = binary_encoded_array.shape[-1]
    place_values = np.left_shift(1, np.arange(bit_count))
    return binary_encoded_array.dot(place_values)

def predictions_to_binary(preds, threshold=0.5):
    """Binarize predicted bit values using a cutoff threshold.

    Example: [0.43 0.93 0.07 0.66 0.42 0.89 0.00] -> [0 1 0 1 0 1 0].
    Values strictly greater than ``threshold`` become 1, all others 0.
    """
    above_cutoff = preds > threshold
    return np.where(above_cutoff, 1, 0)

In [None]:
# read the training words and drop repeated rows
names = pd.read_csv(config['file']).drop_duplicates()

# flatten the frame to a flat sequence of words, split every word into its
# letters and append the termination character '|'
names_list = [list(word + '|') for word in names.values.reshape(-1)]

# basic preprocessing: padding, vocabulary and token <-> identifier mappings
prepared_data = sequences_prepared_data(names_list)

# encode the padded token sequences to sequences of integer identifiers
prepared_data['encoded_tokens'] = encode_tokens_to_idx(
    sequences_tokens_np=prepared_data['sequences_tokens_np'],
    tokens_to_idx=prepared_data['tokens_to_idx'],
)

In [None]:
def get_chunk_and_next(token, token_size, chunk_size):
    """Sample a random window from a sequence together with the token after it.

    Used for training dataset preparation: the window is the model input (x)
    and the token that follows it is the expected output (y).

    Args:
        token: indexable, sliceable sequence of at least ``token_size`` items.
        token_size: number of leading items of ``token`` to sample from.
        chunk_size: length of the sampled window.

    Returns:
        Tuple ``(chunk, next_char)`` where ``chunk`` is
        ``token[start:start + chunk_size]`` and ``next_char`` is
        ``token[start + chunk_size]``.

    Raises:
        ValueError: if ``token_size`` is too small to hold a window plus the
            token that follows it.
    """
    # random.randint includes both endpoints, so the largest valid start is the
    # one whose following token is the last item (index token_size - 1)
    last_start_index = token_size - chunk_size - 1
    if last_start_index < 0:
        raise ValueError(
            'token_size must be at least chunk_size + 1 '
            f'(got token_size={token_size}, chunk_size={chunk_size})'
        )
    chunk_start_index = random.randint(0, last_start_index)
    chunk_end_index = chunk_start_index + chunk_size
    return token[chunk_start_index:chunk_end_index], token[chunk_end_index]

In [None]:
# sampling of (chunk, next token) training pairs from the encoded sequences

chunks = []
next_chars = []

# Each pass is (chunks_per_word, chop_chars):
#  - first pass draws 2 chunks per sequence from the full padded sequence
#  - second pass draws 3 chunks per sequence after chopping 4 tokens off the end;
#    the end is padded with empty strings, which leads to a significant amount
#    of empty chunks (x) and even more empty following chars (y) - chopping
#    helps to reduce it
sampling_passes = (
    (2, 0),
    (3, 4),
)

for chunks_per_word, chop_chars in sampling_passes:
    token_size = prepared_data['max_tokens'] - chop_chars
    for token in prepared_data['encoded_tokens'][:, 0:token_size]:
        for _ in range(chunks_per_word):
            chunk, next_char = get_chunk_and_next(token, token_size=token_size, chunk_size=config['maxlen'])
            chunks.append(chunk)
            next_chars.append(next_char)


In [None]:
# width of the binary code used for every token
# NOTE(review): ceil(sqrt(vocab_size)) is a heuristic, not the minimal bit count
# (that would be ceil(log2(vocab_size))); predict_word recomputes the width with
# the same formula, so the two places have to be changed together
vocab_size_sqrt = math.sqrt(prepared_data['vocab_size'])
num_bits = math.ceil(vocab_size_sqrt)

# model inputs: binary encoding of the sampled chunks
x = binary_system_encode(np.array(chunks), num_bits=num_bits)

# model outputs: binary encoding of the token following each chunk;
# the ids are shaped as a single column for encoding and the result is
# flattened back to one bit vector per sample
next_chars_np = np.array(next_chars).reshape(-1, 1)
y_encoded = binary_system_encode(next_chars_np, num_bits=num_bits)
y = y_encoded.reshape(next_chars_np.shape[0], -1)

In [None]:
def custom_binary_error(y_true, y_pred):
    """Fraction of bits predicted wrongly, for binary-encoded target vectors.

    Args:
        y_true: tensor of target bits (0 or 1).
        y_pred: tensor of predicted bit values of the same shape.

    Returns:
        Scalar float32 tensor: number of mismatching bits divided by the total
        number of bits.
    """
    true_bits = tf.cast(y_true, tf.bool)
    # Threshold at 0.5 instead of casting to bool: a float -> bool cast turns
    # every non-zero prediction (e.g. 0.07) into True, so nearly all predicted
    # bits would count as 1 regardless of their value.
    pred_bits = tf.math.greater(tf.cast(y_pred, tf.float32), 0.5)
    mismatched = tf.math.logical_xor(true_bits, pred_bits)
    # mean of the 0/1 mismatch indicator == mismatches / all bits
    return tf.reduce_mean(tf.cast(mismatched, tf.float32))

In [None]:
# model definition: a recurrent layer reading the window of binary-encoded
# tokens, followed by a dense layer emitting one value per bit of the next token
model = Sequential([
    # RNN/GRU/LSTM layer for sequence characteristic processing
    GRU(32, input_shape=(config['maxlen'], num_bits)),
    # dense layer of binary encoded vector using hard_sigmoid activation as we
    # want highly polarized output, i.e. as close as possible to either 0 or 1
    # (a plain 'sigmoid' activation is the alternative)
    Dense(num_bits, activation='hard_sigmoid'),
])

# loss function is binary_crossentropy, because in this simplified scenario each
# correctly/falsely predicted bit is judged separately; a custom loss function
# addressing this could significantly improve the performance
model.compile(
    loss='binary_crossentropy',
    optimizer='Adam',
    metrics=[custom_binary_error],
)

model.summary()

hist = model.fit(
    x,
    y,
    batch_size=config['batch_size'],
    epochs=config['epochs'],
    callbacks=[],
)

In [None]:
# training curves: loss first, then the custom bit-error metric
for curve_name in ('loss', 'custom_binary_error'):
    plt.plot(hist.history[curve_name])
plt.show()

In [None]:
# draw a diagram of the model; show_shapes=True asks for layer shapes to be included
keras.utils.plot_model(model, show_shapes=True)

In [None]:
def predict_word(seed, threshold, model, input_size, tokens_to_idx, idx_to_tokens, vocab_size):
    """Generate a word token by token until the terminating character is output.

    Args:
        seed: starting string; its last ``input_size`` characters form the
            first model input.
        threshold: cutoff used to turn predicted bit values into 0/1.
        model: trained model exposing ``predict``.
        input_size: number of trailing characters fed to the model.
        tokens_to_idx: mapping of token -> integer identifier.
        idx_to_tokens: mapping of integer identifier -> token.
        vocab_size: number of unique tokens; determines the bit width.

    Returns:
        The seed followed by all generated characters, without the terminating
        '|'. Generation stops at the terminator, at a predicted empty (padding)
        token, or after 30 predictions.

    Raises:
        KeyError: if the thresholded prediction decodes to an identifier that
            is not in ``idx_to_tokens``. Thresholding does not guarantee an
            identifier from the vocabulary.
    """
    predicted_word = seed

    # same formula as the one used when encoding the training data; the model
    # input width depends on it
    num_bits = math.ceil(math.sqrt(vocab_size))

    # hard cap on the number of predictions
    for _ in range(30):
        # as a side effect of this slicing, a seed longer than the input size
        # of the model does not raise an error
        window = list(predicted_word[-input_size:])
        encoded_word = binary_system_encode(
            encode_tokens_to_idx(np.array([window]), tokens_to_idx),
            num_bits=num_bits
        )
        # predicted values for the bits of the next character
        preds = model.predict(encoded_word, verbose=0)[0]
        # debug showing predicted values for the bits
        print('preds: ', preds)
        # plot to see the polarization and differences between values for specific bits
        plt.bar(list(range(len(preds))), preds)
        plt.show()

        # convert the values to a "valid" binary vector and decode the identifier
        pred_idx = int(
            binary_system_decode(
                predictions_to_binary(preds, threshold=threshold)
            )
        )
        if pred_idx not in idx_to_tokens:
            raise KeyError(
                f'predicted bits decode to identifier {pred_idx}, '
                'which is not in the vocabulary'
            )
        pred_char = idx_to_tokens[pred_idx]

        print('pred_char: ', pred_char)

        # '|' terminates the word and is not appended, so the result never
        # needs trimming (trimming unconditionally would drop a real character
        # whenever the cap is reached without a terminator).
        # An empty (padding) token adds nothing to the word, so the next
        # iteration would feed the model the very same window again.
        if pred_char == '|' or pred_char == '':
            break

        predicted_word += pred_char

    return predicted_word

In [None]:
# generate a word from the seed 'Jam' using the trained model
predict_word(
    seed='Jam',
    threshold=0.5,
    model=model,
    input_size=config['maxlen'],
    tokens_to_idx=prepared_data['tokens_to_idx'],
    idx_to_tokens=prepared_data['idx_to_tokens'],
    vocab_size=prepared_data['vocab_size'],
)
