In [1]:
import numpy as np
import re
from sklearn.linear_model import LinearRegression

In [31]:
corpus_path = 'Corpus/Ulysses  James Joyce.txt'
sentence = '''What is it now ?'''

In [3]:
def sentencetokeniser(text):
    # split the text into sentences using regex a new line is encountered
    sentences = re.split(r'\n\n', text)
    # remove empty strings
    sentences = list(filter(None, sentences))
    # clean out the \n
    sentences = [sentence.replace('\n', ' ') for sentence in sentences]
    return sentences

In [4]:
def wordtokeniser(sentence):
    # split the sentences into words using regex a space is encountered
    words = re.split(r' ', sentence)
    # remove empty strings
    words = list(filter(None, words))
    return words

In [5]:
def replacenumbers(words):
    # replace numbers with a special token
    for i in range(len(words)):
        # starts and ends with a number with only numbers in between
        if re.match(r'^\d+$', words[i]):
            words[i] = '<NUM>'
    return words

In [6]:
def detectpunctuations(text):
    punctuations = ['!', '?',]
    for punctuation in punctuations:
        text = text.replace(punctuation, ' ' + punctuation + '\n\n')
    
    seperators = [',' , ';', '(', ')', '[', ']', '{', '}', '"', "'"]
    for seperator in seperators:
        text = text.replace(seperator, ' ' + seperator + ' ')
    
    # if before '.' , there is a lowercase letter and after tricky1 there is a space or the text ends , then it a fullstop . Only then replace it with a fullstop and a new line
    text = re.sub(r'([a-z])\. ', r'\1 .\n\n', text)

    # after the last word of text , if there is a fullstop , then replace it with a fullstop and a new line
    text = re.sub(r'([a-z])\.$', r'\1 .\n\n', text)

    return text

In [7]:
def replacemailids(words):
    # replace email ids with a special token
    for i in range(len(words)):
        if re.match(r'^\w+@\w+\.\w+$', words[i]):
            words[i] = '<MAILID>'
    return words

In [8]:
def replaceURLs(words):
    # replace URLs with a special token
    starts = ['http://', 'https://', 'www.']
    for i in range(len(words)):
        for start in starts:
            if words[i].startswith(start):
                words[i] = '<URL>'
    return words

In [9]:
def replacehash(words):
    # replace hashtags with a special token
    for i in range(len(words)):
        # if word starts with # and contains atleast 1 letter
        if words[i].startswith('#') and not re.match(r'^\d+$', words[i][1:]):
            words[i] = '<HASHTAG>'
    return words

In [10]:
def replacementions(words):
    # replace mentions with a special token
    for i in range(len(words)):
        # if word starts with @ and contains atleast 1 letter
        if words[i].startswith('@') and not re.match(r'^\d+$', words[i][1:]):
            words[i] = '<MENTION>'
    return words

In [11]:
def replaceids(words):
    # replace ids with a special token
    for i in range(len(words)):
        # if word starts with # and contains atleast 1 letter
        if words[i].startswith('#') and re.match(r'^\d+$', words[i][1:]):
            words[i] = '<ID>'
    return words

In [12]:
def tokeniser(text):
    newtext = detectpunctuations(text)
    sns = sentencetokeniser(newtext)
    # FOR ALL SENTENCES , SPLIT INTO WORDS
    allsns = []
    for sn in sns:
        words = wordtokeniser(sn)

        # REPLACE WITH A SPECIAL TOKEN
        words = replacenumbers(words)
        words = replacemailids(words)
        words = replaceURLs(words)
        words = replacehash(words)
        words = replacementions(words)
        words = replaceids(words)

        allsns.append(words)

    return allsns

In [13]:
def detokenise(tokenised , save_path):
    num_sen = len(tokenised)
    text = ""
    for sen in tokenised:
        line = ' '.join(sen)
        text += line + " \n"
    
    with open(save_path , "w") as f:
        f.write(text)

In [14]:
def NgramModel(N , corpus_path):
    text = open(corpus_path, 'r').read()
    tokenised = tokeniser(text)

    # N-gram model
    tags = ['<BOS>' , '<EOS>']

    # create a dictionary of all the N-grams
    ngrams = {}
    for sentence in tokenised:
        # add N-1 <BOS> tags in the beginning
        for i in range(N-1):
            sentence.insert(0, tags[0])
        # add 1 <EOS> tag in the end
        sentence.append(tags[1])

        for i in range(len(sentence)-N+1):
            ngram = tuple(sentence[i:i+N])
            if ngram not in ngrams:
                ngrams[ngram] = 0
            ngrams[ngram] += 1
    
    return ngrams

In [15]:
def NgramProb(sentence , n , corpus_path):
    ngram = NgramModel(n , corpus_path)
    tokenised_text = tokeniser(sentence)
    
    num_sentences = len(tokenised_text)

    for i in range(num_sentences):
        # add N-1 <BOS> tags in the beginning
        for j in range(n-1):
            tokenised_text[i].insert(0, '<BOS>')
        # add 1 <EOS> tag in the end
        tokenised_text[i].append('<EOS>')

    prob = 1
    for sentence in tokenised_text:
        for i in range(len(sentence)-n+1):
            ngram_tuple = tuple(sentence[i:i+n])
            if ngram_tuple in ngram:
                prob *= ngram[ngram_tuple]
            else:
                prob *= 1e-6
    
    return prob

