## Imports

In [None]:
# %cd /home/liamroy/Documents/PHD/repos/RL_audio/notebooks

%cd '/Users/liamroy/Library/CloudStorage/GoogleDrive-liam.roy@monash.edu/My Drive/Robotics/User Study 02 - Auto State Expression/RL_audio/notebooks'

# %cd <add your path here and comment out the others>

In [None]:
# IMPORTS
import os
import shutil
import time
import sys
import random

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors
from matplotlib.ticker import FormatStrFormatter

from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D

import pandas as pd
from dython.nominal import associations
from dython.nominal import identify_nominal_columns


from scipy import stats
from sklearn import datasets, mixture

from termcolor import colored, cprint
# Termcolor guide: https://pypi.org/project/termcolor/

from openpyxl import Workbook
from openpyxl import load_workbook


from sklearn.mixture import GaussianMixture
from sklearn.model_selection import GridSearchCV

import seaborn as sns


%matplotlib widget
# %matplotlib inline
# %matplotlib notebook

# Try to use this website to use the explode feature, so we can see internal blocks and space everything out
# https://terbium.io/2017/12/matplotlib-3d/ 

# Use this website to make your GIFs - generally 50 delay per frame is good
# https://ezgif.com/maker

## Data

These are all the data series for the 24 participants

## Clustering Analysis: Bayesian Information Criterion (BIC) and Akaike Information Criterion (AIC)

## Sample BIC Analysis on our data

In [None]:
# Define the parameters for the normal distribution
mean_A = 0.5  # Center point
mean_B = 2.5  # Center point
mean_C = 4.5  # Center point

std_deviation_A = 1.2  # Standard deviation
std_deviation_B = 0.3  # Standard deviation
std_deviation_C = 0.7  # Standard deviation

# Create a 500x3 matrix filled with random numbers from a normal distribution
matrix_A = np.random.normal(loc=mean_A, scale=std_deviation_A, size=(1000, 3))
matrix_B = np.random.normal(loc=mean_B, scale=std_deviation_B, size=(1000, 3))
matrix_C = np.random.normal(loc=mean_C, scale=std_deviation_C, size=(1000, 3))

# Clip values to ensure they are within the range [0, 5]
# matrix_A = np.clip(matrix_A, 0, 5)
# matrix_B = np.clip(matrix_B, 0, 5)
# matrix_B = np.clip(matrix_C, 0, 5)

# Print the resulting matrix
print(matrix_A.shape)
print(matrix_B.shape)
print(matrix_C.shape)

X = np.concatenate([matrix_A, matrix_B, matrix_C])

In [None]:
X

In [None]:
plt.scatter(matrix_A[:, 0], matrix_A[:, 1], s=0.8)
plt.scatter(matrix_B[:, 0], matrix_B[:, 1], s=0.8)
plt.scatter(matrix_C[:, 0], matrix_C[:, 1], s=0.8)
plt.title("Gaussian Mixture components")
plt.axis("equal")
plt.show()

In [None]:
# Create estimators for BIC and AIC

def gmm_bic_score(estimator, X):
    """Callable to pass to GridSearchCV that will use the BIC score."""
    # Make it negative since GridSearchCV expects a score to maximize
    return -estimator.bic(X)

def gmm_aic_score(estimator, X):
    """Callable to pass to GridSearchCV that will use the AIC score."""
    # Make it negative since GridSearchCV expects a score to maximize
    return -estimator.aic(X)

param_grid = {"n_components": range(1, 7), "covariance_type": ["spherical", "tied", "diag", "full"]}

grid_search = GridSearchCV(GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score)

grid_search.fit(X)

In [None]:
# Sort and present our results so we see the best results first

df = pd.DataFrame(grid_search.cv_results_)[["param_n_components", "param_covariance_type", "mean_test_score"]]

df["mean_test_score"] = -df["mean_test_score"]

df = df.rename(columns={
        "param_n_components": "Number of components",
        "param_covariance_type": "Type of covariance",
        "mean_test_score": "BIC score"})

