In [2]:
import numpy as np
import matplotlib.pyplot as plt

## Encoder

Same as in Exercise 4. Included just for completeness.

In [8]:
class Encoder:
    """Two-way mapping between string symbols and the integer IDs 0, 1, ..., N-1.

    IDs are assigned in the order in which symbols are first seen. An encoder
    can be frozen; a frozen encoder no longer registers new symbols and raises
    a `ValueError` when asked for the ID of a symbol it does not know.
    """

    def __init__(self, symbols: list = None, frozen: bool = False):
        """Set up the encoder, optionally pre-registering `symbols`.

        Args:
            symbols (list[str]): symbols to register up front, in order (default is None)
            frozen (bool): if True, the encoder is frozen after `symbols` have been
                registered, so `get_id(s)` rejects unknown symbols (default is False)
        """
        self.symbol_to_id: dict[str, int] = {}
        self.id_to_symbol: list[str] = []
        # must stay unfrozen while the initial symbols are being registered
        self.frozen: bool = False

        initial_symbols = symbols if symbols is not None else ()
        for symbol in initial_symbols:
            self.get_id(symbol)

        # freeze only now, after the initial vocabulary is in place
        self.frozen = bool(frozen)

    def get_id(self, s: str) -> int:
        """Return the integer ID of symbol `s`, registering it if allowed.

        A known symbol always yields its existing ID. An unknown symbol is
        appended to the mapping with the next free ID when the encoder is not
        frozen; when the encoder is frozen, an exception is raised instead.

        Args:
            s (str): symbol to be encoded

        Returns:
            int: ID of the encoded symbol

        Raises:
            ValueError: if `s` is unknown and the encoder is frozen
        """
        # fast path: symbol already has an ID
        if s in self.symbol_to_id:
            return self.symbol_to_id[s]

        if self.frozen:
            raise ValueError(f"Symbol {s} not in frozen encoder {self}")

        # the next free ID equals the number of symbols registered so far
        next_id = len(self.id_to_symbol)
        self.id_to_symbol.append(s)
        self.symbol_to_id[s] = next_id
        return next_id

    def get_symbol(self, id: int) -> str:
        """Return the symbol that was assigned the given `id`.

        Args:
            id (int): ID of the symbol

        Returns:
            str: the symbol associated with the given ID
        """
        symbol = self.id_to_symbol[id]
        return symbol

    def __repr__(self):
        return f"Encoder: {self.symbol_to_id}"

In [9]:
# Frozen encoders for the observation alphabet (x_enc) and the state
# alphabet (y_enc); IDs follow the order of the lists below.
x_enc = Encoder(["Walk", "Shop", "Clean"], frozen=True)
y_enc = Encoder(["Sunny", "Rainy"], frozen=True)
print(x_enc, y_enc, sep="\n")

Encoder: {'Walk': 0, 'Shop': 1, 'Clean': 2}
Encoder: {'Sunny': 0, 'Rainy': 1}


## HMM
Define the model parameters:
- $\pi_i$ is the start probability for state $i$: $P(y_1=i)$
- $a_{ij}$ is the transition probability from state $i$ to state $j$: $P(y_t=j|y_{t-1}=i)$
- $b_j(k)$ is the emission probability of observation $k$ when in state $j$: $P(x_t=k|y_t=j)$

In [10]:
# Start distribution over the states: index 0 is Sunny, index 1 is Rainy.
pi = np.array([0.6, 0.4])

# Transition matrix: row i holds a_ij, the distribution over the next state j
# given that the current state is i.
a = np.array([
    [0.7, 0.3],  # leaving state 0 (Sunny)
    [0.4, 0.6],  # leaving state 1 (Rainy)
])

# Emission matrix: row j holds b_j(k), the distribution over observations k
# emitted while in state j.
b = np.array([
    [0.6, 0.3, 0.1],  # emitted in state 0 (Sunny)
    [0.1, 0.4, 0.5],  # emitted in state 1 (Rainy)
])

## Solution to bonus question: Generate Data with an HMM
Now, using the HMM defined above, we generate a dataset 
  $\mathcal D = \{(X^{(m)},Y^{(m)})\}_{m=1,\ldots,M}$ 
  comprising $M$ pairs $(X^{(m)},Y^{(m)})$,
  each consisting of an observation sequence $X^{(m)}$ and the state sequence $Y^{(m)}$ that produced it.

