In [8]:
from typing import List, Dict, Tuple
import pandas as pd

In [20]:
def text_to_input(lines):
    pi_input = lines[0].strip()

    states_input = [line.strip() for line in lines[2].split()]
    
    trans_matrix = []
    for line in lines[5:]:
        trans_matrix.append(line.strip().split()[1:])

    trans_df_input = pd.DataFrame(trans_matrix, columns=states_input, index=states_input)

    return pi_input, states_input, trans_df_input

with open('01_ProbabilityHMMPath/inputs/test4.txt', 'r') as file:
    lines = file.readlines()

pi_input, states_input, trans_df_input = text_to_input(lines)
print(pi_input)
print(states_input)
print(trans_df_input)

BCAACAADD
['A', 'B', 'C', 'D']
     A    B    C    D
A  0.1    0  0.3  0.6
B  0.5  0.4  0.1    0
C  0.3  0.3  0.3  0.1
D  0.2  0.5  0.1  0.2


In [21]:
def prob_hidden_path(pi: str, states: List[str], trans_df: pd.DataFrame):
    probability = float(1/len(states))
    for i in range(len(pi)):
        curr_symbol = pi[i]
        if i != len(pi)-1:
            next_symbol = pi[i+1]
            added_prob = float(trans_df.loc[curr_symbol, next_symbol])
            probability = probability * added_prob
    return probability

print(prob_hidden_path(pi_input, states_input, trans_df_input))

8.100000000000001e-07


In [27]:
def given_hidden_path_inputs(lines):
    x_input = lines[0].strip()

    alphabet_input = [line.strip() for line in lines[2].split()]
    
    pi_input = lines[4].strip()

    states_input = [line.strip() for line in lines[6].split()]

    emissions_matrix = []
    for line in lines[9:]:
        emissions_matrix.append(line.strip().split()[1:])

    emissions_df_input = pd.DataFrame(emissions_matrix, columns=alphabet_input, index=states_input)

    return x_input, alphabet_input, pi_input, states_input, emissions_df_input

with open('dataset_40267_10.txt', 'r') as file:
    lines = file.readlines()

x_input, alphabet_input, pi_input, states_input, emissions_df_input = given_hidden_path_inputs(lines)
print(x_input)
print(alphabet_input)
print(pi_input)
print(states_input)
print(emissions_df_input)

zxzzyyzxzzxxxzyzzyyxzyxxxxyxyxzyyxzyxyxzzyzzxyzyyx
['x', 'y', 'z']
ABBABBBBABBABBBABBAABBBAABABBBABAAABAABBBBBBABAABA
['A', 'B']
       x      y      z
A  0.274  0.145  0.581
B  0.109  0.254  0.637


In [None]:
def prob_outcome_given_hidden_path(x:str, pi: str, emissions_df: pd.DataFrame):
    probability = 1
    for i in range(len(pi)):
        curr_pi = pi[i]
        curr_x = x[i]
        added_prob = float(emissions_df.loc[curr_pi, curr_x])
        probability = probability * added_prob
    return probability

print(prob_outcome_given_hidden_path(x_input, pi_input, emissions_df_input))

3.0866055169952337e-28


In [124]:
def viterbi_path_inputs(lines):
    x_input = lines[0].strip()

    alphabet_input = [line.strip() for line in lines[2].split()]

    states_input = [line.strip() for line in lines[4].split()]

    trans_matrix = []
    trans_matrix_start = 7
    for line in lines[trans_matrix_start:trans_matrix_start + len(states_input)]:
        trans_matrix.append(line.strip().split()[1:])

    trans_df_input = pd.DataFrame(trans_matrix, columns=states_input, index=states_input).astype(float)

    emissions_matrix = []
    emissions_start = trans_matrix_start + len(states_input) + 2
    for line in lines[emissions_start:emissions_start + len(states_input)]:
        emissions_matrix.append(line.strip().split()[1:])

    emissions_df_input = pd.DataFrame(emissions_matrix, columns=alphabet_input, index=states_input).astype(float)

    return x_input, alphabet_input, states_input, trans_df_input, emissions_df_input

