In [None]:
import sys
import random
import math
import datetime
import pickle
import json
# import re
from tok import Tokenizer as tk
from queue import PriorityQueue as pq
from scipy.stats import linregress
import numpy as np

In [None]:
def getCorpusName(corpusPath):
    """Return the corpus file name without directories or extension.

    Only '/' is treated as a path separator, and the name is cut at the
    FIRST '.' of the last path component
    (e.g. './corpus/ggword.txt' -> 'ggword', 'a/b.tar.gz' -> 'b').
    """
    fileName = corpusPath.rsplit('/', 1)[-1]
    stem, _, _ = fileName.partition('.')
    return stem

# def getModelPath(LMtype, corpusPath, n):
#         return f'./models/{LMtype}/{getCorpusName(corpusPath)}/{n}Gram.json'

def getPickleModelPath(LMtype, corpusPath, n):
    """Return the relative path of the pickled model file.

    The path has the form ./models/<LMtype>/<corpus name>/<n>Gram.pkl, where
    the corpus name is derived from corpusPath by getCorpusName.
    """
    corpusName = getCorpusName(corpusPath)
    return f'./models/{LMtype}/{corpusName}/{n}Gram.pkl'

# save model
def saveModel(model, filename):
    """Pickle ``model`` into ``filename``.

    Missing parent directories are created first, so saving to a fresh
    ./models/<type>/<corpus>/ location no longer fails with FileNotFoundError.

    model: any picklable object
    filename: destination path of the pickle file
    Returns None.
    """
    import os  # local import: ``os`` is not imported at the top of this file

    parentDirectory = os.path.dirname(filename)
    if parentDirectory:
        # A bare file name has no directory part; nothing to create then.
        os.makedirs(parentDirectory, exist_ok=True)
    with open(filename, 'wb') as file:
        pickle.dump(model, file)
    return

# load model
def loadModel(filename):
    """Return the object stored in the pickle file ``filename``.

    NOTE(review): unpickling can execute arbitrary code, so this should only
    be pointed at model files produced by a trusted source.
    """
    with open(filename, 'rb') as pickledFile:
        return pickle.load(pickledFile)

