In [84]:
import numpy as np
from hmmlearn import hmm
from scipy import special

In [85]:
class PairHMM(hmm.CategoricalHMM):
    '''
        Clase que simula el funcionamiento de un pair HMM.
    '''
    def __init__ (self, alphabet, mu, epsilon, tau, match_probabilities, insert_probabilities):
        '''
        Parámetros:
        ----------
        - alphabet: lista de caracteres que conforman el alfabeto a tratar.
        - mu: probabilidad de pasar desde el estado M a un estado de inserción.
        - epsilon: probabilidad de permanecer en un estado de inserción.
        - tau: probabilidad de pasar al estado fin.
        - match_probabilities: lista con probabilidades de alineamiento entre dos elementos del alfabeto. Para un alfabeto con n elementos debe estar ordenada de forma:
            1-1, 1-2,..., 1-n, 2-1, 2-2,..., 2-n, ..., n-1, ..., n-n.
        - insert_prbabilities: lista con probabilidades de alinear un elemento del alfabeto con un hueco, ordenada según el alfabeto.
        '''
        
        #Transformamos el alfabeto a una lista de caracteres
        alphabet = list(map(str, alphabet))
        self.alphabet = alphabet
        self.n_alphabet = len(alphabet)

        #Calculamos el número de posibles emisiones a partir del alfabeto
        n_features = self.n_alphabet**2+2*self.n_alphabet+1

        #Los cinco estados son (Begin, M, X, Y, End)
        super().__init__(n_components=5, n_features=n_features)

        #Calculamos las probabilidades a partir de los parámetros
        match_to_ins = np.subtract(1.0, 2.0*mu+tau)
        ins_to_match = np.subtract(1.0, epsilon+tau)

        if match_to_ins<0.0 :
            raise ValueError(f"1-2*mu-tau debe ser no negativo (se ha obtenido {match_to_ins:.4f})")

        if ins_to_match<0.0 :
            raise ValueError(f"1-epsilon-tau debe ser no negativo (se ha obtenido {ins_to_match:.4f})")

        self.transmat_  = np.array([[0,match_to_ins, mu, mu, tau], 
                                   [0,match_to_ins, mu, mu, tau],
                                   [0,ins_to_match, epsilon, 0, tau],
                                   [0,ins_to_match, 0, epsilon, tau],
                                   [0,0,0,0,1]]) 
        
        self.__init_emissions(match_probabilities, insert_probabilities)


    def __init_emissions(self, match_probabilities, insert_probabilities):
        '''
        Función que inicializa la matriz de emisiones.
        '''
        self.emissionprob_ = np.zeros((5, self.n_features))

        #Emisiones de estados silenciosos
        self.emissionprob_[0,self.n_features-1] = 1
        self.emissionprob_[4,self.n_features-1] = 1

        if len(match_probabilities)!=(self.n_alphabet**2):
            raise ValueError(f"El número de emisiones en el estado de alineamiento debe ser {self.n_alphabet**2})")
        
        if len(insert_probabilities)!=(self.n_alphabet):
            raise ValueError(f"El número de emisiones en el estado de inserción debe ser {self.n_alphabet})")
        
        self.emissionprob_[1, 0:self.n_alphabet**2] = match_probabilities
        self.emissionprob_[2, self.n_alphabet**2: self.n_alphabet**2+self.n_alphabet] = insert_probabilities
        self.emissionprob_[3, self.n_alphabet**2+self.n_alphabet:self.n_features-1] = insert_probabilities
        self.startprob_=np.array([1,0,0,0,0])
        self._check()

    def __decodify(self, x):
        '''
        Función que convierte una secuencia de elementos del alfabeto en secuencia de índices.

        Parámetros:
        ----------
        - x: string con elementos del alfabeto.
        '''
        x = list(map(str.upper, x))
        try:
            decoded= np.array(list(map(self.alphabet.index, x)))
        except ValueError:
            print("¡Error, existe elemento de la secuencia que no se encuentra en el alfabeto!")
            raise
        return decoded
    
    def __codify(self,state_sequence,x,y):
        '''
        Función auxiliar que construye el alineamiento a partir de la secuencia de estados resultante del algoritmo de Viterbi.

        Parámetros:
        ----------
        - state_sequence: resultado del algoritmo de Viterbi.
        - x: string con elementos del alfabeto.
        - y: string con elementos del alfabeto.
        '''
        i=0
        j=0
        align1 = []
        align2 = []
        for k in state_sequence:
            match k:
                case 0:
                    align1.append(x[i])
                    align2.append(y[j])
                    i+=1
                    j+=1
                case 1:
                    align1.append(x[i])
                    align2.append("-")
                    i+=1
                case 2:
                    align1.append("-")
                    align2.append(y[j])
                    j+=1

        return "".join(align1), "".join(align2)

    def modifiedViterbi(self, x, y):
        '''
        Algoritmo de Viterbi adapatado a pair HMM.

        Parámetros:
        ----------
        - x: string con elementos del alfabeto.
        - y: string con elementos del alfabeto.

        Salidas:
        ----------
        - sequence: secuencia de estados resultante del algoritmo de Viterbi, siendo 0 el estado M, 1 el estado X y 2 el estado Y.
        - alignment: lista con el alineamiento de x e y que se produce a partir de sequence.
        - p_fin: la probabilidad final del algoritmo de Viterbi.
        '''
        decoded_x = self.__decodify(x)
        decoded_y = self.__decodify(y)

        #Tendremos valores de (0,0) a (n,m)
        n = len(decoded_x)
        m = len(decoded_y)

        v = np.zeros((3, n+1, m+1))
        pointer = np.zeros((3, n+1, m+1), dtype=int)
        v[0,0,0] = 1
        
        #Ignorar el warning de división entre 0 a la hora de calcular log(0)
        with np.errstate(divide='ignore'): 
            #Inicializar las variables
            log_emissionprob_ =  np.log(self.emissionprob_)
            log_transmat_ = np.log(self.transmat_)
            v = np.log(v)
        
        for i in range(1,n+1):
            v[1,i,0] = log_emissionprob_[2,(self.n_alphabet**2)+decoded_x[i-1]]+np.max([log_transmat_[1,2]+v[0,i-1,0], log_transmat_[2,2]+v[1,i-1,0] ])
            pointer[1,i,0] = np.argmax([log_transmat_[1,2]+v[0,i-1,0], log_transmat_[2,2]+v[1,i-1,0], -np.inf ])

        for j in range(1,m+1):
            v[2,0,j] = log_emissionprob_[3,(self.n_alphabet**2+self.n_alphabet)+decoded_y[j-1]]+np.max([log_transmat_[1,3]+v[0,0,j-1], log_transmat_[3,3]+v[2,0,j-1] ])
            pointer[2,0,j] = np.argmax([log_transmat_[1,3]+v[0,0,j-1], -np.inf ,log_transmat_[3,3]+v[2,0,j-1] ])

        for i in range(1,n+1):
            for j in range(1,m+1):
                v[0,i,j] = log_emissionprob_[1,decoded_x[i-1]*self.n_alphabet+decoded_y[j-1]]+np.max([log_transmat_[1,1]+v[0,i-1,j-1], log_transmat_[2,1]+v[1,i-1,j-1], log_transmat_[3,1]+v[2,i-1,j-1] ])
                pointer[0,i,j] = np.argmax([log_transmat_[1,1]+v[0,i-1,j-1], log_transmat_[2,1]+v[1,i-1,j-1], log_transmat_[3,1]+v[2,i-1,j-1] ])

                v[1,i,j] = log_emissionprob_[2,(self.n_alphabet**2)+decoded_x[i-1]]+np.max([log_transmat_[1,2]+v[0,i-1,j], log_transmat_[2,2]+v[1,i-1,j] ])
                pointer[1,i,j] = np.argmax([log_transmat_[1,2]+v[0,i-1,j], log_transmat_[2,2]+v[1,i-1,j], -np.inf ])

                v[2,i,j] = log_emissionprob_[3,(self.n_alphabet**2+self.n_alphabet)+decoded_y[j-1]]+np.max([log_transmat_[1,3]+v[0,i,j-1], log_transmat_[3,3]+v[2,i,j-1] ])
                pointer[2,i,j] = np.argmax([log_transmat_[1,3]+v[0,i,j-1], -np.inf ,log_transmat_[3,3]+v[2,i,j-1] ])

        #Empezamos el proceso de recuperación de estados
        sequence = [ np.argmax([v[0,n,m], v[1,n,m], v[2,n,m] ])]
        i = n
        j = m
        
        #Los índices cambiarán dependiendo del último estado
        def cases(state, i, j):            
            if i>0 and (state==0 or state==1):
                i-=1
            if j>0 and (state==0 or state==2):
                j-=1
            return i,j

        while i>0 or j>0:
            next_state = pointer[sequence[-1] ,i,j]
            i,j = cases(sequence[-1],i,j) 
            sequence.append( next_state )

        #Eliminamos el último elemento pues es siempre el estado inicial
        del sequence[-1]

        #Invertimos para obtener la secuencia de estados
        sequence.reverse()
        return sequence, self.__codify(sequence,x,y), self.transmat_[0,4]*np.max(np.exp([v[0,n,m], v[1,n,m], v[2,n,m]]))
    

    def modifiedFoward(self, x, y):
        '''
        Algoritmo de avance adapatado a pair HMM.

        Parámetros:
        ----------
        - x: string con elementos del alfabeto.
        - y: string con elementos del alfabeto.

        Salidas:
        ----------
        - total_prob: la probabilidad de x e y bajo el modelo.
        '''
        decoded_x = self.__decodify(x)
        decoded_y = self.__decodify(y)

        #Tendremos valores de (0,0) a (n,m)
        n = len(decoded_x)
        m = len(decoded_y)

        v = np.zeros((3, n+1, m+1))
        v[0,0,0] = 1

        #Ignorar el warning de división entre 0 a la hora de calcular log(0)
        with np.errstate(divide='ignore'): 
            #Inicializar las variables
            log_emissionprob_ =  np.log(self.emissionprob_)
            log_transmat_ = np.log(self.transmat_)
            v = np.log(v)


        for i in range(1,n+1):
            v[1,i,0] = log_emissionprob_[2,(self.n_alphabet**2)+decoded_x[i-1]]+special.logsumexp([log_transmat_[1,2]+v[0,i-1,0], log_transmat_[2,2]+v[1,i-1,0] ])

        for j in range(1,m+1):
            v[2,0,j] = log_emissionprob_[3,(self.n_alphabet**2+self.n_alphabet)+decoded_y[j-1]]+special.logsumexp([log_transmat_[1,3]+v[0,0,j-1], log_transmat_[3,3]+v[2,0,j-1] ])
            
        for i in range(1,n+1):
            for j in range(1,m+1):
                v[0,i,j] = log_emissionprob_[1,decoded_x[i-1]*self.n_alphabet+decoded_y[j-1]]+special.logsumexp([log_transmat_[1,1]+v[0,i-1,j-1], log_transmat_[2,1]+v[1,i-1,j-1], log_transmat_[3,1]+v[2,i-1,j-1] ])

                v[1,i,j] = log_emissionprob_[2,(self.n_alphabet**2)+decoded_x[i-1]]+special.logsumexp([log_transmat_[1,2]+v[0,i-1,j], log_transmat_[2,2]+v[1,i-1,j] ])

                v[2,i,j] = log_emissionprob_[3,(self.n_alphabet**2+self.n_alphabet)+decoded_y[j-1]]+special.logsumexp([log_transmat_[1,3]+v[0,i,j-1], log_transmat_[3,3]+v[2,i,j-1] ])

        return self.transmat_[0,4]*np.sum(np.exp([v[0,n,m], v[1,n,m], v[2,n,m]]))

