In [None]:
import itertools

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import nltk
from tqdm import tqdm
from keras.utils.vis_utils import plot_model

import tweeterator_pos
from loader import Loader
from data_generator_pos import DataGenerator

# Setup

Set up the training parameters

In [None]:
# Data source, handed to loader.load(...) in the loader cell.
# NOTE(review): `input` shadows Python's built-in input() for the rest of the notebook.
input = 'data/trump.csv'
text_column = 'text'
file_type = 'csv'

# w_* settings are forwarded to tweeterator_pos.train(...)
# (presumably the word-level network - confirm against tweeterator_pos)
w_net_type = 'LSTM'
w_latent_dim = 60
w_n_units = 256
w_dropout = 0.2
w_n_hidden_layers = 1

# pos_* settings are forwarded to tweeterator_pos.train(...)
# (presumably the part-of-speech network - confirm against tweeterator_pos)
pos_net_type = 'LSTM'
pos_latent_dim = 8
pos_n_units = 64
pos_dropout = 0
pos_n_hidden_layers = 1

# `window` is the number of preceding words fed to the model when predicting the next one
# (see the generation cell); it is also passed to the loader and the data generator.
window = 5
batch_size = 64
epochs = 30
learning_rate = 0.001
# presumably the fraction of the data held out for validation - TODO confirm in tweeterator_pos.train
perc_val = 0.2
# Regular expressions passed to loader.load(...) as `regex_to_remove`
regex_to_remove = ['^rt ']
shuffle = True
# When True, the generation cell treats `model` as a pair (model[0], model[1]) and combines
# their predictions; when False, `model` is called directly as a single network.
train_two_nets = False

Initialise loader

In [None]:
# Load the corpus: the cells below treat each element of `data` as one sentence,
# i.e. a mutable list of words.
loader = Loader(flatten_hashtags=False, flatten_mentions=False)
sentences = list(loader.load(input, file_type=file_type, text_column=text_column,
                             window=window, regex_to_remove=regex_to_remove))

# Build a 1-D object array holding one sentence per element.
# np.array(sentences, dtype=object) would instead return a multi-dimensional array whenever
# every sentence happens to have the same length (always the case for a single sentence),
# turning each sentence into an ndarray row without list methods such as .remove().
data = np.empty(len(sentences), dtype=object)
for idx, sentence in enumerate(sentences):
    data[idx] = sentence

# Inspect

Look at loaded sentences

In [None]:
# Corpus size: total number of words and number of distinct words over all sentences
all_words = list(itertools.chain.from_iterable(data))
print(f"Number of words: {len(all_words)}")
print(f"Number of unique words: {len(set(all_words))}")

Remove words that appear only once (probably typos, errors, etc.)

In [None]:
# Count how often every word occurs across the whole corpus.
# The top-level pd.value_counts() function is deprecated since pandas 2.1;
# Series.value_counts() is the supported equivalent.
flattened_text = list(itertools.chain.from_iterable(data))
vc = pd.Series(flattened_text).value_counts()

# Words that occur exactly once in the corpus
words_to_remove = vc[vc == 1].index

In [None]:
# Show the words flagged for removal (those that occur exactly once in the corpus)
words_to_remove

In [None]:
# Strip the single-occurrence words from every sentence (in place) and record,
# per sentence, how many words were dropped.
counts = []
for sentence in data:
    # Decide what to drop before mutating, so the list is never modified while being scanned
    singletons = [word for word in sentence if word in words_to_remove]
    for word in singletons:
        sentence.remove(word)
    counts.append(len(singletons))

In [None]:
# A sentence is "affected" when at least one of its words was removed
n_affected = np.count_nonzero(np.array(counts) > 0)
print(f'Number of affected sentences: {n_affected}')

In [None]:
# `counts` holds one entry per sentence, so its length is the sentence total
n_sentences = len(counts)
print(f"Total number of sentences: {n_sentences}")

In [None]:
# Positions of the sentences that have no words left after the filtering step
empty_sentences = [position for position, words in enumerate(data) if len(words) == 0]

# Drop those positions from the corpus
data = np.delete(data, empty_sentences)

In [None]:
# Preview the first five cleaned sentences, one per line, each prefixed with a dash
preview = data[:5]
for sentence in preview:
    line = '-' + ' '.join(sentence)
    print(line)

POS tagging

In [None]:
# Fetch the perceptron model used by nltk.pos_tag().
# NLTK >= 3.9 loads it from the 'averaged_perceptron_tagger_eng' package, while older
# releases use 'averaged_perceptron_tagger'; request both so the tagging cells below
# work with either NLTK version.
for tagger_resource in ('averaged_perceptron_tagger', 'averaged_perceptron_tagger_eng'):
    nltk.download(tagger_resource)

In [None]:
# Tag every sentence: each word becomes a [word, pos_tag] list
tagged_sentences = [
    [list(word_and_tag) for word_and_tag in nltk.pos_tag(sentence)]
    for sentence in tqdm(data)
]

