In [28]:
import numpy as np
import pandas as pd

import plotly.express as px
pd.options.plotting.backend = "plotly"
from scipy.stats import binom,beta
def posterior(A, B, initial, O):
    # now compute P(X(t) | o_1, ..., o_t)
    forward = np.empty((T, 2))
    normalization =  np.empty(T)
    forward[0] = initial

    for i in range(1, T):
        forward[i] = forward[i - 1] @ A @ np.diag(B[:, int(O[i]) - 2])
        normalization[i] = forward[i].sum()
        forward[i] /= normalization[i]

    # now compute backward P(o_{t + 1}, .. o_T | X_t)
    backward = np.empty((T, 2))
    # in any state of X(T) we have prob 1 of seeing o_T
    # by def
    backward[T - 1] = np.ones(2)

    for i in reversed(range((T - 1))):
        # transpose or left mult because
        # we are going backward in time
        backward[i] = A @ np.diag(B[:, int(O[i + 1]) - 2]) @ backward[i + 1]
        backward[i] /= normalization[i + 1]
    # these will not be normalized as prob dist
    # since we are using normalization to make
    # entire conditional (prod) work
    return forward * backward

In [4]:
# sampling two coins with replacement.
# Emissions are two states: heads or tails
# for some reason, the processing of selecting
# the coin is biased, so we might select p0
# three times in a row more often on average than
# p1. So the flips are no longer independent.
# We can also view this as a single Bernoulli
# dist with a p that depends in a markovian way
# on time. This is closer to our model of the trains.

array([1., 0., 1., 1., 1., 0., 1., 1., 0., 0.])

In [80]:
# Model is HMM with two states:
A = np.array([[.1, .9],
              [.3, .7]])
# chance of success for each coin
p = np.array([.1, .75])
# emission probabilities
B = np.array([1 - p, p]).transpose()
# one sample up to time T
T = 1000
initial = np.array([.2, .8])
# sample hidden state
X = np.empty(T)
X[0] = np.random.binomial(1, initial[1])

for i in range(1, T):
    X[i] = np.random.binomial(1, A[int(X[i - 1]), 1])

# now generate emissions
O = np.empty(T)
for i in range(T):
    O[i] = np.random.binomial(1, B[int(X[i]), 1])

In [30]:
posterior(A, B, initial, O)

array([[0.23746761, 0.76253239],
       [0.034726  , 0.965274  ],
       [0.53298487, 0.46701513],
       [0.37586272, 0.62413728],
       [0.43704601, 0.56295399],
       [0.37632166, 0.62367834],
       [0.53135079, 0.46864921],
       [0.04008531, 0.95991469],
       [0.06337539, 0.93662461],
       [0.06364725, 0.93635275],
       [0.03462594, 0.96537406],
       [0.64617162, 0.35382838],
       [0.02315005, 0.97684995],
       [0.49886404, 0.50113596],
       [0.49151119, 0.50848881],
       [0.04197743, 0.95802257],
       [0.06334907, 0.93665093],
       [0.06233302, 0.93766698],
       [0.06238133, 0.93761867],
       [0.06237889, 0.93762111],
       [0.06238213, 0.93761787],
       [0.06231626, 0.93768374],
       [0.06370156, 0.93629844],
       [0.03456305, 0.96543695],
       [0.64746603, 0.35253397],
       [0.01916998, 0.98083002],
       [0.64747521, 0.35252479],
       [0.03453458, 0.96546542],
       [0.06477208, 0.93522792],
       [0.04025581, 0.95974419],
       [0.

In [86]:
K = 40
sampled_p = np.empty((K, 2))
sampled_p[0] = np.array([.2, .6])

# initial conditions don't matter?
# might cause problems with converging
# to local minima
# dirichlet multinomial vs dirichlet?
sampled_X = np.empty((K, T))
sampled_X[0] = np.ones(T)

sampled_A = np.empty((K, 2, 2))
sampled_A[0] = np.array([[.2, .8],
                         [.4, .6]])
sampled_B = np.empty((K, 2, 2))
sampled_B[0] =  np.array([1 - sampled_p[0], sampled_p[0]]).transpose()

A_prior = np.ones((2,2))

# p_0: (success, failures)
# p_1: (success, failures)
p_prior = np.ones((2, 2))
for i in range(1, K):
    # sample from posterior
    X_posterior = posterior(sampled_A[i - 1], sampled_B[i - 1], initial, O)[:, 1]
    sampled_X[i] = np.random.binomial(1,X_posterior)
    # use conjugate priors (beta for p, Dirichlet for rows of A)

    # total number of heads / tails in coin p0
    # total number of heads / tails in coin p1
    # update counts. each row is different hypothess
    # that we are flipping coin p0 or p1
    # this is really estimating emission matrix
    # H = 0, T = 1
    p_prior +=  np.array([
        [(1 - O) @ (1 - sampled_X[i]), (1 - O) @ sampled_X[i]],
        [O @ (1 - sampled_X[i]), O @ sampled_X[i]]])

    sampled_p[i] = np.random.beta(p_prior[:, 0], p_prior[:, 1])

    # does that give rows of B a dirichlet distr?
    # feels that it should
    sampled_B[i] = np.array([1 - sampled_p[i], sampled_p[i]]).transpose()
    # now estimate transition counts
    for m in range(1, T):
        i0 = sampled_X[i, m - 1]
        i1 = sampled_X[i, m]
        A_prior[int(i0), int(i1)] += 1
    # if we iterate, do we loop through rows?
    sampled_A[i] = np.vstack([
        np.random.dirichlet(A_prior[i, :], 1)
            for i in range(A_prior.shape[0])])

sampled_A[-1]

array([[0.32363317, 0.67636683],
       [0.32524681, 0.67475319]])