## Single digit recognition with HMM

In [1]:
from scipy.stats import multivariate_normal
from scipy.special import logsumexp
from glob import glob
import soundfile as sf
from os import path
import numpy as np
np.random.seed(seed=273)

In [2]:
class GaussianHMM(object):
    """
    Gaussian Hidden Markov Model.
    """
    def __init__(self, n_states, n_dims):
        """
        Set up Gaussian HMM
        ------
        input:
        n_states: number of states in HMM (note: one of them will be a final state)
        n_dims: number of dimensions (13 MFCCs for this assignment)
        """
        self.n_states = n_states
        self.n_dims = n_dims

    def init_gaussian_params(self, X):
        """
        Initialize Gaussian parameters
        ------
        input:
        X: list of 2d-arrays with shapes (Ti, 13) for example i: each is a matrix of MFCCs for a digit utterance
        ------
        initialize mu and sigma for each state's Gaussian (where sigma is a diagonal covariance)
        """
        X_concat = np.concatenate(X)
        self.mu = np.zeros((self.n_states, self.n_dims))
        self.sigma = np.zeros((self.n_states, self.n_dims))
        for s in range(self.n_states):
            X_subset = X_concat[np.random.choice(len(X_concat), size=50, replace=False)]
            self.mu[s] = X_subset.mean(axis=0)
            self.sigma[s] = X_subset.var(axis=0)

    def init_hmm_params(self):
        """
        Initialize HMM parameters
        ------
        initialize pi (starting probability vector) and A (transition probabilities)
        """
        self.pi = np.zeros(self.n_states)
        self.pi[0] = 1.
        self.A = np.zeros((self.n_states, self.n_states))
        for s in range(self.n_states - 1):
            self.A[s, s:s + 2] = .5
        self.A[-1, -1] = 1.

    def get_emissions(self, x):
        """
        Compute Gaussian log-density at X for a diagonal model.
        ------
        get (continuous) emission probabilities from the multivariate normal
        """
        T, _ = x.shape
        log_B = np.zeros((self.n_states, T))
        for s in range(self.n_states):
            log_B[s] = multivariate_normal.logpdf(x, mean=self.mu[s], cov=np.diag(self.sigma[s]))
        return log_B

    def forward(self, log_pi, log_A, log_B):
        """
        Forward algorithm.
        ------
        input:
        log_pi: 1d-array of shape n_states: log of start probability vector
        log_A: 2d-array of shape (n_states, n_states): log of transition probability matrix
        log_B: 2d-array of shape (n_states, Tx): log of emision probabilities (Note: Tx depends on x)
        ------
        output:
        log_alpha: 2d-array of shape (n_states, Tx): log probability to state i at time t
        """
        _, T = log_B.shape
        log_alpha = np.zeros(log_B.shape)
        for t in range(T):
            if t == 0:
                log_alpha[:, t] = log_pi + log_B[:, t]
            else:
                # log_alpha[:, t] = log_B[:, t] + logsumexp(log_alpha[:, t - 1].reshape(-1, 1) + log_A[:, :], axis=0)
                log_alpha[:, t] = log_B[:, t] + logsumexp(log_alpha[:, t - 1] + log_A.T, axis=1) # reads better to me
        return log_alpha

    def backward(self, log_A, log_B):
        """
        Backward algorithm.
        ------
        input:
        log_A: 2d-array of shape (n_states, n_states): log of transition probability matrix
        log_B: 2d-array of shape (n_states, Tx): log of emision probabilities (Note: Tx depends on x)
        ------
        output:
        log_beta: 2d-array of shape (n_states, Tx): log probability from state i at time t
        """
        _, T = log_B.shape
        log_beta = np.zeros(log_B.shape)
        for t in range(T - 1, -1, -1):
            if t == T - 1:
                log_beta[:, t] = np.zeros_like(log_beta[:, t]) # log(1) = 0 so we do zeros here
            else:
                log_beta[:, t] = logsumexp(log_A[:, :].T + log_beta[:, t+1] + log_B[:, t + 1], axis=1)
        return log_beta

    def viterbi(self, log_pi, log_A, log_B):
        """
        Use viterbi algorithm to find the best path and associated log probability.
        ------
        input:
        log_pi: 1d-array of shape n_states: log of start probability vector
        log_A: 2d-array of shape (n_states, n_states): log of transition probability matrix
        log_B: 2d-array of shape (n_states, Tx): log of emision probabilities (Note: Tx depends on x)
        ------
        output:
        q: 1d-array of length T: optimal state sequence for observed sequence
        log_prob: log probability of observed sequence
        """
        _, T = log_B.shape
        log_delta = np.zeros(log_B.shape)
        maxes = np.zeros(log_B.shape)
        for t in range(T):
            if t == 0:
                log_delta[:, t] = log_pi + log_B[:, t] # same as fwd algorithm
            else:
              # note: log is monotonic + increasing so we can take max as is
              prob = log_delta[:, t - 1] + log_A.T
              maxes[:, t] = np.argmax(prob, axis=1) # backpointers to prev maxes to trace later
              log_delta[:, t] = log_B[:, t] + np.max(prob, axis=1)

        q = np.zeros(T, dtype=np.int32)
        for t in range(T - 1, -1, -1):
            if t == T - 1:
                q[t] = log_delta[:, t].argmax()
                log_prob = log_delta[:, t].max()
            else:
                q[t] = maxes[q[t + 1], t]

        return q, log_prob

    def score(self, x):
        """
        Use forward-backward algorithm to
        compute log probability and posteriors.
        ------
        input:
        x :2d-array of shape (T, 13): MFCCs for a single example
        ------
        output:
        log_prob :scalar: log probability of observed sequence
        log_alpha :2d-array of shape (n_states, T): log prob of getting to state at time t from start
        log_beta :2d-array of shape (n_states, T): log prob of getting from state at time t to end
        gamma :2d-array of shape (n_states, T): state posterior probability
        eps :2d-array of shape (n_states, n_states): state transition probability matrix
        """
        T = len(x)

        log_pi = np.log(self.pi.clip(1e-10)) # starting log probabilities <- this is giving divide by 0 so we will clip
        log_A = np.log(self.A.clip(1e-10)) # transition log probabilities <- this is giving divide by 0 so we will clip
        log_B = self.get_emissions(x) # emission log probabilities

        log_alpha = self.forward(log_pi, log_A, log_B)
        log_beta = self.backward(log_A, log_B)

        log_prob = logsumexp(log_alpha[:, T-1]) # sum of final probabilities (end in each state)

        # (a * b) / (sum (a*b) across states) in log and vectorized and then un-logged
        gamma = np.exp(log_alpha + log_beta - logsumexp(log_alpha + log_beta, axis=0))
        gamma = np.exp(log_alpha + log_beta - log_prob)

        xi = np.zeros((T - 1, self.n_states, self.n_states))
        for t in range(T - 1):

            # get log multiplied values <- vectorized on i and j
            numerator = np.exp(
                log_alpha[:, t].reshape(-1, 1) +   # alpha_t(i)
                log_A +                            # a_{ij}
                log_B[:, t + 1].reshape(1, -1) +   # b_{t+1} (o_{t+1})
                log_beta[:, t + 1].reshape(1, -1)  # beta_{t+1}(j)
            )
            # P(O | lambda) is logged so we exp
            denominator = np.exp(log_prob.clip(1e-10)) # clip again for 0 err
            # formula is division obv
            xi[t] = numerator / denominator

        xi = xi.sum(axis=0) # sum over time
        xi /= xi.sum(axis=1, keepdims=True).clip(1e-1) # normalize by state probabilities (sum transitions over j)

        return log_prob, log_alpha, log_beta, gamma, xi

    def train(self, X):
        """
        Estimate model parameters.
        ------
        input:
        X: list of 2d-arrays of shape (Tx, 13): list of single digit MFCC features
        ------
        update model parameters (A, mu, sigma)
        """
        stats = {
            "gamma": np.zeros((self.n_states, 1)),
            "A": np.zeros((self.n_states, self.n_states)),
            "X": np.zeros((self.n_states, self.n_dims)),
            "X**2": np.zeros((self.n_states, self.n_dims))
        }

        for x in X:
            log_prob, log_alpha, log_beta, gamma, xi = self.score(x)

            stats["gamma"] += gamma.sum(axis=1, keepdims=True)
            stats["A"] += xi
            stats["X"] += gamma.dot(x)
            stats["X**2"] += gamma.dot(x**2)

        stats["gamma"] += 1
        stats["A"][:-1,:-1] += np.diag(np.full(self.n_states - 1, 1))

        self.mu = stats["X"] / stats["gamma"] # mean
        self.sigma = stats["X**2"] / stats["gamma"] - self.mu**2 # variance
        self.sigma = self.sigma.clip(1e-1)

        # print("mu shape:", self.mu.shape)
        # print("sigma shape:", self.sigma.shape)

        self.A = np.where(np.bitwise_or(self.A == 0.0, self.A == 1.0), self.A, stats["A"]) # update transition probabilities
        self.A /= self.A.sum(axis=1, keepdims=True) # normalize transition probabilities

