### Compute the Probability of a Hidden Path

https://rosalind.info/problems/ba10a

In [4]:
def hidden_path_probability(path, states, transition):
    """Return the probability of a hidden state path.

    The first state contributes a uniform probability of
    ``1 / len(states)``; every later step multiplies in the transition
    probability from the previous state to the current one.

    Args:
        path: Sequence of state symbols (for example a string).
        states: State symbols; position ``i`` selects row/column ``i`` of
            ``transition``.
        transition: Matrix where ``transition[i][j]`` is the probability of
            moving from state ``i`` to state ``j``.

    Returns:
        The product of the uniform initial probability and the transition
        probabilities along ``path``.
    """
    # Map each state symbol to its row/column position in the matrix.
    position = dict(zip(states, range(len(states))))

    # Every state is equally likely as the starting state.
    likelihood = 1 / len(states)

    # Walk over consecutive (previous, current) state pairs.
    for source, target in zip(path, path[1:]):
        likelihood *= transition[position[source]][position[target]]

    return likelihood


# Load the dataset, dropping blank lines.
with open("data/rosalind_ba10a.txt", "r") as f:
    lines = []
    for raw_line in f:
        cleaned = raw_line.strip()
        if cleaned:
            lines.append(cleaned)

# Index 0 holds the hidden path and index 2 the state names.
path, states = lines[0], lines[2].split()

# The transition matrix rows start on the line after the column header;
# the first token of each row is the row label and is skipped.
start_index = 5
transition = [
    [float(cell) for cell in lines[row].split()[1:]]
    for row in range(start_index, start_index + len(states))
]

# Compute and print the path probability.
result = hidden_path_probability(path, states, transition)
print(result)


4.209523928954466e-15


### Compute the Probability of an Outcome Given a Hidden Path

https://rosalind.info/problems/ba10b

In [None]:
def outcome_given_path_probability(x, path, alphabet, states, emission):
    """Return Pr(x | path): the probability of emitting ``x`` along ``path``.

    Args:
        x: Observed sequence of symbols.
        path: Hidden state sequence; ``path[i]`` is the state that emits
            ``x[i]``.
        alphabet: Symbols; position ``j`` selects column ``j`` of
            ``emission``.
        states: State symbols; position ``i`` selects row ``i`` of
            ``emission``.
        emission: Matrix where ``emission[i][j]`` is the probability that
            state ``i`` emits symbol ``j``.

    Returns:
        The product of the emission probabilities at every position of ``x``
        (``1.0`` for an empty ``x``).
    """
    # Lookup tables from symbol/state to matrix column/row.
    row_of = dict(zip(states, range(len(states))))
    column_of = dict(zip(alphabet, range(len(alphabet))))

    likelihood = 1.0

    # Multiply in one emission probability per observed position.
    for position, symbol in enumerate(x):
        emitting_state = path[position]
        likelihood *= emission[row_of[emitting_state]][column_of[symbol]]

    return likelihood


In [7]:
# Load the dataset, dropping blank lines.
with open("data/rosalind_ba10b.txt", "r") as f:
    lines = []
    for raw_line in f:
        cleaned = raw_line.strip()
        if cleaned:
            lines.append(cleaned)

# Fixed positions: observed string, alphabet, hidden path, state names.
x, path = lines[0], lines[4]
alphabet, states = lines[2].split(), lines[6].split()

# The emission matrix rows start after the column-header line;
# the first token of each row is the row label and is skipped.
start_index = 9
emission = [
    [float(cell) for cell in lines[row].split()[1:]]
    for row in range(start_index, start_index + len(states))
]

# Compute and print the conditional probability.
result = outcome_given_path_probability(x, path, alphabet, states, emission)
print(result)

1.134179885483002e-31


### Implement the Viterbi Algorithm

https://rosalind.info/problems/ba10c

