Let $y_i \sim Y$ be observations and let $x_i \sim X$ be hidden variables. We define the following functions:
<br>
<br>



<table style="width:100%">
  <tr>
    <th>Signature</th>
    <th>Definition</th>
    <th>Description</th>
  </tr>
  <tr>
    <td>$\pi_i$</td>
    <td>$\Pr(x_i)$</td>
    <td>Initial probability</td>
  </tr>
  <tr>
    <td>$a_{ij}$</td>
    <td>$\Pr(x_j | x_i)$</td>
    <td>transition probability</td>
  </tr>
  <tr>
    <td>$b_i(y_t)$</td>
    <td>$\Pr(y_t | x_i)$</td>
    <td>Emission probability</td>
  </tr>
  <tr>
    <td>$\alpha_t(i)$</td>
    <td>$\Pr(y_{1:t}, x_i)$</td>
    <td>Forward probability</td>
  </tr>
  <tr>
    <td>$\beta_t(i)$</td>
    <td>$\Pr(y_{t:T} | x_i)$</td>
    <td>Backward probability</td>
  </tr>
</table>
<br>
<br>


Furthermore we have the following Markov properties:<br><br>

\begin{equation}
    \Pr(y_t | x_j, x_i, y_{1:t-1}) = \Pr(y_t | x_j) 
\end{equation}

\begin{equation}
    \Pr(x_j | x_i, y_{1:t-1}) = \Pr(x_i | x_i) 
\end{equation}
<br>

With these we can derive recursive formulas for computing $\alpha_t(i)$ and $\beta_t(i)$.
<br>
<br>

**Base case for forward probability**
<br>
<br>

\begin{align*}
    &\phantom{=}{} \text{Definition}\\
    \alpha_{1}(j) &={} \Pr(x_j, y_{1:1}) \\
                  &={} \Pr(x_j, y) \\
                  &={} \Pr(x_j) \Pr(y | x_j) \\
                  &={} \pi_j b_j(y)
\end{align*}

<br>
<br>
**Recursive case for forward probability**
<br>
<br>

\begin{align*}
    &\phantom{=}{} \text{Definition}\\
    \alpha_{t}(j) &={} \Pr(x_j, y_{1:t}) \\
    &\phantom{=}{} \text{Marginal distribution}\\
                      &={} \sum_{i=1}^N \Pr(x_j, x_i, y_{1:t})\\
    &\phantom{=}{} \text{Chain rule of probability}\\
                      &={} \sum_{i=1}^N \Pr(y_t | x_j, x_i, y_{1:t-1}) \Pr(x_j | x_i, y_{1:t-1}) \Pr(x_i, y_{1:t-1}) \\
    &\phantom{=}{} \text{Markov assumptions}\\
                      &={} \sum_{i=1}^N \Pr(y_t | x_j) \Pr(x_j | x_i) \Pr(x_i, y_{1:t-1}) \\
    &\phantom{=}{} \text{Definitions}\\
                      &={} \sum_{i=1}^N b_j(y_t) a_{ij} \alpha_{t-1}(i)
\end{align*}


<br>
<br>
**Base case for backward probability**
<br>
<br>

$\beta_T(i) = 1$

where $T$ is the length of the input sequence.


<br>
<br>
**Recursive case for backward probability**
<br>
<br>

Let $1 \leq t < T-1$.