In [None]:
# # get provided data from drive (imma change the path later to access it)
# from google.colab import drive
# drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
dataset = np.load("/content/drive/MyDrive/hybrid/data/mfccs/mfccs.npz", allow_pickle=True)
Xtrain, Ytrain = dataset["Xtrain"], dataset["Ytrain"]
Xtest, Ytest = dataset["Xtest"], dataset["Ytest"]

# Expected error rates:
# 15 states/15 iterations: 0.9714 forward, 0.9679 viterbi
# 25 states/25 iterations: 0.9821 forward, 0.9821 viterbi
# 50 states/50 iterations: 0.9804 forward, 0.9804 viterbi

n_states = 6
n_dims = 13
n_iter = 15
model = dict()

digits = range(10)

for digit in digits:
    print("Training HMM for digit %d" % digit)
    Xtrain_digit = [x for x, y in zip(Xtrain, Ytrain) if y == digit]
    model[digit] = GaussianHMM(n_states=n_states, n_dims=n_dims)
    model[digit].init_gaussian_params(Xtrain_digit)
    model[digit].init_hmm_params()

    for i in range(n_iter):
        print("Starting iteration %d..." % i)
        model[digit].train(Xtrain_digit)

print("Testing HMM")
forward_accuracy, viterbi_accuracy = np.zeros(10), np.zeros(10)
forward_confusion, viterbi_confusion = np.zeros((10, 10)), np.zeros((10, 10))
for x, y in zip(Xtest, Ytest):
    T = len(x)

    forward_scores, viterbi_scores = [], []
    for digit in digits:
        log_pi = np.log(model[digit].pi.clip(1e-10)) # same 0 issue
        log_A = np.log(model[digit].A.clip(1e-10)) # same 0 issue
        log_B = model[digit].get_emissions(x)

        log_alpha = model[digit].forward(log_pi, log_A, log_B)
        forward_log_prob = logsumexp(log_alpha[:, T - 1])
        _, viterbi_log_prob = model[digit].viterbi(log_pi, log_A, log_B)

        forward_scores.append(forward_log_prob)
        viterbi_scores.append(viterbi_log_prob)

    forward_top_digit, forward_top_log_prob = sorted(zip(digits, forward_scores), key=lambda x: -x[1])[0]
    viterbi_top_digit, viterbi_top_log_prob = sorted(zip(digits, viterbi_scores), key=lambda x: -x[1])[0]

    forward_confusion[y, forward_top_digit] += 1.
    viterbi_confusion[y, viterbi_top_digit] += 1.

