## 6.1 HMM 클래스의 정의

In [1]:
import numpy as np # 필요한 라이브러리를 불러옵니다
import tensorflow as tf

In [2]:
class HMM(object):
    """Hidden Markov model whose forward steps are built as TensorFlow ops.

    The initial and transition probabilities are kept exactly as passed in,
    the observation (emission) matrix is wrapped in a TensorFlow constant, and
    two placeholders are created to be fed on every forward step.
    """

    def __init__(self, initial_prob, trans_prob, obs_prob):
        """Store the model parameters and create the graph inputs.

        Args:
            initial_prob: array whose ``shape`` must equal ``(N, 1)``, where
                ``N`` is its total number of elements.
            trans_prob: array whose ``shape`` must equal ``(N, N)``.
            obs_prob: array with ``N`` rows; ``get_emission`` selects one of
                its columns per observation index.

        Raises:
            AssertionError: if one of the shape checks fails.
        """
        # Keep the parameters on the instance.
        self.N = np.size(initial_prob)
        self.initial_prob = initial_prob
        self.trans_prob = trans_prob
        self.emission = tf.constant(obs_prob)

        # Double-check that every matrix has the expected shape.
        n_states = self.N
        assert self.initial_prob.shape == (n_states, 1)
        assert self.trans_prob.shape == (n_states, n_states)
        assert obs_prob.shape[0] == n_states

        # Placeholders that are fed while running the forward algorithm.
        self.obs_idx = tf.placeholder(tf.int32)
        self.fwd = tf.placeholder(tf.float64)

    def get_emission(self, obs_idx):
        """Return a slice op selecting column ``obs_idx`` of the emission matrix.

        The slice spans every row of ``self.emission`` and is one column wide.
        """
        # Start at row 0 of the requested column ...
        begin = [0, obs_idx]
        # ... and take all rows, exactly one column.
        size = [tf.shape(self.emission)[0], 1]
        return tf.slice(self.emission, begin, size)

    def forward_init_op(self):
        """Build the op for the first forward step.

        It multiplies the initial probabilities element-wise by the emission
        column of the observation fed through ``self.obs_idx``.
        """
        return tf.multiply(self.initial_prob, self.get_emission(self.obs_idx))

    def forward_op(self):
        """Build the op that advances the forward values by one observation.

        Expects the previous forward values to be fed through ``self.fwd``
        (an ``(N, 1)`` column when it comes from ``forward_init_op``).
        """
        emission_row = tf.transpose(self.get_emission(self.obs_idx))
        # Entry (i, j): previous forward value i times emission value j.
        pairwise = tf.matmul(self.fwd, emission_row)
        # Weight every entry by the matching transition probability.
        weighted = pairwise * self.trans_prob
        # Sum along axis 0, then restore the shape of the fed forward values.
        summed = tf.reduce_sum(weighted, 0)
        return tf.reshape(summed, tf.shape(self.fwd))

## 6.5 주어진 HMM에서 포워드 알고리즘의 정의

In [3]:
def forward_algorithm(sess, hmm, observations):
    """Run the forward algorithm and return the probability of a sequence.

    Args:
        sess: session used to evaluate the ops through
            ``run(fetches, feed_dict=...)``.
        hmm: model providing ``forward_init_op()``, ``forward_op()`` and the
            ``obs_idx`` / ``fwd`` placeholders.
        observations: non-empty sequence of observation indices; the first
            element is read unconditionally, so an empty list raises
            ``IndexError``.

    Returns:
        The sum of the forward values after the last observation.
    """
    # Build each op exactly once. Calling hmm.forward_op() inside the loop
    # would add a fresh copy of the ops to the graph for every time step.
    init_op = hmm.forward_init_op()
    step_op = hmm.forward_op()

    fwd = sess.run(init_op, feed_dict={hmm.obs_idx: observations[0]})
    for obs in observations[1:]:
        fwd = sess.run(step_op, feed_dict={hmm.obs_idx: obs, hmm.fwd: fwd})

    # `fwd` is already a NumPy array, so sum it directly instead of creating
    # and running one more graph op just for the reduction.
    return np.sum(fwd)

