# Thesis Code

This notebook contains the code for the thesis "Predicting Stress Recovery in Older Adults using Graph Attention Networks: a Holistic Approach", written by Malou van der Velde. The outline is as follows:

1. Preprocessing of the data
2. Exploratory data analysis
3. Baseline models: LIME / Error analysis
4. Graph Attention Network: Error analysis / Visual representations of the attention weights / Tracking of model performance

Given the confidentiality of the data, no output is shown in the notebook. However, the relevant output is enclosed in the thesis itself.


## Preprocessing

Importing the libraries for the preprocessing

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import numpy as np  
import matplotlib.pyplot as plt
import seaborn as sns
import re  
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

In [None]:
display(df.head())
print(df.shape)
print(df.columns)

Replace missing values 99 and 100 with actual missing values.

In [None]:

#print unique values columns which include 99 and 100 as missing values
print(df['Gender'].unique())
print(df['Income'].unique())
print(df['Education'].unique())

# Define columns to exclude
excluded_cols = ["Age", "Physical_activity"]

# Apply replacement only to selected columns (99 and 100 have meaning)
df.loc[:, ~df.columns.isin(excluded_cols)] = df.loc[:, ~df.columns.isin(excluded_cols)].replace({99: np.nan, 100: np.nan})

#print unique values columns which included 99 and 100 before as a check
print(df['Income'].unique())
print(df['Education'].unique())

Remove cases who did not sufficiently fill in the questionnaire

In [None]:
print(df.shape) #sample size before
threshold = len(df.columns) * 0.6
df.dropna(thresh=threshold, inplace=True)
print(df.shape) #sample size after
#removed columns with more than 60% missing values

Check if there are duplicate cases that need to be removed

In [None]:
#Check duplicate cases
duplicate_rows = df.duplicated()
print("Number of duplicate rows:", duplicate_rows.sum())
print("Duplicate rows:\n", df[duplicate_rows])

A dictionairy is created with the name of the questionnaire (as the key) and its items (as values), for the construction of the variables. Due to confidentiality, only an example is shown.

In [None]:
questionnaire_dict = {
    "openness" : ['Personality_10', 'Personality_15', 'Personality_20', 'Personality_25', 'Personality_30'],
}

The same is done for the demographic items

In [None]:
demographic_dict = {
    "physical_act" : ['Physical_activity']
}

Exclude cases who did not carefully fill in the questionnaire:

 the personality questionnaire contained 5 subscales and about half of the items were reverse coded, so the variance of the answer values is expected to be high. Moreover, this questionnaire was the last one on the survey, so the probability of carefully answering this questionnaire was lower. 
 The variabilty of this questionnaire was therefore picked as a threshhold to exclude participants who did not fill in the questionnaire in a careful way.
 Upon further inspection of the values of these participants, one participant was exluded

In [None]:
# Calculate the variance for each participant across the personality questionnaire items
personality_items = questionnaire_dict['extraversion'] + questionnaire_dict['agreeableness'] + \
                    questionnaire_dict['conscientiousness'] + questionnaire_dict['neuroticism'] + \
                    questionnaire_dict['openness']

participant_variability = df[personality_items].var(axis=1, skipna=True)

# Filter participants with variability below 50
low_variability_participants = df[participant_variability < 50]

# Display the filtered participants
print(low_variability_participants)

The questionnaire threatening experiences contained items regarding threatening experiences. The answer options were never, ever or last year. Two different variables are created: threatening experiences in the last year and ever had threatening experiences. Missing values are assumed to be indicative of not having that experience.

In [None]:
# Recode the items in the threatening experiences columns
threatening_columns = questionnaire_dict['threatening_exp']
#fill missing values with 0
df[threatening_columns] = df[threatening_columns].fillna(0)

# Create new columns for recoded values
df_last_year = df[threatening_columns].applymap(lambda x: 1 if x == 1 else 0)
df_ever = df[threatening_columns].applymap(lambda x: 1 if x == 2 else 0)

# Sum of threatening experiences in the last year
df['threatening_exp_last_year'] = df_last_year.sum(axis=1)

# Sum of threatening experiences ever
df['threatening_exp_ever'] = df_ever.sum(axis=1)

Dictionary is adapted to include these two variables, whilst removing the now redundant variable of threatening experiences as a whole.

In [None]:
questionnaire_dict = {
    "threatening_exp_ever": ['LTE_Ever_1', 'LTE_Ever_2', 'LTE_Ever_3', 'LTE_Ever_4', 'LTE_Ever_5', 'LTE_Ever_6', 'LTE_Ever_7', 'LTE_Ever_8', 'LTE_Ever_9', 'LTE_Ever_10', 'LTE_Ever_11', 'LTE_Ever_12'],
    "threatening_exp_last_year": ['LTE_LastYear_1', 'LTE_LastYear_2', 'LTE_LastYear_3', 'LTE_LastYear_4', 'LTE_LastYear_5', 'LTE_LastYear_6', 'LTE_LastYear_7', 'LTE_LastYear_8', 'LTE_LastYear_9', 'LTE_LastYear_10', 'LTE_LastYear_11', 'LTE_LastYear_12'],
}

Reverse coding the appropriate items of the questionnaires: Thought control, stress recovery, and childhood trauma. The personality questionnaire is shown as an example.

In [None]:
display(df['Personality_1'].value_counts()) #values before reverse scoring

# List of ThoughtControl items to reverse
cols_to_reverse = ['Personality_1', 'Personality_3', 'Personality_7', 'Personality_8', 'Personality_10', 'Personality_14', 'Personality_17', 'Personality_19', 'Personality_20', 'Personality_21', 'Personality_24', 'Personality_26', 'Personality_27', 'Personality_28', 'Personality_30']

# Reverse scoring (assuming a 1-5 Likert scale, where 6 - x flips the scale)
df[cols_to_reverse] = 6 - df[cols_to_reverse]

# Display the value counts for verification after reverse scoring
display(df['Personality_1'].value_counts())

Counting the missing values per questionnaire (also to evaluate whether person mean imputation is appropriate)

In [None]:
# Count missing values for each questionnaire in questionnaire_dict
print("\nMissing values in questionnaire_dict:")
for key, items in questionnaire_dict.items():
    if key in ["threatening_exp_ever", "threatening_exp_last_year"]:
        continue  # Skip these keys
    missing_count = df[items].isnull().sum().sum()  # Sum missing values across all items in the questionnaire
    print(f"{key}: {missing_count}")

Calculating the Cronbach's alpha for reliability and thus making sure person mean imputation is an appropriate measure. 

In [None]:
def cronbach_alpha(df, items):
    n_items = len(items)
    
    # There need to be at least 2 items to compute Cronbach's alpha
    if n_items < 2:
        return np.nan 
    
    item_variances = df_items.var(axis=0, ddof=1)  # Variance for each item
    total_variance = df_items.sum(axis=1).var(ddof=1)  # Variance of total score
    
    # Cronbach's alpha formula
    alpha = (n_items / (n_items - 1)) * (1 - (item_variances.sum() / total_variance))
    return alpha

# Compute and store Cronbach's alpha for each questionnaire
cronbach_results = {}

for q_name, items in questionnaire_dict.items():
    try:
        alpha = cronbach_alpha(df, items)
        cronbach_results[q_name] = alpha
        print(f"Cronbach's Alpha for {q_name}: {alpha:.3f}")
       
    except Exception as e:
        print(f"Error calculating Cronbach's Alpha for {q_name}: {e}")

Person mean imputation for the questionnaires

In [None]:
for q_name, items in questionnaire_dict.items():
    #exclude threateing experience items from the questionnaires
    if q_name == "threatening_exp_ever" or q_name == "threatening_exp_last_year":
        continue

    # Calculate the row-wise mean for the questionnaire, ignoring missing values
    person_mean = df[items].mean(axis=1, skipna=True)
    
    # Iterate over each item in the questionnaire
    for item in items:
        # Impute missing values with the person mean
        df[item] = df[item].fillna(person_mean)

Check whether there are missing values left

In [None]:
def check_imputation(df, questionnaire_dict):
    for q_name, items in questionnaire_dict.items():
        if q_name == "threatening_exp_ever" or q_name == "threatening_exp_last_year":
            continue
        print(f"Checking questionnaire: {q_name}")

        # Check for remaining missing values (Github CoPilot)
        missing_values = df[items].isnull().sum().sum()
        if missing_values == 0:
            print("✅ No missing values remaining")
        else:
            print(f"⚠️ {missing_values} missing values still present")

# Run the function
check_imputation(df, questionnaire_dict)

The preprocessing of demographic variables 

Ensuring all values of the demographic variables are floats. Age had to be converted to a float. However, participants had to fill in a number for alcohol and physical activity. Unfortunately, many gave written answers, comma's as decimals etc., so this had to be more thoroughly cleaned...

In [None]:
#show the datatypes of all the columns in demographic_dict
for key in demographic_dict:
    print(key, df[demographic_dict[key]].dtypes)

#convert age into float
df['Age'] = df['Age'].astype(float)

Determine custom replacements

In [None]:
#value counts Alcohol_Glases
print(df['Alcohol_Glases'].value_counts())
#sum of all the values in the column Alcohol_Glases
print(df['Alcohol_Glases'].isnull().sum())

Function to clean the alcohol column

In [None]:

# Function to clean and convert values to float
def convert_to_float(value):
    if not isinstance(value, str):  # If already a number, return it
        return value
    
    # Custom replacements
    replacements = {
        'alleen bij gelegenheden': 1,  
        'Nu niets meer': 0,
        'Nog geen glas wijn per week': 0.5,
        'glaasje in het weekend soms': 1,
        '1 alleen op een feestje': 1,
        '5 maar ik drink niet elke week': 5,
        'Minder dan 1 per week, gemiddeld genomen 2 glazen per maand': 0.25,
        '1 in de maand': 0.25,
        'twee': 2,
        '3-4': 3.5,
        '2-4': 3,
        '0 5': 0.5,
        '15-20': 18,
        '1/2' : 1.5
    }

    # Check for direct replacements
    if value in replacements:
        return replacements[value]
    
    # Remove unwanted words
    words_to_remove = [
        "glazen", "glas", "alleen op een feestje", "standaardglazen", "drink meestal niet", "(één per dag)"
    ]
    # Remove unwanted words
    for word in words_to_remove:
        value = value.replace(word, "").strip()
    
    # Convert to lowercase and normalize decimal format (convert ",")
    value = value.lower().replace(",", ".")
    # Extract numeric values using regex
    match = re.search(r"\d+(\.\d+)?", value)
    if match:
        return float(match.group(0))
    
    # Default case: return None if no match (bug)
    return None

