In [1]:
import os
import io
import math
import random
from tqdm import tqdm
from collections import defaultdict

# When the notebook is started from inside the `ngram` folder, step up one
# level so the working directory is the one printed below as project root.
# A dedicated name is used instead of `dir`, which would shadow the builtin
# for the rest of the kernel session.
current_dir_name = os.path.basename(os.getcwd())
if current_dir_name == "ngram":
    os.chdir(os.pardir)
print(f'Project root dir: {os.getcwd()}')

Project root dir: /home/jovyan/word-prediction


In [2]:
from config import MIN_N, MAX_N, START_SYMBOL, NUM_PREDICTIONS, TRAINED_PATH, EXTRINSIC_EVAL_SIZE
from prepare import tokenize_sentences, train_val_test_split, prepend_start_symbol
from evaluate import evaluate_extrinsic
from gui import get_gui

In [3]:
# Paths of every trained model file. os.path.join works whether or not
# TRAINED_PATH ends with a separator, and sorting makes later "first match"
# selections independent of the arbitrary order returned by os.listdir.
trained_grams = sorted(
    os.path.join(TRAINED_PATH, fn)
    for fn in os.listdir(TRAINED_PATH)
    if "model" in fn
)
sentences = tokenize_sentences("data/HP_all.txt")
# Only the train and test partitions are used in this notebook.
train_sentences, _, test_sentences = train_val_test_split(sentences)

Tokenizing input corpus...
Tokenization ready.
Train dataset size: 51035
Validation dataset size: 17012
Test dataset size: 17012