## 6.6 HMM 정의 및 포워드 알고리즘의 호출

In [4]:
# Parameters of a two-state model with three possible observation symbols.
initial_prob = np.array([[0.6],
                         [0.4]])
trans_prob = np.array([[0.7, 0.3],
                       [0.4, 0.6]])
obs_prob = np.array([[0.1, 0.4, 0.5],
                     [0.6, 0.3, 0.1]])

hmm = HMM(initial_prob=initial_prob, trans_prob=trans_prob, obs_prob=obs_prob)

# Observation indices select columns of obs_prob.
observations = [0, 1, 1, 2, 1]

# Evaluate the forward algorithm inside a session and report the result.
with tf.Session() as sess:
    prob = forward_algorithm(sess, hmm, observations)
    message = 'Probability of observing {} is {}'.format(observations, prob)
    print(message)

Probability of observing [0, 1, 1, 2, 1] is 0.004540300799999999


## 6.7 비터비 캐시를 멤버 변수로 추가하기

In [5]:
class viterbi_HMM(object):
    """Hidden Markov model with forward and Viterbi steps built as TensorFlow ops.

    Besides the inputs needed by the forward algorithm, the instance holds a
    ``viterbi`` placeholder through which the previous Viterbi values are fed
    to ``decode_op`` and ``backpt_op``.
    """

    def __init__(self, initial_prob, trans_prob, obs_prob):
        """Store the model parameters and create the graph inputs.

        Args:
            initial_prob: array whose ``shape`` must equal ``(N, 1)``, where
                ``N`` is its total number of elements.
            trans_prob: array whose ``shape`` must equal ``(N, N)``.
            obs_prob: array with ``N`` rows; ``get_emission`` selects one of
                its columns per observation index.

        Raises:
            AssertionError: if one of the shape checks fails.
        """
        # Keep the parameters on the instance.
        self.N = np.size(initial_prob)
        self.initial_prob = initial_prob
        self.trans_prob = trans_prob
        self.emission = tf.constant(obs_prob)
        # Placeholder for the Viterbi values of the previous step.
        self.viterbi = tf.placeholder(tf.float64)

        # Double-check that every matrix has the expected shape.
        n_states = self.N
        assert self.initial_prob.shape == (n_states, 1)
        assert self.trans_prob.shape == (n_states, n_states)
        assert obs_prob.shape[0] == n_states

        # Placeholders that are fed while running the forward algorithm.
        self.obs_idx = tf.placeholder(tf.int32)
        self.fwd = tf.placeholder(tf.float64)

    def get_emission(self, obs_idx):
        """Return a slice op selecting column ``obs_idx`` of the emission matrix.

        The slice spans every row of ``self.emission`` and is one column wide.
        """
        # Start at row 0 of the requested column ...
        begin = [0, obs_idx]
        # ... and take all rows, exactly one column.
        size = [tf.shape(self.emission)[0], 1]
        return tf.slice(self.emission, begin, size)

    def forward_init_op(self):
        """Build the op for the first forward step.

        It multiplies the initial probabilities element-wise by the emission
        column of the observation fed through ``self.obs_idx``.
        """
        return tf.multiply(self.initial_prob, self.get_emission(self.obs_idx))

    def forward_op(self):
        """Build the op that advances the forward values by one observation.

        Expects the previous forward values to be fed through ``self.fwd``
        (an ``(N, 1)`` column when it comes from ``forward_init_op``).
        """
        emission_row = tf.transpose(self.get_emission(self.obs_idx))
        # Entry (i, j): previous forward value i times emission value j.
        pairwise = tf.matmul(self.fwd, emission_row)
        # Weight every entry by the matching transition probability.
        weighted = pairwise * self.trans_prob
        # Sum along axis 0, then restore the shape of the fed forward values.
        summed = tf.reduce_sum(weighted, 0)
        return tf.reshape(summed, tf.shape(self.fwd))

    def decode_op(self):
        """Build the op that advances the Viterbi values by one observation.

        Same computation as ``forward_op`` on ``self.viterbi``, except that the
        maximum along axis 0 is taken instead of the sum.
        """
        emission_row = tf.transpose(self.get_emission(self.obs_idx))
        pairwise = tf.matmul(self.viterbi, emission_row)
        weighted = pairwise * self.trans_prob
        # Keep only the best-scoring entry of each column.
        best = tf.reduce_max(weighted, 0)
        return tf.reshape(best, tf.shape(self.viterbi))

    def backpt_op(self):
        """Build the op that yields the back-pointers for one step.

        The fed Viterbi values are repeated across ``N`` columns, weighted by
        the transition probabilities, and the arg-max along axis 0 is
        returned, i.e. the best row index for every column.
        """
        # Repeat the Viterbi values across N columns via a product with ones.
        repeated = tf.matmul(self.viterbi, np.ones((1, self.N)))
        scored = repeated * self.trans_prob
        return tf.argmax(scored, 0)

