In [6]:
!pip install numpy pandas



In [7]:
import numpy as np
import pandas as pd

![SWPairHMMLocal](figures/phmm_local.png)

In [8]:
# transition probabilities

delta = 0.2
tau = 0.1
epsilon = 0.4
eta = 0.2

In [9]:
# transition matrix

a = pd.DataFrame(
    np.array([
        # B    RX1      a    RY1      b    M                    X        Y        E
        [ 0.0, 1 - eta, eta, 0.0,     0.0, 0.0,                 0.0,     0.0,     0.0 ], # B
        [ 0.0, 1 - eta, eta, 0.0,     0.0, 0.0,                 0.0,     0.0,     0.0 ], # RX1
        [ 0.0, 0.0,     0.0, 1 - eta, eta, 0.0,                 0.0,     0.0,     0.0 ], # a
        [ 0.0, 0.0,     0.0, 1 - eta, eta, 0.0,                 0.0,     0.0,     0.0 ], # RY1
        [ 0.0, 0.0,     0.0, 0.0,     0.0, 1 - (2*delta) - tau, delta,   delta,   tau ], # b
        [ 0.0, 0.0,     0.0, 0.0,     0.0, 1 - (2*delta) - tau, delta,   delta,   tau ], # M
        [ 0.0, 0.0,     0.0, 0.0,     0.0, 1 - epsilon - tau,   epsilon, 0.0,     tau ], # X
        [ 0.0, 0.0,     0.0, 0.0,     0.0, 1 - epsilon - tau,   0.0,     epsilon, tau ], # Y
        [ 0.0, 0.0,     0.0, 0.0,     0.0, 0.0,                 0.0,     0.0,     0.0 ]  # E
    ]),
    columns = ['B', 'RX1', 'a', 'RY1', 'b', 'M', 'X', 'Y', 'E']
)
a.index = ['B', 'RX1', 'a', 'RY1', 'b', 'M', 'X', 'Y', 'E']

a

Unnamed: 0,B,RX1,a,RY1,b,M,X,Y,E
B,0.0,0.8,0.2,0.0,0.0,0.0,0.0,0.0,0.0
RX1,0.0,0.8,0.2,0.0,0.0,0.0,0.0,0.0,0.0
a,0.0,0.0,0.0,0.8,0.2,0.0,0.0,0.0,0.0
RY1,0.0,0.0,0.0,0.8,0.2,0.0,0.0,0.0,0.0
b,0.0,0.0,0.0,0.0,0.0,0.5,0.2,0.2,0.1
M,0.0,0.0,0.0,0.0,0.0,0.5,0.2,0.2,0.1
X,0.0,0.0,0.0,0.0,0.0,0.5,0.4,0.0,0.1
Y,0.0,0.0,0.0,0.0,0.0,0.5,0.0,0.4,0.1
E,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [10]:
# emissions (match state)

eM = pd.DataFrame(
    np.array([
        # A     C     G     T
        [ 0.50, 0.05, 0.15, 0.30 ], # A
        [ 0.05, 0.50, 0.30, 0.15 ], # C
        [ 0.15, 0.30, 0.50, 0.05 ], # G
        [ 0.30, 0.15, 0.05, 0.50 ]  # T
    ]),
    columns = ['A', 'C', 'G', 'T']
)
eM.index = ['A', 'C', 'G', 'T']

eM

Unnamed: 0,A,C,G,T
A,0.5,0.05,0.15,0.3
C,0.05,0.5,0.3,0.15
G,0.15,0.3,0.5,0.05
T,0.3,0.15,0.05,0.5


In [11]:
# emissions (insert / delete states)

eXY = {
    'A': 0.25,
    'C': 0.25,
    'G': 0.25,
    'T': 0.25
}

eXY

