In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline

In [9]:
def _get_stochastic_row_for_initialization(N):
    eps = (1./N)*(1/100.0)
    if N%2:
        row = [1./N + (-1.0)**i*eps for i in range(N-1)] + [1./N]
    else:
        row = [1./N + (-1.0)**i*eps for i in range(N)]
        
    return row

In [10]:
# Build a HMM class that will output predictions of classes
class HMM(object):
    """docstring for HMM."""
    def __init__(self, arg):
        super(HMM, self).__init__()
        self.arg = arg

In [152]:
# based on  "A Revealing Introduction to Hidden Markov Models"

N,M = 5,4
# these should not be uniform. 
# make them row stochastic, etc. 
A = np.ones((N,N))*_get_stochastic_row_for_initialization(N)
B = np.ones((N,M))*_get_stochastic_row_for_initialization(M)
pi = np.ones((N))*_get_stochastic_row_for_initialization(N)

maxIters = 100
iters = 0
oldLogProb = -np.inf

def _compute_params(A,B,pi,Obs):
    N,M = B.shape
    T = len(Obs)
    
    c = [0.0]*T
    alpha  = np.ones((T,N))
    beta   = np.ones((T,N))
    gamma  = np.ones((T,N,N))
    gamma2 = np.ones((T,N))
    
    # compute a_0(i)
    alpha[0,:] = [pi[i]*B[i,int(Obs[0])] for i in range(N)]
    c[0] = np.sum(alpha[0,:])
    
    # scale a_0[i]
    c[0] = 1.0/c[0]
    alpha[0,:] = [c[0]*alpha[0,i] for i in range(N)]
    
    # compute a_t(i)
    for t in range(1,T):
        for i in range(N):
            alpha[t,i] = 0.0
            
            #for j in range(N):
            #    alpha[t,i] += alpha[t-1,j]*A[j,i]
            alpha[t,i] = np.sum([alpha[t-1,j]*A[j,i] for j in range(N)])
            alpha[t,i] = alpha[t,i]*B[i,Obs[t]]
            c[t] += alpha[t,i]
            
        # scale alpha[t,i]
        c[t] = 1.0/c[t]

        #for i in range(N):
        #    alpha[t,i] = c[t]*alpha[t,i]
        alpha[t,:] = [c[t]*alpha[t,i] for i in range(N)]    
    #### BETA
    beta[T-1,:] = [c[T-1]]*N
    
    # beta pass
    for t in range(T-2,-1,-1):
        for i in range(N):
            beta[t,i] = 0.0
            
            #for j in range(N):
            #    beta[t,i] += A[i,j]*B[j,Obs[t+1]]*beta[t+1,j]
            beta[t,i] = np.sum([A[i,j]*B[j,Obs[t+1]]*beta[t+1,j] for j in range(N)])
            #scale by same factor
            beta[t,i] = c[t]*beta[t,i]
    
    # gamma
    for t in range(T-1):
        denom = 0.0
        for i in range(N):
            for j in range(N):
                denom += alpha[t,i]*A[i,j]*B[j,Obs[t+1]]*beta[t+1,j]
        
        for i in range(N):
            gamma2[t,i] = 0.0
            for j in range(N):
                gamma[t,i,j] = (alpha[t,i]*A[i,j]*B[j,Obs[t+1]]*beta[t+1,j])/denom
                gamma2[t,i] += gamma[t,i,j]
                
                
    # special case
    denom = np.sum(alpha[T-1,:])
    gamma2[T-1,:] = [alpha[T-1,i]/denom for i in range(N)]
    
    # resestimate A,B,pi
    # reestimate pi
    pi = gamma2[0,:]
    
    # re-estimate A
    for i in range(N):
        for j in range(N):
            numer,denom = 0.0,0.0
            
            for t in range(T-1):
                numer += gamma[t,i,j]
                denom += gamma2[t,i]
            A[i,j] = numer/denom
            
    # re-estimate B
    for i in range(N):
        for j in range(M):
            numer,denom = 0.0,0.0
            
            for t in range(T):
                if Obs[t] == j:
                    numer += gamma2[t,i]
                denom += gamma2[t,i]
            B[i,j] = numer/denom
            
    # compute log[P(Obs|lambda)]
    logProb = -np.sum([np.log(val) for val in c])
    return logProb,pi,A,B

