In [1]:
import numpy as np
import warnings
import torch

In [2]:
class HMM:
    """
    Creates augmented HMM adapted to this tutorial for proof of concept.
    Single precendence constraint and two auxillary variables to be updated. This is for precedence.
    """

    def __init__(self, tmat, init_dist, emit):
        """
        all three objects should be np arrays
        If n is # hidden states, m # observation states, then:
        tmat: (n,n)
        init_dist: (n,)
        emit: (n,m)
        updater: list of updaters
        constraint: constraint function
        """
        if tmat.shape[0] != init_dist.shape[0]:
            warnings.warn(
                "transition and initial distribution have different state space sizes",
                UserWarning,
            )
        self.tmat = tmat
        self.init_dist = init_dist
        self.emit = emit

    def random_draw(self, p):
        """
        p is a 1D np array.
        single random draw from p and encode as 1-hot.
        """
        n = len(p)
        draw = np.random.choice(n, p=p)
        one_hot = np.zeros(n, dtype=int)
        one_hot[draw] = 1
        return one_hot

    def simulation(self, time):
        """
        generates a full run for specified time.
        """

        # Generate (X1,Y1)
        x_prev = self.random_draw(self.init_dist)
        x_list = [x_prev]
        y_list = [self.random_draw(x_prev @ self.emit)]

        # Generate rest
        for t in range(time - 1):
            x_curr = self.random_draw(x_prev @ self.tmat)
            y_curr = self.random_draw(x_curr @ self.emit)
            x_list.append(x_curr)
            y_list.append(y_curr)
            x_prev = x_curr

        return x_list, y_list

    # def viterbi(self,):

In [3]:
def softmax(x):
    c = np.exp(x)
    z = c.sum()
    return c / z

In [4]:
tmat = np.random.randn(3, 3)
emit = np.random.randn(3, 2)

# tmat1 = torch.from_numpy(tmat)
tmat = np.apply_along_axis(softmax, 1, tmat)
emit = np.apply_along_axis(softmax, 1, emit)

tmat.sum(axis=1)

array([1., 1., 1.])

In [5]:
init_prob = tmat[0, :]

In [6]:
def update_counter(z, x, i):
    ret_val = z
    if x[i] == 1:
        ret_val = ret_val + 1
    return max(ret_val, 1)


def time_constraint(z1, z2):
    return z1 >= z2


def constraint_checker(x, i, j):
    truth_val = True
    cA = 0
    cB = 0
    for t in range(len(x)):
        cA = update_counter(cA, x[t], 0)
        cB = update_counter(cA, x[t], 1)
        truth_val = time_constraint(cA, cB) and truth_val
    return truth_val

In [35]:
hmm1 = HMM(tmat, init_prob, emit)

Simple HMM.

We're going with a 3-state HMM with:

1. Uniform transition matrix
2. Binary emissions with emission matrix: $\begin{bmatrix} .8 & .2\\ .2 & .8\\ .5 & .5 \end{bmatrix}$

We're going to additionally impose the constraint that state $0$ must happen before state $1$. To make things compact, we're going use just a single mediator $Y$ that tracks if state $0$ has happened yet. Then, if $Z_t = 1$ but $Y_t = 0$ (so $1$ happens before $0$) the constraint is violated.

Our augmented state space has 6 states corresponding to $(Z,Y)$, and our modified emissions matrix will be a $6 \times 4$ matrix: 6 hiddens states and $(X,C)$ emissions - 4 in total.

row/cols are indexed in dictionary order: $(0,0), (1,0),\cdots, (2,1)$. The first index is the orginal hidden state, the second is the tracker for if state $0$ has happened yet. Note the state $(0,0)$ is impossible and we set it as an absorbing state. Similarly, the rows of the emission matrix are indexed as $(0,0),(1,0),\cdots,(1,1)$ where the first index is the origina emission and the second the truth value of the constraints.

