<a href="https://colab.research.google.com/github/marcoincerti/EMHMM-Toolbox/blob/main/emhmm_toolbox_v3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
from scipy import stats
from scipy.special import logsumexp
from sklearn.mixture import GaussianMixture
from matplotlib import pyplot as plt
import pandas as pd
import numpy as np

In [None]:
def read_data(file_path):
    print(f'Reading {file_path}')
    df = pd.read_excel(file_path)

    # Get the headers
    headers = df.columns.tolist()

    # Find the header indices
    SID = headers.index('SubjectID')
    TID = headers.index('TrialID')
    FX = headers.index('FixX')
    FY = headers.index('FixY')
    FD = headers.index('FixD') if 'FixD' in headers else None

    # Check if required headers exist
    if SID == -1:
        raise ValueError('Error with SubjectID')
    print(f'- found SubjectID in column {SID + 1}')

    if TID == -1:
        raise ValueError('Error with TrialID')
    print(f'- found TrialID in column {TID + 1}')

    if FX == -1:
        raise ValueError('Error with FixX')
    print(f'- found FixX in column {FX + 1}')

    if FY == -1:
        raise ValueError('Error with FixY')
    print(f'- found FixY in column {FY + 1}')

    if FD is not None:
        print(f'- found FixD in column {FD + 1}')

    # Initialize data structures
    sid_names = []
    sid_trials = []
    data = []

    # Read data
    for _, row in df.iterrows():
        mysid = str(int(row[SID])) if np.issubdtype(type(row[SID]), np.number) else str(row[SID])
        mytid = str(int(row[TID])) if np.issubdtype(type(row[TID]), np.number) else str(row[TID])
        myfxy = [row[FX], row[FY]]

        if FD is not None:
            myfxy.append(row[FD])

        # Find subject
        s = sid_names.index(mysid) if mysid in sid_names else -1
        if s == -1:
            # New subject
            sid_names.append(mysid)
            sid_trials.append([])
            data.append([])

        # Find trial
        t = -1
        if s != -1:
            t = sid_trials[s].index(mytid) if mytid in sid_trials[s] else -1
        if t == -1:
            sid_trials[s].append(mytid)
            data[s].append([])

        # Put fixation
        data[s][t].append(myfxy)

    print(f'- found {len(sid_names)} subjects:')
    print(' '.join(sid_names))

    for i, subject in enumerate(data):
        print(f'  * subject {i + 1} had {len(subject)} trials')

    return data, sid_names, sid_trials

file_path = '/content/demodata.xls'
data, sid_names, sid_trials = read_data(file_path)

In [None]:
# Flattening the array
flattened_array = [element for sublist in data[0] for element in sublist]

d = np.array(flattened_array)



In [None]:
%pip install hmmlearn

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from hmmlearn import hmm, vhmm
import matplotlib.image as mpimg


fixation_data = d  # 2D fixation coordinates

# Load the image
img = mpimg.imread('/content/face.jpg')  # Replace 'path_to_your_image_file.jpg' with the actual image file path

# Define the number of hidden states for the HMM
n_states = 3

'''
"full": Assumes a full covariance matrix for each hidden state. This means each state can have a different covariance structure.
"diag": Assumes a diagonal covariance matrix for each hidden state. This means each state has its own variances but no covariance between dimensions.
"spherical": Assumes a single variance value for each hidden state. This means each state has a spherical covariance, where the variances are the same for all dimensions.
'''

# Create an HMM model with the specified covariance type and initialization method
'''
model = hmm.GaussianHMM(n_components=n_states,
                        covariance_type=covariance_type,
                        init_params=init_method,
                        n_iter=100,
                        verbose=True)
'''
#model = hmm.GMMHMM(n_components=n_states, random_state=42)

model = vhmm.VariationalGaussianHMM(n_components=n_states, random_state=42, algorithm='viterbi')

# Fit the HMM to the fixation data using the EM algorithm
model.fit(fixation_data)

# Generate a scanpath using the trained HMM
scanpath, _ = model.sample(len(fixation_data))

# Retrieve the estimated parameters after training
estimated_means = model.means_
estimated_covariances = model.covars_
estimated_transition_matrix = model.transmat_

# Print the estimated parameters
print("Estimated Means:")
print(estimated_means)
print("Estimated Covariance Matrices:")
print(estimated_covariances)
print("Estimated Transition Matrix:")
print(estimated_transition_matrix)

# Calculate the log-likelihood of the generated scanpath
log_likelihood = model.score(scanpath)

