In [1]:
import numpy as np
import random
import operator

<h2> Initializing the models for performing forward and backward sequences and generating direct samplings.

In [2]:
transition_model = {
    'Rainy':{'Rainy':0.7,'NotRainy':0.3},
    'NotRainy':{'Rainy':0.3,'NotRainy':0.7}
}

observation_model = {
    'Rainy':{'Umbrella':0.9,'NoUmbrella':0.1},
    'NotRainy':{'Umbrella':0.2,'NoUmbrella':0.8}
}

prior_prob_obs = {
    'Rainy':0.5,
    'NotRainy':0.5
}

true_obs = np.array([[0.9,0.0],
                     [0.0,0.2]
                    ])

false_obs = np.array([[0.1,0.0],
                      [0.0,0.8]
                     ])

prior_prob = np.array([[0.5,0.5]])

trans_model = np.array([[0.7,0.3],
                        [0.3,0.7]])


<h2> Functions for generating the forward and backward sequences.

In [3]:
def forward_sequence():
    f_states = []
    f_states.append(prior_prob)
    for i in range(1,len(seq_model)+1):
        #print(seq_model[i-1])
        if seq_model[i-1] == 'Umbrella':
            x = true_obs
        else:
            x= false_obs
        cal_seq = x.dot(trans_model.transpose().dot(np.transpose(f_states[i-1])))
        cal_seq = normalize(cal_seq)
        f_states.append(cal_seq.transpose())
        #print ("\n {} observation has foll prob. \n {}".format(seq_model[i-1],cal_seq))
    return f_states

In [4]:
def backward_sequence():
    b_states =[]
    b_states.append(np.array([[1.0,1.0]]))

    for i in range(1,len(seq_model)+1):
        if seq_model[i-1] == 'Umbrella':
            x = true_obs
        else:
            x = false_obs
        cal_seq = trans_model.dot((x).dot(np.transpose(b_states[i-1])))
        cal_seq = normalize(cal_seq)
        b_states.append(cal_seq.transpose())
        #print("\n {} observation has foll prob. \n {}".format(seq_model[i - 1], cal_seq))

    return b_states

In [5]:
def final_observation(f_state,b_state):

    final_obs=[]
    final_seq=([])
    
    for i in range(0,len(f_state)):
        
        obs_final = (f_state[i]*(b_state[i]))
        obs_final = normalize(obs_final)
        #print(obs_final)
        if obs_final.argmax()==0:
            #print("Rainy",obs_final[0])
            final_obs.append("Rainy")
        else:
            #print("Not Rainy",obs_final[1])
            final_obs.append("Not Rainy")
        final_seq.append(obs_final)
        
    return final_obs,final_seq




In [6]:
def normalize(arr):
    arr = arr/arr.min()
    arr = arr/arr.sum()
    return arr

In [7]:
def direct_sampling(seq_len,sam_num):
    
    t_states=[]
    final_seq =[]
    
    for i in range(sam_num):
        
        episodes=[]
        states_epi=[]
        uniform_dist_seq = np.random.uniform(0,1,seq_len)
        #uniform_dist_seq = np.array([[0.0785260],[ 0.5716089],[ 0.3007914],[  0.0390747],[ 0.9264768],[ 0.2659581],[ 0.7663956],[ 0.7542092],[ 0.6354107],[ 0.8061113],[ 0.1309188],[ 0.1396221],[ 0.6336578],[ 0.6429383],[ 0.3504181],[ 0.7059498],[ 0.8618811],[ 0.6972462],[ 0.6165900],[ 0.4021687 ]])
        #print('Uniform Sequence : \n',uniform_dist_seq)
        if(uniform_dist_seq[0]<prior_prob_obs["Rainy"]):
            states_epi.append("Rainy")
        else:
            states_epi.append("NotRainy")
        
        for j in range(1,seq_len):
            
            #If J is even, then its transition state, else its Observation State.
            
            if (j%2==0):
                
                if(states_epi[-1]=="Rainy"):
                    if(uniform_dist_seq[j]<=transition_model[states_epi[-1]]["NotRainy"] ):
                        states_epi.append("NotRainy")
                    else:
                        states_epi.append("Rainy")
                elif(states_epi[-1]=="NotRainy"):
                    if(uniform_dist_seq[j]<=transition_model[states_epi[-1]]["NotRainy"]):
                        states_epi.append("NotRainy")
                    else:
                        states_epi.append("Rainy")
                
            else:
                
                if(states_epi[-1]=="Rainy"):
                    if(uniform_dist_seq[j]<=observation_model[states_epi[-1]]["NoUmbrella"]):
                        episodes.append(tuple((states_epi[-1],"NoUmbrella")))
                    else:
                        episodes.append(tuple((states_epi[-1],"Umbrella")))
                elif(states_epi[-1]=="NotRainy"):
                    if(uniform_dist_seq[j]<=observation_model[states_epi[-1]]["NoUmbrella"]):
                        episodes.append(tuple((states_epi[-1],"NoUmbrella")))
                    else:
                        episodes.append(tuple((states_epi[-1],"NoUmbrella")))
                    
        final_seq.append(episodes)
        #print("States at {} : {}".format(i,episodes))
    return final_seq