with open('04_OutcomeLikelihood/inputs/sample.txt', 'r') as file:
    lines = file.readlines()

x_input, alphabet_input, states_input, trans_df_input, emissions_df_input = viterbi_path_inputs(lines)
print(x_input)
print(alphabet_input)
print(states_input)
print(trans_df_input)
print(emissions_df_input)

xzyyz
['x', 'y', 'z']
['A', 'B']
       A      B
A  0.303  0.697
B  0.831  0.169
       x      y      z
A  0.533  0.065  0.402
B  0.342  0.334  0.324


In [121]:
def viterbi_algorithm(x: str, alphabet: List[str], states: List[str],
                      trans_df: pd.DataFrame, emissions_df: pd.DataFrame):
    pi = ''
    backtrack_df = pd.DataFrame(1, index=states, columns=list(x)).astype(object)
    viterbi_graph = {state: [0] * len(x) for state in states}
    for j in range(len(x)):
        for curr_state in states:
            prob_emission_given_state = emissions_df.loc[curr_state, x[j]]
            if j == 0:
                viterbi_graph[curr_state][j] = prob_hidden_path(pi, states, trans_df) * prob_emission_given_state
                backtrack_df.loc[curr_state, x[j]] = 'S'
            else:
                max_prob = 0.0
                max_prev_state = states[0]
                for prev_state in states:
                    prob_trans = trans_df.loc[prev_state, curr_state]
                    current_prob = viterbi_graph[prev_state][j - 1] * prob_trans * prob_emission_given_state
                    if current_prob > max_prob:
                        max_prob = current_prob
                        max_prev_state = prev_state
                viterbi_graph[curr_state][j] = max_prob
                backtrack_df.iloc[states.index(curr_state), j] = max_prev_state
    max_prob = 0
    most_probable_state = states[0]
    for state in states:
        if viterbi_graph[state][-1] > max_prob:
            max_prob = viterbi_graph[state][-1]
            most_probable_state = state
    pi = most_probable_state + pi

    current_state = most_probable_state
    for i in range(len(x)-1, 0, -1):
        prev_state = backtrack_df.iloc[states.index(current_state), i]
        pi = prev_state + pi
        current_state = prev_state
    return pi
print(viterbi_algorithm(x_input, alphabet_input, states_input, trans_df_input, emissions_df_input))

BBCAAAAAAACBCBCAAAAACBCAAAAAAAAAAAAAAAAAAAAAAAAACBCAAAAAAAAAAAAAAAAAAAACBCAAAAAAAAAAACBCBCACBCAAAAAA


In [134]:
def outcome_likelihood_inputs(lines):
    x_input = lines[0].strip()

    alphabet_input = [line.strip() for line in lines[2].split()]

    states_input = [line.strip() for line in lines[4].split()]

    trans_matrix = []
    trans_matrix_start = 7
    for line in lines[trans_matrix_start:trans_matrix_start + len(states_input)]:
        trans_matrix.append(line.strip().split()[1:])

    trans_df_input = pd.DataFrame(trans_matrix, columns=states_input, index=states_input).astype(float)

    emissions_matrix = []
    emissions_start = trans_matrix_start + len(states_input) + 2
    for line in lines[emissions_start:emissions_start + len(states_input)]:
        emissions_matrix.append(line.strip().split()[1:])

    emissions_df_input = pd.DataFrame(emissions_matrix, columns=alphabet_input, index=states_input).astype(float)

    return x_input, alphabet_input, states_input, trans_df_input, emissions_df_input

with open('dataset_40269_4.txt', 'r') as file:
    lines = file.readlines()

