In [172]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation

In [5]:
np.random.seed(2)

In [9]:
class StateGrid:
    def __init__(self,grid_matrix):
        self.grid = grid_matrix
        self.h = np.argmax(self.grid)


    @classmethod
    def initialize_random_grid(cls,size):
        index_hidden = np.unravel_index(np.argmax(np.random.rand(*size)),size)
        grid = np.zeros(size)
        grid[index_hidden] = 1
        return cls(grid)

In [10]:
states = StateGrid.initialize_random_grid((3,3))

In [11]:
transition_matrix = np.eye(3,3)

In [12]:
transition_matrix

array([[1., 0., 0.],
       [0., 1., 0.],
       [0., 0., 1.]])

In [13]:
states.grid.dot(transition_matrix)

array([[0., 0., 0.],
       [0., 0., 0.],
       [0., 1., 0.]])

In [14]:
prior = np.random.rand(3,3)

In [15]:
prior = prior/prior.sum()

In [16]:
prior

array([[0.06087432, 0.14170628, 0.12071917],
       [0.03070324, 0.11716838, 0.04207835],
       [0.17916738, 0.19482704, 0.11275584]])

In [140]:
class Observation:
    def __init__(self, pos, value):
        self.pos = pos
        self.value = value

    

In [163]:
def normalize(a):
    return a / a.sum()

def update(likelihood,prior):
    return normalize(likelihood * prior)

def predict(belief,transition_matrix):
    shape = belief.shape
    if belief.ndim != 1:
        belief = belief.ravel()
    return np.reshape(belief.dot(transition_matrix),shape)


def update_beliefs(observations,emission_matrix,belief):
    for obs in observations:
        likelihood = emission_matrix[obs.value]
        new_belief = likelihood[1] * belief[obs.pos]
        neg_belief = likelihood[0] * (1-belief[obs.pos])
        normalized_new_belief = new_belief / (new_belief + neg_belief)
        belief[obs.pos] = normalized_new_belief
    return normalize(belief)
        


In [126]:
observations = [Observation((i,i),i%2) for i in range(3)]

In [127]:
for o in observations:
    print(o.pos,o.value)

(0, 0) 0
(1, 1) 1
(2, 2) 0


In [128]:
prior = np.zeros((3,3))
prior[0,0] = 1

In [129]:
prior

array([[1., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.]])

In [130]:
alpha = 0.9
beta = 0.2
B = np.array([[1-beta, 1-alpha], [beta,alpha]])

In [131]:
B

array([[0.8, 0.1],
       [0.2, 0.9]])

In [132]:
update_beliefs(observations,B,prior)

array([[1., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.]])

In [34]:
row,col = np.indices((3,3))

array([[[0, 0, 0],
        [1, 1, 1],
        [2, 2, 2]],

       [[0, 1, 2],
        [0, 1, 2],
        [0, 1, 2]]])

In [56]:
a = np.array([[0.8, 0.8, 0.8],
 [0.2, 0.2, 0.2]])

In [69]:
a[a.argmax(axis=0),np.arange(a.shape[1])]

array([0.8, 0.8, 0.8])

In [72]:
m = a.max(axis=0)
sampled = np.random.rand(3)


array([False, False, False])

In [77]:
i = a.argmax(axis=0)

In [141]:
def sample_observations(state,emission_matrix,num_obs=3):
    grid_size = state.shape
    inds = np.random.randint(0,grid_size[0]*grid_size[1],size=3)
    rows,cols = np.unravel_index(inds,grid_size)
    sampled_states = [state[r,c] for r,c in zip(rows,cols)]
    #print(inds,sampled_states)
    emission_probs = emission_matrix[:,np.array(sampled_states,dtype=int)]
    #print(emission_probs)
    sampled_obs = np.random.rand(num_obs)
    #print(sampled_obs)
    maxes = emission_probs.max(axis=0)
    ind_max = emission_probs.argmax(axis=0)
    mask = sampled_obs < maxes
    obs = np.zeros((num_obs,))
    obs = []
    #print(maxes,ind_max,mask,obs)
    for i in range(len(mask)):
        val = ind_max[i] if mask[i] else 1 - ind_max[i]
        pos = (rows[i],cols[i])
        obs.append(Observation(pos,val))
    return obs

def sample_next_state(state,transition_matrix):
    sample_probs = transition_matrix[state.ravel()==1].squeeze()
    ind = np.random.choice(np.arange(sample_probs.size),p=sample_probs)
    row,col = np.unravel_index(ind,state.shape)
    new_state = np.zeros(state.shape)
    new_state[row,col] = 1
    return new_state

