# HMM Continuous Timeseries Annotation Dataset

In [31]:
import numpy as np
import pandas as pd
import os
import sys
from sklearn.metrics import confusion_matrix

In [32]:
# Identifier of the dataset produced by this notebook.
dataset_name = 'hmm_continuous'

In [33]:
# Folder that receives the processed dataset (relative path).
output_dir = './../../processed/' + dataset_name + '/'
# Destination CSV file inside that folder.
outp_fname = os.path.join(output_dir, dataset_name + '.csv')

In [34]:

def generate_hmm_parameters(M: int, K: int, D: int, min_stay_prob: float, max_stay_prob: float):
    """
    Generate random parameters of a Hidden Markov Model (HMM) with GMM emissions.

    Every state gets a self-transition ("stay") probability drawn uniformly
    between min_stay_prob and max_stay_prob; the remaining probability mass is
    split evenly among the other states. Each state emits from a K-component
    Gaussian mixture with Dirichlet(1, ..., 1) weights, standard-normal means
    and diagonal covariances whose variances are drawn uniformly from [0, 1).

    Random numbers come from NumPy's global RNG (np.random), so seed it
    beforehand for reproducible parameters.

    Args:
        M (int): Number of states. Must be >= 1.
        K (int): Number of Gaussians in the Gaussian Mixture Model (GMM).
        D (int): Number of dimensions in the observation.
        min_stay_prob (float): Minimum probability of staying in the same state.
        max_stay_prob (float): Maximum probability of staying in the same state.

    Returns:
        pi (np.ndarray): Initial state distribution. Shape is (M,).
        A (np.ndarray): State transition matrix. Shape is (M, M).
        R (np.ndarray): Mixture proportions. Shape is (M, K).
        mu (np.ndarray): Means of the Gaussians. Shape is (M, K, D).
        sigma (np.ndarray): Covariances of the Gaussians. Shape is (M, K, D, D).

    Raises:
        ValueError: If M < 1 or a stay probability lies outside [0, 1].
    """
    if M < 1:
        raise ValueError(f"Number of states M must be >= 1, got {M}")
    for name, prob in (("min_stay_prob", min_stay_prob), ("max_stay_prob", max_stay_prob)):
        if not 0.0 <= prob <= 1.0:
            raise ValueError(f"{name} must lie in [0, 1], got {prob}")

    pi = np.ones(M) / M  # initial state distribution (uniform)

    if M == 1:
        # A single state has nowhere else to go: it always stays put.
        # (Dividing the leftover mass by M - 1 would divide by zero.)
        A = np.ones((1, 1))
    else:
        A = np.zeros((M, M))
        for i in range(M):
            stay_prob = np.random.uniform(min_stay_prob, max_stay_prob)
            # spread the leftover mass evenly over the other M - 1 states
            A[i, :] = (1 - stay_prob) / (M - 1)
            A[i, i] = stay_prob  # overwrite the diagonal entry

    # mixture proportions: one Dirichlet(1, ..., 1) draw per state
    R = np.random.dirichlet(np.ones(K), size=M)

    mu = np.random.randn(M, K, D)

    # diagonal covariance per (state, component)
    sigma = np.zeros((M, K, D, D))
    for m in range(M):
        for k in range(K):
            sigma[m, k, :, :] = np.diag(np.random.rand(D))
    return pi, A, R, mu, sigma