forward_accuracy = np.diag(forward_confusion) / forward_confusion.sum(axis=1)
viterbi_accuracy = np.diag(viterbi_confusion) / viterbi_confusion.sum(axis=1)

print("forward accuracy (%.4f)" % forward_accuracy.mean(), forward_accuracy)
print("viterbi accuracy (%.4f)" % viterbi_accuracy.mean(), viterbi_accuracy)

Training HMM for digit 0
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 1
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 2
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Sta

In [15]:
dataset = np.load("/content/drive/MyDrive/hybrid/data/mfccs/mfccs.npz", allow_pickle=True)
Xtrain, Ytrain = dataset["Xtrain"], dataset["Ytrain"]
Xtest, Ytest = dataset["Xtest"], dataset["Ytest"]

# Expected error rates:
# 15 states/15 iterations: 0.9714 forward, 0.9679 viterbi
# 25 states/25 iterations: 0.9821 forward, 0.9821 viterbi
# 50 states/50 iterations: 0.9804 forward, 0.9804 viterbi

n_states = 15
n_dims = 13
n_iter = 15
model = dict()

digits = range(10)

for digit in digits:
    print("Training HMM for digit %d" % digit)
    Xtrain_digit = [x for x, y in zip(Xtrain, Ytrain) if y == digit]
    model[digit] = GaussianHMM(n_states=n_states, n_dims=n_dims)
    model[digit].init_gaussian_params(Xtrain_digit)
    model[digit].init_hmm_params()

    for i in range(n_iter):
        print("Starting iteration %d..." % i)
        model[digit].train(Xtrain_digit)

