In [1]:
import numpy as np
import torch

In [27]:
states = ('Rainy', 'Sunny')
observations = ('walk', 'shop', 'clean')
pi = np.array([0.6, 0.4])  #initial probability 
A = np.array([[0.7, 0.3],[0.4, 0.6]]) #Transmission probability 
B = np.array([[0.1, 0.4, 0.5],[0.6, 0.3, 0.1]]) #Emission probability
bob_says = np.array([0, 2, 1, 1, 2, 0, 1,2,1,0,0,2,1])

### Forward path 

In [32]:
def forward(obs_seq, pi, A, B):
    T = len(obs_seq)
    N = A.shape[0]
    alpha = np.zeros((T, N))
    alpha[0] = pi*B[:,obs_seq[0]]
    for t in range(1, T):
        alpha[t] = np.inner(alpha[t-1],A) * B[:, obs_seq[t]]
    return alpha

def likelihood(alpha):
    # returns log P(Y  \mid  model)
    # using the forward part of the forward-backward algorithm
    return  alpha[-1].sum()  

In [33]:
alpha = forward(bob_says, pi, A, B)
print(alpha)

[[  6.00000000e-02   2.40000000e-01]
 [  5.70000000e-02   1.68000000e-02]
 [  1.79760000e-02   9.86400000e-03]
 [  6.21696000e-03   3.93264000e-03]
 [  2.76583200e-03   4.84636800e-04]
 [  2.08147344e-04   8.38268928e-04]
 [  1.58873528e-04   1.75866088e-04]
 [  8.19856479e-05   1.69069064e-05]
 [  2.49848102e-05   1.28815209e-05]
 [  2.13538234e-06   1.06337020e-05]
 [  4.68487823e-07   4.34062447e-06]
 [  8.15064409e-07   2.79176981e-07]
 [  2.61719272e-07   1.48059586e-07]]


In [34]:
likelihood(alpha)

4.0977885787645574e-07

## Backward path

In [35]:
def backward(obs_seq, A, B):
    N = A.shape[0]
    T = len(obs_seq)

    beta = np.zeros((N,T))
    beta[:,-1:] = 1

    for t in reversed(range(T-1)):
        for n in range(N):
            beta[n,t] = np.sum(beta[:,t+1] * A[n,:] * B[:, obs_seq[t+1]])

    return beta

In [36]:
def gamma(alpha, beta):
    obs_prob = likelihood(alpha)
    return (np.multiply(alpha,beta.T) / obs_prob)

In [37]:
beta=backward(bob_says, A, B)
beta.T

array([[  2.06140910e-06,   1.38377453e-06],
       [  5.47808732e-06,   4.80261779e-06],
       [  1.53838921e-05,   1.30066392e-05],
       [  4.44028627e-05,   3.27898953e-05],
       [  1.12031660e-04,   1.73059389e-04],
       [  5.10039314e-04,   4.24049489e-04],
       [  1.49007285e-03,   1.03132130e-03],
       [  3.89764880e-03,   4.19652560e-03],
       [  8.99693000e-03,   1.53167600e-02],
       [  2.67710000e-02,   3.95720000e-02],
       [  1.39700000e-01,   9.44000000e-02],
       [  3.70000000e-01,   3.40000000e-01],
       [  1.00000000e+00,   1.00000000e+00]])

In [39]:
gamma(alpha, beta)

array([[ 0.30183242,  0.81045149],
       [ 0.76199875,  0.19689639],
       [ 0.67485386,  0.31308958],
       [ 0.67365804,  0.31468401],
       [ 0.75616578,  0.20467368],
       [ 0.25907469,  0.86746181],
       [ 0.57770948,  0.44261542],
       [ 0.77981393,  0.17314282],
       [ 0.54855585,  0.48148693],
       [ 0.1395053 ,  1.02688766],
       [ 0.1597148 ,  0.99994166],
       [ 0.73594288,  0.23163756],
       [ 0.63868418,  0.36131582]])

## Viterbi

In [40]:
def viterbi(obs_seq,pi, A, B):
    # returns the most likely state sequence given observed sequence x
    # using the Viterbi algorithm
    T = len(obs_seq)
    N = A.shape[0]
    delta = np.zeros((T, N))
    psi = np.zeros((T, N))
    delta[0] = pi*B[:,obs_seq[0]]
    for t in range(1, T):
        for j in range(N):
            delta[t,j] = np.max(delta[t-1]*A[:,j]) * B[j, obs_seq[t]]
            psi[t,j] = np.argmax(delta[t-1]*A[:,j])

    # backtrack
    states = np.zeros(T, dtype=np.int32)
    states[T-1] = np.argmax(delta[T-1])
    for t in range(T-2, -1, -1):
        states[t] = psi[t+1, states[t+1]]
    return states

In [41]:
alice_hears=viterbi(bob_says, pi, A, B)
print("Bob says:", ", ",list(map(lambda y: observations[y], bob_says)))
print("Alice hears:", ", ", list(map(lambda s: states[s], alice_hears)))

Bob says: ,  ['walk', 'clean', 'shop', 'shop', 'clean', 'walk', 'shop', 'clean', 'shop', 'walk', 'walk', 'clean', 'shop']
Alice hears: ,  ['Sunny', 'Rainy', 'Rainy', 'Rainy', 'Rainy', 'Sunny', 'Rainy', 'Rainy', 'Rainy', 'Sunny', 'Sunny', 'Rainy', 'Rainy']


## Pytorch 

In [42]:
obs_seq=torch.from_numpy(bob_says)
pi_ = torch.from_numpy(pi).float()
A_ = torch.from_numpy(A).float()
B_ = torch.from_numpy(B).float()