df.sort_values(by="BIC score").head()

In [None]:
# Now lets visualize them graphically 

sns.catplot(
    data=df,
    kind="bar",
    x="Number of components",
    y="BIC score",
    hue="Type of covariance")

plt.show()

## Real BIC Analysis on our data

In [None]:
sect2U_st0_pd_dataframe = pd.read_excel('user_data/response_book.xlsx', sheet_name = 'conv_idx', header=1, index_col=None, na_values=['NA'], usecols="B:D", nrows=24)

sect2I_st0_pd_dataframe = pd.read_excel('user_data/response_book.xlsx', sheet_name = 'conv_idx', header=1, index_col=None, na_values=['NA'], usecols="E:G", nrows=24)

sect2U_st1_pd_dataframe = pd.read_excel('user_data/response_book.xlsx', sheet_name = 'conv_idx', header=1, index_col=None, na_values=['NA'], usecols="L:N", nrows=24)

sect2I_st1_pd_dataframe = pd.read_excel('user_data/response_book.xlsx', sheet_name = 'conv_idx', header=1, index_col=None, na_values=['NA'], usecols="O:Q", nrows=24)

sect2U_st2_pd_dataframe = pd.read_excel('user_data/response_book.xlsx', sheet_name = 'conv_idx', header=1, index_col=None, na_values=['NA'], usecols="V:X", nrows=24)

sect2I_st2_pd_dataframe = pd.read_excel('user_data/response_book.xlsx', sheet_name = 'conv_idx', header=1, index_col=None, na_values=['NA'], usecols="Y:AA", nrows=24)

clustering_dataframe_list = [sect2U_st0_pd_dataframe, sect2I_st0_pd_dataframe, sect2U_st1_pd_dataframe, sect2I_st1_pd_dataframe, sect2U_st2_pd_dataframe, sect2I_st2_pd_dataframe]

In [None]:
# Convert to Numpy
sect2U_st0_np_array = sect2U_st0_pd_dataframe.to_numpy()
sect2I_st0_np_array = sect2I_st0_pd_dataframe.to_numpy()
sect2U_st1_np_array = sect2U_st1_pd_dataframe.to_numpy()
sect2I_st1_np_array = sect2I_st1_pd_dataframe.to_numpy()
sect2U_st2_np_array = sect2U_st2_pd_dataframe.to_numpy()
sect2I_st2_np_array = sect2I_st2_pd_dataframe.to_numpy()

# Use this to confirm data frames are correct
# sect2U_st0_pd_dataframe
# sect2U_st0_np_array

In [None]:
# Create estimators for BIC and AIC

def gmm_bic_score(estimator, X):
    """Callable to pass to GridSearchCV that will use the BIC score."""
    # Make it negative since GridSearchCV expects a score to maximize
    return -estimator.bic(X)

def gmm_aic_score(estimator, X):
    """Callable to pass to GridSearchCV that will use the AIC score."""
    # Make it negative since GridSearchCV expects a score to maximize
    return -estimator.aic(X)

In [None]:
# Fit the model and sort results
param_grid = {"n_components": range(1, 7), "covariance_type": ["spherical"]} # , "tied", "diag", "full"

grid_search = GridSearchCV(GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score)

grid_search.fit(sect2U_st0_np_array)


# Sort and present our results so we see the best results first
df_sect2U_st0 = pd.DataFrame(grid_search.cv_results_)[["param_n_components", "param_covariance_type", "mean_test_score"]]

df_sect2U_st0["mean_test_score"] = -df_sect2U_st0["mean_test_score"]

df_sect2U_st0 = df_sect2U_st0.rename(columns={
        "param_n_components": "Number of components",
        "param_covariance_type": "Type of covariance",
        "mean_test_score": "BIC score"})

df_sect2U_st0.sort_values(by="BIC score").head()

df_sect2U_st0['Init and State'] = 'Uninformed Stuck'

df_sect2U_st0