df["Alcohol_Glases"] = df["Alcohol_Glases"].apply(convert_to_float)


The question was skipped if someone filled in the previous question with: I dont drink, so the missing values should be 0. 

In [None]:
df['Alcohol_Glases'] = df['Alcohol_Glases'].fillna(0)

The column containing the question regarding variable physical activity (the minutes per day working out) is cleaned.
This is done in a similar way compared to the alcohol variable.

In [None]:
pd.set_option('display.max_rows', None) 
#print all the values of the column
print(df["Physical_activity"].value_counts())
print(df["Physical_activity"].unique())
print(df["Physical_activity"].isnull().sum())

In [None]:

# Function to clean and convert values to float
def convert_to_float(value):
    if not isinstance(value, str):  # If already a number, return it
        return value
    
    # Custom replacements for direct string-to-number mapping
    replacements = {
        'drie kwartier': 45,  # 45 minutes
        '240 minuten per dag (4 uur)': 240,  # 240 minutes per day
        'half uur tot 1 uur': 45,  # Assuming 'half hour to 1 hour' is 30 minutes
        '1 uur': 60,
        '2 uur': 120,
        '3 uur': 180,
        '300m': 300,  # Assuming '300m' is 300 minutes
        '9 uur': 540,  # Assuming '9 uur' is 9 hours = 540 minutes
        'sport 60 min  huis en tuin 120 min fietsen 60 min': 240,  # Summing the times
        '1,5 uur': 90,  # 1.5 hours = 90 minutes
        '2, 3 uur': 150,  # Assuming average of 2.5 hours = 150 minutes
        '4 uren': 240,  # 4 hours = 240 minutes
        '2,5': 150,  # 2.5 hours = 150 minutes
        '2.5': 150,
        '2,5 uur': 150,
        '3.30': 210,  # 3.30 hours = 210 minutes
        '11 uur = 661 min': 661,  # Explicitly provided value
        'gemiddeld over een week genomen  tussen 60 en 90 min. tijdens fietsvakanties is dat prakties de hele dag': 75,  # Average of 60 and 90 minutes
        '0,5': 30,  # 0.5 hours = 30 minutes
        '1h': 60,
    }

    # Check for direct replacements
    if value in replacements:
        return replacements[value]
    
    # Remove unwanted words or phrases
    words_to_remove = [
        "minuten", "uur", "m", "min"
    ]
    for word in words_to_remove:
        value = value.replace(word, "").strip()
    
    # Convert decimal format (convert "," to ".")
    value = value.replace(",", ".")

    # Extract numeric values using regex
    match = re.search(r"\d+(\.\d+)?", value)
    if match:
        return float(match.group(0))
    
    # Default case: return None if no match
    return None

# Apply function to the column
df["Physical_activity"] = df["Physical_activity"].apply(convert_to_float)


Convert the hours to minutes

In [None]:
def convert_hours_to_minutes(value):
    """
    Converts hours into minutes for values below 11.
    If the value is not numeric or greater than or equal to 11, it returns the original value.
    """
    try:
        # Check if the value is numeric
        numeric_value = float(value)
        # Convert hours to minutes if the value is below 11
        if numeric_value < 11:
            return numeric_value * 60
        else:
            return value
    except (ValueError, TypeError):
        # Return the original value if it's not numeric
        return value

# Apply the function to the "Physical_activity' column
df["Physical_activity"] = df["Physical_activity"].apply(convert_hours_to_minutes)


Checking for missing values in the demographic variables. 

In [None]:
df[['PSQI_9', 'SF_12_Q1', 'Education', 'Income', 'SmokingAtAll', 'Gender', 'Age']].isnull().sum()

The missing values in smoking, indicates that the person does not smoke, so this is recoded as such.

In [None]:
df['SmokingAtAll'] = df['SmokingAtAll'].fillna(1)

MICE is used to impute the missing values in education, income and physical activity. 

In [None]:
# Define the columns to impute
columns_to_impute = ['Education', 'Income', 'Physical_activity']

# Initialize the MICE imputer
mice_imputer = IterativeImputer(random_state=0, sample_posterior=True)

# Apply the imputer to the selected columns
df[columns_to_impute] = mice_imputer.fit_transform(df[columns_to_impute])

# Verify the imputed values
for col in columns_to_impute:
    print(f"Unique values in {col}: {df[col].unique()}")

The full dataset is constructed

In [None]:
# Create an empty dataframe for df_all
df_all = pd.DataFrame()

# Add variables from demographic_dict to df_all using the key name
for key, items in demographic_dict.items():
    #just add the item itself to df_all
    df_all[key] = df[items] 

# Add variables from questionnaire_dict to df_all by calculating the mean
# Note: experiences and target are not included in the mean calculation, and are included as sum scores
for key, items in questionnaire_dict.items():
    if key == "threatening_exp_ever" or key == "threatening_exp_last_year":
        continue
    #exclude target
    if key == "target":
        continue
    df_all[key] = df[items].mean(axis=1, skipna=True)

#create sumscore of target and add to df_all
df_all["target"] = df[questionnaire_dict["target"]].sum(axis=1, skipna=True)

# Add the variables "threatening_exp_ever" and "threatening_exp_last_year" to df_all
df_all["threatening_exp_ever"] = df["threatening_exp_ever"]
df_all["threatening_exp_last_year"] = df["threatening_exp_last_year"]

# Drop the original threatening_exp, to be sure 
if 'threatening_exp' in df_all.columns:
    df_all.drop(columns=['threatening_exp'], inplace=True)

#set threatening experiences ever and last year to floats
df_all["threatening_exp_ever"] = df_all["threatening_exp_ever"].astype(float)
df_all["threatening_exp_last_year"] = df_all["threatening_exp_last_year"].astype(float)

Create two  sub-datasets: young adults, older adults

In [None]:
# Split the dataframe into two based on the Age column
df_old = df_all[df_all['age'] > 49] 
df_young = df_all[df_all['age'] <= 49]

# Display the shapes of the resulting datasets
print("young:", df_old.shape)
print("old:", df_young.shape)

#mean age old and young dataset
print("Mean age old dataset:", df_old['age'].mean())
print("Mean age young dataset:", df_young['age'].mean())

## Exploratory Data Analysis

Importing libraries

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import pyreadstat
import numpy as np  
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import skew, kurtosis
from sklearn.manifold import TSNE
import plotly.express as px
from scipy.stats import pearsonr
from minisom import MiniSom
from scipy.stats import ttest_ind

More informative column names are given.

In [None]:
# Define the new column names
column_names = [
    'Physical_Activity', 'Alcohol', 'Smoking', 'Income', 'Gender', 'Age',
    'Education', 'Sleep_Quality', 'Subjective_Health', 'Thoughtcontrol',
    'Reappraisal_Emotion', 'Suppression_Emotion', 'Neg_Interpersonal_Reg', 'Pos_Interpersonal_Reg',
    'Rumination_Reflection', 'Rumination_Brooding', 'Resilience', 'Childhoodtrauma', 'Depression',
    'Effective_Coping', 'Ineffective_Coping', 'Extraversion', 'Agreeableness',
    'Conscientiousness', 'Neuroticism', 'Openness', "Stress_Recovery",
    'Threatening_exp_ever', 'Threatening_exp_last_year'
]

# Rename the columns for all three datasets
df_all.columns = column_names
df_old.columns = column_names
df_young.columns = column_names

A distrubtion of age and the cut-off point is created, in order to display a clear destinction of the distribution between the two age groups. 

In [None]:
# Create a histogram for the 'Age' column in df_all
plt.figure(figsize=(10, 6))

# Define the age threshold
age_threshold = 50

# Add a vertical red line at age 50
plt.axvline(x=50, color='red', linestyle='--', linewidth=2, label='Age Above 50')

# Separate the data into two groups
younger = df_all[df_all['Age'] < age_threshold]['Age']
older = df_all[df_all['Age'] >= age_threshold]['Age']

# Plot the histogram for younger adults
plt.hist(younger, bins=20, color='blue', edgecolor='black', alpha=0.7, label='Younger Adults')

# Plot the histogram for older adults
plt.hist(older, bins=20, color='orange', edgecolor='black', alpha=0.7, label='Older Adults')

# Add title and labels
plt.title("Distribution and Split of Age", fontsize=16, pad=20)
plt.xlabel("Age", fontsize=12)
plt.ylabel("Frequency", fontsize=12)

# Add grid for better readability
plt.grid(axis='y', linestyle='--', alpha=0.7)

# Add legend
plt.legend(fontsize=12)

# Show the plot
plt.tight_layout()
plt.show()

Overlapping histograms are created to get a preliminary understanding of the differences between the two age groups.

In [None]:
# Iterate through each column in the DataFrame
for column in df_young.columns:
    if pd.api.types.is_numeric_dtype(df_young[column]) and pd.api.types.is_numeric_dtype(df_old[column]):
        plt.figure(figsize=(8, 6))
        
        # Plot histogram for df_young
        plt.hist(df_young[column], bins=30, alpha=0.5, density=True, label='Younger Adults', color='blue')
        
        # Plot histogram for df_old
        plt.hist(df_old[column], bins=30, alpha=0.5, density=True, label='Older Adults', color='orange')
        
        # Add labels and legend
        plt.title(f'Histogram of {column}')
        plt.xlabel(column)
        plt.ylabel('Proportion')
        plt.legend()
        
        # Show the plot
        plt.show()

This code generates histograms for each dataset seperately. They were inspected to evaluate the distribution and potential issues of the data. However, this was less informative and straightforward compared to the overlapping histograms, so this output is not included in the thesis. 