In [12]:
class NGramTester(object):
    """
    Evaluates a trained n-gram language model.

    The tester reads a language model file (`read_model`), accumulates an
    entropy figure over test sentences using linear interpolation of all
    k-gram orders k = 1, ..., n (`process_sentences`), and suggests next
    words given the previous words and a partially typed word (`predict`).
    """

    def __init__(self, n):
        """
        :param n: Order of the model, i.e. the size of the largest grams.
        """

        # Size of grams
        self.n = n

        # Sentences from the test corpus.
        self.sentences = None

        # Sentence starter that adds 'padding' to each sentence
        self.sentence_starter = [START_SYMBOL] * (n - 1)

        # Collection of n-gram counts (pos i corresponds to (i+1)-grams)
        self.ngram_counts = [defaultdict(int) for _ in range(n)]

        # Collection of n-gram log-probabilities (pos i -> (i+1)-grams)
        # For k = 1, ..., n:
        # ngram_probs[k - 1][(w_1, ..., w_k)] = log P(w_k | w_{k-1}, ..., w_1)
        self.ngram_probs = [defaultdict(int) for _ in range(n)]

        # For each word, its ID.
        self.word2id = {}

        # For each ID, its word
        self.id2word = {}

        # Vocabulary size: number of unique words.
        self.num_unique_words = 0

        # Total number of words from corpus
        self.num_total_words = 0

        # The entropy of the test corpus.
        self.entropy = 0

        # Linear interpolation weights (n + 1 values, see _get_lambdas)
        self.lambdas = self._get_lambdas()

    def read_model(self, file):
        """
        Reads the contents of the language model file into the appropriate data structures.

        Layout as consumed by this parser:
          * first line: "<num_unique_words> <num_total_words>"
          * then, for k = 1, 2, ...: one line holding the number of k-gram
            lines, followed by that many lines
              - unigrams:     "<id> <token> <count> <log_prob>"
              - higher order: "<id_1> ... <id_k> <count> <log_prob>"
          * a count line of -1 ends the file.

        An I/O failure is reported on stdout and the method returns normally.

        :param file: The name of the language model file.
        """

        try:
            with io.open(file, mode='r', encoding='utf-8-sig') as f:
                self.num_unique_words, self.num_total_words = map(int, f.readline().strip().split(' '))

                self.word2id[START_SYMBOL] = 1
                self.id2word[1] = START_SYMBOL

                # Start with 1-grams.
                k = 0

                num_kgram_lines = int(f.readline().strip())
                while num_kgram_lines != -1:
                    for _ in range(num_kgram_lines):
                        fields = f.readline().strip().split(' ')
                        if k == 0:
                            word_id, token, unigram_count, log_prob = fields
                            self.word2id[token] = int(word_id)
                            self.id2word[int(word_id)] = token
                            self.ngram_counts[0][(token,)] = int(unigram_count)
                            self.ngram_probs[0][(token,)] = float(log_prob)
                        else:
                            # Last two fields are count and log-prob; the
                            # leading fields are the word IDs of the k-gram.
                            log_prob = float(fields[-1])
                            kgram_count = int(fields[-2])
                            words = tuple(self.id2word[int(word_id)] for word_id in fields[:-2])
                            self.ngram_counts[k][words] = kgram_count
                            self.ngram_probs[k][words] = log_prob

                    # Move on to the next k-grams
                    k += 1
                    num_kgram_lines = int(f.readline().strip())

        except IOError:
            # Report the requested path. The file handle `f` is not bound
            # when opening fails, so it must not be referenced here.
            print("Couldn't find the model file {}".format(file))

    def process_sentences(self, sentences):
        """
        Pads the test sentences with the sentence starter and accumulates
        their entropy contribution one sentence at a time.

        :param sentences: Test sentences, passed to `prepend_start_symbol`.
        :return: The accumulated entropy (also stored in `self.entropy`).
        """
        try:
            self.sentences = prepend_start_symbol(sentences, self.sentence_starter)
            self.entropy = 0
            for sentence in tqdm(self.sentences, desc="Computing entropy", colour='green'):
                self._accumulate_entropy(tuple(sentence))
            return self.entropy

        except IOError:
            print('Error reading testfile')

    def read_extra_unigram(self, word):
        """
        Adds `word` to the vocabulary as a unigram with probability zero
        (log-probability -inf) if it is not known yet.

        :param word: The word to register.
        """
        if word not in self.word2id:
            # Use one fresh ID for both mappings. The two dictionaries can
            # differ in size after read_model, so deriving the ID from each
            # of them separately could make them disagree.
            new_id = len(self.id2word)
            while new_id in self.id2word:
                new_id += 1
            self.word2id[word] = new_id
            self.id2word[new_id] = word
            self.ngram_counts[0][(word,)] += 1
            self.ngram_probs[0][(word,)] = float('-inf')

    # ==================== INTRINSIC EVALUATION ====================

    def _interpolated_prob(self, words, i):
        """
        Linearly interpolated probability of `words[i]` given the words
        before it. Has no side effects.

        The result starts at `lambdas[n]` and adds, for r = 0, ..., n-1,
        `lambdas[r] * P((n-r)-gram ending at position i)` for every gram
        found in the model. Orders that would need more context than is
        available before position `i` are skipped.

        :param words: Tuple of tokens.
        :param i: Index in `words` of the word being scored.
        :return: The interpolated probability (always >= lambdas[n]).
        """
        n = self.n
        prob = self.lambdas[n]
        for r in range(n):  # r = 0, ..., n-1
            start = i - n + r + 1
            if start < 0:
                # Not enough preceding words for an (n-r)-gram.
                continue
            # .get() does not insert missing keys into the defaultdict.
            log_prob = self.ngram_probs[n - r - 1].get(words[start:i + 1])
            if log_prob is not None:
                prob += self.lambdas[r] * math.exp(log_prob)
        return prob

    def _accumulate_entropy(self, words):
        """
        Adds `-p * log2(p)` to `self.entropy` for every position of `words`
        that has a full n-gram ending at it, where `p` is the linearly
        interpolated probability of that position.

        :param words: Tuple of tokens (already padded by the caller).
        :return: The interpolated probability of the last position, or 0.0
                 when `words` holds fewer than n tokens.
        """

        assert isinstance(words, tuple)

        prob = 0.0
        for i in range(self.n - 1, len(words)):
            prob = self._interpolated_prob(words, i)
            self.entropy += (-1) * prob * math.log2(prob)
        return prob

    def _get_lambdas(self, first_lambda=0.9):
        """
        Computes decreasing lambdas for linear interpolation.

        :param first_lambda: Initial weight of the first component.
        :return: List of n + 1 weights.
        """

        lambdas = [first_lambda]
        remaining_sum = 1 - first_lambda
        factor = 5  # Factor to decrease first lambda

        for _ in range(self.n):
            next_lambda = remaining_sum / factor
            lambdas.append(next_lambda)
            remaining_sum -= next_lambda
            factor *= 20  # Increase the factor to make next lambda smaller

        # Divide up the remainder between all components except for the last.
        share = remaining_sum / (len(lambdas) - 1)
        for i in range(len(lambdas) - 1):
            lambdas[i] += share
        return lambdas

    # ==================== EXTRINSIC EVALUATION ====================

    def _select_candidates(self, candidates, weights, k):
        """
        Picks up to `k` candidates, highest weight first.

        Candidates with equal weight form a class; classes are consumed in
        decreasing weight order and ties inside a class are broken randomly.
        Selection stops when the best remaining weight is 0.

        :param candidates: Candidate words.
        :param weights: Weight of each candidate, same order as `candidates`.
        :param k: Maximum number of candidates to return.
        :return: List of chosen candidates.
        """

        # Create classes
        classes = defaultdict(list)
        for candidate, weight in zip(candidates, weights):
            classes[weight].append(candidate)

        chosen = []
        while k > 0 and len(classes) > 0:

            # Find class with highest weight
            max_weight = max(classes.keys())
            if max_weight == 0:
                break

            # Choose from this class and remove it for the next iteration
            weight_class = classes.pop(max_weight)
            num_chosen = min(k, len(weight_class))
            chosen += random.sample(weight_class, num_chosen)
            k -= num_chosen

        return chosen

    def predict(self, prev_words, partial_word, k, use_interpolation=True):
        """
        Returns `k` predicted words based on the previous words and the current
        typed partial word, which could be empty

        :param prev_words: Up to n - 1 preceding words.
        :param partial_word: Prefix typed so far for the word to predict.
        :param k: Maximum number of suggestions.
        :param use_interpolation: Score candidates with linear interpolation
            when True; otherwise use only the gram of order
            len(prev_words) + 1.
        :return: Tuple (chosen words, number of candidates considered).
        """

        # Make sure the prediction fits the model
        assert len(prev_words) < self.n

        prev_words = [w.lower() for w in prev_words]
        partial_word = partial_word.lower()
        kgram_bucket = len(prev_words)

        # Obtain candidates to follow the prev_words.
        candidates = [word for (word,) in self.ngram_counts[0]
                      if word.startswith(partial_word)]

        # Obtain probabilities P(candidate_i | prev_words)
        context = tuple(prev_words)
        weights = []
        for candidate in candidates:
            sequence = context + (candidate,)
            if use_interpolation:
                # Score the candidate without touching self.entropy.
                weights.append(self._interpolated_prob(sequence, len(sequence) - 1))
            else:
                log_prob = self.ngram_probs[kgram_bucket].get(sequence)
                weights.append(0 if log_prob is None else math.exp(log_prob))

        chosen = self._select_candidates(candidates, weights, k)
        return chosen, len(candidates)