In [None]:
class N_Gram_Model():
    def __init__(self, corpusPath , n  , LMtype , modelPath = None):
        self.corpusPath = corpusPath
        self.n = n
        self.frequencies = {} # n-gram: frequency
        self.numOfNgrams = None
        self.nGramIndices = {} # n-gram: index
        self.normalProbabilities = None # np array, n-gram index - normal probability
        # self.gtProbabilities = {} # n-gram: gt probability
        # self.interpolatedProbabilities = {} # n-gram: interpolated probability
        self.LMtype = LMtype # based on the LM type, decision will be made on which probabilities to use
        self.modelPath = modelPath
        self.tokenizedCorpus = None
        self.trainData = None
        self.testData = None
        self.trainDataDictionary = None
        self.unkThreshold = 3
        self.trainDataWithUNK = None
        self.rStar = None # r* values for gt smoothing of 3-gram model
        self.lambda1 = None
        self.lambda2 = None
        self.lambda3 = None
        # self.uniNormalProb = {}
        # self.biNormalProb = {}
        # self.triNormalProb = {}
        self.uniGram = None
        self.biGram = None
        self.triGram = None
        self.negligibleProb = math.pow(10, -15)
        
    
    def readCorpusAndTokenize(self):
        # read the corpus and save it in a variable
        with open(self.corpusPath, 'r') as f:
            corpusText = f.read()
        # tokenize the corpus
        self.tokenizedCorpus = self.tokenize(corpusText)
    
    def tokenize(self,text):
        """Tokenize ``text`` with a freshly created ``tk`` tokenizer.

        Returns whatever ``Tokenize`` produces; callers in this class flatten
        it with ``sum(tokens, [])``, so it is presumably a list of token lists
        (TODO confirm against the tokenizer).
        """
        return tk().Tokenize(text)

    def getTrainTestDataSets(self):
        # split the corpus into train and test sets
        random.seed(42)
        data = self.tokenizedCorpus
        random.shuffle(data)
        self.trainData = data[1000:]
        self.testData = data[:1000]
    
    def getTrainDataDictionary(self):
        # get the dictionary of the train data
        self.trainDataDictionary = self.getDataDictionary(self.trainData)
    
    def getTrainDataWithUNK(self):
        # replace the rare words with <unk>
        self.trainDataWithUNK = self.replaceRareWords(self.trainData , self.trainDataDictionary)

    def setUniGramBiGramTriGramModelsForInterpolation(self , uniGram , biGram , triGram):
        self.uniGram = uniGram
        self.biGram = biGram
        self.triGram = triGram
    
    def train(self):
        # train the model on the train set
        # calculate the frequencies and probabilities of all the n-grams
        self.calculateNgramFrequencies(self.trainDataWithUNK)
        # get the number of n-grams
        self.numOfNgrams = len(self.frequencies)
        if (self.LMtype == "n"):
            # 2. calculate the normal probabilities
            # self.getNormalProbabilities()
            pass
        if (self.LMtype == "g"):
            # 3. calculate the gt probabilities
            self.goodTuringSmoothing()
        if (self.LMtype == "i"):
            # calculate the interpolated probabilities
            self.interpolation()
        # 5. save the model
        # self.saveModel()
        return
    
    def calculateNgramFrequencies(self, dataWithUNK):
        n = self.n
        dataWithStartEndSymbols = self.addStartEndSymbols(dataWithUNK , n)
        index = 0
        for sentence in dataWithStartEndSymbols:
            for i in range((len(sentence) - n) + 1):
                ngram = tuple(sentence[i:i+n])
                if n == 1:
                    ngram = sentence[i]
                if ngram in self.frequencies:
                    self.frequencies[ngram] += 1
                else:
                    self.frequencies[ngram] = 1
                    self.nGramIndices[ngram] = index
                    index += 1
        return
    def getProbabilityOfUserInput(self , userInput):
        # get the probability of the text entered by the user
        # tokenize the user input
        tokens = self.tokenize(userInput)
        # # Flatten the 2D list using the sum function
        one_d_tokens = sum(tokens, [])
        # replace the rare words with <unk>
        replacedSentence = self.replaceRareWords([one_d_tokens] , self.trainDataDictionary)
        # add start and end symbols to the user input
        sentence = self.addStartEndSymbols([replacedSentence[0]] , self.n)[0]
        return math.exp(self.getLogProbabilityOfSentence(sentence))


    def getLogProbabilityOfSentence(self , sentence):
        # calculate the probability of the sentence
        initProbability = 1
        initLog = math.log(initProbability)
        logProbability = float(initLog)
        for i in range((len(sentence) - self.n) + 1):
            ngram = tuple(sentence[i:i+self.n])
            if self.n == 1:
                ngram = sentence[i]
            probability = self.getProbabilityOfNgram(ngram) # p(wn/w1...wn-1)
            logProbability += math.log(probability)
        return logProbability
    
    
    def getProbabilityOfNgram(self, ngram):
        # get the probability of the ngram depending on the LM type p(wn/w1...wn-1)
        if self.LMtype == "n":
            # if ngram not in self.frequencies:
            #     return self.negligibleProb
            # else:
            # get normal probability
            return self.getNormalProbabilityOfNgram(ngram)
        elif self.LMtype == "g":
            # get gt probability of the 3-gram
            if len(ngram) != 3:
                print("The gt probability is only supported for trigram model.")
                exit()
            return self.getGtProbabilityOfTrigram(ngram)
        elif self.LMtype == "i":
            # get interpolated probability of the 3-gram
            if len(ngram) != 3:
                print("The interpolated probability is only supported for trigram model.")
                exit()
            return self.getInterpolatedProbabilityOfTrigram(ngram)
    
    def evaluate(self , data):
        # evaluate the 3-gram model on the data set
        if (self.n != 3):
            print("The evaluation is only supported for 3-gram model.")
            exit()
        # calculate the perplexity of the every sentence in the data set
        # calculate the avg perplexity of the data set
        perplexity = {}
        for sentence in data:
            perplexity[tuple(sentence)] = self.perplexity(sentence)
        avgPerplexity = sum(perplexity.values()) /float(len(perplexity))
        return perplexity, avgPerplexity
    
    def perplexity(self, sentence):
        # calculate the perplexity of the sentence using the 3-gram model
        if (self.n != 3):
            print("The perplexity is only supported for 3-gram model.")
            exit()
        # replace the rare words with <unk>
        replacedSentence = self.replaceRareWords([sentence] , self.trainDataDictionary)
        # add start and end symbols to the user input
        newSentence = self.addStartEndSymbols([replacedSentence[0]] , self.n)[0]
        # probabilities = self.decideProbabilities() # based on the LM type
        logProbOfSentence = self.getLogProbabilityOfSentence(newSentence)
        numOfWordsIncludingEnd = len(newSentence)- 2
        # if probOfSentence == 0:
        #     print("The probability of the sentence is zero.", newSentence)
        #     exit()
        # if probOfSentence < 0:
        #     print("The probability of the sentence is negative.", newSentence)
        #     exit()
        return math.exp(logProbOfSentence * (-1 / float(numOfWordsIncludingEnd)))
    
    def getNormalProbabilityOfNgram(self , ngram):
        # calculate the normal probability p(Wn/W1...Wn-1)
        if ngram not in self.frequencies:
            return self.negligibleProb
        if (self.n == 1):
            denominator = 0
            for unigram in self.frequencies:
                denominator += self.frequencies[unigram]
            return self.frequencies[ngram] / float(denominator)
        denominator = 0
        for ngram2 in self.frequencies:
            if ngram[:-1] == ngram2[:-1]:
                denominator += self.frequencies[ngram2]
        if(denominator == 0): # it never happens
            print("denominator is zero while calculating the normal probability for ngram: " , ngram)
            exit()
        return self.frequencies[ngram] / float(denominator)

    def getAllNormalProbabilities(self):
        # calculate the normal probabilities p(Wn/W1...Wn-1) for all n-grams
        self.normalProbabilities = np.zeros(self.numOfNgrams).astype(float)
        for ngram in self.nGramIndices:
            self.normalProbabilities[self.nGramIndices[ngram]] = self.getNormalProbabilityOfNgram(ngram)
        return

    def goodTuringSmoothing(self):
        # calculate the gtProbabilities p(w3/w1w2)
        # 1. calculate Nr's
        maxFreq = max(self.frequencies.values())
        freqOfFreq = np.zeros(maxFreq+1)
        for ngram in self.frequencies:
            freqOfFreq[self.frequencies[ngram]] += 1
        # 2. fit line to log(r) and log(Zr)  to get log(Nr) = a + b * log(r)
        x = np.array([])
        y = np.array([])
        q = 0
        for r in range(1,maxFreq+1):
            if freqOfFreq[r] > 0:
                x = np.append(x, math.log(r))
                    # y = np.append(y, math.log(freqOfFreq[r]))
                # calculate Zr = 2Nr/(t-q), where q,r,t hree consecutive subscripts with non-zero counts Nq, Nr, Nt
                Zr = None
                if r == 1:
                    # t is next non-zero Nt after Nr
                    temp = 2
                    while freqOfFreq[temp] == 0:
                        temp += 1
                    t = temp
                    Zr = 2 * freqOfFreq[r] / float(t - q)
                if r == maxFreq:
                    Zr = freqOfFreq[r] / float(r - q)
                if r != 1 and r != maxFreq:
                    temp = r + 1
                    while freqOfFreq[temp] == 0:
                        temp += 1
                    t = temp
                    Zr = 2 * freqOfFreq[r] / float(t - q)
                y = np.append(y, math.log(Zr))
                q = r
                    
        line = linregress(x, y)
        a = line.intercept
        b = line.slope
        # 3. calculate r* = (r+1) * (S(Nr+1) / S(Nr)) for all r , S(Nr) = exp(a + b * log(r)) for given r
        # initialize rStar with float zeros
        self.rStar = np.zeros(maxFreq+1).astype(float)
        for r in range(0,maxFreq+1):
            self.rStar[r] = (r+1) * self.smoothedNr(r+1, a, b) / float(self.smoothedNr(r, a, b))
        # # 4. calculate the gt probabilities p(w3/w1w2) where freq(w1w2w3) > 0
        # for ngram in self.frequencies:
        #     self.gtProbabilities[ngram] = self.getGtProbabilityOfTrigram(ngram)
        return
    
    def getGtProbabilityOfTrigram(self , trigram):
        # calculate the gt probability P(w3/w1w2)
        if trigram not in self.frequencies:
            count = 0
        else :
            count = self.frequencies[trigram]
        countStar = self.rStar[count] # Count*(w1w2w3)
        denominator = 0 # sum of all countStars for w1w2wi where i is in the vocabulary
        for word in self.trainDataDictionary:
            newTriGram = tuple([trigram[0],trigram[1],word])
            if newTriGram in self.frequencies:
                # r > 0
                r = self.frequencies[newTriGram]
                denominator += self.rStar[r]
            else:
                # r = 0
                denominator += self.rStar[0]
        if denominator == 0:
            print("The denominator is zero while calculating the gt probability for trigram: " , trigram)
            exit()
        return countStar / float(denominator)
    
    def smoothedNr(self, r, intercept, slope):
        """Return the smoothed Nr: exp(intercept + slope * log(r)), or 1 for r == 0."""
        return 1 if r == 0 else math.exp(intercept + slope * math.log(r))

    def interpolation(self ):
        # calculate the interpolatedProbabilities of the 3-gram model p(w3/w1w2)
        # train the unigram, bigram and trigram models
        if self.uniGram == None and self.biGram == None and self.triGram == None:
            self.uniGram = N_Gram_Model(self.corpusPath , 1 , "n")
            self.uniGram.trainDataWithUNK = self.trainDataWithUNK
            self.uniGram.train()
            self.biGram = N_Gram_Model(self.corpusPath , 2 , "n")
            self.biGram.trainDataWithUNK = self.trainDataWithUNK
            self.biGram.train()
            # self.triGram.train()
            # self.triGram = self
            self.triGram = N_Gram_Model(self.corpusPath , 3 , "n")
            self.triGram.trainDataWithUNK = self.trainDataWithUNK
            # self.triGram.train()
            self.triGram.frequencies = self.frequencies
            self.triGram.numOfNgrams = self.numOfNgrams
            self.triGram.nGramIndices = self.nGramIndices
        # self.uniNormalProb = uniNormalProb
        # self.biNormalProb = biNormalProb
        # self.triNormalProb = triNormalProb
        if (self.n != 3):
            print("The interpolation is only supported for trigram model.")
            exit()
        self.lambda1, self.lambda2, self.lambda3 = self.getLambdasOfTrigramInterpolation()
        # for trigram in self.frequencies:
        #     self.interpolatedProbabilities[trigram] = self.getInterpolatedProbabilityOfTrigram(trigram , self.lambda1, self.lambda2, self.lambda3 , uniNormalProb, biNormalProb, triNormalProb)
        return

    def getInterpolatedProbabilityOfTrigram(self , trigram):
        # calculate the interpolated probability P(w3/w1w2)
        bigram = tuple(trigram[1:3])
        # unigram = tuple([trigram[2]])
        unigram = trigram[2]
        uniNormProb = self.uniGram.getNormalProbabilityOfNgram(unigram)
        biNormProb = self.biGram.getNormalProbabilityOfNgram(bigram)
        triNormProb = self.triGram.getNormalProbabilityOfNgram(trigram)
        # if uniProb == 0:
        #     uniProb = self.negligibleProb
        # if biProb == 0:
        #     biProb = self.negligibleProb
        # if triProb == 0:
        #     triProb = self.negligibleProb
        return self.lambda1 * uniNormProb + self.lambda2 * biNormProb + self.lambda3 * triNormProb

    def getLambdasOfTrigramInterpolation(self):
        # calculate the lambdas for the trigram interpolation
        lambda1 = lambda2 = lambda3 = 0.0
        uniFreq = self.uniGram.frequencies
        biFreq = self.biGram.frequencies
        triFreq = self.triGram.frequencies
        x = [0, 0, 0]
        for triGram in self.frequencies:
            # uniTuple = tuple([triGram[2]])
            (t1, t2, t3) = triGram
            # uniTuple = triGram[2]
            uniT3 = t3
            # uniT3NormalProb = self.uniGram.getNormalProbabilityOfNgram(uniT3)
            # denominator0 = float((uniFreq[uniT3Tuple]/uniNormalProb) - 1)
            denominator0 = sum(uniFreq.values()) - 1
            if denominator0 == 0:
                x[0] = 0
            else:
                x[0] = (uniFreq[uniT3] - 1)/denominator0
            # biTuple = tuple(triGram[1:3])
            # biNormalProb = self.biGram.getNormalProbabilityOfNgram(biTuple)
            # denominator1 = float((biFreq[biTuple]/biNormalProb) - 1)
            uniT2 = t2
            biT2T3 = (t2, t3)
            denominator1 = float((uniFreq[uniT2] - 1))
            if denominator1 == 0:
                x[1] = 0
            else:
                x[1] = (biFreq[biT2T3] - 1)/denominator1
            # triNormalProb = self.triGram.getNormalProbabilityOfNgram(triGram)
            # denominator2 = float((triFreq[triGram]/triNormalProb) - 1)
            biT1T2 = (t1, t2)
            denominator2 = float((biFreq[biT1T2] - 1))
            if denominator2 == 0:
                x[2] = 0
            else:
                x[2] = (triFreq[triGram] - 1)/denominator2
            maxIndex = x.index(max(x))
            if maxIndex == 0:
                lambda1 += triFreq[triGram]
            elif maxIndex == 1:
                lambda2 += triFreq[triGram]
            elif maxIndex == 2:
                lambda3 += triFreq[triGram]
        # normalize the lambdas
        total = lambda1 + lambda2 + lambda3
        if total <=0 :
            return 0.33, 0.33, 0.34 #assigning default probabilities as backup
        lambda1 = lambda1 / total
        lambda2 = lambda2 / total
        lambda3 = lambda3 / total   
        return lambda1, lambda2, lambda3

    def getDataDictionary(self, data):
        """Return a dict mapping each word of the tokenized ``data`` to its count."""
        wordCounts = {}
        for sentence in data:
            for word in sentence:
                wordCounts[word] = wordCounts.get(word, 0) + 1
        return wordCounts

    def replaceRareWords(self , data , dictionary):
        # replace the rare words with <unk>
        threshold = self.unkThreshold
        dataWithUNK = data.copy()
        for sentence in range(len(dataWithUNK)):
            for word in range(len(dataWithUNK[sentence])):
                if dataWithUNK[sentence][word] not in dictionary or dictionary[dataWithUNK[sentence][word]] < threshold:
                    dataWithUNK[sentence][word] = '<UNK>'
        return dataWithUNK
    
    def addStartEndSymbols(self , data , n):
        """Pad every sentence with '<START>' and '<END>' symbols.

        For n > 1, n-1 '<START>' symbols are put in front; for n == 1 a single
        one. One '<END>' is always appended.

        NOTE(review): only the outer list is copied, so the sentence lists
        passed in are padded IN PLACE and calling this twice on the same
        sentences pads them twice. Other methods of this class appear to
        observe that side effect, so it is kept as is.
        """
        paddedData = data.copy()
        if n > 1:
            startCount = n - 1
        elif n == 1:
            startCount = 1
        else:
            startCount = 0
        for tokens in paddedData:
            for _ in range(startCount):
                tokens.insert(0 , '<START>')
            tokens.append('<END>')
        return paddedData
    
    def fluencyWithA1(self, input):
        # get the probability of the text entered by the user
        # tokenize the user input
        tokens = self.tokenize(input)
        # # Flatten the 2D list using the sum function
        one_d_tokens = sum(tokens, [])
        # replace the rare words with <unk>
        replacedSentence = self.replaceRareWords([one_d_tokens] , self.trainDataDictionary)
        # add start and end symbols to the user input
        sentence = self.addStartEndSymbols([replacedSentence[0]] , self.n)[0]
        m = len(replacedSentence[0])
        fluencyScore = self.getLogProbabilityOfSentence(sentence)/m
        return fluencyScore


    def fluencyWithA2(self,input):
        # get the probability of the text entered by the user
        # tokenize the user input
        tokens = self.tokenize(input)
        # # Flatten the 2D list using the sum function
        one_d_tokens = sum(tokens, [])
        # replace the rare words with <unk>
        replacedSentence = self.replaceRareWords([one_d_tokens] , self.trainDataDictionary)
        # add start and end symbols to the user input
        sentence = self.addStartEndSymbols([replacedSentence[0]] , self.n)[0]
        m = len(replacedSentence[0])
        fluencyScore = 1.0/m
        valForGood , valForBad = self.getValuesGoodBad()
        for i in range(self.n-1,self.n+m):
            # get n-grams from sentence
            ngram = tuple(sentence[i:i+self.n])
            if self.n == 1:
                ngram = sentence[i]
            nGramProbabilty = math.exp(self.getLogProbabilityOfSentence(ngram))
            if nGramProbabilty >= valForGood:
                fluencyScore = fluencyScore/nGramProbabilty
            elif nGramProbabilty <= valForBad:
                fluencyScore = fluencyScore * nGramProbabilty
        return fluencyScore
    
    def getValuesGoodBad(self):
        self.getAllNormalProbabilities()
        if self.normalProbabilities is not None:
            CPs = self.normalProbabilities
        else:
            print("getAllNormalProbabilities did n't update  self.normalProbabilities")
        CPs = np.sort(CPs)
        goodIndex = 6* len(CPs)/10
        badIndex = 2* len(CPs)/10
        return CPs[goodIndex], CPs[badIndex]


