In [5]:
import os
import itertools
os.environ['OMP_NUM_THREADS'] = '1'
import numpy as np
import mne
from mne_connectivity import symmetric_orth
from hmmlearn import hmm
from scipy.signal import hilbert  # For Hilbert transform
from scipy.signal import resample, butter, lfilter # For downsampling
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import networkx as nx
import seaborn as sns
from scipy.optimize import fminbound

from markovian_helpers import downsample_with_filtering, apply_orthogonalization
from numba import njit, jit

In [6]:
class VariationalHMM:
    
    def __init__(self, n_states, data):
        self.n_states = n_states
        self.data = data

        # Initialize priors (adjust if you have prior knowledge)
        self.init_distrib = np.ones(n_states) / n_states
        self.trans_distrib = np.ones((n_states, n_states)) / n_states
        self.emission_means = np.random.randn(n_states, data.shape[1])
        self.emission_covs = np.ones((n_states, data.shape[1]))
#     @staticmethod
#     @njit
    def elbo(self, q):  # Evidence Lower Bound

        # Expected log-likelihood: This measures how well the model with its approximate posterior distribution q explains the observed data
        expected_log_likelihood = 0
        for t in range(len(self.data)):
            for i in range(self.n_states):
                expected_log_likelihood += q[i, t] * self._emission_logprob(self.data[t], i)

        # Entropy of the approximate posterior: Measures the uncertainty in our approximate posterior distribution q
        entropy_q = 0
        for t in range(len(self.data)):
            for i in range(self.n_states):
                entropy_q -= q[i, t] * np.log(q[i, t] + 1e-10)

        # KL Divergence (between approximate and true posterior): This term penalizes the divergence of our approximation q from the true, intractable posterior distribution over model parameters
        kl_divergence = 0
        for t in range(len(self.data)):
            for i in range(self.n_states):
                kl_divergence += q[i, t] * np.log(np.maximum(q[i, t], 1e-10) / q[i, t])  # Using q as an approximation

                return expected_log_likelihood + entropy_q - kl_divergence

    # Free energy = -ELBO (expected log-likelihood - Kl divergence)
#     @staticmethod
#     @njit
    def free_energy(self, q):
        return -self.elbo(q)
  
#     @staticmethod
#     @njit(forceobj=True, nopython=False)
    def fit(self, max_iter=100):
        q = None  # Initialize q to None
        for _ in range(max_iter):
            print("loop")
            # E-step: Update q(z) using current model parameters
            print("Running e step")
            q = self._e_step()

            # M-step: Update model parameters using current q(z)
            print("Running m step")
            self._m_step(q)

            print(f"ELBO: {self.elbo(q)}")  # Commented out to avoid cluttering the output
        return q  # Return the final q after fitting
#     @staticmethod
#     @njit
    def normalize_logprobs(log_probs):
        max_log_prob = np.max(log_probs)
        return log_probs - max_log_prob - np.log(np.sum(np.exp(log_probs - max_log_prob)))

#     @staticmethod
#     @njit(forceobj = True)
    def _e_step(self):
        log_likelihoods = np.zeros((self.n_states, len(self.data)))

        # Forward Pass (Calculate alphas in log domain)
        for t in range(len(self.data)):
            for i in range(self.n_states):
                log_lik = self._emission_logprob(self.data[t], i)  # Log of emission probability
                log_likelihoods[i, t] = log_lik + (np.log(self.init_distrib[i]) if t == 0
                                                   else np.logaddexp.reduce(
                    log_likelihoods[:, t - 1] + np.log(self.trans_distrib[:, i])))

        # Backward Pass (Calculate betas in log domain)
        log_betas = np.zeros((self.n_states, len(self.data)))
        log_betas[:, -1] = 0  # Initialize in log-domain

        for t in list(range(len(self.data) - 1, 0, -1)):
            for i in range(self.n_states):
                for j in range(self.n_states):
                    log_betas[i, t] = np.logaddexp(log_betas[i, t], log_betas[j, t + 1] + np.log(
                        self.trans_distrib[i, j]) + self._emission_logprob(self.data[t + 1], j))


        # Store approximate posteriors for all time steps
        q = np.zeros((self.n_states, len(self.data)))
        for t in range(len(self.data)):
            q[:, t] = np.exp(log_likelihoods[:, t] + log_betas[:, t] - self.normalize_logprobs(
                log_likelihoods[:, t] + log_betas[:, t]))

        return q
#     @staticmethod
#     @njit
    def _emission_logprob(self, x, state_idx):
        # Calculation of log p(x|z) based on Gaussian emission
        return multivariate_normal.logpdf(x, mean=self.emission_means[state_idx],
                                          cov=np.diag(self.emission_covs[state_idx]))
