### CECS 550 Project 2: Hidden Markov Model
Ian Schenck

Victor Castellanos

In [1]:
import pandas as pd
import numpy as np
from pathlib import Path

In [2]:
# load a file that uses , as delimiter
def load_file(path, names):
    if not path.is_file():
        raise FileNotFoundError(str(path))
    data = pd.read_csv(path, sep=",", names=names, header=None)
    return data

# load data for hmm
def load_df():
    cols = ["state", "emission"]
    file = Path.cwd() / "data.txt"
    return load_file(file, cols)

# calculate state transition probabilities (a)
def state_transition_probability(data):
    # add a column of data for the next state after each state
    data['next'] = data['state'].shift(periods=-1)
    # group by state and next state, and calculate probability for each transition
    b = pd.DataFrame(data=data.groupby(['state', 'next']).size()).reset_index()
    b['subtotal'] = b.groupby('state')[0].transform('sum')
    b['prob'] = b[0]/b['subtotal']
    b = b.drop([0, 'subtotal'], axis=1)
    # rearrange dataframe to desired shape and order
    b = b.pivot(index='state', columns='next', values='prob')
    b = b.reindex(["sunny", "foggy", "rainy"], columns=["sunny", "foggy", "rainy"]) 
    return b

# calculate state -> emission probabilities (b)
def emission_probability(data):
    # group by state and emission, and calculate probability for each emission
    b = pd.DataFrame(data=data.groupby(['state', 'emission'])
                     .size()).reset_index()
    b['subtotal'] = b.groupby('state')[0].transform('sum')
    b['prob'] = b[0]/b['subtotal']
    b = b.drop([0, 'subtotal'], axis=1)
    # rearrange dataframe to desired shape and order
    b = b.pivot(index='state', columns='emission', values='prob')
    b = b.reindex(["sunny", "foggy", "rainy"], columns=["yes", "no"]) 
    return b

# generate column names for hmm based on sequence of observations
def seq_to_col(sequence):
    new_seq = []
    for i, val in enumerate(sequence):
        val = f"({i}) " + val
        new_seq.append(val)
    return new_seq

def find_most_likely(hmm):
    #find max of each column
    i = np.where(hmm == np.amax(hmm))
    i = list(zip(i[1],i[0]))
    i = pd.DataFrame(i)
    #sort the columns max in orders
    sorted = i.sort_values(by=[0], inplace = False )
    sorted = sorted.reset_index()
    
    # decide most likely states
    most_likely = ""
    for x in range(0,sorted.shape[0]):
        if(x == 0):
            pass
        elif(sorted[1][x] == 0 ):
            most_likely += "sunny "
        elif(sorted[1][x] == 1):
            most_likely += "foggy "
        else:
            most_likely += "rainy "
    return most_likely[:-1]

# calculate probability of given sequence
def probability_of_sequence(data, sequence):
    sa = state_transition_probability(data)
    a = sa.to_numpy()
    b = emission_probability(data)
    hidden_states = ["sunny", "foggy", "rainy"]
    # initialize hmm with starting probabilities (sunny=1.0, foggy=0.0, rainy=0.0)
    hmm = pd.DataFrame(data={"0" : (1.0, 0.0, 0.0)}, index=hidden_states)
    hmm = hmm.join(pd.DataFrame(0., columns = sequence, index = hidden_states))
    
    # calculate probabilities at each state using 
    for index in range(1, hmm.shape[1]):
        # get current column (for current alpha and emission probabilities)
        current_obs = hmm.iloc[:, index]
        # get previous column (for previous alpha and transition probabilities)
        last_obs = hmm.iloc[:, index-1]
        # for each state in current column
        for j in range(current_obs.size):
            trans = 0
            # for each state in previous column
            # calculate sum(alpha_i * a_ij)
            for i in range(last_obs.size):
                trans += (last_obs[i] * a[i,j])
            # calculate sum(alpha_i * a_ij) * b_jv(j)  
            current_obs[j] = trans * b.at[hmm.iloc[j].name, current_obs.name[4:]]
    # probability of events is the sum of the probabilities of the last emission
    prob_of_sequence = hmm.iloc[:,-1].sum()
    
    # calculate most likely sequence
    most_likely = find_most_likely(hmm)
    
    return sa, b, prob_of_sequence, most_likely, hmm

In [3]:
data = load_df()

In [4]:
# change this sequence to whatever you want
sequence = ["yes", "no", "yes", "no"]

In [5]:
a, b, probability_of_sequence, most_likely_sequence, hmm = probability_of_sequence(data, seq_to_col(sequence))

In [6]:
# show hmm probabilities
hmm

Unnamed: 0,0,(0) yes,(1) no,(2) yes,(3) no
sunny,1.0,0.067602,0.065758,0.005106,0.007715
foggy,0.0,0.045433,0.029941,0.008081,0.00552
rainy,0.0,0.044484,0.00802,0.013782,0.001994


In [7]:
# output: a_ij matrix
a

next,sunny,foggy,rainy
state,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
sunny,0.795132,0.150101,0.054767
foggy,0.206897,0.501916,0.291188
rainy,0.195918,0.22449,0.579592


In [8]:
# output: b_jk matrix
b

emission,yes,no
state,Unnamed: 1_level_1,Unnamed: 2_level_1
sunny,0.08502,0.91498
foggy,0.302682,0.697318
rainy,0.812245,0.187755


In [9]:
# output: probability of the HMM producing the given visible state
probability_of_sequence

0.015229218224799322

In [10]:
# output: sequence of hidden states that given visible states generate
most_likely_sequence

'sunny sunny rainy sunny'