In [36]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import json
import numpy as np
from operator import itemgetter
import os
import pickle as pkl
import time

import torch
import torch.nn as nn
import torch.nn.functional as F

np.seterr(divide='ignore') # masks log(0) errors

{'divide': 'ignore', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}

In [3]:
from hybrid.hmm.multiple import FullGaussianHMM

The default DNN set-up should take ~40 seconds/epoch on a GPU (and ~350 secconds/epoch on a CPU).

Performance (WER) on test set:   

Baseline performance of the GMM-HMM model   
24.55%

Performance of the DNN-HMM model with normalized emission probabilities   
20.45%

Performance of the DNN-HMM model with unnormalized emission probabilities   
18.18%

## Training a multiple digit GMM-HMM model
NOTE: You are not expected to run/tune this part as the trained FullGaussianHMM model file is provided. The provided model is designed to have 15 states for each digit and 3 additional states for start, pause, and end. Feel free to look through hybrid/hmm/multiple.py to see how we can string single-digit HMMs to obtain the one that can model multiple-digit sequences.

In [4]:
# """
# Multiple Digit HMM: training two-digit sequences
# """
# data_multiple_digit = np.load("hybrid/data/mfccs/mfccs_multiple.npz", allow_pickle=True)
# full_model = FullGaussianHMM(data_multiple_digit["Xtrain"], "hybrid/hmm/models/single_digit_model.pkl")

# n_iter = 15

# print("Training HMM")
# for i in range(n_iter):
#     print("starting iteration {}...".format(i + 1))
#     full_model.train(data_multiple_digit["Xtrain"], data_multiple_digit["Ytrain"])
        
# print("Testing HMM")
# test_wer = full_model.test(data_multiple_digit["Xtest"], data_multiple_digit["Ytest"])
# print("{:.2f}% WER".format(test_wer * 100.))

# with open("hybrid/hmm/models/multiple_digit_model.pkl", "wb") as f:
#     pkl.dump(full_model, f)

## Saving the optimal state sequences
Save the optimal state label per framee using the trained GMM-HMM model. Complete the # TODO in force_align function

In [15]:
def force_align(X, Y, hmm_gmm_model):
    """
    Force align using Viterbi to get the hidden state sequence for each (X, Y) pair.
    ------
    input:
    X: list of 2d-arrays of shape (Tx, 13): list of single digit MFCC features
    Y: digit sequence
    hmm_gmm_model: load the trained model
    ------
    Returns a list of utterence-wise hidden state sequences
    """
    digit_states_total, start_states = hmm_gmm_model.digit_states_total, hmm_gmm_model.start_states
    ## 150 states, 15 for each 
    begin_sil_id, pause_id, end_sil_id = hmm_gmm_model.begin_sil, hmm_gmm_model.pause, hmm_gmm_model.end_sil
    ## ids are 150,152,151 for each of them
    A_estimate, pi_estimate = hmm_gmm_model.A, hmm_gmm_model.pi
    state_seqs = []
    for ii, (x, y) in enumerate(zip(X, Y)):

        y = np.array([0 if yy == 'o' else int(yy) for yy in y], dtype=np.int32)
        # TODO: edit A_estimate appropriately to enable decoding for the ground-truth labelss
        
        A_estimate[begin_sil_id, :digit_states_total] = 0.
        A_estimate[pause_id, :digit_states_total] = 0.

        A_estimate[begin_sil_id, start_states[y[0]]] = 1. - A_estimate[begin_sil_id, begin_sil_id]
        A_estimate[pause_id, start_states[y[1]]] = 1. - A_estimate[pause_id, pause_id]
   
        log_pi = np.log(pi_estimate)
        log_A = np.log(A_estimate)
        log_B = hmm_gmm_model.get_emissions(x)

        q, log_prob = hmm_gmm_model.viterbi(log_pi, log_A, log_B) 
        state_seqs.append(q)

    return state_seqs

data_multiple_digit = np.load("hybrid/data/mfccs/mfccs_multiple.npz", allow_pickle=True)
with open("hybrid/hmm/models/multiple_digit_model.pkl", "rb") as f:
    hmm_gmm_model = pkl.load(f)
    
