Speaker identification


There are three people in a room. Each says about 10 phonemes, before being randomly interrupted by someone else. When they speak they all sound the same, however each person tends to use different phonemes in their speech. Specifically we can model the following transition probabilities that someone will interrupt the current speaker: P(speaker i at time t+1 | speaker j at time t). We can also model the probability over phonemes given a particular speaker: P(phoneme | speaker i). The phonemes are identical to the ones introduced in problem 1 (but the transition matrices are obviously different, since they take a different form altogether).

Write down the update equations that you will need to train a hidden Markov model. Using the information given above, write down a sensible initialization for the transition matrix.
Write your own python code to train a hidden Markov model on the data. You may look at code online, but will need to reference any code that helps you with your implementation.


From matplotlb use a stackplot (https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.stackplot.html) to show the probability of a particular person speaking.
Audio dataset: https://course-resources.minerva.kgi.edu/uploaded_files/mke/VW8Rjr/speaker.wav.zip

Symbol dataset: https://course-resources.minerva.kgi.edu/uploaded_files/mke/n705lY/speaker

In [109]:
import numpy as np
from decimal import *
getcontext().prec = 1000
import sys
sys.setrecursionlimit(5000)

In [33]:
with open('data.txt') as f:
        sequence = f.read()
        

In [94]:
CHARS = ['A', 'o', 'e', 't', 'p', 'g', 'k']

C = len(CHARS)
P = 3
S = len(sequence)

# Prior [1,3]
prior = np.ones((P,)) / P

# Transition matrix [char, char]
transition_matrix = np.ones((P,P))

# Emission matrix [ppl, char]
emission_matrix = np.ones((C,P))

# Hidden probability [ppl, seq]
state_hidden = np.ones((P, S))

# transition hidden probability []
# Z = p(h_t=i, h_t+1=i' |v_1:t) ~ a(h_t)p(v_t+1|h_t+1)p(h_t+1|h_t)b(h_t+1)
trans_hidden = np.ones((P, P, S))


In [100]:
def memoize(func):
    cache = dict()
    def memoized_func(*args):
        if args in cache:
            return cache[args]
        result = func(*args)
        cache[args] = result
        return result

    return memoized_func

In [107]:
def alpha(hidden_state, t):
    
    
    visib_state = sequence[t]
    curent_v_state = CHARS.index(visib_state)

    if t == 0:
        #a(h_1) = p(v_1|h_1)p(h_1)
        first_ = prior[hidden_state] * emission_matrix[curent_v_state,hidden_state]
        
        return first_
    
    
    
    corrector = Decimal(emission_matrix[curent_v_state, hidden_state])
    sum_predictor = Decimal(0)
    
    for prev_hidden_state in range(P):
    
        predictor = Decimal(transition_matrix[hidden_state, prev_hidden_state])
        
        predictor *= Decimal(alpha(prev_hidden_state, t-1))
        
        sum_predictor += predictor
    
    return corrector*sum_predictor
    
alpha = memoize(alpha)

alpha(2,4)

Decimal('26.999999999999998501198916756038670428097248077392578125')

In [110]:
# b(h_t) = p(v_t+1:t|h_t)

def beta (hidden_state, t):
    visib_state = sequence[t]
    curent_v_state = CHARS.index(visib_state)
    
    if t >= S-1:
        return 1
    
    sum_beta = Decimal(0)
    for next_hidden_state in range(P):
        
        next_visib_state = sequence[t+1]
        next_v_state = CHARS.index(next_visib_state)
    
        tr_p = Decimal(transition_matrix[hidden_state, next_hidden_state])
        
        em_p = Decimal(emission_matrix[next_v_state, next_hidden_state])
        
        sum_beta += tr_p*em_p*beta(next_hidden_state, t+1)
        
    return sum_beta
        
beta = memoize(beta)

beta(2,4)

Decimal('5440620656299615789672655389926520024549061863177564475987326618217401635171625100990857258644477621250657521386306923275518331613582793875989064712995694131800906276536299604436274791065698935285557202129994312926456575372934545012599037749193772323006198263890865614837642199164769118590392461954387391855268594912669062922750684766699127147812989317221806327610589473958215472998746982572094405712382964716418400982979846972635331887848419061772075580790835813863797758107')

In [112]:
def gama(hidden_state, t):
    top = alpha(hidden_state, t) * beta(hidden_state, t)
    
    bot = 0
    for hidden in range(P):
        bot += alpha(hidden, t) * beta(hidden, t)
        
    return top/bot


Decimal('0.33333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333

In [117]:
# Z = p(h_t=i, h_t+1=i' |v_1:t) ~ a(h_t)p(v_t+1|h_t+1)p(h_t+1|h_t)b(h_t+1)

def Z(hidden_state, hidden_next_state, t):
    
    a= alpha(hidden_state, t) 
    b= beta(hidden_next_state, t+1)

    next_v_state = CHARS.index(sequence[t])
        
    em_p = Decimal(emission_matrix[next_v_state, hidden_next_state])
    
    tr_p = Decimal(transition_matrix[hidden_state, hidden_next_state])
    
    top = a*b*em_p*tr_p 
    
    bot=0
    
    for i in range(P):
        for j in range(P):
            
            a= alpha(i, t) 
            b= beta(j, t+1)

            next_v_state = CHARS.index(sequence[t+1])

            em_p = Decimal(emission_matrix[next_v_state, j])

            tr_p = Decimal(transition_matrix[i, j])
            
            bot += a*b*em_p*tr_p
    
    return top/bot
 
Z(2,2,1)

Decimal('0.11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111

In [None]:
# Probability of the first state
# a 

for h_state in range(P):
    prior[h_state] = gama(h_state, 0)
    
# update transition matrix
# A

sum_over_time = lambda state, next_state : sum([Z(state, next_state, t) for t in range(S-1)])

for h_state in range(P):
    sum_of_all_states = []
    
    for next_h_state in range(P):
        sum_of_all_states.append(sum_over_time(h_state, next_h_state))
    
    for next_h_state in range(P):   
        transition_matrix[h_state, next_h_state] = sum_of_all_states[next_h_state]/np.sum(sum_of_all_states)
    
    
# update emission matrix
# B



In [None]:
#A (transition)
# p_new(h_t+1=i|h_t=i`) = [∑N∑T-1 (Z-old)]/[∑i'∑N∑T-1 (Z-old)]

# B (emission)
# p_new(h_t+1=i|h_t=i`) = ∑N∑T (I[v=j] p-old(h_t=i|v) )
 
