In [None]:
!pip install kagglehub hmmlearn soundfile librosa numpy matplotlib seaborn



In [None]:
import os
import numpy as np
import librosa
import soundfile as sf
import kagglehub
import matplotlib.pyplot as plt
import seaborn as sns
from hmmlearn import hmm
from scipy.special import logsumexp

In [None]:
dataset_path = kagglehub.dataset_download("mfekadu/darpa-timit-acousticphonetic-continuous-speech")
print(f"Dataset at: {dataset_path}")

Using Colab cache for faster access to the 'darpa-timit-acousticphonetic-continuous-speech' dataset.
Dataset at: /kaggle/input/darpa-timit-acousticphonetic-continuous-speech


In [None]:
def load_and_process_audio(root_dir):
    target_file = None

    for root, _, files in os.walk(root_dir):
        for f in files:
            if f.lower().endswith('.wav'):
                target_file = os.path.join(root, f)
                break
        if target_file: break

    if not target_file:
        raise FileNotFoundError("Could not locate any audio files.")

    print(f"Selected Audio: {target_file}")

    y, sr = librosa.load(target_file, sr=16000)

    raw_mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20, hop_length=160)
    X = raw_mfcc.T

    mean_vec = np.mean(X, axis=0)
    std_vec = np.std(X, axis=0)
    X_norm = (X - mean_vec) / (std_vec + 1e-8)

    return X_norm

mfcc_features = load_and_process_audio(dataset_path)
print(f"Feature Matrix Shape: {mfcc_features.shape}")

Selected Audio: /kaggle/input/darpa-timit-acousticphonetic-continuous-speech/data/TEST/DR4/FLBW0/SA1.WAV.wav
Feature Matrix Shape: (357, 20)


In [None]:
num_states = 8

reco_model = hmm.GaussianHMM(n_components=num_states, covariance_type='diag', n_iter=150, random_state=99)
reco_model.fit(mfcc_features)

print(f"\nModel Converged: {reco_model.monitor_.converged}")

print("\n1. Initial State Probabilities (pi) [First 5]:")
print(np.exp(reco_model.startprob_[:5]))

print("\n2. Transition Matrix (A) [Shape]:", reco_model.transmat_.shape)
print("Sample of Transition probabilities (State 0 to others):")
print(reco_model.transmat_[0, :].round(3))

print("\n3. Emission Means (Gaussian Means) [Shape]:", reco_model.means_.shape)
print("Mean vector for State 0 (first 5 dimensions):")
print(reco_model.means_[0][:5].round(3))

In [None]:
def compute_log_emission(X, means, covars):
    T, D = X.shape
    N = means.shape[0]
    log_B = np.zeros((T, N))

    const_term = -0.5 * D * np.log(2 * np.pi)

    for i in range(N):
        mu = means[i]
        sigma2 = covars[i]

        if sigma2.ndim == 2:
            sigma2 = np.diag(sigma2)
        log_det_sigma = -0.5 * np.sum(np.log(sigma2 + 1e-10))
        diff = X - mu
        mahalanobis = -0.5 * np.sum((diff ** 2) / (sigma2 + 1e-10), axis=1)

        log_B[:, i] = const_term + log_det_sigma + mahalanobis

    return log_B

def run_forward_pass(obs_seq, model):
    start_log = np.log(model.startprob_ + 1e-50)
    trans_log = np.log(model.transmat_ + 1e-50)

    log_B = compute_log_emission(obs_seq, model.means_, model.covars_)

    T, N = log_B.shape
    fwd_lattice = np.zeros((T, N))

    fwd_lattice[0] = start_log + log_B[0]

    for t in range(1, T):
        for s_curr in range(N):
            prev_alpha_trans = fwd_lattice[t-1] + trans_log[:, s_curr]
            fwd_lattice[t, s_curr] = logsumexp(prev_alpha_trans) + log_B[t, s_curr]

    return logsumexp(fwd_lattice[-1])

def run_viterbi_decode(obs_seq, model):
    start_log = np.log(model.startprob_ + 1e-50)
    trans_log = np.log(model.transmat_ + 1e-50)
    log_B = compute_log_emission(obs_seq, model.means_, model.covars_)

    T, N = log_B.shape
    viterbi_lattice = np.zeros((T, N))
    back_pointers = np.zeros((T, N), dtype=int)

    viterbi_lattice[0] = start_log + log_B[0]

    for t in range(1, T):
        for s_curr in range(N):
            candidates = viterbi_lattice[t-1] + trans_log[:, s_curr]

            best_prev_state = np.argmax(candidates)
            back_pointers[t, s_curr] = best_prev_state
            viterbi_lattice[t, s_curr] = candidates[best_prev_state] + log_B[t, s_curr]

    best_path = [np.argmax(viterbi_lattice[-1])]
    for t in range(T-1, 0, -1):
        prev_s = back_pointers[t, best_path[-1]]
        best_path.append(prev_s)

    return np.array(best_path[::-1])

In [None]:
my_log_prob = run_forward_pass(mfcc_features, reco_model)
lib_log_prob = reco_model.score(mfcc_features)

print("Forward Algorithm Validation")
print(f"My Implementation Score: {my_log_prob:.4f}")
print(f"Hmmlearn Library Score:  {lib_log_prob:.4f}")

my_path = run_viterbi_decode(mfcc_features, reco_model)
lib_path = reco_model.predict(mfcc_features)

print("\n Viterbi Algorithm Validation")
print(f"Identical Path Lengths? {len(my_path) == len(lib_path)}")
print(f"Path Arrays Identical?  {np.array_equal(my_path, lib_path)}")

print(f"\nDecoded State Sequence (First 30 frames):\n{my_path[:30]}")

Forward Algorithm Validation
My Implementation Score: -6619.7310
Hmmlearn Library Score:  -6619.7310

 Viterbi Algorithm Validation
Identical Path Lengths? True
Path Arrays Identical?  True

Decoded State Sequence (First 30 frames):
[6 6 6 6 6 6 6 6 6 0 0 0 0 0 0 0 0 0 0 0 0 0 2 2 2 2 2 2 2 4]