{'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25}

In [7]:
def max_vX1(S1, S2, i, j, v_X1, tb):
    q = eXY[S2[0]]
    
    r = np.array([
        a.loc['X1', 'X1']*v_X1[i][j - 1]
    ])
    
    mr = np.max(r)
    mi = np.argmax(r)
    
    v_X1[i][j] = q*mr
    tb[i][j] = mi
    
def max_vY1(S1, S2, i, j, v_Y1, tb):
    q = eXY[S2[0]]
    
    r = np.array([
        a.loc['Y1', 'Y1']*v_Y1[i - 1][j]
    ])
    
    mr = np.max(r)
    mi = np.argmax(r)
    
    v_X1[i][j] = q*mr
    tb[i][j] = mi
    
def max_vM(S1, S2, i, j, v_M, v_X, v_Y, tb):
    p = eM.loc[S1[i - 1], S2[j - 1]]

    r = np.array([
        a.loc['M', 'M']*v_M[i - 1][j - 1],
        a.loc['X', 'M']*v_X[i - 1][j - 1],
        a.loc['Y', 'M']*v_Y[i - 1][j - 1]
    ])
    
    mr = np.max(r)
    mi = np.argmax(r)

    v_M[i][j] = p*mr
    tb[i][j] = mi
    
def max_vX(S1, S2, i, j, v_M, v_X, v_Y, tb):
    q = eXY[S1[0]]
    
    r = np.array([
        a.loc['M', 'X']*v_M[i - 1][j],
        a.loc['X', 'X']*v_X[i - 1][j]
    ])
    
    mr = np.max(r)
    mi = np.argmax(r)
    
    v_X[i][j] = q*mr
    tb[i][j] = mi
    
def max_vY(S1, S2, i, j, v_M, v_X, v_Y, tb):
    q = eXY[S2[0]]
    
    r = np.array([
        a.loc['M', 'Y']*v_M[i][j - 1],
        a.loc['Y', 'Y']*v_Y[i][j - 1]
    ])
    
    mr = np.max(r)
    mi = np.argmax(r)
    
    v_Y[i][j] = q*mr
    tb[i][j] = mi

def traceback(S1, S2, v_X1, vY1, v_M, v_X, v_Y, tb_X1, tb_Y1, tb_M, tb_X, tb_Y):
    i = len(A)
    j = len(B)
    n = np.max([len(A), len(B)]) - 1
    
    r = np.array([ v_M[i][j], v_X[i][j], v_Y[i][j] ])
    
    mr = np.max(r)
    mi = np.argmax(r)
    
    a1 = (n+1)*[' ']
    a2 = (n+1)*[' ']
            
    while i > 0 and j > 0:
        if mi == 0:
            a1[n] = S1[i - 1]
            a2[n] = S2[j - 1]
            mi = tb_M[i][j]
            i -= 1
            j -= 1
        elif mi == 1:
            a1[n] = S1[i - 1]
            a2[n] = '-'        
            mi = tb_X[i][j]
            i -= 1
        elif mi == 2:
            a1[n] = '-'
            a2[n] = S2[j - 1]
            mi = tb_Y[i][j]
            j -= 1
            
        n -= 1
        
    return ''.join(a1), ''.join(a2)

In [8]:
# Initialization

## Specify sequences

A = "GTCATGTTAGTTTACG"
B = "TAGTTAACG"

## Viterbi matrices

vX1 = np.zeros([len(A)+1, len(B)+1])
vY1 = np.zeros([len(A)+1, len(B)+1])
vM = np.zeros([len(A)+1, len(B)+1])
vX = np.zeros([len(A)+1, len(B)+1])
vY = np.zeros([len(A)+1, len(B)+1])

vM[0,0] = 1
vX[0,0] = 0  # redundant; written anyway simply to follow the Durbin textbook, p. 84
vY[0,0] = 0  # redundant; written anyway simply to follow the Durbin textbook, p. 84

## Traceback matrices

tM = np.zeros([len(A)+1, len(B)+1])
tX = np.zeros([len(A)+1, len(B)+1])
tY = np.zeros([len(A)+1, len(B)+1])

In [9]:
# Populate Viterbi matrices

for i in range(1, len(A)+1):
    for j in range(1, len(B)+1):
        max_vM(A, B, i, j, vM, vX, vY, tM)
        max_vX(A, B, i, j, vM, vX, vY, tX)
        max_vY(A, B, i, j, vM, vX, vY, tY)

In [10]:
# Follow traceback matrices

a1, a2 = traceback(A, B, vM, vX, vY, tM, tX, tY)

In [11]:
# Print alignment

print(a1)
print(a2)

TTACG
TA--G