print("Testing HMM")
forward_accuracy, viterbi_accuracy = np.zeros(10), np.zeros(10)
forward_confusion, viterbi_confusion = np.zeros((10, 10)), np.zeros((10, 10))
for x, y in zip(Xtest, Ytest):
    T = len(x)

    forward_scores, viterbi_scores = [], []
    for digit in digits:
        log_pi = np.log(model[digit].pi.clip(1e-10)) # same 0 issue
        log_A = np.log(model[digit].A.clip(1e-10)) # same 0 issue
        log_B = model[digit].get_emissions(x)

        log_alpha = model[digit].forward(log_pi, log_A, log_B)
        forward_log_prob = logsumexp(log_alpha[:, T - 1])
        _, viterbi_log_prob = model[digit].viterbi(log_pi, log_A, log_B)

        forward_scores.append(forward_log_prob)
        viterbi_scores.append(viterbi_log_prob)

    forward_top_digit, forward_top_log_prob = sorted(zip(digits, forward_scores), key=lambda x: -x[1])[0]
    viterbi_top_digit, viterbi_top_log_prob = sorted(zip(digits, viterbi_scores), key=lambda x: -x[1])[0]

    forward_confusion[y, forward_top_digit] += 1.
    viterbi_confusion[y, viterbi_top_digit] += 1.

forward_accuracy = np.diag(forward_confusion) / forward_confusion.sum(axis=1)
viterbi_accuracy = np.diag(viterbi_confusion) / viterbi_confusion.sum(axis=1)

print("forward accuracy (%.4f)" % forward_accuracy.mean(), forward_accuracy)
print("viterbi accuracy (%.4f)" % viterbi_accuracy.mean(), viterbi_accuracy)

Training HMM for digit 0
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 1
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 2
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Sta

In [None]:
dataset = np.load("hybrid/data/mfccs/mfccs.npz", allow_pickle=True)
Xtrain, Ytrain = dataset["Xtrain"], dataset["Ytrain"]
Xtest, Ytest = dataset["Xtest"], dataset["Ytest"]

# Expected error rates:
# 15 states/15 iterations: 0.9714 forward, 0.9679 viterbi
# 25 states/25 iterations: 0.9821 forward, 0.9821 viterbi
# 50 states/50 iterations: 0.9804 forward, 0.9804 viterbi

n_states = 30
n_dims = 13
n_iter = 15
model = dict()

digits = range(10)

for digit in digits:
    print("Training HMM for digit %d" % digit)
    Xtrain_digit = [x for x, y in zip(Xtrain, Ytrain) if y == digit]
    model[digit] = GaussianHMM(n_states=n_states, n_dims=n_dims)
    model[digit].init_gaussian_params(Xtrain_digit)
    model[digit].init_hmm_params()

    for i in range(n_iter):
        print("Starting iteration %d..." % i)
        model[digit].train(Xtrain_digit)

