Hidden Markov Model that tags the words of the Zen of Python sentences with part-of-speech (POS) tags.
The HMM matrices are estimated from the Brown Corpus.

In [None]:
import nltk
import numpy as np
import this
import codecs
from collections import Counter
from nltk.corpus import brown
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk import pos_tag

# Download every NLTK data package requested by this notebook, in order.
for resource in ('brown', 'punkt', 'stopwords', 'wordnet',
                 'averaged_perceptron_tagger'):
  nltk.download(resource)

In [3]:
# `this.s` holds the Zen of Python ROT13-encoded; applying ROT13 again yields
# the plain text.
zen_of_python = codecs.encode(this.s, 'rot13')
# Drop the first 34 characters: the title line
# "The Zen of Python, by Tim Peters" plus the two newlines that follow it.
zen_of_python = zen_of_python[34:]

sentences = []  # one list of tokens per sentence
correct = []    # reference (token, tag) pairs per sentence, from nltk.pos_tag
for sentx in sent_tokenize(zen_of_python):
  # Tokenise once and reuse the result for the raw tokens and the reference
  # tagging instead of re-running the tokenizer for each use.
  tokens = word_tokenize(sentx)
  sentences.append(tokens)
  correct.append(pos_tag(tokens))

# Total number of tokens; used as the denominator of the accuracy score.
all_together = sum(len(tokens) for tokens in sentences)


In [4]:
class HMM:
  """Hidden Markov Model with Viterbi decoding of the best state sequence.

  Args:
    trans_prob: 2-D array; ``trans_prob[i][j]`` is the score of moving from
      state ``i`` to state ``j``.
    emiss_prob: 2-D array indexed as ``emiss_prob[state, observation]``.
    pi: 1-D array of initial state scores.
    states: dict mapping state name -> row index.
    observations: dict mapping observation (word) -> column index of
      ``emiss_prob``.
  """

  def __init__(self, trans_prob, emiss_prob, pi, states, observations):
    self.trans_prob = trans_prob
    self.emiss_prob = emiss_prob
    self.pi = pi
    self.states_no = len(states)
    self.observations_no = len(observations)
    self.states = states
    self.observations = observations
    # Inverse lookup: state index -> state name.
    self.states_rev = dict(zip(self.states.values(), self.states.keys()))
    # Emission score given to every state for an out-of-vocabulary word.
    self.eps = 1e-9

  def _next_state(self, current_prob):
    """Advance the Viterbi scores by one transition.

    Returns a pair ``(best_score, best_prev)``: for every target state, the
    highest score reachable from any previous state and the index of the
    previous state that achieves it.
    """
    # scores[i][j] = current_prob[i] * trans_prob[i][j]
    scores = np.asarray(current_prob).reshape(-1, 1) * self.trans_prob
    return np.amax(scores, axis=0), np.argmax(scores, axis=0)

  def _emit(self, prob, word):
    """Multiply ``prob`` by the emission scores of ``word``.

    Words missing from ``self.observations`` get the constant ``self.eps``
    for every state.
    """
    prob = np.asarray(prob)
    if word in self.observations:
      return prob * self.emiss_prob[:, self.observations[word]]
    return prob * self.eps

  @staticmethod
  def _rescale(prob):
    """Divide ``prob`` by its maximum so long products do not underflow.

    Scaling by a positive constant leaves every argmax unchanged. A vector
    whose maximum is not positive is returned untouched.
    """
    peak = prob.max()
    return prob / peak if peak > 0 else prob

  def compute_best_states(self, obs):
    """Return the most likely state for every observation in ``obs``.

    Args:
      obs: sequence of observations (words).

    Returns:
      A list of ``(observation, state_name)`` pairs in REVERSE order (last
      observation first). An empty ``obs`` yields an empty list.
    """
    if len(obs) == 0:
      return []

    current_prob = self._rescale(self._emit(self.pi, obs[0]))

    # backpointers[t - 1][j] is the best predecessor of state j at step t.
    backpointers = []
    for word in obs[1:]:
      current_prob, came_from = self._next_state(current_prob)
      current_prob = self._rescale(self._emit(current_prob, word))
      backpointers.append(came_from)

    # Walk the back-pointers from the best final state to the first step.
    best = int(np.argmax(current_prob))
    res = []
    for pos in range(len(obs) - 1, -1, -1):
      res.append((obs[pos], self.states_rev[best]))
      if pos > 0:
        best = int(backpointers[pos - 1][best])
    return res

In [None]:
# Interpolation weight: share of the observed bigram estimate in the smoothed
# transition matrix; the remaining (1 - delta) comes from tag frequencies.
delta = 0.9

for category in brown.categories():
  print(category.upper())
  category_text_tagged = brown.tagged_words(categories=category)

  # Split the (word, tag) pairs into two parallel sequences.
  word_seq, tag_seq = map(list, zip(*category_text_tagged))

  # Index words and tags in order of first appearance.
  words = {word: i for i, word in enumerate(dict.fromkeys(word_seq))}
  words_no = len(words)

  tag_counts = Counter(tag_seq)
  tags_freq = np.array(list(tag_counts.values()))
  tags_freq = tags_freq/np.sum(tags_freq)
  tags = {tag: i for i, tag in enumerate(tag_counts)}
  tags_no = len(tags)

  # Emission matrix: row = tag, column = word, each row normalised to sum 1.
  emission_matrix = np.zeros((tags_no, words_no), dtype=np.float32)
  for (word, tag), count in Counter(zip(word_seq, tag_seq)).items():
    emission_matrix[tags[tag], words[word]] = count
  emission_matrix = emission_matrix/np.sum(emission_matrix, axis=1).reshape(-1, 1)

  # Transition matrix: row = previous tag, column = current tag, counted over
  # every pair of consecutive tokens in the category.
  transition_matrix = np.zeros((tags_no, tags_no), dtype=np.float32)
  for (prev_tag, cur_tag), count in Counter(zip(tag_seq, tag_seq[1:])).items():
    transition_matrix[tags[prev_tag], tags[cur_tag]] = count

  row_sums = np.sum(transition_matrix, axis=1).reshape(-1, 1)
  # A tag with no outgoing transitions would give 0/0; leave its row at zero
  # so that it is filled purely by the smoothing term below.
  row_sums[row_sums == 0] = 1
  transition_matrix = transition_matrix/row_sums
  # Interpolate with the frequency of the TARGET tag (row vector broadcast
  # over rows), which keeps every normalised row summing to 1.
  transition_matrix = delta*transition_matrix + (1-delta)*tags_freq.reshape(1, -1)

  # Initial state distribution: overall tag frequencies.
  pi = tags_freq

  hmm = HMM(transition_matrix, emission_matrix, pi, tags, words)

  # NOTE(review): the predictions use the tags returned by brown.tagged_words
  # while `correct` comes from nltk.pos_tag; these presumably use different
  # tagsets (Brown vs. Penn Treebank), which would understate the exact-match
  # score below -- confirm, and consider tagset='universal' on both sides.
  res = 0
  for sent, gold in zip(sentences, correct):
    # compute_best_states returns the path last-token-first; flip it.
    hmm_sent = hmm.compute_best_states(sent)[::-1]
    print(hmm_sent)
    res += sum(pred[1] == ref[1] for pred, ref in zip(hmm_sent, gold))
  print(res/all_together*100)