In [None]:
# Fit the model and sort results
param_grid = {"n_components": range(1, 7), "covariance_type": ["spherical"]} # , "tied", "diag", "full"

grid_search = GridSearchCV(GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score)

grid_search.fit(sect2I_st0_np_array)


# Sort and present our results so we see the best results first
df_sect2I_st0 = pd.DataFrame(grid_search.cv_results_)[["param_n_components", "param_covariance_type", "mean_test_score"]]

df_sect2I_st0["mean_test_score"] = -df_sect2I_st0["mean_test_score"]

df_sect2I_st0 = df_sect2I_st0.rename(columns={
        "param_n_components": "Number of components",
        "param_covariance_type": "Type of covariance",
        "mean_test_score": "BIC score"})

df_sect2I_st0.sort_values(by="BIC score").head()

df_sect2I_st0['Init and State'] = 'Informed Stuck'

df_sect2I_st0

In [None]:
# Fit the model and sort results
param_grid = {"n_components": range(1, 7), "covariance_type": ["spherical"]} # , "tied", "diag", "full"

grid_search = GridSearchCV(GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score)

grid_search.fit(sect2U_st1_np_array)


# Sort and present our results so we see the best results first
df_sect2U_st1 = pd.DataFrame(grid_search.cv_results_)[["param_n_components", "param_covariance_type", "mean_test_score"]]

df_sect2U_st1["mean_test_score"] = -df_sect2U_st1["mean_test_score"]

df_sect2U_st1 = df_sect2U_st1.rename(columns={
        "param_n_components": "Number of components",
        "param_covariance_type": "Type of covariance",
        "mean_test_score": "BIC score"})

df_sect2U_st1.sort_values(by="BIC score").head()

df_sect2U_st1['Init and State'] = 'Uninformed Accomplished'

df_sect2U_st1

In [None]:
# Fit the model and sort results
param_grid = {"n_components": range(1, 7), "covariance_type": ["spherical"]} # , "tied", "diag", "full"

grid_search = GridSearchCV(GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score)

grid_search.fit(sect2I_st1_np_array)


# Sort and present our results so we see the best results first
df_sect2I_st1 = pd.DataFrame(grid_search.cv_results_)[["param_n_components", "param_covariance_type", "mean_test_score"]]

df_sect2I_st1["mean_test_score"] = -df_sect2I_st1["mean_test_score"]

df_sect2I_st1 = df_sect2I_st1.rename(columns={
        "param_n_components": "Number of components",
        "param_covariance_type": "Type of covariance",
        "mean_test_score": "BIC score"})

df_sect2I_st1.sort_values(by="BIC score").head()

df_sect2I_st1['Init and State'] = 'Informed Accomplished'

df_sect2I_st1

In [None]:
# Fit the model and sort results
param_grid = {"n_components": range(1, 7), "covariance_type": ["spherical"]} # , "tied", "diag", "full"

grid_search = GridSearchCV(GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score)

grid_search.fit(sect2U_st2_np_array)


# Sort and present our results so we see the best results first
df_sect2U_st2 = pd.DataFrame(grid_search.cv_results_)[["param_n_components", "param_covariance_type", "mean_test_score"]]

df_sect2U_st2["mean_test_score"] = -df_sect2U_st2["mean_test_score"]

df_sect2U_st2 = df_sect2U_st2.rename(columns={
        "param_n_components": "Number of components",
        "param_covariance_type": "Type of covariance",
        "mean_test_score": "BIC score"})

df_sect2U_st2.sort_values(by="BIC score").head()

df_sect2U_st2['Init and State'] = 'Uninformed Progressing'

df_sect2U_st2

In [None]:
# Fit the model and sort results
param_grid = {"n_components": range(1, 7), "covariance_type": ["spherical"]} # , "tied", "diag", "full"

grid_search = GridSearchCV(GaussianMixture(), param_grid=param_grid, scoring=gmm_bic_score)

grid_search.fit(sect2I_st2_np_array)