In [16]:
def EstimateParams(corpus_path):
    params = [0 for i in range(3)]
    trigram = NgramModel(3 , corpus_path)
    bigram = NgramModel(2 , corpus_path)
    unigram = NgramModel(1 , corpus_path)

    keys = trigram.keys()
    keys = list(keys)
    corpus_size = len(keys)

    # for all possible trigrams 
    for key in keys:
        values = [0.0 for i in range(3)]
        # if value doesn't exist , then it is 0

        if (key[0] , key[1]) in bigram and bigram[(key[0] , key[1])] > 1:
            values[2] = (trigram[key] - 1)/ (bigram[(key[0] , key[1])]-1)

        if (key[1],) in unigram and unigram[(key[1],)] > 1:
            values[1] = (bigram[(key[1] , key[2])] - 1)/ (unigram[(key[1],)]-1)
        
        if (key[2],) in unigram:
            values[0] = (unigram[(key[2],)] - 1)/ (corpus_size-1)

    # find the maximum value index
        max_index = np.argmax(values)
        params[max_index] += trigram[key]
    
    # normalise the params
    params = np.array(params)
    params = params/np.sum(params)
    return params

In [17]:
def LinearInterpolation(corpus_path , sentence , N = 3 , multiple = False):
    tags = ['<BOS>' , '<EOS>']
    tokenised_text = tokeniser(sentence)
    
    # tokenised_text is a 2D list with each row representing a sentence and each column representing a word
    num_sentences = len(tokenised_text)
    for i in range(num_sentences):
        # add N-1 <BOS> tags in the beginning
        for j in range(N-1):
            tokenised_text[i].insert(0, tags[0])
        # add 1 <EOS> tag in the end
        tokenised_text[i].append(tags[1])

    params = EstimateParams(corpus_path)
    trigram = NgramModel(3 , corpus_path)
    bigram = NgramModel(2 , corpus_path)
    unigram = NgramModel(1 , corpus_path)

    keys = trigram.keys()
    keys = list(keys)
    # corpus size is sum of frequency of all unigrams
    corpus_size = sum(unigram.values())

    # break the sentence into trigrams
    if multiple:
        probs = []
    else:
        probs = 1
    for j in range(num_sentences):
        final_prob = 1
        for i in range(len(tokenised_text[j])-N+1):
            tri = tuple(tokenised_text[j][i:i+N])
            bi = tuple(tokenised_text[j][i+1:i+N])
            uni = tuple(tokenised_text[j][i+2:i+N])

            prob_tri = params[2]*trigram[tri]/bigram[bi] if (bi in bigram and tri in trigram) else 0
            prob_bi = params[1]*bigram[bi]/unigram[uni] if (uni in unigram and bi in bigram) else 0
            prob_uni = params[0]*unigram[uni]/corpus_size if (uni in unigram) else 0
            prob = prob_tri + prob_bi + prob_uni
            final_prob *= prob
        if multiple:
            probs.append(final_prob)
        else:
            probs *= final_prob
    
    return probs

In [34]:
def GoodTuringMethod(corpus_path, sentence , Ng = 3 , multiple = False):
    tags = ['<BOS>' , '<EOS>']
    tokenised_text = tokeniser(sentence)
    
    text = open(corpus_path, 'r').read()
    tokenised_corpus = tokeniser(text)
    words = []
    for sen in tokenised_corpus:
        words += sen
    words = list(set(words))
    
    # tokenised_text is a 2D list with each row representing a sentence and each column representing a word
    num_sentences = len(tokenised_text)
    
    for i in range(num_sentences):
        # add N-1 <BOS> tags in the beginning
        for j in range(Ng-1):
            tokenised_text[i].insert(0, tags[0])
        # add 1 <EOS> tag in the end
        tokenised_text[i].append(tags[1])
    
    trigram = NgramModel(3 , corpus_path)
    # max value of trigram.values()
    max_freq = max(trigram.values())
    freqoffreq = np.zeros((max_freq+1) , dtype=np.int32)

    # find the frequency of each frequency
    for key in trigram:
        freqoffreq[trigram[key]] += 1
    
    # Normalising the freqoffreq
    for r in range(2 , max_freq+1):
        # find nearest nonzero indice on left
        t = r-1
        while t > 0 and freqoffreq[t] == 0:
            t -= 1
        q = r+1
        while q < max_freq and freqoffreq[q] == 0:
            q += 1
        
        freqoffreq[r] = freqoffreq[r] / (q-t)

    
    # handle Nr = 0
    rs = np.arange(1 , max_freq+1) # r > 0
    N = 0
    for r in rs:
        # print(freqoffreq[r])
        if freqoffreq[r] == 0:
            freqoffreq[r] = freqoffreq[r-1]
        else:
            N += r*freqoffreq[r]
    
    # Now , for Linear Good Turing Estimate (LGT) , estimate b
    logNr = np.log(np.array(freqoffreq[1:]))
    logr = np.log(rs)
    model  = LinearRegression()
    model.fit(logr.reshape(-1,1) , logNr)
    b = model.coef_[0]
    a = model.intercept_

    # Now calculate prob of each r
    # prob = np.zeros((max_freq+1))
    # prob[0] = np.exp(a) / N
    recounted_rs = np.zeros((max_freq+1))
    recounted_rs[0] = np.exp(a) 
    for r in rs:
        recounted_r = (r+1)*((1 + 1/r)**b)
        recounted_rs[r] = recounted_r


    # Now , we can calculate prob of sentence
    if multiple:
        probs = []
    else:
        probs = 1
    for j in range(num_sentences):
        final_prob = 1
        for i in range(len(tokenised_text[j])-Ng+1):
            tri = tuple(tokenised_text[j][i:i+Ng])
            if tri in trigram:
                r = trigram[tri]
            else:
                r = 0
            num = recounted_rs[r]
            # Now for all possible trigrams , calculate sum of recounted rs
            den = 0
            for word in words:
                bi = tuple(tokenised_text[j][i+1:i+Ng] + [word])
                if bi in trigram:
                    den += recounted_rs[trigram[bi]]
                else:
                    den += recounted_rs[0]
            prob = num/den
            final_prob *= prob
        if multiple:
            probs.append(final_prob)
        else:
            probs *= final_prob
    
    return probs