# Plot the original fixation data and the generated scanpath
plt.plot(fixation_data[:, 0], fixation_data[:, 1], 'ro-', label='Fixation Data')
#plt.plot(scanpath[:, 0], scanpath[:, 1], 'bo-', label='Generated Scanpath')
plt.imshow(img)  # Replace xmin, xmax, ymin, ymax with the appropriate plot limits


plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.title('Scanpath (Log-Likelihood: {:.2f})'.format(log_likelihood))
plt.legend()
plt.show()

# Plot the sampled data

fig, ax = plt.subplots()
ax.plot(fixation_data[:, 0], fixation_data[:, 1], ".-", label="observations", ms=6,
        mfc="orange", alpha=0.7)

means = model.means_
# Indicate the component numbers
for i, m in enumerate(means):
    ax.text(m[0], m[1], 'Component %i' % (i + 1),
            size=5, horizontalalignment='center',
            bbox=dict(alpha=.7, facecolor='w'))
ax.legend(loc='best')
fig.show()


# Get the state probabilities for each fixation
state_probabilities = model.predict_proba(fixation_data)

# Set up colors or marker styles based on state probabilities
colors = state_probabilities.argmax(axis=1)  # Use the state with the highest probability as color index
markers = ['o', 's', '^']  # Specify marker styles for each state

# Plot the fixation clusters
for state in range(model.n_components):
    # Get the fixations belonging to the current state
    state_fixations = fixation_data[colors == state]

    # Plot the fixations with the corresponding color and marker style
    plt.scatter(state_fixations[:, 0],
                state_fixations[:, 1],
                color='C{}'.format(state),
                marker=markers[state],
                label=f'State {state}')

# Add legend and labels to the plot
plt.legend()
plt.xlabel('X-coordinate')
plt.ylabel('Y-coordinate')

# Show the plot
plt.imshow(img)
plt.show()


'''
# Assign colors to clusters
colors = ['red', 'blue', 'green']  # Add more colors if needed

# Plot the fixation data, means, and ellipses
plt.scatter(fixation_data[:, 0], fixation_data[:, 1], c=model.predict(fixation_data), cmap=plt.cm.get_cmap('jet', n_states), label='Fixation Data')
plt.scatter(estimated_means[:, 0], estimated_means[:, 1], c=range(n_states), cmap=plt.cm.get_cmap('jet', n_states), marker='x', label='State Means')

if n_states == 2:
    # Plot the fixation data, state means, and state boundaries
    plt.scatter(fixation_data[:, 0], fixation_data[:, 1], c=model.predict(fixation_data), cmap=plt.cm.get_cmap('jet', n_states), label='Fixation Data')
    plt.scatter(estimated_means[:, 0], estimated_means[:, 1], c=range(n_states), cmap=plt.cm.get_cmap('jet', n_states), marker='x', label='State Means')

    for i in range(n_states):
        cov_matrix = estimated_covariances[i]  # Full covariance matrix
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        angle = np.degrees(np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0]))  # Extract angle from eigenvectors
        width = 2 * np.sqrt(2 * eigenvalues[0])
        height = 2 * np.sqrt(2 * eigenvalues[1])
        ellipse = Ellipse(xy=estimated_means[i], width=width, height=height, angle=angle, edgecolor=colors[i], facecolor='none')
        plt.gca().add_patch(ellipse)
else:
    # Plot the fixation data, cluster means, and cluster boundaries
    plt.scatter(fixation_data[:, 0], fixation_data[:, 1], c=model.predict(fixation_data), cmap=plt.cm.get_cmap('jet', n_states), label='Fixation Data')
    plt.scatter(estimated_means[:, 0], estimated_means[:, 1], c=range(n_states), cmap=plt.cm.get_cmap('jet', n_states), marker='x', label='State Means')

    for i in range(n_states):
        cov_matrix = estimated_covariances[i]  # Full covariance matrix
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        angle = np.degrees(np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0]))  # Extract angle from eigenvectors
        width = 2 * np.sqrt(2 * eigenvalues[0])
        height = 2 * np.sqrt(2 * eigenvalues[1])
        ellipse = Ellipse(xy=estimated_means[i], width=width, height=height, angle=angle, edgecolor=colors[i], facecolor='none')
        plt.gca().add_patch(ellipse)

plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.title('Fixation Data Clustering')
plt.imshow(img)
plt.legend()
plt.show()
'''


In [None]:
import collections
import numpy as np
from sklearn.utils import check_random_state

