Contributor: Debanjan Saha

Running on: Local Machine

In [None]:
import math
import random
from itertools import product
from pathlib import Path

import nltk
from tqdm.notebook import tqdm

In [None]:
def load_dataset(data_dir="./data"):
    """
    Load the train and test sentences from a data folder.

    Parameters
    ----------
    data_dir : str or path-like, default "./data"
        Folder that contains ``train.txt`` and ``test.txt`` (one sentence per line).

    Returns
    -------
    (train_data, test_data) : tuple of two lists of str
        The lines of each file with surrounding whitespace stripped.

    Raises
    ------
    OSError
        If either file cannot be opened.
    """
    data_dir = Path(data_dir)

    def read_lines(path):
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default locale encoding.
        with open(path, "r", encoding="utf-8") as f:
            # Iterating the handle streams lines; no need for readlines().
            return [line.strip() for line in f]

    train_data = read_lines(data_dir / "train.txt")
    test_data = read_lines(data_dir / "test.txt")

    return train_data, test_data

In [None]:
# Read both splits from the default data folder; each is a list of sentence strings.
train_data, test_data = load_dataset()

#### See sample sentences from Dataset

In [None]:
# Preview the first five training sentences.
train_data[:5]

['liberty all star usa sets initial payout',
 'we are being accused of not implementing this agreement',
 'entregrowth closed at 135 dlrs and options at 55 cents',
 'usda forecast south african 1986 87 corn exports at 210 mln tonnes vs 300 mln tonnes last month and 1985 86 exports at 275 mln tonnes vs 275 mln tonnes last month',
 'norgolds issued capital will be 2405 mln shares of which 63 pct will be held by nbh after 89 mln are issued to shareholders to raise 196 mln dlrs it said']

In [None]:
# Preview the first five test sentences.
test_data[:5]

['the company said each debenture is convertible into shares of businessland common stock at a conversion price of 2050 dlrs',
 'sumita says he does not expect further dollar fall',
 'the tin price is likely to rise to 20 ringgit a kilo this year because of the producers accord on export quotas and the reluctance of brokers and banks to sell the metal at lower prices a malaysian government bulletin said',
 'march and the next two or three months will be a really critical period hernandez said',
 'first union corp said shareholders of first north port bancorp of northport fla have approved a merger into first union for 40 dlrs per share or about 5100000 dlrs']

In [None]:
# Number of sentences in the train and test splits.
len(train_data), len(test_data)

(60000, 15000)

In [None]:
# Special tokens used by the preprocessing helpers and the language model.
# SOS_TOKEN ends with a space: add_sentence_tokens repeats it and concatenates
# it directly in front of each sentence, so the space acts as the separator.
SOS_TOKEN = "<s> "
# End-of-sentence marker.
EOS_TOKEN = "</s>"
# Placeholder substituted for words that occur only once / are not in the model.
UNK_TOKEN = "<UNK>"

In [None]:
def add_sentence_tokens(sentences, n):
    """
    Wrap every sentence in start-of-sentence and end-of-sentence markers.

    The start marker is prepended once for a unigram model (n == 1) and
    (n - 1) times otherwise; a single end marker is appended after a space.
    Returns a new list of the decorated sentence strings.
    """
    # SOS_TOKEN is concatenated as-is, so it supplies its own separator.
    prefix = SOS_TOKEN if n == 1 else SOS_TOKEN * (n - 1)
    return [f"{prefix}{sentence} {EOS_TOKEN}" for sentence in sentences]

In [None]:
# Sanity check of add_sentence_tokens: with n=3 every sentence should gain
# two start markers and one end marker.
temp = add_sentence_tokens(train_data[:3], 3)
temp[:3]

['<s> <s> liberty all star usa sets initial payout </s>',
 '<s> <s> we are being accused of not implementing this agreement </s>',
 '<s> <s> entregrowth closed at 135 dlrs and options at 55 cents </s>']

In [None]:
def replace_single_occurrence(word_tokens):
    """
    Map every token that occurs exactly once in `word_tokens` to the <UNK> token.

    Tokens seen more than once are kept unchanged; order and length of the
    sequence are preserved. Returns a new list.
    """
    counts = nltk.FreqDist(word_tokens)
    return [tok if counts[tok] > 1 else UNK_TOKEN for tok in word_tokens]