state_seq_train = force_align(data_multiple_digit["Xtrain"], data_multiple_digit["Ytrain"], hmm_gmm_model)
state_seq_dev = force_align(data_multiple_digit["Xdev"], data_multiple_digit["Ydev"], hmm_gmm_model)
state_seq_test = force_align(data_multiple_digit["Xtest"], data_multiple_digit["Ytest"], hmm_gmm_model)
seqDict = {'Ytrain': state_seq_train, 'Ydev': state_seq_dev, 'Ytest': state_seq_test, 'total_states': hmm_gmm_model.total}
np.savez_compressed('hybrid/data/state_seq/state_seq.npz', **seqDict)

## Training a DNN frame classifier

In [16]:
from hybrid.dnn.loader import DataLoader

In [17]:
with open("hybrid/dnn/config.json", "r") as fid:                                                                                                                                                                                                                                      
    config = json.load(fid)

np.random.seed(config["seed"])
torch.manual_seed(config["seed"])

data_cfg = config["data"]
model_cfg = config["model"]
opt_cfg = config["optimizer"]
out_cfg = config["output"]

data_mfccs = np.load(data_cfg["mfcc"], allow_pickle=True)
state_seq = np.load(data_cfg["state_seq"], allow_pickle=True)

print("Preparing data...\n")
data_ldr = DataLoader(data_cfg)
train_features, train_labels, train_labels_onehot, train_utt_to_frames = data_ldr.prepare_data('train')
dev_features, dev_labels, dev_labels_onehot, dev_utt_to_frames = data_ldr.prepare_data('dev')
test_features, test_labels, test_labels_onehot, test_utt_to_frames = data_ldr.prepare_data('test')

feat_dim = (data_ldr.context_size+1)*data_ldr.mfcc_dim
n_states = data_ldr.n_states

Preparing data...



In [None]:
class FeedForward(nn.Module):

    def __init__(self, feat_dim, n_states, hidden_dim, n_layers, dropout):
        """
        Initialized feed forward neural network model.
        ---
        feat_dim: input feature dimension
        n_states: size of the output
        hidden_dim: dimension of the hidden layers
        n_layers: number of layers
        dropout: dropout probabilty for the dropout layer
        """
        super().__init__()

        self.n_layers = n_layers
        self.fc_input = nn.Linear(feat_dim, hidden_dim)
        self.fc_output = nn.Linear(hidden_dim, n_states)
        self.fc_hidden_list = nn.ModuleList([nn.Linear(hidden_dim, hidden_dim)]*n_layers)
        self.nl = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """
        Forward pass for the feedforward network
        """
        x = self.nl(self.fc_input(x))
        for i in range(self.n_layers):
            x = self.nl(self.fc_hidden_list[i](x))
        output = F.leaky_relu(self.fc_output(x))

        return output

def init_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.kaiming_uniform_(m.weight, nonlinearity='relu')

In [None]:
def train(epoch):
    """
    Training the classifier on frame level labels
    """
    classifier.train()
    perm = np.random.permutation(train_features.shape[0])
    train_loss, pred_multi, gt_multi = [], [], []
    n_iter = 0
    start = time.time()
    time_per_iter = [0]*4
    for i in range(0, len(perm), batch_size):
        idx = perm[i:i+batch_size]
        train_Xs = torch.tensor(train_features[idx], dtype=torch.float32).to(device)
        train_Ys = torch.tensor(train_labels[idx], dtype=torch.long).to(device)
        pred_Ys = classifier(train_Xs)
        loss = loss_function(pred_Ys, train_Ys)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(classifier.parameters(), 5.0)
        optimizer.step()
        train_loss.append(loss.cpu().item())
        pred_multi.append(np.argmax(pred_Ys.cpu().data.numpy(), axis=1))
        gt_multi.append(train_Ys.cpu().data.numpy())
    pred_multi, gt_multi = np.concatenate(pred_multi, axis=0), np.concatenate(gt_multi, axis=0)
    accuracy = 100*len(np.where((pred_multi - gt_multi)==0)[0])/len(pred_multi)
    print("Epoch: %d, Training loss: %.2f, Accuracy: %.2f, Time elapsed: %.2f seconds" % (epoch, np.mean(train_loss), accuracy, time.time() - start))

    return accuracy