# Ejemplos:

In [86]:
model = PairHMM(alphabet=["A", "C", "G", "T"], mu=0.2, epsilon=0.3, tau=0.1, match_probabilities=[0.2, 0.0125, 0.0125, 0.025, 0.0125, 0.2, 0.025, 0.0125, 0.0125, 0.025, 0.2, 0.0125, 0.025, 0.0125, 0.0125, 0.2], insert_probabilities=[0.25]*4)

In [87]:
x="CACGAAT"
y="AGTTCAA"

In [88]:
sequence, alignment, p_fin=model.modifiedViterbi( x, y )
print("\n".join(alignment))

CA---CGAAT
-AGTTC-AA-


In [89]:
total_prob=model.modifiedFoward(x, y)
print(total_prob)

5.85602525347472e-12


In [90]:
p_sequence = p_fin/total_prob
print(p_sequence)

0.10373930673190596


In [91]:
#Un alineamiento trivial
sequence, alignment, p_fin=model.modifiedViterbi( "AAAAAAA" , "AAAAAAA" )
print("\n".join(alignment))
total_prob=model.modifiedFoward("AAAAAAA", "AAAAAAA")
print(total_prob)

AAAAAAA
AAAAAAA
2.8039432167959317e-08


In [92]:
model = PairHMM(alphabet=["A", "C", "G", "T"], mu=0.2, epsilon=0.5, tau=0.1, match_probabilities=[0.2, 0.0125, 0.0125, 0.025, 0.0125, 0.2, 0.025, 0.0125, 0.0125, 0.025, 0.2, 0.0125, 0.025, 0.0125, 0.0125, 0.2], insert_probabilities=[0.25]*4)