def HMM(A,B,pi,Obs,maxIters=100,oldLogProb=-np.inf):
    iters = 0
    logProb,pi,A,B = _compute_params(A,B,pi,Obs)
    
    while (iters < maxIters and logProb > oldLogProb):
        oldLogProb = logProb
        logProb,pi,A,B = _compute_params(A,B,pi,Obs)
        iters+=1
    return pi,A,B

In [156]:

N,M = 5,4
# these should not be uniform. 
# make them row stochastic, etc. 
A = np.ones((N,N))*_get_stochastic_row_for_initialization(N)
B = np.ones((N,M))*_get_stochastic_row_for_initialization(M)
pi = np.ones((N))*_get_stochastic_row_for_initialization(N)

maxIters = 100
iters = 0
oldLogProb = -np.inf
Obs = [1,1,0,3]
#_compute_params(A,B,pi,Obs)
HMM(A,B,pi,Obs)

(array([ 0.202,  0.198,  0.202,  0.198,  0.2  ]),
 array([[ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ]]),
 array([[ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25]]))

In [155]:

N,M = 5,4
# these should not be uniform. 
# make them row stochastic, etc. 
A = np.ones((N,N))*_get_stochastic_row_for_initialization(N)
B = np.ones((N,M))*_get_stochastic_row_for_initialization(M)
pi = np.ones((N))*_get_stochastic_row_for_initialization(N)

maxIters = 100
iters = 0
oldLogProb = -np.inf
Obs = [1,1,0,3]
#_compute_params(A,B,pi,Obs)
HMM(A,B,pi,Obs)

(array([ 0.202,  0.198,  0.202,  0.198,  0.2  ]),
 array([[ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ],
        [ 0.202,  0.198,  0.202,  0.198,  0.2  ]]),
 array([[ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25],
        [ 0.25,  0.5 ,  0.  ,  0.25]]))

In [157]:
# Toy Problem
AA = np.array([0.7,0.3,0.4,0.6]).reshape(2,2)
BB = np.array([0.1,0.4,0.5,0.7,0.2,0.1]).reshape(2,3)
pi = np.array([0.6,0.4])
Obs = np.array([0,1,0,2])
pi,A,B = HMM(AA,BB,pi,Obs)
print(pi,"\n\n",A,"\n\n",B)

[ 0.  1.] 

 [[  0.00000000e+00   1.00000000e+00]
 [  1.00000000e+00   1.07440277e-91]] 

 [[  9.47123843e-321   5.00000000e-001   5.00000000e-001]
 [  1.00000000e+000   8.87730978e-181   1.07440277e-091]]


In [135]:


def get_brown_corpus_observations(nmbr_obs = 100):
    brown = []
    with open('brown.txt','r') as f:
        for line in f:
            lower_line = line.lower()
            final_line = re.sub(r'([^\s\w]|_)+', '', lower_line).strip()
            if final_line == '':
                continue
            final_line = ' '.join(final_line.split())
            final_line = ''.join(i for i in final_line if not i.isdigit())
            brown.append(final_line)

    output = []
    while (len(output) < nmbr_obs):
        for i,line in enumerate(brown):
            for character in line:
                if character == ' ':
                    output.append(26)
                else:
                    output.append(ord(character) - 97)
            output.append(26)
    return output

In [136]:
N,M = 2,27
# these should not be uniform.
# make them row stochastic, etc.
A = np.array([0.47468,0.52532,0.51656,0.48344]).reshape(2,2)
B = np.ones((N,M))*_get_stochastic_row_for_initialization(M)
pi = np.array([0.51316,0.48684])
Obs =  np.array(get_brown_corpus_observations())#corpus construction
#pi,A,B = HMM(A,B,pi,Obs)