print("Testing HMM")
forward_accuracy, viterbi_accuracy = np.zeros(10), np.zeros(10)
forward_confusion, viterbi_confusion = np.zeros((10, 10)), np.zeros((10, 10))
for x, y in zip(Xtest, Ytest):
    T = len(x)

    forward_scores, viterbi_scores = [], []
    for digit in digits:
        log_pi = np.log(model[digit].pi.clip(1e-10)) # same 0 issue
        log_A = np.log(model[digit].A.clip(1e-10)) # same 0 issue
        log_B = model[digit].get_emissions(x)

        log_alpha = model[digit].forward(log_pi, log_A, log_B)
        forward_log_prob = logsumexp(log_alpha[:, T - 1])
        _, viterbi_log_prob = model[digit].viterbi(log_pi, log_A, log_B)

        forward_scores.append(forward_log_prob)
        viterbi_scores.append(viterbi_log_prob)

    forward_top_digit, forward_top_log_prob = sorted(zip(digits, forward_scores), key=lambda x: -x[1])[0]
    viterbi_top_digit, viterbi_top_log_prob = sorted(zip(digits, viterbi_scores), key=lambda x: -x[1])[0]

    forward_confusion[y, forward_top_digit] += 1.
    viterbi_confusion[y, viterbi_top_digit] += 1.

forward_accuracy = np.diag(forward_confusion) / forward_confusion.sum(axis=1)
viterbi_accuracy = np.diag(viterbi_confusion) / viterbi_confusion.sum(axis=1)

print("forward accuracy (%.4f)" % forward_accuracy.mean(), forward_accuracy)
print("viterbi accuracy (%.4f)" % viterbi_accuracy.mean(), viterbi_accuracy)

Training HMM for digit 0
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 1
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 2
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Sta

In [None]:
dataset = np.load("hybrid/data/mfccs/mfccs.npz", allow_pickle=True)
Xtrain, Ytrain = dataset["Xtrain"], dataset["Ytrain"]
Xtest, Ytest = dataset["Xtest"], dataset["Ytest"]

# Expected error rates:
# 15 states/15 iterations: 0.9714 forward, 0.9679 viterbi
# 25 states/25 iterations: 0.9821 forward, 0.9821 viterbi
# 50 states/50 iterations: 0.9804 forward, 0.9804 viterbi

n_states = 100
n_dims = 13
n_iter = 15
model = dict()

digits = range(10)

for digit in digits:
    print("Training HMM for digit %d" % digit)
    Xtrain_digit = [x for x, y in zip(Xtrain, Ytrain) if y == digit]
    model[digit] = GaussianHMM(n_states=n_states, n_dims=n_dims)
    model[digit].init_gaussian_params(Xtrain_digit)
    model[digit].init_hmm_params()

    for i in range(n_iter):
        print("Starting iteration %d..." % i)
        model[digit].train(Xtrain_digit)

print("Testing HMM")
forward_accuracy, viterbi_accuracy = np.zeros(10), np.zeros(10)
forward_confusion, viterbi_confusion = np.zeros((10, 10)), np.zeros((10, 10))
for x, y in zip(Xtest, Ytest):
    T = len(x)

    forward_scores, viterbi_scores = [], []
    for digit in digits:
        log_pi = np.log(model[digit].pi.clip(1e-10)) # same 0 issue
        log_A = np.log(model[digit].A.clip(1e-10)) # same 0 issue
        log_B = model[digit].get_emissions(x)

        log_alpha = model[digit].forward(log_pi, log_A, log_B)
        forward_log_prob = logsumexp(log_alpha[:, T - 1])
        _, viterbi_log_prob = model[digit].viterbi(log_pi, log_A, log_B)

        forward_scores.append(forward_log_prob)
        viterbi_scores.append(viterbi_log_prob)

    forward_top_digit, forward_top_log_prob = sorted(zip(digits, forward_scores), key=lambda x: -x[1])[0]
    viterbi_top_digit, viterbi_top_log_prob = sorted(zip(digits, viterbi_scores), key=lambda x: -x[1])[0]

    forward_confusion[y, forward_top_digit] += 1.
    viterbi_confusion[y, viterbi_top_digit] += 1.

forward_accuracy = np.diag(forward_confusion) / forward_confusion.sum(axis=1)
viterbi_accuracy = np.diag(viterbi_confusion) / viterbi_confusion.sum(axis=1)

print("forward accuracy (%.4f)" % forward_accuracy.mean(), forward_accuracy)
print("viterbi accuracy (%.4f)" % viterbi_accuracy.mean(), viterbi_accuracy)

