In [1]:
#!/usr/bin/env python3

########################################################################
# File: problem25.ipynb

# Author: Nicholas Chan
# History: 12/06/2021 Created
########################################################################

# Assignment 7: Problem 25
<br>
For this assignment, we were given input for the number of iterations of learning to execute, <br>
an emission sequence, emitted symbols, hidden states, an initial transition table, and an initial <br>
emission table. With this information we were to compute a new set of parameters for the HMM <br>
through iterations of the Expectation and Maximization steps of Viterbi Learning. <br>


# ViterbiLearning Class
<br>
The ViterbiLearning class stores the number of iterations of learning to execute, the <br> 
emission sequence, emission symbols, hidden states, transition table, and emission  <br>
table given as input. With this information, we can generate the likelihood maximizing path <br> 
with the Viterbi algorithm and then estimate new parameters which can be used to re-compute <br>
another likelihood maximizing path.

In [2]:
import numpy as np
# use long double floating point type for all computation
# particularly, initialize the emission and transition tables to this
class ViterbiLearning:
    '''
    Hidden Markov Model parameter estimation by Viterbi learning.

    An object stores the hidden states, the emission symbols, the emission sequence and the
    current transition/emission tables. computePath() finds the hidden path that maximizes
    Pr(x,pi), the joint probability of the emission sequence and the path, under the current
    tables. genNewEmissionTable() and genNewTransitionTable() then re-estimate the tables from
    the counts observed along that path, and maximizeIters() alternates the two steps.

    Tables are dictionaries keyed by string concatenation: trT[state1 + state2] and
    emT[state + symbol]. The path is stored as a string and walked one character at a time,
    so state and symbol names are expected to be single characters.
    '''
    def __init__(self, hStates, eSymbols, emissionSeq, trTable, emTable):
        '''
        Stores the model and immediately computes the most likely path for the given tables,
        followed by the emission/transition tables estimated from that path.

        hStates     - list of hidden states
        eSymbols    - list of emission symbols
        emissionSeq - string of emitted symbols
        trTable     - dict mapping state1 + state2 to a transition probability
        emTable     - dict mapping state + symbol to an emission probability
        '''
        self.hS = hStates # List of hidden states
        self.eS = eSymbols # List of emitted symbols
        self.emS = emissionSeq # stores sequence of emitted symbols
        self.trT = trTable # stores transition table parsed from input
        self.emT = emTable # stores emission table parsed from input
        self.prob = 0 # Computed on call to self.computePath()
        self.path = self.computePath() # Computed hidden path via Viterbi Algorithm and sets prob
        self.newEmT = self.genNewEmissionTable()
        self.newTrT = self.genNewTransitionTable()

    def maxState(self, scores):
        '''
        Small method used to return a list of [{maximal hidden state}, {corresponding maximal score}].
        scores is expected to hold one score per hidden state, in the order of self.hS.
        '''
        return [self.hS[np.argmax(scores)], max(scores)]

    def computePath(self):
        '''
        Viterbi algorithm. Returns the most likely hidden path as a string and stores its
        probability in self.prob.

        Scores - computed with Score = prevScore(l) * emission(k emits x_i) * transition(l to k)
        Every state is given the same initial probability 1/len(self.hS).
        Each node is a list [score, prevState, prevStateIdx, state]; prevStateIdx is the
        back pointer followed when backtracking.
        NOTE: scores are raw probability products (no log transform), so they shrink with
        the length of the emission sequence.
        '''
        startProb = 1/len(self.hS)
        # Scores of the first column; there is no previous node, hence the placeholder pointers
        firstScores = [[startProb*self.emT[state + self.emS[0]], '[ZERO_STATE]', '[ZERO_IDX]', state] for state in self.hS]
        scoreList = [firstScores] # One list of nodes per position of the emission sequence
        for i in range(1, len(self.emS)):
            prevScore = scoreList[i-1]
            currCol = []
            for state in self.hS:
                currEmission = self.emT[state + self.emS[i]] # Same for every predecessor of this node
                currNodeScores = [
                    [prevScore[prevStateIdx][0] * currEmission * self.trT[prevState + state], prevState, prevStateIdx, state]
                    for prevStateIdx, prevState in enumerate(self.hS)
                ]
                # max() keeps the first maximal entry, so ties go to the earlier state in self.hS
                currCol.append(max(currNodeScores, key=lambda node: node[0]))
            scoreList.append(currCol)
        # Backtracking starts at the best scoring node of the last column
        lastNode = max(scoreList[-1], key=lambda node: node[0])
        self.prob = lastNode[0]
        revStates = [lastNode[3]]
        for column in reversed(scoreList[:-1]): # Follow the back pointers towards the first column
            lastNode = column[lastNode[2]]
            revStates.append(lastNode[3])
        return "".join(reversed(revStates))

    def _normalizeRows(self, counts, rowTotals, columns):
        '''
        Turns the counts of a table (keys row + column) into probabilities, row by row.
        A row whose total is 0 has no counts to normalize; it is set to the uniform
        distribution so that every row of the table still sums to 1. An all-zero row would
        give every path through that state a score of 0 in the next computePath() call.
        '''
        for row in self.hS:
            for col in columns:
                if rowTotals[row] == 0:
                    counts[row+col] = 1/len(columns)
                else:
                    counts[row+col] /= rowTotals[row]
        return counts

    def genNewEmissionTable(self):
        '''
        Generates a new emission table from emission counts of the maximum likelihood path
        and input emission sequence. States that never occur on the path get a uniform row.
        '''
        norms = {s:0 for s in self.hS} # Number of times each state occurs on the path
        newEmT = {st+em: 0 for st in self.hS for em in self.eS}
        for state, symbol in zip(self.path, self.emS):
            norms[state] += 1
            newEmT[state+symbol] += 1
        return self._normalizeRows(newEmT, norms, self.eS)

    def genNewTransitionTable(self):
        '''
        Generates a new transition table from transition counts of the maximum likelihood path.
        States without an outgoing transition on the path get a uniform row.
        '''
        norms = {s:0 for s in self.hS} # Number of outgoing transitions of each state
        newTrT = {st1+st2: 0 for st1 in self.hS for st2 in self.hS}
        for fromState, toState in zip(self.path, self.path[1:]): # Edges (l,k) of the path
            norms[fromState] += 1
            newTrT[fromState+toState] += 1
        return self._normalizeRows(newTrT, norms, self.hS)

    def maximizeIters(self, num):
        '''
        Runs num learning iterations. Each iteration adopts the most recently estimated
        tables as the current parameters, recomputes the most likely path (which also
        updates self.prob) and estimates the next pair of tables from that path.
        Nothing is done when num is 0 or negative.
        '''
        remaining = num
        while remaining > 0:
            self.trT = self.newTrT # Assigns object new transition and emission tables
            self.emT = self.newEmT
            self.path = self.computePath() # Computed hidden path via Viterbi Algorithm and sets prob
            self.newEmT = self.genNewEmissionTable()
            self.newTrT = self.genNewTransitionTable()
            remaining -= 1
        return

    def printParams(self):
        '''
        Returns (does not print) a string holding the current transition table, a separator
        line and the current emission table. Values are rounded to 3 decimals and fields are
        tab separated. Call this after maximizeIters() for the Rosalind solution.
        '''
        outString = ""
        for hs in self.hS:
            outString += hs + "\t"
        outString += "\n"
        for hs1 in self.hS:
            outString += hs1 + "\t"
            for hs2 in self.hS:
                outString += str(np.around(self.trT[hs1+hs2], decimals=3)) + "\t"
            outString += "\n"
        outString += "-------- \n"
        outString += "\t".join(self.eS) + "\n"
        for hs1 in self.hS:
            outString += hs1 + "\t"
            for em2 in self.eS:
                outString += str(np.around(self.emT[hs1+em2], decimals=3)) + "\t"
            outString += "\n"
        return outString
        
            
            