In [30]:
def viterbi_decoding(x, alphabet, states, transition, emission):
    """Return the most probable hidden path for ``x`` (Viterbi algorithm).

    The initial state distribution is uniform (``1 / len(states)``).
    Scores are raw probability products, not logarithms, so they shrink
    with every step.

    Args:
        x: Observed sequence of symbols.
        alphabet: Symbols; position ``j`` selects column ``j`` of
            ``emission``.
        states: State symbols; position ``i`` selects row ``i`` of
            ``transition`` and ``emission``.
        transition: Matrix where ``transition[i][j]`` is the probability of
            moving from state ``i`` to state ``j``.
        emission: Matrix where ``emission[i][j]`` is the probability that
            state ``i`` emits symbol ``j``.

    Returns:
        The decoded path as a string of state symbols, one per symbol of
        ``x``. An empty ``x`` yields an empty string. Ties are resolved in
        favour of the state that appears first in ``states``.

    Raises:
        KeyError: If ``x`` contains a symbol that is not in ``alphabet``.
    """
    n = len(x)
    k = len(states)
    if n == 0:
        # Nothing was observed, so there is no path to decode.
        return ''

    symbol_index = {a: j for j, a in enumerate(alphabet)}
    # Resolve each observed symbol to its emission column once.
    columns = [symbol_index[symbol] for symbol in x]

    initial_prob = 1 / k
    scores = [initial_prob * emission[i][columns[0]] for i in range(k)]

    # pointers_per_step[t - 1][j] is the best predecessor of state j at time t.
    pointers_per_step = []

    for t in range(1, n):
        col = columns[t]
        next_scores = []
        pointers = []
        for j in range(k):
            probs = [
                scores[i] * transition[i][j] * emission[j][col]
                for i in range(k)
            ]
            best = max(probs)
            next_scores.append(best)
            # index() returns the first maximum, i.e. the lowest state index.
            pointers.append(probs.index(best))
        scores = next_scores
        pointers_per_step.append(pointers)

    # Backtrack from the best final state; build the path reversed and flip
    # it once instead of inserting at the front on every step.
    last = max(range(k), key=lambda i: scores[i])
    path = [last]
    for pointers in reversed(pointers_per_step):
        last = pointers[last]
        path.append(last)
    path.reverse()

    return ''.join(states[i] for i in path)

In [31]:
# Read the dataset, keeping only non-blank lines.
with open("data/rosalind_ba10c.txt") as f:
    raw = [text for text in map(str.strip, f) if text]

# Fixed positions: observed string, alphabet, state names.
x, alphabet, states = raw[0], raw[2].split(), raw[4].split()
n_states = len(states)

# Transition matrix: one row per state, first token is the row label.
transition = []
for row_number in range(7, 7 + n_states):
    cells = raw[row_number].split()[1:]
    transition.append([float(cell) for cell in cells])

# Emission matrix: starts two lines after the last transition row.
start = 7 + n_states + 2
emission = []
for row_number in range(start, start + n_states):
    cells = raw[row_number].split()[1:]
    emission.append([float(cell) for cell in cells])

print(viterbi_decoding(x, alphabet, states, transition, emission))


BACBACBACCCCBACCBACBBBACCCBACBACCCCBBBACCCBACCCCBAACCBACCCBACCBACBACCCCCCCBACBACCBBBACBACCCCCBACBAAC


In [29]:
import math

def read_hmm(filename):
    """Parse an HMM problem file into its five sections.

    Blank lines are ignored. Any line starting with ``-`` is a separator;
    consecutive non-separator lines form one segment, and empty segments
    are never recorded.

    Args:
        filename: Path of the text file to read.

    Returns:
        A tuple ``(seq, alphabet, states, trans, emit)`` where ``seq`` is
        the first line of segment 0, ``alphabet`` and ``states`` are the
        whitespace-split first lines of segments 1 and 2, and ``trans`` and
        ``emit`` are nested dicts ``{row_label: {column_label: float}}``
        parsed from segments 3 and 4.

    Raises:
        ValueError: If the file contains fewer than five segments.
    """
    # The last entry of `segments` is always the segment being filled.
    segments = [[]]
    with open(filename) as handle:
        for raw in handle:
            if not raw.strip():
                continue
            text = raw.rstrip()
            if text.startswith('-'):
                # Close the current segment only if it has content.
                if segments[-1]:
                    segments.append([])
            else:
                segments[-1].append(text)
    if not segments[-1]:
        segments.pop()

    if len(segments) < 5:
        raise ValueError("输入文件分段不足 5")

    def parse_matrix(seg):
        # First line: column labels; remaining lines: row label + values.
        header, *rows = seg
        columns = header.split()
        table = {}
        for row in rows:
            label, *cells = row.split()
            table[label] = dict(zip(columns, [float(c) for c in cells]))
        return table

    seq = segments[0][0].strip()
    alphabet = segments[1][0].split()
    states = segments[2][0].split()
    trans = parse_matrix(segments[3])
    emit = parse_matrix(segments[4])

    return seq, alphabet, states, trans, emit

