In [21]:
import librosa
import math
import numpy as np
import scipy.signal
from scipy.special import logsumexp
import torch
import torch.nn as nn
import torch.nn.functional as F

class MyNet(nn.Module):
    def __init__(self):
        super(MyNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 5, padding=2)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 5, padding=2)
        self.conv3 = nn.Conv2d(64, 64, 3, padding=1)
        self.conv4 = nn.Conv2d(64, 128, (1, 5))
        self.fc1 = nn.Linear(128, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 48)
        self.sm = nn.LogSoftmax(dim=1)

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = F.relu(self.conv4(x))
        x = x.view(-1, 128)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        x = self.sm(x)
        return x
    
def load_audio_to_melspec_tensor(wavpath, sample_rate=16000):
    window_size = .025
    window_stride = 0.01
    n_dft = 512
    win_length = int(sample_rate * window_size)
    hop_length = int(sample_rate * window_stride)
    y, sr = librosa.load(wavpath, sr=sample_rate)
    y = y - y.mean()
    y = np.append(y[0],y[1:]-.97*y[:-1])
    # compute mel spectrogram
    stft = librosa.stft(y, n_fft=n_dft, hop_length=hop_length,
        win_length=win_length, window=scipy.signal.hamming)
    spec = np.abs(stft)**2
    mel_basis = librosa.filters.mel(sr=sample_rate, n_fft=n_dft, n_mels=40, fmin=20)
    melspec = np.dot(mel_basis, spec)
    logspec = librosa.power_to_db(melspec, ref=np.max)
    logspec = np.transpose(logspec)
    logspec_tensor = torch.tensor(logspec)
    return logspec_tensor

def compute_phone_likelihoods(model, logspec):
    likelihood_list = []
    with torch.no_grad():
        for j in range(6, logspec.size(0) - 5):
            inp = logspec[j-5:j+6,:].unsqueeze(0)
            output = model(inp) # output will be log probabilities over classes
            output = output - math.log(1. / 48) # subtract the logprob of the class priors (assumed to be uniform)
            likelihood_list.append(output[0])
    likelihoods = torch.transpose(torch.stack(likelihood_list, dim=1), 0, 1).numpy()
    return likelihoods

model = MyNet()
model.load_state_dict(torch.load('lab3_AM.pt'))

lab3_data = np.load('lab3_phone_labels.npz')
phone_labels = list(lab3_data['phone_labels'])
print ("phones labels: ", phone_labels)

def phones2indices(phones):
    return [phone_labels.index(p) for p in phones]


# fee_HMM = MyHMM(phones2indices(['sil', 'f', 'iy', 'sil']), np.array([0.5, 0.5, 0, 0]), np.array([[.9,.1,0,0],[0,.9,.1,0],[0,0,.9,.1],[0,0,0,1]]))
# pea_HMM = MyHMM(phones2indices(['sil', 'p', 'iy', 'sil']), np.array([0.5, 0.5, 0, 0]), np.array([[.9,.1,0,0],[0,.9,.1,0],[0,0,.9,.1],[0,0,0,1]]))
# rock_HMM = MyHMM(phones2indices(['sil', 'r', 'aa', 'cl', 'k', 'sil']), np.array([0.5,0.5,0,0,0,0]), np.array([[.9,.1,0,0,0,0],[0,.9,.1,0,0,0],[0,0,.9,.1,0,0],[0,0,0,.9,.1,0],[0,0,0,0,.9,.1],[0,0,0,0,0,1]]))
# burt_HMM = MyHMM(phones2indices(['sil', 'b', 'er', 'cl', 't', 'sil']), np.array([0.5,0.5,0,0,0,0]), np.array([[.9,.1,0,0,0,0],[0,.9,.1,0,0,0],[0,0,.9,.1,0,0],[0,0,0,.9,.1,0],[0,0,0,0,.9,.1],[0,0,0,0,0,1]]))
# see_HMM = MyHMM(phones2indices(['sil', 's', 'iy', 'sil']), np.array([0.5, 0.5, 0, 0]), np.array([[.9,.1,0,0],[0,.9,.1,0],[0,0,.9,.1],[0,0,0,1]]))
# she_HMM = MyHMM(phones2indices(['sil', 'sh', 'iy', 'sil']), np.array([0.5, 0.5, 0, 0]), np.array([[.9,.1,0,0],[0,.9,.1,0],[0,0,.9,.1],[0,0,0,1]]))

phones labels:  ['sil', 's', 'ao', 'l', 'r', 'iy', 'vcl', 'd', 'eh', 'cl', 'p', 'ix', 'z', 'ih', 'sh', 'n', 'v', 'aa', 'y', 'uw', 'w', 'ey', 'dx', 'b', 'ay', 'ng', 'k', 'epi', 'ch', 'dh', 'er', 'en', 'g', 'aw', 'hh', 'ae', 'ow', 't', 'ax', 'm', 'zh', 'ah', 'el', 'f', 'jh', 'uh', 'oy', 'th']