\begin{align*}
    &\phantom{=}{} \text{Definition}\\
            \beta_t(i) &={} \Pr(y_{t+1:T} | x_i)\\
    &\phantom{=}{} \text{Marginal distribution}\\
                       &={} \sum_{j=1}^N \Pr(y_{t+1:T}, x_j | x_i)\\
                       &={} \sum_{j=1}^N \Pr(y_{t+1}, y_{t+2:T}, x_j | x_i)\\
                         % p(x, y | z) = p(x | y, z) p (y | z)
    &\phantom{=}{} \text{Chain rule of probability}\\
                       &={} \sum_{j=1}^N \Pr(y_{t+2:T} | y_{t+1},  x_j, x_i) \Pr(y_{t+1} |  x_j, x_i) \Pr(x_j | x_i)\\
    &\phantom{=}{} \text{Markov assumptions}\\
                       &={} \sum_{j=1}^N \Pr(y_{t+2:T} | x_j) \Pr(y_{t+1} |  x_j) \Pr(x_j | x_i)\\
    &\phantom{=}{} \text{Definitions}\\
                       &={} \sum_{j=1}^N \beta_{t+1}(j) b_j(y_{t+1}) a_{ij}
\end{align*}

<br>
<br>
Why is $\beta_T(i) = 1$? Consider the following derivation:
<br>
<br>

\begin{align*}
    &\phantom{=}{} \text{Definition}\\
         \beta_{T-1}(i) &={} \Pr(y_T | x_i)\\
    &\phantom{=}{} \text{Marginal distribution}\\
                        &={} \sum_{j=1}^N \Pr(y_T, x_j | x_i)\\
    &\phantom{=}{} \text{Chain rule of probability}\\
                        &={} \sum_{j=1}^N \Pr(y_T |  x_j, x_i) \Pr(x_j | x_i)\\
    &\phantom{=}{} \text{Markov assumptions}\\
                        &={} \sum_{j=1}^N \Pr(y_T |  x_j) \Pr(x_j | x_i)\\
    &\phantom{=}{} \text{Definitions}\\
                       &={} \sum_{j=1}^N  b_j(y_{T}) a_{ij}
\end{align*}

This matches the form $\sum_{j=1}^N \beta_{t+1}(j) b_j(y_{t+1}) a_{ij}$ with $\beta_{t+1}(j) = 1$. So to capture $\beta_{T-1}(i)$ with the same equation as for $1 \leq t < T-1$, we need to define $\beta_T(i) = 1$.


In [1]:
Xi = list(range(10))
D = [1,2,2,2,3,7,7,8,9,9]

p = 1
for x in Xi:
    for y in D:
        if x == y:
            p*=x
print(p)

p = 1
for x in Xi:
    p*=(x**len([y for y in D if x == y]))
print(p)


p = 1
for x in D:
    p*=(x**len([y for y in Xi if x == y]))
print(p)

762048
762048
762048


In [2]:
import numpy as np
np.seterr(under='raise')

{'divide': 'warn', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}

Define helper functions and classes for processing training data.

In [3]:
from collections import Counter, deque
import numpy as np
from collections import Counter


def make_sequences(path, prj_idcs):
    with open(path, 'r') as f:
        sequence = []
        for line in f:
            line = line.strip()
            if line:
                line = line.split()
                sequence.append(tuple([line[i] for i in prj_idcs]) if len(
                    prj_idcs) > 1 else line[prj_idcs[0]])
            else:
                yield tuple(sequence)
                sequence = []


def make_ngrams(sequences, n, n_edge_marks=0):
    """
    produce ngrams from a sequence.

    Parameters
    ----------
    sequence : iterable
        Input sequence.
    n : int
        size of the ngrams.
    n_edge_marks : int, optional
        number of BOS and EOS marks to surround the sequence with

    Yields
    ------
    ngram : tuple
    """
    if n <= 0:
        raise ValueError('ngram size <= 0 is invalid.')
    if n_edge_marks < 0:
        raise ValueError('n_edge_marks < 0 is invalid.')

    for sequence in sequences:
        sequence = tuple(['<BOS>']*(n_edge_marks)) + \
            sequence + \
            tuple(['<EOS>']*n_edge_marks)
        ngram = deque()
        for token in sequence[:n]:
            ngram.append(token)
        for token in sequence[n:]:
            yield tuple(ngram) if n > 1 else ngram[0]
            ngram.append(token)
            ngram.popleft()
        yield tuple(ngram) if n > 1 else ngram[0]


