<a href="https://colab.research.google.com/github/shauryajain1/SpeechRecognition/blob/main/UCS749_Lab3_Baum_Welch_Algorithm_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

#generating initial probabilities

#transition probabilities  # tansition probabilities define how likely the model is to move between different hidden states
transition = np.array([[0.8,0.1],
                       [0.1,0.8]])
#Emission probabilities  # Emission probabilities define how likely a given observation is emitted from a specific state.
emission = np.array([[0.1,0.2,0.7],
                     [0.7,0.2,0.1]])

#defining states and sequence symbols
states = ['H','C']
states_dic = {'H':0, 'C':1}  # Define the two hidden states, Hot (H) and Cold (C), and map them to indices (0 for H, 1 for C).
sequence_syms = {'1':0,'2':1,'3':2}  # Maps observable symbols (1, 2, 3) to indices (0, 1, 2) for matrix indexing.
sequence = ['1','2','3']   # Represents the possible observations (1, 2, 3).

#test sequence
test_sequence = '331122313'  # sequence of observed symbols
test_sequence = [x for x in test_sequence]   # It is converted into a list of individual symbols

#probabilities of going to end state, terminating the sequence
end_probs = [0.1, 0.1]  # 10% chance of ending when in H and 10% chance of ending when in C.
#probabilities of going from start state
start_probs = [0.5, 0.5]   # 50% chance of starting in H and 50% chance of starting in C.

In [None]:
for each in enumerate(test_sequence):  #The enumerate() function in Python adds an index to each element
  print(each)

(0, '3')
(1, '3')
(2, '1')
(3, '1')
(4, '2')
(5, '2')
(6, '3')
(7, '1')
(8, '3')


In [None]:
# This code implements the forward algorithm for calculating the likelihood of an observed sequence in a Hidden Markov Model (HMM)
def forward_probs():
    # node values stored during forward algorithm
    node_values_fwd = np.zeros((len(states), len(test_sequence)))  # It is a 2D array where rows correspond to hidden states (H, C) and column time steps

    for i, sequence_val in enumerate(test_sequence):
        for j in range(len(states)):
            # if first sequence value then do this
            if (i == 0):       # for the initialization , Probability of starting in state j, Probability of emitting the observed symbol from state j
                node_values_fwd[j, i] = start_probs[j] * emission[j, sequence_syms[sequence_val]]
            # else perform this
            else:   # when i>0
                values = [node_values_fwd[k, i - 1] * emission[j, sequence_syms[sequence_val]] * transition[k, j] for k in   # Likelihood of being in state j is calculated
                          range(len(states))]
                node_values_fwd[j, i] = sum(values)
     # node_values_fwd[k, i - 1]: Probability of being in state k at the previous time step.
     #transition[k, j]: Probability of transitioning from state k to state j
     # emission[j, sequence_syms[sequence_val]]: Probability of emitting the current observed symbol from state j
     # We consider all possible previous states (k) that could transition to state j (summing over all state)

    #end state value, After processing all observations, we calculate the probability of transitioning to the end state
    end_state = np.multiply(node_values_fwd[:, -1], end_probs) # node_values_fwd[:, -1]: Likelihoods of being in each state at the last time step
    end_state_val = sum(end_state)   # end_probs: Probabilities of transitioning from each state to the end state.
    return node_values_fwd, end_state_val

In [None]:
#function to find backward probabilities
def backward_probs():
    # node values stored during forward algorithm
    node_values_bwd = np.zeros((len(states), len(test_sequence)))

    #for i, sequence_val in enumerate(test_sequence):
    for i in range(1,len(test_sequence)+1):
        for j in range(len(states)):
            # if first sequence value then do this
            if (-i == -1):
                node_values_bwd[j, -i] = end_probs[j]
            # else perform this
            else:
                values = [node_values_bwd[k, -i+1] * emission[k, sequence_syms[test_sequence[-i+1]]] * transition[j, k] for k in range(len(states))]
                node_values_bwd[j, -i] = sum(values)

    #start state value
    start_state = [node_values_bwd[m,0] * emission[m, sequence_syms[test_sequence[0]]] for m in range(len(states))]
    start_state = np.multiply(start_state, start_probs)
    start_state_val = sum(start_state)
    return node_values_bwd, start_state_val

In [None]:
# This function calculates (xi) probabilities, also known as the state transition probabilities in the Forward-Backward Algorithm of HMM
# forward_val: The total probability of the observation sequence P(O|lamda)

def si_probs(forward, backward, forward_val):

    si_probabilities = np.zeros((len(states), len(test_sequence)-1, len(states)))

    for i in range(len(test_sequence)-1):
        for j in range(len(states)):
            for k in range(len(states)):
                si_probabilities[j,i,k] = ( forward[j,i] * backward[k,i+1] * transition[j,k] * emission[k,sequence_syms[test_sequence[i+1]]] ) \
                                                    / forward_val
    return si_probabilities    # Return a 3D matrix [j,i,k] where j is state at time t, i is time step and k is state at time t+1