In [22]:
# load in audio files
# convert to mel-logspec tensor
# compute phone likelihoods for each

burt_phone_likelihoods = compute_phone_likelihoods(model, load_audio_to_melspec_tensor(wavpath="burt.wav"))
fee_phone_likelihoods = compute_phone_likelihoods(model, load_audio_to_melspec_tensor(wavpath="fee.wav"))
pea_phone_likelihoods = compute_phone_likelihoods(model, load_audio_to_melspec_tensor(wavpath="pea.wav"))
rock_phone_likelihoods = compute_phone_likelihoods(model, load_audio_to_melspec_tensor(wavpath="rock.wav"))
see_phone_likelihoods = compute_phone_likelihoods(model, load_audio_to_melspec_tensor(wavpath="see.wav"))
she_phone_likelihoods = compute_phone_likelihoods(model, load_audio_to_melspec_tensor(wavpath="she.wav"))

print ("burt_phone_likelihoods.shape: ", burt_phone_likelihoods.shape)
print ("fee_phone_likelihoods.shape: ", fee_phone_likelihoods.shape)
print ("pea_phone_likelihoods.shape: ", pea_phone_likelihoods.shape)
print ("rock_phone_likelihoods.shape: ", rock_phone_likelihoods.shape)
print ("see_phone_likelihoods.shape: ", see_phone_likelihoods.shape)
print ("she_phone_likelihoods.shape: ", she_phone_likelihoods.shape)

#np.set_printoptions(threshold=np.inf, suppress=True)
#print ("burt_phone_likelihoods: ", burt_phone_likelihoods)

burt_phone_likelihoods.shape:  (100, 48)
fee_phone_likelihoods.shape:  (103, 48)
pea_phone_likelihoods.shape:  (113, 48)
rock_phone_likelihoods.shape:  (81, 48)
see_phone_likelihoods.shape:  (96, 48)
she_phone_likelihoods.shape:  (107, 48)


In [23]:
# implement hidden markov model
# A (N x N)= state transition distribution
# pi (N x 1)= initial state distribution