def test(features, labels, classifier_test=None):
    """
    Training the classifier on frame level labels
    """
    if classifier_test is None:
        classifier_test = torch.load(save_model_fn)
    classifier_test.eval()
    test_loss, pred_multi, gt_multi = [], [], []
    n_iter = 0
    start = time.time()
    for i in range(0, len(features), test_batch_size):
        n_iter += 1
        idx = list(range(i, min(i+test_batch_size, len(features))))
        test_Xs = torch.tensor(features[idx], dtype=torch.float32).to(device)
        test_Ys = torch.tensor(labels[idx], dtype=torch.long).to(device)
        pred_Ys = classifier_test(test_Xs)
        loss = loss_function(pred_Ys, test_Ys)
        test_loss.append(loss.cpu().item())
        pred_multi.append(np.argmax(pred_Ys.cpu().data.numpy(), axis=1))
        gt_multi.append(test_Ys.cpu().data.numpy())

    pred_multi, gt_multi = np.concatenate(pred_multi, axis=0), np.concatenate(gt_multi, axis=0)
    accuracy = 100*len(np.where((pred_multi - gt_multi)==0)[0])/len(pred_multi)

    print("Dev Accuracy: %.2f, Time elapsed: %.2f seconds" % (accuracy, time.time() - start))

    return accuracy

def main_train():
    print("Training begins ...\n")
    best_accuracy = 0
    for epoch in range(tot_epoch):
        train_accuracy = train(epoch)
        dev_accuracy = test(dev_features, dev_labels, classifier)
        if dev_accuracy > best_accuracy:
            best_epoch = epoch
            torch.save(classifier, save_model_fn)
            best_accuracy = dev_accuracy
    print('\nBest dev accuracy: %.2f at epoch: %d' % (best_accuracy, best_epoch))

def main_test():
    accuracy = test(test_features, test_labels)
    print('\nAccuracy on test set: %.2sf' % (accuracy))

In [18]:
tot_epoch = opt_cfg["max_epochs"]
hidden_dim = model_cfg["hidden_dim"]
n_layers = model_cfg["n_layers"]
dropout = model_cfg["dropout_probability"]

batch_size = opt_cfg["batch_size"]
test_batch_size = opt_cfg["test_batch_size"]

save_model_fn = os.path.join(out_cfg["save_dir"], "dnn_model.pkl")

device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device == 'cuda':
    torch.backends.cudnn.deterministic = True

classifier = FeedForward(feat_dim, n_states, hidden_dim, n_layers, dropout).to(device)
# classifier.apply(init_weights)
loss_function = nn.CrossEntropyLoss()
optimizer = getattr(torch.optim, opt_cfg["type"])(list(classifier.parameters()))

main_train()

Training begins ...

Epoch: 0, Training loss: 1.97, Accuracy: 44.92, Time elapsed: 743.94 seconds
Dev Accuracy: 48.40, Time elapsed: 64.28 seconds
Epoch: 1, Training loss: 1.38, Accuracy: 58.82, Time elapsed: 868.08 seconds
Dev Accuracy: 51.56, Time elapsed: 44.81 seconds
Epoch: 2, Training loss: 1.22, Accuracy: 62.86, Time elapsed: 1299.72 seconds
Dev Accuracy: 52.96, Time elapsed: 66.48 seconds
Epoch: 3, Training loss: 1.13, Accuracy: 65.78, Time elapsed: 411.45 seconds
Dev Accuracy: 53.61, Time elapsed: 7.49 seconds
Epoch: 4, Training loss: 1.05, Accuracy: 67.86, Time elapsed: 330.34 seconds
Dev Accuracy: 53.15, Time elapsed: 7.12 seconds
Epoch: 5, Training loss: 0.99, Accuracy: 69.63, Time elapsed: 334.77 seconds
Dev Accuracy: 53.46, Time elapsed: 7.02 seconds
Epoch: 6, Training loss: 0.95, Accuracy: 71.02, Time elapsed: 300.05 seconds
Dev Accuracy: 52.31, Time elapsed: 7.76 seconds
Epoch: 7, Training loss: 0.91, Accuracy: 72.26, Time elapsed: 288.21 seconds
Dev Accuracy: 52.51, Ti

512 1


In [52]:
# TODO: tune on the dev set
# may want to set up function or chunk of code here to perform tuning
# call train on training set, call test on dev, save/plot/compare results