In [None]:
# create Models
# Corpus text file from which every model built in this cell is trained.
corpus = "./corpus/ggword.txt"
### No smoothing
def getTrainedModel(model : N_Gram_Model):
    """Run the complete training pipeline on ``model`` and return the same object.

    Steps, in order: read and tokenize the corpus, split it into train and
    test data, count the training words, replace rare words by <UNK>, train.
    """
    pipeline = (
        model.readCorpusAndTokenize,   # read and tokenize the corpus
        model.getTrainTestDataSets,    # train / test split
        model.getTrainDataDictionary,  # word counts of the train data
        model.getTrainDataWithUNK,     # train data with rare words replaced
        model.train,                   # n-gram counts (+ smoothing)
    )
    for step in pipeline:
        step()
    return model
# N = 1,2,3
# create a 3-gram model of the given corpus of given type

def _trainAndSaveModel(lmType, n):
    """Create the n-gram model of type ``lmType`` for ``corpus``, train it and pickle it."""
    model = N_Gram_Model(corpus, n, lmType, getPickleModelPath(lmType, corpus, n))
    model = getTrainedModel(model)
    saveModel(model, model.modelPath)
    return model

# unsmoothed ("n") models for N = 1, 2, 3
uni1Model = _trainAndSaveModel("n", 1)
bi1Model = _trainAndSaveModel("n", 2)
tri1Model = _trainAndSaveModel("n", 3)
# Good-Turing ("g") models for N = 1, 2, 3
uni2Model = _trainAndSaveModel("g", 1)
bi2Model = _trainAndSaveModel("g", 2)
tri2Model = _trainAndSaveModel("g", 3)
# save the current tri-gram model