In [93]:
sequence, alignment, p_fin=model.modifiedViterbi( x, y )
print("\n".join(alignment))

CACG---AAT
-A-GTTCAA-


In [94]:
total_prob=model.modifiedFoward(x, y)
print(total_prob)
p_sequence = p_fin/total_prob
print(p_sequence)

5.758812258839653e-12
0.08682345899234942


In [95]:
model = PairHMM(["A", "C", "G", "T"], 0.1, 0.1, 0.1, [0.125, 0.0375, 0.0125, 0.075, 0.0375, 0.125, 0.075, 0.0125, 0.0125, 0.075, 0.125, 0.0375, 0.075, 0.0125, 0.0375, 0.125], [0.25]*4)

In [96]:
x="TTTAACTTATCG"
y="TTACTCG"

In [97]:
sequence, alignment, p_fin=model.modifiedViterbi( x, y )
print("\n".join(alignment))

TTTAACTTATCG
-TT-AC-T--CG


In [98]:
model.modifiedFoward(x, y)

3.8758596420364025e-15

In [99]:
model = PairHMM(["A", "C", "G", "T"], 0.1, 0.1, 0.02, [0.125, 0.0375, 0.0125, 0.075, 0.0375, 0.125, 0.075, 0.0125, 0.0125, 0.075, 0.125, 0.0375, 0.075, 0.0125, 0.0375, 0.125], [0.25]*4)