In [None]:
# Function to create histograms for all columns in a dataframe
def create_histograms(df, title_prefix):
    for column in df.columns:
        plt.figure(figsize=(8, 5))
        plt.hist(df[column], bins=20, color='skyblue', edgecolor='black', alpha=0.7)
        plt.title(f'{title_prefix} - Histogram of {column}', fontsize=14)
        plt.xlabel(column, fontsize=12)
        plt.ylabel('Frequency', fontsize=12)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.show()

# Create histograms for df_all, df_old, and df_young
create_histograms(df_all, "Full Dataset")
create_histograms(df_old, "Older Adults")
create_histograms(df_young, "Younger Adults")

Boxplots were created to check for issues in the data.

In [None]:
# Function to create boxplots for all columns in a dataframe
def create_boxplots(df, title_prefix):
    for column in df.columns:
        plt.figure(figsize=(8, 5))
        sns.boxplot(y=df[column], color='skyblue')
        plt.title(f'{title_prefix} - Boxplot of {column}', fontsize=14)
        plt.ylabel(column, fontsize=12)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.show()

# Create boxplots for df_all, df_old, and df_young
create_boxplots(df_all, "df_all")
create_boxplots(df_old, "df_old")
create_boxplots(df_young, "df_young")


Scatterplots were created to get a visualisation of the relationship between the predictors and the target stress recovery.

In [None]:
# Function to create scatterplots for all columns in a dataframe with the target variable
def create_scatterplots(df, title_prefix):
    for column in df.columns:
        if column != 'Stress':  # Skip the target column itself
            plt.figure(figsize=(8, 5))
            plt.scatter(df[column], df['Stress'], alpha=0.7, color='blue', edgecolor='black')
            plt.title(f'{title_prefix} - Scatterplot of {column} vs Stress', fontsize=14)
            plt.xlabel(column, fontsize=12)
            plt.ylabel('Stress', fontsize=12)
            plt.grid(linestyle='--', alpha=0.7)
            plt.show()

# Create scatterplots for df_all, df_old, and df_young
create_scatterplots(df_all, "Full Dataset")
create_scatterplots(df_old, "Older Adults")
create_scatterplots(df_young, "Younger Adults")

This function contains heatmaps for the three datasets. Only the significant correlations are shown. The heatmaps are symmetrical and are thereby equal to amount the edges of the Graph Attention Network (the edges are bidirectionnally constructed)

In [None]:


# Function to calculate correlation matrix and p-values
def calculate_correlation_and_significance(df):
    corr_matrix = df.corr()
    p_values = pd.DataFrame(np.zeros_like(corr_matrix), columns=df.columns, index=df.columns)
    
    for row in df.columns:
        for col in df.columns:
            if row != col:
                _, p = pearsonr(df[row].dropna(), df[col].dropna())
                p_values.loc[row, col] = p
            else:
                p_values.loc[row, col] = np.nan  # No p-value for self-correlation
    return corr_matrix, p_values

# Function to create a heatmap with significance annotations
def create_correlation_heatmap_with_significance(df, title, alpha=0.05):
    corr_matrix, p_values = calculate_correlation_and_significance(df)
    plt.figure(figsize=(14, 10))  # Increase figure size for better readability
    
    # Mask insignificant correlations
    mask = np.zeros_like(corr_matrix, dtype=bool)
    mask[p_values > alpha] = True  # Mask correlations with p-value > alpha
    
    sns.heatmap(
        corr_matrix, 
        annot=True, 
        fmt=".2f", 
        cmap="coolwarm", 
        cbar=True, 
        annot_kws={"size": 10},  # Adjust font size of annotations
        linewidths=0.5,  # Add lines between boxes for clarity
        mask=mask  # Mask insignificant correlations
    )
    plt.xticks(rotation=45, ha="right", fontsize=10)  # Rotate x-axis labels for better readability
    plt.yticks(fontsize=10)  # Adjust font size for y-axis labels
    plt.title(title, fontsize=16, pad=20)  # Add padding to the title
    plt.tight_layout()  # Adjust layout to prevent clipping
    plt.show()

# Run the heatmaps for df_all, df_old, and df_young
create_correlation_heatmap_with_significance(df_all, "Correlation Heatmap of Significant Relationships for Both Groups")
create_correlation_heatmap_with_significance(df_old, "Correlation Heatmap of Significant Relationships for Older Adults")
create_correlation_heatmap_with_significance(df_young, "Correlation Heatmap of Significant Relationships for Younger Adults")

This code calculates the significant differences between the two age groups

In [None]:
# Function to calculate statistical significance for each variable
def calculate_statistical_significance(df_old, df_young):
    significance_results = {}
    for column in df_old.columns:
        if np.issubdtype(df_old[column].dtype, np.number) and np.issubdtype(df_young[column].dtype, np.number):
            # Perform t-test
            t_stat, p_value = ttest_ind(df_old[column].dropna(), df_young[column].dropna(), equal_var=False)
            # Format p-value with leading 0 for better readability
            significance_results[column] = {'t_stat': t_stat, 'p_value': f"{p_value:.10f}"}
    return pd.DataFrame(significance_results).T

# Calculate statistical significance
significance_df = calculate_statistical_significance(df_old, df_young)

# Display the results
print("Statistical Significance Results:")
display(significance_df)

Descriptions for all the variables in the three datasets

In [None]:

# Function to calculate distributions for a dataframe
def calculate_distributions(df):
    stats = {}
    for column in df.columns:
        if np.issubdtype(df[column].dtype, np.number):  # Only process numeric columns
            stats[column] = {
                'min': df[column].min(),
                'max': df[column].max(),
                'mean': df[column].mean(),
                'median': df[column].median(),
                'std': df[column].std(),
                'skewness': skew(df[column].dropna()),
                'kurtosis': kurtosis(df[column].dropna()),
                'count': df[column].count()
            }
    return pd.DataFrame(stats).T

# Calculate distributions for df_all, df_old, and df_young
distributions_all = calculate_distributions(df_all)
distributions_old = calculate_distributions(df_old)
distributions_young = calculate_distributions(df_young)

# Display the distributions
print("Distributions for df_all:")
display(distributions_all)

print("Distributions for df_old:")
display(distributions_old)

print("Distributions for df_young:")
display(distributions_young)

Demographic information of the three datasets

In [None]:
# Count gender in each dataset
gender_counts_all = df_all['Gender'].value_counts()
gender_counts_old = df_old['Gender'].value_counts()
gender_counts_young = df_young['Gender'].value_counts()

# Display the counts
print("Gender counts in df_all:")
print(gender_counts_all)

print("\nGender counts in df_old:")
print(gender_counts_old)

print("\nGender counts in df_young:")
print(gender_counts_young)

# Calculate average and standard deviation for age in each dataset
for dataset_name, dataset in datasets.items():
    avg_age = dataset['age'].mean()
    sd_age = dataset['age'].std()
    print(f"Dataset: {dataset_name}")
    print(f"  Average Age: {avg_age:.2f}")
    print(f"  Standard Deviation of Age: {sd_age:.2f}")
    print("-" * 30)

This code creates a Self-Organizing Map for the three datasets. 
The output aligned with the t-sne regarding the amount of clustering in the three datasets. However, the interpretation was not so straightforward or clear (compared to the t-sne). Moreover, no additional information was provided by the output on top of the t-sne, so this was not included in the thesis.

In [None]:
def create_som(data, target, dataset_name):
    # Normalize the data
    data_normalized = MinMaxScaler().fit_transform(data)

    # SOM parameters
    SOM_X_AXIS_NODES = 8
    SOM_Y_AXIS_NODES = 8
    SOM_N_VARIABLES = data_normalized.shape[1]
    ALPHA = 0.5
    SIGMA0 = 1.5
    RANDOM_SEED = 123

    # Initialize the SOM
    som = MiniSom(
        SOM_X_AXIS_NODES,
        SOM_Y_AXIS_NODES,
        SOM_N_VARIABLES,
        sigma=2,  # Use an integer value for sigma
        learning_rate=ALPHA,
        neighborhood_function='triangle',
        random_seed=RANDOM_SEED
    )

    # Initialize weights using PCA
    som.pca_weights_init(data_normalized)

    # Train the SOM
    N_ITERATIONS = 5000
    som.train_random(data_normalized, N_ITERATIONS, verbose=True)

    # Plot the distance map
    plt.figure(figsize=(8, 8))
    plt.title(f"Distance Map for {dataset_name}")
    plt.pcolor(som.distance_map().T, cmap='gist_yarg')
    plt.colorbar()
    plt.show()

    # Plot the distance map with markers
    plt.figure(figsize=(8, 8))
    plt.title(f"Distance Map with Markers for {dataset_name}")
    plt.pcolor(som.distance_map().T, cmap='gist_yarg')
    plt.colorbar()

    # Ensure target values are valid
    target = target.astype(int)
    target = target - target.min() + 1  # Shift target values to start from 1 (bug)

    markers = ['o', 'x', '^', 's', 'd', '*']  # Possible to add more markers
    colors = ['C0', 'C1', 'C2', 'C3', 'C4', 'C5']  # idem
    #from low to high stress recovery -> blue, orange, green, red, purple, brown

    for count, datapoint in enumerate(data_normalized):
        w = som.winner(datapoint)
        plt.plot(w[0] + 0.5, w[1] + 0.5, markers[target[count] - 1], markerfacecolor='None',
                 markeredgecolor=colors[target[count] - 1], markersize=12, markeredgewidth=2)
    plt.show()

    # Scatter plot of winning neurons
    w_x, w_y = zip(*[som.winner(d) for d in data_normalized])
    w_x = np.array(w_x)
    w_y = np.array(w_y)
    plt.figure(figsize=(8, 8))
    plt.title(f"Winning Neurons for {dataset_name}")
    plt.pcolor(som.distance_map().T, cmap='gist_yarg', alpha=0.2)
    plt.colorbar()
    for c in np.unique(target):
        idx_target = target == c
        plt.scatter(w_x[idx_target] + 0.5 + (np.random.rand(np.sum(idx_target)) - 0.5) * 0.8,
                    w_y[idx_target] + 0.5 + (np.random.rand(np.sum(idx_target)) - 0.5) * 0.8,
                    s=50,
                    c=colors[c - 1],
                    label=f"Class {c}")
    plt.legend(loc='upper right')
    plt.grid()
    plt.show()

