In [4]:
from collections import defaultdict
from itertools import compress
import numpy as np

START_SYMBOL = '*/*'
STOP_SYMBOL = 'STOP/STOP'
RARE_SYMBOL = '_RARE_'
RARE_WORD_MAX_FREQ = 5
LOG_PROB_OF_ZERO = -1000

In [49]:
class SparseMatrix:
    def __init__(self):
        self.elements = {}

    def add(self, tuple, value):
        self.elements[tuple] = value

    def get(self, tuple):
        try:
            return self.elements[tuple]
        except KeyError:
            return LOG_PROB_OF_ZERO

# This function takes data to tag (brown_dev_words), a set of all possible tags (taglist), a set of all known words (known_words),
# trigram probabilities (q_values) and emission probabilities (e_values) and outputs a list where every element is a tagged sentence
# (in the WORD/TAG format, separated by spaces and with a newline in the end, just like our input tagged data)
# brown_dev_words is a python list where every element is a python list of the words of a particular sentence.
# taglist is a set of all possible tags
# known_words is a set of all known words
# q_values is from the return of calc_trigrams()
# e_values is from the return of calc_emissions()
# The return value is a list of tagged sentences in the format "WORD/TAG", separated by spaces. Each sentence is a string with a
# terminal newline, not a list of tokens. Remember also that the output should not contain the "_RARE_" symbol, but rather the
# original words of the sentence!
def viterbi(brown_dev_words, taglist, known_words, q_values, e_values, debug=True):
    # Create a list of tags for easy indexing.
    tags = list(taglist)
    words = list(set([k[0] for k in e_values.keys()]))

    # Convert q_values and e_values into sparse matrices
    # This enables easy indexing to get probabilities:
    #
    # P(word | preposition) = e_mat[w, p]
    # P(p3 | p1 p2) = q_mat[p1, p2, p3]
    #
    # This also simplifies and speeds up computation in the recursion phase
    # because tuples do not need to be created dynamically to calculate
    # the probabilities of each item.
    nTags = len(tags)
    tag_Dict = {k: v for k, v in zip(tags, range(nTags))}
    tag_ReverseDict = {v: k for k, v in tag_Dict.iteritems()}
    START_ID = tag_Dict['*']
    STOP_ID = tag_Dict['STOP']
    nWords = len(words)
    word_Dict = {k: v for k, v in zip(words, range(nWords))}
    
    if (debug):
        print tag_Dict
        print word_Dict

    e_mat = SparseMatrix()
    for t, p in e_values.iteritems():
        w_id = word_Dict[t[0]]
        t_id = tag_Dict[t[1]]
        e_mat.add((w_id, t_id), p)
    # END for
    q_mat = SparseMatrix()
    for t, p in q_values.iteritems():
        t1_id = tag_Dict[t[0]]
        t2_id = tag_Dict[t[1]]
        t3_id = tag_Dict[t[2]]
        q_mat.add((t1_id, t2_id, t3_id), p)
    # END for

    def recurseViterbi(j, n, wordN_id, vMatrix, bMatrix, bOverride = None):
        # Need to have the option to override the backpointed matrix for the
        # second column, where no data has been initialized.
        if bOverride is None:
            q_Probs = [q_mat.get((bMatrix[i, n-2], i, j)) for i in range(nTags)]
        else:
            q_Probs = [q_mat.get((bOverride, i, j)) for i in range(nTags)]
        # END if

        # Word emission probability is constant across all items
        e_Prob = e_mat.get((wordN_id, j))

        # Note that we are working with log probabilities, so we can sum
        # for the intersetion of probabilities.
        return np.add(np.add(q_Probs, vMatrix[:, n-1]), e_Prob)
    # END recurseViterbi

    def myViterbi(sent):
        # Initialize a matrix with N rows and M columns, where:
        #   N = number of POS Tag Classes
        #   M = number of words in the sentence
        print "Initializing Viterbi and Backtrace Matrices"
        vMat = np.zeros((nTags, len(sent)), np.dtype(np.float))
        bMat = np.zeros((nTags, len(sent)-1), np.dtype(np.int))

        # Initizlization step for initial transition probabilities.
        # use first word in the sentence unless it is rare.
        word0 = word_Dict[sent[0] if sent[0] in known_words else RARE_SYMBOL]
        for j in range(nTags):
            # Set the initial transition probabilities.
            vMat[j, 0] = q_mat.get((START_ID, START_ID, j)) + \
                            e_mat.get((word0, j))

            # Note: the backtrace is not used for the first column in this
            #       implementation because the starting tag is known (*).
        # END for
        if debug:
            print "Set initial transition probabilities."
            print vMat

        # Initialization step for the second column of transition probabilities
        # Hard-coded due to trigram model
        word1 = word_Dict[sent[1] if sent[1] in known_words else RARE_SYMBOL]
        for j in range(nTags):
            # Transition probabilities for the second column are recursively
            # defined based on the first column.
            vProbs = recurseViterbi(j, 1, word1, vMat, bMat, START_ID)
            vMat[j, 1] = np.max(vProbs)
            bMat[j, 0] = np.argmax(vProbs)
        # END for
        if debug:
            print "Set second column with recursive definition"
            print vMat
            print bMat

        # Recursion phase
        for n in range(2, len(sent)):
            # Use selected word unless it is a rare word.
            wordN = word_Dict[sent[n] if sent[n] in known_words else RARE_SYMBOL]
            for j in range(nTags):
                vProbs = recurseViterbi(j, n, wordN, vMat, bMat)
                vMat[j, n] = np.max(vProbs)
                bMat[j, n-1] = np.argmax(vProbs)
            # END for
            if debug:
                print "Set column {0} recursively".format(n)
                print vMat
                print bMat
        # END for

        # Terminate the recursion
        vProbs = recurseViterbi(STOP_ID, len(sent), STOP_ID, vMat, bMat)
        p_star = np.max(vProbs)
        q_star = np.argmax(vProbs)
        
        if debug:
            print "P*: {0}".format(p_star)
            print "Q*: {0}".format(q_star)

        # Iterate through the backtrace pointer
        ptr = q_star
        tagIds = []
        for k in range(len(sent)-2, -1, -1):
            tagIds.insert(0, ptr)
            ptr = bMat[ptr, k]
            if debug:
                print "Info for word {0}".format(k+1)
                print tagIds
                print ptr
        # END for
        tagIds.insert(0, ptr)

        # Get back the tags
        tags = [tag_ReverseDict[int(k)] for k in tagIds]

        return ' '.join(['/'.join((sent[i],tags[i])) for i in range(len(sent))])
    # END myViterbi

    tagged = [myViterbi(sent) for sent in brown_dev_words]
    return tagged

