In [1]:
#!/usr/bin/env python3
"""
Этап 4: Реализация HMM с алгоритмом Витерби
"""

import math
from typing import List, Dict, Tuple

class HMM:
    """
    Скрытая Марковская Модель с двумя состояниями
    для различения последовательностей с разным GC-составом
    """
    
    def __init__(self, seq1: str, seq2: str, mean_segment_length: int = 300):
        """
        Инициализация HMM
        
        Параметры:
        - seq1, seq2: обучающие последовательности для состояний 1 и 2
        - mean_segment_length: средняя длина сегмента (для переходных вероятностей)
        """
        
        self.states = [1, 2]
        self.mean_segment_length = mean_segment_length
        
        # Вычисляем частоты нуклеотидов
        self.emission_probs = self._calculate_emission_probabilities(seq1, seq2)
        
        # Вычисляем переходные вероятности
        self.transition_probs = self._calculate_transition_probabilities()
        
        # Начальные вероятности (равновероятны)
        self.initial_probs = {1: 0.5, 2: 0.5}
        
        self._print_parameters()
    
    def _calculate_emission_probabilities(self, seq1: str, seq2: str) -> Dict[int, Dict[str, float]]:
        """
        Вычисляет эмиссионные вероятности для двух состояний
        
        Согласно заданию:
        - Для G и C: p(G) = p(C) = 0.5 * f_GC
        - Для A и T: p(A) = p(T) = 0.5 * f_AT
        """
        
        # Очищаем последовательности
        valid_bases = {'A', 'T', 'G', 'C'}
        seq1_clean = [b for b in seq1 if b in valid_bases]
        seq2_clean = [b for b in seq2 if b in valid_bases]
        
        # Вычисляем частоты для seq1
        total1 = len(seq1_clean)
        if total1 == 0:
            # По умолчанию, если последовательность пустая
            f_gc1, f_at1 = 0.5, 0.5
        else:
            gc_count1 = seq1_clean.count('G') + seq1_clean.count('C')
            at_count1 = seq1_clean.count('A') + seq1_clean.count('T')
            f_gc1 = gc_count1 / total1
            f_at1 = at_count1 / total1
        
        # Вычисляем частоты для seq2
        total2 = len(seq2_clean)
        if total2 == 0:
            f_gc2, f_at2 = 0.5, 0.5
        else:
            gc_count2 = seq2_clean.count('G') + seq2_clean.count('C')
            at_count2 = seq2_clean.count('A') + seq2_clean.count('T')
            f_gc2 = gc_count2 / total2
            f_at2 = at_count2 / total2
        
        # Создаем словари эмиссионных вероятностей
        # Добавляем небольшое значение, чтобы избежать нулевых вероятностей
        epsilon = 1e-10
        
        emission_probs = {
            1: {
                'A': 0.5 * f_at1 + epsilon,
                'T': 0.5 * f_at1 + epsilon,
                'G': 0.5 * f_gc1 + epsilon,
                'C': 0.5 * f_gc1 + epsilon
            },
            2: {
                'A': 0.5 * f_at2 + epsilon,
                'T': 0.5 * f_at2 + epsilon,
                'G': 0.5 * f_gc2 + epsilon,
                'C': 0.5 * f_gc2 + epsilon
            }
        }
        
        # Нормализуем вероятности
        for state in [1, 2]:
            total = sum(emission_probs[state].values())
            for base in emission_probs[state]:
                emission_probs[state][base] /= total
        
        return emission_probs
    
    def _calculate_transition_probabilities(self) -> Dict[int, Dict[int, float]]:
        """
        Вычисляет переходные вероятности
        
        Среднее число шагов в каждом состоянии = mean_segment_length
        Для экспоненциального распределения длин сегментов:
        P(остаться в состоянии) = exp(-1/mean_segment_length)
        P(переключиться) = 1 - P(остаться)
        """
        
        p_stay = math.exp(-1 / self.mean_segment_length)
        p_switch = 1 - p_stay
        
        transition_probs = {
            1: {1: p_stay, 2: p_switch},
            2: {1: p_switch, 2: p_stay}
        }
        
        return transition_probs
    
    def _print_parameters(self):
        """Выводит параметры HMM"""
        
        print("=" * 60)
        print("ПАРАМЕТРЫ HMM")
        print("=" * 60)
        
        print("\nЭмиссионные вероятности:")
        print(f"{'Состояние':<10} {'P(A)':<10} {'P(T)':<10} {'P(G)':<10} {'P(C)':<10}")
        print("-" * 50)
        
        for state in [1, 2]:
            probs = self.emission_probs[state]
            print(f"{state:<10} {probs['A']:<10.6f} {probs['T']:<10.6f} "
                  f"{probs['G']:<10.6f} {probs['C']:<10.6f}")
        
        print(f"\nПереходные вероятности (средняя длина сегмента = {self.mean_segment_length}):")
        print(f"P(остаться в состоянии) = {self.transition_probs[1][1]:.6f}")
        print(f"P(переключиться) = {self.transition_probs[1][2]:.6f}")
        
        print("\nНачальные вероятности:")
        print(f"P(начать в состоянии 1) = {self.initial_probs[1]:.2f}")
        print(f"P(начать в состоянии 2) = {self.initial_probs[2]:.2f}")
        
        # Проверяем свойства
        print("\nПроверка свойств:")
        for state in [1, 2]:
            total_emission = sum(self.emission_probs[state].values())
            total_transition = sum(self.transition_probs[state].values())
            print(f"Состояние {state}: сумма эмиссионных вероятностей = {total_emission:.6f}, "
                  f"сумма переходных = {total_transition:.6f}")
    
    def viterbi_decode(self, sequence: str) -> List[int]:
        """
        Алгоритм Витерби для декодирования скрытой последовательности состояний
        
        Возвращает:
        - states: список наиболее вероятных состояний для каждой позиции
        """
        
        # Очищаем последовательность (берем только A, T, G, C)
        valid_bases = {'A', 'T', 'G', 'C'}
        cleaned_seq = [base for base in sequence.upper() if base in valid_bases]
        
        if not cleaned_seq:
            return []
        
        T = len(cleaned_seq)  # Длина последовательности
        
        # Матрица Витерби: V[t][state] = максимальная вероятность пути
        # до позиции t, заканчивающегося в состоянии 'state'
        V = [{}]
        
        # Матрица для хранения обратных указателей
        backpointer = [{}]
        
        # Инициализация (t = 0)
        for state in self.states:
            emission_prob = self.emission_probs[state].get(cleaned_seq[0], 1e-10)
            V[0][state] = math.log(self.initial_probs[state]) + math.log(emission_prob)
            backpointer[0][state] = None
        
        # Рекурсия (t = 1..T-1)
        for t in range(1, T):
            V.append({})
            backpointer.append({})
            
            for current_state in self.states:
                # Для каждого текущего состояния ищем лучшее предыдущее
                max_prob = -float('inf')
                best_prev_state = None
                
                for prev_state in self.states:
                    # Вероятность перехода из prev_state в current_state
                    trans_prob = self.transition_probs[prev_state][current_state]
                    
                    # Вероятность пути до prev_state в момент t-1
                    prev_prob = V[t-1][prev_state]
                    
                    # Общая вероятность
                    prob = prev_prob + math.log(trans_prob)
                    
                    if prob > max_prob:
                        max_prob = prob
                        best_prev_state = prev_state
                
                # Добавляем эмиссионную вероятность для текущего символа
                emission_prob = self.emission_probs[current_state].get(cleaned_seq[t], 1e-10)
                V[t][current_state] = max_prob + math.log(emission_prob)
                backpointer[t][current_state] = best_prev_state
        
        # Терминация: находим наиболее вероятное конечное состояние
        final_t = T - 1
        final_state = max(V[final_t], key=V[final_t].get)
        
        # Обратный проход для восстановления пути
        states = [None] * T
        states[final_t] = final_state
        
        for t in range(final_t - 1, -1, -1):
            states[t] = backpointer[t+1][states[t+1]]
        
        return states
    
    def save_decoding(self, states: List[int], output_file: str):
        """
        Сохраняет декодированные состояния в файл
        
        Формат: по одному состоянию на строку
        """
        with open(output_file, 'w') as f:
            for state in states:
                f.write(f"{state}\n")
        
        print(f"Декодированные состояния сохранены в {output_file}")