def train(epoch):
    """
    Training the classifier on frame level labels
    """
    classifier.train()
    perm = np.random.permutation(train_features.shape[0])
    train_loss, pred_multi, gt_multi = [], [], []
    n_iter = 0
    start = time.time()
    time_per_iter = [0]*4
    for i in range(0, len(perm), batch_size):
        idx = perm[i:i+batch_size]
        train_Xs = torch.tensor(train_features[idx], dtype=torch.float32).to(device)
        train_Ys = torch.tensor(train_labels[idx], dtype=torch.long).to(device)
        pred_Ys = classifier(train_Xs)
        loss = loss_function(pred_Ys, train_Ys)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(classifier.parameters(), 5.0)
        optimizer.step()
        train_loss.append(loss.cpu().item())
        pred_multi.append(np.argmax(pred_Ys.cpu().data.numpy(), axis=1))
        gt_multi.append(train_Ys.cpu().data.numpy())
    pred_multi, gt_multi = np.concatenate(pred_multi, axis=0), np.concatenate(gt_multi, axis=0)
    accuracy = 100*len(np.where((pred_multi - gt_multi)==0)[0])/len(pred_multi)
    print("Epoch: %d, Training loss: %.2f, Accuracy: %.2f, Time elapsed: %.2f seconds" % (epoch, np.mean(train_loss), accuracy, time.time() - start))

    return accuracy,train_loss

def test(features, labels, classifier_test=None):
    """
    Training the classifier on frame level labels
    """
    if classifier_test is None:
        classifier_test = torch.load(save_model_fn)
    classifier_test.eval()
    test_loss, pred_multi, gt_multi = [], [], []
    n_iter = 0
    start = time.time()
    for i in range(0, len(features), test_batch_size):
        n_iter += 1
        idx = list(range(i, min(i+test_batch_size, len(features))))
        test_Xs = torch.tensor(features[idx], dtype=torch.float32).to(device)
        test_Ys = torch.tensor(labels[idx], dtype=torch.long).to(device)
        pred_Ys = classifier_test(test_Xs)
        loss = loss_function(pred_Ys, test_Ys)
        test_loss.append(loss.cpu().item())
        pred_multi.append(np.argmax(pred_Ys.cpu().data.numpy(), axis=1))
        gt_multi.append(test_Ys.cpu().data.numpy())

    pred_multi, gt_multi = np.concatenate(pred_multi, axis=0), np.concatenate(gt_multi, axis=0)
    accuracy = 100*len(np.where((pred_multi - gt_multi)==0)[0])/len(pred_multi)

    print("Dev Accuracy: %.2f, Time elapsed: %.2f seconds" % (accuracy, time.time() - start))

    return accuracy,test_loss

def main_train():
    print("Training begins ...\n")
    best_accuracy = 0
    train_acc_list = []
    dev_acc_list = []
    train_loss_list = []
    dev_loss_list = []
    for epoch in range(tot_epoch):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if device.type == 'cuda':
            print(torch.cuda.get_device_name(0))
            print('Memory Usage:')
            print('Allocated:', round(torch.cuda.memory_allocated(0)/1024**3,1), 'GB')
            print('Cached:   ', round(torch.cuda.memory_cached(0)/1024**3,1), 'GB')
        train_accuracy,train_loss = train(epoch)
        dev_accuracy,dev_loss = test(dev_features, dev_labels, classifier)
        train_acc_list.append(train_accuracy)
        dev_acc_list.append(dev_accuracy)
        train_loss_list.append(np.sum(train_loss))
        dev_loss_list.append(np.sum(dev_loss))
        if dev_accuracy > best_accuracy:
            best_epoch = epoch
            #torch.save(classifier, save_model_fn)
            best_accuracy = dev_accuracy
    print('\nBest dev accuracy: %.2f at epoch: %d' % (best_accuracy, best_epoch))
    return train_acc_list,dev_acc_list,train_loss_list,dev_loss_list,best_accuracy

def main_test():
    accuracy = test(test_features, test_labels)
    print('\nAccuracy on test set: %.2sf' % (accuracy))





