In [1]:
import numpy as np
import pandas as pd
from hmmlearn import hmm
from scipy.stats import multivariate_normal
import pickle
import seaborn as sns

In [2]:
digits = [ d for d in range(10) ]
names = ["george", "jackson", "lucas", "nicolas", "theo", "yweweler"]

In [3]:
with open('preprocessed data.pkl', 'rb') as file:
    df = pickle.load(file)

## Hidden Markov Models
### Q1. what are states and observations in the given task?
states represent the hidden sequence of discrete values that model can be in at each time. Observations are the visible outputs generated given the hidden states. In this task observations are the MFCC frames and states are the way different speakers pronounces different numbers.
### Q2. First-order HMMs.
It's called first-order because future states are conditionally independent of the past states given the present state.
### Q3. In which contents HMMs are useful and why?
tasks that involve sequential data or time-series analysis, especially when the underlying process is not directly observable. because model is based on states and observations. For example speech recognition, NLP, handwriting recognition, predictions over time and music analysis.
### Q4.
**Pros:**
1. Efficient handling of sequential data.
2. Flexibility in modeling various scenarios and processes with hidden states.
3. Ability to handle missing or noisy data.

**Cons:**
1. it finds a local extermum and not the global one. meaning that there is no guarantee it leads us to the optimal answer but the answer is often reasonable enough.
2. assumptions may lead to loosing important scenarios.
3. Limited in modeling long-term dependencies without modifications or extensions.
### Q5. different types of HMMs:
1. First-order: explained before.
2. Fully-connected: all the states are connected to one another and it's more complicated than our model here.
3. Continuous-time: state transitions and observations can occur at any time in a continuous time space.
4. Hierarchical: These models have multiple levels of hidden states with hierarchical relationships. Useful for modeling complex data with multiple levels of abstraction.

In [4]:
class HMM:
    def __init__(self, num_hidden_states):
        self.num_hidden_states = num_hidden_states
        self.rand_state = np.random.RandomState(1)

        self.initial_prob = self._normalize(self.rand_state.rand(self.num_hidden_states, 1))
        self.transition_matrix = self._stochasticize(self.rand_state.rand(self.num_hidden_states, self.num_hidden_states))
        
        self.mean = None
        self.covariances = None
        self.num_dimensions = None

    def _forward(self, observation_matrix):
        log_likelihood = 0.
        T = observation_matrix.shape[1]
        alpha = np.zeros(observation_matrix.shape)

        for t in range(T):
            if t == 0:
                alpha[:, t] = self.initial_prob[:, t] * observation_matrix[:, t]

            else:        
                for s in range(self.num_hidden_states):
                    if observation_matrix[s][t] == 0: # check if it's possible to observe 'O-t' given 's"
                        alpha[s][t] = 0; continue
                    
                    sum = 0
                    for adj in range(self.num_hidden_states):
                        if self.transition_matrix[adj][s]: # check if it's possible to get to 's' from 'adj'
                            sum += alpha[adj][t-1] * self.transition_matrix[adj][s]
                    alpha[s][t] = sum * observation_matrix[s][t]

            alpha_sum = np.sum(alpha[:, t])
            alpha[:, t] /= alpha_sum
            log_likelihood += np.log(alpha_sum)

        return log_likelihood, alpha
    

    def _backward(self, observation_matrix):
        T = observation_matrix.shape[1]
        beta = np.zeros(observation_matrix.shape)

        beta[:, -1] = np.ones(observation_matrix.shape[0])

        for t in reversed(range(T - 1)):
            for s in range(self.num_hidden_states):
                sum = 0
                for adj in range(self.num_hidden_states):
                    sum += beta[adj][t+1] * self.transition_matrix[adj][s] * observation_matrix[adj][t+1]
                beta[s][t] = sum
                    
            beta[:, t] /= np.sum(beta[:, t])

        return beta
    

    def _state_likelihood(self, obs):
        obs = np.atleast_2d(obs)
        B = np.zeros((self.num_hidden_states, obs.shape[1]))

        for s in range(self.num_hidden_states):
            np.random.seed(self.rand_state.randint(1))
            B[s, :] = multivariate_normal.pdf(obs.T, mean=self.mean[:, s], cov=self.covariances[:, :, s])

        return B
    

    def _normalize(self, x):
        return (x + (x == 0)) / np.sum(x)


    def _stochasticize(self, x):
        return (x + (x == 0)) / np.sum(x, axis=1)


    def _em_init(self, obs):
        if self.num_dimensions is None:
            self.num_dimensions = obs.shape[0]
        if self.mean is None:
            subset = self.rand_state.choice(np.arange(self.num_dimensions), size=self.num_hidden_states, replace=False)
            self.mean = obs[:, subset]
        if self.covariances is None:
            self.covariances = np.zeros((self.num_dimensions, self.num_dimensions, self.num_hidden_states))
            self.covariances += np.diag(np.diag(np.cov(obs)))[:, :, None]

        return self
    
    
    def _em_step(self, obs):
        obs = np.atleast_2d(obs)
        T = obs.shape[1]

        B = self._state_likelihood(obs=obs)

        log_likelihood, alpha = self._forward(observation_matrix=B)
        beta = self._backward(observation_matrix=B)

        xi_sum = np.zeros((self.num_hidden_states, self.num_hidden_states))
        gamma = np.zeros((self.num_hidden_states, T))

        for t in range(T-1):
            partial_sum = alpha[:,t] @ ((beta[:,t+1] * B[:,t+1]).T * self.transition_matrix)
            xi_sum += self._normalize(partial_sum)
        
        for t in range(T):
            gamma[:, t] = self._normalize(alpha[:, t] * beta[:, t])
        
        expected_prior = gamma[:, 0].reshape(-1, 1)
        expected_transition = self._stochasticize(xi_sum)
        
        expected_covariances = np.zeros((self.num_dimensions, self.num_dimensions, self.num_hidden_states))
        expected_covariances += .01 * np.eye(self.num_dimensions)[:, :, None]

        gamma_state_sum = np.sum(gamma, axis=1)
        gamma_state_sum = gamma_state_sum + (gamma_state_sum == 0)

        expected_mean = np.zeros((self.num_dimensions, self.num_hidden_states))
        for s in range(self.num_hidden_states):
            gamma_obs = obs * gamma[s, :]
            expected_mean[:, s] = np.sum(gamma_obs, axis=1) / gamma_state_sum[s]

        self.initial_prob = expected_prior
        self.mean = expected_mean
        self.transition_matrix = expected_transition

        return log_likelihood


    def train(self, obs, num_iterations=1):
        for i in range(num_iterations):
            self._em_init(obs)
            self._em_step(obs)
        return self


    def score(self, obs):
        B = self._state_likelihood(obs)
        log_likelihood, _ = self._forward(B)
        return log_likelihood