def count_ngrams(sequence, n, n_edge_marks):
    return Counter(make_ngrams(sequence, n, n_edge_marks))

from frozendict import frozendict
from itertools import chain

class X2ID:
    """
    maps item of type X to IDs and vice versa.
    only works in Python 3.7+.
    """

    def __init__(self, *containers, unknown=None):
        self.unknown = unknown
        self.d = frozendict({s: i for i, s in enumerate(chain(*containers))})

    def get_value(self, n):
        """
        This sucks. Needs C++ style iterators.
        """
        if n < 0:
            n += len(self.d)
        for i, key in enumerate(self.d.keys()):
            if i == n:
                return key
        raise IndexError("dictionary index out of range")

    def __getitem__(self, item):
        try:
            return self.get_value(item) if type(item) is int else self.d[item]
        except:
            return self.d[self.unknown]
        
        
from typing import Dict
import pandas as pd
from scipy.sparse import csr_matrix


def make_count_matrix(counts: Dict, smoothing_constant=0):

    row_indxs = []
    col_indxs = []
    dat_values = []

    for i, ((a, b), cnt) in enumerate(counts.items(), 1):

        col_indxs.append(a)
        row_indxs.append(b)
        dat_values.append(cnt)

    return csr_matrix((dat_values, (row_indxs, col_indxs))).toarray()+smoothing_constant

Define a Bimonoid that determines the arithmetic operations performed by the HMM.

In [4]:
from scipy.special import logsumexp


def logdot(a, b):
    max_a, max_b = np.max(a), np.max(b)
    exp_a, exp_b = a - max_a, b - max_b
    np.exp(exp_a, out=exp_a)
    np.exp(exp_b, out=exp_b)
    c = np.dot(exp_a, exp_b)
    np.log(c, out=c)
    c += max_a + max_b
    return c


# https://www.informatik.uni-leipzig.de/~droste/papers/Droste-Stueber-Vogler-final.pdf
class Bimonoid():
    def add(self, *args):
        """abstract addition"""
        raise NotImplementedError

    def zeros(self, *args):
        """identity for addition"""
        raise NotImplementedError

    def ones(self, *args):
        """identity for multiplication"""
        raise NotImplementedError

    def mul(self, a, b):
        """abstract multiplication"""
        raise NotImplementedError

    def scale(self, a):
        """maps `a` to the elements of the set
        the bimonoid is defined over."""
        raise NotImplementedError


class LogBimonoid(Bimonoid):
    """`mul` is not distributive if `mul` is matrix
    logsumexp and `add` is elementwise addition. So
    this forms a bimonoid."""

    def add(self, *args, **kwargs):
        return np.add(*args, **kwargs)

    def zeros(self, *args):
        return np.zeros(*args)

    def ones(self, *args):
        return np.ones(*args)

    def mul(self, a, b):
        return logdot(a, b)

    def scale(self, a):
        return np.log(a)

Define the HMM class.

In [5]:
import numpy as np
from numpy.random import dirichlet
import matplotlib.pyplot as plt
from functools import reduce


