In [38]:
import numpy as np

# 随机生成发射概率矩阵 (T x N) 示例，T = 100, N = 5 states
np.random.seed(50)
T = 100
states = ["intergenic", "exon", "DSS", "intron", "ASS"]
N = len(states)
prob = np.random.rand(T, N)
prob = prob / prob.sum(axis=1, keepdims=True)  # 每一行归一化为概率分布

# 状态转移概率（简化版）
trans = {
    "intergenic": ["intergenic", "exon"],
    "exon": ["exon", "DSS"],
    "DSS": ["intron"],
    "intron": ["ASS"],  # intron不能直接跳其他状态
    "ASS": ["exon"]
}
state_to_idx = {s: i for i, s in enumerate(states)}

A = np.array([
    [1, 1, 0, 0, 0],  # 0→
    [0, 1, 1, 0, 0],  # 1→
    [0, 0, 0, 1, 0],  # 2→
    [0, 0, 0, 1, 1],  # 3→
    [0, 1, 0, 0, 0]
])
A = np.log(A)
print(A)
# 最短持续时间设置
min_duration = {"intron": 10}

# 初始化DP矩阵（log概率）
V = np.full((T + 1, N), -np.inf)
B = np.full((T + 1, N), -1, dtype=int)

# 初始状态设置
V[0, state_to_idx["intergenic"]] = 0


[[  0.   0. -inf -inf -inf]
 [-inf   0.   0. -inf -inf]
 [-inf -inf -inf   0. -inf]
 [-inf -inf -inf   0.   0.]
 [-inf   0. -inf -inf -inf]]




In [37]:
for t in range(1, T + 1):
    for j in range(N):
        state_j = states[j]
        if state_j == "intron":
            for d in range(min_duration["intron"], min(t, 100) + 1):  # t-d >= 0
                t0 = t - d
                for i in range(N):
                    if A[i, j] < np.inf and V[t0, i] > -np.inf:
                        emit = -np.sum(np.log(prob[t0:t, j]))
                        score = V[t0, i] + A[i, j] + emit
                        if score > V[t, j]:
                            V[t, j] = score
                            B[t, j] = i
        else:
            i = B[t - 1, :].argmax()
            for i in range(N):
                if A[i, j] < np.inf and V[t - 1, i] > -np.inf:
                    score = V[t - 1, i] + A[i, j] - np.log(prob[t - 1, j])
                    if score > V[t, j]:
                        V[t, j] = score
                        B[t, j] = i

# 回溯路径
decoded_indices = []
t = T
j = np.argmax(V[T])
while t > 0:
    decoded_indices.append(j)
    i = B[t, j]
    state_j = states[j]
    if state_j == "intron":
        # 按最小长度跳回
        for dt in range(min_duration["intron"], t + 1):
            if V[t - dt, i] + A[i, j] - np.sum(np.log(prob[t - dt:t, j])) == V[t, j]:
                for _ in range(dt - 1):
                    decoded_indices.append(j)
                t -= dt
                break
    else:
        t -= 1
    j = i

decoded_indices = decoded_indices[::-1]
decoded_states = [states[idx] for idx in decoded_indices]

# 计算intron连续区间长度
intron_lengths = []
run = 0
for s in decoded_states:
    if s == "intron":
        run += 1
    else:
        if run > 0:
            intron_lengths.append(run)
            run = 0
if run > 0:
    intron_lengths.append(run)

decoded_states, intron_lengths