In [None]:
# Sanity check on one decorated sentence: no word repeats inside it, so every
# word (and the single </s>) becomes <UNK>; only the doubled <s> marker survives.
replace_single_occurrence(temp[0].split(" "))

['<s>',
 '<s>',
 '<UNK>',
 '<UNK>',
 '<UNK>',
 '<UNK>',
 '<UNK>',
 '<UNK>',
 '<UNK>',
 '<UNK>']

In [None]:
def process_text(sentences, n):
    """
    Turn raw sentences into the flat token stream used by the n-gram model.

    Steps:
      1. add start/end-of-sentence markers suited to an n-gram model of order `n`;
      2. split everything into word tokens;
      3. replace tokens that occur only once in the stream with the UNK token.

    Returns the resulting list of tokens (empty when `sentences` is empty).
    """
    sents = add_sentence_tokens(sentences, n)
    # split() without an argument collapses runs of whitespace, so blank
    # sentences or doubled spaces do not yield spurious empty-string tokens
    # (split(" ") would, and would also return [""] for an empty corpus).
    word_tokens = " ".join(sents).split()
    return replace_single_occurrence(word_tokens)

In [None]:
class NGramLanguageModel:
    """
    An n-gram language model trained on a given corpus.

    Attributes set by ``__init__``:
        n           -- order of the model (1 = unigram, 2 = bigram, ...).
        laplace     -- additive smoothing constant used when n > 1.
        word_tokens -- flat token stream returned by ``process_text``.
        vocab       -- ``nltk.FreqDist`` over ``word_tokens``.
        model       -- dict mapping n-gram tuples to their probability.
        masks       -- keep(1)/replace(0) patterns tried, in order, by
                       ``handle_out_of_vocab_words``.
    """

    def __init__(self, train_data, n, laplace=1):
        """
        Initiates the model with the training data and appropriate parameters

        train_data -- iterable of sentence strings.
        n          -- order of the n-gram model; must be at least 1.
        laplace    -- smoothing constant (only used when n > 1).

        Raises ValueError when n is smaller than 1.
        """
        if n < 1:
            raise ValueError(f"n must be a positive integer, got {n!r}")

        self.n = n
        self.laplace = laplace
        self.word_tokens = process_text(train_data, n)
        self.vocab = nltk.FreqDist(self.word_tokens)
        self.model = self.create_model()

        # Every keep/replace pattern of length n, ordered from "keep all tokens"
        # (all ones) down to "replace all tokens" (all zeros).
        self.masks = list(product((0, 1), repeat=n))
        self.masks.reverse()


    def laplace_smooth(self):
        """
        Build the smoothed probability table for the n-grams seen in training.

        For each observed n-gram the value is
            (count(n-gram) + laplace) / (count(context) + laplace * vocab_size)
        where the context is the n-gram without its last token.
        Returns a dict keyed by n-gram tuple.
        """
        vocab_size = len(self.vocab)

        n_vocab = nltk.FreqDist(nltk.ngrams(self.word_tokens, self.n))
        m_vocab = nltk.FreqDist(nltk.ngrams(self.word_tokens, self.n - 1))

        # Loop-invariant part of the denominator.
        smoothing_mass = self.laplace * vocab_size

        return {
            n_gram: (n_count + self.laplace) / (m_vocab[n_gram[:-1]] + smoothing_mass)
            for n_gram, n_count in n_vocab.items()
        }


    def create_model(self):
        """
        Builds probability distribution for the vocabulary of the training corpus based on value of n.
        if n == 1 (unigram), they are simple relative frequencies of tokens in the corpus,
        otherwise laplace smoothing is applied.
        """
        if self.n == 1:
            num_word_tokens = len(self.word_tokens)
            return {
                (unigram,): word_count / num_word_tokens
                for unigram, word_count in self.vocab.items()
            }
        return self.laplace_smooth()


    def handle_out_of_vocab_words(self, ngram):
        """
        Handles out of vocab words during inference on test set.

        Tries the n-gram unchanged first, then progressively replaces subsets of
        its tokens with the UNK token (in the order given by ``self.masks``) and
        returns the first variant that has an entry in the model.

        ngram -- tuple of tokens, or a single token given as a str.
        Returns the matching n-gram tuple, or None when no variant is in the model.
        """
        if isinstance(ngram, str):
            ngram = (ngram,)

        for bitmask in self.masks:
            combination = tuple(
                token if mask == 1 else UNK_TOKEN
                for token, mask in zip(ngram, bitmask)
            )
            if combination in self.model:
                return combination

        return None


    def calculate_perplexity(self, test_data):
        """
        Calculates the perplexity of the model on a test corpus.

        The test sentences go through ``process_text`` and every resulting
        n-gram is mapped to a known model entry via ``handle_out_of_vocab_words``.
        The log-probability sum is normalised by the total number of test tokens
        (boundary markers included).

        Raises KeyError when an n-gram has no model entry even after every
        UNK substitution has been tried.
        """
        test_word_tokens = process_text(test_data, self.n)
        num_test_word_tokens = len(test_word_tokens)

        log_probability_sum = 0
        for ngram in nltk.ngrams(test_word_tokens, self.n):
            known_ngram = self.handle_out_of_vocab_words(ngram)
            if known_ngram is None:
                # Previously this surfaced as an opaque ``KeyError: None``.
                raise KeyError(
                    f"no model entry for n-gram {ngram!r}, even after replacing "
                    f"tokens with {UNK_TOKEN}"
                )
            log_probability_sum += math.log(self.model[known_ngram])

        perplexity = math.exp((-1/num_test_word_tokens) * log_probability_sum)
        return perplexity


    def generate_sentence(self, min_length=10, max_length=20, given_incomplete_sentence = ""):
        """
        Completes an incomplete sentence within a window of a minimum length and maximum length

        min_length  -- while the sentence is shorter than this, already used
                       tokens and the end marker are not allowed as next token.
        max_length  -- once this many tokens are reached the sentence is closed.
        given_incomplete_sentence -- whitespace separated prefix; it is lower-cased.

        Returns (sentence, score). ``score`` is 1 when every chosen token had
        probability 1, otherwise -1 / ln(product of the chosen probabilities).
        """

        def select_best_candidate(prev, exclude=None):
            """
            This utility function chooses the most probable next token given the previous (n-1) tokens.
            Falls back to (EOS_TOKEN, 1) when no admissible continuation exists.
            """
            unwanted = {UNK_TOKEN}
            unwanted.update(exclude or ())

            best = None
            for ngram, p in self.model.items():
                if ngram[:-1] != prev or ngram[-1] in unwanted:
                    continue
                # Strict '>' keeps the first entry among equally probable ones,
                # matching the result of a stable descending sort.
                if best is None or p > best[1]:
                    best = (ngram[-1], p)

            if best is None:
                return (EOS_TOKEN, 1)
            return best

        # SOS_TOKEN carries a trailing separator space; the bare marker is the token.
        sos = SOS_TOKEN.strip()
        sentence = [sos] * max(self.n - 1, 1)

        # assuming incomplete sentence is in correct format
        sentence.extend(word.lower() for word in given_incomplete_sentence.split())

        # Sum log-probabilities instead of multiplying probabilities so that long
        # sentences cannot underflow to 0.0 (which would make math.log fail).
        log_probability = 0.0

        while sentence[-1] != EOS_TOKEN:
            prev = tuple(sentence[-(self.n-1):]) if self.n != 1 else tuple()

            if len(sentence) < min_length:
                unwanted = sentence + [EOS_TOKEN]
            else:
                unwanted = []

            word, p = select_best_candidate(prev, exclude=unwanted)
            sentence.append(word)
            log_probability += math.log(p)

            # Close the sentence at max_length, but never add a second end marker
            # when the model has just produced one itself.
            if word != EOS_TOKEN and len(sentence) >= max_length:
                sentence.append(EOS_TOKEN)

        resulting_sentence = " ".join(sentence)

        if log_probability == 0:
            return resulting_sentence, 1

        return resulting_sentence, -1/log_probability

