In [None]:
import sys
from collections import defaultdict
import math
import random
import os
import os.path
import copy

In [None]:
def corpus_reader(corpusfile, lexicon=None): 
    with open(corpusfile,'r') as corpus: 
        for line in corpus: 
            if line.strip():
                sequence = line.lower().strip().split()
                if lexicon: 
                    yield [word if word in lexicon else "UNK" for word in sequence]
                else: 
                    yield sequence

def get_lexicon(corpus):
    word_counts = defaultdict(int)
    for sentence in corpus:
        for word in sentence: 
            word_counts[word] += 1
    return set(word for word in word_counts if word_counts[word] > 1)  



def get_ngrams(sequence, n):
    """
    Input: a list of strings
    Output:a list of n-grams, where each n-gram is a Python tuple
    """
    start_token = 'START'
    end_token = 'STOP'
    _sequence = copy.deepcopy(sequence) #use another sentence to manipulate so that adding tokens will not affect other operations
    _sequence.insert(0,start_token)
    _sequence.append(end_token)
    if n>=3:
        for i in range(n-2):
            _sequence.insert(0,start_token)
    
    return [
        (
            tuple([_sequence[i-n+j] for j in range(n)])
        )
        for i in range(n, len(_sequence)+1)
    ]


class TrigramModel(object):
    
    def __init__(
        self, 
        corpusfile,
        smooth_rate,
        
                ):
    
        # Iterate through the corpus once to build a lexicon 
        generator = corpus_reader(corpusfile)
        self.lexicon = get_lexicon(generator)
        self.lexicon.add("UNK")
        self.lexicon.add("START")
        self.lexicon.add("STOP")
        self.unigram_smoothing_rate=smooth_rate[0]
        self.bigram_smoothing_rate=smooth_rate[1]
        self.trigram_smoothing_rate=smooth_rate[2]
    
        # Now iterate through the corpus again and count ngrams
        generator = corpus_reader(corpusfile, self.lexicon)
        self.count_ngrams(generator) #this defines the uni/bi/tri dictionary for the corpus file
        self.sum_of_unigram_frequency=sum(self.unigramcounts.values())

    def count_ngrams(self, corpus):
        """
        Input: corpus iterator
        Output:dictionaries of unigram, bigram, and trigram counts
        """
   
        self.unigramcounts = defaultdict(int) 
        self.bigramcounts = defaultdict(int) 
        self.trigramcounts = defaultdict(int)

        
        for sentence in corpus:
            for unigram in get_ngrams(sentence,1):
                self.unigramcounts[unigram]+=1
            for bigram in get_ngrams(sentence,2):
                self.bigramcounts[bigram]+=1
            for trigram in get_ngrams(sentence,3):
                self.trigramcounts[trigram]+=1

    def raw_unigram_probability(self, unigram):
        """
        Input: one unigram tuple
        Output: raw (unsmoothed) unigram probability
        """
        return self.unigramcounts[unigram]/self.sum_of_unigram_frequency #sum_of_unigram_frequency is the total number of tokens 

    def raw_bigram_probability(self, bigram):
        """
        Input: one bigram tuple
        Output: raw (unsmoothed) bigram probability
        """

        all_appearance = self.unigramcounts[tuple((bigram[0],))]
        if all_appearance==0:
            return 0.0#return a probability of 0 if the conext is not seen in the training data
        
        return self.bigramcounts[bigram]/all_appearance
    
    def raw_trigram_probability(self,trigram):
        """
        Input: one trigram tuple
        Output: raw (unsmoothed) trigram probability
        """
        all_appearance = self.bigramcounts[tuple((trigram[0],trigram[1]))]
        if all_appearance==0:
            return 0.0 #return a probability of 0 if the conext is not seen in the training data
        return self.trigramcounts[trigram]/all_appearance

    


    def generate_sentence(self,t=20): 
        """
        Generate a random sentence from the trigram model. t specifies the
        max length, but the sentence may be shorter if STOP is reached.
        """

        previous2 = ['START','START']
        end_token = 'STOP'

        generated_sentence=[]
        count=0
        while count<t: #generated senetence has length limit of t
            #generate the distribution of next_word
            choice_list=[]
            choice_weight=[]
            for x,y,z in self.trigramcounts.keys():
                if x==previous2[0] and y==previous2[1]:
                    choice_list.append(tuple((x,y,z)))
            a = sum([self.trigramcounts[i] for i in choice_list])
            for i in choice_list:
                choice_weight.append(self.trigramcounts[i]/a)

            next_word = random.choices(choice_list,choice_weight)[0][2] #Sample the next word

            if next_word !=end_token:
                generated_sentence.append(next_word)
                previous2.pop(0) 
                previous2.append(next_word)#update the previous word list
            else:
                break
            count+=1
        
        return generated_sentence            

    def smoothed_trigram_probability(self, trigram):

        lambda1 = self.unigram_smoothing_rate
        lambda2 = self.bigram_smoothing_rate
        lambda3 = self.trigram_smoothing_rate

        _unigram = tuple((trigram[2],)) #grab the last word in a trigram
        _bigram = tuple((trigram[1],trigram[2]))#grab the last two words in a trigram
        _trigram = trigram

        result = lambda1*self.raw_unigram_probability(_unigram)+lambda2*self.raw_bigram_probability(_bigram)+lambda3*self.raw_trigram_probability(_trigram)

        return result
        
    def sentence_logprob(self, sentence):
        """
        Returns the log probability of an entire sequence.
        """
        trigrams = get_ngrams(sentence,3)
        sum_logp=0
        for trigram in trigrams:
            sum_logp+=math.log2(self.smoothed_trigram_probability(trigram))
        
        return sum_logp

    def perplexity(self, corpus):
        """
        Returns the perplexity of an entire corpus.
        """
        sum=0
        token_count = 0
        for sentence in corpus:
            sum+= self.sentence_logprob(sentence)
            token_count += len(get_ngrams(sentence,1))#count the number of tokens in the testing data

        return 2**(-sum/token_count)

### Now train the model with your dataset.
We are using the brown [corpus](https://www.kaggle.com/nltkdata/brown-corpus). <br>
And I'm putting only the tokenized column version in the current path. <br>

<br>
<br>
Play around the weights on smoothing to see what it does on the generated sentences!

In [None]:
model = TrigramModel(
    './brown_train.txt',
    [1/3.0,1/3.0,1/3.0]) #this is the coefficient list used for linear interpolation. (unigram,bigram,trigram)
dev_corpus = corpus_reader('./brown_train.txt', model.lexicon)
print(f'The perplexity for this corpus is: {model.perplexity(dev_corpus)}')
for i in range(10):
    print(f'The NO.{i}th sentence generated is: {model.generate_sentence(t=50)}.')
#Generate 10 sentences using the current model