# Define the datasets
datasets = {
    "All Participants": (df_all.drop(columns=["target"]).values, df_all["target"].values),
    "Older Participants": (df_old.drop(columns=["target"]).values, df_old["target"].values),
    "Younger Participants": (df_young.drop(columns=["target"]).values, df_young["target"].values)
}

# Create a SOM for each dataset
for dataset_name, (data, target) in datasets.items():
    create_som(data, target, dataset_name)

The t-sne is applied to the three datasets and visualized in one plot

In [None]:
# Define a function to process and visualize t-SNE for the three datasets
def visualize_tsne(datasets, targets, dataset_names):
    tsne_results = []
    for i, (df, target_column) in enumerate(zip(datasets, targets)):
        # Extract features (X) and target (y)
        X = df.drop(columns=[target_column]).values
        y = df[target_column]
        
        # Apply t-SNE
        tsne = TSNE(n_components=2, random_state=42)
        X_tsne = tsne.fit_transform(X)
        
        # Create a DataFrame for visualization
        tsne_df = pd.DataFrame({
            'First T-SNE Component': X_tsne[:, 0],
            'Second T-SNE Component': X_tsne[:, 1],
            'Stress': y,
            'Dataset': dataset_names[i]
        })
        tsne_results.append(tsne_df)
    
    # Combine all t-SNE results into a single DataFrame
    combined_tsne_df = pd.concat(tsne_results, ignore_index=True)
    
    # Visualize the combined t-SNE results
    fig = px.scatter(
        combined_tsne_df,
        x='First T-SNE Component',
        y='Second T-SNE Component',
        color='Stress',
        facet_col='Dataset',
        labels={'color': 'Stress'},
        title="T-SNE Visualization of All Datasets"
    )
    fig.update_layout(
        title_font_size=20,
        legend_title_font_size=14,
        title={"text": "T-SNE Visualization of All Datasets", "x": 0.5},
        margin=dict(l=40, r=40, t=40, b=40),
        height=600,
        width=1000
    )
    fig.show()


# For the three datasets
datasets = [df_all, df_young, df_old]
targets = ['Stress', 'Stress', 'Stress']  # The target column name in each dataset
dataset_names = ['All Participants', 'Older Adults', 'Younger Adults']  # Dataset names for the figure
visualize_tsne(datasets, targets, dataset_names)

Color the t-sne visualizations by different variables to discover on which constructs the clusters in the different datasets could be based.

In [None]:
# Define a function to process and visualize t-SNE for multiple datasets with different coloring variables
def visualize_tsne_with_color(datasets, targets, dataset_names, color_variables):
    for i, (df, target_column) in enumerate(zip(datasets, targets)):
        # Extract features (X) and target (y)
        X = df.drop(columns=[target_column]).values
        y = df[target_column]
        
        # Apply t-SNE
        tsne = TSNE(n_components=2, random_state=42)
        X_tsne = tsne.fit_transform(X)
        
        # Create a DataFrame for visualization
        tsne_df = pd.DataFrame({
            'First T-SNE Component': X_tsne[:, 0],
            'Second T-SNE Component': X_tsne[:, 1],
            'Target': y
        })
        
        # Add color variables to the DataFrame
        for color_var in color_variables:
            if color_var in df.columns:
                tsne_df[color_var] = df[color_var]
        
        # Visualize the t-SNE results for each color variable
        for color_var in color_variables:
            if color_var in tsne_df.columns:
                fig = px.scatter(
                    tsne_df,
                    x='First T-SNE Component',
                    y='Second T-SNE Component',
                    color=color_var,
                    labels={'color': color_var},
                    title=f"T-SNE Visualization of {dataset_names[i]} Colored by {color_var}"
                )
                fig.update_layout(
                    title_font_size=20,
                    legend_title_font_size=14,
                    #put title in the center
                    title={"text": f"T-SNE Visualization of {dataset_names[i]} Colored by {color_var}", "x": 0.5},
                    margin=dict(l=40, r=40, t=40, b=40),
                    height=600,
                    width=800
                )
                fig.show()

# Example usage
# Assuming df_all, df_young, and df_old are your datasets
datasets = [df_all, df_young, df_old]
targets = ['Stress', 'Stress', 'Stress']  # Replace with the actual target column names for each dataset
dataset_names = ['All Participants', 'Older Adults', 'Younger Adults']  # Replace with the actual dataset names
color_variables = ['Age', 'Gender', 'Income']  # Replace with the variables you want to color by

visualize_tsne_with_color(datasets, targets, dataset_names, color_variables)

## Baseline models, LIME and Error Analysis

Importing libraries

In [None]:
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import pyreadstat
import numpy as np  
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score, KFold, RandomizedSearchCV
from sklearn.linear_model import Lasso
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error
from lime.lime_tabular import LimeTabularExplainer
import matplotlib.pyplot as plt
from scipy.stats import randint
from sklearn import svm
from sklearn.metrics import mean_squared_error, make_scorer
from lime.lime_tabular import LimeTabularExplainer
import matplotlib.pyplot as plt
from sklearn import metrics
import pickle
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
import joblib
import pyreadstat

The baseline models: Lasso regression, Support Vector Regression.
Nested cross-validation with 3 inner and 5 outer folds 
Gridsearch for hyperparameters
(The distribution of the target variable was checked during the construction of the baseline models)

In [None]:


# List of datasets
datasets = {'df_all': df_all, 'df_old': df_old, 'df_young': df_young}


results = []

# Define models and their hyperparameter grids
models = {
    "Lasso": {
        "pipeline": Pipeline([
            ('scaler', StandardScaler()),
            ('lasso', Lasso(max_iter=10000, random_state=42))
        ]),
        "param_grid": {
            'lasso__alpha': np.logspace(-4, 1, 10)
        }
    },
    "RandomForest": {
        "pipeline": Pipeline([
            ('scaler', StandardScaler()),
            ('regressor', RandomForestRegressor(random_state=42))
        ]),
        "param_grid": {
            'regressor__n_estimators': [50, 100, 200],
            'regressor__max_depth': [None, 10, 20],
            'regressor__min_samples_split': [2, 5, 10]
        }
    },
    "SVR": {
        "pipeline": Pipeline([
            ('scaler', StandardScaler()),
            ('svr', SVR())
        ]),
        "param_grid": {
            'svr__C': [0.1, 1, 10, 100],
            'svr__epsilon': [0.1, 0.2, 0.5],
            'svr__kernel': ['linear', 'rbf']
        }
    }
}

# Iterate through each dataset and model
for model_name, model_info in models.items():
    print(f"\nModel: {model_name}")
    for dataset_name, dataset in datasets.items():
        print(f"\nProcessing dataset: {dataset_name}")
        
        # Separate features (X) and target (y)
        X = dataset.drop(columns=["target"])  
        y = dataset["target"]  
        
        # Reset indices to ensure alignment
        X = X.reset_index(drop=True)
        y = y.reset_index(drop=True)
        
        # Nested cross-validation setup
        outer_cv = KFold(n_splits=5, shuffle=True, random_state=42)  # Outer loop
        inner_cv = KFold(n_splits=3, shuffle=True, random_state=42)  # Inner loop for hyperparameter tuning
        
        # Perform nested cross-validation
        outer_mae_scores = []
        outer_mse_scores = []
        outer_rmse_scores = []
        outer_r2_scores = []
        
        for train_idx, test_idx in outer_cv.split(X, y):
            X_trainval, X_test = X.iloc[train_idx], X.iloc[test_idx]
            y_trainval, y_test = y.iloc[train_idx], y.iloc[test_idx]
            
            # Inner cross-validation for hyperparameter tuning
            grid_search = GridSearchCV(
                model_info["pipeline"],
                model_info["param_grid"],
                cv=inner_cv,
                scoring='neg_mean_squared_error'
            )
            grid_search.fit(X_trainval, y_trainval)
            
            # Get the best hyperparameters from the inner fold
            best_model = grid_search.best_estimator_
            best_params = grid_search.best_params_
            
            # Train the best model on the entire training set (train + validation)
            best_model.fit(X_trainval, y_trainval)
            
            # Evaluate the best model on the outer test set
            y_pred = best_model.predict(X_test)
            
            # Calculate error metrics for the test set
            mae = mean_absolute_error(y_test, y_pred)
            mse = mean_squared_error(y_test, y_pred)
            rmse = np.sqrt(mse)
            r2 = r2_score(y_test, y_pred)
            
            # Append the scores for this fold
            outer_mae_scores.append(mae)
            outer_mse_scores.append(mse)
            outer_rmse_scores.append(rmse)
            outer_r2_scores.append(r2)
            
            print(f"Fold MSE: {mse}")
            print(f"Best parameters: {best_params}")
        
        # Calculate average and standard deviation of the error metrics
        avg_mae = np.mean(outer_mae_scores)
        std_mae = np.std(outer_mae_scores)
        avg_mse = np.mean(outer_mse_scores)
        std_mse = np.std(outer_mse_scores)
        avg_rmse = np.mean(outer_rmse_scores)
        std_rmse = np.std(outer_rmse_scores)
        avg_r2 = np.mean(outer_r2_scores)
        std_r2 = np.std(outer_r2_scores)
        
        # Display the average and standard deviation of the error metrics
        print(f"\nDataset: {dataset_name}")
        print(f"Average MAE: {avg_mae:.4f} ± {std_mae:.4f}")
        print(f"Average MSE: {avg_mse:.4f} ± {std_mse:.4f}")
        print(f"Average RMSE: {avg_rmse:.4f} ± {std_rmse:.4f}")
        print(f"Average R²: {avg_r2:.4f} ± {std_r2:.4f}")

        # Store results in the list
        results.append({
            'model': model_name,
            'dataset': dataset_name,
            'avg_mae': avg_mae,
            'std_mae': std_mae,
            'avg_mse': avg_mse,
            'std_mse': std_mse,
            'avg_rmse': avg_rmse,
            'std_rmse': std_rmse,
            'avg_r2': avg_r2,
            'std_r2': std_r2,
            'best_params': best_params
        })

        
        # Save the best model for this dataset
        model_filename = f'{model_name}_model1_{dataset_name}.pkl'
        joblib.dump(best_model, model_filename)
        print(f"Saved the best model for {dataset_name} to {model_filename}")