In [2]:
def viterbi(A, C, B, O):
    """Viterbi algorithm for solving the uncovering problem

    Notebook: C5/C5S3_Viterbi.ipynb

    Args:
        A (np.ndarray): State transition probability matrix of dimension I x I
        C (np.ndarray): Initial state distribution  of dimension I
        B (np.ndarray): Output probability matrix of dimension I x K
        O (np.ndarray): Observation sequence of length N

    Returns:
        S_opt (np.ndarray): Optimal state sequence of length N
        D (np.ndarray): Accumulated probability matrix
        E (np.ndarray): Backtracking matrix
    """
    I = A.shape[0]  # Number of states
    N = len(O)  # Length of observation sequence

    # Initialize D and E matrices
    D = np.zeros((I, N))
    E = np.zeros((I, N - 1)).astype(np.int32)
    D[:, 0] = np.multiply(C, B[:, O[0]])

    # Compute D and E in a nested loop
    for n in range(1, N):
        for i in range(I):
            temp_product = np.multiply(A[:, i], D[:, n - 1])
            D[i, n] = np.max(temp_product) * B[i, O[n]]
            E[i, n - 1] = np.argmax(temp_product)

    # Backtracking
    S_opt = np.zeros(N).astype(np.int32)
    S_opt[-1] = np.argmax(D[:, -1])
    for n in range(N - 2, -1, -1):
        S_opt[n] = E[int(S_opt[n + 1]), n]

    return S_opt, D, E

In [5]:
tmat = np.array(
    [
        [1, 0, 0, 0, 0, 0],
        [0, 1 / 3, 1 / 3, 1 / 3, 0, 0],
        [0, 1 / 3, 1 / 3, 1 / 3, 0, 0],
        [0, 0, 0, 1 / 3, 1 / 3, 1 / 3],
        [0, 0, 0, 1 / 3, 1 / 3, 1 / 3],
        [0, 0, 0, 1 / 3, 1 / 3, 1 / 3],
    ]
)
# row/cols are indexed in dictionary order: (0,0), (1,0),...(2,1)
#

emit = np.array(
    [
        [0, 0, 0.8, 0.2],
        [0.2, 0.8, 0, 0],
        [0, 0, 0.5, 0.5],
        [0, 0, 0.8, 0.2],
        [0, 0, 0.2, 0.8],
        [0, 0, 0.5, 0.5],
    ]
)
emit

array([[0. , 0. , 0.8, 0.2],
       [0.2, 0.8, 0. , 0. ],
       [0. , 0. , 0.5, 0.5],
       [0. , 0. , 0.8, 0.2],
       [0. , 0. , 0.2, 0.8],
       [0. , 0. , 0.5, 0.5]])

Intialize with uniform distribution over all original hidden states. Note that if we initialize to $0$, then we also initialize to $Y = 1$.

In [6]:
init_prob = np.array([0, 1 / 3, 1 / 3, 1 / 3, 0, 0])
init_prob

array([0.        , 0.33333333, 0.33333333, 0.33333333, 0.        ,
       0.        ])

We observe the original sequence below. Since we also condition on the constraints being true, we shfit by two ie. $i \rightarrow (i,1)$

In [7]:
init_prob

array([0.        , 0.33333333, 0.33333333, 0.33333333, 0.        ,
       0.        ])

In [8]:
og_obs = [1, 1, 0, 1, 1, 0, 1]
shift_obs = np.array(og_obs) + 2
shift_obs

array([3, 3, 2, 3, 3, 2, 3])

In [18]:
# Define model parameters
A = tmat
C = init_prob
B = emit
O = shift_obs

# O = np.array([1]).astype(np.int32)
# O = np.array([1, 2, 0, 2, 2, 1]).astype(np.int32)

# Apply Viterbi algorithm
S_opt, D, E = viterbi(A, C, B, O)

# Now convert expanded states into original states

S_convert = S_opt % 3


print("Observation sequence:   O = ", O)
print("Optimal Augmented Hidden State: S_aug = ", S_opt)
print("Optimal Original Hidden State: S = ", S_convert)

Observation sequence:   O =  [3 3 2 3 3 2 3]
Optimal Augmented Hidden State: S_aug =  [2 2 3 4 4 3 4]
Optimal Original Hidden State: S =  [2 2 0 1 1 0 1]


In [115]:
X, Y = np.meshgrid([0, 1, 2], [0, 1])

In [12]:
def f(x, y):
    return x + y

In [17]:
f(X, Y).max().item()

3

