----------START----------

In [1]:
import pandas as pd
from pathlib import Path

In [2]:
# A value of None lifts pandas' display limit, so printed frames show every
# column and every row instead of being truncated.
pd.set_option('display.max_columns', None, 'display.max_rows', None)

In [3]:
def read_txt_file_data(path_to_file, filename) -> pd.DataFrame:
    """Load a delimited text file into a DataFrame.

    Args:
        path_to_file: Directory that contains the file.
        filename: Name of the file inside ``path_to_file``.

    Returns:
        The parsed frame. Columns are labelled 0, 1, ... because the file is
        read without a header row.
    """
    # header=None is deliberate: with the default header handling the first
    # record is consumed as column names and drops out of every count.
    return pd.read_csv(Path(path_to_file) / filename, header=None)

In [4]:
def get_transition_probability(data_frame: pd.DataFrame):
    """Estimate state-to-state transition probabilities from a state sequence.

    Column 0 of ``data_frame`` is read top to bottom as a sequence of states;
    every pair of consecutive rows counts as one observed transition.

    Args:
        data_frame: Frame whose first column holds the state sequence.

    Returns:
        dict mapping ``"<from>_<to>"`` to the share of transitions leaving
        ``<from>`` that went to ``<to>``, rounded to 4 decimal places. Pairs
        that never occur are absent. The dict is empty when the frame has
        fewer than two rows.
    """
    if len(data_frame) < 2:
        return {}

    # Read the column once and render each state the same way the output key
    # renders it, so grouping and key text always agree.
    states = [f"{state}" for state in data_frame.iloc[:, 0]]

    # Count on (from, to) tuples instead of the joined string. Recovering the
    # source state by splitting the key on "_" mis-groups any state whose own
    # name contains an underscore.
    pair_counts = {}
    outgoing_totals = {}
    for source, target in zip(states, states[1:]):
        pair_counts[(source, target)] = pair_counts.get((source, target), 0) + 1
        outgoing_totals[source] = outgoing_totals.get(source, 0) + 1

    return {
        f"{source}_{target}": round(count / outgoing_totals[source], 4)
        for (source, target), count in pair_counts.items()
    }

In [5]:
def get_emission_probability(data_frame: pd.DataFrame):
    """Estimate the probability of each observation given the hidden state.

    Each row contributes one (state, observation) pair: the state is taken
    from column 0 and the observation from column 1.

    Args:
        data_frame: Frame with states in column 0 and observations in column 1.

    Returns:
        dict mapping ``"<state>_<observation>"`` to the share of rows in
        ``<state>`` that carry ``<observation>``, rounded to 4 decimal places.
        Pairs that never occur are absent. The dict is empty for an empty frame.
    """
    if len(data_frame) == 0:
        return {}

    # Render both columns once, exactly as they appear in the output keys.
    states = [f"{state}" for state in data_frame.iloc[:, 0]]
    observations = [f"{observation}" for observation in data_frame.iloc[:, 1]]

    # Count on (state, observation) tuples. Splitting the joined key on "_"
    # to get the state back breaks for state names that contain "_".
    pair_counts = {}
    state_totals = {}
    for state, observation in zip(states, observations):
        pair_counts[(state, observation)] = pair_counts.get((state, observation), 0) + 1
        state_totals[state] = state_totals.get(state, 0) + 1

    return {
        f"{state}_{observation}": round(count / state_totals[state], 4)
        for (state, observation), count in pair_counts.items()
    }

In [6]:
def initialize_hmm(initial_sate, row_hidden_states, col_visible_states):
    """Build the starting grid for the HMM computation as a list of rows.

    Layout of the returned grid:
      * row 0 is a header: ``"states/obs"``, ``"t=0"``, then one cell per
        entry of ``col_visible_states``;
      * every following row starts with a hidden-state name, then the t=0
        value (1 for the state equal to ``initial_sate``, otherwise 0), then
        one 0 per visible state.

    Args:
        initial_sate: Hidden state that receives the value 1 in the t=0 column.
        row_hidden_states: Hidden-state names, one grid row each.
        col_visible_states: Observation sequence, one grid column each.

    Returns:
        The grid as a list of lists.
    """
    header_row = ["states/obs", "t=0"]
    header_row.extend(col_visible_states)

    grid = [header_row]
    for state_name in row_hidden_states:
        start_value = 1 if initial_sate == state_name else 0
        grid.append([state_name, start_value] + [0] * len(col_visible_states))
    return grid

In [7]:
def calculate_initial_probability(data_frame):
    """Return the relative frequency of each value in the first column.

    Args:
        data_frame: Frame whose column 0 holds the values to tally.

    Returns:
        dict mapping each distinct value to (occurrences / number of rows),
        in order of first appearance. Empty for a frame without rows.
    """
    row_count = data_frame.shape[0]

    occurrences = {}
    for position in range(row_count):
        state = data_frame.iloc[position, 0]
        occurrences[state] = occurrences.get(state, 0) + 1

    return {state: count / row_count for state, count in occurrences.items()}