def viterbi_log(seq, states, trans, emit, init_prob=None):
    """Return the most probable hidden path for ``seq`` using log scores.

    Probabilities are converted to logarithms and added instead of
    multiplied; a probability of zero (or a missing entry) is represented
    as negative infinity.

    Args:
        seq: Observed sequence of symbols.
        states: State symbols, in the order used to break ties.
        trans: Nested dict, ``trans[s1][s2]`` is the probability of moving
            from state ``s1`` to state ``s2``.
        emit: Nested dict, ``emit[s][c]`` is the probability that state
            ``s`` emits symbol ``c``.
        init_prob: Optional dict of initial state probabilities. When
            omitted, every state starts with probability ``1 / len(states)``.

    Returns:
        The decoded path as a string of state symbols, one per symbol of
        ``seq``. An empty ``seq`` yields an empty string. Ties are resolved
        in favour of the state that appears first in ``states``; this also
        applies when every candidate has zero probability, so an impossible
        observation sequence still yields a path instead of an error.
    """
    n = len(seq)
    k = len(states)
    if n == 0:
        # Nothing was observed, so there is no path to decode.
        return ''

    neg_inf = float('-inf')

    def safe_log(p):
        # math.log(0) raises ValueError; zero probability maps to -inf.
        return math.log(p) if p > 0 else neg_inf

    # Log transition scores indexed by state position: log_trans[i][j].
    log_trans = [
        [safe_log(trans[s1].get(s2, 0.0)) for s2 in states]
        for s1 in states
    ]
    # Log emission scores for the symbols that actually occur in seq.
    observed = set(seq)
    log_emit = [
        {c: safe_log(emit[s].get(c, 0.0)) for c in observed}
        for s in states
    ]

    # Initial log scores: uniform unless the caller supplies a distribution.
    if init_prob is None:
        init_log = [math.log(1.0 / k)] * k
    else:
        init_log = [safe_log(init_prob[s]) for s in states]

    dp = [[neg_inf] * k for _ in range(n)]
    # Default back-pointer 0 keeps backtracking valid when every
    # predecessor scores -inf.
    back = [[0] * k for _ in range(n)]

    for j in range(k):
        dp[0][j] = init_log[j] + log_emit[j][seq[0]]

    for t in range(1, n):
        prev_row = dp[t - 1]
        symbol = seq[t]
        for j in range(k):
            e = log_emit[j][symbol]
            best = neg_inf
            best_prev = 0
            for i in range(k):
                val = prev_row[i] + log_trans[i][j] + e
                # Strict comparison keeps the lowest index among ties.
                if val > best:
                    best = val
                    best_prev = i
            dp[t][j] = best
            back[t][j] = best_prev

    # Backtrack from the best final state; build the path reversed and flip
    # it once instead of inserting at the front on every step.
    last = max(range(k), key=lambda i: dp[n - 1][i])
    path_idx = [last]
    for t in range(n - 1, 0, -1):
        last = back[t][last]
        path_idx.append(last)
    path_idx.reverse()

    return ''.join(states[i] for i in path_idx)

# ---------- Usage example ---------- #
if __name__ == "__main__":
    # Parse the dataset, then decode and print the most probable path.
    hmm_inputs = read_hmm("data/rosalind_ba10c.txt")
    seq, alphabet, states, trans, emit = hmm_inputs
    path = viterbi_log(seq, states, trans, emit)
    print(path)

BACBACBACCCCBACCBACBBBACCCBACBACCCCBBBACCCBACCCCBAACCBACCCBACCBACBACCCCCCCBACBACCBBBACBACCCCCBACBAAC


### Compute the Probability of a String Emitted by an HMM

https://rosalind.info/problems/ba10d

In [None]:
def OutcomeLikelihood(x, states, transition, emission, alphabet=None):
    """Return Pr(x): the probability that the HMM emits ``x`` (forward algorithm).

    The initial state distribution is uniform (``1 / len(states)``).

    Args:
        x: Observed sequence of symbols.
        states: State symbols; only their count is used, and position ``i``
            corresponds to row ``i`` of ``transition`` and ``emission``.
        transition: Matrix where ``transition[i][j]`` is the probability of
            moving from state ``i`` to state ``j``.
        emission: Matrix where ``emission[i][j]`` is the probability that
            state ``i`` emits symbol ``j``.
        alphabet: Symbols; position ``j`` selects column ``j`` of
            ``emission``. It defaults to ``None`` only to keep the original
            four-argument signature callable; the emission rows hold
            probabilities, not symbols, so the column order cannot be
            recovered without it.

    Returns:
        The total probability of ``x`` summed over all hidden paths
        (``1.0`` for an empty ``x``).

    Raises:
        ValueError: If ``alphabet`` is not supplied.
        KeyError: If ``x`` contains a symbol that is not in ``alphabet``.
    """
    if alphabet is None:
        raise ValueError(
            "alphabet is required to map observed symbols to emission columns"
        )

    n = len(x)
    k = len(states)
    if n == 0:
        # The empty string is emitted with certainty.
        return 1.0

    # Map each symbol to its emission column.
    symbol_index = {symbol: j for j, symbol in enumerate(alphabet)}

    # Uniform initial distribution over states.
    initial_prob = 1 / k

    # Forward scores for the first observed symbol.
    forward = [initial_prob * emission[i][symbol_index[x[0]]] for i in range(k)]

    # Only the previous column is needed, so keep a single rolling column.
    for t in range(1, n):
        col = symbol_index[x[t]]
        forward = [
            sum(forward[i] * transition[i][j] for i in range(k)) * emission[j][col]
            for j in range(k)
        ]

    # The total probability is the sum of the final column.
    return sum(forward)