In [100]:
x="gcgcgtgcgcggaaggagccaaggtgaagttgtagcagtgtgtcagaagaggtgcgtggcaccatgctgtcccccgaggcggagcgggtgctgcggtacctggtcgaagtagaggagttg"
y="gacttgtggaacctacttcctgaaaataaccttctgtcctccgagctctccgcacccgtggatgacctgctcccgtacacagatgttgccacctggctggatgaatgtccgaatgaagcg"

In [101]:
sequence, alignment, p_fin=model.modifiedViterbi( x, y )
print("\n".join(alignment))

gcgcgtgcgcggaaggagccaaggtgaagttgtagcagtgtgtcagaagaggtgcgtggcacca-tgctgtcccccgaggcggagcgggtgctgcggtacctgg-tcgaagta-ga-gg-a-gtt--g
ga-cttg-t-ggaacct-acttcctgaa-aataacct-tctgtcctccgagctctc-cgcacccgtggatgacctgctcccgtacacagatgttgcc-acctggctggatgaatgtccgaatgaagcg


In [102]:
model.modifiedFoward(x, y)

5.859574024341285e-150

In [103]:
model = PairHMM(["A", "C", "G", "T"], 0.2, 0.1, 0.1, [0.125, 0.0375, 0.0125, 0.075 , 0.0375, 0.125, 0.075, 0.0125, 0.0125, 0.075, 0.125, 0.0375, 0.075, 0.0125, 0.0375, 0.125], [0.25]*4)

In [104]:
x="TTACG"
y="TAG"

In [105]:
sequence, alignment, p_fin=model.modifiedViterbi( x, y )
print("\n".join(alignment))

TTACG
-TA-G


In [106]:
model.modifiedFoward(x, y)

5.712099609375e-07

In [107]:
x="TACGAACTG"
y="TCGTAACGTA"

In [108]:
sequence, alignment, p_fin=model.modifiedViterbi( x, y )
print("\n".join(alignment))

TACG-AAC-TG
T-CGTAACGTA