# Convert to DataFrame and save to CSV
results_df = pd.DataFrame(results)
results_df.to_csv("model_comparison_results1.csv", index=False)

print("\nSaved all results to 'model_comparison_results1.csv'")


Exactly the same code, except with 5 inner folds and 10 outer folds. This code is run seperately for the tracking running time (and personal inpatience). 

In [None]:
# List of datasets
datasets = {'df_all': df_all, 'df_old': df_old, 'df_young': df_young}


results = []

# Define models and their hyperparameter grids
models = {
    "Lasso": {
        "pipeline": Pipeline([
            ('scaler', StandardScaler()),
            ('lasso', Lasso(max_iter=10000, random_state=42))
        ]),
        "param_grid": {
            'lasso__alpha': np.logspace(-4, 1, 10)
        }
    },
    "RandomForest": {
        "pipeline": Pipeline([
            ('scaler', StandardScaler()),
            ('regressor', RandomForestRegressor(random_state=42))
        ]),
        "param_grid": {
            'regressor__n_estimators': [50, 100, 200],
            'regressor__max_depth': [None, 10, 20],
            'regressor__min_samples_split': [2, 5, 10]
        }
    },
    "SVR": {
        "pipeline": Pipeline([
            ('scaler', StandardScaler()),
            ('svr', SVR())
        ]),
        "param_grid": {
            'svr__C': [0.1, 1, 10, 100],
            'svr__epsilon': [0.1, 0.2, 0.5],
            'svr__kernel': ['linear', 'rbf']
        }
    }
}

# Iterate through each dataset and model
for model_name, model_info in models.items():
    print(f"\nModel: {model_name}")
    for dataset_name, dataset in datasets.items():
        print(f"\nProcessing dataset: {dataset_name}")
        
        # Separate features (X) and target (y)
        X = dataset.drop(columns=["target"])  
        y = dataset["target"]  
        
        # Reset indices to ensure alignment
        X = X.reset_index(drop=True)
        y = y.reset_index(drop=True)
        
        # Nested cross-validation setup
        outer_cv = KFold(n_splits=10, shuffle=True, random_state=42)  # Outer loop
        inner_cv = KFold(n_splits=5, shuffle=True, random_state=42)  # Inner loop for hyperparameter tuning
        
        # Perform nested cross-validation
        outer_mae_scores = []
        outer_mse_scores = []
        outer_rmse_scores = []
        outer_r2_scores = []
        
        for train_idx, test_idx in outer_cv.split(X, y):
            X_trainval, X_test = X.iloc[train_idx], X.iloc[test_idx]
            y_trainval, y_test = y.iloc[train_idx], y.iloc[test_idx]
            
            # Inner cross-validation for hyperparameter tuning
            grid_search = GridSearchCV(
                model_info["pipeline"],
                model_info["param_grid"],
                cv=inner_cv,
                scoring='neg_mean_squared_error'
            )
            grid_search.fit(X_trainval, y_trainval)
            
            # Get the best hyperparameters from the inner fold
            best_model = grid_search.best_estimator_
            best_params = grid_search.best_params_
            
            # Train the best model on the entire training set (train + validation)
            best_model.fit(X_trainval, y_trainval)
            
            # Evaluate the best model on the outer test set
            y_pred = best_model.predict(X_test)
            
            # Calculate error metrics for the test set
            mae = mean_absolute_error(y_test, y_pred)
            mse = mean_squared_error(y_test, y_pred)
            rmse = np.sqrt(mse)
            r2 = r2_score(y_test, y_pred)
            
            # Append the scores for this fold
            outer_mae_scores.append(mae)
            outer_mse_scores.append(mse)
            outer_rmse_scores.append(rmse)
            outer_r2_scores.append(r2)
            
            print(f"Fold MSE: {mse}")
            print(f"Best parameters: {best_params}")
        
        # Calculate average and standard deviation of the error metrics
        avg_mae = np.mean(outer_mae_scores)
        std_mae = np.std(outer_mae_scores)
        avg_mse = np.mean(outer_mse_scores)
        std_mse = np.std(outer_mse_scores)
        avg_rmse = np.mean(outer_rmse_scores)
        std_rmse = np.std(outer_rmse_scores)
        avg_r2 = np.mean(outer_r2_scores)
        std_r2 = np.std(outer_r2_scores)
        
        # Display the average and standard deviation of the error metrics
        print(f"\nDataset: {dataset_name}")
        print(f"Average MAE: {avg_mae:.4f} ± {std_mae:.4f}")
        print(f"Average MSE: {avg_mse:.4f} ± {std_mse:.4f}")
        print(f"Average RMSE: {avg_rmse:.4f} ± {std_rmse:.4f}")
        print(f"Average R²: {avg_r2:.4f} ± {std_r2:.4f}")

        # Store results in the list
        results.append({
            'model': model_name,
            'dataset': dataset_name,
            'avg_mae': avg_mae,
            'std_mae': std_mae,
            'avg_mse': avg_mse,
            'std_mse': std_mse,
            'avg_rmse': avg_rmse,
            'std_rmse': std_rmse,
            'avg_r2': avg_r2,
            'std_r2': std_r2,
            'best_params': best_params
        })

        
        # Save the best model for this dataset
        model_filename = f'{model_name}_model2_{dataset_name}.pkl'
        joblib.dump(best_model, model_filename)
        print(f"Saved the best model for {dataset_name} to {model_filename}")

# Convert to DataFrame and save to CSV
results_df = pd.DataFrame(results)
results_df.to_csv("model_comparison_results2.csv", index=False)

print("\nSaved all results to 'model_comparison_results2.csv'")


Three samples for each dataset to apply LIME to

In [None]:
# Define the sample size 
sample_fraction = 0.1 #this does mean that the full dataset shows more predictions than the other two datasets

# Create sampled datasets for LIME
df_all_sampled = df_all.sample(frac=sample_fraction, random_state=42)
df_old_sampled = df_old.sample(frac=sample_fraction, random_state=42)
df_young_sampled = df_young.sample(frac=sample_fraction, random_state=42)

# Display the first few rows of each sampled dataset as a check
print("Sampled df_all:")
display(df_all_sampled.head())

print("Sampled df_old:")
display(df_old_sampled.head())

print("Sampled df_young:")
display(df_young_sampled.head())

LIME is applied to the best models for each of the datasets 

In [None]:
# List of sampled datasets and their corresponding best models
# The best models are loaded from the saved files, but need to be picked by hand based on the previous results
sampled_datasets = {'df_all_sampled': df_all_sampled, 'df_old_sampled': df_old_sampled, 'df_young_sampled': df_young_sampled}
best_models = {
    'df_all_sampled': joblib.load('Lasso_model1_df_all.pkl'),
    'df_old_sampled': joblib.load('SVR_model2_df_old.pkl'),
    'df_young_sampled': joblib.load('Lasso_model2_df_young.pkl')
}

# Iterate through each sampled dataset and its corresponding best model
for name, dataset in sampled_datasets.items():
    print(f"Processing dataset: {name}")
    
    # Separate features (X) and target (y)
    X = dataset.drop(columns=["target"])  
    y = dataset["target"]  

    # Use the corresponding best model
    best_model = best_models[name]

    # Predict on the dataset
    y_pred = best_model.predict(X)

    # Calculate and display the mean squared error
    mse = mean_squared_error(y, y_pred)
    print(f"MSE for {name}: {mse}")

    # Scatter plot for visualization
    plt.scatter(y, y_pred, alpha=0.7)
    plt.plot([y.min(), y.max()], [y.min(), y.max()], '--', color='red')
    plt.xlabel("True Values")
    plt.ylabel("Predicted Values")

    # Update the title dynamically based on the dataset name
    dataset_title = "Full Dataset" if "df_all" in name else "Older Adults" if "df_old" in name else "Younger Adults"
    plt.title(f"True and Predicted Values for the Best Model of the {dataset_title}")

    plt.legend()
    plt.show()

    # Define the prediction function for LIME
    def predict_fn(X):
        return best_model.predict(X)

    # Initialize LIME Explainer
    explainer = LimeTabularExplainer(
        training_data=X.values,  # Training data (features)
        mode='regression',       # Regression task
        training_labels=y.values,  # Target values
        feature_names=X.columns.tolist(),  # Feature names
        discretize_continuous=True  
    )

    # Choose a data point to explain (e.g., the first row of the dataset)
    data_point_to_explain = X.iloc[0].values  # Feature vector of the data point

    # Explain the prediction for the chosen data point
    explanation = explainer.explain_instance(
        data_point_to_explain,  # Feature vector of the data point
        predict_fn,             # The model's prediction function
        num_features=27         # Number of features to include in the explanation (all in this case)
    )

    # Visualize the explanation
    explanation.show_in_notebook(show_table=True, show_all=False)

    #Save the explanation as an HTML file
    explanation.save_to_file(f'lime_explanation_{name}.html')

## Graph Attention Network
## Error Analysis
## Attention Weight Visualization
## Tracking of the Training

Importing the libraries

In [None]:
import numpy as np
import pandas as pd
import random
import pickle
import os
from scipy.stats import loguniform
import scipy
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import KFold
from joblib import Parallel, delayed

import torch
from torch.nn import Linear, BatchNorm1d
import torch.nn.functional as F
from torch_geometric.nn import GATConv, global_add_pool
from torch_geometric.data import Data
from torch_geometric.utils import dense_to_sparse
from torch_geometric.loader import DataLoader

import lightning.pytorch as pl
from lightning.pytorch.loggers import CSVLogger
from torchmetrics import MeanSquaredError
import matplotlib.pyplot as plt