In [8]:
def convert_list_to_data_frame(input_list) -> pd.DataFrame:
    """Turn ``input_list`` into a DataFrame whose rows are sorted by index label."""
    return pd.DataFrame(input_list).sort_index(axis=0)

In [9]:
def convert_dict_to_data_frame(input_items) -> pd.DataFrame:
    """Turn an iterable of items into a DataFrame, one sorted item per row."""
    return pd.DataFrame(sorted(input_items))

In [10]:
def get_summation_of_transition_probabilities(input_dict, input_transition_map: dict):
    """Sum the values in ``input_dict``, each weighted by its transition probability.

    Args:
        input_dict: Maps a key to a numeric value.
        input_transition_map: Maps the same keys to a transition probability.

    Returns:
        The sum over all keys of ``round(value * probability, 5)``; 0 when
        ``input_dict`` is empty.
    """
    total = 0
    for key, value in input_dict.items():
        # A key missing from the map is a transition that was never recorded.
        # Weight it with 0 rather than failing on ``value * None``.
        total += round(value * input_transition_map.get(key, 0), 5)
    return total

In [11]:
def do_viterbi(hmm_matrix):
    """Pick, for every observation column, the hidden state with the largest value.

    ``hmm_matrix`` is read with this layout: row 0 is a header, column 0 holds
    the hidden-state names from row 1 down, and the observation columns start
    at column 2 (column 1 is skipped).

    Note that this is a per-column argmax over the values already stored in the
    matrix; it does no back-tracking across columns.

    Args:
        hmm_matrix: DataFrame in the layout described above.

    Returns:
        A list with one entry per observation column: the name of the hidden
        state whose value in that column is largest (the first such state on a
        tie), or ``None`` when the matrix has no hidden-state rows.
    """
    start_row = 1
    start_column = 2

    state_names = list(hmm_matrix.iloc[start_row:, 0])

    out_list = []
    for column in range(start_column, hmm_matrix.shape[1]):
        column_values = list(hmm_matrix.iloc[start_row:, column])
        if not column_values:
            out_list.append(None)
            continue
        # Compare (state, value) pairs on the value. Keying a dict by the value
        # would silently drop states that share the same number.
        best_state, _ = max(zip(state_names, column_values), key=lambda pair: pair[1])
        out_list.append(best_state)

    return out_list

In [12]:
def final_emission_calculation(initial_transition_map, alpha_matrix):
    """Combine the last column of the alpha matrix with a per-state weight map.

    Row 0 of ``alpha_matrix`` is treated as a header and skipped; column 0
    supplies the state name of each remaining row.

    Args:
        initial_transition_map: Maps a state name to its numeric weight.
        alpha_matrix: DataFrame with state names in column 0 and the values to
            combine in its last column.

    Returns:
        The sum over states of ``round(last_value * weight, 6)``.
    """
    last_column = alpha_matrix.shape[1] - 1

    # If a state name appears on several rows, the last row's value is used.
    final_value_by_state = {
        alpha_matrix.iloc[row, 0]: alpha_matrix.iloc[row, last_column]
        for row in range(1, alpha_matrix.shape[0])
    }

    return sum(
        round(final_value * initial_transition_map.get(state), 6)
        for state, final_value in final_value_by_state.items()
    )

In [13]:
def calculate_hmm(alpha, emission_probability, transition_probability, visible_states):
    """Fill in the alpha matrix one observation column at a time.

    ``alpha`` is read with this layout: row 0 is a header whose cells from
    column 2 onward are the observations, column 0 holds the hidden-state
    names from row 1 down, and column 1 holds the starting values.

    For every observation column ``t`` and hidden state ``j`` the cell becomes
    ``round(S * b, 5)``, where ``S`` is the transition-weighted sum of column
    ``t - 1`` (keys ``"<i>_<j>"``) and ``b`` is the emission probability stored
    under ``"<j>_<observation>"``.

    Args:
        alpha: Nested-list grid in the layout described above.
        emission_probability: Maps ``"<state>_<observation>"`` to a probability.
        transition_probability: Maps ``"<from>_<to>"`` to a probability.
        visible_states: Not used; the observations are taken from the header
            row of ``alpha``. Kept so existing callers keep working.

    Returns:
        The filled grid as a DataFrame. ``alpha`` itself is not modified.
    """
    first_state_row = 1
    first_observation_column = 2

    df_alpha = convert_list_to_data_frame(alpha)
    state_names = list(df_alpha.iloc[first_state_row:, 0])

    for column in range(first_observation_column, df_alpha.shape[1]):
        observation = df_alpha.iloc[0, column]
        for row, target_state in enumerate(state_names, start=first_state_row):
            # Values of the previous column, keyed by the transition that
            # leads from each source state into target_state.
            previous_values = {
                f"{source_state}_{target_state}": df_alpha.iloc[source_row, column - 1]
                for source_row, source_state in enumerate(state_names, start=first_state_row)
            }

            current_aij = get_summation_of_transition_probabilities(previous_values, transition_probability)
            # A (state, observation) pair that is not in the map has no
            # recorded probability; use 0 instead of multiplying by None.
            current_bjk = emission_probability.get(f"{target_state}_{observation}", 0)
            df_alpha.iloc[row, column] = round(current_aij * current_bjk, 5)

    return df_alpha