class MyHMM:
    def __init__(self, state_labels, initial_state_distribution, transition_matrix, eps=1e-200):
        self.eps = eps
        self.N_total_states = len(state_labels)
        # (the product of probabilities becomes addition in log space)
        self.pi = np.log(initial_state_distribution + eps)
        self.A = np.log(transition_matrix + eps) #A_{ji} is prob of transitioning from state j to state i
        self.labels = state_labels # a list where self.labels[j] is the index of the phone label belonging to the jth state


    def forward(self, state_likelihoods): # state_likelihoods.shape is assumed to be (T_timesteps, 48)
        # create B array (T_total_timesteps, N_total_states)
        T_total_timesteps = state_likelihoods.shape[0]
        self.B = np.zeros((T_total_timesteps, self.N_total_states))
        i = 0
        for state in self.labels:
            self.B[:, i] = state_likelihoods[:, state]
            #print ("B[", i, "]: ", self.B[:, i])
            i += 1
             
        # create alpha matrix (T_total_timesteps, N_total_states)
        alpha_matrix = np.zeros((T_total_timesteps, self.N_total_states))
        
        # initialization
        t = 0 # time step
        i = 0 # state 
        for i in range (self.N_total_states):
            alpha_matrix[t, i] = np.add(self.B[t, i], self.pi[i])
        
        print ("alpha_matrix[t = ", t, "]: ", alpha_matrix[t,:])

        # induction step
        for t in range(0, T_total_timesteps - 1):
            for i in range (self.N_total_states):
                sum_array = []
                # get sum
                for j in range(self.N_total_states):
                    sum_array.append(np.add(alpha_matrix[t, j], self.A[j, i]))
                # multiply values and set alpha matrix
                #print ("Sum array: ", sum_array)
                value = np.add(logsumexp(sum_array), self.B[t + 1, i])
                alpha_matrix[t + 1, i] = value
            print ("alpha_matrix[t = ", t + 1, "]: ", alpha_matrix[t + 1,:])

        # termination
        # output = np.sum(alpha_matrix[T_total_timesteps - 1,:])
        # print ("forward output: ", output)
        # return output
        print ("final alpha: ", alpha_matrix[T_total_timesteps - 1, self.N_total_states - 1])
        return alpha_matrix[T_total_timesteps - 1, self.N_total_states - 1]
    

    def viterbi(self, state_likelihoods): # state_likelihoods.shape is assumed to be (T_timesteps, 48)
        # create B array
        T_total_timesteps = state_likelihoods.shape[0]
        self.B = np.zeros((T_total_timesteps, self.N_total_states))
        i = 0
        for state in self.labels:
            self.B[:,i] = state_likelihoods[:,state]
            i += 1

        # create psi matrix (backtrace)
        psi_matrix = np.zeros((T_total_timesteps, self.N_total_states), dtype=np.int8)
        for i in range(self.N_total_states):
            psi_matrix[0, i] = 0
        
        # create delta array (T_total_timesteps, N_total_states)
        delta_matrix = np.zeros((T_total_timesteps, self.N_total_states))

        # initialization
        t = 0 # time step
        i = 0 # state 
        for i in range (self.N_total_states):
            delta_matrix[t, i] = np.add(self.B[t, i], self.pi[i])

        # induction step
        for t in range(1, T_total_timesteps):
            for i in range (self.N_total_states):
                find_max_array = []
                # get induction values to find max
                for j in range(self.N_total_states):
                    find_max_array.append(np.add(delta_matrix[t - 1, j], self.A[j, i]))
                # find max, multiply values, and set delta matrix
                max = np.max(find_max_array)
                value = np.add(max, self.B[t, i])
                delta_matrix[t, i] = value
                # set psi backtrace value
                psi_matrix[t, i] = int(np.argmax(find_max_array))

        # termination
        output_path = []
        t = T_total_timesteps - 1
        q = int(np.argmax(delta_matrix[t,:]))
        output_path.append(q)
        t -= 1
        while t >= 0:
            q = int(psi_matrix[t + 1, int(q)])
            output_path.append(q)
            t -= 1
        output_path.reverse()
        #print ("output_path: ", output_path)
        return output_path
    
    
    def viterbi_transition_update(self, state_likelihoods): # state_likelihoods.shape is assumed to be (T_timesteps, 48)
        # compute viterbi algorithm
        viterbi_states = self.viterbi(state_likelihoods)

        # create temp A matrix using viterbi output
        temp_A_matrix = np.zeros((self.N_total_states, self.N_total_states))
        # number of times model made transition out of state i
        num_out_states = np.zeros(self.N_total_states)
        t = 0
        for t in range(len(viterbi_states) - 1):
            state_i = viterbi_states[t]
            state_j = viterbi_states[t + 1]
            # number of times transition was made from state i to state j
            temp_A_matrix[state_i, state_j] += 1
            # number of times model made transition out of state i
            num_out_states[state_i] += 1

        #print ("temp A matrix: ", temp_A_matrix)
        #print ("num_out_states: ", num_out_states)

        # compute log probabilities for A matrix
        for i in range(self.N_total_states):
            for j in range (self.N_total_states):
                numerator = temp_A_matrix[i, j]
                denominator = num_out_states[i]
                self.A[i, j] = np.log((numerator / denominator) + self.eps)

        #print ("A log matrix: ", self.A)
        #print ("A matrix: ",np.exp(self.A))
        #print ("viterbi states: ", viterbi_states)
        #return self.A

In [24]:
# HMM inputs: (state_labels, initial_state_distribution (pi), transition_matrix (A), eps = 1e-200)
# HMM.forward inputs: (state_likelihoods (B))
# HMM.viterbi inputs: (state_likelihoods (B))
# HMM.viterbi_transition_update inputs: (state_likelihoods (B))

# create HMMs for each word
burt_HMM = MyHMM(phones2indices(['sil', 'b', 'er', 'cl', 't', 'sil']), # state_labels
                 np.array([0.5, 0.5, 0.0, 0.0, 0.0, 0.0]), # initial_state_distribution (pi)
                 np.array([[0.9, 0.1, 0.0, 0.0, 0.0, 0.0], # transition_matrix (A)
                           [0.0, 0.9, 0.1, 0.0, 0.0, 0.0],
                           [0.0, 0.0, 0.9, 0.1, 0.0, 0.0],
                           [0.0, 0.0, 0.0, 0.9, 0.1, 0.0],
                           [0.0, 0.0, 0.0, 0.0, 0.9, 0.1],
                           [0.0, 0.0, 0.0, 0.0, 0.0, 1.0]]))

fee_HMM = MyHMM(phones2indices(['sil', 'f', 'iy', 'sil']), 
                np.array([0.5, 0.5, 0.0, 0.0]), 
                np.array([[0.9, 0.1, 0.0, 0.0],
                          [0.0, 0.9, 0.1, 0.0],
                          [0.0, 0.0, 0.9, 0.1],
                          [0.0, 0.0, 0.0, 1.0]]))

pea_HMM = MyHMM(phones2indices(['sil', 'p', 'iy', 'sil']),  
                np.array([0.5, 0.5, 0.0, 0.0]), 
                np.array([[0.9, 0.1, 0.0, 0.0],
                          [0.0, 0.9, 0.1, 0.0],
                          [0.0, 0.0, 0.9, 0.1],
                          [0.0, 0.0, 0.0, 1.0]]))