In [9]:
if __name__ == "__main__":
    
    seq_model = ['Umbrella','Umbrella','Not Umbrella','Umbrella','Umbrella']
    sequence_length= 20
    sampling_number=15
    print('Starting Main...') 

    forward_seq = forward_sequence()
    print("\nForward Sequence : ",forward_seq)
    
    backward_seq = backward_sequence()
    print("\nBackward Sequence : ",backward_seq)
    
    
    final_obs,final_seq = final_observation(forward_seq,backward_seq[::-1])
    print("\nFINAL SEQ OF STATES : \n {} \n".format(final_obs))
    #rint("\nFinal SEQ of Prob : \n {} \n".format(final_seq))


    
    direct_sampling_seq = direct_sampling(sequence_length, sampling_number)
    #print("\n\nDirect Sampling Seq : ",direct_sampling_seq)
    
    
    #calculating the number of times a sequence occurs : 
    counter=0
    for i in range(len(direct_sampling_seq)):
        seq_sampled_list=[]
        for states in direct_sampling_seq[i]:
            seq_sampled_list.append(states[0])
        #if i==0:
        #    seq_sampled_list = ['Rainy', 'Rainy', 'Rainy', 'Rainy', 'Not Rainy', 'Rainy', 'Rainy', 'Rainy', 'Rainy', 'Rainy']
        print ('seq_sampled_list [{}] : {}'.format(i,seq_sampled_list))
        for j in range(len(seq_sampled_list)-len(seq_model)):
            for k in range(len(final_obs)):
                if (seq_sampled_list[j+k] != final_obs[k]):
                    break
            if (k== len(final_obs)-1):
                counter+=1
            print('seq[j+k] : ',(seq_sampled_list[j+k]!= final_obs[k]),k)    
        print ('counter : ',counter)
    
    
    # The probability of the sequence to appear from direct sampling sequence is the number of appearence over the number of produced states
    p_sequence = counter / (sequence_length*sampling_number)
    print ('Probability of Sequence : ',p_sequence)
    
    
    

        
     
    probability = 1
    for i in range(0,len(final_obs)):
        if final_obs[i] == "Rainy":
            probability = probability * final_seq[i][0][0] # as final_seq is alist containing arrays so 
                                                            # we first put the row, then the matrix and then its 
                                                            #particular column
        else :
            probability = probability * final_seq[i][0][1]
    print ("Probability for Forward Backward Algorithm : ",probability)
        
    
    

Starting Main...

Forward Sequence :  [array([[0.5, 0.5]]), array([[0.81818182, 0.18181818]]), array([[0.88335704, 0.11664296]]), array([[0.19066794, 0.80933206]]), array([[0.730794, 0.269206]]), array([[0.86733889, 0.13266111]])]

Backward Sequence :  [array([[1., 1.]]), array([[0.62727273, 0.37272727]]), array([[0.65334282, 0.34665718]]), array([[0.37626718, 0.62373282]]), array([[0.5923176, 0.4076824]]), array([[0.64693556, 0.35306444]])]

FINAL SEQ OF STATES : 
 ['Rainy', 'Rainy', 'Rainy', 'Not Rainy', 'Rainy', 'Rainy'] 

seq_sampled_list [0] : ['Rainy', 'Rainy', 'Rainy', 'Rainy', 'Rainy', 'Rainy', 'Rainy', 'Rainy', 'NotRainy', 'NotRainy']
seq[j+k] :  True 3
seq[j+k] :  True 3
seq[j+k] :  True 3
seq[j+k] :  True 3
seq[j+k] :  True 3
counter :  0
seq_sampled_list [1] : ['NotRainy', 'NotRainy', 'NotRainy', 'NotRainy', 'NotRainy', 'NotRainy', 'NotRainy', 'Rainy', 'Rainy', 'NotRainy']
seq[j+k] :  True 0
seq[j+k] :  True 0
seq[j+k] :  True 0
seq[j+k] :  True 0
seq[j+k] :  True 0
counter