# P1 - Small Language Model

In [3]:
from nltk import *
from conllu import *
from treebanks import languages, train_corpus, test_corpus, conllu_corpus
from collections import defaultdict, Counter
from math import log, exp
from sys import float_info

# HMMs
Write your own code to estimate the transition probabilities and the emission prob- abilities of an HMM, on the basis of (tagged) sentences from a training corpus from Universal Dependencies. Do not forget to involve the start-of-sentence marker ⟨s⟩ and the end-of-sentence marker ⟨/s⟩ in the estimation.
The code in this part is concerned with:
- counting occurrences of one part of speech following another in a training corpus, 
- counting occurrences of words together with parts of speech in a training corpus, 
- relative frequency estimation with smoothing.

As discussed in the lectures, smoothing is necessary to avoid zero probabilities for events that were not witnessed in the training corpus. For emission probabilities, implement Witten-Bell smoothing for unigrams, exactly as on slide 23 of Lecture 5 (use 100000 as value of z). This means that we have one smoothed probability distribution over words for each tag. For transition probabilities, implement Witten-Bell smoothing for bigrams of tags, exactly as on slide 22 of that same lecture (and this in turn will make use of Witten-Bell unigram smoothing that you will have implemented before). To be clear: do not use any existing implementations of Witten-Bell, but make your own, based on your understanding of the lecture notes.

Further write your own code to implement the Viterbi algorithm, which determines the sequence of tags for a given sentence that has the highest probability. To avoid underflow for long sentences, we need to use log probabilities. Implement code to compute accuracy of POS tagging (the percentage of correctly predicted tags).

In [9]:

class HidddenMarkovModel:

    def __init__(self, trainingCorpus, z = 100000): 
        self.z = z
        self.n = 0 # length of training data
        self.m = 0 # number of distinct words                   # TODO can be omitted its just len(self.unigramCounts)
        self.unigramCounts = defaultdict(int)
        self.bigramCount = defaultdict(int)
        self.wordTagCounts = defaultdict(int) # dictionary of dictionaries tracking word occurrences per tag
        self.tagOccurr = defaultdict(int) # total number of word occurrences for tag t
        self.uniqueWordsPerTag = defaultdict(set)
        self.uniqueTags = []
        self.tagCountOccurrences = Counter() # TODO this iss new version of tagOccurr wout eos and sos markers
        #self.tagOccurrences = Counter()
        self.__initialiseVars(trainingCorpus)
        # print("number of distinct words including sentence markers: ", self.m)
        # print("total number of tokens including sentence markers: ", self.n)



    # Initialise all of the vars and train the model
    def __initialiseVars(self, trainingCorpus): 
        # train_sents = conllu_corpus(train_corpus(trainingCorpus))
        self.tagCountOccurrences = Counter([token['upos'] for sent in trainingCorpus for token in sent])
        # Bigram counts
        # for sent in train_sents:                                                # TODO this might overflow if sentence len = 1
        for sent in trainingCorpus:
            for i in range(len(sent)):
                if i == 0:
                    self.__addUnigramCount({"upos": "<s>", "form": "<s>"})
                    self.bigramCount[("<s>", sent[i]['upos'])] += 1
                    self.bigramCount[(sent[i]['upos'], sent[i+1]['upos'])] += 1
                elif i == len(sent) - 1: 
                    self.__addUnigramCount({"upos": "</s>", "form": "</s>"})
                    self.bigramCount[(sent[i]['upos'], "</s>")] += 1
                else: 
                    self.bigramCount[(sent[i]['upos'], sent[i+1]['upos'])] += 1
                self.__addUnigramCount(sent[i])
        self.m = len(self.unigramCounts)

        # self.uniqueTags = list(set([token['upos'] for sent in trainingCorpus for token in sent]))
        # wordTagPairs = [(token['form'], token['upos']) for sent in trainingCorpus for token in sent]
        self.uniqueTags = list(set([token['upos'] for sent in trainingCorpus for token in sent]))
        wordTagPairs = [(token['form'], token['upos']) for sent in trainingCorpus for token in sent]
        wordTagCounter = Counter(wordTagPairs)
        #self.tagOccurrences = Counter(tag for _, tag in wordTagPairs)
        self.wordTagCounts = defaultdict(lambda: defaultdict(int))
        for (word, tag), count in wordTagCounter.items():
            self.wordTagCounts[tag][word] = count

        for word, tag in wordTagPairs:
            self.uniqueWordsPerTag[tag].add(word)
        # T_tag = {tag: len(words) for tag, words in tag_unique_words.items()} # number of possible words per tag
            

        print(self.tagOccurr)


    def __addUnigramCount(self, token): 
        # tokens consist of a word and a tag
        #print(token["form"])
        self.n += 1
        self.tagOccurr[token['upos']] += 1
        self.unigramCounts[token['form']] += 1


    # Calculates P(tag) using Witten-Bell smoothing
    def __unigramProbability(self, tag):
       # print(len(self.tagOccurr))
        m = len(self.tagOccurr)
        if self.tagOccurr[tag] != 0:
            # return self.tagOccurr[tag] / (self.n + m)
            return self.tagCountOccurrences[tag] / (self.n + m)
        else: 
            return m / (self.z * (self.n + m))


    # Calculates P(word|tag) using Witten-Bell smoothing
    def unigramEmissionProbability(self, word, tag):
        tag_cts = self.wordTagCounts[tag]
        t = len(self.uniqueWordsPerTag[tag])
        #n = self.tagOccurr[tag] #
        n = self.tagCountOccurrences[tag] #
        # print(t)
        # print(n)
        # print(tag_cts[word])                                      #TODO this is debugging

        # If word has been seen with the tag
        if word in tag_cts:
            return float(tag_cts[word] / (n+t))
        else: 
            return float(t/(self.z * (n + t)))
        
    # def __wordUnigramProbability(self, word):
    #     if word in self.unigramCounts:
    #         return self.unigramCounts[word] / (self.n + self.m)
    #     else: 
    #         return self.m / (self.z * (self.m + self.n))



        


    # Calculates P(ti | ti-1) or P(tag|precedingTag) using Witten-Bell smoothing
    def bigramTransitionProbability(self, precedingTag, tag):  
        precCount = self.tagOccurr[precedingTag]
        if self.tagOccurr[precedingTag] == 0:
            # if precedingTag count = 0
            lambd = 0
        else :
            # possibleFollowers counts number of tags that were seen following 'precedingTag'
            possibleFollowers = sum(1 for (tag1, tag2) in self.bigramCount if tag1 == precedingTag) 
            lambd = precCount / (precCount + possibleFollowers)

        # P(tag | prevtag)
        if precCount == 0:
            prob_tag_given_preceding = 0
        else: 
            prob_tag_given_preceding = self.bigramCount[(precedingTag, tag)] / precCount
        # find the witten bell smoothed probability of tag
        unig_prob = self.__unigramProbability(tag)
        return (lambd * prob_tag_given_preceding) + ((1 - lambd) * unig_prob)
    



    # Adding a list of probabilities represented as log probabilities.
    def __logsumexp(self, vals):
        min_log_prob = -float_info.max
        if len(vals) == 0:
            return min_log_prob
        m = max(vals)
        if m == min_log_prob:
            return min_log_prob
        else:
            return m + log(sum([exp(val - m) for val in vals]))


    # def viterbi(self, sent): 
    #     # Initialize probability table and backpointer table
    #     probTable = [[-float_info.max] * len(sent) for _ in range(len(self.uniqueTags))]
    #     backpointer = [[-1] * len(sent) for _ in range(len(self.uniqueTags))]
        
    #     # Initialization step
    #     for q in range(len(self.uniqueTags)):
    #         transitionProb = log(self.bigramTransitionProbability("<s>", self.uniqueTags[q]))
    #         emissionProb = log(self.unigramEmissionProbability(sent[0], self.uniqueTags[q]))
    #         probTable[q][0] = transitionProb + emissionProb
            
    #     # Recursion step
    #     for t in range(1, len(sent)):
    #         for q in range(len(self.uniqueTags)):
    #             maxProb = -float_info.max
    #             maxBackpointer = -1
                
    #             for prev_q in range(len(self.uniqueTags)):
    #                 prob = probTable[prev_q][t-1] + \
    #                        log(self.bigramTransitionProbability(self.uniqueTags[prev_q], self.uniqueTags[q])) + \
    #                        log(self.unigramEmissionProbability(sent[t], self.uniqueTags[q]))
                    
    #                 if prob > maxProb:
    #                     maxProb = prob
    #                     maxBackpointer = prev_q
                
    #             probTable[q][t] = maxProb
    #             backpointer[q][t] = maxBackpointer
                
    #     # Termination step
    #     maxProb = -float_info.max
    #     maxBackpointer = -1
        
    #     for q in range(len(self.uniqueTags)):
    #         prob = probTable[q][len(sent)-1] + \
    #                log(self.bigramTransitionProbability(self.uniqueTags[q], "</s>"))
            
    #         if prob > maxProb:
    #             maxProb = prob
    #             maxBackpointer = q
                
    #     # Backtrace to find best path
    #     bestPath = []
    #     current = maxBackpointer
        
    #     for t in range(len(sent)-1, -1, -1):
    #         bestPath.insert(0, self.uniqueTags[current])
    #         current = backpointer[current][t]
            
    #     return bestPath


    def viterbi(self, sent): 
        # sent should be a 
        # Use log probablities -> logsumexptrick.py has stuff on how to use them 
        # # columns = number of words in sent
        # num of rows = number of possible tags
        # sent.insert(0, "<s>")
        # sent.append("</s>")
        probTable = [[0] * (len(sent) + 1) for _ in range(len(self.uniqueTags))]
        maxProb = [-float_info.max] * (len(sent) + 1)
        tagPath = [''] * (len(sent) + 1)            

        # Initialise 
        for q in range(len(probTable)):
            transitionProb = log(self.bigramTransitionProbability("<s>" , self.uniqueTags[q]))
            emissionProb = log(self.unigramEmissionProbability(sent[0], self.uniqueTags[q]))
            holder = [transitionProb, emissionProb ]
            # print(holder.shape())
            # print(type(holder))
            probTable[q][0] = self.__logsumexp( holder )
            if probTable[q][0] > maxProb[0] :            # TODO esto igual se puede optimizar con un max o algo o pivotando la tabla y en los loops de abajo tmb
                maxProb[0] = probTable[q][0]
                tagPath[0] = self.uniqueTags[q]

                # index_min = min(range(len(values)), key=values.__getitem__)

        # Compute
        for i in range(1, len(sent)):
            for q in range(len(probTable)):
                transitionProb = log(self.bigramTransitionProbability(tagPath[i-1] , self.uniqueTags[q]))
                emissionProb = log(self.unigramEmissionProbability(sent[i], self.uniqueTags[q]))
                prob = self.__logsumexp([maxProb[i - 1], transitionProb, emissionProb ])
                probTable[q][i] = prob
                if probTable[q][i] > maxProb[i] :            
                    maxProb[i] = probTable[q][i]
                    tagPath[i] = self.uniqueTags[q]

        # Finalise
        last_i = len(sent)
        for q in range(len(probTable)):
            transitionProb = log(self.bigramTransitionProbability(tagPath[last_i - 1] , "</s>" ))
            probTable[q][last_i] = self.__logsumexp([maxProb[last_i - 1], transitionProb])
            if probTable[q][last_i] > maxProb[last_i] :            
                maxProb[last_i] = probTable[q][last_i]
                tagPath[last_i] = "ONETOOMANY"

        # Reconstruct the sec
        #print(maxProb)
        #return tagPath
        return tagPath[:-1]
    

    # Compute the percentage of correctly predicted tags from a given corpus
    def posTaggingAccuracy(self, testCorpus):
        # testCorpus is a set of tagged sentences from conllu
        tagCount = 0
        correctTags = 0
        for sent in testCorpus:
            words = [token["form"] for token in sent]
            pred_tags = self.viterbi(words)
            given_tags = [token["upos"] for token in sent]
            # print(given_tags)
            # print(pred_tags)
            # print("----")
            correctCount = sum(1 for pred, given in zip(pred_tags, given_tags) if pred == given)
            tagCount += len(given_tags)
            correctTags += correctCount
        
        if tagCount != 0:
            return correctTags / tagCount
        return 0