# Sort and present our results so we see the best results first
df_sect2I_st2 = pd.DataFrame(grid_search.cv_results_)[["param_n_components", "param_covariance_type", "mean_test_score"]]

df_sect2I_st2["mean_test_score"] = -df_sect2I_st2["mean_test_score"]

df_sect2I_st2 = df_sect2I_st2.rename(columns={
        "param_n_components": "Number of components",
        "param_covariance_type": "Type of covariance",
        "mean_test_score": "BIC score"})

df_sect2I_st2.sort_values(by="BIC score").head()

df_sect2I_st2['Init and State'] = 'Informed Progressing'

df_sect2I_st2

In [None]:
frames = [df_sect2U_st0, df_sect2I_st0, df_sect2U_st1, df_sect2I_st1, df_sect2U_st2, df_sect2I_st2]

result = pd.concat(frames)

In [None]:
# Now lets visualize them graphically 

g = sns.catplot(
    data=result,
    kind="bar",
    x="Number of components",
    y="BIC score",
    hue="Init and State") #, height=4, aspect=.6)

g.set_titles("{col_name} {col_var}")
g.set(ylim=(-30, 100))
g.despine(left=True)

plt.show()

## We can do the same AIC and BIC in a different way...

In [None]:
sect2U_st0_legend = "Uninformed, State 0: Stuck"

sect2U_st1_legend = "Uninformed, State 1: Accomplished"

sect2U_st2_legend = "Uninformed, State 2: Progressing"

sect2I_st0_legend = "Informed, State 0: Stuck"

sect2I_st1_legend = "Informed, State 1: Accomplished"

sect2I_st2_legend = "Informed, State 2: Progressing"

clustering_legend_list = [sect2U_st0_legend, sect2U_st1_legend, sect2U_st2_legend, sect2I_st0_legend, sect2I_st1_legend, sect2I_st2_legend]

In [None]:
def plot_BIC_clustering(dataframe_list, save_str, title_str, legend_list, plotter=True):

    idx_counter = -1

    color_list = ['red', 'orange', 'blueviolet', 'dodgerblue', 'gold', 'green', 'magenta']
    
    fig, axs = plt.subplots(figsize=(10, 6), layout='constrained')

    
    for idx_counter in range(0, 6):

        BIC_dataframe = dataframe_list[idx_counter]
                
        # Fit the data using a Gaussian mixture model and calculate the BIC for
        # different numbers of clusters, ranging for 1 to 10
        ks = np.arange(1,13)
        bics = []
        for k in ks:
            gmm = mixture.GaussianMixture(n_components=k, covariance_type='full', tol=1e-10)
            gmm.fit(BIC_dataframe)
            bics.append(gmm.bic(BIC_dataframe)) # Bayesian information criterion for the current model on the input X.
        
        
        # Plot the data    
        
        clusterplot = axs.plot(ks, bics, color=color_list[idx_counter], linewidth=3)
        
    axs.set_title(title_str, size = 18)

    axs.set_xlabel("\nNumber of clusters, $k$\n", size = 15)
    axs.set_ylabel("\nBIC", size = 15)
    axs.set_xticks(ks);
    
    axs.grid(color='black', linestyle='-', linewidth=0.1)

    axs.xaxis.set_major_formatter(FormatStrFormatter('%g'))
    axs.xaxis.set_ticks(np.arange(1, 13, 1))

    axs.set_xlim(1, 12)

    axs.yaxis.set_major_formatter(FormatStrFormatter('%g'))
    axs.yaxis.set_ticks(np.arange(-700, 400, 50))

    axs.legend(legend_list, loc='upper right', prop = { "size": 15 })


    # Save the fig
    plt.savefig(save_str, bbox_inches='tight', pad_inches=0.25)

    
    # Show the fig if plotter is not set to None
    if plotter:
        plt.show()

    if plotter == None:
        print(" Plots closed. ")
        matplotlib.pyplot.close()

