## Katona Máté (PD6YOR)


# Homework 2

The maximum score of this homework is 100 points. Grading is listed in this table:

| Grade | Score range |
| --- | --- |
| 5 | 85+ |
| 4 | 70-84 |
| 3 | 55-69 |
| 2 | 40-54 |
| 1 | 0-39 |

Most exercises include tests which should pass if your solution is correct.
However successful test do not guarantee that your solution is correct.
You are free to add more tests.

## Deadline
__2017 November 20<sup>th</sup> Monday 23:59__


# Main exercise (60 points)

Implement the Viterbi algorithm fot $k=3$.

The input can be found [here](http://sandbox.hlt.bme.hu/~gaebor/ea_anyag/python_nlp/).
You can use the 1, 10 or 100 million word corpus, it is advised to use the 1 million corpus while you are developing and you can try te larger ones afterwards.

Write a class called `Viterbi` with the following attributes:
* `__init__`: has no arguments, except `self`
* `train`: one argument (aside `self`), an iterable object which generates 2-tuples of strings `[(word1, pos1), (word2, pos2), ...]`
  * initializes the vocabularies, empirical probabilities or any other data attributes needed for the algorithm.
  * you can read the data (generator) only once
  * returns `None`
* predict: one argument (aside `self`), an iterable object, a list of words.
  * returns the predicted label sequence: a list of labels with the same length as the input.

Don't use global variables!

### Hint
* Use a 3-dimensional array for the transition probabilities $\mathbb{P}(v \ | \ w, u)$.
* Use a 3-dimensional array for the Viterbi table, one index for time, one index for the previous state and one index for the state before that.
* Same for the backtracking table

In [None]:
import numpy as np
from collections import deque

In [None]:
class NotTrainedException(Exception):
    pass


class Viterbi(object):
    def __init__(self):
        self.trained = False
        
    def train(self, data):
        wt, ts, self.wd2id, self.tg2id = {}, {}, {}, {}
        prev_ti, prev_prev_ti = None, None
        # process data: word and tag ids, count tags and tag sequences
        for w, t in data:
            self.wd2id[w] = self.wd2id.get(w, len(self.wd2id))
            self.tg2id[t] = self.tg2id.get(t, len(self.tg2id))
            wi = self.wd2id[w]  # id of word
            ti = self.tg2id[t]  # id of tag
            wt[wi] = wt.get(wi, {})
            wt[wi][ti] = wt[wi].get(ti, 0) + 1  # count word w tag t
            if prev_ti is not None and prev_prev_ti is not None:
                seq = (ti, prev_ti, prev_prev_ti)
                ts[seq] = ts.get(seq, 0) + 1  # count tag sequence (ppt, pt, t)
            prev_prev_ti = prev_ti
            prev_ti = ti
        self.id2wd = {wi: w for w, wi in self.wd2id.items()}
        self.id2tg = {ti: t for t, ti in self.tg2id.items()}
        # create tables from dicts (now that size is known)
        self.W = len(self.wd2id)  # number of words
        self.T = len(self.tg2id)  # number of tags
        word_tags = np.zeros((self.W, self.T), dtype=np.int32)  # number of (word, tag) occurences
        tag_seqs = np.zeros((self.T, self.T, self.T), dtype=np.int32)  # number of (tag, tag, tag) occurences
        for word, tags in wt.items():
            for tag, count in tags.items():
                word_tags.itemset((word, tag), count)
        for seq, count in ts.items():
            tag_seqs.itemset(seq, count)
        # convert nr of occurences to probabilities
        self.convert_probs(word_tags, tag_seqs)
        # enable prediction
        self.trained = True
        print("trained on {} words and {} tags".format(self.W, self.T))
        return
    
    def convert_probs(self, word_tags, tag_seqs):
        self.word_tag_probs = word_tags / word_tags.sum(axis=0).astype(np.float32)
        self.tag_seq_probs = tag_seqs / tag_seqs.sum(axis=(1,2)).astype(np.float32)
        return
    
    def _step(self, k_prev, p_tag, tag, word_tag_prob):
            m, arg_m = 0., -1  # 0 is a valid tag
            for pp_tag in range(self.T):
                prod = (self.viterbi[k_prev, pp_tag, p_tag] * 
                        self.tag_seq_probs[tag, p_tag, pp_tag] * 
                        word_tag_prob)
                if prod > m:
                    m, arg_m = prod, pp_tag
            return m, arg_m
        
    def predict(self, words):
        # raise error if not trained yet    
        if not self.trained:
            raise NotTrainedException("Train me first!")
        # viterbi init
        N = len(words)  # length of sentence
        self.viterbi = np.zeros((N, self.T, self.T), dtype=np.float32)
        self.backpts = -np.ones((N-2, self.T, self.T), dtype=np.int32)  # 0 is a valid index
        # k=0 -> (prev_)prev_tag does not exist
        # set viterbi(0, *, v) = P(w_0|v)
        wi = self.wd2id[words[0]]
        for tag in range(self.T):
            self.viterbi[0, :, tag] = self.word_tag_probs[wi, tag] * np.ones((self.T))
        # k=1 -> prev_prev_tag does not exist
        # set viterbi(1, u, v) = P(w_1|u) * P(u|v)
        wi = self.wd2id[words[1]]
        for tag in range(self.T):
            word_tag_prob = self.word_tag_probs[wi, tag]
            if word_tag_prob == 0:
                self.viterbi[k, :, tag] = np.zeros((self.T))
                self.backpts[k-2, :, tag] = -np.ones((self.T))  # zero is a valid tag
                continue
            for prev_tag in range(self.T):
                tag_seq_prob = self.tag_seq_probs[tag, prev_tag, :].sum()
                self.viterbi[1, prev_tag, tag] = word_tag_prob * tag_seq_prob
        # k=2...N -> find most likely prev_prev_tag
        for k in range(2, N):
            wi = self.wd2id[words[k]]
            for tag in range(self.T):
                word_tag_prob = self.word_tag_probs[wi, tag]
                if word_tag_prob == 0:
                    self.viterbi[k, :, tag] = np.zeros((self.T))
                    self.backpts[k-2, :, tag] = -np.ones((self.T))  # zero is a valid tag
                    continue
                for prev_tag in range(self.T):
                    p, t = self._step(k-1, prev_tag, tag, word_tag_prob)
                    self.viterbi[k, prev_tag, tag] = p  # to make lines shorter
                    self.backpts[k-2, prev_tag, tag] = t
        # get max likelihood last tags
        max_prob = self.viterbi[-1, :, :].max()
        max_prob_tags = np.where(self.viterbi[-1, :, :] == max_prob)
        next_tag = max_prob_tags[0][0]
        next_next_tag = max_prob_tags[1][0]
        pred_tags = deque([self.id2tg[next_tag], self.id2tg[next_next_tag]], maxlen=N)
        # backtrack max likelihood tags
        for k in range(N-2-1, -1, -1):
            pred_current_tag = self.backpts[k, next_tag, next_next_tag]
            pred_tags.appendleft(self.id2tg[pred_current_tag])
            next_next_tag = next_tag
            next_tag = pred_current_tag
        return list(pred_tags)
    

In [None]:
tags = {'the': 'DT',
        'cat': 'NN', 'dog': 'NN', 'man': 'NN',
        'goes': 'VBZ', 'sits': 'VBZ',
        'to': 'TO', 'on': 'IN', 
        'store': 'NN', 'chair': 'NN', 'bed': 'NN',
        '.': '.',
       }
sents = ('the cat goes to the store .',
         'the cat sits on the bed .',
         'the cat sits on the chair .',
         'the dog goes to the store .',
         'the dog sits on the bed .',
         'the dog sits on the chair .',
         'the man goes to the store .',
         'the man sits on the bed .',
         # 'the man sits on the chair .',
        )
dummy = [(word, tags[word]) for sent in sents for word in sent.split(' ')]

In [None]:
vd = Viterbi()
vd.train(dummy)

In [None]:
vd.predict('the man sits on the chair .'.split(' '))
print(vd.id2wd)
print(vd.id2tg)
print(vd.word_tag_probs)
print(vd.tag_seq_probs)
print(vd.viterbi)
print(vd.backpts)

In [None]:
v = Viterbi()
with open("umbc.casesensitive.word_pos.1M.txt", "r", encoding="utf8", errors="ignore") as f:
    generator1 = (line.strip().split("\t") for line in f)
    generator2 = (line for line in generator1 if len(line) == 2)
    v.train(generator2)

In [None]:
print(v.predict("You talk the talk .".split()))
#assert(v.predict("You talk the talk .".split()) == ['PRP', 'VB', 'DT', 'NN', '.'])
print(v.predict("The dog .".split()))
print(v.predict("The dog runs .".split()))
print(v.predict("The dog runs slowly .".split()))
print(v.predict("The dog 's run was slow .".split()))

## Small exercise 1. (10 points)
Modify the Viterbi class: use a logarithmic scale for probabilities.

In the Viterbi table instead of 
$$
\pi(k,u,v) = \max_{w\in L} \pi(k-1, w, u)\cdot \mathbb{P}(v \ | \ w,u)\cdot \mathbb{P}(w_k \ | \ v) 
$$
use
$$
\hat\pi(k,u,v) = \max_{w\in L} \hat\pi(k−1,w,u) + \log\mathbb{P}(v \ | \ w,u) + \log\mathbb{P}(w_k \ | \ v) 
$$

Note that the minimum probability is $0$, but the minimum logarithm is $-\infty$. Both numpy and python float can deal with minus infinity.<br>
Precalculate the log-probabilities in the initializer, not during the dymanic programming.

This should not affect the result, just the numbers in the viterbi table.

Name the log-scaled imlementation `ViterbiLog`, it can inherit from `Viterbi` or it can be a whole new class.

In [None]:
class ViterbiLog(Viterbi):
    def convert_probs(self, word_tags, tag_seqs):
        super().convert_probs(word_tags, tag_seqs)
        self.word_tag_probs = np.log(self.word_tag_probs)
        self.tag_seq_probs = np.log(self.tag_seq_probs)
        return
    
    def _step(self, k_prev, p_tag, tag, word_tag_prob):
        m, arg_m = 0., -1  # 0 is a valid tag
        for pp_tag in range(self.T):
            prod = (self.viterbi[k_prev, pp_tag, p_tag] + 
                    self.tag_seq_probs[tag, p_tag, pp_tag] + 
                    word_tag_prob)
            if prod > m:
                m, arg_m = prod, pp_tag
        return m, arg_m
        
    def predict(self, words):
        # raise error if not trained yet    
        if not self.trained:
            raise NotTrainedException("Train me first!")
        # viterbi init
        N = len(words)  # length of sentence
        self.viterbi = np.zeros((N, self.T, self.T), dtype=np.float32)
        self.backpts = -np.ones((N-2, self.T, self.T), dtype=np.int32)  # 0 is a valid index
        # k=0 -> (prev_)prev_tag does not exist
        # set viterbi(0, *, v) = log P(w_0|v)
        wi = self.wd2id[words[0]]
        for tag in range(self.T):
            self.viterbi[0, :, tag] = self.word_tag_probs[wi, tag] * np.ones((self.T))
        # k=1 -> prev_prev_tag does not exist
        # set viterbi(1, u, v) = log P(w_1|u) + log P(u|v)
        wi = self.wd2id[words[1]]
        for tag in range(self.T):
            word_tag_prob = self.word_tag_probs[wi, tag]
            if word_tag_prob == -np.inf:
                    self.viterbi[1, :, tag] = np.log(np.zeros((self.T)))
                    continue
            for prev_tag in range(self.T):
                tag_seq_prob = self.tag_seq_probs[tag, prev_tag, :].sum()
                self.viterbi[1, prev_tag, tag] = word_tag_prob + tag_seq_prob
        # k=2...N -> find most likely prev_prev_tag
        for k in range(2, N):
            wi = self.wd2id[words[k]]
            for tag in range(self.T):
                word_tag_prob = self.word_tag_probs[wi, tag]
                if word_tag_prob == - np.inf:
                    self.viterbi[k, :, tag] = np.log(np.zeros((self.T)))
                    self.backpts[k-2, :, tag] = -np.ones((self.T))  # zero is a valid tag
                    continue
                for prev_tag in range(self.T):
                    p, t = self._step(k-1, prev_tag, tag, word_tag_prob)
                    self.viterbi[k, prev_tag, tag] = p  # to make lines shorter
                    self.backpts[k-2, prev_tag, tag] = t
        print(self.viterbi)
        # get max likelihood last tags
        max_prob = self.viterbi[-1, :, :].max()
        max_prob_tags = np.where(self.viterbi[-1, :, :] == max_prob)
        next_tag = max_prob_tags[0][0]
        next_next_tag = max_prob_tags[1][0]
        pred_tags = deque([self.id2tg[next_tag], self.id2tg[next_next_tag]], maxlen=N)
        # backtrack max likelihood tags
        for k in range(N-2-1, -1, -1):
            pred_current_tag = self.backpts[k, next_tag, next_next_tag]
            pred_tags.appendleft(self.id2tg[pred_current_tag])
            next_next_tag = next_tag
            next_tag = pred_current_tag
        return list(pred_tags)



In [None]:
with open("umbc.casesensitive.word_pos.1M.txt", "r", encoding="utf8", errors="ignore") as f:
    generator1 = (line.strip().split("\t") for line in f)
    generator2 = (line for line in generator1 if len(line) == 2)

    viterbi_log = ViterbiLog()
    viterbi_log.train(generator2)

In [None]:
# assert(viterbi.predict("The dog runs slowly .".split()) == viterbi_log.predict("The dog runs slowly .".split()))
print(v.predict("The dog runs slowly .".split()))
print(viterbi_log.predict("The dog runs slowly .".split()))

In [None]:
vd = ViterbiLog()
vd.train(dummy)

In [None]:
#vd.predict('the man sits on the chair .'.split(' '))
print(vd.id2wd)
print(vd.id2tg)
print(vd.word_tag_probs)
print(vd.tag_seq_probs)
print(vd.viterbi)
print(vd.backpts)

## Small exercise 2. (30 points)
### a) 15 points
Modify the Viterbi class: use a sparse storage for transition probabilities, not a 3-dimensional array.

Use a dict to store the frequencies of the 2 and 3 tuples of labels.

For example if you had _"adjective noun"_ 10 times and _"adjective noun determinant"_ 5 times, then store the following

In [None]:
{('JJ', 'NN'): 10, ('JJ', 'NN', 'DT'): 5}

In the example $\mathbb{P}(DT \ | \ JJ, NN ) = 0.5$

Note that whenever $\#\{JJ, NN\} = 0$ or $\#\{JJ, NN, DT\} = 0$, then $\mathbb{P}(DT \ | \ JJ, NN ) = 0$.

Implement this in a new class `ViterbiSparse`, it can inherit from the original one or it can be a new class.

### b) 15 points
Try to find a sparse representation (with `dict`-s) which makes the inner for loop shorter. Note that you don't have to take the maximum over all the $w\in L$ elements, if you already know that some transition probabilities are zeros.

$$
\max_{\substack{w\in L \\ \mathbb{P}(v \ | \ w,u) > 0}} \pi(k-1, w, u)\cdot \mathbb{P}(v \ | \ w,u)\cdot \mathbb{P}(w_k \ | \ v)
$$