# Language modelling using HMMs
As discussed in the lectures, HMMs can be used to determine the probability of an input sentence, using the forward algorithm. For this, you need to be able to add prob- abilities together that are represented as log probabilities, without getting underflow in the conversion from log probabilities to probabilities and back. See the included logsumexptrick.py for a demonstration.
Write your own code to compute the perplexity of a test corpus. (Consider the length of a corpus to be the total number of actual tokens plus the number of sentences; in effect we count one additional end-of-sentence token for each sentence in addition to the actual words and punctuation tokens.)

In [5]:
# Compute the perplexity of a test corpus given a HMM
# use length count as described above

# Language modelling using bigrams
Further write your own code to implement estimation of bigram probabilities of input tokens (so words and punctuation tokens, not POS tags). To avoid zero probabilities, you again need Witten-Bell smoothing; you should here be able to reuse the code you implemented earlier for transition probabilities of HMMs.
Again, implement your own code to compute perplexity of a test corpus, given a trained bigram model.

In [6]:
# Estimate bigram probabilities with witten bell smoothing 
# re-use code for transition probabilities of HMMs

In [7]:
# Compute perplexity of a test corpus given a trained bigram model 

# Experiments
Run the developed code for the three treebanks. Train using the training parts of the treebanks, and test using the testing parts of the treebanks. (It is good practice to mainly use the development parts during development of the code.) 