In [143]:
prior

array([[1., 0., 0.],
       [0., 0., 0.],
       [0., 0., 0.]])

1

In [122]:
t = np.ones((9,9))
t /= t.sum(axis=1)
s_p = t[prior.ravel() == 1].squeeze()

In [138]:
belief = np.zeros(9)
belief[2:4] = 0.5
belief

array([0. , 0. , 0.5, 0.5, 0. , 0. , 0. , 0. , 0. ])

In [112]:
s_pi = np.random.choice(np.arange(s_p.size),p=s_p)

In [113]:
s_pi

0

In [219]:
class HMM:
    def __init__(self,transmission_matrix,emission_matrix,prior,state):
        self.transmission_matrix = transmission_matrix
        self.emission_matrix = emission_matrix
        self.prior = prior
        self.state = state
        
    def run(self,length):
        transition_belief_history = [self.prior]
        correction_belief_history = [self.prior]
        state_history = [self.state]
        obs_history = []
        # In the case that we know the prior
        belief = self.prior
        for t in range(1,length):
            new_state = sample_next_state(state_history[t-1],self.transmission_matrix)
            state_history.append(new_state)
            observations = sample_observations(new_state,self.emission_matrix)
            obs_history.append(observations)
            belief = predict(belief,self.transmission_matrix)
            print("Transition Model Belief: ", belief)
            transition_belief_history.append(belief)
            belief =update_beliefs(observations,self.emission_matrix,belief)
            print("Corrected Belief: ", belief)
            correction_belief_history.append(belief)
        return correction_belief_history

    def plot_beliefs(self,beliefs):
        def update(i,beliefs,bar):
            bar.set_3d_properties([beliefs[i].flatten()])

        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        x_data, y_data = np.meshgrid(np.arange(beliefs[0].shape[1]),np.arange(beliefs[0].shape[0]))
        x_data = x_data.flatten()
        y_data = y_data.flatten()
        z_data = beliefs[0].flatten()
        bar = ax.bar3d(x_data,
                 y_data,
                 np.zeros(len(z_data)),
                        1, 1, z_data, alpha=0.5)
        
        anim = animation.FuncAnimation(fig,update,len(beliefs),fargs=(beliefs,bar), interval=1000, blit=False)
        anim.save('mymovie.mp4',writer=animation.FFMpegWriter(fps=10))
        plt.show()



In [220]:
trans = np.eye(9,9)
prior = np.ones((3,3))/9
state = np.zeros((3,3))
state[2,1] = 1

In [221]:
state

array([[0., 0., 0.],
       [0., 0., 0.],
       [0., 1., 0.]])

In [222]:
hmm = HMM(trans,B,prior,state)

In [223]:
beliefs = hmm.run(30)

Transition Model Belief:  [[0.11111111 0.11111111 0.11111111]
 [0.11111111 0.11111111 0.11111111]
 [0.11111111 0.11111111 0.11111111]]
Corrected Belief:  [[0.01791292 0.12937109 0.12937109]
 [0.12937109 0.12937109 0.07648947]
 [0.12937109 0.12937109 0.12937109]]
Transition Model Belief:  [[0.01791292 0.12937109 0.12937109]
 [0.12937109 0.12937109 0.07648947]
 [0.12937109 0.12937109 0.12937109]]
Corrected Belief:  [[0.02351363 0.16982069 0.00304068]
 [0.16982069 0.02393728 0.10040494]
 [0.16982069 0.16982069 0.16982069]]
Transition Model Belief:  [[0.02351363 0.16982069 0.00304068]
 [0.16982069 0.02393728 0.10040494]
 [0.16982069 0.16982069 0.16982069]]
Corrected Belief:  [[0.04159239 0.04410193 0.00537854]
 [0.04410193 0.04234176 0.17760254]
 [0.04410193 0.30038949 0.30038949]]
Transition Model Belief:  [[0.04159239 0.04410193 0.00537854]
 [0.04410193 0.04234176 0.17760254]
 [0.04410193 0.30038949 0.30038949]]
Corrected Belief:  [[0.04860187 0.05153434 0.00628498]
 [0.00670035 0.049477

In [224]:
len(beliefs)

30

In [225]:
hmm.plot_beliefs(beliefs)

TypeError: set_3d_properties() takes 1 positional argument but 2 were given