# Main Function
<br>
Parses an input text file as an int for the number of learning iterations to perform, <br>
a string for the emission sequence, a list of emission symbols, <br>
a list for the hidden states, a dictionary for a transition table, and a dictionary for an <br>
emission table which the ViterbiLearning class requires as input. The main function also initializes <br>
a ViterbiLearning object with the given input to compute our parameters to be printed to output.

In [3]:
def main(infile, outfile='', inCL=None):
    '''
    Parses the input text file, runs Viterbi learning and prints the learned parameters.

    The file is read in this order, with one separator line between the sections: number of
    learning iterations, emission sequence, emission symbols, hidden states, transition table
    (header line plus one row per state) and emission table (header line plus one row per state).
    Each iteration computes the likelihood maximizing path of hidden states for the current
    parameters and then re-estimates the transition and emission tables from that path.

    infile  - path of the input text file
    outfile - currently unused; the result is printed to standard output
    inCL    - currently unused

    Raises ValueError if the file ends before the separator that closes the transition table.
    '''
    def splitFields(line):
        # Fields are tab separated; fall back to any whitespace when the line has no tab
        line = line.rstrip()
        return line.split('\t') if '\t' in line else line.split()

    def parseRow(line):
        # Drops the leading row label and converts the probabilities to long double
        return [np.longdouble(value) for value in splitFields(line)[1:]]

    with open(infile,'r') as myfile:
        iters = int(myfile.readline()) # READS IN NUMBER OF ITERATIONS TO EXECUTE
        myfile.readline() # Clears over spacer line
        emSeq = myfile.readline().rstrip()  # READS EMISSION SEQUENCE STRING
        myfile.readline() # Clears over spacer line
        emSymbols = splitFields(myfile.readline()) # Reads list of emitted symbols
        myfile.readline() # Clears over spacer line
        hStates = splitFields(myfile.readline()) # Reads list of hidden states
        myfile.readline() # Clears over spacer line
        myfile.readline() # Clears over header line of the transition table

        # TRANSITION TABLE PARSING
        trRows = []
        while True:
            rawLine = myfile.readline()
            if rawLine == '': # readline() returns '' only at end of file
                raise ValueError("unexpected end of file: no separator line after the transition table")
            trInRow = rawLine.rstrip()
            # Only a line that starts with a dash is a separator; a '-' inside a value
            # (negative number, exponent such as 1e-05) must not end the table
            if trInRow.lstrip().startswith('-'):
                break
            if trInRow.strip():
                trRows.append(parseRow(trInRow))

        myfile.readline() # Clears over header line of the emission table

        # EMISSION TABLE PARSING
        emRows = [parseRow(line) for line in myfile if line.strip()] # Blank lines are skipped

    # Keys are the concatenated strings "{state1}{state2}" and "{state}{symbol}"
    trTable = dict()
    for stateIdx1, state1 in enumerate(hStates):
        for stateIdx2, state2 in enumerate(hStates):
            trTable[state1+state2] = trRows[stateIdx1][stateIdx2]
    emTable = dict()
    for stateIdx, state in enumerate(hStates):
        for symbolIdx, symbol in enumerate(emSymbols):
            emTable[state+symbol] = emRows[stateIdx][symbolIdx]

    # INITIALIZE OBJECT
    myVitL = ViterbiLearning(hStates, emSymbols, emSeq, trTable, emTable)
    myVitL.maximizeIters(iters) # Runs iters number of learning iterations
    print(myVitL.printParams())
        
if __name__ == "__main__":
    # Entry point: run the learning on the input file data/rosalind_ba10i.txt
    main(infile="data/rosalind_ba10i.txt")
    # Other input files used during development (kept disabled):
    #     main("data/p25-simple-input.txt")
    #     main("data/p25-ex-input.txt")


A	B	C	
A	0.0	0.0	1.0	
B	0.559	0.324	0.118	
C	0.243	0.622	0.135	
-------- 
x	y	z
A	0.429	0.571	0.0	
B	0.057	0.0	0.943	
C	0.459	0.459	0.081	



In [4]:
# INSPECTION

# INSPECTION TEAM
# Jodi Lee
# Nabil Mohammed

# RESPONSES
# - Work on implementing main function
# - Make maximizing method bounded by a specified number of iterations
# - Write more markdown comments
# - Clean up code
# - Add more docstrings and inline comments

# CORRECTIONS
# - Finished main function
# - Fixed my maximizing method
# - Wrote markdown comments
# - Cleaned code
# - Added more docstrings and inline comments