class HMM:
    def __init__(self, bimonoid=None, n_hidden=None, n_observed=None):
        if not bimonoid:
            bimonoid = LogBimonoid()
        self.__bimonoid = bimonoid
        if n_hidden:
            self.n_hidden = n_hidden
            self.__initial = self.bimonoid.scale(
                dirichlet(np.ones(n_hidden), size=1))
            # treat column index as FROM and row index as TO
            self.__transitions = self.bimonoid.scale(
                dirichlet(np.ones(n_hidden), size=n_hidden))
            if n_observed:
                self.__emissions = self.bimonoid.scale(dirichlet(
                    np.ones(n_hidden), size=n_observed))

    @property
    def bimonoid(self):
        return self.__bimonoid

    @property
    def initial(self):
        return self.__initial

    @initial.setter
    def initial(self, x):
        self.__initial = x

    @property
    def transitions(self):
        return self.__transitions

    @transitions.setter
    def transitions(self, x):
        assert(x.shape[0] == x.shape[1])
        self.n_hidden = x.shape[0]
        self.__transitions = x

    @property
    def emissions(self):
        return self.__emissions

    @emissions.setter
    def emissions(self, x):
        self.__emissions = x

    def forward(self, idcs):

        t = len(idcs)

        trellis = self.bimonoid.zeros((t, self.n_hidden))
        trellis[0] = self.bimonoid.add(self.initials, self.emissions[idcs[0]])

        for i, idx in enumerate(idcs[1:t]):
            trellis[i+1] = self.bimonoid.add(self.bimonoid.mul(
                self.transitions, trellis[i]), self.emissions[idx])

        return np.sum(trellis[-1]), trellis

    def backward(self, idcs):

        trellis = self.bimonoid.ones((len(idcs), self.n_hidden))

        for i, idx in zip(range(len(idcs)-2, -1, -1), idcs[-1:0:-1]):
            trellis[i] = self.bimonoid.add(self.bimonoid.mul(
                self.transitions, trellis[i+1]), self.emissions[idx])

        return np.sum(self.bimonoid.add(self.bimonoid.add(
            trellis[0], self.initials), self.emissions[idcs[0]])), trellis

    def viterbi(self, idcs):

        # stores probabilities for states at current timestep
        trellis = self.bimonoid.zeros((len(idcs), self.n_hidden))
        # stores indices in rows, each of which denotes which hidden
        # variable for the last observed variable makes the current
        # hidden variable most likely
        backpointers = self.bimonoid.zeros((len(idcs), self.n_hidden))
        trellis[0] = self.bimonoid.add(self.initials, self.emissions[idcs[0]])

        for i, idx in enumerate(idcs[1:]):

            tmp = self.bimonoid.add(self.bimonoid.add(
                self.transitions, trellis[i]), self.emissions[idx][:, np.newaxis])

            trellis[i+1] = np.max(tmp, axis=1)
            backpointers[i+1] = np.argmax(tmp, axis=1)

        sq = []  # sequence of hidden variables
        i = np.argmax(trellis[-1])
        for row in backpointers[::-1]:
            sq.append(i)
            i = int(row[i])

        return sq[::-1]

Count the frequency of unigrams, bigrams and pairs of tokens and tags.

In [6]:
path = 'data_tiger_annotated.txt'
tag_sqs = list(make_sequences(path, [1]))
token_tag_sqs = list(make_sequences(path, [0, 1]))
token_tag_sqs.append((('<ukn>', 'XY'),))

TgUgCnts = count_ngrams(tag_sqs, 1, 2)  # tag unigram counts
TgBgCnts = count_ngrams(tag_sqs, 2, 2)  # tag bigram counts
TkTgCnts = count_ngrams(token_tag_sqs, 1, 0)  # token-tag pair counts

tags = list(TgUgCnts.keys())
tokens = list(zip(*TkTgCnts.keys()))[0]

Tg2id = X2ID(tags)  # maps tags to IDs
Tk2id = X2ID(tokens, unknown='<ukn>')  # maps tokens to IDs

# make new count dicts with IDs rather than strings
TgUgCnts = {Tg2id[tag]: cnt for tag, cnt in TgUgCnts.items()}
TgBgCnts = {tuple(Tg2id[tag] for tag in TgTuple)
                  : cnt for TgTuple, cnt in TgBgCnts.items()}
TkTgCnts = {(Tk2id[tkn], Tg2id[tag]): cnt for (
    tkn, tag), cnt in TkTgCnts.items()}

