In [1]:
import nltk, inspect, math, numpy as np, random

from collections import defaultdict
from math import log

from nltk.corpus import brown
from nltk.tag import map_tag

from nltk.probability import FreqDist

# module for computing a Conditional Frequency Distribution
from nltk.probability import ConditionalFreqDist

# module for computing a Conditional Probability Distribution
from nltk.probability import ConditionalProbDist

from nltk.probability import MLEProbDist
from nltk.probability import LidstoneProbDist

assert map_tag('brown', 'universal', 'NR-TL') == 'NOUN', '''
Brown-to-Universal POS tag map is out of date.'''

In [5]:
class HMM:
    def __init__(self,train_data,test_data):
        self.train_data = train_data
        self.test_data = test_data
        self.states = []
        self.viterbi = []
        self.backpointer = []

    #TODO problem 1
    #compute emission model using ConditionalProbDist with the estimator: Lidstone probability distribution with +0.01 added to the sample count for each bin
    def emission_model(self,train_data):
        #TODO prepare data
        #don't forget to lowercase the observation otherwise it mismatches the test data
    
        ### Collect tags and words (lowercased)
        data = []
        for sentence in train_data:
            tmp = [(tag, word.lower()) for (word, tag) in sentence]
            data.extend(tmp)
    
        #TODO compute the emission model
        emission_FD = ConditionalFreqDist(data)
        self.emission_PD = ConditionalProbDist(emission_FD, LidstoneProbDist, 0.01)
        self.states = [u'.', u'ADJ', u'ADP', u'ADV', u'CONJ', u'DET', u'NOUN', u'NUM', u'PRON', u'PRT', u'VERB', u'X']
        #print "states: ",self.states,"\n\n"
        #states:  [u'.', u'ADJ', u'ADP', u'ADV', u'CONJ', u'DET', u'NOUN', u'NUM', u'PRON', u'PRT', u'VERB', u'X']

        return self.emission_PD, self.states

    #test point 1a
    def test_emission(self):
        print('test emission')
        t1 = -self.emission_PD['NOUN'].logprob('fulton') #10.7862305592
        t2 = -self.emission_PD['X'].logprob('fulton') #12.324461856
        return t1,t2

    #compute transition model using ConditionalProbDist with the estimator: Lidstone probability distribution with +0.01 added to the sample count for each bin
    def transition_model(self,train_data):
        data = []
        # TODO: prepare the data
        # the data object should be an array of tuples of conditions and observations
        # in our case the tuples will be of the form (tag_(i),tag_(i+1))
        # DON'T FORGET TO ADD THE START SYMBOL <s> and the END SYMBOL </s>
        tags = []
        for sentence in train_data:
            tmp = ["<s>"] ### Begining of sentenence
            tags_in_sent = [tag for (word, tag) in sentence] ### Get Tags in sentence
            tmp.extend(tags_in_sent)
            tmp.extend(["</s>"]) ### End of sentence
            tags.extend(tmp) ### Add to existing tag set

        data = [(tags[i], tags[i+1]) for i in range(len(tags) - 1)]
    
        #TODO compute the transition model
        transition_FD = ConditionalFreqDist(data)
        self.transition_PD = ConditionalProbDist(transition_FD, LidstoneProbDist, 0.01)
 
        return self.transition_PD
  
    #test point 1b
    def test_transition(self):
        print('test transition')
        transition_PD = self.transition_model(self.train_data)
        start = -transition_PD['<s>'].logprob('NOUN') #1.78408417115
        end = -transition_PD['NOUN'].logprob('</s>') #7.31426021816
        return start,end

    #train the HMM model
    def train(self):
        self.emission_model(self.train_data)
        self.transition_model(self.train_data)

    def set_models(self,emission_PD,transition_PD):
        self.emission_PD = emission_PD
        self.transition_PD = transition_PD
  
    #initialize data structures for tagging a new sentence
    #describe the data structures with comments
    #use the models stored in the variables: self.emission_PD and self.transition_PD
    #input 'observation' is first word in the sentence that you will need to tag
    def initialise(self,observation):
        #del self.viterbi[:]
        #del self.backpointer[:]
        #initialize for transition from <s> , begining of sentence
        # use costs (-log-base-2 probabilities)
        #TODO

        ### Initilaize Viterbi Structure and Backpointer Structure
        self.viterbi = defaultdict(lambda:defaultdict(lambda:0))
        self.backpointer = defaultdict(lambda:defaultdict(lambda:0))

        for state in self.states:
            self.viterbi[state][0] = -log(self.transition_PD["<s>"].prob(state), 2) - log(self.emission_PD[state].prob(observation), 2)
            self.backpointer[state][0] = "<s>"

    ### Implementations Comments ###
    # In this probelm, I initialize both the viterbi and backpointer structures using nested defaultdicts. This effectively creates a 
    # 2 dimensional array for which we can store our trellis values and the corresponding pointers allowing us to recreate our tagged
    # sequence. I specifically chose to make use of the defaultdict data structure since we don't have to worry about creating the proper
    # dimensions during initialization. Specifically, during the initialization stage, we don't know the length of the sentence we 
    # will tag, and thus don't know how large to make our structues. Since defaultdict structures allow for dynamic memory allocation, we effectively
    # deal with this problem.
   
  
    #tag a new sentence using the trained model and already initialized data structures
    #use the models stored in the variables: self.emission_PD and self.transition_PD
    #update the self.viterbi and self.backpointer datastructures
    #describe your implementation with comments
    #input 'observations' is the list of words to be tagged (including the first word)
    def tag(self,observations):
        for t in range(1,len(observations)):
            
            print('\nPrevious obesrvation: ', observations[t-1])
            print('\nCurrent obesrvation:', observations[t])
            
            #print('\nMinimum previous state:', self.backpointer[state][t-1])
            #print('Previous Viterbi column: ')
            #print(self.viterbi[self.states[i]][t - 1])
            
            for state in self.states:
            ### Compute probability and backpointer for arriving at desired state
                next_viterbi = []
                next_backpointer = []
                
                #print('\nCurrent state: ')
                #print(state)
                
                for i in range(len(self.states)):                  
                    next_viterbi.extend([self.viterbi[self.states[i]][t - 1] - log(self.transition_PD[self.states[i]].prob(state), 2) - log(self.emission_PD[state].prob(observations[t]), 2)])
                    next_backpointer.extend([self.viterbi[self.states[i]][t - 1] - log(self.transition_PD[self.states[i]].prob(state), 2)])
                ### Find Max (min for -log) probability and corresponding backpointer
                            
                #print('\nTranstion probabilities:')
                #print(- log(self.transition_PD[self.states[i]].prob(state), 2))
                
                #print('\nMinimum state cost:')
                #print(min(next_viterbi))
                
                self.viterbi[state][t] = min(next_viterbi)
                
                print('\nCurrent backpointer costs: ')
                print(next_backpointer)
            
                print('\nCurrent backpointer state: ', self.states[np.argmin(next_backpointer)])
                self.backpointer[state][t] = self.states[np.argmin(next_backpointer)]


        #add termination step (for transition to </s> , end of sentence)
        next_viterbi = []
        for i in range(len(self.states)):
            next_viterbi.extend([self.viterbi[self.states[i]][len(observations) - 1] - log(self.transition_PD[self.states[i]].prob("</s>"), 2)])
        self.viterbi["</s>"][len(observations)] = min(next_viterbi)
        self.backpointer["</s>"][len(observations)] = self.states[np.argmin(next_viterbi)]

        #TODO
        #reconstruct the tag sequence using the backpointer
        #return the tag sequence corresponding to the best path as a list (order should match that of the words in the sentence)
        bp = self.backpointer["</s>"][len(observations)]
        tags = [None] * len(observations)
        tags[len(observations)-1] = bp
        for t in reversed(range(1,len(observations))):
            tag = self.backpointer[bp][t]
            tags[t-1] = tag
            bp = tag
            
        print(tags)
        return tags