x_input, alphabet_input, states_input, trans_df_input, emissions_df_input = viterbi_path_inputs(lines)
print(x_input)
print(alphabet_input)
print(states_input)
print(trans_df_input)
print(emissions_df_input)

xxyzxxxzxyxzzyzyyyxzxyyzxyxzzzyxxxxyxzzxxzzyxzzzyzxzyzzyxxzyzxyyzxxzxzxzzyxzyzxzxxxxxxyzxzxyyyzxzxyx
['x', 'y', 'z']
['A', 'B', 'C', 'D']
       A      B      C      D
A  0.155  0.364  0.102  0.379
B  0.426  0.190  0.050  0.334
C  0.354  0.032  0.329  0.285
D  0.372  0.343  0.029  0.256
       x      y      z
A  0.046  0.498  0.456
B  0.062  0.496  0.442
C  0.412  0.202  0.386
D  0.223  0.540  0.237


In [135]:
def outcome_likelihood(x: str, alphabet: List[str], states: List[str],
                      trans_df: pd.DataFrame, emissions_df: pd.DataFrame):
    graph = {state: [0] * len(x) for state in states}
    for j in range(len(x)):
        for curr_state in states:
            prob_emission_given_state = emissions_df.loc[curr_state, x[j]]
            if j == 0:
                graph[curr_state][j] = 1/len(states) * prob_emission_given_state
            else:
                total_prob = 0.0
                for prev_state in states:
                    prob_trans = trans_df.loc[prev_state, curr_state]
                    current_prob = graph[prev_state][j - 1] * prob_trans * prob_emission_given_state
                    total_prob += current_prob
                graph[curr_state][j] = total_prob
    
    outcome_likelihood = 0
    for state in states:
        state_likelihood = graph[state][-1]
        outcome_likelihood += state_likelihood
    
    return outcome_likelihood
print(outcome_likelihood(x_input, alphabet_input, states_input, trans_df_input, emissions_df_input))

4.405857847247358e-57


In [123]:
def profile_hmm_inputs(lines):
    threshold_input = lines[0].strip()

    alphabet_input = [line.strip() for line in lines[2].split()]

    alignment_input = [line.strip() for line in lines[4:]]

    return float(threshold_input), alphabet_input, alignment_input

with open('dataset_40270_15.txt', 'r') as file:
    lines = file.readlines()

threshold_input, alphabet_input, alignment_input = profile_hmm_inputs(lines)
print(threshold_input)
print(alphabet_input)
print(alignment_input)

0.322
['A', 'B', 'C', 'D', 'E']
['-CBD-BC-D', 'DCCD-BAE-', 'EACCEBBED', 'ECC-EBDED', 'EEBDE-BED', 'ECCDEEBED']


In [124]:
def populate_graph(alignment, alphabet, hmm_list, trans_matrix, emissions_matrix, seq_pos):
    transition_counts = {state: {target: 0 for target in hmm_list} for state in hmm_list}
    emission_counts = {state: {letter: 0 for letter in alphabet} for state in hmm_list}

    for sequence in alignment:
        prev_state = 'S'
        match_index = 0
        for i in range(len(seq_pos)):
            is_match = seq_pos[i][1]
            if is_match:
                match_index += 1
                state = f'D{match_index}' if sequence[i] == '-' else f'M{match_index}'
            else:
                state = f'I{match_index}' if sequence[i] != '-' else None

            if state:
                if state in hmm_list:
                    if state.startswith('M') or state.startswith('I'):
                        if sequence[i] in alphabet:
                            emission_counts[state][sequence[i]] += 1
                    transition_counts[prev_state][state] += 1
                    prev_state = state

        transition_counts[prev_state]['E'] += 1

    for state in hmm_list:
        total_transitions = sum(transition_counts[state].values())
        if total_transitions > 0:
            for target in hmm_list:
                trans_matrix.at[state, target] = transition_counts[state][target] / total_transitions

    for state in hmm_list:
        total_emissions = sum(emission_counts[state].values())
        if total_emissions > 0:
            for letter in alphabet:
                emissions_matrix.at[state, letter] = emission_counts[state][letter] / total_emissions

    return trans_matrix, emissions_matrix


