# BA10I - Viterbi learning

#### Viterbi learning:

If we **know x and Parameters, then we can construct the most likely path π** by applying the Viterbi algorithm to solve the **Decoding Problem**:

(x, ?, Parameters) → π

On the other hand, if we know **x and π**, then reconstructing **Parameters** amounts to solving the **HMM Parameter Estimation Problem**:

(x, π, ?) → Parameters
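
With x and π both known, the estimate comes down to counting along the path. Writing $T_{l,k}$ for the number of transitions from state l to state k in π, and $E_k(b)$ for the number of times symbol b is emitted while π is in state k (these two symbols are introduced here only for illustration), the estimated parameters are the row-normalized counts:

$$Transition_{l,k} = \frac{T_{l,k}}{\sum_{j} T_{l,j}} \qquad Emission_{k}(b) = \frac{E_{k}(b)}{\sum_{c} E_{k}(c)}$$

A row whose counts are all zero has no defined estimate under these formulas; the code in this notebook fills such rows uniformly.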

STOP and Think: What do the expressions (x, π, ?) → Parameters and (x, ?, Parameters) → π remind you of?

*The expectation-maximization pattern of alternating two steps, as in the Lloyd algorithm for k-means clustering.*

**HMM Parameter Learning Problem:**

*Estimate the parameters of an HMM explaining an emitted string.*

**Input**:

A string x = x1 ... xn emitted by an HMM with unknown transition and emission probabilities.


**Output**:
A transition matrix Transition and an emission matrix Emission that maximize Pr(x, π) over all possible transition and emission matrices and over all hidden paths π.


Unfortunately, the HMM Parameter Learning Problem is **intractable**, and so we will instead develop a heuristic that is analogous to the Lloyd algorithm for k-means clustering. In that algorithm, we iterated two steps:

1. “From Centers to Clusters”,

        (Data, ?, Centers) → HiddenVector
        
        

2. and “From Clusters to Centers”,

        (Data, HiddenVector, ?) → Centers



<br>

As for **HMM parameter estimation**, we begin with an **initial random guess for Parameters**. Then, we use the **Viterbi algorithm to find the optimal hidden path π**:

    (x, ?, Parameters) → π

Once we know π, we will question our original choice of Parameters and apply our solution to the **HMM Parameter Estimation Problem to update Parameters based on x and π**:

    (x, π, ?) → Parameters′

We then **iterate over these two steps**, hoping that the estimated parameters are getting closer and closer to the parameters solving the HMM Parameter Learning Problem:

$$
\begin{aligned}
(x, ?, \text{Parameters}) &\to (x, \pi, \text{Parameters}) \to (x, \pi, ?) \\
&\to (x, \pi, \text{Parameters}') \to (x, ?, \text{Parameters}') \\
&\to (x, \pi', \text{Parameters}') \to (x, \pi', ?) \\
&\to (x, \pi', \text{Parameters}'') \to \dots
\end{aligned}
$$


This approach to learning the HMM’s parameters is called Viterbi learning.

STOP and Think: Can Pr(x, π) decrease during Viterbi learning? When would you decide to stop the Viterbi learning algorithm?

Note that we have not specified how Viterbi learning should terminate. In practice, there are various stopping rules to control its running time. For example, the algorithm can be stopped if the number of iterations exceeds a predefined threshold or if Pr(x, π) changes very little from one iteration to another.
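
One possible stopping rule can be sketched as follows. This is a minimal illustration, not the rule used later in this notebook (which simply runs a fixed number of iterations). The names `run_until_stable`, `decode` and `estimate` are hypothetical: `decode` stands in for the Viterbi step and `estimate` for the parameter estimation step. The sketch assumes both steps are deterministic, so an unchanged path means the parameters can no longer change either.

```python
def run_until_stable(decode, estimate, params, max_iters):
    """Alternate decode/estimate until the path repeats or max_iters is reached.

    Returns (params, path, iterations_used).
    """
    prev_path = None
    for iteration in range(1, max_iters + 1):
        path = decode(params)          # (x, ?, Parameters) -> pi
        if path == prev_path:
            # same path as last time: re-estimating would give the same params
            return params, path, iteration
        params = estimate(path)        # (x, pi, ?) -> Parameters'
        prev_path = path
    return params, prev_path, max_iters


# Toy stand-ins for the two steps, only to show the control flow
params, path, used = run_until_stable(
    decode=lambda p: [p // 2],
    estimate=lambda path: path[0],
    params=8,
    max_iters=100,
)
print(params, path, used)
```

A threshold on the change in Pr(x, π) could replace the equality test on the path; that would need the decoding step to return its score as well.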

#### Code Challenge: Implement Viterbi learning for estimating the parameters of an HMM.

**Input:**

A number of iterations j, followed by a string x of symbols emitted by an HMM, followed by the HMM's alphabet Σ, followed by the HMM's states, followed by initial transition and emission matrices for the HMM.


**Output:**
Emission and transition matrices resulting from applying Viterbi learning for j iterations.

    Sample Input:
    
        100
        --------
        zyzxzxxxzz
        --------
        x y z
        --------
        A B
        --------
            A	B
        A	0.599	0.401	
        B	0.294	0.706	
        --------
            x	y	z
        A	0.424	0.367	0.209	
        B	0.262	0.449	0.289
    
    Sample Output:

            A	B
        A	0.5	0.5	
        B	0.0	1.0	
        --------
            x	y	z
        A	0.333	0.333	0.333	
        B	0.4	0.1	0.5
        
        
<br>
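
The input format described above can also be parsed by splitting on the `--------` separator lines rather than by fixed line positions. The following is a sketch under that assumption; `parse_hmm_input` is a hypothetical helper, and it returns plain Python lists instead of NumPy arrays.

```python
def parse_hmm_input(text):
    """Parse the '--------'-separated BA10I input into its six parts."""
    blocks, current = [], []
    for line in text.splitlines():
        if line.strip().startswith("--"):
            blocks.append(current)     # separator closes the current block
            current = []
        elif line.strip():
            current.append(line.split())
    blocks.append(current)             # last block has no trailing separator

    iters = int(blocks[0][0][0])
    x = blocks[1][0][0]
    alphabet = blocks[2][0]
    states = blocks[3][0]
    # each matrix block: skip the header row, drop the row label
    transition = [[float(v) for v in row[1:]] for row in blocks[4][1:]]
    emission = [[float(v) for v in row[1:]] for row in blocks[5][1:]]
    return iters, x, alphabet, states, transition, emission


SAMPLE = "\n".join([
    "100", "--------", "zyzxzxxxzz", "--------", "x y z", "--------",
    "A B", "--------",
    "\tA\tB", "A\t0.599\t0.401", "B\t0.294\t0.706", "--------",
    "\tx\ty\tz", "A\t0.424\t0.367\t0.209", "B\t0.262\t0.449\t0.289",
])
iters, x, alphabet, states, transition, emission = parse_hmm_input(SAMPLE)
print(iters, x, alphabet, states)
```

Splitting on separators avoids index arithmetic that depends on the number of states.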


<br>

---

### Simple sample dataset

<br>

In [18]:
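def estimate_parameters(pi, x, S, alpha):
    """Estimate T and E by counting transitions and emissions along hidden path pi."""
    # S is the number of states, passed in by the caller (not read from the global `states`)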
    T = np.zeros(shape = (S, S))
    E = np.zeros(shape = (S, len(alpha)))
    E[pi[0]][x[0]] +=1
    prev = pi[0]
    for i in range(1, len(x)):
        E[pi[i]][x[i]] +=1
        T[prev][pi[i]] +=1
        prev = pi[i]
    for s in range(S):
        # a row with no counts is filled uniformly instead of dividing by zero
        if sum(E[s]) == 0:
            E[s] += 1
        E[s] = E[s] / sum(E[s])
        if sum(T[s]) == 0:
            T[s] += 1
        T[s] = T[s] / sum(T[s])

    return T,E


In [34]:
## Given a set of initial T, E matrices, apply viterbi algorithm for optimal hidden path

def viterbi_hiddenpath(x, T, E, S):
    """returns max likelihood hidden path for emission string, given HMM"""

    n = len(x)
    viterbi = np.ones(shape = (S, n)) * -float('inf')
    pointers = [[False for e in range(n)] for s in range(S)] 
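    # init first column of viterbi with the emission entry & the uniform start term 1/S
    # NOTE: scores in this function add raw entries of E and T rather than their logarithms;
    # np.log(1/S) is the same constant for every state, so it does not affect the argmax
    for state in range(S):
        viterbi[state][0] = np.log(1/S) + E[state][x[0]]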
        pointers[state][0] = -1
        
    # Fill viterbi graph using dynamic programming
    
    for i in range(1,n):
        for state in range(S):
            for prev in range(S):
                p_total = E[state][x[i]] + T[prev][state] + viterbi[prev][i-1]
                if p_total > viterbi[state][i]:
                    viterbi[state][i] = p_total
                    pointers[state][i] = prev
    # start backtrack from greatest probability in last column of viterbi
    score = -float('inf')
    for state in range(S):
        if viterbi[state][n-1] > score:
            last = state
            score = viterbi[state][n-1]
    path = [last]
    # backtrack to recreate max likelihood hidden_path in reverse
    i = n-1
    while i > 0:
        prev_state = pointers[last][i]  # avoid shadowing the builtin `next`
        path.append(prev_state)
        last = prev_state
        i -= 1
    # reverse string to get hidden_path     
    return path[::-1]
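
The cell above sums raw entries of E and T along a path. The Viterbi algorithm is usually stated as maximizing the product of those entries, which is equivalent to maximizing the sum of their logarithms. The following is a sketch of that log-space variant, not the code that produced the outputs in this notebook. It assumes uniform initial state probabilities (the same 1/S start term as above) and uses plain lists; `safe_log` and `viterbi_log` are hypothetical names.

```python
import math


def safe_log(p):
    """log(p), with log(0) mapped to -inf instead of raising an error."""
    return math.log(p) if p > 0 else float("-inf")


def viterbi_log(x, T, E):
    """Most likely hidden path for integer-coded emissions x, scored in log space."""
    S, n = len(T), len(x)
    score = [[float("-inf")] * n for _ in range(S)]
    back = [[0] * n for _ in range(S)]
    for k in range(S):
        score[k][0] = safe_log(1 / S) + safe_log(E[k][x[0]])
    for i in range(1, n):
        for k in range(S):
            for l in range(S):
                cand = score[l][i - 1] + safe_log(T[l][k]) + safe_log(E[k][x[i]])
                if cand > score[k][i]:
                    score[k][i] = cand
                    back[k][i] = l
    # backtrack from the best-scoring state in the last column
    last = max(range(S), key=lambda k: score[k][n - 1])
    path = [last]
    for i in range(n - 1, 0, -1):
        last = back[last][i]
        path.append(last)
    return path[::-1]


# Toy HMM in which each state emits exactly one symbol
T_toy = [[0.9, 0.1], [0.1, 0.9]]
E_toy = [[1.0, 0.0], [0.0, 1.0]]
print(viterbi_log([0, 0, 1, 1], T_toy, E_toy))  # -> [0, 0, 1, 1]
```

In log space a zero probability becomes -inf, so a path that uses a forbidden transition or emission can never win the comparison.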

In [2]:
import pandas as pd
import numpy as np

with open("/Users/jasonmoggridge/Dropbox/Rosalind/Coursera_textbook_track/Course6/data/ba10i.sample.txt") as f:
    lines = [line.strip() for line in f]
    # number of iterations 'iters'
    iters = int(lines[0].strip())
    # string emitted by HMM 'x'
    x = lines[2].strip()
    # alphabet of emissions from HMM in x; convert x to integers
    alpha = lines[4].split()
    x = [int(alpha.index(emission)) for emission in x]
    
    # states of HMM
    states = lines[6].split()
    S = len(states)
    # Transition matrix
    T = np.array([line.split()[1:] for line in lines[9:9+len(states)]], float)
    #Emission matrix
    E = np.array([line.split()[1:] for line in lines[9+len(states)+2:]], float)

print(iters,x,alpha, states)
print(T)
print(E)

100 [2, 1, 2, 0, 2, 0, 0, 0, 2, 2] ['x', 'y', 'z'] ['A', 'B']
[[0.599 0.401]
 [0.294 0.706]]
[[0.424 0.367 0.209]
 [0.262 0.449 0.289]]


In [3]:
for r in range(iters):
    # viterbi algo to find optimal hidden path | HMM, x
    pi = viterbi_hiddenpath(x, T, E, S)
    # estimate HMM parameters | pi, x
    T, E = estimate_parameters(pi, x, S, alpha)


In [48]:
print(pi)
print("\n\nestimated Transition matrix:")
print(pd.DataFrame(np.around(T,3), index=states, columns = states))
print("\nestimated Emission matrix:")
print(pd.DataFrame(np.around(E,3), index=states, columns = alpha))

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


estimated Transition matrix:
     A    B
A  0.5  0.5
B  0.0  1.0

estimated Emission matrix:
       x      y      z
A  0.333  0.333  0.333
B  0.400  0.100  0.500


#### Sample data solved; converges after 2 iterations.

    pi=    [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


    estimated Transition matrix:
         A    B
    A  0.5  0.5
    B  0.0  1.0

    estimated Emission matrix:
           x      y      z
    A  0.333  0.333  0.333
    B  0.400  0.100  0.500
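
The matrices reported above can be recovered from the final path alone by counting. The following standalone check is a sketch that uses plain lists rather than NumPy; `estimate_from_path` is a hypothetical name, and empty rows are filled uniformly as in the notebook's own estimation code.

```python
def estimate_from_path(pi, x, n_states, n_symbols):
    """Row-normalized transition and emission counts along hidden path pi."""
    T = [[0.0] * n_states for _ in range(n_states)]
    E = [[0.0] * n_symbols for _ in range(n_states)]
    for i, (state, symbol) in enumerate(zip(pi, x)):
        E[state][symbol] += 1
        if i > 0:
            T[pi[i - 1]][state] += 1

    def normalize(row):
        total = sum(row)
        if total == 0:
            # state never visited (or never left): uniform row
            return [1 / len(row)] * len(row)
        return [v / total for v in row]

    return [normalize(r) for r in T], [normalize(r) for r in E]


# Final path and integer-coded emissions from the sample dataset
pi_sample = [1] * 10
x_sample = [2, 1, 2, 0, 2, 0, 0, 0, 2, 2]
T_est, E_est = estimate_from_path(pi_sample, x_sample, 2, 3)
print(T_est)  # -> [[0.5, 0.5], [0.0, 1.0]]
print(E_est[1])  # -> [0.4, 0.1, 0.5]
```

State A is never visited on this path, which is why both of its rows come out uniform.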

#### Extra test dataset:

##### Solution

    Output:
        A	B
    A 	0.0	1.0 
    B 	1.0	0.0 
    --------
        x	y	z
    A	0.36	0.14	0.5 
    B	0.34	0.34	0.32

<br>

---

### Extra test dataset

<br>

In [60]:
def viterbi_learning(x, T, E, states, alpha):
    """Learn HMM given emission string x and initial HMM; return HMM: T, E matrices"""

    def viterbi_hiddenpath(x, T, E, S):
        """returns max likelihood hidden path for emission string, given HMM"""

        n = len(x)
        viterbi = np.ones(shape = (S, n)) * -float('inf')
        pointers = [[False for e in range(n)] for s in range(S)] 
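        # init first column of viterbi with the emission entry & the uniform start term 1/S
        # NOTE: scores here add raw entries of E and T rather than their logarithms
        for state in range(S):
            viterbi[state][0] = np.log(1/S) + E[state][x[0]]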
            pointers[state][0] = -1

        # Fill viterbi graph using dynamic programming

        for i in range(1,n):
            for state in range(S):
                for prev in range(S):
                    p_total = E[state][x[i]] + T[prev][state] + viterbi[prev][i-1]
                    if p_total > viterbi[state][i]:
                        viterbi[state][i] = p_total
                        pointers[state][i] = prev
        # start backtrack from greatest probability in last column of viterbi
        score = -float('inf')
        for state in range(S):
            if viterbi[state][n-1] > score:
                last = state
                score = viterbi[state][n-1]
        path = [last]
        # backtrack to recreate max likelihood hidden_path in reverse
        i = n-1
        while i > 0:
            prev_state = pointers[last][i]  # avoid shadowing the builtin `next`
            path.append(prev_state)
            last = prev_state
            i -= 1
        # reverse string to get hidden_path     
        return path[::-1]

    def estimate_parameters(pi, x, S, alpha):
        """Adjusts parameters based on hidden path pi and emit string x"""
        T = np.zeros(shape = (S, S))
        E = np.zeros(shape = (S, len(alpha)))
        E[pi[0]][x[0]] +=1
        prev = pi[0]
        for i in range(1, len(x)):
            E[pi[i]][x[i]] +=1
            T[prev][pi[i]] +=1
            prev = pi[i]
        for s in range(S):
            if sum(E[s]) == 0:
                E[s]+=1
            E[s] = E[s]/sum(E[s])
            if sum(T[s]) == 0:
                T[s] += 1
            T[s] = T[s]/sum(T[s])
        return T,E
    
    # Convert emissions x to integers list
    x = [int(alpha.index(emission)) for emission in x]
    S = len(states)
    
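    # Alternate decoding and estimation (a hard-EM style loop)
    # NOTE: `iters` is not a parameter; it is read from the notebook's global scope
    for r in range(iters):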
        # viterbi algo to find optimal hidden path | HMM, x
        pi = viterbi_hiddenpath(x, T, E, S)
        # estimate HMM parameters | pi, x
        T, E = estimate_parameters(pi, x, S, alpha)
    print("Final hidden path pi", pi)
    return (T, E)

In [61]:
def outputHMM(T,E,S,alpha):
    """Build output rows for T and E; here S is the list of state names, not their count."""
    lists = []
    lists.append(['']+S)
    for i in range(len(S)):
        # use {0:g} format to remove trailing zeros
        lists.append([S[i]]+['{0:g}'.format(x) for x in T[i]])
    lists.append('--------')
    lists.append([''] + alpha)
    for i in range(len(S)):
        lists.append([S[i]]+['{0:g}'.format(x) for x in E[i]])
    return lists

In [64]:

with open("/Users/jasonmoggridge/Dropbox/Rosalind/Coursera_textbook_track/Course6/data/ba10i.extra.txt") as f:
    lines = [line.strip() for line in f]
    # number of iterations 'iters'
    iters = int(lines[0].strip())
    # string emitted by HMM 'x'
    x = lines[2].strip()
    # alphabet of emissions from HMM in x; convert x to integers
    alpha = lines[4].split()
    # states of HMM
    states = lines[6].split()
    # Transition matrix
    T = np.array([line.split()[1:] for line in lines[9:9+len(states)]], float)
    #Emission matrix
    E = np.array([line.split()[1:] for line in lines[9+len(states)+2:]], float)

print(iters,x,alpha, states)
print(T)
print(E)

# Learn HMM from input string and random HMM to start.
(T, E) = viterbi_learning(x,T,E, states, alpha)

# T = pd.DataFrame(np.around(T,3), index=states, columns = states)
# E = pd.DataFrame(np.around(E,3), index=states, columns = alpha)                 
print("\n\nestimated Transition matrix:\n")
print(T)
print("\nestimated Emission matrix:\n")
print(E)

outdata = outputHMM(T, E, states, alpha)

with open('/Users/jasonmoggridge/Dropbox/Rosalind/Coursera_textbook_track/Course6/data/10i.extra.out.txt','w') as file:
    for line in outdata[:-1]:
        if line[0] != '-':
            file.write('\t'.join(line))
        else:
            file.write(line)
        file.write('\n')
    file.write('\t'.join(outdata[-1]))

100 zzzyyxyzzzxyxzxxzzyxxzzzzzzyzxyzxxxzxxxzyzzxzxzzzxyzyyyxxxxzxyyyyyxzzzyxyzzxxzxxzxyxyxyzxzxzxzyxyzzz ['x', 'y', 'z'] ['A', 'B']
[[0.436 0.564]
 [0.953 0.047]]
[[0.367 0.248 0.385]
 [0.401 0.361 0.238]]
Final hidden path pi [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]


estimated Transition matrix:

[[0. 1.]
 [1. 0.]]

estimated Emission matrix:

[[0.36 0.14 0.5 ]
 [0.34 0.34 0.32]]


#### my solution:

    estimated Transition matrix:
             A    B
        A  0.0  1.0
        B  1.0  0.0

    estimated Emission matrix:
              x     y     z
        A  0.36  0.14  0.50
        B  0.34  0.34  0.32

<br>

---

### Stepik test dataset

<br>

In [76]:

def outputHMM(T,E,S,alpha):
    lists = []
    lists.append(['']+S)
    for i in range(len(S)):
        # use {0:g} format to remove trailing zeros
        lists.append([S[i]]+['{0:g}'.format(x) for x in T[i]])
    lists.append('--------')
    lists.append([''] + alpha)
    for i in range(len(S)):
        lists.append([S[i]]+['{0:g}'.format(x) for x in E[i]])
    print(lists)
    return lists

In [80]:
### MAIN:

with open("/Users/jasonmoggridge/Dropbox/Rosalind/Coursera_textbook_track/Course6/data/dataset_26260_8.txt") as f:
    lines = [line.strip() for line in f]
    # number of iterations 'iters'
    iters = int(lines[0].strip())
    # string emitted by HMM 'x'
    x = lines[2].strip()
    # alphabet of emissions from HMM in x; convert x to integers
    alpha = lines[4].split()
    # states of HMM
    states = lines[6].split()
    # Transition matrix
    T = np.array([line.split()[1:] for line in lines[9:9+len(states)]], float)
    #Emission matrix
    E = np.array([line.split()[1:] for line in lines[9+len(states)+2:]], float)

print(iters,x,alpha, states)
print(T)
print(E)

# Learn HMM from input string and random HMM to start.
(T, E) = viterbi_learning(x,T,E, states, alpha)

# T = pd.DataFrame(np.around(T,3), index=states, columns = states)
# E = pd.DataFrame(np.around(E,3), index=states, columns = alpha)                 
print("\n\nestimated Transition matrix:\n")
print(T)
print("\nestimated Emission matrix:\n")
print(E)

outdata = outputHMM(T, E, states, alpha)

with open('/Users/jasonmoggridge/Dropbox/Rosalind/Coursera_textbook_track/Course6/data/0UT.out.txt','w') as file:
    for line in outdata[:-1]:
        if line[0] != '-':
            file.write('\t'.join(line))
        else:
            file.write(line)
        file.write('\n')
    file.write('\t'.join(outdata[-1]))

100 xyxyxzyyxxxxxxyxyxxzxzxzyyxyxzyyzyyxyyxzzzxyyzyyyzxzxyzyxzzyxxxxxyxzyyxxyzzzyxzxzyxzzxxxzxyyxyyzxyyz ['x', 'y', 'z'] ['A', 'B']
[[0.261 0.739]
 [0.18  0.82 ]]
[[0.078 0.475 0.447]
 [0.211 0.213 0.576]]
Final hidden path pi [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


estimated Transition matrix:

[[0.5 0.5]
 [0.  1. ]]

estimated Emission matrix:

[[0.33333333 0.33333333 0.33333333]
 [0.39       0.35       0.26      ]]
[['', 'A', 'B'], ['A', '0.5', '0.5'], ['B', '0', '1'], '--------', ['', 'x', 'y', 'z'], ['A', '0.333333', '0.333333', '0.333333'], ['B', '0.39', '0.35', '0.26']]


<br>

---

### Rosalind dataset

<br>

In [81]:
### MAIN:

with open("/Users/jasonmoggridge/Dropbox/Rosalind/Coursera_textbook_track/Course6/data/rosalind_ba10i.txt") as f:
    lines = [line.strip() for line in f]
    # number of iterations 'iters'
    iters = int(lines[0].strip())
    # string emitted by HMM 'x'
    x = lines[2].strip()
    # alphabet of emissions from HMM in x; convert x to integers
    alpha = lines[4].split()
    # states of HMM
    states = lines[6].split()
    # Transition matrix
    T = np.array([line.split()[1:] for line in lines[9: 9+len(states)]], float)
    #Emission matrix
    E = np.array([line.split()[1:] for line in lines[9+len(states)+2:]], float)

print(iters, x, alpha, states)
print(T)
print(E)

# Learn HMM from input string and random HMM to start.
(T, E) = viterbi_learning(x, T, E, states, alpha)

T = np.around(T, 3)
E = np.around(E, 3)
print("\n\nestimated Transition matrix:\n")
print(T)
print("\nestimated Emission matrix:\n")
print(E)

outdata = outputHMM(T, E, states, alpha)

with open('/Users/jasonmoggridge/Dropbox/Rosalind/Coursera_textbook_track/Course6/data/0UT.ba10i.txt','w') as file:
    for line in outdata[:-1]:
        if line[0] != '-':
            file.write('\t'.join(line))
        else:
            file.write(line)
        file.write('\n')
    file.write('\t'.join(outdata[-1]))

100 yzyyzxzyyzxyzzxxyzyzyxyyzzzyzzxxyxyzyyzxxzxzzzxzxyzxxyxxyzzyyyxzzyyzxzxzyzzxzzyzzxxyxyyzyyyyzzzxyyzy ['x', 'y', 'z'] ['A', 'B']
[[0.959 0.041]
 [0.543 0.457]]
[[0.682 0.275 0.043]
 [0.601 0.077 0.322]]
Final hidden path pi [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


estimated Transition matrix:

[[1.  0. ]
 [0.5 0.5]]

estimated Emission matrix:

[[0.25  0.36  0.39 ]
 [0.333 0.333 0.333]]
[['', 'A', 'B'], ['A', '1', '0'], ['B', '0.5', '0.5'], '--------', ['', 'x', 'y', 'z'], ['A', '0.25', '0.36', '0.39'], ['B', '0.333', '0.333', '0.333']]


#### Solved. An earlier attempt had a small output-formatting bug: values must be rounded to 3 decimal places, with no trailing zeros.
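
The rounding requirement can be met in one small helper. This is a sketch with a hypothetical name, `format_probability`, combining `round` with the same `{0:g}` format that `outputHMM` already uses.

```python
def format_probability(p, digits=3):
    """Round to `digits` decimals, then let the 'g' format drop trailing zeros."""
    return "{0:g}".format(round(p, digits))


print([format_probability(v) for v in (1 / 3, 0.5, 1.0, 0.0)])
# -> ['0.333', '0.5', '1', '0']
```

Rounding before formatting matters: `'{0:g}'` alone keeps six significant digits, so 1/3 would be written as `0.333333`.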