In [5]:
import sys
import math

def parse_matrix(lines):
    """Parse a labelled matrix into a nested dict.

    Args:
        lines: List of text lines. The first line holds the
            whitespace-separated column labels; each following line holds a
            row label followed by that row's numeric values.

    Returns:
        ``{row_label: {column_label: float}}``. Values are paired with
        column labels positionally.
    """
    column_names = lines[0].split()
    return {
        fields[0]: dict(zip(column_names, [float(v) for v in fields[1:]]))
        for fields in (line.split() for line in lines[1:])
    }


def forward_algorithm(sequence, states, transition, emission):
    """Return the probability of ``sequence`` under the HMM (forward algorithm).

    The initial state distribution is uniform over ``states``.

    Args:
        sequence: Observed sequence of symbols.
        states: State symbols.
        transition: Nested mapping, ``transition[prev][s]`` is the
            probability of moving from state ``prev`` to state ``s``.
        emission: Nested mapping, ``emission[s][c]`` is the probability that
            state ``s`` emits symbol ``c``.

    Returns:
        The sum of the forward values of all states at the last position.
    """
    state_count = len(states)

    # Base case: uniform start times the emission of the first symbol.
    column = {
        s: (1.0 / state_count) * emission[s][sequence[0]]
        for s in states
    }

    # Recursion: only the previous column is needed for the next one.
    for t in range(1, len(sequence)):
        symbol = sequence[t]
        previous = column
        column = {}
        for s in states:
            incoming = sum(previous[prev] * transition[prev][s] for prev in states)
            column[s] = incoming * emission[s][symbol]

    # Termination: add up the final column.
    return sum(column[s] for s in states)


def read_hmm_file(filename):
    """Read an HMM problem file and return its parsed sections.

    Blank lines are ignored and every line is stripped. A line starting
    with ``--------`` closes the current section and opens a new one (a
    section may therefore be empty).

    Args:
        filename: Path of the text file to read.

    Returns:
        A tuple ``(sequence, alphabet, states, transition, emission)``:
        the first line of section 0, the whitespace-split first lines of
        sections 1 and 2, and sections 3 and 4 parsed by ``parse_matrix``.
    """
    # The last entry of `sections` is always the section being filled.
    sections = [[]]
    with open(filename, 'r') as handle:
        for raw in handle:
            text = raw.strip()
            if not text:
                continue
            if text.startswith("--------"):
                sections.append([])
            else:
                sections[-1].append(text)

    # Lines were stripped while reading, so the sequence needs no cleanup.
    sequence = sections[0][0]
    alphabet = sections[1][0].split()
    states = sections[2][0].split()

    transition = parse_matrix(sections[3])
    emission = parse_matrix(sections[4])

    return sequence, alphabet, states, transition, emission

# if __name__ == "__main__":
#     if len(sys.argv) != 2:
#         print("Usage: python hmm_forward.py input.txt")
#         sys.exit(1)

#     file = sys.argv[1]
#     sequence, alphabet, states, transition, emission = read_hmm_file(file)
#     prob = forward_algorithm(sequence, states, transition, emission)

#     print(f"Pr(x) = {prob}")


In [8]:
if __name__ == "__main__":
    # Reuse the shared parser rather than repeating its section-splitting
    # logic inline; read_hmm_file performs the same steps and returns the
    # same five values.
    sequence, alphabet, states, transition, emission = read_hmm_file(
        "data/rosalind_ba10d.txt"
    )

    # Probability of the observed sequence under the HMM.
    prob = forward_algorithm(sequence, states, transition, emission)
    print(prob)


4.147511265278653e-55