In [3]:
def answer_question4b():
    """ Report a tagged sequence that is incorrect
    :rtype: str
    :return: your answer [max 280 chars]"""

    tagged_sequence = 'fixme'
    correct_sequence = 'fixme'
    # Why do you think the tagger tagged this example incorrectly?
    answer =  inspect.cleandoc("""\
    fill me in""")[0:280]

    return tagged_sequence, correct_sequence, answer

def answer_question5():
    """Suppose you have a hand-crafted grammar that has 100% coverage on
        constructions but less than 100% lexical coverage.
        How could you use a POS tagger to ensure that the grammar
        produces a parse for any well-formed sentence,
        even when it doesn't recognise the words within that sentence?

    :rtype: str
    :return: your answer [max 500 chars]"""

    return inspect.cleandoc("""\
    fill me in""")[0:500]

# Useful for testing
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
    # http://stackoverflow.com/a/33024979
    return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)

def answers():
    global tagged_sentences_universal, test_data_universal, \
           train_data_universal, model, test_size, train_size, ttags, \
           correct, incorrect, accuracy, \
           good_tags, bad_tags, answer4b, answer5
    
    # Load the Brown corpus with the Universal tag set.
    tagged_sentences_universal = brown.tagged_sents(categories='news', tagset='universal')

    # Divide corpus into train and test data.
    test_size = 1000
    train_size = len(tagged_sentences_universal) - 1000 # fixme

    test_data_universal = tagged_sentences_universal[(-test_size):] # fixme
    train_data_universal = tagged_sentences_universal[:train_size] # fixme

    # Create instance of HMM class and initialise the training and test sets.
    model = HMM(train_data_universal, test_data_universal)

    # Train the HMM.
    model.train()

    # Inspect the model to see if emission_PD and transition_PD look plausible
    print('states: %s\n'%model.states)
    # Add other checks

    ######
    # Try the model, and test its accuracy [won't do anything useful
    #  until you've filled in the tag method
    ######
    s='the cat in the hat came back'.split()
    model.initialise(s[0])
    ttags = model.tag(s) # fixme
    print("Tag a trial sentence")
    print(list(zip(s,ttags)))

    # check the model's accuracy (% correct) using the test set
    #correct = 0
    #incorrect = 0

    #for sentence in test_data_universal:
    #    s = [word.lower() for (word, tag) in sentence]
    #    model.initialise(s[0])
    #    tags = model.tag(s)

    #    for ((word,gold),tag) in zip(sentence,tags):
    #        if tag == gold:
    #            pass # fix me
    #        else:
    #            pass # fix me

    #accuracy = 0.0 # fix me
    #print('Tagging accuracy for test set of %s sentences: %.4f'%(test_size,accuracy))

    # Print answers for 4b and 5
    bad_tags, good_tags, answer4b = answer_question4b()
    print('\nAn incorrect tagged sequence is:')
    print(bad_tags)
    print('The correct tagging of this sentence would be:')
    print(good_tags)
    print('\nA possible reason why this error may have occurred is:')
    print(answer4b[:280])
    answer5=answer_question5()
    print('\nFor Q5:')
    print(answer5[:500])

In [6]:
answers()

states: ['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']


Previous obesrvation:  the

Current obesrvation: cat

Current backpointer costs: 
[26.95205724711751, 27.74909755335604, 30.527287505566758, 25.186234335378394, 28.966855462271152, 9.640005771173499, 25.06999943065416, 25.348879285184925, 24.93438184284509, 27.191979256162565, 28.830878756107833, 25.49075221570879]

Current backpointer state:  DET

Current backpointer costs: 
[28.51035702133997, 27.655991312661936, 27.276419754705845, 25.366136818466106, 26.095175067414058, 4.991157526502384, 28.912559270398305, 27.088396883269283, 27.816245746576687, 28.606682883268274, 29.270156669783024, 35.45797847454478]

Current backpointer state:  DET

Current backpointer costs: 
[27.25193077308766, 27.511605922906476, 29.350900032709973, 24.751446525549774, 27.266772371165892, 9.775642637401923, 25.28800654638021, 26.115179158836167, 25.277833218050812, 25.969701323074634, 27.399416975415715, 27.22435