Testing here means:
• computing accuracy of POS tagging using an HMM, 
• computing perplexity using an HMM, and
• computing perplexity using a bigram model.


In [4]:
lang = "en" 
#lang = "orv" 
# lang = "tr" 

train_sents = conllu_corpus(train_corpus(lang))
test_sents = conllu_corpus(test_corpus(lang))

# for sent in train_sents:
#     for token in sent:
#         print(token['form'], '->', token['upos'], sep='', end=' ')
#         #print(token['form'], sep = '', end= "")
#     print()
    
uniqueTags = set([token['upos'] for sent in train_sents for token in sent])
tagCountOccurrences = Counter([token['upos'] for sent in train_sents for token in sent])
print(uniqueTags)
hmm = HidddenMarkovModel(trainingCorpus=train_sents)
print(tagCountOccurrences)
print(tagCountOccurrences['PROPN'])



{'NOUN', 'PROPN', 'AUX', 'INTJ', 'ADJ', 'VERB', 'PART', 'ADP', 'DET', 'PRON', 'ADV', 'NUM', 'CCONJ'}
defaultdict(<class 'int'>, {'<s>': 4274, 'PRON': 3022, 'AUX': 1732, 'DET': 3805, 'NOUN': 8621, 'ADP': 10791, 'PROPN': 11657, 'VERB': 4595, 'NUM': 933, 'ADJ': 1632, 'CCONJ': 751, '</s>': 4274, 'ADV': 431, 'PART': 366, 'INTJ': 319})
Counter({'PROPN': 11657, 'ADP': 10791, 'NOUN': 8621, 'VERB': 4595, 'DET': 3805, 'PRON': 3022, 'AUX': 1732, 'ADJ': 1632, 'NUM': 933, 'CCONJ': 751, 'ADV': 431, 'PART': 366, 'INTJ': 319})
11657


In [10]:
# HMM testing
train_sents = conllu_corpus(train_corpus("en"))
test_sents = conllu_corpus(test_corpus("en"))
hmm = HidddenMarkovModel(trainingCorpus=train_sents)
#hmm = HidddenMarkovModel("tr")

print("unig: ", hmm.unigramEmissionProbability("memphis", "PROPN")) 
print("unig: ", hmm.unigramEmissionProbability("show", "VERB")) 
print("unig: ", hmm.unigramEmissionProbability("me", "PRON")) 
print("unig: ", hmm.unigramEmissionProbability("petersburg", "PRON")) 
print("unig: ", hmm.unigramEmissionProbability("me", "VERB")) 
print("big: ", hmm.bigramTransitionProbability("ADP", "NOUN")) 
print("big: ", hmm.bigramTransitionProbability("NOUN", "NOUN")) 
print("big: ", hmm.bigramTransitionProbability("fgfsgs", "NOUN")) 

# what->PRON is->AUX the->DET cost->NOUN of->ADP
print(hmm.viterbi(["what", "is", "the", "cost", "of"]))
print(hmm.viterbi(["hello", "world"]))

print(hmm.posTaggingAccuracy(train_sents))
print(hmm.posTaggingAccuracy(test_sents))
#what->DET flights->NOUN are->VERB there->PRON from->ADP phoenix->PROPN to->ADP milwaukee->PROPN 

defaultdict(<class 'int'>, {'<s>': 4274, 'PRON': 3022, 'AUX': 1732, 'DET': 3805, 'NOUN': 8621, 'ADP': 10791, 'PROPN': 11657, 'VERB': 4595, 'NUM': 933, 'ADJ': 1632, 'CCONJ': 751, '</s>': 4274, 'ADV': 431, 'PART': 366, 'INTJ': 319})
unig:  0.00640863479214099
unig:  0.17722855935783693
unig:  0.33070088845014806
unig:  5.5939453767686737e-08
unig:  2.9362061681453316e-07
big:  0.12967037225167752
big:  0.1788198461119748
big:  0.15066673657351579
['PRON', 'AUX', 'DET', 'NOUN', 'ADP']
['VERB', 'ADP']
0.40219915733223716
0.4034954407294833


In [10]:
# Train - use only testing parts of treebank

In [11]:
# Test - use only testing part of treebank


# call the three functions implemented above 
# • computing accuracy of POS tagging using an HMM, 
# • computing perplexity using an HMM, and
# • computing perplexity using a bigram model.