def tuning(hidden_dim_list,n_layers_list):
    tot_epoch = 5
    dropout = model_cfg["dropout_probability"]
    batch_size = opt_cfg["batch_size"]
    test_batch_size = opt_cfg["test_batch_size"]
    total_acc = 0
    best_hidden =0
    best_layer = 0
    
    for hidden_dim in hidden_dim_list:
        for n_layers in n_layers_list:
            save_model_fn = os.path.join(out_cfg["save_dir"], "dnn_model.pkl")
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
            if device == 'cuda':
                torch.backends.cudnn.deterministic = True
            torch.cuda.empty_cache()
            with torch.no_grad():
                classifier = FeedForward(feat_dim, n_states, hidden_dim, n_layers, dropout).to(device)
            # classifier.apply(init_weights)
            loss_function = nn.CrossEntropyLoss()
            optimizer = getattr(torch.optim, opt_cfg["type"])(list(classifier.parameters()))
            train_acc_list,dev_acc_list,train_loss_list,dev_loss_list,best_accuracy = main_train()
            if best_accuracy > total_acc:
                torch.save(classifier, save_model_fn)
                total_acc = best_accuracy
                best_hidden = hidden_dim
                best_layer = n_layers
    return best_hidden,best_layer


# (hidden_dim = 512 ,model_cfg["n_layers"] = 1)

hidden_dim_list = [128,256]
n_layers_list = [1,2]
best_hidden,best_layer = tuning(hidden_dim_list,n_layers_list)

Training begins ...

GeForce GTX 960
Memory Usage:
Allocated: 0.0 GB
Cached:    0.0 GB
Epoch: 0, Training loss: 0.75, Accuracy: 77.59, Time elapsed: 195.10 seconds
Dev Accuracy: 52.13, Time elapsed: 3.90 seconds
GeForce GTX 960
Memory Usage:
Allocated: 0.0 GB
Cached:    0.0 GB


KeyboardInterrupt: 

In [49]:
opt_cfg["batch_size"]

8

Save log-emission probabilities using the best model saved

In [None]:
def get_log_emission(utt_to_frames_dict, features, prior, temp_parameter, best_model_path):
    """
    Save posteriors using the trained model
    """
    classifier_eval = torch.load(best_model_path)
    classifier_eval.eval()
    log_emission = []
    n_iter = 0
    for utt_idx in range(len(utt_to_frames_dict)):
        frame_id = utt_to_frames_dict[utt_idx]
        log_emission_utt = []
        for i in range(0, len(frame_id), batch_size):
            idx = frame_id[i:i+batch_size]
            Xs = torch.tensor(itemgetter(*idx)(features), dtype=torch.float32).to(device)
            log_pred_Ys = F.log_softmax(classifier_eval(Xs), dim=1).cpu().data.numpy()
            log_emission_utt.append(log_pred_Ys  - temp_parameter*np.log(prior))
        log_emission_utt = np.concatenate(log_emission_utt, axis=0)
        log_emission.append(log_emission_utt)

    return log_emission

In [None]:
temp_parameter = out_cfg["temp_parameter"]
print("Saving log emissions for temperature %.1f ...\n" % (temp_parameter))
prior = data_ldr.get_prior()
train_log_emission =  get_log_emission(train_utt_to_frames, train_features, prior, temp_parameter, save_model_fn)
dev_log_emission = get_log_emission(dev_utt_to_frames, dev_features, prior, temp_parameter, save_model_fn)
test_log_emission = get_log_emission(test_utt_to_frames, test_features, prior, temp_parameter, save_model_fn)
log_emission_dict = {'Ytrain': train_log_emission, 'Ydev': dev_log_emission, 'Ytest': test_log_emission}
np.savez_compressed(os.path.join('hybrid/data/log_emission/log_emission_'+str(temp_parameter)+'.npz'), **log_emission_dict)


## HMM inference using the posterior from neural network

In [None]:
data_multiple_digit = np.load("hybrid/data/mfccs/mfccs_multiple.npz", allow_pickle=True)

with open("hybrid/hmm/models/multiple_digit_model.pkl", "rb") as f:
    full_model_trained = pkl.load(f)

log_emission_1 = np.load('hybrid/data/log_emission/log_emission_1.0.npz', allow_pickle=True)
log_emission_0 = np.load('hybrid/data/log_emission/log_emission_0.0.npz', allow_pickle=True)
    
def get_test_wer(model, posterior=None):
    test_wer = model.test(data_multiple_digit["Xtest"], data_multiple_digit["Ytest"], posterior)
    print("{:.2f}% TEST WER".format(test_wer * 100.))

print('Baseline performance of the trained model')
get_test_wer(full_model_trained)

print('Performance of the trained model with normalized emission probabilities')
get_test_wer(full_model_trained, log_emission_1["Ytest"])

print('Performance of the trained model with unnormalized emission probabilities')
get_test_wer(full_model_trained, log_emission_0["Ytest"])