# Classification with Hidden Markov Models

## Setup

In [1]:
import pandas as pd
import numpy as np
from hmmlearn import hmm
import pickle

DATA_PICKLE_FILE = "./data/genrated_data_1657979568.pkl"

In [9]:
def fit_with_known_no_hidden_states(samples: list[np.ndarray], no_hidden_states: int, trials: int) -> hmm.GaussianHMM:
    """Fit GaussianHMM when number of hidden states is known.
    EM Algorithm will stuck in local optima, so it is recommended to try to fit model multiple times 
    and select the one with highest score. Number of iterations is set via `trials` argument.
    """
    lengths = [len(sample) for sample in samples]
    X = np.concatenate(samples)
    best_model = None
    best_score = -np.infty
    for t in range(trials):
        remodel = hmm.GaussianHMM(n_components = no_hidden_states).fit(X, lengths)
        if not remodel.monitor_.converged:
            print(f"Model in trial {t} didn't converge.") 
        score = remodel.score(X)
        if score > best_score:
            best_model = remodel
            best_score = score
    return best_model

def AIC(samples: list[np.ndarray], model: hmm.BaseHMM) -> float:
    """Akaike Information Criterion implemented for Hidden Markov Model."""
    lengths = [len(sample) for sample in samples]
    X = np.concatenate(samples)
    loglik = model.score(X, lengths)
    k = model.n_components
    return 2*k - 2*loglik

def BIC(samples: list[np.ndarray], model: hmm.BaseHMM) -> float:
    """Bayesian Information Criterion implemented for Hidden Markov Model."""
    lengths = [len(sample) for sample in samples]
    X = np.concatenate(samples)
    loglik = model.score(X, lengths)
    k = model.n_components
    n = sum(lengths) # TODO jak to dziala jak to rozbijam na czesci?
    return k*np.log(n) - 2*loglik

def fit_and_compute_criteria(samples: list[np.ndarray], search_space: list[int], trials: int) -> dict:
    """Return dict containing models and corresponding information criteria values."""
    models = [fit_with_known_no_hidden_states(samples, nhs, trials) for nhs in search_space]
    AICs = [AIC(samples, model) for model in models]
    BICs = [BIC(samples, model) for model in models]
    return {"models": models, "AIC": AICs, "BIC": BICs}

def fit_best_model(samples: list[np.ndarray], search_space: list[int], trials: int = 10, criterion: str = "AIC") -> hmm.BaseHMM:
    """Fit hmm.GaussianHMM models for all number of hidden states in `search_space` and return the one selected
    by given criterion."""
    results = fit_and_compute_criteria(samples, search_space, trials)
    best_model_id = np.argmin(results[criterion]) # Information criteria are minimized, not maximized.
    return results["models"][best_model_id]

def classify_sample(X: np.ndarray, fitted_models: list[hmm.BaseHMM]) -> int:
    """Return id of fitted model with best log-likelihood given (one) sample X."""
    scores = [model.score(X) for model in fitted_models]
    return np.argmax(scores)

## Load data and prepare train and test splits

In [21]:
with open(DATA_PICKLE_FILE, "rb") as f:
    data = pickle.load(f)

all_X_samples = data["all_X_samples"]
labels_df = data["labels_df"]
data.keys()

dict_keys(['models_lst', 'labels_df', 'all_X_samples', 'all_Z_samples'])

In [4]:
TRAIN_SUBSET_SIZE = 0.8 # stratified sampling
train_samples_ids = []
test_samples_ids = []
for label, sub_df in labels_df.groupby("true_label"):
    n = sub_df.shape[0]
    train_size = int(n*TRAIN_SUBSET_SIZE)
    test_size = n - train_size
    if test_size < 1:
        raise Exception(f"For {TRAIN_SUBSET_SIZE = } and {n = } test subset in group {label} is empty.")
    train_ids = sub_df.sample(8).index.values
    train_samples_ids.extend(train_ids)
    test_samples_ids.extend(sub_df.drop(train_ids).index.values)


## Fit the models

In [20]:
SEARCH_SPACE = list(range(1,6)) # numbers of hidden states to try

fitted_models = dict()
for label, sub_df in labels_df.loc[train_samples_ids].groupby("true_label"):
    int_ids = sub_df.index.values.astype('int')
    X = [all_X_samples[id] for id in int_ids]
    fitted_models[label] = fit_best_model(X, SEARCH_SPACE)
    

### Compare fitted models and true models

In [25]:
true_models = data["models_lst"]
for i in range(len(fitted_models)):
    true_model = true_models[i]
    fitted_model = fitted_models[i]
    print(f"Samples labeled as {i}: generated with k = {true_model.n_components}; results in fitted model with k = {fitted_model.n_components}.")

Samples labeled as 0: generated with k = 2; results in fitted model with k = 2.
Samples labeled as 1: generated with k = 2; results in fitted model with k = 2.
Samples labeled as 2: generated with k = 2; results in fitted model with k = 4.
Samples labeled as 3: generated with k = 1; results in fitted model with k = 4.
Samples labeled as 4: generated with k = 1; results in fitted model with k = 2.
Samples labeled as 5: generated with k = 1; results in fitted model with k = 2.
Samples labeled as 6: generated with k = 5; results in fitted model with k = 5.
Samples labeled as 7: generated with k = 3; results in fitted model with k = 4.
Samples labeled as 8: generated with k = 3; results in fitted model with k = 3.


## Classify test samples