## 6.10 비터비 디코딩 알고리즘의 정의

In [6]:
def viterbi_decode(sess, hmm, observations):
    """Return the most likely hidden-state sequence for ``observations``.

    Args:
        sess: session used to evaluate the ops through
            ``run(fetches, feed_dict=...)``.
        hmm: model providing ``forward_init_op()``, ``decode_op()``,
            ``backpt_op()``, the ``obs_idx`` / ``viterbi`` placeholders and
            the number of states ``N``.
        observations: non-empty sequence of observation indices; the first
            element is read unconditionally, so an empty list raises
            ``IndexError``.

    Returns:
        A list of plain Python ints, one state index per observation, in
        chronological order.
    """
    num_steps = len(observations)

    # Build each op exactly once. Calling hmm.decode_op() / hmm.backpt_op()
    # inside the loop would add new ops to the graph for every time step.
    init_op = hmm.forward_init_op()
    decode_op = hmm.decode_op()
    backpt_op = hmm.backpt_op()

    viterbi = sess.run(init_op, feed_dict={hmm.obs_idx: observations[0]})

    # backpts[j, t] is the best predecessor of state j at step t; column 0
    # keeps the -1 filler because the first step has no predecessor.
    backpts = np.full((hmm.N, num_steps), -1, dtype='int32')

    for t in range(1, num_steps):
        # Both ops read the Viterbi values of the previous step.
        viterbi, backpt = sess.run(
            [decode_op, backpt_op],
            feed_dict={hmm.obs_idx: observations[t], hmm.viterbi: viterbi},
        )
        backpts[:, t] = backpt

    # Start from the best final state and follow the back-pointers to the
    # first step. Converting to int keeps NumPy scalar types out of the result.
    tokens = [int(viterbi[:, -1].argmax())]
    for t in range(num_steps - 1, 0, -1):
        tokens.append(int(backpts[tokens[-1], t]))

    # The path was collected backwards in time.
    return tokens[::-1]

## 6.11 비터비 디코드의 실행

In [7]:
# Parameters of a two-state model with three possible observation symbols.
initial_prob = np.array([[0.6],
                         [0.4]])
trans_prob = np.array([[0.7, 0.3],
                       [0.4, 0.6]])
obs_prob = np.array([[0.1, 0.4, 0.5],
                     [0.6, 0.3, 0.1]])

# Observation indices select columns of obs_prob.
observations = [0, 1, 1, 2, 1]

hmm = viterbi_HMM(initial_prob=initial_prob, trans_prob=trans_prob, obs_prob=obs_prob)

# Decode inside a session and report the resulting state sequence.
with tf.Session() as sess:
    seq = viterbi_decode(sess, hmm, observations)
    message = 'Most likely hidden states are {}'.format(seq)
    print(message)

Most likely hidden states are [1, 0, 0, 0, 0]