In [52]:
def testViterbi1():
    taglist = set(['*','A','B','C','.','STOP'])
    known_words = set(['He', 'is', 'fun', 'to', 'watch', '.', 'STOP', '*', '_RARE_'])
    sent = [['He', 'is', 'fun', 'to', 'watch', '.']]
    q_vals = { \
             ('*','*','A'): -1., \
             ('*','*','B'): -64., \
             ('*','A','B'): -1., \
             ('*','A','C'): -128., \
             ('A','B','A'): -1., \
             ('A','B','C'): -2., \
             ('B','A','C'): -1., \
             ('A','C','B'): -1., \
             ('C','B','.'): -1., \
             ('B','.','STOP'): 1., \
             ('A','B','C'): -2., \
             ('B','A','A'): -4., \
             ('A','C','C'): -8., \
             ('A','B','.'): -16., \
             ('A','C','.'): -32., \
             ('B','*','*'): -64.}
    e_vals = { \
             ('He','A'): -1., \
             ('He','B'): -2., \
             ('is','B'): -1., \
             ('is','A'): -2., \
             ('fun','A'): -1., \
             ('to','C'): -1., \
             ('watch','B'): -1., \
             ('.','.'): 1., \
             ('STOP','STOP'): 1., \
             ('He','C'): -4., \
             ('fun','B'): -4., \
             ('watch','A'): -8.}
    
    result = viterbi(sent, taglist, known_words, q_vals, e_vals)
    print result
# END

np.set_printoptions(precision=3, suppress=True)
testViterbi1()

{'A': 0, 'C': 1, 'B': 2, '*': 3, 'STOP': 4, '.': 5}
{'watch': 0, 'is': 1, 'STOP': 2, '.': 3, 'to': 4, 'fun': 5, 'He': 6}
Initializing Viterbi and Backtrace Matrices
Set initial transition probabilities.
[[   -2.     0.     0.     0.     0.     0.]
 [-1004.     0.     0.     0.     0.     0.]
 [  -66.     0.     0.     0.     0.     0.]
 [-2000.     0.     0.     0.     0.     0.]
 [-2000.     0.     0.     0.     0.     0.]
 [-2000.     0.     0.     0.     0.     0.]]
Set second column with recursive definition
[[   -2. -1004.     0.     0.     0.     0.]
 [-1004. -1130.     0.     0.     0.     0.]
 [  -66.    -4.     0.     0.     0.     0.]
 [-2000. -2002.     0.     0.     0.     0.]
 [-2000. -2002.     0.     0.     0.     0.]
 [-2000. -2002.     0.     0.     0.     0.]]
[[0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]
Set column 2 recursively
[[   -2. -1004.    -6.     0.     0.     0.]
 [-1004. -1130. -1006.     0.     0.     0.]
 [  -66.    -4. -