In [None]:
# This function calculates (gamma) probabilities, which represent the probability of being in a particular state at a specific time step, given the entire observation sequence.
#function to find gamma probabilities
# alpha_t(j) is Forward probability of being in state s_j at time t
# beta_t(j) is Backward probability of being in state s_j at time t
# P(O|lamda) is total probability of the observation sequence : foeward_val
def gamma_probs(forward, backward, forward_val):

    gamma_probabilities = np.zeros((len(states), len(test_sequence)))

    for i in range(len(test_sequence)):
        for j in range(len(states)):
            #gamma_probabilities[j,i] = ( forward[j,i] * backward[j,i] * emission[j,sequence_syms[test_sequence[i]]] ) / forward_val
            gamma_probabilities[j, i] = (forward[j, i] * backward[j, i]) / forward_val   # j is state index and i is time step

    return gamma_probabilities

In [None]:
# This code implements the Baum-Welch Algorithm, which is an Expectation-Maximization (EM) algorithm used to train a HMM
for iteration in range(2000):

    print('\nIteration No: ', iteration + 1)
    # print('\nTransition:\n ', transition)
    # print('\nEmission: \n', emission)

    #Calling probability functions to calculate all probabilities
    fwd_probs, fwd_val = forward_probs()
    bwd_probs, bwd_val = backward_probs()
    si_probabilities = si_probs(fwd_probs, bwd_probs, fwd_val)
    gamma_probabilities = gamma_probs(fwd_probs, bwd_probs, fwd_val)

    # print('Forward Probs:')
    # print(np.matrix(fwd_probs))
    #
    # print('Backward Probs:')
    # print(np.matrix(bwd_probs))
    #
    # print('Si Probs:')
    # print(si_probabilities)
    #
    # print('Gamma Probs:')
    # print(np.matrix(gamma_probabilities))

    #caclculating 'a' and 'b' matrices
    a = np.zeros((len(states), len(states)))
    b = np.zeros((len(states), len(sequence_syms)))

    #'a' matrix
    for j in range(len(states)):
        for i in range(len(states)):
            for t in range(len(test_sequence)-1):
                a[j,i] = a[j,i] + si_probabilities[j,t,i]

            denomenator_a = [si_probabilities[j, t_x, i_x] for t_x in range(len(test_sequence) - 1) for i_x in range(len(states))]
            denomenator_a = sum(denomenator_a)

            if (denomenator_a == 0):
                a[j,i] = 0
            else:
                a[j,i] = a[j,i]/denomenator_a

    #'b' matrix
    for j in range(len(states)): #states
        for i in range(len(sequence)): #seq
            indices = [idx for idx, val in enumerate(test_sequence) if val == sequence[i]]
            numerator_b = sum( gamma_probabilities[j,indices] )
            denomenator_b = sum( gamma_probabilities[j,:] )

            if (denomenator_b == 0):
                b[j,i] = 0
            else:
                b[j, i] = numerator_b / denomenator_b


    print('\nMatrix a:\n')
    print(np.matrix(a.round(decimals=4)))
    print('\nMatrix b:\n')
    print(np.matrix(b.round(decimals=4)))

    transition = a
    emission = b

    new_fwd_temp, new_fwd_temp_val = forward_probs()
    print('New forward probability: ', new_fwd_temp_val)
    diff =  np.abs(fwd_val - new_fwd_temp_val)
    print('Difference in forward probability: ', diff)

    if (diff < 0.0000001):  # If the difference is below a very small threshold (0.0000001), the model has converged and the loop breaks early.
        break


Iteration No:  1

Matrix a:

[[0.7859 0.2141]
 [0.2094 0.7906]]

Matrix b:

[[0.2005 0.1735 0.626 ]
 [0.5016 0.284  0.2145]]
New forward probability:  5.695152119409513e-06
Difference in forward probability:  5.002073262273513e-06

Iteration No:  2

Matrix a:

[[0.7185 0.2815]
 [0.2161 0.7839]]

Matrix b:

[[0.235  0.1419 0.6231]
 [0.4256 0.2976 0.2768]]
New forward probability:  7.128566815415427e-06
Difference in forward probability:  1.4334146960059136e-06

Iteration No:  3

Matrix a:

[[0.6595 0.3405]
 [0.2027 0.7973]]

Matrix b:

[[0.2536 0.1131 0.6333]
 [0.3925 0.3032 0.3043]]
New forward probability:  7.898268000620344e-06
Difference in forward probability:  7.697011852049176e-07

Iteration No:  4

Matrix a:

[[0.6083 0.3917]
 [0.1856 0.8144]]

Matrix b:

[[0.2609 0.0838 0.6553]
 [0.378  0.3076 0.3144]]
New forward probability:  8.505915984403414e-06
Difference in forward probability:  6.076479837830701e-07

Iteration No:  5

Matrix a:

[[0.5653 0.4347]
 [0.1681 0.8319]]

Matri