## **Data Preprocessing**

In [3]:
from Bio import SeqIO
from Bio.Seq import Seq, MutableSeq
from BCBio import GFF
import pandas as pd
import re
from collections import Counter
import numpy as np
import pickle
import json
from tqdm import tqdm

In [4]:
#fasta_file = "C:\\Users\\Godel\\Documents\\PYTHON\\Analisis Computacional de Datos\\dna\\ncbi_dataset\\data\\GCF_000001405.40\\GCF_000001405.40_GRCh38.p14_genomic.fna"
fasta_file = "GCF_000001405.40_GRCh38.p14_genomic.fna"
sequences = {}
pattern = re.compile(r"chromosome (\d+)")
cant_seq = 0
with open(fasta_file, "r") as file:
    for record in SeqIO.parse(file, "fasta"):
        if record.description[:2] == "NC":
            try: 
                n_crom = pattern.search(record.description).group(1)
            except:
                print(f"Error en la secuencia {record.description}")
                n_crom = cant_seq
            print(f"\rNC{n_crom}", end="")
            sequences[f"NC{n_crom}"] = record.seq
        cant_seq += 1


sequences['NCX'] = sequences.pop('NC61') # Cromosoma X
sequences['NCY'] = sequences.pop('NC62') # Cromosoma Y
sequences['NCM'] = sequences.pop('NC704') # Mitocondria

NC22Error en la secuencia NC_000023.11 Homo sapiens chromosome X, GRCh38.p14 Primary Assembly
NC61Error en la secuencia NC_000024.10 Homo sapiens chromosome Y, GRCh38.p14 Primary Assembly
NC62Error en la secuencia NC_012920.1 Homo sapiens mitochondrion, complete genome
NC704

In [5]:
uniques = {}
for cromosome, seq in sequences.items():
    uniques[cromosome] = list(set(seq))

In [6]:
uniques_all = set()
for values in uniques.values():
    uniques_all.update(values)
uniques_all = list(uniques_all)
states = ['A', 'C', 'G', 'T']
to_delete = [x for x in uniques_all if x.upper() not in states]
to_delete

['Y', 'N', 'W', 'M', 'R', 'K', 'B', 'S']

In [7]:
for key, value in tqdm(sequences.items()):
    value = MutableSeq(value)
    for nucleotid in to_delete:
        value.replace(nucleotid, '', inplace=True)
    sequences[key] = Seq(value)

100%|██████████| 25/25 [00:12<00:00,  1.97it/s]


In [8]:
pickle.dump(sequences, open("sequences.pkl", "wb"))

## **Creation of the Transition and Emission Matrices**

In [4]:
sequences = pickle.load(open("sequences.pkl", "rb"))

In [5]:
hidden_states = ["Exon", "Intron"]

In [6]:
sequence = sequences['NCX']

# Initialize counters
lowercase_count = 0
uppercase_count = 0

# Iterate through each character in the sequence
for char in sequence:
    if char.islower():
        lowercase_count += 1
    elif char.isupper():
        uppercase_count += 1

# Print the counts
print(f"Lowercase letters (INTRONES): {lowercase_count}")
print(f"Uppercase letters (EXONES): {uppercase_count}")
print(f'Total: {len(sequence)}')

Lowercase letters (INTRONES): 67723882
Uppercase letters (EXONES): 87169147
Total: 154893029


# **MATRIZ DE TRANSICIÓN Y EMISIÓN**

In [7]:
from collections import defaultdict
import numpy as np

# Definir la secuencia (En este caso el cromosoma 1 en sus primeros 1000 nucleótidos)
sequence = sequences['NC1'][0:1000]

# Definir estados y observaciones
hidden_states = ["Exon1", "Exon2", "Exon3", "Intron"]
nucleotids = ["A", "C", "G", "T"]

# Inicializar los diccionarios para contar los estados, la matriz de transición y la matriz de emisión
count_states = defaultdict(int)
count_transitions = defaultdict(lambda: defaultdict(int))
count_emision_transitions = defaultdict(lambda: defaultdict(int))