## Train and test models

In [5]:
NUM_TRAIN_SAMPLES = 40
NUM_STATES = 13
NUM_ITER = 2

### Train Models

In [6]:
def train_model(col, value, implementing_method):
    df_value = df[df[col] == value]
    df_value = df_value[df_value["repeat num"] < NUM_TRAIN_SAMPLES]
    df_value = df_value.reset_index()
    mfccs = [df_value["MFCCs"][i] for i in range(len(df_value))]
    obs = np.concatenate(mfccs, axis=1)
    
    if implementing_method == "library":
        lengths = [mfcc.shape[1] for mfcc in mfccs]
        hmm_model = hmm.GaussianHMM(n_components=NUM_STATES)
        hmm_model.fit(obs.T, lengths=lengths)
        
    elif implementing_method == "scratch":
        hmm_model = HMM(num_hidden_states=NUM_STATES)
        hmm_model = hmm_model.train(obs=obs, num_iterations=NUM_ITER)
        
    return hmm_model
        
def train_models(col, values, method):
    hmm_models = dict()
    for value in values:
        hmm_models[value] = train_model(col=col, value=value, implementing_method=method)
    return hmm_models


### Test models

In [7]:
def create_df_test(col, value):
    return df[(df[col] == value) & (df["repeat num"] >= NUM_TRAIN_SAMPLES)].reset_index()


detected_d_l, detected_d_s = [[0 for _ in digits] for _ in range(2)]
detected_s_l, detected_s_s = [[0 for _ in names] for _ in range(2)]
def test_models(models_l, models_s, col, values, value):
    df_test = create_df_test(col=col, value=value)

    passed_l, passed_s = 0, 0

    for i in range(len(df_test)):
        obs = df_test["MFCCs"][i]
        scores_l = [ [models_l[v].score(obs.T) for v in values] ]
        scores_s = [ [models_s[v].score(obs) for v in values] ]

        if values[np.argmax(scores_l)] == value:
            passed_l += 1
        if values[np.argmax(scores_s)] == value:
            passed_s += 1
        if col == "digit":
            detected_d_l[np.argmax(scores_l)] += 1
            detected_d_s[np.argmax(scores_s)] += 1
        elif col == "speaker":
            detected_s_l[np.argmax(scores_l)] += 1
            detected_s_s[np.argmax(scores_s)] += 1

    failed_l, failed_s = len(df_test) - passed_l, len(df_test) - passed_s
    
    return [passed_l, failed_l, passed_s, failed_s, len(df_test)]


def run_tests(models_l, models_s, col, values):
    result = dict()
    for value in values:
        result[value] = test_models(models_l=models_l, models_s=models_s, col=col, values=values, value=value)
    return result


def show_result(result):
    df_res = pd.DataFrame(data=result)
    df_res = df_res.transpose()
    cols = {0: "passed (l)", 1: "failed (l)",
            2: "passed (s)", 3: "failed (s)",
            4: "tests count"}
    df_res.rename(columns=cols, inplace = True)
    for col in df_res.columns:
        if "passed" in col or "failed" in col or "count" in col:
            df_res[col] = df_res[col].astype(int)
    return df_res

In [8]:
digit_model_l = train_models(col="digit", values=digits, method="library")

In [9]:
digit_model_s = train_models(col="digit", values=digits, method="scratch")

In [10]:
speaker_model_l = train_models(col="speaker", values=names, method="library")

In [11]:
speaker_model_s = train_models(col="speaker", values=names, method="scratch")

In [12]:
result_digit = run_tests(models_s=digit_model_s, models_l=digit_model_l, col="digit", values=digits)

In [13]:
result_speaker = run_tests(models_s=speaker_model_s, models_l=speaker_model_l, col="speaker", values=names)

In [14]:
df_res_d = show_result(result=result_digit)
df_res_d["detected (l)"] = detected_d_l
df_res_d["detected (s)"] = detected_d_s

df_res_s = show_result(result=result_speaker)
df_res_s["detected (l)"] = detected_s_l
df_res_s["detected (s)"] = detected_s_s

## Saving data
Helps us to save time in the future step.s Also provides transfering data to other notebooks.

In [15]:
with open('result speaker.pkl', 'wb') as file:
    pickle.dump(df_res_s, file)
with open('result digit.pkl', 'wb') as file:
    pickle.dump(df_res_d, file)