(['intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'intergenic',
  'exon',
  'exon',
  'DSS',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'ASS',
  'exon',
  'exon',
  'DSS',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'ASS',
  'exon',
  'DSS',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'ASS',
  'exon',
  'exon',
  'exon',
  'exon',
  'exon',
  'exon',
  'exon',
  'DSS',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'intron',
  'ASS',
  'exon',
  'exon',
  'DSS',

In [16]:
emit_probs  = np.random.rand(T, n_states)
emit_probs  /= emit_probs .sum(axis=1, keepdims=True)  # 归一化为概率

# 状态转移矩阵（原始概率）
transition_probs  = np.array([
    [1, 1, 0, 0, 0],  # intergenic →
    [0, 1, 1, 0, 0],  # exon →
    [0, 0, 0, 1, 0],  # DSS →
    [0, 0, 0, 1, 1],  # intron →
    [0, 1, 0, 0, 0],  # ASS →
])
eps = 1e-64
log_trans = np.log(np.where(transition_probs == 0, eps, transition_probs ))  # log 概率


[[2.49095328e-01 2.30134075e-01 1.72640623e-01 2.87784207e-01
  6.03457683e-02]
 [3.47705548e-02 2.71751668e-01 1.39748255e-01 2.69526288e-01
  2.84203234e-01]
 [2.46744449e-02 3.49020853e-01 1.77112553e-01 2.01651034e-01
  2.47541115e-01]
 [3.27023452e-01 6.74819577e-03 1.68331093e-01 2.42773596e-01
  2.55123663e-01]
 [3.02479855e-01 5.87177099e-02 1.30768077e-01 1.56677457e-01
  3.51356902e-01]
 [1.13612738e-01 8.58759463e-02 3.90222201e-01 3.61236386e-03
  4.06676751e-01]
 [2.65202257e-01 3.24623248e-01 1.15442184e-01 5.60922647e-03
  2.89123085e-01]
 [2.57466203e-01 2.28554592e-01 4.40920210e-02 2.32552278e-01
  2.37334906e-01]
 [2.21522961e-01 2.57709687e-01 2.84555039e-01 2.06407043e-01
  2.98052711e-02]
 [3.37240126e-01 5.37017595e-02 2.81854029e-01 2.81552120e-01
  4.56519659e-02]
 [1.52452640e-01 4.85119180e-02 3.53240647e-01 4.14188750e-01
  3.16060451e-02]
 [4.05613212e-01 2.21925032e-01 1.27217948e-01 3.26413746e-02
  2.12602433e-01]
 [3.91849630e-01 3.48730212e-01 2.477112

[   0.         -147.36544595 -147.36544595 -147.36544595 -147.36544595]


生成的状态序列片段: [0, 1, 1, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 1, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 1, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 1, 1, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 1, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 1, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
观测序列片段: [1, 1, 1, 2, 2, 2, 2, 2, 0, 0, 0, 1, 1, 1, 1, 2, 0, 0, 1, 1, 2, 0, 1, 0, 1, 2, 2, 2, 0, 2, 1, 0, 1, 0, 2, 0, 0, 1, 2, 2, 2, 1, 2, 0, 0, 0, 1, 1, 0, 0, 1, 2, 2, 2, 2, 2, 2, 2, 2, 0, 1, 0, 2, 0, 1, 2, 2, 2, 0, 2, 2, 1, 2, 1, 1, 1, 1, 0, 0, 2, 1, 1, 2, 0, 1, 0, 1, 2, 2, 2, 2, 0, 2, 1, 2, 2, 1, 0, 0, 1]
状态3持续时间段: [18, 9, 9, 9, 18, 11]
是否满足最短持续时间: False

状态3的观测分布统计:
观测值0: 出现次数=23 (比例=0.31)
观测值1: 出现次数=24 (比例=0.32)
观测值2: 出现次数=27 (比例=0.36)


In [42]:
import pandas as pd

def parse_ranges(lst, targets):
    def process_sublist(sublist, start_index, target_values):
        sub_ranges = {'CDS': [], 'intron': []}
        df = pd.DataFrame({'value': sublist})
        for target in target_values:
            if target == 1:
                mask = df['value'] == target
                key = 'CDS'
            elif target == 2:
                mask = df['value'] == target
                key = 'intron'
            else:
                continue

            if mask.any():
                df_target = df[mask]
                groups = (df_target.index.to_series().diff() != 1).cumsum()
                grouped = df_target.groupby(groups).apply(lambda x: (x.index.min(), x.index.max()))
                sub_ranges[key].extend([(start + start_index, end + start_index) for start, end in grouped.tolist()])
        return sub_ranges

    sublists = []
    start_idx = 0
    in_zero_sequence = False
    for i, value in enumerate(lst):
        if value == 0:
            if not in_zero_sequence:
                if i != start_idx:
                    sublists.append((lst[start_idx:i], start_idx))
                in_zero_sequence = True
                start_idx = i + 1
        else:
            in_zero_sequence = False

    if start_idx < len(lst) and not all(v == 0 for v in lst[start_idx:]):
        sublists.append((lst[start_idx:], start_idx))

    all_sublist_ranges = []
    for sublist, start_index in sublists:
        sub_ranges = process_sublist(sublist, start_index, targets)
        all_sublist_ranges.append(sub_ranges)

    return all_sublist_ranges

targets = [1, 2]
lst = [1, 1, 0, 0, 2, 2, 0, 1, 1, 1, 2, 2, 0, 0, 0, 1, 1, 1, 2, 2, 1, 1, 1, 2, 2, 1, 1, 0, 0]
result = parse_ranges(lst, targets)
print(result)

[{'CDS': [(0, 1)], 'intron': []}, {'CDS': [], 'intron': [(4, 5)]}, {'CDS': [(7, 9)], 'intron': [(10, 11)]}, {'CDS': [(15, 17), (20, 22), (25, 26)], 'intron': [(18, 19), (23, 24)]}]


In [50]:
import numpy as np

labels = ["intergenic", "intron", "exon"]
label2id = {name: i for i, name in enumerate(labels)}
n_states = len(labels)

# 几何分布平均长度设置
expected_lengths = {
    "intergenic": 10000,
    "intron": 4500,
    "exon": 200
}

# 每类状态的离开概率
p_exit = {k: 1 / v for k, v in expected_lengths.items()}
log_one_minus_p = {k: np.log(1 - p_exit[k]) for k in labels}
log_p = {k: np.log(p_exit[k]) for k in labels}
print(label2id)
print(log_one_minus_p)
print(log_p)

{'intergenic': 0, 'intron': 1, 'exon': 2}
{'intergenic': -0.00010000500033334732, 'intron': -0.0002222469172388482, 'exon': -0.005012541823544286}
{'intergenic': -9.210340371976182, 'intron': -8.411832675758411, 'exon': -5.298317366548036}


In [48]:
probs = np.random.dirichlet([1.0, 1.0, 1.0], size=10)
T = len(probs)
log_probs = np.log(probs + 1e-8)  # 防止 log(0)
dp = [dict() for _ in range(T + 1)]  # dp[t][state] = (score, prev_t)
dp[0] = {s: (-np.inf, None) for s in labels}
dp[0]["intergenic"] = (0.0, None)  # 初始状态均匀可改
print(dp)

[{'intergenic': (0.0, None), 'intron': (-inf, None), 'exon': (-inf, None)}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}]


In [None]:
def logsumexp(arr):
    """Numerically stable log-sum-exp"""
    a_max = np.max(arr)
    return a_max + np.log(np.sum(np.exp(arr - a_max)))

def viterbi_duration(probs, max_duration=1000):
    """
    probs: shape (T, 3), 每个位置外部模型预测的 [intergenic, intron, exon] 概率
    返回：最优路径（状态 id 序列）
    """
    T = len(probs)
    log_probs = np.log(probs + 1e-8)  # 防止 log(0)
    dp = [dict() for _ in range(T + 1)]  # dp[t][state] = (score, prev_t)
    dp[0] = {s: (-np.inf, None) for s in labels}
    dp[0]["intergenic"] = (0.0, None)  # 初始状态均匀可改

    backtrack = [dict() for _ in range(T + 1)]

    for t in range(1, T + 1):
        for s in labels:
            best_score = -np.inf
            best_prev = None
            max_d = min(t, max_duration)
            for d in range(1, max_d + 1):
                prev_t = t - d
                prev_score = dp[prev_t]
                if not prev_score:
                    continue
                for prev_s in labels:
                    stay_log_prob = (d - 1) * log_one_minus_p[s] + log_p[s]
                    emission_score = log_probs[prev_t:t, label2id[s]].sum()
                    score = prev_score[prev_s][0] + stay_log_prob + emission_score
                    if score > best_score:
                        best_score = score
                        best_prev = (prev_t, prev_s, d)
            dp[t][s] = (best_score, best_prev)
            backtrack[t][s] = best_prev

    # 找最终得分最高的结束状态
    best_final_state = max(dp[T], key=lambda s: dp[T][s][0])
    path = []
    t = T
    s = best_final_state
    while t > 0:
        prev_t, prev_s, d = backtrack[t][s]
        path = [s] * d + path
        t = prev_t
        s = prev_s

    return [label2id[state] for state in path]

In [26]:
import h5py
import os
import numpy as np

folder_path = f'../../test'
file_names = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
for prediction_result in file_names:
    print(prediction_result)
    genome_predictions = {}
    with h5py.File(prediction_result, 'r') as h5file:
        for chromosome in h5file.keys():
            data = []
            chr_group = h5file[chromosome]
            labels = ['predictions_forward', 'predictions_reverse']
            for label in labels:
                dataset = np.array(chr_group[label])
                data.append(dataset)
            genome_predictions[chromosome] = data

    for chromosome in genome_predictions:
        predictions_forward, predictions_reverse = genome_predictions[chromosome]

../../test\model_predictions_0.h5


In [27]:
from Bio import SeqIO
with open('../../test.fasta') as fna:
    genome_seq = SeqIO.to_dict(SeqIO.parse(fna, "fasta"))

In [6]:
dp = np.load('../../my_array.npy')

In [33]:
np.set_printoptions(precision=8, suppress=True, threshold=np.inf, linewidth=np.inf)
region_start = 856508
region_end = 856538

for chr in genome_seq:
    seq = genome_seq[chr].seq
print(seq[region_start - 1:region_end])
print(predictions_forward[region_start - 1:region_end])


ATGGGGAGAAGGCAGGGCGGAAGCTTCGCAG
[[0.774      0.00000364 0.00000155 0.0000025  0.000868   0.01772    0.2075    ]
 [0.776      0.0000016  0.0000042  0.0000025  0.0007696  0.01837    0.205     ]
 [0.774      0.00000477 0.00000376 0.00000197 0.0009437  0.01762    0.2075    ]
 [0.7773     0.00000376 0.00000197 0.0000027  0.0009212  0.01872    0.2028    ]
 [0.7686     0.00000155 0.0000023  0.00000226 0.0009756  0.01912    0.2114    ]
 [0.7744     0.0000032  0.0000033  0.0000016  0.0008187  0.0197     0.205     ]
 [0.778      0.000004   0.0000025  0.0000031  0.0009747  0.01859    0.2028    ]
 [0.78       0.0000031  0.00000495 0.00000364 0.0009155  0.01851    0.2007    ]
 [0.7686     0.00000465 0.00000393 0.0000021  0.000961   0.01755    0.213     ]
 [0.7705     0.00000453 0.0000026  0.0000038  0.0008593  0.01857    0.2098    ]
 [0.7686     0.000003   0.00000423 0.00000435 0.001058   0.01819    0.2123    ]
 [0.7695     0.00000554 0.0000056  0.00000274 0.000983   0.01947    0.2102    ]
 [0.759 

In [319]:

# np.set_printoptions(precision=0, suppress=True, threshold=np.inf, linewidth=np.inf)

# print(dp[region_start-1-100521843:region_end-100521843, 4:10])
# # print(dp[region_start-1-100521843:region_end-100521843, 19])
# # print(dp[region_start-1-100521843:region_end-100521843, 20])
# print(dp[region_start-1-100521843:region_end-100521843, 23])
# print(dp[region_start-1-100521843:region_end-100521843, 24])
# print(dp[region_start-1-100521843:region_end-100521843, 25])
# print(dp[region_start-1-100521843:region_end-100521843, 28])
# print(dp[region_start-1-100521843:region_end-100521843, 26])
# print(dp[region_start-1-100521843:region_end-100521843, 27])
# print(dp[region_start-1-100521843:region_end-100521843, 8])
# print(dp[region_start-100521843:region_end-100521843, 6])
# print(dp[region_start-100521843:region_end-100521843, 9])
# print(dp[region_start-100521843:region_end-100521843, 19])
# print(dp[region_start-100521843:region_end-100521843, 20])
# print('----------------------------------------------------------------------')
# print(predictions_reverse[region_start - 1:region_end])
def softmax(x, T=1.0):
    x = np.exp(x / T)
    return x / np.sum(x, axis=1, keepdims=True)

x = np.array([[0.3, 0.3, 0.1],
             [0.999, 0., 0.]])
intron_sum = x.sum(axis=1, keepdims=True)
x = np.exp(x / 1)
x_scaled = x / np.sum(x, axis=1, keepdims=True) * intron_sum
print(x_scaled)
print(np.log(1e-8))

[[0.24833872 0.24833872 0.20332255]
 [0.57529679 0.21185161 0.21185161]]
-18.420680743952367
