In [None]:

# Hidden Markov Model lab (single cell, Colab-ready)
# Implements: parameter representation, display, sampling (generate sequence),
# and Viterbi inference for the phonemes of the word "speech".

import numpy as np
import pandas as pd

np.random.seed(42)  # for reproducible runs; change/remove if you want randomness

# 1) Define states (phonemes) and observations
states = ['/s/', '/p/', '/ie:/', '/tS/']          # hidden states (phonemes)
observations = ['Energy', 'Pitch', 'Duration']   # discrete observation symbols

n_states = len(states)
n_obs = len(observations)

# 2) HMM parameters
# (a) Initial probabilities: start in /s/ with probability 1
pi = np.array([1.0, 0.0, 0.0, 0.0])   # order matches `states`

# (b) Transition probability matrix (rows = from-state, cols = to-state)
# We will define a plausible transition matrix for the word "speech"
# A sample table consistent with the idea of the phoneme order in "speech".
A = np.array([
    # to:   /s/   /p/   /ie:/  /tS/
    [0.0,  0.9,  0.1,   0.0],   # from /s/
    [0.0,  0.0,  0.95,  0.05],  # from /p/
    [0.0,  0.0,  0.2,   0.8],   # from /ie:/
    [0.0,  0.0,  0.0,   1.0]    # from /tS/ (terminal loop to itself if needed)
], dtype=float)

# (c) Emission probability matrix: P(observation | state)
# rows = states, columns = observations
B = np.array([
    # Energy Pitch Duration
    [0.7,   0.2,  0.1],   # /s/
    [0.5,   0.3,  0.2],   # /p/
    [0.3,   0.5,  0.2],   # /ie:/
    [0.4,   0.4,  0.2]    # /tS/
], dtype=float)

# Utility dictionaries for indices
state_idx = {s: i for i, s in enumerate(states)}
obs_idx = {o: i for i, o in enumerate(observations)}

# 3) Display function for matrices and initial probabilities
def display_hmm(pi, A, B, states, observations):
    print("Initial probabilities (pi):")
    display_df = pd.DataFrame(pi.reshape(1, -1), columns=states)
    print(display_df.to_string(index=False))
    print("\nTransition matrix A (rows = from-state, cols = to-state):")
    dfA = pd.DataFrame(A, index=states, columns=states)
    print(dfA.to_string())
    print("\nEmission matrix B (rows = state, cols = observation):")
    dfB = pd.DataFrame(B, index=states, columns=observations)
    print(dfB.to_string())

# 4) Sampling: generate a single sequence of states and observations
def sample_hmm(pi, A, B, states, observations, max_len=4):
    """
    Generate a sequence of hidden states (phonemes) and observations (symbols).
    For the word 'speech' we set max_len=4 (s, p, ie:, tS) by default.
    """
    curr_state = np.random.choice(states, p=pi)  # should be '/s/' as pi has all mass there
    state_sequence = [curr_state]
    obs_sequence = []
    # Emit observation for the starting state
    s_idx = state_idx[curr_state]
    obs_symbol = np.random.choice(observations, p=B[s_idx])
    obs_sequence.append(obs_symbol)

    for _ in range(1, max_len):
        # transition from current state
        from_idx = state_idx[curr_state]
        next_state = np.random.choice(states, p=A[from_idx])
        state_sequence.append(next_state)
        # emit observation from next_state
        obs_symbol = np.random.choice(observations, p=B[state_idx[next_state]])
        obs_sequence.append(obs_symbol)
        curr_state = next_state
    return state_sequence, obs_sequence

# 5) Viterbi algorithm for inference (most likely state path given observations)
def viterbi(pi, A, B, obs_seq, states, observations):
    T = len(obs_seq)
    N = len(states)
    # index observation sequence
    obs_indices = [obs_idx[o] for o in obs_seq]

    # delta[t, i] = highest log-prob of any path that ends in state i at time t
    # psi[t, i] = argmax previous state at time t-1 leading to i
    log_pi = np.log(pi + 1e-12)          # small epsilon avoid log(0)
    log_A = np.log(A + 1e-12)
    log_B = np.log(B + 1e-12)

    delta = np.full((T, N), -np.inf)
    psi = np.zeros((T, N), dtype=int)

    # initialization t=0
    delta[0] = log_pi + log_B[:, obs_indices[0]]
    psi[0] = 0

    # recursion
    for t in range(1, T):
        for j in range(N):
            # compute max over i: delta[t-1,i] + log_A[i,j]
            scores = delta[t-1] + log_A[:, j]
            psi[t, j] = np.argmax(scores)
            delta[t, j] = scores[psi[t, j]] + log_B[j, obs_indices[t]]

    # termination - backtrack
    path = np.zeros(T, dtype=int)
    path[T-1] = np.argmax(delta[T-1])
    for t in range(T-2, -1, -1):
        path[t] = psi[t+1, path[t+1]]

    decoded_states = [states[i] for i in path]
    max_log_prob = np.max(delta[T-1])
    return decoded_states, max_log_prob, delta, psi

# === Run everything and print outputs ===
print("=== Hidden Markov Model parameters ===")
display_hmm(pi, A, B, states, observations)

print("\n=== Sampling a single sequence (phonemes + observations) ===")
gen_states, gen_obs = sample_hmm(pi, A, B, states, observations, max_len=4)
print("Generated phoneme sequence:", gen_states)
print("Generated observation sequence:", gen_obs)

# Run Viterbi to infer states from the observations
decoded_states, log_prob, delta, psi = viterbi(pi, A, B, gen_obs, states, observations)
print("\n=== Viterbi inference ===")
print("Observations given to Viterbi     :", gen_obs)
print("Viterbi-decoded phoneme sequence :", decoded_states)
print(f"Log-probability of decoded path  : {log_prob:.4f}")

# Show comparison
correct = all(a == b for a, b in zip(gen_states, decoded_states))
print("\nDo the generated (true) states match the Viterbi-decoded states? ->", correct)

# (Optional) show the delta table as a pandas DataFrame for inspection
delta_df = pd.DataFrame(delta, columns=states)
print("\nDelta (log probs) at each time step (rows = time t):")
print(delta_df.to_string(float_format=lambda x: f"{x:.3f}"))

=== Hidden Markov Model parameters ===
Initial probabilities (pi):
 /s/  /p/  /ie:/  /tS/
 1.0  0.0    0.0   0.0

Transition matrix A (rows = from-state, cols = to-state):
       /s/  /p/  /ie:/  /tS/
/s/    0.0  0.9   0.10  0.00
/p/    0.0  0.0   0.95  0.05
/ie:/  0.0  0.0   0.20  0.80
/tS/   0.0  0.0   0.00  1.00

Emission matrix B (rows = state, cols = observation):
       Energy  Pitch  Duration
/s/       0.7    0.2       0.1
/p/       0.5    0.3       0.2
/ie:/     0.3    0.5       0.2
/tS/      0.4    0.4       0.2

=== Sampling a single sequence (phonemes + observations) ===
Generated phoneme sequence: [np.str_('/s/'), np.str_('/p/'), np.str_('/ie:/'), np.str_('/ie:/')]
Generated observation sequence: [np.str_('Duration'), np.str_('Pitch'), np.str_('Energy'), np.str_('Duration')]

=== Viterbi inference ===
Observations given to Viterbi     : [np.str_('Duration'), np.str_('Pitch'), np.str_('Energy'), np.str_('Duration')]
Viterbi-decoded phoneme sequence : ['/s/', '/p/', '/ie:/', 