# Cross-validated log-likelihood (logL) of Gaussian HMMs across candidate state counts

In [3]:
import sys
import re
from contextlib import redirect_stderr
from io import StringIO
from tqdm import tqdm

In [None]:
# ─── Settings ────────────────────────────────────────────────────────────────────
# Directory that load_data() scans for per-subject "*.h5" files.
INPUT_DIR = "/home/jovyan/narratives-project/shirer_components"
# Name of the HDF5 dataset read from every file.
DATASET = "timeseries"
# Candidate numbers of hidden states (k) compared by main().
STATE_COUNTS = [10, 14]
# Default number of K-fold splits used by compute_cv_loglik().
N_SPLITS = 5
# Upper bound on EM iterations handed to the HMM fit.
MAX_ITER = 500
# Seed for the shuffled K-fold split.
RANDOM_SEED = 42
# ─────────────────────────────────────────────────────────────────────────────────

def load_data(input_dir):
    """
    Load one standardized array per ``*.h5`` file found in ``input_dir``.

    Files are visited in sorted path order. For each file the ``DATASET``
    entry is read fully into memory, transposed, and passed through a fresh
    ``StandardScaler().fit_transform`` (so every file is scaled on its own).

    Returns a list of arrays, one per file; the list is empty when the
    directory contains no ``*.h5`` files.
    """
    pattern = os.path.join(input_dir, "*.h5")
    subjects = []
    for h5_path in sorted(glob.glob(pattern)):
        with h5py.File(h5_path, "r") as handle:
            raw = handle[DATASET][()]
        # NOTE(review): the transpose presumably turns (features, time) into
        # (time, features) so the scaler works per feature -- confirm layout.
        subjects.append(StandardScaler().fit_transform(raw.T))
    return subjects
    
def fit_hmm_with_progress(X, lengths, k, max_iter=500, random_state=None):
    """
    Fit a full-covariance GaussianHMM with a tqdm progress bar on EM iterations.

    The model is fitted with ``verbose=True``; hmmlearn then prints one
    convergence report per EM iteration to ``sys.stderr``. While fitting,
    stderr is redirected to a small parser that reads the iteration number
    from each report and moves the bar accordingly.

    Parameters
    ----------
    X : array-like
        Observations passed straight to ``GaussianHMM.fit``.
    lengths : sequence of int
        Per-sequence lengths passed straight to ``GaussianHMM.fit``.
    k : int
        Number of hidden states (``n_components``).
    max_iter : int, default 500
        Maximum number of EM iterations (``n_iter``) and the bar's total.
    random_state : optional
        Forwarded to ``GaussianHMM`` so the initialisation can be made
        reproducible. ``None`` (default) keeps the unseeded behaviour.

    Returns
    -------
    The fitted ``GaussianHMM`` instance.
    """
    model = hmm.GaussianHMM(
        n_components=k,
        covariance_type='full',
        n_iter=max_iter,
        verbose=True,
        random_state=random_state,
    )
    # Created before the redirect so the bar keeps drawing on the real stderr.
    bar = tqdm(total=max_iter, desc=f"  EM k={k}", position=0)

    class EMLogger:
        """Write-only stderr stand-in that converts EM reports into bar updates."""

        def __init__(self):
            # hmmlearn's convergence reports are right-aligned columns
            # "<iter> <log_prob> <delta>" with no textual prefix, so the
            # iteration is the leading integer. The optional "Iter N:" prefix
            # keeps the previously expected format working as well.
            self.iter_regex = re.compile(r"^\s*(?:Iter\s+)?(\d+)(?=[:\s])")

        def write(self, msg):
            match = self.iter_regex.search(msg)
            if match:
                bar.n = int(match.group(1))
                bar.refresh()
            # File-like objects report how many characters they consumed.
            return len(msg)

        def flush(self):  # required for redirect_stderr
            pass

    try:
        with redirect_stderr(EMLogger()):
            model.fit(X, lengths)
        # Fill the bar on success, even when EM converged before max_iter.
        bar.n = max_iter
        bar.refresh()
    finally:
        # Always release the bar, including when fit() raises.
        bar.close()
    return model
    
def compute_cv_loglik(data, k, n_splits=N_SPLITS):
    """
    Estimate the held-out log-likelihood of a ``k``-state HMM by K-fold CV.

    ``data`` is split at the level of its elements with a shuffled ``KFold``
    seeded by ``RANDOM_SEED``. For each fold a model is fitted on the stacked
    training elements (at most ``MAX_ITER`` EM iterations) and scored on the
    stacked held-out elements. A fold whose scoring raises is reported and
    left out of the statistics.

    Returns
    -------
    (mean, std) of the per-fold log-likelihoods, as computed by
    ``np.mean`` / ``np.std`` over the folds that scored successfully.
    """
    def stack(subjects):
        # Concatenate along the first axis and remember each element's length
        # so the HMM can tell the sequences apart.
        return np.vstack(subjects), [s.shape[0] for s in subjects]

    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=RANDOM_SEED)
    scores = []

    for fold, (train_idx, test_idx) in enumerate(splitter.split(data), start=1):
        fit_subjects = [data[i] for i in train_idx]
        held_out = [data[i] for i in test_idx]

        X_train, lengths_train = stack(fit_subjects)
        model = fit_hmm_with_progress(X_train, lengths_train, k, MAX_ITER)

        X_test, lengths_test = stack(held_out)
        try:
            logL = model.score(X_test, lengths_test)
            scores.append(logL)
            print(f"  Fold {fold}: logL = {logL:.1f}")
        except Exception as e:
            # Best effort: report the failing fold and continue with the rest.
            print(f"  [ERROR] Fold {fold} for k={k}: {e}")

    return np.mean(scores), np.std(scores)

def main():
    """
    Run the cross-validation for every entry of ``STATE_COUNTS`` and report.

    Loads the data from ``INPUT_DIR``, computes the mean and standard
    deviation of the cross-validated log-likelihood for each ``k``, prints a
    summary ordered from highest to lowest mean, and names the top entry as
    the best model.
    """
    data = load_data(INPUT_DIR)
    print(f"✔ Loaded {len(data)} subjects")

    cv_results = []
    for k in STATE_COUNTS:
        print(f"\n=== Running CV for k={k} ===")
        mean_cvll, std_cvll = compute_cv_loglik(data, k)
        print(f"→ k={k}: Mean CVLL = {mean_cvll:.2f}, SD = {std_cvll:.2f}")
        cv_results.append(dict(k=k, mean_cvll=mean_cvll, std_cvll=std_cvll))

    # Highest mean cross-validated log-likelihood first.
    ranked = sorted(cv_results, key=lambda entry: entry["mean_cvll"], reverse=True)

    print("\n=== Summary ===")
    for res in ranked:
        print(f"k={res['k']:>2} | CVLL = {res['mean_cvll']:.2f} ± {res['std_cvll']:.2f}")

    best = ranked[0]
    print(f"\n✅ Best model: k={best['k']} (CVLL = {best['mean_cvll']:.2f})")

# Run the full cross-validation when this code is executed as the main module.
if __name__ == "__main__":
    main()

✔ Loaded 75 subjects

=== Running CV for k=10 ===


  EM k=10:   0%|          | 0/500 [00:00<?, ?it/s]