In [35]:
def generate_hmm_samples(
    N: int,
    T: int,
    pi: np.ndarray,
    A: np.ndarray,
    R: np.ndarray,
    mu: np.ndarray,
    sigma: np.ndarray,
):
    """
    Sample sequences from a Hidden Markov Model (HMM) with GMM emissions.

    For every sequence the hidden state path is drawn first (initial state
    from pi, then one transition per timestep from A); afterwards, for each
    timestep, a mixture component is picked from R and the observation is
    drawn from that component's Gaussian. Uses NumPy's global RNG.

    Args:
        N (int): Number of samples to generate.
        T (int): Length of each sample.
        pi (np.ndarray): Initial state distribution. Shape is (M,).
        A (np.ndarray): State transition matrix. Shape is (M, M).
        R (np.ndarray): Mixture proportions. Shape is (M, K).
        mu (np.ndarray): Means of the Gaussians. Shape is (M, K, D).
        sigma (np.ndarray): Covariances of the Gaussians. Shape is (M, K, D, D).

        where
            M: number of states
            K: number of Gaussians in the GMM
            D: number of dimensions in the observation

    Returns:
        observations (np.ndarray): Generated observations. Shape is (N, T, D).
        labels (np.ndarray): State labels for each timestep. Shape is (N, T).
    """
    num_states, num_components, obs_dim = mu.shape
    observations = np.zeros((N, T, obs_dim))
    labels = np.zeros((N, T), dtype=int)

    for seq_idx in range(N):
        # rows of the output arrays are filled in place through these views
        path = labels[seq_idx]
        emitted = observations[seq_idx]

        # hidden state path: initial state, then one transition per step
        path[0] = np.random.choice(num_states, p=pi)
        for step in range(1, T):
            path[step] = np.random.choice(num_states, p=A[path[step - 1]])

        # emissions: pick a mixture component, then draw from its Gaussian
        for step, hidden in enumerate(path):
            mix = np.random.choice(num_components, p=R[hidden])
            emitted[step] = np.random.multivariate_normal(mu[hidden, mix], sigma[hidden, mix])

        if seq_idx > 0 and seq_idx % 100 == 0:
            print(f"Generated {seq_idx} samples...")

    print("Done generating samples.")
    return observations, labels

In [36]:
def adjust_variable_length(
        observations: np.ndarray,
        labels: np.ndarray,
        min_len: int,
        max_len: int
    ):
    """
    Cut every fixed-length sample down to a random contiguous window.

    For each sample a window length is drawn uniformly from
    [min_len, max_len] and a start index is drawn uniformly among the
    positions where that window fits; the same window is applied to the
    observations and to the labels. Uses NumPy's global RNG.

    The returned arrays are slices (views) of the inputs, not copies.

    Args:
        observations (np.ndarray): Generated observations. Shape is (N, T, D).
        labels (np.ndarray): State labels for each timestep. Shape is (N, T).
        min_len (int): Minimum length of the samples.
        max_len (int): Maximum length of the samples.

    Returns:
        var_len_observations (list of np.ndarray): List of observations with variable lengths.
        var_len_labels (list of np.ndarray): List of labels with variable lengths.

    Raises:
        ValueError: If max_len exceeds the original length T, if min_len is
            negative, or if min_len is greater than max_len.
    """
    N, T, _ = observations.shape

    # Validate once, up front: checking inside the loop would only fail when
    # a random draw happened to exceed T, making the error nondeterministic.
    if max_len > T:
        raise ValueError(
            f"Maximum length {max_len} cannot be greater than the original length {T}"
        )
    if min_len < 0:
        raise ValueError(f"Minimum length {min_len} cannot be negative")
    if min_len > max_len:
        raise ValueError(
            f"Minimum length {min_len} cannot be greater than maximum length {max_len}"
        )

    var_len_observations = []
    var_len_labels = []

    for i in range(N):
        length = np.random.randint(min_len, max_len + 1)  # upper bound is exclusive
        start_idx = np.random.randint(0, T - length + 1)
        window = slice(start_idx, start_idx + length)
        var_len_observations.append(observations[i, window, :])
        var_len_labels.append(labels[i, window])

    return var_len_observations, var_len_labels

In [37]:
def convert_to_dataframe(observations: list, labels: list) -> pd.DataFrame:
    """
    Flatten variable-length sequences into a long-format pandas DataFrame.

    Each timestep of each sequence becomes one row. The position of a
    sequence in the input lists is used as its sample_id.

    Args:
        observations (list of np.ndarray): List of observations with variable lengths.
        labels (list of np.ndarray): List of labels with variable lengths.

    Returns:
        pd.DataFrame: One row per timestep with columns sample_id, label and
                      observation_dim_1 ... observation_dim_D.
    """
    def _iter_rows():
        # yield one record per (sequence, timestep) pair, in input order
        for sample_id, (sequence, states) in enumerate(zip(observations, labels)):
            for features, state in zip(sequence, states):
                record = {"sample_id": sample_id, "label": int(state)}
                for dim in range(sequence.shape[1]):
                    record[f"observation_dim_{dim+1}"] = float(features[dim])
                yield record

    return pd.DataFrame(list(_iter_rows()))