Training HMM for digit 0
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 1
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Starting iteration 8...
Starting iteration 9...
Starting iteration 10...
Starting iteration 11...
Starting iteration 12...
Starting iteration 13...
Starting iteration 14...
Training HMM for digit 2
Starting iteration 0...
Starting iteration 1...
Starting iteration 2...
Starting iteration 3...
Starting iteration 4...
Starting iteration 5...
Starting iteration 6...
Starting iteration 7...
Sta

## Force-algnment using HMM-GMM models

In [3]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import numpy as np
from operator import itemgetter
import os
import pickle as pkl
import time

import torch
import torch.nn as nn
import torch.nn.functional as F

np.seterr(divide='ignore') # masks log(0) errors

{'divide': 'warn', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}

In [5]:
from hybrid.hmm.multiple import FullGaussianHMM

Performance (WER) on test set:   

Baseline performance of the GMM-HMM model   
24.55%

## Training a multiple digit GMM-HMM model
NOTE: You are not expected to run/tune this part as the trained FullGaussianHMM model file is provided. The provided model is designed to have 15 states for each digit and 3 additional states for start, pause, and end. Feel free to look through hybrid/hmm/multiple.py to see how we can string single-digit HMMs to obtain the one that can model multiple-digit sequences.

In [25]:
# """
# Multiple Digit HMM: training two-digit sequences
# """
# data_multiple_digit = np.load("hybrid/data/mfccs/mfccs_multiple.npz", allow_pickle=True)
# full_model = FullGaussianHMM(data_multiple_digit["Xtrain"], "hybrid/hmm/models/single_digit_model.pkl")

# n_iter = 15

# print("Training HMM")
# for i in range(n_iter):
#     print("starting iteration {}...".format(i + 1))
#     full_model.train(data_multiple_digit["Xtrain"], data_multiple_digit["Ytrain"])

# print("Testing HMM")
# test_wer = full_model.test(data_multiple_digit["Xtest"], data_multiple_digit["Ytest"])
# print("{:.2f}% WER".format(test_wer * 100.))

# with open("hybrid/hmm/models/multiple_digit_model.pkl", "wb") as f:
#     pkl.dump(full_model, f)

## Saving the optimal state sequences
Save the optimal state label per framee using the trained GMM-HMM model. Complete the # TODO in force_align function

In [23]:
def force_align(X, Y, hmm_gmm_model):
    """
    Force align using Viterbi to get the hidden state sequence for each (X, Y) pair.
    ------
    input:
    X: list of 2d-arrays of shape (Tx, 13): list of single digit MFCC features
    Y: digit sequence
    hmm_gmm_model: load the trained model
    ------
    Returns a list of utterence-wise hidden state sequences
    """
    digit_states_total, start_states, end_states = hmm_gmm_model.digit_states_total, hmm_gmm_model.start_states, hmm_gmm_model.stop_states
    begin_sil_id, pause_id, end_sil_id = hmm_gmm_model.begin_sil, hmm_gmm_model.pause, hmm_gmm_model.end_sil
    A_estimate, pi_estimate = hmm_gmm_model.A, hmm_gmm_model.pi
    state_seqs = []
    for ii, (x, y) in enumerate(zip(X, Y)):

        y = np.array([0 if yy == 'o' else int(yy) for yy in y], dtype=np.int32)
        
        # A_estimate shape is 153 x 153 because 10 digits with 15 states each, and a start, end, pause state.
        A_estimate_edited = np.zeros_like(A_estimate)
        d1, d2 = y
        
        # start can stay at itself or move to d1
        A_estimate_edited[begin_sil_id, begin_sil_id] = A_estimate[begin_sil_id, begin_sil_id]
        A_estimate_edited[begin_sil_id, start_states[d1]] = A_estimate[begin_sil_id, start_states[d1]]
        
        # allow for only sequential transitions within state
        for i in range(start_states[d1], end_states[d1]):
            A_estimate_edited[i, i] = A_estimate[i, i]
            A_estimate_edited[i, i + 1] = A_estimate[i, i + 1]
        
        # d1 last state can go to itself move to pause
        A_estimate_edited[end_states[d1], end_states[d1]] = A_estimate[end_states[d1], end_states[d1]]
        A_estimate_edited[end_states[d1], pause_id] = A_estimate[end_states[d1], pause_id]
        
        # pause can hold or move to d2
        A_estimate_edited[pause_id, pause_id] = A_estimate[pause_id, pause_id]
        A_estimate_edited[pause_id, start_states[d2]] = A_estimate[pause_id, start_states[d2]]
        
        # allow for only sequential transitions within state
        for i in range(start_states[d2], end_states[d2]):
            A_estimate_edited[i, i] = A_estimate[i, i]
            A_estimate_edited[i, i + 1] = A_estimate[i, i + 1]
        
        # d2 last state can stay on itself or move to end
        A_estimate_edited[end_states[d2], end_states[d2]] = A_estimate[end_states[d2], end_states[d2]]
        A_estimate_edited[end_states[d2], end_sil_id] = A_estimate[end_states[d2], end_sil_id]
        
        # avg out rows but keep zeros if sum is 0
        A_estimate_edited = np.array([row / row.sum() if row.sum() != 0 else np.zeros_like(row) for row in A_estimate_edited])

        log_pi = np.log(pi_estimate)
        log_A = np.log(A_estimate_edited)
        log_B = hmm_gmm_model.get_emissions(x)

        q, log_prob = hmm_gmm_model.viterbi(log_pi, log_A, log_B)
        state_seqs.append(q)

    return state_seqs