#     @staticmethod
#     @njit
    def _m_step(self, q):
        # Update initial distribution
        self.init_distrib = q[:, 0]  # Posteriors at t=0

        # Update transition probabilities
        expected_transitions = np.zeros((self.n_states, self.n_states))
        for t in range(len(self.data) - 1):
            for i in range(self.n_states):
                for j in range(self.n_states):
                    expected_transitions[i, j] += q[i, t] * self.trans_distrib[i, j] * self._emission_logprob(
                        self.data[t + 1], j) * q[j, t + 1]
        self.trans_distrib = expected_transitions / expected_transitions.sum(axis=1, keepdims=True)

        # Update emission means
        for i in range(self.n_states):
            self.emission_means[i] = np.sum([q[i, t] * self.data[t] for t in range(len(self.data))], axis=0) / np.sum(
                q[i])

        # Update emission covariances
        for i in range(self.n_states):
            diff = self.data - self.emission_means[i]
            # Corrected line: Initialize emission covariances to identity matrices
            self.emission_covs = np.array([np.eye(self.data.shape[1]) for _ in range(self.n_states)])


In [None]:

optimal_states_arr = []

# defining input and output directory
files_in = '../data/in/subjects/'
files_out = '../data/out/subjects/'


# loading list of subject names from txt file
names = open("./names.txt", "r")
subject_list = names.read().split('\n')
modes = ['EC', 'EO']
for subject in subject_list:
    for mode in modes:
        print(subject, mode)
        #defining input and output directories for each subject and mode
        dir_in = files_in + subject + '/' + mode + '/'
        dir_out = files_out + subject +"/" + mode +'/'

        orthogonalized_data = np.load( dir_out + "orth.npy")


        # Step 2: Determine the Optimal Number of States for the HMM

        # Compute the variance of your features to set a variance floor later
        feature_variances = np.var(orthogonalized_data, axis=0)

        # Choose a small fraction (e.g., 1% or 0.1%) of the maximum variance as the variance floor
        fraction_of_max_variance = 0.05  # Adjust as needed
        variance_floor = fraction_of_max_variance * np.max(feature_variances)

        # Handle NaNs and Infs in features (using masking)
        features = np.mean(orthogonalized_data, axis=2)
        features = np.ma.masked_invalid(features).filled(0)

        # Reshape orthogonalized_data using array.reshape (-1, 1)
        # New shape will be (samples/epochs, labels * sampling frequency)
        reshaped_data = orthogonalized_data.reshape(-1, 1)

        # Reduce dimensionality to speed up HMM fitting
        pca = PCA(n_components=0.99)  # Retain 99% of the variance

        # Fit PCA to the normalized data
        pca_data = pca.fit_transform(reshaped_data)

        # Standardize the PCA-transformed data
        scaler = StandardScaler()
        pca_data = scaler.fit_transform(pca_data)

        # Define the range of hidden states to explore
        state_numbers = range(3, 16)


        # Initialize lists to store AIC and BIC values
        aics = []
        bics = []

        # Define the range of state numbers to test based on previous literature
        state_numbers = range(3, 16)


        for n_states in state_numbers:
            print(n_states)
            # Initialize the HMM model with diagonal covariance
            model = hmm.GaussianHMM(n_components=n_states, n_iter=50, covariance_type='full', tol=1e-7, verbose=False,
                                    params='st', init_params='stmc')  # Add smoothing parameter

            # Fit the model using the PCA-transformed data
            print("Running fit")
            model.fit(pca_data)

            # Calculate AIC and BIC for the current model
            log_likelihood = model.score(pca_data)
            n_params = n_states * (2 * pca_data.shape[1] - 1)  # Adjusted for diagonal covariance
            aic = 2 * n_params - 2 * log_likelihood
            bic = np.log(pca_data.shape[0]) * n_params - 2 * log_likelihood

            # Store the AIC and BIC values
            aics.append(aic)
            bics.append(bic)
    
        # Determine the optimal number of states based on the lowest AIC and BIC
        optimal_states_aic = state_numbers[np.argmin(aics)]
        optimal_states_bic = state_numbers[np.argmin(bics)]
        
        print(optimal_states_aic, optimal_states_bic)



        # Plotting
#         plt.bar(state_numbers, free_energies)
#         plt.xlabel("Number of States")
#         plt.ylabel("Free Energy")
#         plt.show()

#         # Find the optimal number of states
#         optimal_states = state_numbers[np.argmin(free_energies)]
#         print(f"Optimal number of states based on Varitional Bayes for Subject {subject}: {optimal_states}")
#         optimal_states_arr.append({subject:optimal_states})     



101 EC
3
Running fit
4
Running fit
5
Running fit
6
Running fit
7
Running fit
8
Running fit
9
Running fit
10
Running fit