# store counts of the unigrmas, bigrams and pairs of tags and
# tokens in co-occurence matrices
TgUgVec = np.array(list(TgUgCnts.values()))
TgBgMat = make_count_matrix(TgBgCnts, 1)
TkTgMat = make_count_matrix(TkTgCnts, 1).T  # tranpose, so tags are columns

Next we need to generate probability distributions for unigrmas, bigrams and tag-token paris. First we will derive the correct way to do this:

Let $X$ be a random variable distributed according to a PMF $p$ parametrized by unkowm probabilities $\Theta = \{ \Theta_1, \dots, \Theta_m \}$ such that $p(X_i | \Theta) = \Theta_i$ and $\sum_{i=1}^n p(X_i \mid \Theta) = 1$. Let $D = \{X_1, \dots, X_n\}$ be a set of i.i.d. data points. Let $\hat{\theta} = \underset{\theta}{\text{argmax}}~ p(D \mid \theta)$, where $\theta = \{\theta_1, \dots, \theta_n\}$ and $\sum_i \theta_i = 1$. Now we want to maximize $p(D \mid \theta) = \prod_{i=1}^n p(X_i | \theta)$ to arrive at the *maximum likelihood estimate* $\hat{\theta}$ for $D$. Note that we can write $p(X_i | \theta) = \theta_i$ as $\prod_{j=1}^m \theta_j^{1(x_i = x_j)}$. So we have

\begin{align*}
    p(D | \theta) &={} \prod_{i=1}^n \theta_i\\
                  &={} \prod_{i=1}^n \prod_{j=1}^m \theta_j^{1(x_i = x_j)}\\
                  &={} \prod_{j=1}^m \prod_{i=1}^n \theta_j^{1(x_i = x_j)}\\
                  &={} \prod_{j=1}^m \theta_j^{\sum_{i=1}^n 1(x_i = x_j)}\\
                  &={} \prod_{j=1}^n \theta_j^{|\{i : x_i = x_j \}|}\\
                  &=:{} \prod_{j=1}^n \theta_j^{c_j}\\
\end{align*}

Note that $c_j$ is simply the number of occurences of $x_j$ in $D$. Furthermore, due to the monotonicity of the logarithm we have  $\underset{\theta}{\text{argmax}}~ p(D \mid \theta) = \underset{\theta}{\text{argmax}}~ \log p(D \mid \theta)$. Also $\underset{\theta}{\text{argmax}}~ f = \underset{\theta}{\text{argmax}}~ a f$ for some function $f$ and a constant $a$. Hence:


\begin{align*}
    \log p(D \mid \theta) &={} \log \prod_{j=1}^n \theta_j^{c_j} \\
                           &={} \sum_{j=1}^n c_j \log \theta_j\\
                           &\iff{} \\
     \frac{1}{n} \log p(D \mid \theta) &={} \sum_{j=1}^n \frac{c_j}{n} \log \theta_j\\
\end{align*}

Now let $q_j = \frac{c_j}{n}$. Note that $\sum_{j=1}^m q_j = 1$, since $\sum_{j=1}^m c_j = n$ by definition of $c_j$.

\begin{align*}
    \sum_{j=1}^m c_j &{}= \sum_{j=1}^m \sum_{i=1}^n 1(x_i = x_j)\\
                     &{}= \sum_{i=1}^n \sum_{j=1}^m 1(x_i = x_j)\\
                     &{}= \sum_{i=1}^n 1\\
                     &{}= n\\
\end{align*}

This means $q$ is also a PMF. So now we have  $\sum_{j=1}^n q_j \log \theta_j$, which is the negative cross entropy $ - H[q,\theta]$ of $q$ and $\theta$.
Note that the KL-Divergence of distributions $P$ and $Q$ is equal to the difference of the cross entropy of $P$ and $Q$ and the entropy of $P$: $D(P \mid \mid Q) = H[P, Q] - H[P]$. Thus $-H[P, Q] = - D(P \mid \mid Q) - H[P]$ and so we end up with