In [45]:
def forward(obs_seq, pi, A, B):
    
    T = len(obs_seq)
    N = A.shape[0]
    alpha = torch.empty(T, N)
    alpha[0] = pi*B[:,obs_seq[0]]
    for t in range(1, T):
        alpha[t]=torch.mv(A, alpha[t-1]) * B[:, obs_seq[t]]
    return alpha
        


def likelihood(alpha):
    # returns log P(Y  \mid  model)
    # using the forward part of the forward-backward algorithm
    return  alpha[-1].sum()  

def backward(obs_seq, A, B):
    N = A.shape[0]
    T = len(obs_seq)

    beta = torch.empty(N, T)
    beta[:,-1:] = 1
    

    for t in reversed(range(T-1)):
        for n in range(N):
            beta[n,t] = torch.sum(beta[:,t+1] * A[n,:] * B[:, obs_seq[t+1]])
            
    return beta
            
def gamma(alpha, beta):
    obs_prob = likelihood(alpha)
    return (torch.mul(alpha,beta.transpose(0, 1)) / obs_prob)

def viterbi(obs_seq,pi, A, B):
    # returns the most likely state sequence given observed sequence x
    # using the Viterbi algorithm
    T = len(obs_seq)
    N = A.shape[0]
    delta = torch.zeros(T, N)
    psi = torch.zeros(T, N)
    delta[0] = pi*B[:,obs_seq[0]]
    for t in range(1, T):
        for j in range(N):
            delta[t,j] = torch.max(delta[t-1]*A[:,j]) * B[j, obs_seq[t]]
            psi[t,j] = torch.argmax(delta[t-1]*A[:,j])

    # backtrack
    states = torch.zeros(T).long()
    states[T-1] = torch.argmax(delta[T-1])
    for t in range(T-2, -1, -1):
        states[t] = psi[t+1, states[t+1]]
    return states

In [44]:
alpha= forward(obs_seq, pi_, A_, B_)
beta =backward(obs_seq, A_, B_)
gamma(alpha, beta)

tensor([[0.3018, 0.8105],
        [0.7620, 0.1969],
        [0.6749, 0.3131],
        [0.6737, 0.3147],
        [0.7562, 0.2047],
        [0.2591, 0.8675],
        [0.5777, 0.4426],
        [0.7798, 0.1731],
        [0.5486, 0.4815],
        [0.1395, 1.0269],
        [0.1597, 0.9999],
        [0.7359, 0.2316],
        [0.6387, 0.3613]])

In [47]:
alice_hears=viterbi(bob_says,pi_, A_, B_)
print("Bob says:", ", ",list(map(lambda y: observations[y], bob_says)))
print("Alice hears:", ", ", list(map(lambda s: states[s], alice_hears.numpy())))

Bob says: ,  ['walk', 'clean', 'shop', 'shop', 'clean', 'walk', 'shop', 'clean', 'shop', 'walk', 'walk', 'clean', 'shop']
Alice hears: ,  ['Sunny', 'Rainy', 'Rainy', 'Rainy', 'Rainy', 'Sunny', 'Rainy', 'Rainy', 'Rainy', 'Sunny', 'Sunny', 'Rainy', 'Rainy']


## Pytorch Class

In [48]:
class DHMM(torch.nn.Module):
    def __init__(self, M=2, T=10,pi=None, A=None, B=None):
        self.M = M 
        self.T = T
        if pi is None:
            self.initilize()
        else:
            self.pi=pi
            self.A=A
            self.B=B
        super(DHMM, self).__init__()
        
    def random_normalized(self, d1, d2):
        x = torch.rand(d1, d2)
        return x / x.sum()
    
    def initilize(self):
        self.pi = torch.ones(self.M) / self.M
        self.A = self.random_normalized(self.M, self.M)
        self.B = self.random_normalized(self.M, self.T)
        
        
    def forward_path(self, obs_seq):
        
        alpha = torch.empty(self.T, self.M)
        scale = torch.empty(self.T)
        alpha[0] = self.pi*self.B[:,obs_seq[0]]
        scale[0] = alpha[0].sum()
        alpha[0] /= scale[0]
        for t in range(1, self.T):
            alpha[t]=torch.mv(self.A, alpha[t-1]) * self.B[:, obs_seq[t]]
            scale[t] = alpha[t].sum()
            alpha[t] /= scale[t]
            
        return alpha, scale
    
    def backward_path(self, obs_seq, scale):
        
        beta = torch.empty(self.M, self.T)
        beta[:,-1:] = 1
        
        for t in reversed(range(self.T-1)):
            for m in range(self.M):
                beta[m,t] = torch.sum(beta[:,t+1] * self.A[m,:] * self.B[:, obs_seq[t+1]]/scale[t+1])
        return beta
    
    def likelihood(self, scale):
        return  torch.log(scale).sum()
    
    def forward_backward(self, obs_seq):
        
        alpha, scale=self.forward_path(obs_seq)
        beta  = self.backward_path(obs_seq, scale)
        obs_prob = self.likelihood(alpha)
        return (torch.mul(alpha,beta.transpose(0, 1)) / obs_prob)
    
    def fit(self, obs_seq):
        alpha, scale=self.forward_path(obs_seq)
        beta  = self.backward_path(obs_seq, scale)
        beta  = beta.transpose(0, 1)
        x=torch.stack([alpha[n][0] * beta[n][0] for n in range(self.M)])
        print(x)
        
        
        
            
    

In [49]:
T = len(obs_seq)
hmm = DHMM(M=2,T=T, pi=pi_, A=A_, B=B_)
alpha, scale=hmm.forward_path(obs_seq)
beta = hmm.backward_path(obs_seq, scale)
hmm.forward_backward(obs_seq)
hmm.likelihood(scale)

tensor(-14.7076)