def read_sequence_from_file(filename: str) -> str:
    """
    Читает последовательность из файла
    
    Поддерживает:
    - FASTA формат (строка начинается с '>')
    - Простой текст (только последовательность)
    """
    with open(filename, 'r') as f:
        content = f.read().strip()
    
    # Если файл в формате FASTA
    if content.startswith('>'):
        lines = content.split('\n')
        # Пропускаем строку с описанием и объединяем остальные
        sequence_lines = []
        for line in lines[1:]:
            if line and not line.startswith('>'):
                sequence_lines.append(line.strip())
        sequence = ''.join(sequence_lines)
    else:
        # Простая последовательность - удаляем пробелы и переводы строк
        sequence = content.replace('\n', '').replace(' ', '').replace('\t', '')
    
    return sequence.upper()

# Основная программа для тестирования HMM
if __name__ == "__main__":
    print("=" * 60)
    print("ТЕСТИРОВАНИЕ HMM С АЛГОРИТМОМ ВИТЕРБИ")
    print("=" * 60)
    
    # Читаем последовательности
    print("\nЗагрузка обучающих последовательностей...")
    seq1 = read_sequence_from_file("chromosome1.fasta")
    seq2 = read_sequence_from_file("chromosome2.fasta")
    
    print(f"seq1: {len(seq1)} bp")
    print(f"seq2: {len(seq2)} bp")
    
    # Создаем HMM
    print("\nСоздание HMM...")
    hmm = HMM(seq1, seq2, mean_segment_length=300)
    
    # Тестируем на короткой тестовой последовательности
    test_seq = "ATGC" * 10 + "GGCC" * 10
    print(f"\nТестовая последовательность: {len(test_seq)} bp")
    
    decoded_states = hmm.viterbi_decode(test_seq)
    print(f"Декодировано состояний: {len(decoded_states)}")
    
    # Сохраняем результат
    hmm.save_decoding(decoded_states, "test_decoding.txt")
    
    print("\nHMM готова к использованию!")

ТЕСТИРОВАНИЕ HMM С АЛГОРИТМОМ ВИТЕРБИ

Загрузка обучающих последовательностей...
seq1: 4602429 bp
seq2: 8317371 bp

Создание HMM...
ПАРАМЕТРЫ HMM

Эмиссионные вероятности:
Состояние  P(A)       P(T)       P(G)       P(C)      
--------------------------------------------------
1          0.262101   0.262101   0.237899   0.237899  
2          0.136774   0.136774   0.363226   0.363226  

Переходные вероятности (средняя длина сегмента = 300):
P(остаться в состоянии) = 0.996672
P(переключиться) = 0.003328

Начальные вероятности:
P(начать в состоянии 1) = 0.50
P(начать в состоянии 2) = 0.50

Проверка свойств:
Состояние 1: сумма эмиссионных вероятностей = 1.000000, сумма переходных = 1.000000
Состояние 2: сумма эмиссионных вероятностей = 1.000000, сумма переходных = 1.000000

Тестовая последовательность: 80 bp
Декодировано состояний: 80
Декодированные состояния сохранены в test_decoding.txt

HMM готова к использованию!