# Inicializar las transiciones y emisiones con 0
for state in hidden_states:
    for next_state in hidden_states:
        count_transitions[state][next_state] = 0  # Asegura que todos los pares de estados estén presentes
    for nucleotid in nucleotids:
        count_emision_transitions[state][nucleotid] = 0  # Asegura que todas las observaciones estén presentes para cada estado
    count_states[state] = 0

# Función para determinar el estado basado en la posición
def get_state(nucleotid, index):
    if nucleotid.islower():
        return f"Exon{index % 3 + 1}" 
    else:
        return "Intron"

# Inicializar el primer estado
current_state = get_state(sequence[0], 0)
count_states[current_state] += 1
count_emision_transitions[current_state][sequence[0].upper()] += 1

# Iterar a través de la secuencia para contar transiciones y emisiones
i = 1
for nuc in sequence[1:]:
    next_state = get_state(nuc, i)
    count_states[next_state] += 1
    count_transitions[current_state][next_state] += 1
    count_emision_transitions[next_state][nuc.upper()] += 1
    current_state = next_state
    i += 1
    if next_state == "Intron":
        i = 0


# Calcular la distribución inicial
distribucion_inicial = pd.Series({state: count / len(sequence) for state, count in count_states.items()}, name="Distribución Inicial")
distribucion_inicial

Exon1     0.271
Exon2     0.268
Exon3     0.265
Intron    0.196
Name: Distribución Inicial, dtype: float64

In [8]:
# Ajustar las transiciones según las reglas especificadas
count_transitions['Exon1'] = {'Exon2': count_transitions['Exon1']['Exon2']}
count_transitions['Exon2'] = {'Exon3': count_transitions['Exon2']['Exon3']}
count_transitions['Exon3'] = {k: count_transitions['Exon3'][k] for k in ('Exon1', 'Intron')}
count_transitions['Intron'] = {k: count_transitions['Intron'][k] for k in ('Intron', 'Exon1')}

# Calcular las probabilidades de transición con verificación para evitar división por cero
prob_transicion = {
    state: {next_state: count / sum(next_states.values()) if sum(next_states.values()) > 0 else 0 for next_state, count in next_states.items()}
    for state, next_states in count_transitions.items()
}

# Calcular las probabilidades de emisión con verificación para evitar división por cero
prob_emision = {
    state: {obs: count / sum(observations.values()) if sum(observations.values()) > 0 else 0 for obs, count in observations.items()}
    for state, observations in count_emision_transitions.items()
}

print("Distribución inicial:", distribucion_inicial)
print("Matriz de transición:", prob_transicion)
print("Matriz de emisión:", prob_emision)

Distribución inicial: Exon1     0.271
Exon2     0.268
Exon3     0.265
Intron    0.196
Name: Distribución Inicial, dtype: float64
Matriz de transición: {'Exon1': {'Exon2': 1.0}, 'Exon2': {'Exon3': 1.0}, 'Exon3': {'Exon1': 0.9962264150943396, 'Intron': 0.0037735849056603774}, 'Intron': {'Intron': 0.9693877551020408, 'Exon1': 0.030612244897959183}}
Matriz de emisión: {'Exon1': {'A': 0.25092250922509224, 'C': 0.44649446494464945, 'G': 0.17712177121771217, 'T': 0.12546125461254612}, 'Exon2': {'A': 0.3208955223880597, 'C': 0.43283582089552236, 'G': 0.19402985074626866, 'T': 0.05223880597014925}, 'Exon3': {'A': 0.20754716981132076, 'C': 0.47547169811320755, 'G': 0.19245283018867926, 'T': 0.12452830188679245}, 'Intron': {'A': 0.1683673469387755, 'C': 0.32142857142857145, 'G': 0.3520408163265306, 'T': 0.15816326530612246}}


In [9]:
ctdtype = pd.CategoricalDtype(categories=hidden_states, ordered=True)
hidden_states_cat = pd.Categorical(hidden_states, dtype=ctdtype)