np.set_printoptions(formatter={'float_kind': "{:.3f}".format})
rs = check_random_state(42)
sample_length = len(d)
# With random initialization, it takes a few tries to find the
# best solution
num_inits = 5
num_states = np.arange(2, 4)
verbose = False
fixation_data = d
sequences = np.asarray(fixation_data)
#({"spherical", "diag", "full", "tied"}, optional)
covariance_type = "diag"
#({"viterbi", "map"}, optional)
algorithm = "viterbi"
# {"log","scaling"}
implementation = "log"

# Train a suite of models, and keep track of the best model for each
# number of states, and algorithm
best_scores = collections.defaultdict(dict)
best_models = collections.defaultdict(dict)
for n in num_states:
    for i in range(num_inits):
        vi = vhmm.VariationalGaussianHMM(n,
                                         n_iter=1000,
                                         covariance_type=covariance_type,
                                         implementation=implementation,
                                         tol=1e-6,
                                         random_state=rs,
                                         verbose=verbose)
        vi.fit(sequences, [sample_length])
        lb = vi.monitor_.history[-1]
        print(f"Training VI({n}) Variational Lower Bound={lb} "
              f"Iterations={len(vi.monitor_.history)} ")
        if best_models["VI"].get(n) is None or best_scores["VI"][n] < lb:
            best_models["VI"][n] = vi
            best_scores["VI"][n] = lb

        em = hmm.GMMHMM(n,
                        n_iter=1000,
                        covariance_type=covariance_type,
                        implementation=implementation,
                        tol=1e-6,
                        random_state=rs,
                        verbose=verbose)
        em.fit(sequences, [sample_length])
        ll = em.monitor_.history[-1]
        print(f"Training EM({n}) Final Log Likelihood={ll} "
              f"Iterations={len(vi.monitor_.history)} ")
        if best_models["EM"].get(n) is None or best_scores["EM"][n] < ll:
            best_models["EM"][n] = em
            best_scores["EM"][n] = ll

# Display the model likelihood/variational lower bound for each N
# and show the best learned model

bestModel = None
for algo, scores in best_scores.items():
    best = max(scores.values())
    best_n, best_score = max(scores.items(), key=lambda x: x[1])
    for n, score in scores.items():
        flag = "* <- Best Model" if score == best_score else ""
        print(f"{algo}({n}): {score:.4f}{flag}")

    print(f"Best Model {algo}")
    bestModel = best_models[algo][best_n]
    print(bestModel.transmat_)
    print(bestModel.means_)
    print(bestModel.covars_)

    scanpath, _ = bestModel.sample(len(fixation_data))
    # Calculate the log-likelihood of the generated scanpath
    log_likelihood = bestModel.score(scanpath)
    print(f"n_componets: {bestModel.n_components}, ll: {log_likelihood}")


In [None]:
# Plot the original fixation data and the generated scanpath
plt.plot(fixation_data[:, 0], fixation_data[:, 1], 'ro-', label='Fixation Data')
plt.plot(scanpath[:, 0], scanpath[:, 1], 'bo-', label='Generated Scanpath')
plt.imshow(img)  # Replace xmin, xmax, ymin, ymax with the appropriate plot limits


plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.title('Scanpath (Log-Likelihood: {:.2f})'.format(log_likelihood))
plt.legend()
plt.show()


In [None]:
model = hmm.GaussianHMM(3, init_params="stc")
model.n_features = 3
#model.startprob_ = np.array([1/4., 1/4., 1/4., 1/4.])
'''
model.transmat_ = np.array([[0.3, 0.4, 0.2, 0.1],
                            [0.1, 0.2, 0.3, 0.4],
                            [0.5, 0.2, 0.1, 0.2],
                            [0.25, 0.25, 0.25, 0.25]])
'''
model.means_ = np.array([[-2.5], [0], [2.5], [5.]])
#model.covars_ = np.sqrt([[0.25], [0.25], [0.25], [0.25]])

X, _ = model.sample(1000, random_state=rs)
lengths = [X.shape[0]]

aic = []
bic = []
lls = []
ns = [2, 3]
for n in ns:
    best_ll = None
    best_model = None
    for i in range(10):
        h = hmm.GaussianHMM(n, n_iter=200, tol=1e-4, random_state=rs)
        h.fit(X)
        score = h.score(X)
        if not best_ll or best_ll < best_ll:
            best_ll = score
            best_model = h
    aic.append(best_model.aic(X))
    bic.append(best_model.bic(X))
    lls.append(best_model.score(X))
