In [None]:
import numpy as np

# test data
observation_variables =  {0: 'Movement', 
      1: 'Passive Social Networking', 
      2: 'Active Social Networking',
      3: 'Texting',
      4: 'Access Psych Health App/Sites'}
Observations = [0, 1, 2, 3, 4]
States = {0: 'Healthy', 1: 'Depressed'}

PI = [0.5, 0.5]
A = np.array([[0.8, 0.2],[0.001, 0.999]])
B = np.array([[0.5, 0.1, 0.3, 0.1, 0],[0.05, 0.35, 0.2, 0.2, 0.2]])
Observables = [0, 3, 0, 3]

In [None]:
# model class
class Hmm:
    def __init__(self, pi, a, b):
        self.PI = pi # a 1D vector, where q1 is a certain state
        self.A = a # a 2D vector, transition probabilities
        self.B = b
    
    def forward_(self, obs):
        return forward(self.PI, self.A, self.B, obs)
    
    def forward_normalized_(self, obs):
        return forward_normalized(self.PI, self.A, self.B, obs)

In [None]:
def forward(pi, a, b, obs):
    alpha = []
    # Initialization
    for p_pi in range(len(pi)):
        alpha.append([pi[p_pi] * B[p_pi][obs[0]]])
        
    # Recursion
    for v in range(1, len(obs)):
        for j in range(len(pi)):
            summation = 0
            for i in range(len(pi)):
                summation += (alpha[i][v-1] * A[i][j])
            alpha[j].append(summation * B[j][obs[v]])
    return np.array(alpha)


def forward_normalized(pi, a, b, obs):
    alpha = [[],[]]
    coefficient = []
    c = 0
    
    # Initialization
    for i in range(len(pi)):
        alpha[i].append(pi[i] * B[i][obs[0]])
        c += pi[i] * B[i][obs[0]]
    coefficient.append(1.0/c)
    for i in range(len(pi)):
        alpha[i][0] *= coefficient[0]
    
    # Recursion
    for v in range(1, len(obs)):
        for j in range(len(pi)):
            summation = 0
            for i in range(len(PI)):
                summation += (alpha[i][v-1] * A[i][j])
            alpha[j].append(summation * B[j][obs[v]])
       
    for v in range(1, len(obs)):
        summation = 0
        for i in range(len(pi)):
            summation += alpha[i][v]
        c = 1.0/summation
        coefficient.append(c)
    
    for v in range(1, len(obs)):
        for i in range(len(pi)):
            alpha[i][v] *= coefficient[v]
            
    return (np.array(alpha), np.array(coefficient))


def evaluation_unnormalized(alpha):
    eval = []
    for i in range(len(alpha[0])):
        summation = 0
        for pi in range(len(alpha)):
            summation += alpha[pi][i]
        eval.append(summation)
    return eval


def evaluation_normalized(c):
    summation = 0
    for i in range(len(c)):
        summation += np.log(c[i])
    return -summation


def backward(pi, a, b, obs):
    # we make this list in reverse
    beta = [[1],[1]]
    for o in range(1, len(obs)):
        for i in range(len(pi)):
            summation = 0
            for j in range(len(pi)):
                summation += b[j][len(obs) - o -1] * beta[j][o - 1]* a[i][j]
            beta[i].append(summation)
    # this should not be considered when checking time complexity
    # this steps can be avoided, but its easier to do it this way
    beta_rev = [[],[]]
    for i in range(len(pi)):
        for j in range(len(beta[0])):
            beta_rev[i].append(beta[i][len(beta[0])- j - 1])
    return np.array(beta_rev)
           

def backward_normalized(pi, a, b, obs):
    c = forward_normalized(pi, a, b, obs)[1]
#     print(c)
    beta = backward(pi, a, b, obs)
    beta_cap = [[],[]]
    for i in range(len(pi)):
        for j in range(len(obs)):
            beta_cap[i].append(c[j]*beta[i][j])
    return(beta_cap)
        
    
# Smoothing by Forward-Backward Algorithm
def smoothed_probability(alpha, beta, noOfStates, noOfTimeSteps):
    smoothed = [[],[]]
    for i in range(noOfTimeSteps):
        for j in range(noOfStates):
            summation = 0
            for k in range(noOfStates):
                summation += (alpha[k][i]*beta[k][i])
            smoothed[j].append((alpha[j][i] * beta[j][i])/summation)
    return np.array(smoothed)

In [None]:
# testing
A = np.array([[0.7, 0.3],[0.3, 0.7]])
B = np.array([[0.9, 0.1],[0.2, 0.8]])
PI = [0.5, 0.5]
Observations = [0, 0]

print("alpha")
print(forward(PI, A, B, Observations))

print("\nalpha normalized")
print(forward_normalized(PI, A, B, Observations)[0])

print("\ncoefficients")
print(forward_normalized(PI, A, B, Observations)[1])

print("\nbeta")
print(backward(PI, A, B, Observations))

print("\nevaluation normalized: log")
print(evaluation_normalized(forward_normalized(PI, A, B, Observations)[1]))

print("\nsmoothed probability")
print(smoothed_probability(forward_normalized(PI, A, B, Observations)[0], backward(PI, A, B, Observations), len(PI), len(Observations)))

In [None]:
# model = hmm(PI, A, B)
# print(forward(PI, A, B, Observables))

# # evaluation_unnormalized(model.forward(Observables))
# evaluation_normalized(forward_normalized(PI, A, B, Observables)[1])
# backward_normalized(PI, A, B, Observables)

# alpha = forward_normalized(PI, A, B, Observables)[0]
# beta = backward_normalized(PI, A, B, Observables)

# smoothed_probability(alpha, beta, len(PI), len(Observables))