def profile_hmm(threshold: float, alphabet: List[str], alignment: List[str]):
    seq_pos = []
    for i in range(len(alignment[0])):
        pos = [seq[i] for seq in alignment]
        num_dels = sum(1 for letter in pos if letter == '-')
        seq_pos.append((pos, num_dels / len(pos) < threshold))

    alignment_star = [pos[0] for pos in seq_pos if pos[1]]

    hmm_length = len(alignment_star)
    hmm_list = ['S', 'I0'] + [f'{state}{i}' for i in range(1, hmm_length + 1) for state in ['M', 'D', 'I']] + ['E']

    trans_matrix = pd.DataFrame(0.0, columns=hmm_list, index=hmm_list)
    emissions_matrix = pd.DataFrame(0.0, columns=alphabet, index=hmm_list)

    populate_graph(alignment, alphabet, hmm_list, trans_matrix, emissions_matrix, seq_pos)

    order = ["S", "I0"]
    for i in range(1, len(alignment_star)+1):
        order.append(f"M{i}")
        order.append(f"D{i}")
        order.append(f"I{i}")
    order.append("E")

    trans_matrix = trans_matrix.reindex(index=order, columns=order)
    emissions_matrix = emissions_matrix.reindex(index=order)
    trans_matrix = trans_matrix.replace(0.0, 0)
    emissions_matrix = emissions_matrix.replace(0.0, 0)

    return trans_matrix, emissions_matrix

transition, emission = profile_hmm(threshold_input, alphabet_input, alignment_input)

with open('output.txt', 'w') as f:
    f.write(transition.to_csv(sep='\t', index=True, header=True))
    f.write('--------\n')
    f.write(emission.to_csv(sep='\t', index=True, header=True))



In [175]:
def profile_hmm_with_pseudocounts_inputs(lines):
    first_line = lines[0].strip().split()
    threshold_input = float(first_line[0])
    pseudocount_input = float(first_line[1])
    
    alphabet_input = lines[2].strip().split()
    
    alignment_input = [line.strip() for line in lines[4:] if line.strip()]
    
    return threshold_input, pseudocount_input, alphabet_input, alignment_input

with open('06_PseudocountProfileHMM/inputs/sample.txt', 'r') as file:
    lines = file.readlines()

threshold_input, pseudocount_input, alphabet_input, alignment_input = profile_hmm_with_pseudocounts_inputs(lines)
print(threshold_input)
print(pseudocount_input)
print(alphabet_input)
print(alignment_input)

0.358
0.01
['A', 'B', 'C', 'D', 'E']
['A-A', 'ADA', 'ACA', 'A-C', '-EA', 'D-A']


In [177]:
def populate_graph(alignment, alphabet, hmm_list, trans_matrix, emissions_matrix, seq_pos, pseudocount):
    transition_counts = {state: {target: 0 for target in hmm_list} for state in hmm_list}
    emission_counts = {state: {letter: 0 for letter in alphabet} for state in hmm_list}

    for sequence in alignment:
        prev_state = 'S'
        match_index = 0
        for i in range(len(seq_pos)):
            is_match = seq_pos[i][1]
            if is_match:
                match_index += 1
                state = f'D{match_index}' if sequence[i] == '-' else f'M{match_index}'
            else:
                state = f'I{match_index}' if sequence[i] != '-' else None

            if state:
                if state in hmm_list:
                    if state.startswith('M') or state.startswith('I'):
                        if sequence[i] in alphabet:
                            emission_counts[state][sequence[i]] += 1
                    transition_counts[prev_state][state] += 1
                    prev_state = state

        transition_counts[prev_state]['E'] += 1

    for state in hmm_list:
        total_transitions = sum(transition_counts[state].values())
        if total_transitions > 0:
            for target in hmm_list:
                trans_matrix.at[state, target] = transition_counts[state][target] / total_transitions
    

    for state in hmm_list:
        total_emissions = sum(emission_counts[state].values())
        if total_emissions > 0:
            for letter in alphabet:
                emissions_matrix.at[state, letter] = emission_counts[state][letter] / total_emissions


    return trans_matrix, emissions_matrix