# Store one tagged sentence per element of a 1-D object array.
# np.array(tagged_sentences, dtype=object) would produce a 3-D array whenever all sentences
# share the same length (always the case for a single sentence), which breaks the later
# `data_plus_pos.size`-based sentence sampling.
data_plus_pos = np.empty(len(tagged_sentences), dtype=object)
for idx, tagged_sentence in enumerate(tagged_sentences):
    data_plus_pos[idx] = tagged_sentence

# Train

Train the networks and keep the trained model, the training history and the word/POS lookup dictionaries

In [None]:
# Positional arguments for tweeterator_pos.train, grouped by purpose.
# The order inside and across the groups must be kept: the call is purely positional.
general_args = (data_plus_pos, window, batch_size, epochs, perc_val, shuffle, learning_rate, train_two_nets)
word_net_args = (w_net_type, w_latent_dim, w_n_units, w_dropout, w_n_hidden_layers)
pos_net_args = (pos_net_type, pos_latent_dim, pos_n_units, pos_dropout, pos_n_hidden_layers)

# The fourth returned value is not used in this notebook
model, history, dicts, _ = tweeterator_pos.train(*general_args, *word_net_args, *pos_net_args)

Visualise training results

In [None]:
# One figure per tracked metric, each overlaying the training curve and its
# validation counterpart (stored under the same key prefixed with 'val_').
for metric in ('loss', 'categorical_accuracy'):
    fig, ax = plt.subplots(dpi=150)
    for key in (metric, f'val_{metric}'):
        ax.plot(history.history[key], label=key)
    ax.set_xlabel('Epoch')
    # Label the y axis so each figure can be read on its own
    ax.set_ylabel(metric)
    ax.legend()

# Test

Take a random sentence and generate text starting from its first `window` words

In [None]:
# Lookup tables returned by training, unpacked under short aliases
# (word -> int, int -> word, POS -> int, int -> POS)
w2i, i2w, pos2i, i2pos = (
    dicts[key] for key in ('word2int', 'int2word', 'pos2int', 'int2pos')
)

In [None]:
# Draw one tagged sentence at random; indexing with a length-1 index array keeps the
# result a length-1 array of sentences
random_index = np.random.choice(data_plus_pos.size, 1)
sentence = data_plus_pos[random_index]

# Wrap it in a generator and take its first item as the test example
gen = DataGenerator(sentence, w2i, pos2i, window, 1, shuffle=False)
test_example = next(gen)

In [None]:
# Decoding mode for the generation cell below: True always picks the arg-max candidate,
# False samples the next word with np.random.choice using the predicted probabilities.
deterministic = False

In [None]:
output_size = 40
output_int_word = np.empty(output_size, dtype=int)

# Seed the output with the first `window` word ids of the test example
output_int_word[:window] = list(test_example[0][0][0])

# Generate the remaining words one at a time; each step is conditioned on the `window`
# ids that precede it, including ids produced by earlier steps.
n_to_generate = output_int_word.size - window
for step in range(n_to_generate):
    context_ids = output_int_word[step:step + window]
    word_input = context_ids[np.newaxis, :, np.newaxis]

    # Even if the whole sentence is known in this test, run POS tagging only on the part
    # of sentence preceding the word to generate (real-like scenario)
    context_words = [i2w[word_id] for word_id in context_ids]
    context_tags = [tag for _, tag in nltk.pos_tag(context_words)]
    pos_input = np.array([pos2i[tag] for tag in context_tags])[np.newaxis, :, np.newaxis]

    net_inputs = [word_input, pos_input]

    if train_two_nets:
        # model[0] scores the next word, model[1] scores the next POS tag
        word_probs = model[0](net_inputs).numpy()[0]
        pos_probs = model[1](net_inputs).numpy()[0]

        # Re-rank the ten most likely words by the probability of the POS tag each
        # of them would receive when appended to the context
        candidates = np.argsort(word_probs)[::-1][:10]
        scores = []
        for candidate in candidates:
            candidate_tag = nltk.pos_tag(context_words + [i2w[candidate]])[-1][1]
            scores.append(word_probs[candidate] * pos_probs[pos2i[candidate_tag]])

        if deterministic:
            pick = np.argmax(scores)
        else:
            # Normalise the combined scores so they can be used as sampling probabilities
            scores = np.array(scores) / np.sum(scores)
            pick = np.random.choice(range(len(scores)), 1, p=scores)[0]

        word_int = candidates[pick]
    else:
        word_probs = model(net_inputs).numpy()[0]
        if deterministic:
            word_int = np.argmax(word_probs)
        else:
            word_int = np.random.choice(range(len(word_probs)), 1, p=word_probs)[0]

    output_int_word[window + step] = word_int

# Convert integers to words
output = [i2w[word_id] for word_id in output_int_word]

Visualise the produced output

In [None]:
# Join the generated words (seed words included) into a single space-separated string
' '.join(output)