In [9]:
# Evaluate one trained model per order n in [MIN_N, MAX_N].
for n in range(MIN_N, MAX_N + 1):
    print(f"\nTesting {n}-grams:")

    # A fresh tester for this order.
    tester = NGramTester(n)

    # Pick the first trained file whose path mentions this order and load it.
    gram_model = [path for path in trained_grams if f"model_{n}" in path][0]
    print("Using model:", gram_model.rsplit('/', 1)[-1])
    tester.read_model(gram_model)

    # Intrinsic evaluation on the whole test split, then extrinsic
    # evaluation on its first EXTRINSIC_EVAL_SIZE sentences.
    entropy = tester.process_sentences(test_sentences)
    print(f"Entropy: {entropy:.2f}")
    evaluate_extrinsic(test_sentences[:EXTRINSIC_EVAL_SIZE], tester, NUM_PREDICTIONS, n)


Testing 3-grams:
Using model: model_3gram_hp_all.txt


Computing entropy: 100%|[32m██████████[0m| 17012/17012 [00:00<00:00, 34458.04it/s]


Entropy: 42233.58
k=3 suggestion(s)


Keystrokes evaluation: 100%|██████████| 1000/1000 [10:33<00:00,  1.58it/s]


 Keystrokes: 19417
 All characters: 60155
 Keystroke savings: 67.72%
 Average number of possible words when correctly guessed: 8180
k=4 suggestion(s)


Keystrokes evaluation: 100%|██████████| 1000/1000 [10:31<00:00,  1.58it/s]

 Keystrokes: 17501
 All characters: 60155
 Keystroke savings: 70.91%
 Average number of possible words when correctly guessed: 9010





## GUI

In [13]:
# Serve the highest-order model through the GUI.
N = MAX_N
model = NGramTester(N)
# Re-list the trained models so this cell can run on its own. os.path.join
# tolerates a TRAINED_PATH without trailing separator, and sorting makes the
# "first match" below independent of os.listdir's arbitrary order.
trained_grams = sorted(
    os.path.join(TRAINED_PATH, fn)
    for fn in os.listdir(TRAINED_PATH)
    if "model" in fn
)
gram_model = [fn for fn in trained_grams if "model_" + str(N) in fn][0]
model.read_model(gram_model)
get_gui(model, f'{model.n}-gram')

Running on local URL:  http://127.0.0.1:7863
Running on public URL: https://f8c20f2a804921c132.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


## Extra evaluation on completely different text

In [8]:
import codecs

def load_glove_words(embedding_file, model):
    """
    Registers the first token of every line of an embedding file as an extra
    unigram of `model`.

    Each non-blank line is split on whitespace; only the first token (the
    word) is used, lower-cased. The rest of the line is ignored.

    :param embedding_file: Path to a UTF-8 encoded text file.
    :param model: Object exposing `read_extra_unigram(word)`.
    """
    # The built-in open() yields lines split on real newlines only, whereas
    # iterating a codecs.open() stream also breaks on other Unicode line
    # boundaries, which would turn parts of a line into bogus "words".
    with open(embedding_file, 'r', encoding='utf-8') as f:
        for line in f:
            data = line.split()
            if not data:
                # Blank line: nothing to register.
                continue
            model.read_extra_unigram(data[0].lower())

# Order of the model object created in this cell.
N = 5
model = NGramTester(N)
# Register the first token of every line of the GloVe file as an extra unigram of `model`.
load_glove_words('data/glove.6B.50d.txt', model)
# NOTE(review): the second argument is handed to tokenize_sentences as-is;
# presumably a sentence-start padding of N symbols — confirm in prepare.py.
guardian_test = tokenize_sentences("data/guardian_test.txt", [START_SYMBOL]*N)
# Presumably train/val/test fractions of 0/0/1, i.e. everything ends up in the
# test partition (the recorded output reports train and validation size 0).
_, _, guardian_test = train_val_test_split(guardian_test, 0, 0, 1)
# NOTE(review): this call evaluates `tester` with order `n`, both left over
# from the model-testing loop cell, and not the `model` / `N` created above,
# so the GloVe vocabulary loaded into `model` is never used here. Confirm
# which object was meant to be evaluated before relying on these numbers.
evaluate_extrinsic(guardian_test[:EXTRINSIC_EVAL_SIZE], tester, NUM_PREDICTIONS[-1:], n)

Tokenizing input corpus...
Tokenization ready.
Train dataset size: 0
Validation dataset size: 0
Test dataset size: 33672
k=4 suggestion(s)


Keystrokes evaluation: 100%|██████████| 1000/1000 [19:05<00:00,  1.15s/it]

 Keystrokes: 76361
 All characters: 119123
 Keystroke savings: 35.90%
 Average number of possible words when correctly guessed: 3933