\begin{align*}
     \sum_{j=1}^n q_j \log \theta_j &={} - H[q, \theta]\\
                                    &={} -D(q \mid \mid \theta) - H[q]\\
                                    % &={} \sum_{j=1}^n q_j \log \frac{q_j}{\theta_j} + \sum_{j=1}^n q_j \log q_j
\end{align*}

Note that we are maximizing over $\theta$ but $H[q]$ does not depend on $\theta$. Thus we now have

\begin{align*}
\underset{\theta}{\text{argmax}}~ p(D \mid \theta) &={} \underset{\theta}{\text{argmax}}~ -D(q \mid \mid \theta)\\
                                                           &={} \underset{\theta}{\text{argmin}}~ D(q \mid \mid \theta)\\
\end{align*}

Now note that $D(P \mid \mid Q) \geq 0$ and $D(P \mid \mid Q) = 0 \iff H[P, Q] - H[P] = 0 \iff H[P, Q] = H[P] \iff P = Q$. Hence
$$\underset{\theta}{\text{argmin}}~ D(q \mid \mid \theta) = q$$

Recall that $q = \left\{\frac{c_1}{n}, \dots, \frac{c_n}{n}\right\}$, where $\frac{c_j}{n}$ is the relative frequency of $x_j$ in $D$.

In [7]:
initial_probs = TgUgVec/(np.sum(TgUgVec)) # MLE for unigrams
transition_probs = TgBgMat/(np.sum(TgBgMat, axis=0)) # MLEs for bigrams
emission_probs = TkTgMat/(np.sum(TkTgMat, axis=0)) # MLEs for pairs of tokens and tags

Set up a model and run a test.

In [8]:
hmm = HMM()

hmm.initials = np.log(initial_probs)
hmm.transitions = np.log(transition_probs)
hmm.emissions = np.log(emission_probs)

sentence = 'Der Viterbi-Algorithmus ist ein Algorithmus der dynamischen Programmierung zur Bestimmung der wahrscheinlichsten Sequenz von verborgenen Zuständen bei einem gegebenen Hidden Markov Model (HMM) und einer beobachteten Sequenz von Symbolen .'
sentence = sentence.split()
[(Tg2id[int(s)], w) for s, w in zip(hmm.viterbi([Tk2id[w] for w in sentence]), sentence)]

[('ART', 'Der'),
 ('NN', 'Viterbi-Algorithmus'),
 ('VAFIN', 'ist'),
 ('ART', 'ein'),
 ('NN', 'Algorithmus'),
 ('ART', 'der'),
 ('ADJA', 'dynamischen'),
 ('NN', 'Programmierung'),
 ('APPRART', 'zur'),
 ('NN', 'Bestimmung'),
 ('ART', 'der'),
 ('ADJA', 'wahrscheinlichsten'),
 ('NN', 'Sequenz'),
 ('APPR', 'von'),
 ('ART', 'verborgenen'),
 ('NN', 'Zuständen'),
 ('APPR', 'bei'),
 ('ART', 'einem'),
 ('ADJA', 'gegebenen'),
 ('NN', 'Hidden'),
 ('APPR', 'Markov'),
 ('ART', 'Model'),
 ('NN', '(HMM)'),
 ('KON', 'und'),
 ('ART', 'einer'),
 ('ADJA', 'beobachteten'),
 ('NN', 'Sequenz'),
 ('APPR', 'von'),
 ('NE', 'Symbolen'),
 ('$.', '.')]

Evaluate the computational performance of the mode.

In [None]:
hmm = HMM()

hmm.initials = np.log(initial_probs)
hmm.transitions = np.log(transition_probs)
hmm.emissions = np.log(emission_probs)

sentences = [[Tk2id[tkn] for tkn in sentence] for sentence in make_sequences(path, [0])][:10000]

def f():
    for i, sentence in enumerate(sentences):
        hmm.forward(sentence)
        print(i, end='\r')
        
%time f()