In [None]:
# Generates a bitmask of 5 bits (testing to see if it is correct or not)
# list(reversed(list(product((0,1), repeat=5))))

## Test the model

In [None]:
# Settings shared by the experiments below.
# "n" is the n-gram order and is overwritten before each model is built;
# "laplace" is the smoothing constant passed to NGramLanguageModel.
hyperparams = {
    "n": 1,
    "laplace": 0.01,
}

# Unigram

In [None]:
# Select the unigram order, then fit the model on the training sentences.
hyperparams["n"] = 1

model = NGramLanguageModel(train_data, hyperparams["n"], hyperparams["laplace"])
print(f"Vocabulary size: {len(model.vocab)}")

Vocabulary size: 23505


In [None]:
# Evaluate the current model on the held-out sentences.
perplexity = model.calculate_perplexity(test_data)
print(f"Test set perplexity: {perplexity:.3f}")

Test set perplexity: 762.939


In [None]:
# Let the current model complete the prompt (10 to 20 tokens).
sentence, p = model.generate_sentence(
    min_length=10,
    max_length=20,
    given_incomplete_sentence="the company",
)
print(f"Generated Sentence: {sentence}", f"probability: {p}", sep="\n")

Generated Sentence: <s> the company of to in and said a mln the the the the the the the the the the </s>
probability: 0.017014244038644752


