In [39]:
import sys
from collections import defaultdict
import math
import random
import os
import os.path
"""
COMS W4705 - Natural Language Processing - Fall 2022 
Prorgramming Homework 1 - Trigram Language Models
Daniel Bauer
"""

def corpus_reader(corpusfile, lexicon=None): 
    with open(corpusfile,'r') as corpus: 
        for line in corpus: 
            if line.strip():
                sequence = line.lower().strip().split()
                if lexicon: 
                    yield [word if word in lexicon else "UNK" for word in sequence]
                else: 
                    yield sequence

def get_lexicon(corpus):
    word_counts = defaultdict(int)
    for sentence in corpus:
        for word in sentence: 
            word_counts[word] += 1
    return set(word for word in word_counts if word_counts[word] > 1)  



def get_ngrams(sequence, n):
    """
    COMPLETE THIS FUNCTION (PART 1)
    Given a sequence, this function should return a list of n-grams, where each n-gram is a Python tuple.
    This should work for arbitrary values of n >= 1 
    """
    if n == 1:
        ngram_list = [tuple(["START"])]
        for seq in sequence:
            ngram_list.append(tuple([seq]))
        ngram_list.append(tuple(["STOP"]))
    else:
        ngram_list = [tuple(["START" for _ in range(n-1)] + [sequence[0]])]
        for seq in sequence[1:]:
            prevSeq = list(ngram_list[-1])
            ngram_list.append(tuple(prevSeq[1:]+[seq]))
        prevSeq = list(ngram_list[-1])
        ngram_list.append(tuple(prevSeq[1:]+["STOP"]))

    return ngram_list


class TrigramModel(object):
    
    def __init__(self, corpusfile):
    
        # Iterate through the corpus once to build a lexicon 
        generator = corpus_reader(corpusfile)
        self.lexicon = get_lexicon(generator)
        self.lexicon.add("UNK")
        self.lexicon.add("START")
        self.lexicon.add("STOP")
    
        # Now iterate through the corpus again and count ngrams
        generator = corpus_reader(corpusfile, self.lexicon)
        self.count_ngrams(generator)


    def count_ngrams(self, corpus):
        """
        COMPLETE THIS METHOD (PART 2)
        Given a corpus iterator, populate dictionaries of unigram, bigram,
        and trigram counts. 
        """
   
        self.unigramcounts = {} # might want to use defaultdict or Counter instead
        self.bigramcounts = {} 
        self.trigramcounts = {}
        self.unigram_start = {}
        self.total_unigrams = 0.0
        self.total_sentences = 0.0

        ##Your code here
        for sentence in corpus:
            self.total_sentences += 1
            unigram_list = get_ngrams(sentence, 1)
            bigram_list = get_ngrams(sentence, 2)
            trigram_list = get_ngrams(sentence, 3)

            for unigram in unigram_list:
                if unigram not in self.unigramcounts.keys():
                    self.unigramcounts[unigram] = 1
                else:
                    self.unigramcounts[unigram] += 1
                self.total_unigrams += 1
            
            for bigram in bigram_list:
                if bigram not in self.bigramcounts.keys():
                    self.bigramcounts[bigram] = 1
                else:
                    self.bigramcounts[bigram] += 1
            
            for trigram in trigram_list:
                if trigram not in self.trigramcounts.keys():
                    self.trigramcounts[trigram] = 1
                else:
                    self.trigramcounts[trigram] += 1
                leading_bigram = tuple(list(trigram)[:-1])
                if leading_bigram == ("START","START"):
                    if leading_bigram not in self.bigramcounts.keys():
                        self.bigramcounts[leading_bigram] = 1
                    else:
                        self.bigramcounts[leading_bigram] += 1

        return

    def raw_trigram_probability(self,trigram):
        """
        COMPLETE THIS METHOD (PART 3)
        Returns the raw (unsmoothed) trigram probability
        """
        trig = tuple(["UNK" if word not in self.lexicon else word for word in list(trigram)])
        bigram = tuple(list(trig)[:-1])
        prob = 1/(len(self.lexicon)-1)
        if bigram == ("START", "START") and trig[1:] in self.bigramcounts:
            prob =  self.bigramcounts[trig[1:]]/self.total_sentences
        elif bigram in self.bigramcounts and trig in self.trigramcounts:
            prob = self.trigramcounts[trig]/self.bigramcounts[bigram]
        return prob

    def raw_bigram_probability(self, bigram):
        """
        COMPLETE THIS METHOD (PART 3)
        Returns the raw (unsmoothed) bigram probability
        """
        bigr = tuple(["UNK" if word not in self.lexicon else word for word in list(bigram)])
        unigram = tuple(list(bigr)[:-1])
        prob = 0.0
        if bigr[0] == "START" and bigr in self.bigramcounts:
            prob = self.bigramcounts[bigr]/self.total_sentences
        elif unigram in self.unigramcounts and bigr in self.bigramcounts:
            prob = self.bigramcounts[bigr]/self.unigramcounts[unigram]            
        return prob
    
    def raw_unigram_probability(self, unigram):
        """
        COMPLETE THIS METHOD (PART 3)
        Returns the raw (unsmoothed) unigram probability.
        """
        #hint: recomputing the denominator every time the method is called
        # can be slow! You might want to compute the total number of words once, 
        # store in the TrigramModel instance, and then re-use it.  
        unigr = tuple(["UNK" if word not in self.lexicon else word for word in list(unigram)])
        try:
            prob = self.unigramcounts[unigr]/self.total_sentences
        except:
            prob = 0.0
        return prob

    def generate_sentence(self,t=20): 
        """
        COMPLETE THIS METHOD (OPTIONAL)
        Generate a random sentence from the trigram model. t specifies the
        max length, but the sentence may be shorter if STOP is reached.
        """
        return result            

    def smoothed_trigram_probability(self, trigram):
        """
        COMPLETE THIS METHOD (PART 4)
        Returns the smoothed trigram probability (using linear interpolation). 
        """
        lambda1 = 1/3.0
        lambda2 = 1/3.0
        lambda3 = 1/3.0
        bigram = tuple(list(trigram)[:-1])
        unigram = tuple(list(bigram)[:-1])
        prob_trigram = self.raw_trigram_probability(trigram=trigram)
        prob_bigram = self.raw_bigram_probability(bigram=bigram)
        prob_unigram = self.raw_unigram_probability(unigram=unigram)

        return lambda1*prob_trigram+lambda2*prob_bigram+lambda3*prob_unigram
        
    def sentence_logprob(self, sentence):
        """
        COMPLETE THIS METHOD (PART 5)
        Returns the log probability of an entire sequence.
        """
        trigrams = get_ngrams(sentence,3)
        logprob = 0.0
        for trigram in trigrams:
            trigram = tuple(["UNK" if word not in self.lexicon else word for word in list(trigram)])
            logprob += math.log2(self.smoothed_trigram_probability(trigram))
        return logprob

    def perplexity(self, corpus):
        """
        COMPLETE THIS METHOD (PART 6) 
        Returns the log probability of an entire sequence.
        """
        l = 0.0
        lengths = 0
        for sentence in corpus:
            l += self.sentence_logprob(sentence)
            lengths += (len(sentence)-1)
        l = (l/lengths)
        return math.pow(2,-l)