transition_matrix = pd.DataFrame(prob_transicion).fillna(0).T
transition_matrix.index = transition_matrix.index.astype(ctdtype)
transition_matrix.columns = transition_matrix.columns.astype(ctdtype)
transition_matrix = transition_matrix.sort_index(axis=0).sort_index(axis=1)
transition_matrix


Unnamed: 0,Exon1,Exon2,Exon3,Intron
Exon1,0.0,1.0,0.0,0.0
Exon2,0.0,0.0,1.0,0.0
Exon3,0.996226,0.0,0.0,0.003774
Intron,0.030612,0.0,0.0,0.969388


In [10]:
emission_matrix = pd.DataFrame(prob_emision).T
emission_matrix.index = emission_matrix.index.astype(ctdtype)
emission_matrix

Unnamed: 0,A,C,G,T
Exon1,0.250923,0.446494,0.177122,0.125461
Exon2,0.320896,0.432836,0.19403,0.052239
Exon3,0.207547,0.475472,0.192453,0.124528
Intron,0.168367,0.321429,0.352041,0.158163


## **Viterbi Algorithm**

In [11]:
def Viterbi(A: np.array, B: np.array, pi: np.array, observados, states):
    # A: matriz de transición de la cadena de Markov oculta
    # B: matriz de emisión
    # pi: distribución inicial de la cadena de Markov
    # observados: lista de valores observados
    # states: lista de estados ocultos
    # retorna la lista de valores no observados más probable

    num_obs = len(observados)
    num_states = len(A)
    A = np.log(A)
    B = np.log(B)
    pi = np.log(pi)

    # Matrices para almacenar las probabilidades y los caminos
    m = np.full((num_obs, num_states), fill_value=-np.inf, dtype=np.float64)
    path = np.zeros((num_obs, num_states), dtype=int)

    # Inicialización
    for i in range(num_states):
        m[0][i] = pi[i] + B[i][observados[0]]
        path[0][i] = 0

    # Recursión
    for t in range(1, num_obs):
        for i in range(num_states):
            max_prob = -np.inf
            for j in range(num_states):
                prob = m[t-1][j] + A[j][i] + B[i][observados[t]]
                if prob > max_prob:
                    max_prob = prob
                    path[t][i] = j
            m[t][i] = max_prob

    # Construcción del camino más probable
    max_seq = []
    last_state = np.argmax(m[num_obs-1])
    for t in range(num_obs-1, -1, -1):
        max_seq.append(states[last_state])
        last_state = path[t][last_state]

    return max_seq[::-1]

nuc_dtype = pd.CategoricalDtype(categories=nucleotids, ordered=True)
obs = pd.Categorical(sequences['NC1'][1001:40000].upper(), dtype=nuc_dtype).codes
max_seq = Viterbi(transition_matrix.to_numpy()
            , emission_matrix.to_numpy()
            , distribucion_inicial.to_numpy()
            , obs
            , hidden_states_cat.codes)
print(f'Secuencia oculta más probable: {max_seq}')

  A = np.log(A)


Secuencia oculta más probable: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,

In [12]:
Counter(max_seq)

Counter({3: 30815, 0: 2728, 1: 2728, 2: 2728})

In [13]:
max_prob_states = pd.Series(max_seq).map(lambda x: hidden_states[x]).astype(ctdtype)
max_prob_states

0        Intron
1        Intron
2        Intron
3        Intron
4        Intron
          ...  
38994     Exon2
38995     Exon3
38996     Exon1
38997     Exon2
38998     Exon3
Length: 38999, dtype: category
Categories (4, object): ['Exon1' < 'Exon2' < 'Exon3' < 'Intron']

In [37]:
ground_truth = pd.Series(sequences['NC1'][1001:40000]).map(lambda x: x.upper() == x) # Intron
predicted = max_prob_states.map(lambda x: x == 'Intron') # Intron

In [38]:
sum(ground_truth == predicted) / len(ground_truth)

0.740044616528629