data_multiple_digit = np.load("hybrid/data/mfccs/mfccs_multiple.npz", allow_pickle=True)
with open("hybrid/hmm/models/multiple_digit_model.pkl", "rb") as f:
    hmm_gmm_model = pkl.load(f)
state_seq_train = force_align(data_multiple_digit["Xtrain"], data_multiple_digit["Ytrain"], hmm_gmm_model)
state_seq_dev = force_align(data_multiple_digit["Xdev"], data_multiple_digit["Ydev"], hmm_gmm_model)
state_seq_test = force_align(data_multiple_digit["Xtest"], data_multiple_digit["Ytest"], hmm_gmm_model)
seqDict = {'Ytrain': state_seq_train, 'Ydev': state_seq_dev, 'Ytest': state_seq_test, 'total_states': hmm_gmm_model.total}

with open('hybrid/data/state_seq/state_seq.pkl', 'wb') as f:
    pkl.dump(seqDict, f)

Use the HMM-GMM model to get the alignment for `65a.wav`. The labels are 6 and 5.

In [43]:
from hybrid.hmm.make_mfccs import extract_mfcc_features
from IPython.display import Audio
import soundfile as sf

# this is the number of waveform samples correspond to one mfcc feature frame
mfcc_strides = 64

wav_file = "hybrid/data/wav/65a.wav"
wav, sr = sf.read(wav_file)
mfccs = extract_mfcc_features(wav, sr)

state_seq = force_align([mfccs], [["6", "5"]], hmm_gmm_model)

digit_states_total, start_states, end_states = hmm_gmm_model.digit_states_total, hmm_gmm_model.start_states, hmm_gmm_model.stop_states
begin_sil_id, pause_id, end_sil_id = hmm_gmm_model.begin_sil, hmm_gmm_model.pause, hmm_gmm_model.end_sil

# print(len(wav))
# print(len(state_seq[0]) * mfcc_strides) these are same!!!

# print(state_seq)

# get state_seq indices for start and end of variables
last_begin = last_pause = end_5 = end_6 = -1
for i, v in enumerate(state_seq[0]):
    if v == begin_sil_id:
        last_begin = i
    if v == end_states[6]:
        end_6 = i
    if v == pause_id:
        last_pause = i
    if v == end_states[5]:
        end_5 = i
start_6, start_5 = last_begin + 1, last_pause + 1

# get relevant .wav slices for each digit utterance
first_digit = wav[start_6*mfcc_strides : (end_6+1)*mfcc_strides]
second_digit = wav[start_5*mfcc_strides : (end_5+1)*mfcc_strides]

play the first and second digits

In [44]:
Audio(data=first_digit, rate=sr)

In [41]:
Audio(data=second_digit, rate=sr)