import shap
import json
import seaborn as sns
import networkx as nx

The Graph Attention Network

Architecture:
Nodes: feature values
Edges: significant correlations
Nested cross-validation with 3 inner and 5 outer folds
Random search for hyperparameters
Loss values and attention weights are saved for later evaluation

This is the entire code for the network model. However, for executing the code it is more efficient to define the model in seperate functions and then execute those.

In [None]:

# Set random seed for reproducibility/same seed as in baselines
seed = 42

def save_model_and_metadata(model, scaler, edge_index, best_params, y_true, y_pred, dataset_name):
    # Ensure the directory exists
    save_dir = f'{dataset_name}_saved_models'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    # Save the best model
    torch.save(model, os.path.join(save_dir, f'{dataset_name}_best_gat_model_full.pth'))
    
    # Save preprocessing info
    preprocessing_info = {"scaler": scaler, "edge_index": edge_index}
    with open(os.path.join(save_dir, f'{dataset_name}_preprocessing.pkl'), 'wb') as f:
        pickle.dump(preprocessing_info, f)
    
    # Save hyperparameters
    with open(os.path.join(save_dir, f'{dataset_name}_best_hyperparams.json'), 'w') as f:
        json.dump(best_params, f)
    
    # Save predictions
    predictions_info = {"y_true": y_true.tolist(), "y_pred": y_pred.tolist()}
    with open(os.path.join(save_dir, f'{dataset_name}_predictions.json'), 'w') as f:
        json.dump(predictions_info, f)

# Parallelized Pearson correlation calculation
def parallel_pearson_correlation(df):
    def calculate_pearson(col1, col2):
        return scipy.stats.pearsonr(df[col1], df[col2])[1]

    p_values = np.array(Parallel(n_jobs=-1)(
        delayed(calculate_pearson)(col1, col2) for col1 in df.columns for col2 in df.columns
    )).reshape(len(df.columns), len(df.columns))

    return pd.DataFrame(p_values, columns=df.columns, index=df.columns)

def create_graph_from_correlations(df, significance_level=0.05):
    corr_matrix = df.corr()
    p_values = parallel_pearson_correlation(df)  # Use the parallelized Pearson correlation
    significant_edges = (p_values < significance_level).astype(int)
    edge_index = dense_to_sparse(torch.tensor(significant_edges.values, dtype=torch.float))[0]
    return edge_index

#the dataset is made with a dataframe that contains the edges that should be included (based on the pearson correlation)
def build_dataset(X, y, edge_index):
    data_list = []
    for i in range(len(X)):
        x_tensor = torch.tensor(X.iloc[i].values, dtype=torch.float).unsqueeze(1)  # shape: [num_nodes, 1]
        y_tensor = torch.tensor([y.iloc[i]], dtype=torch.float)
        data = Data(x=x_tensor, edge_index=edge_index, y=y_tensor)
        data_list.append(data)
    return data_list

#model architecture
class GAT(pl.LightningModule):
    def __init__(self, learning_rate=1e-3, optimizer_name='Adam', dropout=None, num_heads=4, n_features=10, hidden_dim=None, n_gats=0, n_fcs=0): #Default values, but overridden in the nested cross-validation
        super(GAT, self).__init__()
        self.learning_rate = learning_rate
        self.optimizer_name = optimizer_name

        # First GAT layer: multihead attention, then combined
        self.gat1 = GATConv(in_channels=n_features, out_channels=hidden_dim, heads=num_heads, concat=True, dropout=dropout)
        self.bn1 = BatchNorm1d(hidden_dim * num_heads)

        # Second GAT layer
        self.gat2 = GATConv(in_channels=hidden_dim * num_heads, out_channels=hidden_dim, heads=1, concat=False, dropout=dropout)
        self.bn2 = BatchNorm1d(hidden_dim)

        # Additional GAT layers (as tuning parameter)
        self.gats = torch.nn.ModuleList()
        for i in range(n_gats):
            self.gats.append(GATConv(in_channels=hidden_dim, out_channels=hidden_dim, heads=1, concat=False, dropout=dropout))
        
        # Fully connected layers: now combined into one regression output
        self.fc1 = Linear(hidden_dim, hidden_dim)

        # Additional fully connected layer (as tuning parameter)
        self.fcs = torch.nn.ModuleList()
        self.fcs_bns = torch.nn.ModuleList()
        for i in range(n_fcs):
            self.fcs.append(Linear(hidden_dim, hidden_dim))
            self.fcs_bns.append(BatchNorm1d(hidden_dim))

        self.bn3 = BatchNorm1d(hidden_dim)
        self.fc2 = Linear(hidden_dim, 1)  # Regression output

        # Evaluation metrics
        self.train_mse = MeanSquaredError()
        self.val_mse = MeanSquaredError()
        self.test_mse = MeanSquaredError()

#The gat layers are applied, then pooled, dense layers, then output is given
    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch

        # First GAT layer with attention weights
        x, (edge_index1, attn_weights1) = self.gat1(x, edge_index, return_attention_weights=True)
        x = F.elu(x)
        x = self.bn1(x)

        x = F.elu(self.gat2(x, edge_index))
        x = self.bn2(x)

        # Apply additional GAT layers as hyperparameter
        for gat in self.gats:
            x = F.elu(gat(x, edge_index))
            x = self.bn2(x)

        x = global_add_pool(x, batch)
        x = F.relu(self.fc1(x))
        x = self.bn3(x)

        # Apply additional fully connected layers as hyperparameter
        for fc, bn in zip(self.fcs, self.fcs_bns):
            x = F.relu(fc(x))
            x = bn(x)

        out = self.fc2(x)  # No activation for regression

        return out, attn_weights1, edge_index1
    
#Optimizers and loss functions are defined here
    def configure_optimizers(self):
        if self.optimizer_name == 'Adam':
            optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        elif self.optimizer_name == 'RMSprop':
            optimizer = torch.optim.RMSprop(self.parameters(), lr=self.learning_rate)
        elif self.optimizer_name == 'SGD':
            optimizer = torch.optim.SGD(self.parameters(), lr=self.learning_rate)
        else:
            raise ValueError(f'Unsupported optimizer: {self.optimizer_name}')
        return optimizer

#mse is tracked during training, validation and testing
    def training_step(self, train_batch, batch_idx):
        y = train_batch.y
        output, attn_weights1, edge_index1 = self.forward(train_batch)
        output = output.squeeze()  # Ensure output is squeezed to match y shape
        loss = F.mse_loss(output, y)
        self.log('train_loss', loss, on_epoch=True, prog_bar=True, on_step=False)
        self.log('train_mse', self.train_mse(output, y), on_epoch=True, prog_bar=True, on_step=False)
        return loss

    def validation_step(self, val_batch, batch_idx):
        y = val_batch.y
        # print(f'{y.shape = }')
        output, attn_weights1, edge_index1 = self.forward(val_batch) # squeeze needed or not? Check if error occurs when not using it
        # In that case: change in each 'step' (training, validation, test, predict)
        output = output.squeeze()  # Ensure output is squeezed to match y shape
        # print(f'{output.shape = }')
        loss = F.mse_loss(output, y)
        self.log('val_loss', loss, on_epoch=True, prog_bar=True)
        self.log('val_mse', self.val_mse(output, y), on_epoch=True, prog_bar=True)
        return loss

    def test_step(self, test_batch, batch_idx):
        y = test_batch.y
        output, attn_weights1, edge_index1 = self.forward(test_batch)
        output = output.squeeze()  # Ensure output is squeezed to match y shape
        mse = self.test_mse(output, y)  # Calculate MSE
        self.log('test_mse', mse, on_epoch=True, prog_bar=True)  # Log MSE
        return mse

    def predict_step(self, batch):
        y_hat, attn_weights1, edge_index1 = self.forward(batch)
        y_hat = y_hat.squeeze()
        return y_hat, attn_weights1, edge_index1 #attention weights and edge index are returned for further analysis of gat layer 