In [None]:
plot_BIC_clustering(dataframe_list=clustering_dataframe_list, 
                    save_str="plots/clustering_plots/BIC_cluster_plotV2.png",
                    title_str='Bayesian Information Criterion (BIC) Clustering Analysis\nFinal Parameter Values from Informed/Uninformed Initialization\n', 
                    legend_list=clustering_legend_list)

## Collect Data into CVS for Logistic Regression Predictor

In [None]:
user_IDs = [*range(3, 27)] # 3 to 26 , 27
 
comm_states = ['st0', 'st1', 'st2'] # states 0, 1 and 2

sections = ['sect2U', 'sect2I']

workbook_path = "user_data/response_book.xlsx"
response_book = load_workbook(workbook_path)


for state in comm_states:
    
    for section in sections:

        line_counter = 2 #start input to CVS on line 2

        if section == sections[0]:            
            new_section = 'sect2U'
        if section == sections[1]:            
            new_section = 'sect2I'
        
        try: # Try to open existing sheet
            response_sheet = response_book.get_sheet_by_name("regression_" + state + "_" + new_section)
        except KeyError:  # If ot doesn't exist. create it
            response_sheet = response_book.create_sheet(title="regression_" + state + "_" + new_section)
    
        response_sheet["A1"] = "Value"
        response_sheet["B1"] = "Correct"
        response_sheet["C1"] = "User_ID"
        response_sheet["D1"] = "P1_BPM"
        response_sheet["E1"] = "P2_BPL"
        response_sheet["F1"] = "P3_Pitch"
        response_sheet["G1"] = "Confidence"
        response_sheet["H1"] = "State"
        response_sheet["I1"] = "Init"

        for user_ID in user_IDs:

            user_ID_str = str("{:02d}".format(user_ID))    
            
            # st0_final_matrix = np.load("user_data/user_" + user_ID_str + "/arrays/" + user_ID_str + "_" + section + "_final_st0.npy") # Stuck
            # st1_final_matrix = np.load("user_data/user_" + user_ID_str + "/arrays/" + user_ID_str + "_" + section + "_final_st1.npy") # Accomplished 
            final_matrix = np.load("user_data/user_" + user_ID_str + "/arrays/" + user_ID_str + "_" + section + "_final_" + state + ".npy") # Progressing

            # print()
            # print(f"section: {section}   user: {user_ID}")
            # print(f"state: \n{state}") 
            # print(final_matrix)
            # print()

            # Itterate param indecies.
            # Recall: param_0 = Speed of audio loop (BPM)         --> (100 BPM   //  140 BPM  //  180 BPM)
            #         param_1 = Number of beeps per loop (BPL)   --> (1 BPL     //  2 BPM     //  4 BPL) 
            #         param_2 = Amplitude of Pitch Bend           --> (downward  //  neutral  //  upward)

            for param_0_idx in range(0, 3):
                for param_1_idx in range(0, 3):
                    for param_2_idx in range(0, 3):

                        cube_value = final_matrix[param_0_idx, param_1_idx, param_2_idx]
                        
                        response_sheet["A" + str(line_counter)] = cube_value
                        
                        if cube_value > 0:
                            response_sheet["B" + str(line_counter)] = 1
                        elif cube_value <= 0:
                            response_sheet["B" + str(line_counter)] = 0

                        
                        response_sheet["C" + str(line_counter)] = user_ID
                        response_sheet["D" + str(line_counter)] = param_0_idx
                        response_sheet["E" + str(line_counter)] = param_1_idx
                        response_sheet["F" + str(line_counter)] = param_2_idx
                        response_sheet["G" + str(line_counter)] = abs(cube_value)

                        if state == comm_states[0]:                        
                            response_sheet["H" + str(line_counter)] = 0
                        elif state == comm_states[1]: 
                            response_sheet["H" + str(line_counter)] = 1
                        elif state == comm_states[2]: 
                            response_sheet["H" + str(line_counter)] = 2

                        if section == sections[0]:                        
                            response_sheet["I" + str(line_counter)] = "U"
                        elif section == sections[1]: 
                            response_sheet["I" + str(line_counter)] = "I"

                        line_counter += 1
                        
