In [1]:
from nltk.corpus import brown
import random
import operator

In [2]:
from TrigramLanguageModel import TrigramLanguageModel, null_tokenizer

In [3]:
START = '<s>'
STOP = '<st>'
TERMINAL_POS = 'TER'

## Models for HMM
* `Emission_model(<o1>, <q2>, <o2>) = P(o2 | o1, q2)`
* `POS_model(<q1>, <q2>, <q3>) = P(q3 | q2, q1)`

In [4]:
POS_model = TrigramLanguageModel(null_tokenizer)
Emission_model = TrigramLanguageModel(null_tokenizer)
possible_tags = {}

In [5]:
train_ = len(brown.tagged_sents()) * 0.8
train = brown.tagged_sents()[0:int(train_)]
dev_ = brown.tagged_sents()[int(train_):]
dev = brown.tagged_sents()[int(train_):]

In [6]:
def fill_in(sentence): return [(START, TERMINAL_POS), (START, TERMINAL_POS)] + sentence + [(STOP, TERMINAL_POS)]

## Trigram Hidden Markov Model

In [7]:
print(train[0])

[('The', 'AT'), ('Fulton', 'NP-TL'), ('County', 'NN-TL'), ('Grand', 'JJ-TL'), ('Jury', 'NN-TL'), ('said', 'VBD'), ('Friday', 'NR'), ('an', 'AT'), ('investigation', 'NN'), ('of', 'IN'), ("Atlanta's", 'NP$'), ('recent', 'JJ'), ('primary', 'NN'), ('election', 'NN'), ('produced', 'VBD'), ('``', '``'), ('no', 'AT'), ('evidence', 'NN'), ("''", "''"), ('that', 'CS'), ('any', 'DTI'), ('irregularities', 'NNS'), ('took', 'VBD'), ('place', 'NN'), ('.', '.')]


** Building POS, Emission  models**

In [8]:
for sentence in train:
    sentence = fill_in(sentence)
    for i in range(2, len(sentence)): POS_model.update_tree([sentence[i-2][1], sentence[i-1][1], sentence[i][1]])
    for i in range(1, len(sentence)): Emission_model.update_tree([sentence[i-1][0], sentence[i][1], sentence[i][0]])
    for w, p in sentence:
        if w in possible_tags: 
            if p not in possible_tags[w]:
                possible_tags[w].append(p)
        else: possible_tags[w] = [p]

## Viterbi

In [9]:
class Viterbi():
    def __init__(self, POS_model, Emission_model, possible_tags):
        self.Q_model = POS_model
        self.O_model = Emission_model
        self.Q = list(POS_model.tree.keys())
        self.states = None
        self.sentence = None
        self.observations = None
        self.__cache = {}
        self.Q_ = possible_tags
        
    def __call__(self, sentence):
        for word in sentence:
            if not word in self.Q_:
                self.Q_[word] = self.Q
        self.states = None
        self.sentence = None
        self.observations = None
        self.__cache = {}
        self.states = None
        self.observations = self.__fill_in(sentence)
        self.r()
        self.get_pos()
        
        return self.states
    
    def __fill_in(self, sentence):
        return [START, START] + sentence + [STOP]
    
    def __r(self, n, position):
        if position > 3: # for both n-1 and n-2
            qn = self.Q_[self.observations[position]][n]
            for q1 in self.Q_[self.observations[position-1]]:
                arr = []
                for q2 in self.Q_[self.observations[position-2]]:
                    cur_prob = self.Q_model.get_prob([q2, q1, qn])
                    cur_prob *= self.O_model.get_prob([self.observations[position-2], q1 , self.observations[position-1]])
                    cur_prob *= self.__cache[position-1][q1][q2]
                    arr.append(cur_prob)
                self.__cache_probs(position, qn, q1, max(arr))
                    
        else: # only for n-1, q1 and q0 are constants
            max_q = None
            max_prob = 0
            q3 = self.Q_[self.observations[position]][n]
            o1, o2 = self.observations[1:3]
            for q2 in self.Q_[self.observations[position - 1]]:
                cur_prob = self.Q_model.get_prob([TERMINAL_POS, q2, q3])
                cur_prob *= self.O_model.get_prob([o1, q2 , o2])
                cur_prob *= self.Q_model.get_prob([TERMINAL_POS, TERMINAL_POS, q2])
                self.__cache_probs(position, q3, q2, cur_prob)
#                print(self.__cache)
                
    def __cache_probs(self, position, cur_pos, cur_his, prob):
        # position, current pos, prev two pos, prob
        if position not in self.__cache: self.__cache[position] = {}
        if cur_pos not in self.__cache[position]: self.__cache[position][cur_pos] = {}
        if cur_his not in self.__cache[position][cur_pos]: self.__cache[position][cur_pos][cur_his] = prob
        
    def r(self):
        for position in range(3, len(self.observations)):
            for n in range(len(self.Q_[self.observations[position]])): #for each q for each position cache probability based on q-1 and q-2
                self.__r(n, position)
            
    def get_pos(self):
        a = self.__cache
        top = len(a) + 2
        cur = 'TER'
        states = []
        while(top > 2):
            max_k, max_v = None, -1
            for i in a[top][cur]:
                if a[top][cur][i] > max_v:
                    max_v = a[top][cur][i]
                    max_k = i
            states.append(max_k)
            cur = max_k
            top -= 1
        self.states =  ['TER', 'TER'] + states[::-1]