#Nested crossvalidation: inner for hyperparameters (3), outer for evaluation (5)(same as for baselines)
#The different hyperparameter combinations for the dropout, hidden dimensions, etc. are looped over
#Epochs are set to 10, but can be changed
def nested_cross_validation(X, y, edge_index, param_grid, n_epochs=10, outer_splits=5, inner_splits=3, dataset_name="dataset"):
    '''
    Performs nested cross-validation for hyperparameter tuning and GAT model evaluation.
    params:
    X: Feature DataFrame
    y: Target Series
    edge_index: Edge index for graph structure
    param_grid: List of dictionaries with hyperparameter combinations
    n_epochs: Number of epochs for training 
    outer_splits: Number of outer folds for cross-validation
    inner_splits: Number of inner folds for cross-validation
    dataset_name: Name of the dataset for logging purposes
    '''
    outer_cv = KFold(n_splits=outer_splits, shuffle=True, random_state=seed) 
    inner_cv = KFold(n_splits=inner_splits, shuffle=True, random_state=seed)  

    outer_mae_scores, outer_mse_scores, outer_rmse_scores, outer_r2_scores = [], [], [], []

    best_overall_model = None
    best_overall_score = float("inf")  
    best_params_list = []

    attention_weights1_per_fold = []
    edge_idx1_per_fold = []

    for fold_idx, (trainval_idx, test_idx) in enumerate(outer_cv.split(X, y)):
        X_trainval, X_test = X.iloc[trainval_idx], X.iloc[test_idx]
        y_trainval, y_test = y.iloc[trainval_idx], y.iloc[test_idx]
    
        best_val_loss = float("inf")
        best_params = None
        best_model = None

        avg_val_losses = []
        for params in param_grid:
            val_losses = []

            for inner_train_idx, inner_val_idx in inner_cv.split(X_trainval, y_trainval):
                X_inner_train = X_trainval.iloc[inner_train_idx]
                y_inner_train = y_trainval.iloc[inner_train_idx]
                X_inner_val = X_trainval.iloc[inner_val_idx]
                y_inner_val = y_trainval.iloc[inner_val_idx]

                #Scale the features using StandardScaler
                #Scaling the features seperately for the different folds to avoid data leakage
                #Then same scaler is used to transform the validation set
                scaler = StandardScaler()
                X_inner_train_scaled = scaler.fit_transform(X_inner_train)
                X_inner_val_scaled = scaler.transform(X_inner_val)

                # Convert back to pandas DataFrame
                X_inner_train_scaled = pd.DataFrame(X_inner_train_scaled, columns=X.columns)
                X_inner_val_scaled = pd.DataFrame(X_inner_val_scaled, columns=X.columns)

                # Build datasets
                train_dataset = build_dataset(X_inner_train_scaled, y_inner_train, edge_index)
                val_dataset = build_dataset(X_inner_val_scaled, y_inner_val, edge_index)

                # Create DataLoader for training and validation sets
                train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
                val_loader = DataLoader(val_dataset, batch_size=len(X_inner_val_scaled))

                # Create the model with the current hyperparameters
                model = GAT(
                    learning_rate=params["learning_rate"],
                    optimizer_name=params["optimizer_name"],
                    dropout=params["dropout"],
                    num_heads=4,
                    n_features=1, #This is the number of features per node, which is 1 in this case (the value of the construct)
                    hidden_dim=params["hidden_dim"],
                    n_gats=params["n_gats"],
                    n_fcs=params["n_fcs"]
                )

                
                #train inner loops
                trainer = pl.Trainer(max_epochs=n_epochs, accelerator="auto", enable_progress_bar=False, logger=False, enable_checkpointing = False)
                trainer.fit(model, train_loader, val_loader)
                val_result = trainer.validate(model, dataloaders=val_loader)
                val_loss = val_result[0]['val_loss']
                # Only add non-NaN losses (bug)
                if not np.isnan(val_loss):
                    val_losses.append(val_loss)
                else:
                    print(f"⚠️ Skipping config due to NaN loss: {params}")

            # Calculate the average validation loss for this hyperparameter combination
            avg_val_loss = np.mean(val_losses)
            avg_val_losses.append(avg_val_loss)
            print(f"Average inner validation MSE for params {params}: {avg_val_loss:.4f}")

        #Find best hyperparameters
        avg_val_losses = np.array(avg_val_losses)
        best_index = np.argmin(avg_val_losses)  # Use argmin to find the lowest validation loss
        best_params = param_grid[best_index]  # Extract the best parameters (dict)
        best_params_list.append(best_params)  # Store the best parameters for this fold
        print(f"Best parameters for fold {fold_idx + 1}: {best_params}")

        #change the parameters to those of the best model
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        best_model = GAT(
            learning_rate=best_params["learning_rate"],
            optimizer_name=best_params["optimizer_name"],
            dropout=best_params["dropout"],
            num_heads=4,
            n_features=1,
            hidden_dim=best_params["hidden_dim"],
            n_gats=best_params["n_gats"],
            n_fcs=best_params["n_fcs"]
        )

        # apply the scalar to the training and test set to avoid data leakage
        scaler = StandardScaler()
        X_trainval_scaled = scaler.fit_transform(X_trainval)
        X_test_scaled = scaler.transform(X_test)

        # Convert back to pandas DataFrame
        X_trainval_scaled = pd.DataFrame(X_trainval_scaled, columns=X.columns)
        X_test_scaled = pd.DataFrame(X_test_scaled, columns=X.columns)
        
        # Build datasets and create DataLoader for training and test sets
        trainval_dataset = build_dataset(X_trainval_scaled, y_trainval, edge_index)
        trainval_loader = DataLoader(trainval_dataset, batch_size=32, shuffle=True)
        test_dataset = build_dataset(X_test_scaled, y_test, edge_index)
        test_loader = DataLoader(test_dataset, batch_size=len(X_test_scaled), shuffle=False)

        # train outer loops
        # logger to show mse during training later on
        logger = CSVLogger("logs", name=f'{dataset_name}') # log results to csv file
        trainer = pl.Trainer(max_epochs=n_epochs, accelerator="auto", enable_progress_bar=False, logger=logger, enable_checkpointing = False, gradient_clip_val=1.0, gradient_clip_algorithm="norm") # gradient clipping to prevent exploding gradients (bug)
        trainer.fit(best_model, trainval_loader, test_loader)

        # Get output from the model (this includes the preds and attention weights)
        output_model = trainer.predict(best_model, test_loader)
        preds = [out[0] for out in output_model]  # Extract predictions from the output
        attn_weights1 = [out[1] for out in output_model]
        edge_index1 = [out[2] for out in output_model]
        attention_weights1_per_fold.append(attn_weights1)
        edge_idx1_per_fold.append(edge_index1)
    
        scaler = StandardScaler()
        scaler.fit(y_trainval.to_numpy().reshape(-1, 1))  # Convert Series to NumPy array and reshape
        # Now inverse transform your predictions:
        #make numpy array of preds
        preds = np.array(preds)  # Convert to NumPy array
        preds = scaler.inverse_transform(preds.reshape(-1, 1))

        preds = [torch.tensor(p, dtype=torch.float32) for p in preds]  # Convert to PyTorch tensors
        # Concatenate predictions into a single array (bug)
        preds = torch.cat([p.view(-1) for p in preds]).detach().cpu().numpy()  # Flatten predictions
       
        # Ensure y_test is also a NumPy array (bug)
        y_test = y_test.to_numpy()

        # Debugging: Print shapes to verify alignment (bug)
        print(f"Shape of preds: {preds.shape}")
        print(f"Shape of y_test: {y_test.shape}")

        # Check if shapes match (bug)
        if preds.shape[0] != y_test.shape[0]:
            raise ValueError(f"Shape mismatch: preds has {preds.shape[0]} samples, but y_test has {y_test.shape[0]} samples.")

        # Calculate metrics
        mae = mean_absolute_error(y_test, preds)
        mse = mean_squared_error(y_test, preds)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_test, preds)

        outer_mae_scores.append(mae)
        outer_mse_scores.append(mse)
        outer_rmse_scores.append(rmse)
        outer_r2_scores.append(r2)

        #create directory to save the models if it does not exist yet
        if not os.path.exists(f'{dataset_name}_saved_models'):
            os.makedirs(f'{dataset_name}_saved_models')

        # Save the best model for the current fold
        # Save the model in a specific folder for each fold
        best_model_path = f'{dataset_name}_saved_models/{dataset_name}_fold_{fold_idx + 1}_best_model.pth'
        torch.save(best_model.state_dict(), best_model_path) #Now they do not overwrite each other
        # Save the entire model object 
        print(f"Model for fold {fold_idx + 1} saved to best_fold_model.pth")

        # Track the best overall model across all folds
        if mse < best_overall_score:  
            best_overall_score = mse
            best_model = best_model
            scaler = scaler  # Save the scaler for the best model
            best_params = best_params  # Save the best parameters for the best model
            y_pred = preds # Save the predictions for the best model

    
    # Call save_model_and_metadata to save the best model and metadata
    save_model_and_metadata(
        model=best_model,
        scaler=scaler,
        edge_index=edge_index,
        best_params=best_params,
        y_true=y_test,
        y_pred=preds,
        dataset_name=dataset_name
    )

    # Save the attention weights
    with open(f"{dataset_name}_all_attention_weights.pkl", "wb") as f:
        pickle.dump({
            "attention_weights1": attention_weights1_per_fold,
            "edge_indices1": edge_idx1_per_fold,
        }, f)

    # Calculate average and standard deviation of the error metrics across folds
    avg_mae = np.mean(outer_mae_scores)
    std_mae = np.std(outer_mae_scores)
    avg_mse = np.mean(outer_mse_scores)
    std_mse = np.std(outer_mse_scores)
    avg_rmse = np.mean(outer_rmse_scores)
    std_rmse = np.std(outer_rmse_scores)
    avg_r2 = np.mean(outer_r2_scores)
    std_r2 = np.std(outer_r2_scores)
    
    # Display the average and standard deviation of the error metrics
    print(f"\nOuter Loop Results:")
    print(f"Average MAE: {avg_mae:.4f} ± {std_mae:.4f}")
    print(f"Average MSE: {avg_mse:.4f} ± {std_mse:.4f}")
    print(f"Average RMSE: {avg_rmse:.4f} ± {std_rmse:.4f}")
    print(f"Average R²: {avg_r2:.4f} ± {std_r2:.4f}")
    print(f"Best hyperparameters: {best_params}")


    return outer_mae_scores, outer_mse_scores, outer_rmse_scores, outer_r2_scores, best_params_list, attention_weights1_per_fold, edge_idx1_per_fold

def save_split_indices(split_indices, filename):
    with open(filename, 'wb') as f:
        pickle.dump(split_indices, f)

def load_split_indices(filename):
    with open(filename, 'rb') as f:
        return pickle.load(f)

# Load the datasets
datasets = {
    "df_all": df_all,
    "df_old": df_old,
    "df_young": df_young
}


search_space = {
    "dropout": [0.0, 0.2, 0.4, 0.6, 0.8],
    "hidden_dim": [16, 32, 64, 128],
    "learning_rate": loguniform(1e-5, 1e-2), # Adjusted to a smaller range (otherwise bug emerges)
    # "learning_rate": loguniform(1e-5, 1e-1), # Original range
    "optimizer_name": ['Adam', 'SGD', 'RMSprop']
}

def generate_random_param_grid(n_samples=50, seed=seed):
    random.seed(seed)  # Set the seed for reproducibility
    random_param_grid = []
    for _ in range(n_samples):
        params = {
            "dropout": random.choice(search_space["dropout"]),
            "hidden_dim": random.choice(search_space["hidden_dim"]),
            "learning_rate": search_space["learning_rate"].rvs(random_state=seed),  # Pass seed for reproducibility
            "optimizer_name": random.choice(search_space["optimizer_name"]),
            "n_gats": random.randint(0, 3),  # Randomly choose number of GAT layers (0 to 3)
            "n_fcs": random.randint(0, 6)    # Randomly choose number of FC layers (0 to 6)
        }
        random_param_grid.append(params)
    return random_param_grid

param_grid = generate_random_param_grid(n_samples=50, seed=seed)
print("Randomly selected hyperparameter combinations:")
for i, p in enumerate(param_grid):
    print(f"  Set {i+1}: {p}")