response_book.save(workbook_path)

print()
print("terminated and workbook saved")

### Then create a page which has all combined data

In [None]:
user_IDs = [*range(3, 27)] # 3 to 26 , 27
 
comm_states = ['st0', 'st1', 'st2'] # states 0, 1 and 2

sections = ['sect2U', 'sect2I']

workbook_path = "user_data/response_book.xlsx"
response_book = load_workbook(workbook_path)


try: # Try to open existing sheet
    response_sheet = response_book.get_sheet_by_name("regression_combined")
except KeyError:  # If not doesn't exist. create it
    response_sheet = response_book.create_sheet(title="regression_combined")

line_counter = 2 #start input to CVS on line 2

for state in comm_states:
    
    for section in sections:

        if section == sections[0]:            
            new_section = 'sect2U'
        if section == sections[1]:            
            new_section = 'sect2I'
    
        response_sheet["A1"] = "Value"
        response_sheet["B1"] = "Correct"
        response_sheet["C1"] = "User_ID"
        response_sheet["D1"] = "P1_BPM"
        response_sheet["E1"] = "P2_BPL"
        response_sheet["F1"] = "P3_Pitch"
        response_sheet["G1"] = "Confidence"
        response_sheet["H1"] = "State"
        response_sheet["I1"] = "Init"

        for user_ID in user_IDs:

            user_ID_str = str("{:02d}".format(user_ID))    
            
            # st0_final_matrix = np.load("user_data/user_" + user_ID_str + "/arrays/" + user_ID_str + "_" + section + "_final_st0.npy") # Stuck
            # st1_final_matrix = np.load("user_data/user_" + user_ID_str + "/arrays/" + user_ID_str + "_" + section + "_final_st1.npy") # Accomplished 
            final_matrix = np.load("user_data/user_" + user_ID_str + "/arrays/" + user_ID_str + "_" + section + "_final_" + state + ".npy") # Progressing

            # print()
            # print(f"section: {section}   user: {user_ID}")
            # print(f"state: \n{state}") 
            # print(final_matrix)
            # print()

            # Itterate param indecies.
            # Recall: param_0 = Speed of audio loop (BPM)         --> (100 BPM   //  140 BPM  //  180 BPM)
            #         param_1 = Number of beeps per loop (BPL)   --> (1 BPL     //  2 BPM     //  4 BPL) 
            #         param_2 = Amplitude of Pitch Bend           --> (downward  //  neutral  //  upward)

            for param_0_idx in range(0, 3):
                for param_1_idx in range(0, 3):
                    for param_2_idx in range(0, 3):

                        cube_value = final_matrix[param_0_idx, param_1_idx, param_2_idx]
                        
                        response_sheet["A" + str(line_counter)] = cube_value
                        
                        if cube_value > 0:
                            response_sheet["B" + str(line_counter)] = 1
                        elif cube_value <= 0:
                            response_sheet["B" + str(line_counter)] = 0

                        
                        response_sheet["C" + str(line_counter)] = user_ID
                        response_sheet["D" + str(line_counter)] = param_0_idx
                        response_sheet["E" + str(line_counter)] = param_1_idx
                        response_sheet["F" + str(line_counter)] = param_2_idx
                        response_sheet["G" + str(line_counter)] = abs(cube_value)

                        if state == comm_states[0]:                        
                            response_sheet["H" + str(line_counter)] = 0
                        elif state == comm_states[1]: 
                            response_sheet["H" + str(line_counter)] = 1
                        elif state == comm_states[2]: 
                            response_sheet["H" + str(line_counter)] = 2

                        if section == sections[0]:                        
                            response_sheet["I" + str(line_counter)] = "U"
                        elif section == sections[1]: 
                            response_sheet["I" + str(line_counter)] = "I"

                        line_counter += 1
                        
response_book.save(workbook_path)

print()
print("terminated and workbook saved")