In [36]:
def viterbi(tmat, init_prob, emit, y_fun, c_fun, obs):
    """
    tmat and emit are the transition and emission matrices respectively.
    init_prob is the starting prob, a 1D array.
    y_fun is the function p(y_t|z_t,y_{t-1}): 3 arguments
    c_fun is a function that checkes if z_t, y_t is consistent with c_t = 1: 2 arguments

    obs is a list of integers
    """
    T = len(obs)
    n_s, n_e = tmat.shape[1], emit.shape[1]
    V = np.zeros((T, n_s, 2))
    E = np.zeros((T - 1, n_s, 2))
    x = obs

    Z, Y = np.meshgrid(list(range(n_s)), list(range(2)))

    for k in range(n_s):
        for r in range(2):
            V[0, k, r] = init_prob[k] * emit[k, x[0]] * c_fun(k, r)

    for t in range(1, T):
        for k in range(n_s):
            for r in range(2):
                val_fun = (
                    lambda z, y: tmat[z, k]
                    * V[t - 1, z, y]
                    * emit[k, x[t]]
                    * y_fun(r, z, y)
                    * c_fun(k, r)
                )
                V[t, k, r] = val_fun(Z, Y).max().item()
                e_fun = (
                    lambda z, y: tmat[z, k] * V[t - 1, z, y] * y_fun(r, z, y)
                )
                E_val = e_fun(Z, Y)
                E[t - 1, k, r] = np.unravel_index(
                    np.argmax(E_val), E_val.shape
                )

In [77]:
def maximize(f, n_s, n_y, ret_val=True):
    """
    brute-force maximizer since numpy dones't play well with grids and logical functions
    we only need to keep track of the original states
    """
    arg_max = None
    val_max = -999
    for i in range(n_s):
        for j in range(n_y):
            val = f(i, j)
            if val > val_max:
                val_max = val
                arg_max = (i, j)
    if ret_val:
        return val_max
    else:
        return arg_max

In [78]:
def y_fun(y1, z, y0):
    """
    Checks assignment y_1 is consistent with z,y0.
    """
    target = int(max(y0, int(z == 0)))
    return int(y1 == target)


def c_fun(z, y):
    violate = (z == 1) and (y == 0)
    return int(not violate)

In [79]:
# def y_fun(y1, z, y0):
#     '''
#     Checks assignment y_1 is consistent with z,y0.
#     '''
#     target = (np.max(y0, int(z == 0))).astype(int)
#     return (y1 == target).astype(int)

# def c_fun(z,y):
#     violate = ((z == 1) and (y == 0))
#     return int(not violate)

In [53]:
init_prob = np.array([1 / 3, 1 / 3, 1 / 3])
tmat = np.tile(init_prob, (3, 1)).T
emit = np.array([[0.8, 0.2], [0.2, 0.8], [0.5, 0.5]])
obs = [0, 1, 1]

In [81]:
T = len(obs)
n_s, n_e = tmat.shape[1], emit.shape[1]
V = np.zeros((T, n_s, 2))
E = np.zeros((T - 1, n_s, 2))
x = obs

for k in range(n_s):
    for r in range(2):
        V[0, k, r] = init_prob[k] * emit[k, x[0]] * c_fun(k, r)

    for t in range(1, T):
        for k in range(n_s):
            for r in range(2):
                val_fun = (
                    lambda z, y: tmat[z, k]
                    * V[t - 1, z, y]
                    * emit[k, x[t]]
                    * y_fun(r, z, y)
                    * c_fun(k, r)
                )
                V[t, k, r] = maximize(val_fun, n_s, 2, ret_val=True)
                e_fun = (
                    lambda z, y: tmat[z, k] * V[t - 1, z, y] * y_fun(r, z, y)
                )
                E[t - 1, k, r] = maximize(e_fun, n_s, 2, ret_val=False)

max_prob = V[T - 1, :, :].max().item()

SyntaxError: invalid syntax (1401959093.py, line 19)

In [74]:
max_val

0.018962962962962963

In [71]:
V.shape

(3, 3, 2)

In [70]:
E

array([[[2., 0.],
        [2., 0.],
        [2., 0.]],

       [[2., 1.],
        [2., 1.],
        [2., 1.]]])

In [28]:
max(0, int(1 == 1))

1

In [37]:
constraint_checker(x_test, 0, 1)

True

In [38]:
x_test

[array([1, 0, 0]), array([0, 0, 1])]

In [39]:
x_list, y_list = hmm1.simulation(10)

In [40]:
constraint_checker(x_list, 0, 1)

False

In [41]:
x_list

[array([0, 0, 1]),
 array([0, 1, 0]),
 array([0, 1, 0]),
 array([0, 1, 0]),
 array([0, 0, 1]),
 array([0, 0, 1]),
 array([0, 1, 0]),
 array([0, 0, 1]),
 array([0, 1, 0]),
 array([0, 1, 0])]

In [70]:
A_updater = lambda z, x: update_counter(z, x, 0)
B_updater = lambda z, x: update_counter(z, x, 1)

In [110]:
x_list, y_list = hmm1.simulation(10)