In [285]:

A = np.array([0.7,0.3,0.4,0.6]).reshape(2,2)
B = np.array([0.1,0.4,0.5,0.7,0.2,0.1]).reshape(2,3)
pi = np.array([0.6,0.4])
Obs = np.array([1,1,2])

N,M = B.shape
T = len(Obs)

c = [0.0]*T
alpha  = np.zeros((T,N))
beta   = np.zeros((T,N))
digamma  = np.zeros((T-1,N,N))
gamma = np.zeros((T,N))

def compute_normalization(alpha):
    return [1./sum(alpha[t,:]) for t in range(T)]

def scale_alpha_beta(alpha,beta,c):
    for t in range(T):
        alpha[t,:] = alpha[t,:]*c[t]
        beta[t,:] = beta[t,:]*c[t]
    return alpha,beta

def get_alpha(A,B,pi,Obs):
    alpha[0,:] = [pi[i]*B[i,int(Obs[0])] for i in range(N)]

    for t in range(1,T):
        for i in range(N):
            alpha[t,i] = np.sum([alpha[t-1,j]*A[j,i] for j in range(N)])
            alpha[t,i] = alpha[t,i]*B[i,Obs[t]]

    return alpha

def get_beta(A,B,pi,Obs):
    beta[T-1,:] = [1.0]*N

    for t in range(T-2,-1,-1):
        for i in range(N):
            beta[t,i] = np.sum([A[i,j]*B[j,Obs[t+1]]*beta[t+1,j] for j in range(N)])
    return beta

def get_digamma(A,B,pi,Obs,alpha,beta):# gamma
    for t in range(T-1):
        denom = 0.0
        for i in range(N):
            for j in range(N):
                denom += alpha[t,i]*A[i,j]*B[j,Obs[t+1]]*beta[t+1,j]

        for i in range(N):
            for j in range(N):
                digamma[t,i,j] = (alpha[t,i]*A[i,j]*B[j,Obs[t+1]]*beta[t+1,j])/denom
    return digamma

def get_gamma(digamma,alpha):
    for t in range(T-1):
        gamma[t,:] = [np.sum([digamma[t,i,:]]) for i in range(N)]
        
    ## special case T-1
    denom = np.sum(alpha[T-1,:])
    gamma[T-1,:] = [alpha[T-1,i]/denom for i in range(N)]
    
    return gamma

def reestimate_model(A,B,pi,Obs,digamma,gamma):# resestimate A,B,pi
    
    # reestimate pi
    pi = gamma[0,:]

    # re-estimate A
    for i in range(N):
        for j in range(N):
            numer,denom = 0.0,0.0

            for t in range(T-1):
                numer += digamma[t,i,j]
                denom += gamma[t,i]
            A[i,j] = numer/denom

    # re-estimate B
    for i in range(N):
        for j in range(M):
            numer,denom = 0.0,0.0

            for t in range(T):
                if Obs[t] == j:
                    numer += gamma[t,i]
                denom += gamma[t,i]
            B[i,j] = numer/denom
    return A,B,pi
def get_log_prob(c):
    return -np.sum([np.log(val) for val in c])

def compute_params(A,B,pi,Obs):
    alpha      = get_alpha(A,B,pi,Obs)
    beta       = get_beta(A,B,pi,Obs)
    c          = compute_normalization(alpha)
    alpha,beta = scale_alpha_beta(alpha,beta,c)
    digamma    = get_digamma(A,B,pi,Obs,alpha,beta)
    gamma      = get_gamma(digamma,alpha)
    A,B,pi     = reestimate_model(A,B,pi,Obs,digamma,gamma)
    newLogProb = get_log_prob(c)
    
    return A,B,pi,newLogProb

In [286]:
def compute_model(A,B,pi,Obs,maxIters=100):
    oldLogProb = -np.inf # may want to keep track
    iters = 0
    A,B,pi,newLogProb = compute_params(A,B,pi,Obs)

    while (iters < maxIters and newLogProb > oldLogProb):
        oldLogProb = newLogProb
        A,B,pi,newLogProb = compute_params(A,B,pi,Obs)
        iters+=1
    return A,B,pi,newLogProb