In [None]:
# take command line arguments: <LM type> <corpus path>
if len(sys.argv) < 3:
    # fail with a readable message instead of an IndexError traceback
    print("Usage: <LM type: i|n|g> <corpus path>")
    sys.exit(1)
commandLineLMtype = sys.argv[1]
commandLineCorpusPath = sys.argv[2]
# only i,n,g types are allowed
if commandLineLMtype not in ["i","n","g"]:
    print("Invalid language model type")
    sys.exit()
# load model
# NOTE(review): the model file is unpickled; only load files from a trusted source.
triGramModel = loadModel(getPickleModelPath(commandLineLMtype,commandLineCorpusPath , 3))
# User Prompt
while True:
    try:
        inputSentence = input("input sentence: ")
        if inputSentence == "":
            raise ValueError("Empty input!")
        print("A1 fluency score: ", triGramModel.fluencyWithA1(inputSentence))
        print("A2 fluency score: ", triGramModel.fluencyWithA2(inputSentence))
    except Exception as e:
        # best-effort loop: report the problem and let the user try again
        print(e)
        print("Please enter a valid sentence")
    # Asked after the try statement instead of inside a `finally` clause:
    # a `break` in `finally` silently discards an in-flight exception such as
    # KeyboardInterrupt, so Ctrl-C could be swallowed by answering 'n'.
    print("Do you want to continue? (y/n)")
    if input() == 'n':
        break