def essay_scoring_experiment(training_file1, training_file2, testdir1, testdir2):

        model1 = TrigramModel(training_file1)
        model2 = TrigramModel(training_file2)

        total = 0.0
        correct = 0.0       
 
        for f in os.listdir(testdir1):
            pp = model1.perplexity(corpus_reader(os.path.join(testdir1, f)))
            pp2 = model2.perplexity(corpus_reader(os.path.join(testdir1, f)))
            if pp < pp2:
                correct += 1
            total += 1
        # print("=============")
        for f in os.listdir(testdir2):
            pp = model2.perplexity(corpus_reader(os.path.join(testdir2, f)))
            # print("=============")
            pp2 = model1.perplexity(corpus_reader(os.path.join(testdir2, f)))
            if pp < pp2:
                correct += 1
            total += 1
        
        return (correct/total)

# if __name__ == "__main__":

#     model = TrigramModel(sys.argv[1]) 

#     # put test code here...
#     # or run the script from the command line with 
#     # $ python -i trigram_model.py [corpus_file]
#     # >>> 
#     #
#     # you can then call methods on the model instance in the interactive 
#     # Python prompt. 

    
#     # Testing perplexity: 
#     dev_corpus = corpus_reader(sys.argv[2], model.lexicon)
#     pp = model.perplexity(dev_corpus)
#     print(pp)


#     # Essay scoring experiment: 
#     acc = essay_scoring_experiment('hw1_data/ets_toefl_data/train_high.txt', "hw1_data/ets_toefl_data/train_low.txt", "hw1_data/ets_toefl_data/test_high", "hw1_data/ets_toefl_data/test_low")
#     print(acc)



In [37]:
model = TrigramModel("/Users/sowryagali/Documents/Classes/NLP/Assignments/HW-1/hw1_data/brown_train.txt") 

In [25]:
model.raw_unigram_probability(["ABC"])

0.46743884269716923

In [9]:
model.unigramcounts[("START", )]

41614

In [10]:
model.bigramcounts[("START", "START" )]

41614

In [12]:
model.trigramcounts[("START", "START", "i")]

882

In [40]:
essay_scoring_experiment('/Users/sowryagali/Documents/Classes/NLP/Assignments/HW-1/hw1_data/ets_toefl_data/train_high.txt',\
                         "/Users/sowryagali/Documents/Classes/NLP/Assignments/HW-1/hw1_data/ets_toefl_data/train_low.txt", \
                         "/Users/sowryagali/Documents/Classes/NLP/Assignments/HW-1/hw1_data/ets_toefl_data/test_high",\
                         "/Users/sowryagali/Documents/Classes/NLP/Assignments/HW-1/hw1_data/ets_toefl_data/test_low")

0.7808764940239044