In [5]:
def generate(pi, a, b, M, seq_len=5, seed=None):
    """Sample a dataset of (observations, states) pairs from an HMM.

    For every example the first state is drawn from `pi`, each following state
    from the row of `a` that belongs to the previous state, and every
    observation from the row of `b` that belongs to the current state.

    Args:
        pi (np.ndarray): start probabilities, one entry per state
        a (np.ndarray): transition probabilities, `a[i, j]` = P(y_t=j | y_{t-1}=i)
        b (np.ndarray): emission probabilities, `b[j, k]` = P(x_t=k | y_t=j)
        M (int): number of (x, y) pairs to generate
        seq_len (int): length of every generated sequence (default is 5)
        seed: value forwarded to `np.random.default_rng`; pass an int for a
            reproducible dataset (default is None, i.e. fresh entropy on every call)

    Returns:
        list[tuple[list[int], list[int]]]: `M` pairs `(x, y)` where `x` holds the
        observation IDs and `y` the state IDs, both of length `seq_len`

    Raises:
        ValueError: if `seq_len` is smaller than 1
    """
    if seq_len < 1:
        raise ValueError(f"seq_len must be at least 1, got {seq_len}")

    rng = np.random.default_rng(seed)
    num_states = pi.size
    num_observations = b.shape[1]

    dataset = []  # the dataset D
    for _example in range(M):
        # initial state and the observation emitted there; values are cast to
        # plain ints so that printing does not depend on numpy's scalar repr
        state = int(rng.choice(num_states, p=pi))
        y = [state]
        x = [int(rng.choice(num_observations, p=b[state]))]

        # remaining seq_len - 1 steps: transition first, then emit
        for _step in range(seq_len - 1):
            state = int(rng.choice(num_states, p=a[state]))
            y.append(state)
            x.append(int(rng.choice(num_observations, p=b[state])))

        # store the generated pair
        dataset.append((x, y))

    return dataset

In [None]:
# TODO: for the next course, change the HMM parameters to be estimated so that the exercise is more interesting.

In [6]:
# Draw 1000 example pairs and show the first three as a sanity check.
dataset = generate(pi, a, b, M=1000)
ds = "\n".join(map(str, dataset[:3]))
print(f"D[:3]:\n{ds}")

D[:3]:
([1, 0, 2, 2, 2], [0, 0, 0, 1, 1])
([0, 0, 0, 2, 1], [0, 0, 0, 1, 0])
([1, 1, 0, 0, 0], [0, 1, 0, 0, 0])


Saving data for exercises.

In [7]:
# Split the generated (x, y) pairs into two parallel arrays, one row per
# example: `x` collects the observation sequences, `y` the state sequences.
x = np.array([pair[0] for pair in dataset])
y = np.array([pair[1] for pair in dataset])

# Write the arrays under the file name that the loading cell of the MLE
# section reads ("exercise-05-data.npz"); saving under a different name makes
# that cell fail, or silently pick up stale data, on a fresh run.
np.savez("exercise-05-data.npz", x=x, y=y)

## Maximum Likelihood Estimation (MLE)
Now, let us estimate the parameters $\theta=(\pi,a,b)$ of an HMM 
  by maximizing the likelihood of a given dataset $\mathcal D = \{(X,Y)\}$: 
$$
\max_\theta \prod_{(X,Y) \in \mathcal D} P(X,Y;\theta)
$$
Remember from the lecture that the HMM parameters that maximize the likelihood are given as the following.

For the start probabilities:
$$
\pi_i = \frac{\sum_{Y \in \mathcal D} [[y_1 = i]]}{M},
$$
where 
  $[[c]]$ is the indicator function (it is equal to $1$ if $c$ is true and $0$ otherwise),
  and $M$ is the number of examples in $\mathcal D$.

For the observation probabilities:
$$
b_j(k) = \frac{\sum_{(X,Y) \in \mathcal D} \sum_{t=1}^T [[y_t=j \text{ and } x_t=k]]}{\sum_{Y \in \mathcal D} \sum_{t=1}^T [[y_t=j]]},
$$

And, finally, for the transition probabilities:
$$
a_{ij} = \frac{\sum_{Y \in \mathcal D} \sum_{t=2}^T [[y_{t-1}=i \text{ and } y_t=j]]}{\sum_{Y \in \mathcal D} \sum_{t=2}^T [[y_{t-1}=i]]}.
$$


Load data from disk and transform to expected dataset structure.