# Bigram

In [None]:
# Select the bigram order, then fit the model on the training sentences.
hyperparams["n"] = 2

model = NGramLanguageModel(train_data, hyperparams["n"], hyperparams["laplace"])
print(f"Vocabulary size: {len(model.vocab)}")

Vocabulary size: 23505


In [None]:
# Evaluate the current model on the held-out sentences.
perplexity = model.calculate_perplexity(test_data)
print(f"Test set perplexity: {perplexity:.3f}")

Test set perplexity: 85.795


In [None]:
# Let the current model complete the prompt (10 to 20 tokens).
sentence, p = model.generate_sentence(
    min_length=10,
    max_length=20,
    given_incomplete_sentence="the company",
)
print(f"Generated Sentence: {sentence}", f"probability: {p}", sep="\n")

Generated Sentence: <s> the company said it has been made a share </s>
probability: 0.052618529404116515


# Trigram

In [None]:
# Select the trigram order, then fit the model on the training sentences.
hyperparams["n"] = 3

model = NGramLanguageModel(train_data, hyperparams["n"], hyperparams["laplace"])
print(f"Vocabulary size: {len(model.vocab)}")

Vocabulary size: 23505


In [None]:
# Evaluate the current model on the held-out sentences.
perplexity = model.calculate_perplexity(test_data)
print(f"Test set perplexity: {perplexity:.3f}")

Test set perplexity: 51.555


In [None]:
# Let the current model complete the prompt (10 to 20 tokens).
sentence, p = model.generate_sentence(
    min_length=10,
    max_length=20,
    given_incomplete_sentence="the company",
)
print(f"Generated Sentence: {sentence}", f"probability: {p}", sep="\n")

Generated Sentence: <s> <s> the company said it has agreed to sell its shares in the first quarter of 1986 </s>
probability: 0.02913691081782431


# 4-gram

In [None]:
# Select the 4-gram order, then fit the model on the training sentences.
hyperparams["n"] = 4

model = NGramLanguageModel(train_data, hyperparams["n"], hyperparams["laplace"])
print(f"Vocabulary size: {len(model.vocab)}")

Vocabulary size: 23505


In [None]:
# Evaluate the current model on the held-out sentences.
perplexity = model.calculate_perplexity(test_data)
print(f"Test set perplexity: {perplexity:.3f}")

Test set perplexity: 40.549


In [None]:
# Let the current model complete the prompt (10 to 20 tokens).
sentence, p = model.generate_sentence(
    min_length=10,
    max_length=20,
    given_incomplete_sentence="the company",
)
print(f"Generated Sentence: {sentence}", f"probability: {p}", sep="\n")

Generated Sentence: <s> <s> <s> the company said it will offer a stake in the company </s>
probability: 0.03218777588375814