In [10]:
v = Viterbi(POS_model, Emission_model, possible_tags)

### Testing on first 500 sentences from held-out Dev set

In [11]:
a = 0
b = 0
count = 0
for i in dev[0:500]:
    print(count, end='\r')
    count += 1
    sentence, tags = list(zip(*i))
    try:
        pred = v(list(sentence))[2:-1]
        for i, j in zip(pred, list(tags)):
            if i == j: a += 1
            b += 1
    except Exception as e:
        print(e)

499

In [12]:
print('Held out accuracy on first 500 sentences: ' +  str(float(a)/b * 100 ) + '%')

Held out accuracy on first 500 sentences: 92.38488783943329%


## Forward Procedure

In [13]:
class Forward():
    def __init__(self, POS_model, Emission_model, possible_tags):
        self.Q_model = POS_model
        self.O_model = Emission_model
        self.Q = list(POS_model.tree.keys())
        self.states = None
        self.observations = None
        self.__cache = {}
        self.sentence = None
        self.Q_ = possible_tags
        
    def __call__(self, sentence):
        for word in sentence:
            if not word in self.Q_:
                self.Q_[word] = self.Q
        self.observations = [START, START] + list(sentence) + [STOP]
        self.sentence = sentence
        self.p_ = []
        self.p()
        
        return self.get_prob()
    
    def cache_prob(self, q, q1, n, prob):
        if n not in self.__cache: self.__cache[n] = {}
        if q not in self.__cache[n]: self.__cache[n][q] = {}
        self.__cache[n][q][q1] = prob
        
    def get_prob(self):
        f = len(self.observations) - 1
        p = 0.0
        for i in self.__cache[f]:
            for j in self.__cache[f][i]:
                p += self.__cache[f][i][j]
        return p
        
    def __p(self, q, n):
        if n > 2:
            prob = 0.0
            for q1 in self.Q_[self.observations[n-1]]:
                for q2 in self.Q_[self.observations[n-2]]:
                    temp = self.Q_model.get_prob([q2, q1, q]) 
                    temp *= self.__cache[n-1][q1][q2] 
                    temp *= self.O_model.get_prob([self.observations[n-2], q1, self.observations[n-1]])
                    prob += temp
                self.cache_prob(q, q1, n, prob)
            
        else:
            for q1 in self.Q_[self.observations[n-1]]:
                temp = self.Q_model.get_prob([TERMINAL_POS, q1, q])
                temp *= self.O_model.get_prob([self.observations[n-2], q1, self.observations[n-1]])
                prob = temp
                self.cache_prob(q, q1, n, prob)
            
    def p(self):
        for n in range(2, len(self.observations)):
            for q in self.Q_[self.observations[n]]:
                self.__p(q, n)

In [14]:
f = Forward(POS_model, Emission_model, possible_tags)

In [15]:
avg_train_prob = 0
for i in train[0:500]:
    try: avg_train_prob += f(list(zip(*i))[0])
    except: pass
avg_train_prob /= 500.
print('Average train probability is:', avg_train_prob)

Average train probability is: 0.013157183104485501


**Avg probability for dev set is relatively low because dev set has some previously unseen words, for which the Emission probabilities and State Transition probabilities are very low**

In [16]:
avg_dev_prob = 0
for i in dev[0:500]:
    try: avg_dev_prob += f(list(zip(*i))[0])
    except: pass
avg_dev_prob /= 500.
print('Average dev probability is:', avg_dev_prob)

Average dev probability is: 0.00011150557999454281


## Top 50 words emitted by each POS tag

In [17]:
POS = {}

In [18]:
for sentence in train:
    for word, pos in sentence:
        if pos not in POS: POS[pos] = {word: 1}
        elif word not in POS[pos]: POS[pos][word] = 1
        else: POS[pos][word] += 1

In [19]:
for p in POS:
    print(p,": ", list(zip(*sorted(POS[p].items(), key=operator.itemgetter(1), reverse=True)))[0][0:50])

PPS+BEZ-HL :  ("It's",)
. :  ('.', ';', '?', '!', ':')
PPSS+BER-NC :  ("we're",)
NNS$-TL :  ("Women's", "People's", "States'", "Motors'", "Nations'", "Citizens'", "Queens'", "Ladies'", "Giants'", "Dealers'", "Children's", "Mothers'", "Artists'", "Juniors'", "Veterans'", "Policemen's", "Officers'", "Archbishops'", "Writers'", "Trustees'", "Portwatchers'", "Delegates'", "Raiders'", "Sportsmen's", "Ass'ns'", "Friends'", "Princes'", "Girls'", "Students'", "Rogues'", "Bros.'", "Guideposts'", "Handlers'", "Pirates'", "Manufacturers'", "Employers'", "Grizzlies'", "Braves'", "Bombers'", "Falcons'", "Times'", "Men's", "Peoples'")
NRS :  ('Sundays', 'Fridays', 'Mondays', 'Saturdays', 'Wednesdays')
FW-NNS :  ('al', 'cetera', 'voyageurs', 'corroborees', 'kolkhozes', 'contadini', 'rata', 'amici', 'circonscriptions', 'crus', 'al.', 'virtuosi', 'boites', 'filles', 'illusions', 'libellos', 'Einsatzkommandos', 'engages', 'Engages', 'lutihaw', 'coureurs', 'pains', 'banditos', 'culpas', 'Pacta', 'phis', 