In [44]:
def compute_perplexity(sentence , corpus_path , model = 'good-turing' , multiple = False):
    tokenised = tokeniser(sentence)
    num_words = len(tokenised[0])
    print("Done tokenising")
    
    if model == 'good-turing':
        prob = GoodTuringMethod(corpus_path , sentence , multiple = multiple)
    else:
        prob = LinearInterpolation(corpus_path , sentence , multiple = multiple)
    
    if multiple:
        per = []
        # open a file
        file = open('perplexity.txt', 'w')
        print("Opened file")
    else:
        per = np.inf
    if multiple:
        num_sentences = len(prob)
        for i in range(num_sentences):
            num_words = len(tokenised[i])
            per.append(prob[i] ** (-1/num_words))
            sen = ' '.join(tokenised[i])
            file.write(sen + " : " + str(per[i]) + "\n")
    elif num_words > 0:
        per = prob ** (-1/num_words)
    return per

In [20]:
def WordPrediction(sentence , corpus_path , ng , k , model = None):
    tokenised = tokeniser(sentence)

    # find all words in the corpus_path
    text = open(corpus_path, 'r').read()
    tokenised_corpus = tokeniser(text)
    words = []
    punctuations = ['!', '?','.' , ',' , ';', '(', ')', '[', ']', '{', '}', '"', "'"]
    for sentence in tokenised_corpus:
        # remove punctuations
        for punctuation in punctuations:
            while punctuation in sentence:
                sentence.remove(punctuation)
        words += sentence
    
    # find the 'k' most probable words , use max-heap with size 'k'
    probs = []
    for word in words:
        prob = 0
        if model == 'good-turing':
            prob = GoodTuringMethod(corpus_path , sentence + ' ' + word , ng)
        elif model == 'linear-interpolation':
            prob = LinearInterpolation(corpus_path , sentence + ' ' + word , ng)
        else:
            prob = NgramProb(sentence + ' ' + word , ng , corpus_path)
        
        probs.append(prob)
    
    probs = np.array(probs)
    max_indices = np.argsort(probs)[-k:]
    max_words = []
    for index in max_indices:
        max_words.append(words[index])
    
    return max_words

In [21]:
prob = LinearInterpolation(corpus_path , sentence , N = 3)
print(prob)

3.286909689231421e-05


In [22]:
prob = GoodTuringMethod(corpus_path , sentence , Ng = 3)
print(prob)

4.406532478556684e-27


In [23]:
# # create Train and Test Corpus
text = open(corpus_path, 'r').read()
all_sentences = tokeniser(text)

test_sentences = all_sentences[:1000]
train_sentences = all_sentences[1000:]

detokenise(test_sentences , 'Corpus/Ulysses  James Joyce_test.txt')
detokenise(train_sentences , 'Corpus/Ulysses  James Joyce_train.txt')

In [49]:
# Now computing perplexity of all sentences in the corpus
print("Number of sentences : " , len(all_sentences))
per = compute_perplexity(text, 'Corpus/Pride and Prejudice - Jane Austen_train.txt' , model = 'good-turing' , multiple = True)

Number of sentences :  23682
Done tokenising
Opened file


  per.append(prob[i] ** (-1/num_words))


In [48]:
# iterate through all sentences , and write perplexity in space , store in text file
persum = 0 
n = 0
with open('per_good_ulysses_train.txt' , 'w') as f:
    for i in range(len(train_sentences)):
        index = i+1000
        if per[index] == np.inf:
            continue
        else:
            sen = ' '.join(all_sentences[index])
            cont = sen + ' ' + str(per[index]) + '\n'
            n += 1
            persum += per[index]
            f.write(' '.join(cont))
print("Mean perplexity : " , persum/n)

Mean perplexity :  33073710.038488723