In [4]:
# Read both arrays inside a `with` block so that the .npz archive is closed
# again afterwards; a bare np.load() leaves the file handle open.
with np.load("exercise-05-data.npz") as data:
    x = data["x"]
    y = data["y"]

# Rebuild the dataset structure used below: one [observations, states] entry
# per example, each sequence converted to a plain Python list.
dataset = [[obs.tolist(), states.tolist()] for obs, states in zip(x, y)]

ds = "\n".join([str(s) for s in dataset[:3]])
print(f"D[:3]:\n{ds}")

D[:3]:
[[1, 0, 2, 2, 2], [0, 0, 0, 1, 1]]
[[0, 0, 0, 2, 1], [0, 0, 0, 1, 0]]
[[1, 1, 0, 0, 0], [0, 1, 0, 0, 0]]


In [5]:
def _normalize_rows(counts):
    """Turn count rows into probability rows; rows without counts become uniform."""
    totals = counts.sum(axis=-1, keepdims=True)
    has_counts = totals > 0
    # max(..., 1) only protects against a zero-width array
    uniform = np.full_like(counts, 1.0 / max(counts.shape[-1], 1))
    # dividing by 1.0 on empty rows avoids 0/0; those rows are replaced by
    # `uniform` in the outer np.where anyway
    safe_totals = np.where(has_counts, totals, 1.0)
    return np.where(has_counts, counts / safe_totals, uniform)


def estimate_mle(dataset, num_states, num_observations):
    """Estimate HMM parameters from fully observed sequences by counting.

    Start states, transitions and emissions are counted over all (x, y) pairs
    and then normalized to relative frequencies. A state that never occurs (or
    never has a successor) leaves its row without counts; the likelihood does
    not depend on such a row, so it is set to the uniform distribution instead
    of the NaN row a 0/0 division would give. For an empty dataset this applies
    to every row, including the start probabilities.

    Args:
        dataset: iterable of pairs `(x, y)`; `x` holds observation IDs and `y`
            the state IDs of the same length
        num_states (int): number of distinct states
        num_observations (int): number of distinct observations

    Returns:
        tuple[np.ndarray, np.ndarray, np.ndarray]: `(pi_est, a_est, b_est)` with
        shapes `(num_states,)`, `(num_states, num_states)` and
        `(num_states, num_observations)`
    """
    # raw counts; normalized at the end
    pi_counts = np.zeros(num_states)
    a_counts = np.zeros((num_states, num_states))
    b_counts = np.zeros((num_states, num_observations))

    for x, y in dataset:
        # count the start state
        pi_counts[y[0]] += 1
        for t in range(len(y)):
            # emission at step t
            b_counts[y[t], x[t]] += 1
            # transition into step t (the first step has no predecessor)
            if t > 0:
                a_counts[y[t - 1], y[t]] += 1

    # every sequence adds exactly one start count, so normalizing pi_counts by
    # its sum equals dividing by the number of examples M
    pi_est = _normalize_rows(pi_counts)
    a_est = _normalize_rows(a_counts)
    b_est = _normalize_rows(b_counts)

    return pi_est, a_est, b_est

In [11]:
# Fit the HMM to the loaded dataset; the dimensions are taken from the
# original parameters so that the estimates have matching shapes.
pi_est, a_est, b_est = estimate_mle(
    dataset,
    num_states=pi.size,
    num_observations=b.shape[1],
)

### Estimated vs Original Parameters
Compare the estimated parameters with the original parameters used to generate the data.

In [12]:
# Show the parameters the data was generated with, one labelled block each.
print("Original:")
for label, value in (("pi", pi), ("a", a), ("b", b)):
    print(f"{label}:\n{value}\n")

Original:
pi:
[0.6 0.4]

a:
[[0.7 0.3]
 [0.4 0.6]]

b:
[[0.6 0.3 0.1]
 [0.1 0.4 0.5]]



In [13]:
# Show the estimated parameters in the same layout as the original ones.
print("Estimated:")
for label, value in (("pi_est", pi_est), ("a_est", a_est), ("b_est", b_est)):
    print(f"{label}:\n{value}\n")

Estimated:
pi_est:
[0.596 0.404]

a_est:
[[0.69646751 0.30353249]
 [0.3960164  0.6039836 ]]

b_est:
[[0.61066574 0.28895085 0.10038341]
 [0.09948381 0.39699672 0.50351947]]