In [38]:
def generate_hmm_dataset(
        N: int,
        min_seq_len: int,
        max_seq_len: int,
        M: int,
        K: int,
        D: int,
        min_stay_prob: float,
        max_stay_prob: float
    ) -> pd.DataFrame:
    """
    Generate HMM dataset with variable length sequences and return as pandas DataFrame.

    Pipeline: draw random HMM parameters, sample N sequences of length
    max_seq_len, cut each one down to a random window whose length lies in
    [min_seq_len, max_seq_len], and flatten the result into a DataFrame.

    Args:
        N (int): Number of samples to generate.
        min_seq_len (int): Minimum length of the sequences. Must be >= 10
        max_seq_len (int): Maximum length of the sequences.
        M (int): Number of states.
        K (int): Number of Gaussians in the Gaussian Mixture Model (GMM).
        D (int): Number of dimensions in the observation.
        min_stay_prob (float): Minimum probability of staying in the same state.
        max_stay_prob (float): Maximum probability of staying in the same state.

    Returns:
        pd.DataFrame: DataFrame containing the generated dataset.

    Raises:
        AssertionError: If min_seq_len < 10 or min_seq_len > max_seq_len.
    """
    # Validate before any random numbers are drawn. The checks are raised
    # explicitly instead of via `assert` so they still run under `python -O`;
    # the exception type is kept as AssertionError for backward compatibility.
    if not min_seq_len >= 10:
        raise AssertionError("Minimum sequence length should be >= 10")
    if not min_seq_len <= max_seq_len:
        raise AssertionError(
            "Minimum sequence length cannot be greater than maximum sequence length"
        )

    pi, A, R, mu, sigma = generate_hmm_parameters(M, K, D, min_stay_prob, max_stay_prob)
    # Sample every sequence at full length; shorter windows are cut out below.
    observations, labels = generate_hmm_samples(
        N=N,
        T=max_seq_len,
        pi=pi,
        A=A,
        R=R,
        mu=mu,
        sigma=sigma,
    )
    var_len_observations, var_len_labels = adjust_variable_length(
        observations, labels, min_seq_len, max_seq_len
    )
    return convert_to_dataframe(var_len_observations, var_len_labels)

In [39]:
def set_seeds(seed=888):
    """Seed NumPy's global random number generator so runs are repeatable."""
    np.random.seed(seed=seed)

In [40]:
# number of sequences (samples) to generate
N = 300
# bounds on the length of each sequence; kept independent of N so that
# changing the number of samples does not silently change sequence lengths
min_seq_len = 50
max_seq_len = 300
# number of states
M = 4
# number of Gaussians
K = 5
# dimensionality of data
D = 3
# range from which each state's self-transition probability is drawn
min_stay_prob = 0.8
max_stay_prob = 0.99

# seed the global NumPy RNG so the generated dataset is reproducible
set_seeds(1)
data = generate_hmm_dataset(
    N=N,
    min_seq_len=min_seq_len,
    max_seq_len=max_seq_len,
    M=M,
    K=K,
    D=D,
    min_stay_prob=min_stay_prob,
    max_stay_prob=max_stay_prob,
)


Generated 100 samples...
Generated 200 samples...
Done generating samples.


In [41]:
# Display the (rows, columns) dimensions of the generated DataFrame.
data.shape

(51534, 5)

In [42]:
# Preview the first rows of the generated DataFrame.
data.head()

Unnamed: 0,sample_id,label,observation_dim_1,observation_dim_2,observation_dim_3
0,0,1,0.323794,-1.134339,-0.988766
1,0,1,-0.673589,0.29193,-1.163936
2,0,1,0.141649,2.867647,1.001803
3,0,1,0.10012,0.096912,0.70618
4,0,1,0.536475,2.487519,0.107215


# Save Main Data File

In [43]:
# Make sure the destination folder exists, then write the dataset as CSV
# without the index column, formatting floats with four decimal places.
os.makedirs(output_dir, exist_ok=True)
data.to_csv(outp_fname, float_format="%.4f", index=False)