In [292]:
A,B,pi,newLogProb = compute_model(A,B,pi,Obs)
A

array([[ 1.        ,  0.        ],
       [ 0.66666667,  0.33333333]])

In [351]:
def viterbi(A,B,pi,Obs):
    V = np.zeros((T,N))
    
    #Initialization is weird
    path = {}
    for i in range(N):
        path[i] = [str(i)]
    V[0,:] = [pi[i]*B[i,Obs[0]] for i in range(N)]
    
    for t in range(1,T):
        newpath = {}
        for i in range(N):
            (prob, state) = max([(V[t-1,j] * A[j,i] * B[i,Obs[t]], j) for j in range(N)])
            V[t,i] = prob
            newpath[i] = path[state] + [str(i)]
        path = newpath # Don't need to remember the old paths
    (prob, state) = max([(V[T-1][j], j) for j in range(N)])
    
    return (prob, path[state])

In [352]:
A = np.array([0.7,0.3,0.4,0.6]).reshape(2,2)
B = np.array([0.1,0.4,0.5,0.7,0.2,0.1]).reshape(2,3)
pi = np.array([0.6,0.4])
Obs = np.array([0,1,2])

N,M = B.shape
T = len(Obs)

(prob, state) = viterbi(A,B,pi,Obs)
prob
state

['1', '0', '0']

In [345]:
obs = ('normal', 'cold', 'dizzy')
states = ('Healthy', 'Fever')
start_p = {'Healthy': 0.6, 'Fever': 0.4}
trans_p = {
   'Healthy' : {'Healthy': 0.7, 'Fever': 0.3},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.6}
   }
emit_p = {
   'Healthy' : {'normal': 0.1, 'cold': 0.4, 'dizzy': 0.5},
   'Fever' : {'normal': 0.7, 'cold': 0.2, 'dizzy': 0.1}
   }
def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    path = {}
    for y in states:
        V[0][y] = start_p[y] * emit_p[y][obs[0]]	# Initialize base cases (t == 0)
        path[y] = [y]
        print([y],type([y]))
        print(path[y])
    for t in range(1,len(obs)):	# Run Viterbi for t > 0
        V.append({})
        newpath = {}
        for y in states:
            (prob, state) = max([(V[t-1][y0] * trans_p[y0][y] * emit_p[y][obs[t]], y0) for y0 in states])
            V[t][y] = prob
            newpath[y] = path[state] + [y]
        path = newpath	# Don't need to remember the old paths

    (prob, state) = max([(V[len(obs) - 1][y], y) for y in states])
    return (prob, path[state])

In [346]:
viterbi(obs,states,start_p,trans_p,emit_p)

['Healthy'] <class 'list'>
['Healthy']
['Fever'] <class 'list'>
['Fever']


(0.01568, ['Fever', 'Healthy', 'Healthy'])

In [308]:
[3,4] + [4,5]

[3, 4, 4, 5]

In [334]:
path = {}
path[0] = 12
path[1] = 1

In [335]:
path[0]

12

In [14]:
beta = np.zeros((10,4))
beta[:,-1] = [4.9]*10


In [8]:
beta

array([[ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9],
       [ 0. ,  0. ,  0. ,  4.9]])

In [17]:
type(beta2)

numpy.ndarray

In [2]:
import HMM as HMM
A = np.array([0.7,0.3,0.4,0.6]).reshape(2,2)
B = np.array([0.1,0.4,0.5,0.7,0.2,0.1]).reshape(2,3)
pi = np.array([0.6,0.4])
Obs = np.array([1,1,2])

In [16]:
hmm = HMM.HMM()
hmm.HMM(A,B,pi,Obs)

TypeError: HMM() takes 4 positional arguments but 5 were given

In [13]:
hmm.__dict__

{'maxIters': 1000, 'tol': 1e-06}