In [15]:
# Location of the training data. The raw string keeps the Windows backslashes
# literal; without it "\S" and "\P" are invalid escape sequences that newer
# Python versions warn about.
DATA_DIRECTORY = r"F:\Subjects\Spring 2020\Pattern\Project\Project 2"
DATA_FILENAME = "Project2Data.txt"

SEPARATOR = "====================================================================================================="

# Row/column labels used for the two probability tables printed below.
STATE_LABELS = ['foggy', 'rainy', 'sunny']
OBSERVATION_LABELS = ['yes', 'no']

training_data_frame = read_txt_file_data(DATA_DIRECTORY, DATA_FILENAME)

unique_hidden_state_list = list((training_data_frame.iloc[:, 0]).unique())
input_visible_states = ['no', 'no', 'no', 'yes', 'no', 'no', 'yes', 'yes', 'no', 'yes']
initial_state = "sunny"

print(SEPARATOR)
print("Observations", input_visible_states)
print("Initial State :", initial_state)
print(SEPARATOR)

transition_map = get_transition_probability(training_data_frame)
emission_map = get_emission_probability(data_frame=training_data_frame)

alpha_matrix = initialize_hmm(initial_state, unique_hidden_state_list, input_visible_states)
initial_probability_map = calculate_initial_probability(training_data_frame)

alpha_matrix = calculate_hmm(alpha=alpha_matrix, emission_probability=emission_map,
                             transition_probability=transition_map,
                             visible_states=input_visible_states)

final_emission_probability = final_emission_calculation(initial_probability_map, alpha_matrix)

# Long-format (key, probability) view of the transition map; not printed here.
df_transition = convert_dict_to_data_frame(transition_map.items())
print("a_ij(Transition Probability)")
# Orientation: each COLUMN is the state being left and each ROW the state
# being entered, so a column lists the probabilities of leaving that state.
df_transition_matrix = pd.DataFrame(
    {source: [transition_map.get(f"{source}_{target}") for target in STATE_LABELS]
     for source in STATE_LABELS},
    index=STATE_LABELS)
print(df_transition_matrix)

print(SEPARATOR)

# Long-format (key, probability) view of the emission map; not printed here.
df_emission = convert_dict_to_data_frame(emission_map.items())
print("b_jk(Emission Probability)")
# One row per hidden state, one column per observation value.
df_emission_matrix = pd.DataFrame(
    {observation: [emission_map.get(f"{state}_{observation}") for state in STATE_LABELS]
     for observation in OBSERVATION_LABELS},
    index=STATE_LABELS)
print(df_emission_matrix)

viterbi_path = do_viterbi(alpha_matrix)
print(SEPARATOR)
print("Final Alpha Matrix:")
print(alpha_matrix)
print(SEPARATOR)
print("Viterbi Path (most probable path):", viterbi_path)
print(SEPARATOR)
print("Final emission probability:", final_emission_probability)

Observations ['no', 'no', 'no', 'yes', 'no', 'no', 'yes', 'yes', 'no', 'yes']
Initial State : sunny
a_ij(Transition Probability)
        foggy   rainy   sunny
foggy  0.5019  0.2245  0.1501
rainy  0.2912  0.5796  0.0548
sunny  0.2069  0.1959  0.7951
b_jk(Emission Probability)
          yes      no
foggy  0.3027  0.6973
rainy  0.8122  0.1878
sunny  0.0850  0.9150
Final Alpha Matrix:
            0    1        2        3        4        5        6        7  \
0  states/obs  t=0       no       no       no      yes       no       no   
1       foggy    0  0.10466  0.11439  0.09994  0.03541  0.02326  0.01328   
2       rainy    0  0.01029  0.01433  0.01349   0.0489  0.00757  0.00248   
3       sunny    1  0.72752  0.55094  0.42504  0.03071  0.03782  0.03327   

         8        9       10       11  
0      yes      yes       no      yes  
1   0.0037  0.00107    0.001  0.00024  
2  0.00579  0.00372  0.00047   0.0005  
3  0.00252  0.00033  0.00111   0.0001  
Viterbi Path (most probable path): 

----------END----------