# Implementation of NGram language model for benchmarking MLM task 


In [1]:
import os
import json
import re
import string
import random
import time
import datetime
import pandas as pd 
import numpy as np

from functools import lru_cache

import matplotlib.pyplot as plt 
from argparse import Namespace

from pprint import pprint

import nltk
from nltk import FreqDist, ngrams, WittenBellProbDist, KneserNeyProbDist, SimpleGoodTuringProbDist, word_tokenize

import torch 
import transformers 
from transformers import AutoTokenizer, AutoModelForMaskedLM, BertTokenizer

import heapq

nltk.download('punkt')

[nltk_data] Downloading package punkt to /home/jz75/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [2]:
args = Namespace(
    data_path = './processed_data/sentences.csv',
    num_samples = 10000,
    train_split = 0.7,
)

In [3]:
df = pd.read_csv(args.data_path)
df = df[:args.num_samples]

df['split'] = 'train'
num_train_rows = int(len(df) * (1 - args.train_split)//2) - 1
df.loc[:num_train_rows, 'split'] = 'val'
df.loc[num_train_rows:num_train_rows + num_train_rows, 'split'] = 'test'

train_sents = df[df['split'] == 'train']['sentence'].tolist()
val_sents = df[df['split'] == 'val']['sentence'].tolist()
test_sents = df[df['split'] == 'test']['sentence'].tolist()


In [4]:
def tokenize(sentence):
    return sentence.lower().split(' ')

### Model Implementation 
- Uses witten bell smoothing for transition probabilities 


In [28]:
class NGramModel():

    def __init__(self,train_sents, N, smoothing_type, num_samples):
        self.N = N
        self.smoothing_type = smoothing_type 
        self.num_samples = num_samples

        self.train_sents = self.preprocess_sentences(train_sents)

        self.train_types = {w for sentence in self.train_sents for w in sentence}

        self.transitions = self.init_transitions()


    def preprocess_sentences(self, sentences):
        sents = [] 

        for sentence in sentences:
            sent = tokenize(sentence)
            for i in range(self.N-1):sent.insert(0, '<s>')
            sent.append('</s>')
            sents.append(sent)

        return sents


    def init_transitions(self):
        ''' 
            For training transition probabilities
        '''

        grams = [] 

        for sentence in self.train_sents:
            grams += ngrams(sentence, self.N)

        freqs = FreqDist(grams)
        distribution = WittenBellProbDist(freqs, bins=1e8)

        return distribution 

    def mask_elements(self, sentences, mask_prob=0.15):
        ''' 
            
        '''

        masked_indices = []
        for ids, sentence in enumerate(sentences):
            random_prob = np.random.rand(len(sentence))

            idt = 0
            for element, prob in zip(sentence, random_prob):
                if prob <= mask_prob and element not in ('<s>', '</s>'):
                    masked_indices.append((ids, idt, element))
                idt += 1
                
        return masked_indices

    def predict(self, context, k):
        context = [elem.lower() for elem in context]

        # Top 5 predictions
        return heapq.nlargest(k, [(w, self.transitions.logprob(tuple(context + [w]))) for w in self.train_types], key=lambda x:x[1])


    def make_predictions(self, sentences, k):
        sentences = self.preprocess_sentences(sentences)
        masked_indices = self.mask_elements(sentences)

        num_correct = 0
        total = 0

        for ids, idt, token in masked_indices:
            context = [sentences[ids][idt-2], sentences[ids][idt-1]]
            predictions = [w for w,_ in self.predict(context, k)]
            if token in predictions:
                num_correct += 1
            total += 1

        return num_correct/total
    
    def calculate_perplexity(self, test_sents):
        """
        Calculate the perplexity of the model on a set of test sentences.
        """
        test_sents = self.preprocess_sentences(test_sents)
        masked_indices = self.mask_elements(test_sents)
        log_prob_sum = 0
        N = 0

        for ids, idt, token in masked_indices:
            context = [test_sents[ids][idt-2], test_sents[ids][idt-1]]
            prediction = [w for w,_ in self.predict(context, 1)][0]
            
            log_prob_sum -= self.transitions.logprob(tuple(context +  [prediction]))
            N += 1

        avg_log_prob = log_prob_sum / N
        perplexity = np.exp(avg_log_prob)
        return perplexity

        # for sentence in test_sents:
        #     sentence_ngrams = list(ngrams(sentence, self.N))
        #     for ngram in sentence_ngrams:
        #         context, word = ngram[:-1], ngram[-1]
        #         # Calculate the log probability of the word given the context
        #         # log_prob = self.transitions.logprob(word, context)
        #         log_prob = self.transitions.logprob((context, word))
        #         log_prob_sum -= log_prob
        #         N += 1

        # avg_log_prob = log_prob_sum / N
        # perplexity = np.exp(avg_log_prob)
        # return perplexity


### Trigram Model

In [29]:
model = NGramModel(train_sents, 3, 'w', args.num_samples)


In [56]:

accuracy = model.make_predictions(test_sents, 5)
print('Top 5 accuracy :', accuracy)

Top 5 accuracy : 0.5327204736678093


In [57]:
accuracy = model.make_predictions(test_sents, 10)
print('Top 10 accuracy :', accuracy)

Top 10 accuracy : 0.5556082898275589


In [30]:
perplexity = model.calculate_perplexity(test_sents)
print(perplexity)

TypeError: unhashable type: 'list'

### Bigram Model

In [58]:
model = NGramModel(train_sents, 2, 'w', args.num_samples)

In [59]:
accuracy = model.make_predictions(test_sents, 5)
print('Top 5 accuracy :', accuracy)

Top 5 accuracy : 0.16490785950543393


In [60]:
accuracy = model.make_predictions(test_sents, 10)
print('Top 10 accuracy :', accuracy)

Top 10 accuracy : 0.1612193588937775