def profile_hmm_with_pseudocounts(threshold: float, pseudocount: float, alphabet: List[str], alignment: List[str]):
    seq_pos = []
    for i in range(len(alignment[0])):
        pos = [seq[i] for seq in alignment]
        num_dels = sum(1 for letter in pos if letter == '-')
        seq_pos.append((pos, num_dels / len(pos) < threshold))

    alignment_star = [pos[0] for pos in seq_pos if pos[1]]

    hmm_length = len(alignment_star)
    hmm_list = ['S', 'I0'] + [f'{state}{i}' for i in range(1, hmm_length + 1) for state in ['M', 'D', 'I']] + ['E']

    trans_matrix = pd.DataFrame(0.0, columns=hmm_list, index=hmm_list)
    emissions_matrix = pd.DataFrame(0.0, columns=alphabet, index=hmm_list)

    populate_graph(alignment, alphabet, hmm_list, trans_matrix, emissions_matrix, seq_pos, pseudocount)

    order = ["S", "I0"]
    for i in range(1, len(alignment_star)+1):
        order.append(f"M{i}")
        order.append(f"D{i}")
        order.append(f"I{i}")
    order.append("E")

    trans_matrix = trans_matrix.reindex(index=order, columns=order)
    emissions_matrix = emissions_matrix.reindex(index=order)
    trans_matrix = trans_matrix.replace(0.0, 0)
    emissions_matrix = emissions_matrix.replace(0.0, 0)

    return trans_matrix, emissions_matrix

transition, emission = profile_hmm_with_pseudocounts(threshold_input, pseudocount_input, alphabet_input, alignment_input)

# # with open('output.txt', 'w') as f:
# #     f.write(transition.to_csv(sep='\t', index=True, header=True))
# #     f.write('--------\n')
# #     f.write(emission.to_csv(sep='\t', index=True, header=True))

print(transition)
print(emission)





      S   I0        M1        D1   I1   M2   D2   I2    E
S   0.0  0.0  0.833333  0.166667  0.0  0.0  0.0  0.0  0.0
I0  0.0  0.0  0.000000  0.000000  0.0  0.0  0.0  0.0  0.0
M1  0.0  0.0  0.000000  0.000000  0.4  0.6  0.0  0.0  0.0
D1  0.0  0.0  0.000000  0.000000  1.0  0.0  0.0  0.0  0.0
I1  0.0  0.0  0.000000  0.000000  0.0  1.0  0.0  0.0  0.0
M2  0.0  0.0  0.000000  0.000000  0.0  0.0  0.0  0.0  1.0
D2  0.0  0.0  0.000000  0.000000  0.0  0.0  0.0  0.0  0.0
I2  0.0  0.0  0.000000  0.000000  0.0  0.0  0.0  0.0  0.0
E   0.0  0.0  0.000000  0.000000  0.0  0.0  0.0  0.0  0.0
           A    B         C         D         E
S   0.000000  0.0  0.000000  0.000000  0.000000
I0  0.000000  0.0  0.000000  0.000000  0.000000
M1  0.800000  0.0  0.000000  0.200000  0.000000
D1  0.000000  0.0  0.000000  0.000000  0.000000
I1  0.000000  0.0  0.333333  0.333333  0.333333
M2  0.833333  0.0  0.166667  0.000000  0.000000
D2  0.000000  0.0  0.000000  0.000000  0.000000
I2  0.000000  0.0  0.000000  0.00000