#executes pipeline: load data, create graph from correlations, split data, train model with nested cv, evaluate model
# Iterate through each dataset
results_dict = {}
for dataset_name, dataset in datasets.items():
    print(f"\nProcessing dataset: {dataset_name}")
    
    # Separate features (X) and target (y)
    X = dataset.drop(columns=["target"])  
    y = dataset["target"] 
    
    # Create the graph structure from correlations
    edge_index = create_graph_from_correlations(X)
    
    # Perform nested cross-validation
    outer_mae_scores, outer_mse_scores, outer_rmse_scores, outer_r2_scores, best_params_list, attention_weights1_per_fold, edge_idx1_per_fold = nested_cross_validation(
        X, y, edge_index, param_grid, n_epochs=25, outer_splits=5, inner_splits=3, dataset_name=dataset_name
    )

    # Saves the scores in a dictionary per dataset
    results_dict[dataset_name] = {
        "outer_mae_scores": outer_mae_scores,
        "outer_mse_scores": outer_mse_scores,
        "outer_rmse_scores": outer_rmse_scores,
        "outer_r2_scores": outer_r2_scores,
        "best_params_list": best_params_list
    }


Error analysis for the three datasets: true versus predicted values

In [None]:
# List of datasets and their corresponding titles
datasets = {
    "df_all": "Full Dataset",
    "df_old": "Older Adults",
    "df_young": "Younger Adults"
}

# Iterate through each dataset
for dataset_name, dataset_title in datasets.items():
    print(f"\nProcessing dataset: {dataset_title}")
    
    # Load model
    best_model = torch.load(f'{dataset_name}_saved_models/{dataset_name}_best_gat_model_full.pth', weights_only=False)
    best_model.eval()

    # Load preprocessing info
    with open(f'{dataset_name}_saved_models/{dataset_name}_preprocessing.pkl', 'rb') as f:
        preprocessing_info = pickle.load(f)
    scaler = preprocessing_info["scaler"]
    edge_index = preprocessing_info["edge_index"]

    # Load predictions
    with open(f'{dataset_name}_saved_models/{dataset_name}_predictions.json', 'r') as f:
        predictions_info = json.load(f)
    y_true = np.array(predictions_info["y_true"])
    y_pred = np.array(predictions_info["y_pred"])

    # Plot actual vs. predicted values
    plt.scatter(y_true, y_pred, alpha=0.7)
    plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], '--', color='red')
    plt.xlabel("True Values")
    plt.ylabel("Predicted Values")
    plt.title(f"True and Predicted Values for {dataset_title} of the GAT Model")
    plt.show()

Find the fold with the most accurate predictions to extract the attention weights from

In [None]:
# Find the best fold based on the lowest MSE
best_fold_index = np.argmin(outer_mse_scores)

# Print details of the best fold
print(f"Best Fold: Fold {best_fold_index + 1}")
print(f"  MAE: {outer_mae_scores[best_fold_index]:.4f}")
print(f"  MSE: {outer_mse_scores[best_fold_index]:.4f}")
print(f"  RMSE: {outer_rmse_scores[best_fold_index]:.4f}")
print(f"  R²: {outer_r2_scores[best_fold_index]:.4f}")

Visualizing the attention weights in a heatmap, from which the most important attention weights are extracted to create a graph visualisation.
(No iteration over the different datasets, so this has to be done by hand)

In [None]:
# Load attention weights from the specified path
dataset_name = "df_old"  # Change this to the dataset to visualize 
attention_weights_path = f"{dataset_name}_all_attention_weights.pkl"

with open(attention_weights_path, "rb") as f:
    attention_data = pickle.load(f)

#CHANGE THIS TO THE FOLD TO VISUALIZE
# Extract the attention weights for the 4th fold (index 3) <- depends on the fold you want to visualize
attention_weights_fold5 = attention_data["attention_weights1"][3]  # 4th fold

# Convert to a single tensor (averaging across all samples in this fold)
# It is thus also possible to visualize the attention weights for one sample in the fold to uncover individual pathways
attn_matrices = []
for attn in attention_weights_fold5:
    attn_tensor = attn.cpu().detach().numpy()  # Ensure the tensor is detached and converted to numpy
    attn_matrices.append(attn_tensor)

# Average attention weights across all graphs (samples) in this fold
attn_avg = np.mean(np.stack(attn_matrices), axis=0)  # Shape: [num_edges, num_heads]

# If multiple heads, average across heads too
if attn_avg.ndim == 2:
    attn_avg = attn_avg.mean(axis=1)

# Build heatmap matrix
num_nodes = edge_index.max() + 1  # Correctly determine the number of nodes
heatmap_matrix = np.zeros((num_nodes, num_nodes))

# Fill the heatmap with averaged attention values
for i, (src, tgt) in enumerate(zip(edge_index[0], edge_index[1])):
    heatmap_matrix[src, tgt] = attn_avg[i]

plt.figure(figsize=(12, 10))
sns.heatmap(
    heatmap_matrix,
    cmap="viridis",
    square=True,
    xticklabels=column_names,
    yticklabels=column_names,
    cbar_kws={"label": "Impact Relationship on Stress Recovery"}
)
plt.title(f"Importance Attention Weights in Older Adults") #change based on the dataset
plt.xlabel("Target Node")
plt.ylabel("Source Node")
plt.xticks(rotation=45, ha='right') 
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()


# Define the number of top attention weights to visualize
top_k = 10

# Flatten the heatmap matrix and get the indices of the top-k attention weights
flat_indices = heatmap_matrix.flatten().argsort()[-top_k:][::-1]
top_edges = [(i // heatmap_matrix.shape[1], i % heatmap_matrix.shape[1]) for i in flat_indices]
top_weights = [heatmap_matrix[src, tgt] for src, tgt in top_edges]

# Create a directed graph
G = nx.DiGraph()

# Add edges with weights to the graph
for (src, tgt), weight in zip(top_edges, top_weights):
    G.add_edge(column_names[src], column_names[tgt], weight=weight)

# Draw the graph
plt.figure(figsize=(15, 10))
pos = nx.spring_layout(G, seed=42)  # Layout for better visualization,
nx.draw(
    G, pos, with_labels=True, node_size=8000, node_color="lightblue", font_size=11, font_weight="bold", edge_color="gray"
)
edge_labels = nx.get_edge_attributes(G, 'weight')
nx.draw_networkx_edge_labels(
    G, pos, edge_labels={k: f"{v:.2f}" for k, v in edge_labels.items()}, font_size=11, font_color="black", font_weight="bold"	
)
plt.title("Graph of Most Important Attention Weights for Older Adults")
plt.show()

This function plots the training curves to gain insight into how the model is learning (e.g. over/underfitting)

In [None]:
def plot_training_curves(logs_path, outer_mse_scores, best_params_list, dataset_name):
    """
    Function to plot the training curve for the best outer fold model.
    logs_path: Path to the folder containing the training logs.
    outer_mse_scores: List of MSE scores for each outer fold.
    best_params_list: List of best parameters for each outer fold.
    dataset_name: Name of the dataset used for training.
    """
    best_fold_idx = np.argmin(outer_mse_scores)  # Get the index of the best fold based on MSE
    print(f"Best outer fold index: {best_fold_idx}")

    # print the hyperparameters for the best fold
    best_params = best_params_list[best_fold_idx]
    print(f"Best hyperparameters for fold {best_fold_idx + 1}:")
    print(best_params)
    
    # print training curve 
    curve_path = os.path.join(logs_path, f'version_{best_fold_idx}', 'metrics.csv')
    print(f"Training curve path: {curve_path}")
    logged_metrics = pd.read_csv(curve_path)

    # Filter out the test metrics
    train_indices = logged_metrics[logged_metrics['train_loss'].notna()]
    val_indices = logged_metrics[logged_metrics['val_loss'].notna()]

    # Create a figure with two subplots
    fig, axes = plt.subplots(2, 1, figsize=(10, 12))

    # Plot the mse score in the first subplot
    axes[0].plot(train_indices['epoch'], train_indices['train_mse'], label='Train MSE')
    axes[0].plot(val_indices['epoch'], val_indices['val_mse'], label='Validation MSE')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('MSE')
    axes[0].set_title(f'{dataset_name} MSE for Best Fold (fold {best_fold_idx + 1})')
    axes[0].legend()
    axes[0].grid(True)

    # Plot the loss in the second subplot
    axes[1].plot(train_indices['epoch'], train_indices['train_loss'], label='Train Loss')
    axes[1].plot(val_indices['epoch'], val_indices['val_loss'], label='Validation Loss')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Loss')
    axes[1].set_title(f'{dataset_name} Loss for Best Fold (fold {best_fold_idx + 1})')
    axes[1].legend()
    axes[1].grid(True)

    # Adjust layout
    plt.tight_layout()
    plt.show()
    
    return

The function to plot the training curves is applied to the three different models. 

In [None]:
# plot the training curves for the best outer fold model for each dataset (To do: loop over other datasets)
for dataset_name, results in results_dict.items():
    # print the average scores and the stdev for each dataset for the outer folds (same way as baselines)
    print(f"\nScores for {dataset_name}:")
    print(f"Average MSE: {np.mean(results['outer_mse_scores']):.4f} ± {np.std(results['outer_mse_scores']):.4f}")
    print(f"Average MAE: {np.mean(results['outer_mae_scores']):.4f} ± {np.std(results['outer_mae_scores']):.4f}")
    print(f"Average RMSE: {np.mean(results['outer_rmse_scores']):.4f} ± {np.std(results['outer_rmse_scores']):.4f}")
    print(f"Average R²: {np.mean(results['outer_r2_scores']):.4f} ± {np.std(results['outer_r2_scores']):.4f}")

        
    plot_training_curves(
        logs_path = rf'C:\Users\Beheerder\Documents\GitHub\Thesis_Datascience\logs\{dataset_name}',
        outer_mse_scores=results["outer_mse_scores"],
        best_params_list=results["best_params_list"],
        dataset_name=dataset_name
    )