In [1]:
import os

import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix
from scipy.stats import multivariate_normal


Unigram $p(w)$

In [2]:
# Unigram table: the file is a whitespace-separated stream of alternating
# "<word> <probability>" tokens, so even positions are words and the token
# that follows each one is its probability.
unigram_dict = {}
with open('../data/unigram.txt', 'r') as unigram_file:
    tokens = unigram_file.read().split()
    for pos in range(0, len(tokens), 2):
        unigram_dict[tokens[pos]] = float(tokens[pos + 1])

unigram_dict

{'<s>': 0.99,
 'eight': 0.000925,
 'five': 0.00089,
 'four': 0.000886,
 'nine': 0.000905,
 'oh': 0.000968,
 'one': 0.000905,
 'seven': 0.000869,
 'six': 0.000939,
 'three': 0.000883,
 'two': 0.000941,
 'zero': 0.000889}

Bigram $p(w_t|w_{t-1})$ 

In [3]:
# Bigram table, indexed as bigram_dict[first_token][second_token] = probability,
# where each non-empty line of the file is "<first> <second> <probability>".
# The file is read once; setdefault creates the inner dict the first time a
# first-token is seen, which keeps the same key order as a separate pre-pass.
bigram_dict = {}
with open('../data/bigram.txt', 'r') as bigram_txt:
    for line in bigram_txt:
        fields = line.split()
        if not fields:
            continue  # tolerate blank lines (e.g. a trailing newline)
        bigram_dict.setdefault(fields[0], {})[fields[1]] = float(fields[2])

bigram_dict

{'<s>': {'eight': 0.012084,
  'five': 0.011881,
  'four': 0.009139,
  'nine': 0.011474,
  'oh': 0.012591,
  'one': 0.010967,
  'seven': 0.010967,
  'six': 0.011779,
  'three': 0.010865,
  'two': 0.013201,
  'zero': 0.010053},
 'eight': {'<s>': 0.012287,
  'eight': 0.005991,
  'five': 0.005788,
  'four': 0.0066,
  'nine': 0.007616,
  'oh': 0.006397,
  'one': 0.005585,
  'seven': 0.005483,
  'six': 0.005991,
  'three': 0.00589,
  'two': 0.006803,
  'zero': 0.006499},
 'five': {'<s>': 0.013708,
  'eight': 0.005788,
  'five': 0.005686,
  'four': 0.004569,
  'nine': 0.005585,
  'oh': 0.005686,
  'one': 0.007514,
  'seven': 0.006093,
  'six': 0.005077,
  'three': 0.005991,
  'two': 0.006296,
  'zero': 0.00589},
 'four': {'<s>': 0.011474,
  'eight': 0.006499,
  'five': 0.005788,
  'four': 0.006803,
  'nine': 0.005382,
  'oh': 0.006702,
  'one': 0.00528,
  'seven': 0.006194,
  'six': 0.005991,
  'three': 0.006296,
  'two': 0.005991,
  'zero': 0.005077},
 'nine': {'<s>': 0.011779,
  'eight': 0.

In [4]:
# Pronunciation dictionary: word -> phonemes.
# Layout (kept as-is because other cells index it this way):
#   * word with one pronunciation    -> flat list, e.g. ['f', 'ao', 'r', 'sp']
#   * word with several              -> list of phoneme lists,
#                                       e.g. [['z', 'ih', ...], ['z', 'iy', ...]]
phoneme_dict = {}
with open("../data/dictionary.txt") as dict_txt:
    for line in dict_txt:
        fields = line.split()
        if not fields:
            continue  # tolerate blank lines
        word, phonemes = fields[0], fields[1:]

        if word not in phoneme_dict:
            phoneme_dict[word] = phonemes
            continue

        known = phoneme_dict[word]
        if known and isinstance(known[0], list):
            # Third or later pronunciation: extend the existing list of lists
            # instead of nesting it one level deeper.
            known.append(phonemes)
        else:
            # Second pronunciation: promote the flat list to a list of lists.
            phoneme_dict[word] = [known, phonemes]
phoneme_dict

{'<s>': ['sil'],
 'eight': ['ey', 't', 'sp'],
 'five': ['f', 'ay', 'v', 'sp'],
 'four': ['f', 'ao', 'r', 'sp'],
 'nine': ['n', 'ay', 'n', 'sp'],
 'oh': ['ow', 'sp'],
 'one': ['w', 'ah', 'n', 'sp'],
 'seven': ['s', 'eh', 'v', 'ah', 'n', 'sp'],
 'six': ['s', 'ih', 'k', 's', 'sp'],
 'three': ['th', 'r', 'iy', 'sp'],
 'two': ['t', 'uw', 'sp'],
 'zero': [['z', 'ih', 'r', 'ow', 'sp'], ['z', 'iy', 'r', 'ow', 'sp']]}

# HMM

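* NUMMIXES = number of mixture components (each a multivariate Gaussian with dim=39)
* MIXTURE value = weight of each mixture component
* MEAN, VARIANCE = both of length 39; features are assumed to be independent (diagonal covariance)
* GCONST: precomputed Gaussian normalisation constant, $\log\left((2\pi)^{39}\prod_k \sigma_k^2\right)$ (HTK convention), so the log-density contains the term $-\tfrac{1}{2}\,\text{GCONST}$
* TRANSP: state transition probability matrix (5*5 matrix; 3*3 for `sp`)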

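Emission probability: $ b_q(o) = p(o|q) = \sum_i w_i \, \mathcal{N}(o;\ \mu_i, \Sigma_i)$, where $w_i$, $\mu_i$ and $\Sigma_i$ are the weight, mean and (diagonal) covariance of mixture $i$ of state $q$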

In [5]:
# Parse the HMM definition file into a nested dict of raw strings:
#   hmm_dict[phoneme]['<NUMSTATES>']                        -> str
#   hmm_dict[phoneme][state]['<NUMMIXES>']                  -> str
#   hmm_dict[phoneme][state]['<MIXTURES>'][mix]['<MIXTURE>'] -> str (weight)
#   hmm_dict[phoneme][state]['<MIXTURES>'][mix][<tag>]       -> list of str / str
#   hmm_dict[phoneme]['<TRANSP>']                           -> list of rows (lists of str)
# Numeric conversion is left to the consumers of hmm_dict.
N_PHONEMES = 21  # number of phoneme HMMs stored in the file

hmm_dict = {}
# `with` guarantees the handle is closed even if a malformed line raises.
with open("../data/hmm.txt") as hmm_txt:
    for _ in range(N_PHONEMES):
        # Header line: the phoneme name is the text between double quotes.
        pronun_word = hmm_txt.readline().split('"')[1]
        hmm_dict[pronun_word] = {}

        hmm_txt.readline()  # line after the header carries no data we keep

        Numstates = hmm_txt.readline().split()
        hmm_dict[pronun_word][Numstates[0]] = Numstates[1]

        # The declared state count includes the entry and exit states, which
        # have no mixtures: 5 -> 3 emitting states, 3 ('sp') -> 1 emitting state.
        n_states = int(Numstates[1])

        for _state in range(n_states - 2):
            # State number
            state = hmm_txt.readline().split()[1]
            hmm_dict[pronun_word][state] = {}

            Num_Mixes = hmm_txt.readline().split()
            hmm_dict[pronun_word][state][Num_Mixes[0]] = Num_Mixes[1]
            hmm_dict[pronun_word][state]['<MIXTURES>'] = {}

            # Read as many mixture records as the state declares.
            for _mix in range(int(Num_Mixes[1])):
                # Fields: tag, mixture index, mixture weight
                mixture = hmm_txt.readline().split()
                mixture_entry = {mixture[0]: mixture[2]}
                hmm_dict[pronun_word][state]['<MIXTURES>'][mixture[1]] = mixture_entry

                # Mean: a "<tag> <dim>" line followed by the mean values
                mean_dim = hmm_txt.readline().split()
                mean_num = hmm_txt.readline().split()  # the 39 mean values
                mixture_entry[mean_dim[0]] = mean_num

                # Variance: a "<tag> <dim>" line (input dimension = 39)
                # followed by the variance values
                variance_dim = hmm_txt.readline().split()
                variance_num = hmm_txt.readline().split()  # the 39 variance values
                mixture_entry[variance_dim[0]] = variance_num

                # GConst
                g_const = hmm_txt.readline().split()
                mixture_entry[g_const[0]] = g_const[1]

        # Transition probabilities: skip the header line, then read one matrix
        # row per state (the matrix is n_states x n_states).
        hmm_txt.readline()
        trans_prob = [hmm_txt.readline().split() for _row in range(n_states)]
        hmm_dict[pronun_word]['<TRANSP>'] = trans_prob

        # ENDHMM
        hmm_txt.readline()

In [6]:
hmm_dict['sp']

{'2': {'<MIXTURES>': {'1': {'<GCONST>': '9.197596e+001',
    '<MEAN>': ['-1.508647e+001',
     '1.690120e+000',
     '-3.829488e-001',
     '6.419236e-001',
     '-5.065308e-001',
     '-3.908817e-001',
     '1.012777e+000',
     '2.029495e+000',
     '3.223517e+000',
     '2.525147e+000',
     '2.603812e+000',
     '1.022069e+000',
     '2.993897e+001',
     '-2.324865e-001',
     '4.883844e-002',
     '1.222839e-001',
     '9.253027e-002',
     '6.492668e-002',
     '5.132933e-002',
     '-2.308269e-002',
     '9.083389e-002',
     '2.527888e-001',
     '3.424749e-001',
     '2.162057e-001',
     '1.574734e-002',
     '-1.576940e-001',
     '2.806408e-001',
     '-2.874403e-001',
     '-7.688734e-002',
     '-1.113549e-001',
     '-1.495737e-001',
     '-1.703831e-001',
     '-1.706861e-001',
     '-2.208022e-001',
     '-4.280793e-001',
     '-3.966915e-001',
     '-3.042443e-001',
     '-1.879792e-001',
     '5.372433e-001'],
    '<MIXTURE>': '1.120568e-001',
    '<VARIANCE>': ['3.

In [7]:
class HMM:
    """
    Typed accessors over the raw ``hmm_dict`` parsed from the HMM definition file.

    ``hmm_dict`` stores everything as strings; the methods here convert to
    ``int`` / ``float`` / ``numpy`` arrays on demand.
    """

    # Dimensionality of one observation (feature) vector.
    N_FEATURES = 39

    def __init__(self, hmm_dict):
        self.hmm_dict = hmm_dict
        self.phonemes = list(self.hmm_dict.keys())

    def n_states(self, phoneme):
        """Return the value stored under '<NUMSTATES>' for ``phoneme`` as an int."""
        return int(self.hmm_dict[phoneme]['<NUMSTATES>'])

    def transition_prob(self, phoneme):
        """Return the '<TRANSP>' matrix of ``phoneme`` as a float ndarray."""
        return np.array(self.hmm_dict[phoneme]['<TRANSP>']).astype(float)

    def states(self, phoneme):
        """
        Return the state numbers of ``phoneme`` that carry mixtures, ascending.

        Sorted explicitly so that ``states(p)[-1]`` is always the highest state
        number, independent of the iteration order of the underlying dict.
        """
        return sorted(
            int(key) for key in self.hmm_dict[phoneme]
            if key not in ('<NUMSTATES>', '<TRANSP>')
        )

    def n_mixes(self, phoneme, state):
        """Return the value stored under '<NUMMIXES>' for the given state as an int."""
        return int(self.hmm_dict[phoneme][str(state)]['<NUMMIXES>'])

    def gauss_mixtures_dict(self, phoneme, state):
        """Return the raw '<MIXTURES>' dict of the given state (values are strings)."""
        return self.hmm_dict[phoneme][str(state)]['<MIXTURES>']

    def initial_prob(self, phoneme):
        """Return the initial probability of ``phoneme``; always 1."""
        return 1

    @staticmethod
    def emission_prob2(x, mixture_dict):
        """
        Emission probability of ``x`` in the linear domain (reference version).

        Sums ``weight * pdf(x)`` over all mixtures, using scipy's
        ``multivariate_normal`` with a diagonal covariance built from
        '<VARIANCE>'. Can underflow for unlikely observations; prefer
        ``emission_prob`` for decoding.
        """
        assert len(x) == HMM.N_FEATURES

        prob = 0
        for mixture in mixture_dict.values():
            mean = np.array(mixture['<MEAN>']).astype(float)
            var = np.array(mixture['<VARIANCE>']).astype(float)
            weight = float(mixture['<MIXTURE>'])

            mv_norm = multivariate_normal(mean, np.diag(var))
            prob += weight * mv_norm.pdf(x)

        return prob

    @staticmethod  # considering floating point precision
    def emission_prob(x, mixture_dict):
        """
        Log emission probability of ``x`` under the Gaussian mixture.

        Each mixture contributes
        ``log(weight) - GCONST/2 - 0.5 * sum((x - mean)**2 / variance)``
        and the contributions are combined with a numerically stable
        log-sum-exp (``np.logaddexp.reduce``).

        Args
            x: observation vector of length ``HMM.N_FEATURES``
            mixture_dict: mapping of mixture id to a dict with the keys
                '<MEAN>', '<VARIANCE>', '<MIXTURE>' and '<GCONST>'

        Returns
            The log of the mixture density at ``x``.
        """
        assert len(x) == HMM.N_FEATURES

        x = np.asarray(x, dtype=float)
        log_terms = []
        for mixture in mixture_dict.values():
            mean = np.array(mixture['<MEAN>']).astype(float)
            variance = np.array(mixture['<VARIANCE>']).astype(float)
            weight = float(mixture['<MIXTURE>'])
            gconst = float(mixture['<GCONST>'])

            mahalanobis = np.sum((x - mean) ** 2 / variance)
            log_terms.append(np.log(weight) - gconst / 2 - 0.5 * mahalanobis)

        # log(sum_i exp(log_terms[i])) without leaving the log domain.
        return np.logaddexp.reduce(log_terms)


In [8]:
hmm = HMM(hmm_dict)
mixture_dict = hmm.gauss_mixtures_dict('f', 2)
# print(np.log(hmm.emission_prob([10.]*39, mixture_dict)), hmm.emission_prob2([20.]*39, mixture_dict))
a = 0
print(hmm.emission_prob([a]*39, mixture_dict), np.log(hmm.emission_prob2([a]*39, mixture_dict)))

-88.1317951529 -88.1317953191


In [9]:
# Sanity-check mixture '1' against the closed-form diagonal Gaussian.
gconst = float(mixture_dict['1']['<GCONST>'])
mean = np.array(mixture_dict['1']['<MEAN>']).astype(float)
var = np.array(mixture_dict['1']['<VARIANCE>']).astype(float)
mv_norm = multivariate_normal(mean, np.eye(len(var))*var)

# Density at the mean of a diagonal Gaussian: 1 / ((2*pi)^(d/2) * prod(sigma)).
peak_density = 1 / ((2*np.pi)**(len(var)/2) * np.prod(np.sqrt(var)))

# The density itself is tiny, so an absolute tolerance would accept anything;
# compare relatively instead.
pdf_matches = np.isclose(mv_norm.pdf(mean), peak_density, rtol=1e-6, atol=0)

# GCONST is expected to equal log((2*pi)^d * prod(var)) = -2 * log(peak density).
# It is stored with 7 significant digits, hence the looser tolerance.
gconst_matches = np.isclose(-2 * np.log(peak_density), gconst, rtol=1e-5, atol=0)

bool(pdf_matches and gconst_matches)

True

Get test_data (label and MFCC)

In [10]:
# Load every test utterance found under ../data/tst/<folder>/<folder2>/...
# Each file starts with a "<n_frames> <n_features>" header line, followed by
# one whitespace-separated feature row per frame.
#
# NOTE(review): the label is the file name up to the first '.', so two files
# with the same name in different folders overwrite each other - confirm that
# file names are unique across the test set.
test_data = {}

for folder in os.listdir("../data/tst/"):
    for folder2 in os.listdir("../data/tst/{}".format(folder)):
        for dirpath, _, files in os.walk("../data/tst/{}/{}".format(folder, folder2)):
            for file in files:
                path = os.path.join(dirpath, file)
                with open(path) as txt:
                    label = file.split(".")[0]
                    shape = txt.readline().split()
                    n_frames, n_features = int(shape[0]), int(shape[1])
                    rows = [line.split() for line in txt if line.strip()]

                # Build the matrix from the parsed rows rather than filling an
                # uninitialised buffer, so a truncated file cannot leave
                # garbage frames behind.
                mfcc = np.array(rows, dtype=float) if rows else np.empty((0, n_features))
                if mfcc.shape != (n_frames, n_features):
                    raise ValueError(
                        "{}: header declares shape {} but data has shape {}".format(
                            path, (n_frames, n_features), mfcc.shape))

                test_data[label] = {'length': n_frames, 'n_features': n_features, 'mfcc': mfcc}
                    

**Isolated Word Recognition**  
$ \hat w = \arg\max_w p(w|o) $  
$\ \  = \arg\max_w p(o|w)p(w) $  
$\ \ \simeq \arg\max_w \max_q p(o, q|w)p(w) $

$p(o|w)$: Acoustic model probability  
$p(w)$: Language model probability (unigram)  
$ o = (o_1, ... , o_T) $: observation sequence  
$ q = (q_1, ... , q_T) $: state sequence 

We need to find $p(o, q|w)$ with the Viterbi algorithm  

**Viterbi Algorithm**  
$\max_{q_0 \sim q_t} p(q_0 \sim q_t | o_0 \sim o_t)$  
$=\max_{q_0 \sim q_t} p(q_0 \sim q_t , o_0 \sim o_t)$  
$=\max_{q_t}\{p(o_t|q_t)\max_{q_{t-1}}\{p(q_t|q_{t-1})p(o_{t-1}|q_{t-1})...\max_{q_0}\{p(q_0)p(o_0|q_0)p(q_1|q_0)\}\}\}$

*(Floating point precision) Compute it with logarithms so the products become sums.*

$v_t(j) = \max_{i=1}^N v_{t-1}(i)a_{ij}b_j(o_t)$  
$v_{t-1}(i):$ the Viterbi path probability of state $i$ from the previous time step  
$a_{ij}:$ the transition probability from previous state $q_i$ to current state $q_j$  
$b_j(o_t)(=p(o_t|q_j)):$ the state observation likelihood of the observation symbol $o_t$ given the current state j

Function viterbi(observations of len T, state-graph of len N) returns best path:  
* create a path probability matrix viterbi[N+2, T]  
    * for each state s from 1 to N do:   ; initialization step
        * viterbi[s, 1] $\leftarrow a_{0,s} * b_s(o_1)$
        * backpointer[s,1] $\leftarrow 0$  
    * for each timestep t from 2 to T do:  ; recursion step
        * for each state s from 1 to N do:
            * viterbi[s,t] $\leftarrow \max_{s'=1}^N viterbi[s', t-1] * a_{s',s} * b_s(o_t)$
            * backpointer[s,t] $\leftarrow \arg\max_{s'=1}^N viterbi[s', t-1] * a_{s', s}$  
    * viterbi[q_F, T] $\leftarrow \max_{s=1}^N viterbi[s, T]*a_{s, q_F}$   ; termination step
    * backpointer[q_F, T] $\leftarrow \arg\max_{s=1}^N viterbi[s,T]*a_{s, q_F}$  ; termination step  
* return the backtrace path by following backpointers to states back in time from backpointer[q_F, T]
            
    

We have:  
phoneme_dict  
test_data: labels, mfcc  
unigram_dict  
bigram_dict  
hmm

Try it with one label

In [20]:
# Peek at the first utterance label straight from test_data, so this cell does
# not depend on `labels`, which is only assigned in a later cell.
list(test_data.keys())[0]

'1237743'

In [11]:
labels = list(test_data.keys())
label = labels[0]
mfcc = test_data[label]['mfcc']
print(label, mfcc.shape)

1237743 (289, 39)


state1에서 들어와서 뒤로가는건 없고, 앞으로 넘어간다. 근데 제자리에 있을수도 있고. 나가면 state5를 통해서 나감.

Word 가 주어져있으므로, State 5 를 통해서 나가면, 다음 Phoneme으로 넘어간다.
['f', 'ay', 'v'] 인데, 'v'에서 다음으로 넘어가면, 다음 단어로 넘어갈 확률 $p(w2|w1)$을 구하고 다시 viterbi iteration  
$p(q_1, q_2,... q_t, o_1,....o_t | w)$

첫 번째 word가 주어졌다고 가정해보자.

We need two models.
1. Utterance model  
2. Isolated word recognition model

**UTTERANCE MODEL**

Ex) $P(one, two, o_1, o_2) = p(one)p(o_1|one) p(two|one) p(o_2 | two)$

$p(one) \leftarrow$ unigram

$p(two|one) \leftarrow$ bigram  
$p(o_1|one) ?$

**ISOLATED WORD RECOGNITION**

Then how do we model $p(o_1|one)$?  
recall that `one = ['w', 'ah', 'n', 'sp']`  
$p(o_1|one) = p(o_{11},...o_{1,t1},...o_{1,t2},..o_{1,t3},..o_{1,t} | w_{1,1},..w_{1,t1-1}, ah_{1,t1},..., n_{1,t2},...,sp_{1,t3},..sp_{1,t})$  
e.g. $p(ah|w) =$ transition matrix entry (3,4) of w  
$p(w_{1,1}) =$ transition matrix(0,1) of w

In [12]:
test = test_data['44z5938']
test.keys()

dict_keys(['length', 'n_features', 'mfcc'])

In [13]:
mfcc = test_data['44z5938']['mfcc']
mfcc.shape

(313, 39)

To try the isolated word recognition,  
Let's assume 'mfcc' represents one word 'four'

In [14]:
phoneme_dict['four']

['f', 'ao', 'r', 'sp']

We need to do a viterbi initialization 

$q_1$ is given as 'f'? 이게 편한듯. 어차피 'ao'로 시작하면 답 없음

$P(O_1 | one)$

In [15]:
mfcc.shape

(313, 39)

In [17]:
hmm.transition_prob("sp")

array([[ 0.        ,  0.2385641 ,  0.7614358 ],
       [ 0.        ,  0.9152609 ,  0.08473914],
       [ 0.        ,  0.        ,  0.        ]])

In [18]:
phoneme_dict['four']

['f', 'ao', 'r', 'sp']

In [19]:
hmm.transition_prob("sp")

array([[ 0.        ,  0.2385641 ,  0.7614358 ],
       [ 0.        ,  0.9152609 ,  0.08473914],
       [ 0.        ,  0.        ,  0.        ]])

In [20]:
hmm.transition_prob("sil")

array([[ 0.        ,  1.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.858848  ,  0.0786422 ,  0.06250978,  0.        ],
       [ 0.        ,  0.        ,  0.8590611 ,  0.1409389 ,  0.        ],
       [ 0.        ,  0.05343336,  0.        ,  0.7964702 ,  0.1500964 ],
       [ 0.        ,  0.        ,  0.        ,  0.        ,  0.        ]])

In [182]:
phonemes = phoneme_dict['four']

In [183]:
phonemes

['f', 'ao', 'r', 'sp']

In [156]:
hmm.hmm_dict['f'].keys()

dict_keys(['<NUMSTATES>', '2', '3', '4', '<TRANSP>'])

In [155]:
hmm.transition_prob('f')

array([[ 0.       ,  1.       ,  0.       ,  0.       ,  0.       ],
       [ 0.       ,  0.8519424,  0.1480576,  0.       ,  0.       ],
       [ 0.       ,  0.       ,  0.703905 ,  0.296095 ,  0.       ],
       [ 0.       ,  0.       ,  0.       ,  0.5744837,  0.4255163],
       [ 0.       ,  0.       ,  0.       ,  0.       ,  0.       ]])

In [175]:
hmm.states('f')

[2, 3, 4]

In [266]:
hmm.transition_prob('sil')

array([[ 0.        ,  1.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.858848  ,  0.0786422 ,  0.06250978,  0.        ],
       [ 0.        ,  0.        ,  0.8590611 ,  0.1409389 ,  0.        ],
       [ 0.        ,  0.05343336,  0.        ,  0.7964702 ,  0.1500964 ],
       [ 0.        ,  0.        ,  0.        ,  0.        ,  0.        ]])

In [15]:
def viterbi_isolated(hmm, start, mfcc, word, phonemes):
    """
    Greedily align one word's phoneme sequence against ``mfcc[start:]``.

    A single current (phoneme, state) pair is tracked. At every frame the
    successor state with the highest score
    ``previous score + log emission + log transition`` is chosen, so this is a
    greedy left-to-right approximation and not a search over all state
    sequences. When the exit state of a phoneme is chosen, decoding continues
    in state index 1 of the next phoneme. The function stops when the exit
    state of the last phoneme is chosen or the last frame is reached.

    Every phoneme in ``phonemes`` is visited in order; skipping the optional
    'sp' phoneme is not modelled.

    Args
        hmm: model accessor providing ``transition_prob``, ``states``,
            ``n_states``, ``gauss_mixtures_dict``, ``emission_prob`` and
            ``initial_prob``
        start (int): first frame to decode, 0 <= start < T
        mfcc (2d matrix): (time, n_features)
        word (str): the word being scored (not used by the computation)
        phonemes (list): sequence of phonemes of the word

    Returns
        dict with
            't': index of the last frame that was processed,
            'v_path': accumulated log score after each processed frame,
            'phoneme_path': phoneme that was active at each processed frame.
        An empty dict is returned when no frame after ``start`` exists.
    """
    total_length = mfcc.shape[0]
    v_path = []
    phoneme_index = []
    cur_phoneme_idx = 0
    cur_state = 1  # Row index into the transition matrix; 0 is the entry state.

    def _result(t):
        return {'t': t, 'v_path': v_path, 'phoneme_path': [phonemes[idx] for idx in phoneme_index]}

    for t in range(start, total_length):
        cur_phoneme = phonemes[cur_phoneme_idx]

        if t == start:
            # Initialisation: the first frame is emitted by the first emitting
            # state of the first phoneme.
            log_emission_prob = hmm.emission_prob(mfcc[t], hmm.gauss_mixtures_dict(cur_phoneme, state=cur_state+1))
            v_path.append(log_emission_prob + np.log(hmm.initial_prob(cur_phoneme)))  # Log for floating point precision
            phoneme_index.append(cur_phoneme_idx)
            continue

        # Parse the transition matrix once per frame instead of once per use.
        transitions = hmm.transition_prob(cur_phoneme)
        # State numbers are 1-based, matrix indices 0-based: the highest
        # emitting state number equals the index of the exit state.
        leaving_state = max(hmm.states(cur_phoneme))
        next_possible_states = np.where(transitions[cur_state] > 0)[0]
        viterbi_probs = np.zeros(shape=hmm.n_states(cur_phoneme))

        for next_state in next_possible_states:
            is_leaving = (next_state == leaving_state)
            enters_sp = False

            if not is_leaving:
                # Staying inside the phoneme: frame is emitted by the target state.
                log_emission_prob = hmm.emission_prob(mfcc[t], hmm.gauss_mixtures_dict(cur_phoneme, state=next_state+1))
            elif cur_phoneme in ('sp', 'sil'):
                # Leaving the word: the frame belongs to the next word, whose
                # own decoding starts at this frame and scores its emission.
                log_emission_prob = 0
            else:
                # Leaving into the next phoneme of the same word: the frame is
                # emitted by that phoneme's first emitting state.
                next_phoneme = phonemes[cur_phoneme_idx + 1]
                log_emission_prob = hmm.emission_prob(mfcc[t], hmm.gauss_mixtures_dict(next_phoneme, state=2))
                enters_sp = (next_phoneme == 'sp')

            transition_prob = transitions[cur_state][next_state]
            viterbi_probs[next_state] = v_path[-1] + log_emission_prob + np.log(transition_prob)

            if enters_sp:
                # Entering 'sp': add the probability of going from its entry
                # state into its emitting state.
                viterbi_probs[next_state] += np.log(hmm.transition_prob('sp')[0][1])

        # Move to the successor with the highest score.
        max_prob = max([viterbi_probs[next_state] for next_state in next_possible_states])
        cur_state = np.where(viterbi_probs == max_prob)[0][0]
        v_path.append(max_prob)
        phoneme_index.append(cur_phoneme_idx)

        # Reaching the exit state advances to the next phoneme.
        word_finished = False
        if cur_state == transitions.shape[1] - 1:
            cur_phoneme_idx += 1
            cur_state = 1
            word_finished = (cur_phoneme_idx == len(phonemes))

        if word_finished or t == total_length - 1:
            return _result(t)

    return {}
    

In [13]:
def continuous_recognition(hmm, mfcc, unigram_dict, bigram_dict, phoneme_dict):
    """
    Returns estimated sequence of words from given mfcc.

    Decoding is greedy, one word at a time: starting at frame ``t``, every
    pronunciation of every word is aligned with ``viterbi_isolated`` and scored
    as ``log language-model probability + alignment score``. The best candidate
    is appended to the result and decoding resumes at the frame where its
    alignment ended.

    The language model is the unigram for the first word and the bigram
    ``bigram_dict[previous_word][word]`` afterwards; word pairs missing from
    the bigram table get a score of -1e30.

    Args
        hmm: model accessor passed through to ``viterbi_isolated``
        mfcc (2d matrix): (time, n_features)
        unigram_dict (dict): word -> probability
        bigram_dict (dict): previous word -> {word -> probability}
        phoneme_dict (dict): word -> phoneme list, or word -> list of phoneme
            lists when the word has several pronunciations

    Returns
        list of recognised words in order (including '<s>' entries); empty if
        ``mfcc`` has fewer than two frames.
    """
    missing_bigram_score = -1e30
    total_length = mfcc.shape[0]

    # One candidate per pronunciation, so multi-pronunciation words are handled
    # wherever they appear in the dictionary.
    candidates = []
    for word, entry in phoneme_dict.items():
        if entry and isinstance(entry[0], (list, tuple)):
            candidates.extend((word, pronunciation) for pronunciation in entry)
        else:
            candidates.append((word, entry))

    word_path = []
    t = 0
    # Stop once the last frame has been consumed; this also covers the case
    # where the very first word already reaches the end of the utterance.
    while t < total_length - 1:
        scores = np.empty(len(candidates))
        end_frames = np.empty(len(candidates), dtype=int)

        for i, (word, pronunciation) in enumerate(candidates):
            res = viterbi_isolated(hmm, t, mfcc, word, pronunciation)
            p_obs_given_word = res['v_path'][-1]
            end_frames[i] = res['t']

            # The score of the words chosen so far is the same for every
            # candidate and therefore left out of the comparison.
            if not word_path:
                scores[i] = np.log(unigram_dict[word]) + p_obs_given_word
                continue

            word_before = word_path[-1]
            if word_before in bigram_dict and word in bigram_dict[word_before]:
                scores[i] = np.log(bigram_dict[word_before][word]) + p_obs_given_word
            else:
                scores[i] = missing_bigram_score

        best = int(np.argmax(scores))
        word_path.append(candidates[best][0])
        t = int(end_frames[best])

    return word_path
        

In [14]:
len(test_data.keys())

1242

In [17]:
result = continuous_recognition(hmm, mfcc, unigram_dict, bigram_dict, phoneme_dict)

In [22]:
labels[0]

'1237743'

In [18]:
y_preds = []
y_preds.append([word for word in result if word != "<s>"])

In [19]:
y_preds

[['two', 'two', 'three', 'seven', 'oh', 'oh', 'four', 'three']]

In [55]:
res = ['<s>', 'two', '<s>', 'two', '<s>', 'oh', '<s>', 'two', '<s>', 'oh', '<s>', 'five', '<s>', 'oh', '<s>', 'four', '<s>', 'eight', '<s>', 'oh', '<s>', 'eight', '<s>', 'three']

In [85]:
[word for word in res if word != "<s>"]

['two',
 'two',
 'oh',
 'two',
 'oh',
 'five',
 'oh',
 'four',
 'eight',
 'oh',
 'eight',
 'three']

In [68]:
# Decode every test utterance; "<s>" entries are dropped from each prediction.
# Results are appended one utterance at a time so that partial output survives
# an interrupted run.
y_preds, labels = [], []
for label, utterance in test_data.items():
    decoded = continuous_recognition(hmm, utterance['mfcc'], unigram_dict, bigram_dict, phoneme_dict)
    y_preds.append(list(filter(lambda word: word != "<s>", decoded)))
    labels.append(label)

KeyboardInterrupt: 

In [71]:
phoneme_dict

{'<s>': ['sil'],
 'eight': ['ey', 't', 'sp'],
 'five': ['f', 'ay', 'v', 'sp'],
 'four': ['f', 'ao', 'r', 'sp'],
 'nine': ['n', 'ay', 'n', 'sp'],
 'oh': ['ow', 'sp'],
 'one': ['w', 'ah', 'n', 'sp'],
 'seven': ['s', 'eh', 'v', 'ah', 'n', 'sp'],
 'six': ['s', 'ih', 'k', 's', 'sp'],
 'three': ['th', 'r', 'iy', 'sp'],
 'two': ['t', 'uw', 'sp'],
 'zero': [['z', 'ih', 'r', 'ow', 'sp'], ['z', 'iy', 'r', 'ow', 'sp']]}

In [93]:
def label_to_word(label):
    """
    Spell out a label character by character.

    Digits '1'-'9' map to 'one'-'nine', 'o' maps to 'oh' and 'z' to 'zero'.
    Any other character raises KeyError.
    """
    spoken = dict(zip(
        '123456789oz',
        ('one', 'two', 'three', 'four', 'five', 'six', 'seven',
         'eight', 'nine', 'oh', 'zero'),
    ))

    words = []
    for char in label:
        words.append(spoken[char])
    return words

In [94]:
true_word_label = [label_to_word(label) for label in labels]

In [539]:
hmm.transition_prob('sp')

array([[ 0.        ,  0.2385641 ,  0.7614358 ],
       [ 0.        ,  0.9152609 ,  0.08473914],
       [ 0.        ,  0.        ,  0.        ]])

In [538]:
hmm.transition_prob('sil')

array([[ 0.        ,  1.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  0.858848  ,  0.0786422 ,  0.06250978,  0.        ],
       [ 0.        ,  0.        ,  0.8590611 ,  0.1409389 ,  0.        ],
       [ 0.        ,  0.05343336,  0.        ,  0.7964702 ,  0.1500964 ],
       [ 0.        ,  0.        ,  0.        ,  0.        ,  0.        ]])

In [28]:
from multiprocessing import Pool
from functools import partial
from joblib import Parallel, delayed

dataset = [1, 2, 3, 4, 5, 6]
mfccs = {1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6}
def func(hmm, mfcc):
    """Toy stand-in for the per-utterance work: squares ``mfcc`` and ignores ``hmm``."""
    squared = pow(mfcc, 2)
    return squared

In [24]:
mfccs = [func(hmm, mfccs[data]) for data in dataset]

In [None]:
# Parallel-map experiment with a process pool.
# NOTE(review): worker processes must be able to import `func`; on platforms
# that spawn rather than fork, a function defined in the notebook may not be
# picklable - confirm on the target platform.
agents = 5
chunksize = 3
with Pool(processes=agents) as pool:
    # Bind `hmm` positionally: pool.map passes each item as the next positional
    # argument, so binding it by keyword would make the item collide with `hmm`.
    function = partial(func, 1)
    res = pool.map(function, mfccs, chunksize)

In [None]:
res

In [None]:
Parallel(n_jobs=2)(delayed(func)(1, mfccs[data]) for data in dataset)