rock_HMM = MyHMM(phones2indices(['sil', 'r', 'aa', 'cl', 'k', 'sil']), 
                 np.array([0.5, 0.5, 0.0, 0.0, 0.0, 0.0]), 
                 np.array([[0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
                           [0.0, 0.9, 0.1, 0.0, 0.0, 0.0],
                           [0.0, 0.0, 0.9, 0.1, 0.0, 0.0],
                           [0.0, 0.0, 0.0, 0.9, 0.1, 0.0],
                           [0.0, 0.0, 0.0, 0.0, 0.9, 0.1],
                           [0.0, 0.0, 0.0, 0.0, 0.0, 1.0]]))

see_HMM = MyHMM(phones2indices(['sil', 's', 'iy', 'sil']), 
                np.array([0.5, 0.5, 0.0, 0.0]), 
                np.array([[0.9, 0.1, 0.0, 0.0],
                          [0.0, 0.9, 0.1, 0.0],
                          [0.0, 0.0, 0.9, 0.1],
                          [0.0, 0.0, 0.0, 1.0]]))

she_HMM = MyHMM(phones2indices(['sil', 'sh', 'iy', 'sil']), 
                np.array([0.5, 0.5, 0.0, 0.0]), 
                np.array([[0.9, 0.1, 0.0, 0.0],
                          [0.0, 0.9, 0.1, 0.0],
                          [0.0, 0.0, 0.9, 0.1],
                          [0.0, 0.0, 0.0, 1.0]]))

# update HMM transition matricies using respective phone likelihoods
burt_HMM.viterbi_transition_update(burt_phone_likelihoods)
fee_HMM.viterbi_transition_update(fee_phone_likelihoods)
pea_HMM.viterbi_transition_update(pea_phone_likelihoods)
rock_HMM.viterbi_transition_update(rock_phone_likelihoods)
see_HMM.viterbi_transition_update(see_phone_likelihoods)
she_HMM.viterbi_transition_update(she_phone_likelihoods)

In [25]:
# remove scientific notation from prints
np.set_printoptions(suppress=True)

# compute log likelihoods for all 6 waveforms with all 6 of the word HMMs
# BURT
burtHMM_burtWORD = burt_HMM.forward(burt_phone_likelihoods)
print ("burtHMM_burtWORD:\t", np.exp(burtHMM_burtWORD))

# burtHMM_feeWORD = burt_HMM.forward(fee_phone_likelihoods)
# burtHMM_peaWORD = burt_HMM.forward(pea_phone_likelihoods)
# burtHMM_rockWORD = burt_HMM.forward(rock_phone_likelihoods)
# burtHMM_seeWORD = burt_HMM.forward(see_phone_likelihoods)
# burtHMM_sheWORD = burt_HMM.forward(she_phone_likelihoods)


# print ("burtHMM_feeWORD:\t", np.exp(burtHMM_feeWORD) * 100, " %")
# print ("burtHMM_peaWORD:\t", np.exp(burtHMM_peaWORD) * 100, " %")
# print ("burtHMM_rockWORD:\t", np.exp(burtHMM_rockWORD) * 100, " %")
# print ("burtHMM_seeWORD:\t", np.exp(burtHMM_seeWORD) * 100, " %")
# print ("burtHMM_sheWORD:\t", np.exp(burtHMM_sheWORD) * 100, " %")



state:  0
state:  23
state:  30
state:  9
state:  37
state:  0
alpha_matrix[t =  0 ]:  [   3.01596499   -5.74261188 -470.54947977 -459.25415067 -467.91478042
 -456.80790643]
alpha_matrix[t =  1 ]:  [   6.64610305   -5.79693865  -19.01329324 -455.91822746 -465.56395792
 -452.74239721]
alpha_matrix[t =  2 ]:  [  10.27835159   -3.05713478  -20.54809388  -20.42994633 -463.58870311
 -448.80000561]
alpha_matrix[t =  3 ]:  [  13.9352376     0.70025663  -17.90069388  -19.55116524  -32.65821918
 -444.90037908]
alpha_matrix[t =  4 ]:  [ 17.43968449   5.43529728 -13.09439624 -17.7615184  -30.45432313
 -31.76297666]
alpha_matrix[t =  5 ]:  [ 20.67195225  11.14917356  -6.74330296 -13.49048772 -25.62634653
 -28.26628158]
alpha_matrix[t =  6 ]:  [ 24.25982847  12.59588048  -1.8756581   -7.80386707 -23.19286264
 -23.95496626]
alpha_matrix[t =  7 ]:  [ 28.01781087  13.31123715  -1.09672067  -3.54733151 -19.50864309
 -20.02471539]
alpha_matrix[t =  8 ]:  [ 31.78275771  16.51893017  -1.58206648  -2.21250