# Statistical Tests

This notbeook includes all statistical tests for the analysis.

- Correlation Test
- Mann-Whitney-U test
- Wilcoxon
- Linear Mixed Models (LMM)

In [None]:
#  All imports
import sys, glob, json, re
from pathlib import Path
import numpy as np
import pandas as pd
import neurokit2 as nk
from tqdm import tqdm
from scipy.stats import pearsonr, spearmanr
from scipy import stats
from scipy.stats import shapiro
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import math
import statsmodels.formula.api as smf
from scipy.stats import probplot
import seaborn as sns
import os
import itertools
#Imports for Predicition 
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

import warnings
warnings.filterwarnings("ignore")

#### Loading data

Loading csv data files with all necessary informations

In [None]:
#folder with all results
OUTDIR = Path("/Users/johannanagel/Masterarbeit_Programming/output_postfeedback")
#folder with the dataframes of the preparation files (market mechanism)
RESULTS = Path("/Users/johannanagel/Masterarbeit_Programming/output_postfeedback/Main_Part/DataFrames")
STAT_READY = RESULTS / "statistical_analysis_ready.csv"
SCHEMA = RESULTS /"schema_results.json"
#folder for the results of the analyis
TESTS_RESULT = Path("/Users/johannanagel/Masterarbeit_Programming/output_postfeedback/Main_Part/Statistical_Test")
os.makedirs(TESTS_RESULT, exist_ok = True)
FULL_RESULTS =    RESULTS / "all_data_combined.csv"


DEMOGRAPHICS = RESULTS/ "demographics_participant.csv"

In [None]:
# Loading data
try: 
    #reading .csv file which represent the data which is ready for the analysis
    df_all = pd.read_csv(STAT_READY)
    df_demo = pd.read_csv(DEMOGRAPHICS)
    df_full = pd.read_csv(FULL_RESULTS)

    with open(SCHEMA) as f:
        schema = json.load(f)

    for c, meta in schema.items():
        df_all[c] = pd.Categorical(df_all[c], categories=meta["categories"], ordered=meta["ordered"])

    df_all.sort_values(["session", "participant", "round"], ascending = True)
except ValueError as err:
    print(f"Something went wrong with reading the file: {err}")

In [None]:
#adding new data (forgot mispricing data) to the analysis frame
df_mispricing = df_full.copy()
df_mispricing = df_mispricing[["session","round", "rd_Smith_categorization", "Category_SMITH", "Category_Guisty", "Category_KIRCHLER", "Category_Guisty_round", "rd_Kirchler_categorization"]].drop_duplicates()

df_all = pd.merge(df_all,df_mispricing, on = ["session", "round"], how = "left")

In [None]:
#check for nan values 
print("Columns with nan values: ")
df_all.isna().sum()[df_all.isna().sum()>0]

In [None]:
#check mumber of categories per window
df_all["window_type"] = df_all["window_type"].fillna("none")
df_all["window_type"].value_counts() 

In [None]:
#check datatypes
print("Checking datatypes")
df_all.dtypes

## Descriptive Analysis

This chapter includes discriptive analysis steps (such as Shapiro nomarlization, or data summary of each column)

In [None]:
# Variables with columns based on market mechanisms and EDA
GENERAL_COL = ["session", "round", "participant", "price", "dividend", "interest", "stocks"]
EDA_COLUMNS = ["window_type", "scl_mean", "scl_std", "scr_auc", "scr_amplitude", "scr_per_sec", "scr_count", "signal_range", "signal_std"]
FV_COL = ["FV_KIRCHLER", "FV_GUISTY", "FV_SMITH", "rd_Guisty", "rd_Smith", "rd_Kirchler", "rad_Guisty", "rad_Smith", "rad_Kirchler", "rd_GUISTY_per_round", "rd_Smith_per_round", "rd_Kirchler_per_round", "rad_GUISTY_per_round", "rad_Smith_per_round", "rad_Kirchler_per_round"]
VOL_COL = ["V_t", "MAD_t", "AMAD_t"]

In [None]:
# Calculating number of participants per session and the number of observations
session_overview = df_all.groupby("session")["participant"].nunique().reset_index().copy()

session_overview.columns = ["session", "Number_participant"]
obs_per_session = df_all.groupby("session").size().reset_index(name="Number_observations").copy()

# grouping number of unique participants
session_stats = (
    df_all.groupby("session")
      .agg(
          Number_participant=("participant", "nunique"),
          Number_observations=("participant", "count")
      )
      .reset_index()
)

numb_session = df_all.session.nunique()
numb_participant = df_all.participant.nunique()
number_observations = len(df_all)
print(number_observations)

number = pd.DataFrame({"session": [f"Total: {numb_session}"] , "Number_participant": numb_participant, "Number_observations": number_observations})

# Saving the dataframe in a latex table
session_stats = pd.concat([session_stats, number], ignore_index=True)
path = TESTS_RESULT / "session_overview.tex"
session_stats.to_latex(
    path,
    index=False,
    caption="Session Overview with number of paricipants",
    label="tab:session_overview"
)
print(f"saved in: {path}")
session_stats


In [None]:
# Describe data values of the general columns
stat_general = df_all[GENERAL_COL].describe().T
stat_general = stat_general.round(3)

stat_general = stat_general.reset_index()
path = TESTS_RESULT / "Description_General_Variables.tex"
df_clean = stat_general.astype(str)
# Saving as a latex file
df_clean.to_latex(
    path,
    index=False,
    caption="Description_General_Variables",
    label="tab:description_General_Variables"
)

print(f"saved in {path}")

stat_general

In [None]:
# Describe data values of the EDA columns
stats_EDA = df_all[EDA_COLUMNS].describe().T.round(3)

path = TESTS_RESULT / "Description_EDA.tex"
df_clean_eda = stats_EDA.astype(str)
# Saving table
df_clean_eda.to_latex(
    path,
    index=False,
    caption="Description EDA Variables",
    label="tab:description_EDA"
)

print(f"saved in {path}")
stats_EDA

In [None]:
# Describe data values of the volatility columns
stats_VOL = df_all[VOL_COL].describe().T.round(3)
path = TESTS_RESULT / "Description_VOL.tex"
df_clean_VOL = stats_VOL.astype(str)

# Saving latex file
df_clean_VOL.to_latex(
    path,
    index=False,
    caption="Description VOL Variables",
    label="tab:description_VOL"
)

print(f"saved in {path}")

stats_VOL

In [None]:
#Describe data values of the FV columns
stats_FV = df_all[FV_COL].describe().T.round(3)

path = TESTS_RESULT / "Description_FV.tex"
df_clean_FV = stats_FV.astype(str)
df_clean_FV.to_latex(
    path,
    index=False,
    caption="Description FV Variables",
    label="tab:description_FV"
)

print(f"saved in {path}")
stats_FV

In [None]:
#drop nan values
df_all = df_all.dropna(subset=["V_t", "MAD_t", "AMAD_t", "Vol_Classification_Global", "Vol_Classification_between_session"])

### Testing for Normal distribution of Attributes with Shapiro

H0: The data is normal distributed
<br>H1: Data is not normal distributed

- if p < 0.05: Reject H0 (data is not normal distributed)  => Support H1
- if p >= 0.05: No rejection of HO

In [None]:
#splitting the columns of the data in numeric columns and categorical columns
#only colums with numeric values
numeric_cols = df_all.select_dtypes(include=[np.number]).columns.tolist()
#categorical variables
categorical_cols = df_all.select_dtypes(exclude=[np.number]).columns.tolist()
print("\nNumerical Cols:")
print(numeric_cols)

print("\nCategorical Cols:")
print(categorical_cols)

In [None]:
def safe_shapiro(x):
    """Shapiro function for testing normal distribution """
    x = pd.to_numeric(x, errors="coerce").dropna()
    if len(x) < 3 or np.isclose(x.std(), 0):
        return np.nan
    try:
        return shapiro(x)[1]
    except Exception:
        return np.nan

In [None]:
# Apply for each numeric columns the shapiro function
try: 
  results = []
  for column in numeric_cols:
    if column in ["participant", "session", "round"] :
      continue
    else: 
      #apply it for each session
      result = (
          df_all.groupby([ "session"])[column]
            .apply(safe_shapiro)
            .reset_index(name="p_value")
      )
      result["variable"] = column
      results.append(result)
  result_normal_distributed_shapiro_session = pd.concat(results, ignore_index=True)
  result_normal_distributed_shapiro_session = result_normal_distributed_shapiro_session[[ "session", "variable", "p_value"]]

  table_shapiro_session = result_normal_distributed_shapiro_session.pivot(index="session", columns="variable", values="p_value")
  table_shapiro_session = table_shapiro_session.sort_index()
  


  #### check if the results include values with p> 0.05
  

  display(table_shapiro_session)
  print(f"If participants have normal distributed values: \n")
  h0_no_rejection_session = result_normal_distributed_shapiro_session[result_normal_distributed_shapiro_session["p_value"] >= 0.05]
  print(h0_no_rejection_session)

except ValueError as err:
  print(f"Something went wrong during the test of shapiro {err}")

In [None]:
result_normal_distributed_shapiro_session

In [None]:
try: 
  results = []
  for column in numeric_cols:
    if column in ["participant", "session", "round"] :
      continue
    else: 
      #apply for each participant
      result = (
          df_all.groupby(["participant", "session"])[column]
            .apply(safe_shapiro)
            .reset_index(name="p_value")
      )
      result["variable"] = column
      results.append(result)
  result_normal_distributed_shapiro = pd.concat(results, ignore_index=True)
  result_normal_distributed_shapiro = result_normal_distributed_shapiro[["participant", "session", "variable", "p_value"]]
  table_shapiro_session = result_normal_distributed_shapiro.pivot(index="participant", columns="variable", values="p_value").round(3)
  display(table_shapiro_session)


  #### check if the results include values with p> 0.05
  h0_no_rejection = result_normal_distributed_shapiro[result_normal_distributed_shapiro["p_value"] >= 0.05]
  print(f"If participants have normal distributed values: \n{h0_no_rejection}")

except ValueError as err:
  print(f"Something went wrong during the test of shapiro {err}")

In [None]:
#saving normal distributed variables as a latex file

path_latex_shapiro = TESTS_RESULT / "normal_distributed_varibales.tex"
latex_str = h0_no_rejection.to_latex(index = False, caption = "Shaprio Test - No rejection possible", label = "tab:shapiro_normal_dis", float_format = "%.3f")
with open(path_latex_shapiro, "w", encoding= "utf-8") as f:
    f.write(latex_str)

if path_latex_shapiro.exists:
    print(f"saved in {path_latex_shapiro}")
else:
        print("‚ùå file was NOT saved")


In [None]:
## Plotting Histogram and Q-Q Plot for assessing the normal distribution
for idx, row in h0_no_rejection.iterrows():
    name = row["participant"]
    var = row["variable"]
    fig, axes = plt.subplots(1,2, figsize=(8,4))
    fig.suptitle(f"{name} ‚Äî {var}", fontsize=12, fontweight="bold")
    sns.histplot(df_all[df_all["participant"] ==name][var], kde=True, ax=axes[0], color="skyblue")
    axes[0].set_title("Histogramm", fontsize=12)
    axes[0].set_xlabel(var)
    axes[0].set_ylabel("Frequency")

    probplot(df_all[df_all["participant"] ==name][var].dropna(), dist="norm", plot=axes[1])
    axes[1].set_title("Q‚ÄìQ-Plot", fontsize=12)
    plt.tight_layout()
    plt.show()
    path = TESTS_RESULT / f"histogram_qq_{name}.png"
    fig.savefig(path)

    if path.exists():
        print(f"saved in {path}")
    else:
        print("‚ùå file was NOT saved")

Result of the the Spapiro test => the data is not normal distributed

### Correlation - Heatmap (global, session, participant)

Plotting the correlation heatmap with Spearman correlation tests

In [None]:
numeric_cols = df_all.select_dtypes(include=["number"]).columns.drop(
    ["interest", "dividend", "FV_SMITH", "rd_Kirchler", "rd_Guisty", "rd_Smith","rad_Kirchler", "rad_Guisty", "rad_Smith"]
)

corr_spearman = df_all[numeric_cols].corr(method="spearman")

# Heatmap of the correlation (global)
fig = plt.figure(figsize = (len(numeric_cols)*0.6, len(numeric_cols)*0.6))
sns.heatmap(corr_spearman,  cmap="coolwarm", center=0, annot = True, fmt = ".2f",linewidths=0.5, square=True, cbar_kws={"shrink": 0.7} )
plt.title("Spearman Correlation Matrix", fontsize=16, pad=12, fontweight="bold")

#Saving heatmao as a figure 
FOLDER_HEATMAP_CORRELATION = TESTS_RESULT / "HEATMAP_CORRELATION"
os.makedirs(FOLDER_HEATMAP_CORRELATION, exist_ok=True)

save_path = FOLDER_HEATMAP_CORRELATION / "heatmap_corr_global.png"
plt.tight_layout()
plt.savefig(save_path, dpi=300, bbox_inches="tight")
if os.path.exists(save_path):
    print(f"‚úÖ plot was saved under: {save_path}")
else: 
    print(f"‚ùå plot was not saved")
plt.show()
plt.close()


In [None]:
# Correlation session-wise and participant wise
for group in ["session", "participant"]:
    print(f"Correlation within {group}")
    for pid, df_p in df_all.groupby(group):
        numeric_cols = df_all.select_dtypes(include=["number"]).columns.drop(["interest", "dividend", "FV_SMITH", "rd_Kirchler", "rd_Guisty", "rd_Smith","rad_Kirchler", "rad_Guisty", "rad_Smith", "final_earnings_mean"])
        corr_spearman = df_p[numeric_cols].corr(method="spearman")


        plt.figure(figsize = (len(numeric_cols)*0.6, len(numeric_cols)*0.6))
        sns.heatmap(corr_spearman,  cmap="coolwarm", center=0, annot = True, fmt = ".2f",linewidths=0.5, square=True, cbar_kws={"shrink": 0.7} )
        plt.title(f"Spearman Correlation Matrix - {pid}")
        
        #saving heatmap
        save_path = FOLDER_HEATMAP_CORRELATION / f"heatmap_corr_{pid}.png"
        plt.tight_layout()
        plt.savefig(save_path, dpi=300, bbox_inches="tight")
        if os.path.exists(save_path):
            print(f"‚úÖ plot was saved under: {save_path}")
        else: 
            print(f"‚ùå plot was not saved")
        plt.show()
        plt.close()







## Correlation

Applying Spearman corrlation function to test whether there is a relationship between EDA signals and the market mechanism.

In [None]:
def compute_correlations(
    df, market_mechanism,
    #eda_signals=("scl_mean", "scl_std", "scr_amplitude", "scr_auc",
                 #"scr_count", "scr_per_sec", "signal_range", "signal_std"),

    eda_signals=( "scr_amplitude", "scr_auc", "scr_count", "scr_per_sec", "scl_mean", "scl_std"),
    methods=("pearsonr", "spearmanr"),
    groupby_cols=None,       # e.g. None, ["window_type"], ["session"], ["session","window_type"]
    min_n=3,
    min_std=1e-8,
    coerce_numeric=True
):
    """
    Compute correlations between FV deviation variables and EDA signals.
    Optionally group by one or more columns (e.g., session, window_type).
    Returns a tidy pandas DataFrame.
    """
    df = df.copy()

    # Check if all variables exist and are numeric
    if coerce_numeric:
        for col in set(market_mechanism) | set(eda_signals):
            if col not in df.columns:
                raise KeyError(f"Missing column: {col}")
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # Check grouping columns 
    if not groupby_cols:
        groups = [((), df)]  # treat all data as one group
    else:
        for g in groupby_cols:
            if g not in df.columns:
                raise KeyError(f"Grouping column missing: {g}")
        groups = df.groupby(groupby_cols, dropna=False)

    results = []

    # Adding the results to the results list
    def _add_row(group_dict,key, mm, eda, r_spearman, p_spearman, r_pearson, p_pearson, n, note, sig_spearman, sig_pearson ):
        row = dict(group_dict)
        row.update({
            "key": key,
            "market_mechanism": mm,
            "EDA_signal": eda,
            "correlation_coefficient_Spearman": r_spearman,
            "p_value_Spearman": p_spearman,
            "correlation_coefficient_Pearson": r_pearson,
            "p_value_Pearson": p_pearson,
            "N": n,
            "note": note,
            "significance_Spearman": sig_spearman,
            "significance_Pearson": sig_pearson
        })
        results.append(row)

    # Iterate over groups (e.g. per window_type or session) ---
    for key, df_g in groups:
        if not groupby_cols:
            group_dict = {}
        else:
            if not isinstance(key, tuple):
                key = (key,)
            group_dict = dict(zip(groupby_cols, key))

        for mm in market_mechanism:
            for eda in eda_signals:
                pair = df_g[[eda, mm]].dropna()
                n = len(pair)

                # Skip if not enough data
                if n < min_n:
                    _add_row(group_dict,key, mm, eda, None, None, None, None, n, "N < min_n ‚Äî skipped", "not significant", "not significant")
                    continue

                x, y = pair[eda].values, pair[mm].values

                # Skip if nearly constant
                if np.std(x) < min_std or np.std(y) < min_std:
                    _add_row(group_dict,key, mm, eda, None, None, None, None, n, "Variance too small ‚Äî skipped", "not significant", "not significant")
                    continue

                # Compute correlation(s)

                try:
                    r_pearson, p_pearson = pearsonr(x, y)
                    r_spearman, p_spearman = spearmanr(x, y)
                    sig_pearson = "significant" if (p_pearson is not None and p_pearson < 0.05) else "not significant"
                    sig_spearman = "significant" if (p_spearman is not None and p_spearman < 0.05) else "not significant"
                    _add_row(group_dict,key, mm, eda, r_spearman,p_spearman, r_pearson, p_pearson, n, None, sig_spearman, sig_pearson)
                except Exception as e:
                    _add_row(group_dict,key, mm, eda, None, None, None, None, n, f"error: {e}", "not significant", "not significant")

    return pd.DataFrame(results)

# Filter dataframes groups

def filter_results(df_corr, groupby_cols, alpha = 0.05): 
    """
    Filtering the result dataframe for correlations and divide them into groups.
    """  
    print(f"\nüöÄ Start: Filter correlation results - Group = {groupby_cols}")
    df = df_corr.copy()
    correlation = df["correlation_coefficient_Spearman"]  
    p_values =   df["p_value_Spearman"] 
    #dividing based on the correlation values into groups (low, medium, strong, not significant)
    sig= p_values.le(alpha)
    strong_pos =  sig & correlation.ge(0.5)     
    medium_pos =  sig & correlation.ge(0.3)  &  correlation.lt(0.5)
    small_pos = sig & correlation.ge(0.1) &  correlation.lt(0.3)
    strong_neg =  sig & correlation.le(-0.5)
    medium_neg = sig & correlation.le(-0.3) & correlation.gt(-0.5)
    low_sig = sig & correlation.le(-0.1) & correlation.gt(-0.3) 
    only_sig = sig &  ~(strong_pos | medium_pos | small_pos | strong_neg | medium_neg | low_sig)
    not_sig = ~sig

    results_df = []
    results_df.append({"Case": f"r ‚â• 0.5", "Frequency": strong_pos.sum()})
    print(f"Size r ‚â• 0.5 & p‚â§{alpha}: {strong_pos.sum()}")

    results_df.append({"Case": f"0.3 ‚â§ r < 0.5", "Frequency": medium_pos.sum()})
    print(f"Size 0.3 ‚â§ r < 0.5 & p‚â§{alpha}: {medium_pos.sum()}")

    results_df.append({"Case": f"0.1 ‚â§ r < 0.3", "Frequency": small_pos.sum()})
    print(f"Size 0.1 ‚â§ r < 0.3 & p‚â§{alpha}: {small_pos.sum()}")

    results_df.append({"Case": f"r ‚â§ -0.5", "Frequency": strong_neg.sum()})
    print(f"Size r ‚â§ -0.5 & p‚â§{alpha}: {strong_neg.sum()}")

    results_df.append({"Case": "-0.5 < r ‚â§ -0.3", "Frequency": medium_neg.sum()})
    print(f"Size -0.5 < r ‚â§ -0.3 & p‚â§{alpha}: {medium_neg.sum()}")

    results_df.append({"Case": f"-0.3 < r ‚â§ -0.1", "Frequency": low_sig.sum()})
    print(f"Size -0.3 < r ‚â§ -0.1 & p‚â§{alpha}: {low_sig.sum()}")

    results_df.append({"Case": f"|r|<0.1", "Frequency": only_sig.sum()})
    print(f"Size p‚â§{alpha} but |r|<0.1: {only_sig.sum()}")

    results_df.append({"Case": f"p>{alpha}", "Frequency": not_sig.sum()})
    print(f"Size p>{alpha} (not significant): {not_sig.sum()}")


    #Max and Min of significant values
    if sig.any():
        idx_max = correlation[sig].idxmax()
        idx_min = correlation[sig].idxmin()
        r_max, p_max = correlation.loc[idx_max], p_values.loc[idx_max]
        r_min, p_min = correlation.loc[idx_min], p_values.loc[idx_min]
        print(f"max correlation: {r_max}, p: {p_max}, key: {df.loc[idx_max,'key']}, "
              f"EDA: {df.loc[idx_max,'EDA_signal']}, mech: {df.loc[idx_max,'market_mechanism']}")
        print(f"min correlation: {r_min}, p: {p_min}, key: {df.loc[idx_min,'key']}, "
              f"EDA: {df.loc[idx_min,'EDA_signal']}, mech: {df.loc[idx_min,'market_mechanism']}")
    print("üèÅ Done")

    strong_or_weak = df[strong_pos | medium_pos | small_pos | strong_neg | medium_neg | low_sig]
    only_significant = df[only_sig]
    not_significant  = df[not_sig]

    return strong_or_weak, only_significant, not_significant, pd.DataFrame(results_df)

def combine_frequency(df_frequency):
    """
    Calculating for each category the frequency. Aim: To get a better overview over the results. 
    """
    df = df_frequency.copy()

    wide = df.pivot_table(index="Case", columns="Scenario", values="Frequency", aggfunc="sum", fill_value=0)


    scenario_order = [ "General", "Decision-Phase", "Transaction", "Session", "Session and Decision-Phase", "Performance" ] 
    wide = wide.reindex(columns=[c for c in scenario_order if c in wide.columns])
    case_order_row = [
    "r ‚â• 0.5",
    "0.3 ‚â§ r < 0.5",
    "0.1 ‚â§ r < 0.3",
    "r ‚â§ -0.5",
    "-0.5 < r ‚â§ -0.3",
    "-0.3 < r ‚â§ -0.1",
    "|r|<0.1",
    "p>0.05",
    "Total"  
]
    # Keep order
    ordered = [r for r in case_order_row if r in wide.index]
    #print(ordered)
    rest = [r for r in wide.index if r not in set(ordered)]
    #print(rest)
    wide = wide.reindex(index=ordered + rest)


    #calc total of categeories for each sceanrio
    col_totals = wide.sum(axis=0, numeric_only=True)         
    wide = pd.concat([wide, pd.DataFrame([col_totals], index=["Total"])])

    #calc relative frequency
    denom = col_totals.replace(0, np.nan)
    wide_rel = wide.div(denom, axis=1) * 100
    wide_rel = wide_rel.fillna(0)
    wide_rel.loc["Total"] = 100.0

 
    wide = wide.round(2).reset_index().rename(columns={"index": "Case"})
    wide_rel = wide_rel.round(1).reset_index().rename(columns={"index": "Case"})

    return wide, wide_rel

def significance_marks(p):
    """
        Marks for significance levels
    """
    if pd.isna(p): return ""
    elif p < 0.001: return "***"
    elif p < 0.01: return "**"
    elif p < 0.05: return "*"
    return ""

def correlation_matrix_with_significance(
    df_corr: pd.DataFrame,
    *,
    r_col="correlation_coefficient_Spearman",
    p_col="p_value_Spearman",
    mm_col="market_mechanism",
    eda_col="EDA_signal",
    alpha_levels=(0.05, 0.01, 0.001),
    digits_r=2
):
    """
    Creating a matrix of the correlation-values
    """

    df = df_corr.copy()
    df["cell_value"] = df.apply(lambda x: f"{x[r_col]:.{digits_r}f}{significance_marks(x[p_col])}" if pd.notna(x[r_col]) else "‚Äì", axis=1)
    #pivot the table for creating a matrix based on correlation effects and significants amrks
    mat = df.pivot_table(index=mm_col, columns=eda_col, values="cell_value", aggfunc=lambda x: x.iloc[0] if len(x) else "-")
    if not mat.empty:
        mat = mat.reindex(sorted(mat.columns), axis=1).fillna("-")
    return mat

def all_scenario_matrices(
    df_corr: pd.DataFrame,
    group_cols,                              
    *,
    r_col="correlation_coefficient_Spearman",
    p_col="p_value_Spearman",
    mm_col="market_mechanism",
    eda_col="EDA_signal",
    alpha_levels=(0.05, 0.01, 0.001),
    digits_r=2,
    safe_names=True
):
    """
    Applying correlation_matrix_with_significance method for each category of a column
    """

    if not group_cols:
        raise ValueError("Bitte group_cols angeben (z.B. ['window_type']).")

    out = {}
    for key, g in df_corr.groupby(group_cols, dropna=False):
        
        if not isinstance(key, tuple):
            key = (key,)
        scen_name = " √ó ".join(f"{c}={v}" for c, v in zip(group_cols, key))
        if safe_names:
            
            scen_key = "_".join(f"{c}-{str(v)}" for c, v in zip(group_cols, key)).replace(" ", "_")
        else:
            scen_key = scen_name

        mat = correlation_matrix_with_significance(
            g, r_col="correlation_coefficient_Spearman", p_col="p_value_Spearman", mm_col=mm_col, eda_col=eda_col,
            alpha_levels=alpha_levels, digits_r=digits_r
        )
        out[scen_key] = mat
    return out

def calc_and_save_matrices(results_df, group_cols, folder):
    """
    Executing the method for creating a matrix out of the r-values of the scenarios. And saving them as .tex file and .csv file
    """
    
    if results_df is None:
        raise ValueError("No available results")

    if (group_cols == "General") | (group_cols == "Performance") :
        matr = correlation_matrix_with_significance(results_df)
        save_pdf_tex = folder / f"corr_matrix_{group_cols}.tex"
        save_pdf_csv = folder / f"corr_matrix_{group_cols}.csv"
        matr = matr.reset_index()

        #saving as .csv file
        matr.to_csv(save_pdf_csv, index=False, encoding="utf-8", sep=";")
        #saving as a latex file
        with open(save_pdf_tex, "w", encoding="utf-8") as f:
                f.write(matr.to_latex(
                    index=False,
                    escape=True,
                    float_format="%.0f",  
                    caption=f"Correlation Matrix FV vs EDA - {group_cols}",
                    label=f"tab:Corr_Matrix_FV_EDA_{group_cols}",
                    column_format="l" + "r" * (len(matr.columns) - 1)  
                ))
    else:
        matr = all_scenario_matrices(results_df, group_cols=group_cols)
        for name, matrix in matr.items():
            matrix = matrix.reset_index()

            save_pdf_tex = folder / f"corr_matrix_{name}.tex"
            save_pdf_csv = folder / f"corr_matrix_{name}.csv"
            #saving csv file
            matrix.to_csv(save_pdf_csv, index=False, encoding="utf-8", sep=";")
            #saving latex file
            with open(save_pdf_tex, "w", encoding="utf-8") as f:
                f.write(matrix.to_latex(
                    index=False,
                    escape=True,
                    float_format="%.0f",  
                    caption=f"Correlation Matrix FV vs EDA - {name}",
                    label=f"tab:Corr_Matrix_FV_EDA_{name}",
                    column_format="l" + "r" * (len(matrix.columns) - 1)  
                ))
            
            







### Execution Method for the correlation test

Grouping the data into multiple Groups
1. General (All Data is combined together)
2. Decision Phases: For each decision category applying the correlation matrix
3. Session
4. Transaction types: For each transaction category applying the correlation matrix
5. Session √ó Decision Phase
6. Performance


In [None]:


eda_signals = ["scr_amplitude", "scr_auc", "scr_count", "scr_per_sec", "scl_mean", "scl_std"]

market_mechanism = ["rd_GUISTY_per_round", "rad_GUISTY_per_round", "rd_Kirchler_per_round", "rad_Kirchler_per_round","rad_Smith_per_round", "rd_Smith_per_round", "V_t", "AMAD_t", "MAD_t"]

MATRIX_CORR = TESTS_RESULT / "MATRIX_CORR"
os.makedirs(MATRIX_CORR, exist_ok=True)

# A) All data combined
results_all = compute_correlations(df_all, market_mechanism, groupby_cols=None)
strong_or_weak_general, only_significant_general, not_significant_general, frequency_all = filter_results(results_all, None)
frequency_all["Scenario"] = "General"
calc_and_save_matrices(results_all, group_cols="General",folder = MATRIX_CORR)


#print(table_all)


#print(sign_bigger_r_all)

# B) Grouped only by Decision Phase
results_by_phase = compute_correlations(df_all,market_mechanism,  groupby_cols=["window_type"])
strong_or_weak_phase, only_significant_phase, not_significant_phase, frequency_phase = filter_results(results_by_phase, groupby_cols=["window_type"])
frequency_phase["Scenario"] = "Decision-Phase"
calc_and_save_matrices(results_by_phase, group_cols=["window_type"],folder = MATRIX_CORR)
#print(sign_bigger_r_phase)

df_frequency = pd.concat([frequency_all ,frequency_phase ])

# C) Grouped only by Session
results_by_session = compute_correlations(df_all, market_mechanism, groupby_cols=["session"])
strong_or_weak_session, only_significant_session, not_significant_session, frequency_session  = filter_results(results_by_session,groupby_cols=["session"])
frequency_session["Scenario"] = "Session"
df_frequency = pd.concat([df_frequency,frequency_session ])
calc_and_save_matrices(results_by_session, group_cols=["session"],folder = MATRIX_CORR)
#print(sign_bigger_r_session)

# D) Grouped only by Transaction 
results_by_transaction = compute_correlations(df_all,market_mechanism, groupby_cols=["transaction"])
strong_or_weak_transaction, only_significant_transaction, not_significant_transaction, frequency_transaction = filter_results(results_by_transaction, groupby_cols=["transaction"])
frequency_transaction["Scenario"] = "Transaction"
#print(sign_bigger_r_transaction)
df_frequency = pd.concat([df_frequency,frequency_transaction ])
calc_and_save_matrices(results_by_transaction, group_cols=["transaction"],folder = MATRIX_CORR)

# D) Grouped by Session √ó Decision Phase
results_by_session_phase = compute_correlations(df_all, market_mechanism, groupby_cols=["session", "window_type"])
strong_or_weak_session_phase , only_significant_phase, not_significant_session_phase, frequency_session_phase  = filter_results(results_by_session_phase, groupby_cols=["session", "window_type"])
df_frequency = pd.concat([df_frequency,frequency_session_phase ])
#print(sign_bigger_r_session_phase)


#D) Peformance
print("\nperformance")
performance = ["final_money"]
results_performance = compute_correlations(df_all, performance, groupby_cols=None)
strong_or_weak_perfomance,  only_significant_perfomance, not_significant_performance, frequency_performance  = filter_results(results_performance, None)
frequency_performance["Scenario"] = "Performance"
df_frequency = pd.concat([df_frequency,frequency_performance ])
calc_and_save_matrices(results_performance, group_cols="Performance",folder = MATRIX_CORR)




df_frequency, df_frequency_rel = combine_frequency(df_frequency)

#Saving df_frequency and df_frequency_rel in latex

abs_path = TESTS_RESULT / "frequency_absolute_corr.tex"
rel_path = TESTS_RESULT / "frequency_relative_corr.tex"
abs_path_csv = TESTS_RESULT / "frequency_absolute_corr.csv"
rel_path_csv = TESTS_RESULT / "frequency_relative_corr.csv"
df_frequency.to_csv(abs_path_csv, index=False, encoding="utf-8", sep=";")
df_frequency_rel.to_csv(rel_path_csv, index=False, encoding="utf-8", sep=";")



with open(abs_path, "w", encoding="utf-8") as f:
    f.write(df_frequency.to_latex(
        index=False,
        escape=True,
        float_format="%.0f",  
        caption="Frequency of each Scenario with Correlationcategories",
        label="tab:frequency_absolute",
        column_format="l" + "r" * (len(df_frequency.columns) - 1)  # links + rechtsb√ºndig
    ))

with open(rel_path, "w", encoding="utf-8") as f:
    f.write(df_frequency_rel.to_latex(
        index=False,
        escape=True,
        float_format="%.1f", 
        caption="Relative frequency (%) of each Scenario with Correlationcategories",
        label="tab:frequency_relative",
        column_format="l" + "r" * (len(df_frequency_rel.columns) - 1)
    ))

print(f"‚úÖ Exported Tables:\n- {abs_path}\n- {rel_path}")





In [None]:
strong_or_weak_session_phase

In [None]:
"""
market_mechanism = ["V_t", "MAD_t", "AMAD_t"]

# A) All data combined
results_all_Vol = compute_correlations(df_all, market_mechanism, groupby_cols=None)
sign_bigger_r_all_Vol, significant_all_Vol, not_significant_all_Vol, frequency_all_Vol = filter_results(results_all_Vol, None)
frequency_all_Vol["Scenario"] = "General"



#print(sign_bigger_r_all)
# B) Grouped only by Decision Phase
results_by_phase_Vol = compute_correlations(df_all,market_mechanism,  groupby_cols=["window_type"])
sign_bigger_r_phase_Vol, significant_phase_Vol, not_significant_phase_Vol, frequency_phase_VOl= filter_results(results_by_phase_Vol, groupby_cols=["window_type"])
frequency_phase_VOl["Scenario"] = "Decision-Phase"

df_frequency_VOl = pd.concat([frequency_all_Vol ,frequency_phase_VOl ])
#print(sign_bigger_r_phase)

# C) Grouped only by Session
results_by_session_Vol = compute_correlations(df_all, market_mechanism,  groupby_cols=["session"])
sign_bigger_r_session_Vol, significant_session_Vol, not_significant_session_Vol, frequency_session_VOl = filter_results(results_by_session_Vol,groupby_cols=["session"])
frequency_session_VOl["Scenario"] = "Session"
df_frequency_VOl = pd.concat([df_frequency_VOl ,frequency_session_VOl ])
#print(sign_bigger_r_session)
# C) Grouped only by Transaction 
results_by_transaction_Vol = compute_correlations(df_all,market_mechanism, groupby_cols=["transaction"])
sign_bigger_r_transaction_Vol, significant_transaction_Vol, not_significant_transaction_Vol, frequency_transaction_VOl = filter_results(results_by_transaction_Vol, groupby_cols=["transaction"])
frequency_transaction_VOl["Scenario"] = "Transaction"
df_frequency_VOl = pd.concat([df_frequency_VOl ,frequency_transaction_VOl ])


# D) Grouped by Session √ó Decision Phase
results_by_session_phase_Vol = compute_correlations(df_all, market_mechanism, groupby_cols=["session", "window_type"])
sign_bigger_r_session_phase_Vol , significant_session_phase_Vol, not_significant_session_phase_Vol, frequency_session_phase_VOl  = filter_results(results_by_session_phase_Vol, groupby_cols=["session", "window_type"])
frequency_session_phase_VOl["Scenario"] = "Session and Decision-Phase"
df_frequency_VOl = pd.concat([df_frequency_VOl ,frequency_session_phase_VOl ])
#print(sign_bigger_r_session_phase)

#D) Peformance
performance = ["final_money"]
print("\nperformance")
results_performance_Vol = compute_correlations(df_all, performance, groupby_cols=None)
sign_bigger_r_perfomance_Vol , significant_performance_Vol, not_significant_performance_Vol, frequency_performance_VOl  = filter_results(results_performance_Vol, None)
frequency_performance_VOl["Scenario"] = "Performance"
df_frequency_VOl = pd.concat([df_frequency_VOl ,frequency_performance_VOl ])


df_frequency_vol, df_frequency_rel_vol = combine_frequency(df_frequency_VOl)

#Saving df_frequency and df_frequency_rel in latex

abs_path_vol = TESTS_RESULT / "frequency_absolute_corr_VOl.tex"
rel_path_vol = TESTS_RESULT / "frequency_relative_corr_VOl.tex"

with open(abs_path_vol, "w", encoding="utf-8") as f:
    f.write(df_frequency_vol.to_latex(
        index=False,
        escape=True,
        float_format="%.0f",  
        caption="Frequency of each Scenario with Correlationcategories",
        label="tab:frequency_absolute",
        column_format="l" + "r" * (len(df_frequency_vol.columns) - 1)  # links + rechtsb√ºndig
    ))

with open(rel_path_vol, "w", encoding="utf-8") as f:
    f.write(df_frequency_vol.to_latex(
        index=False,
        escape=True,
        float_format="%.1f", 
        caption="Relative frequency (%) of each Scenario with Correlationcategories",
        label="tab:frequency_relative",
        column_format="l" + "r" * (len(df_frequency_rel_vol.columns) - 1)
    ))

print(f"‚úÖ Exported Tables:\n- {abs_path}\n- {rel_path}")

"""

## Mann-Whitney-U test

Whitney U-Test is a non-paramteric version of the 2 paired t-test
- Men vs Women
- Risk and Non risky participants
- Risk and Non-risky sessions
- High RD <> Low RD

In [None]:
test = df_all.copy()
test["performance_classification"].value_counts()

In [None]:
df_whitney = df_all.copy()

def cal_whitneyU_test(df_whitney, y_value, group_column, combinations_group, side, ses_or_part):
    """
    Method for applying Mann-Whitney-U test.
    """
    results = []
    #retrive the group categories
    for groupA, groupB in combinations_group:
        #split the categories into 2 groups + calculating the median of the EDA signal (of each participant or session)
        group_A = df_whitney[df_whitney[group_column] == groupA]
        a_mean = (group_A.groupby([ses_or_part])[y_value].median().reset_index(name = f"mean_{y_value}"))
        group_B = df_whitney[df_whitney[group_column] == groupB]
        b_mean = (group_B.groupby([ses_or_part])[y_value].median().reset_index(name = f"mean_{y_value}"))

        #check valid length of sessions
        n1,n2 =  len(a_mean), len(b_mean)
        if n1 ==0 or n2 == 0:
            continue
        
        #Test
        statistic, p_value = stats.mannwhitneyu(a_mean[f"mean_{y_value}"], b_mean[f"mean_{y_value}"], alternative=side, method = 'auto')

        #calcualting meanU, p-correction
        mean_U = n1 * n2 / 2
        var_U = n1 * n2 * (n1 + n2 + 1) / 12
        std_U = var_U ** 0.5
        z = (statistic - mean_U) / std_U  
        p_value_z = 2 * stats.norm.sf(abs(z)) 

        print(f"\n {groupA} vs. {groupB}")
        print(f"Mann-Whitney U statistic: {statistic}")
        print(f"P-Value: {p_value}")

        print(f"p-Correction: {p_value_z}")
 
        meaning = None
        #calcualte effect size r 
        N = n1 + n2
        #print(N)
        r = (abs(z) / math.sqrt(N))
        print(f"Effect size r: {r:.3f}")

        #Check if p-value < 0.05 and if effect size is bigger than 0.3

        if np.isnan(p_value):
            meaning = "p_value is None"
        elif p_value > 0.05:
            meaning = "not significant"
        elif  r >= 0.5 :
            meaning = f"strong effect"
        elif r >= 0.3 and r < 0.5:
                meaning = f"medium effect"
        else: meaning = f"weak effect difference"
        print(meaning)

        results.append({"sess_or_part": ses_or_part, "Group_label": group_column, "y_value": y_value, "combination": f"{groupA} vs. {groupB}", "statistic": statistic, "p_value": p_value, "p_value_z": p_value_z, "z":z, "n1":n1, "n2":n2, "effect_size_r": r,  "meaning": meaning})

    return pd.DataFrame(results)

def cell_format(r, p_col ="p_value", val="statistic", show_r=True):
    """
    Method for determine the cell content of the matrix, which consists of the effect-size and the significant marks.
    """
    try:
        base = f"{float(r[val]):.3f}"
    except Exception:
        base = str(r[val])
    if p_col in r and pd.notna(r[p_col]):
        base += significance_marks(r[p_col])
    if show_r and val != "effect_size_r"  and "effect_size_r" in r and pd.notna(r["effect_size_r"]):
        base += f"; r={r['effect_size_r']:.2f}"
    return base


def make_man_Whithney_U_matrix(df_manU, group_label = None, value_col= "statistic", add_stars= True, only_significant= False ):
    """
        Method for creating a matrix based on the results of the test
    """
    
    df = df_manU.copy()

    if group_label is not None:
        df = df[df["Group_label"] == group_label]

    #check significance
    if only_significant:
        sig_combos = (
            df[df["p_value"] <= 0.05]
            .groupby(["Group_label", "sess_or_part", "combination"])
            .size()
            .reset_index()[["Group_label", "sess_or_part", "combination"]]
        )
        if sig_combos.empty:
            return pd.DataFrame([])

        df = df.merge(sig_combos,
                      on=["Group_label", "sess_or_part", "combination"],
                      how="inner")

    #applying the cell format method 
    df["__cell__"] = df.apply(
        lambda r: cell_format(r, p_col="p_value", val=value_col, show_r=True) if add_stars
                  else cell_format(r, p_col=None, val=value_col, show_r=True),
        axis=1
    )
    #pivot the table to create a matrix format
    matrix = df.pivot_table(
                index = ["Group_label","sess_or_part","combination"], 
                columns = "y_value",
                values= "__cell__",
                aggfunc = "first",
                fill_value="-"
                )
 
    matrix = matrix.reindex(sorted(matrix.columns),axis=1)
    matrix = matrix.reset_index()

    

        # optional an erste Stelle setzen:
    first = ["Group_label"] + [c for c in matrix.columns if c != "Group_label"]
    matrix = matrix[first]

    return matrix


def saving_matrix_csv_latex(matrix, file_name, caption, label, FOLDER = TESTS_RESULT):
    """
    Saving matrix as a .csv and latex file.
    """

    if matrix is None:
        raise ValueError("No matrix file")

    tex_path = FOLDER / f"{file_name}.tex"
    csv_path = FOLDER / f"{file_name}.csv"
    matrix.to_csv(csv_path, index=False, encoding="utf-8", sep=";")

    with open(tex_path, "w", encoding="utf-8") as f:
        f.write(matrix.to_latex(
            index=False,
            escape=True,
            float_format="%.03f",  
            caption=f"{caption}",
            label=f"{label}",
            column_format="l" + "r" * (len(matrix.columns) - 1)  
        ))


    print(f"‚úÖ Exported Tables:\n- {tex_path}\n- {csv_path}")




print("======================= two-sided ========================")
all_res = []

#columns for participant comparision
group_participant = ["gender", "performance_classification", "performance_classification_session_wise", "p_Risk_Guisty_categorization_average","p_Risk_Smith_categorization_average", "p_Risk_Kirchler_categorization_average", "Vol_Classification_Global", "rd_Smith_categorization", "Category_Guisty_round", "rd_Kirchler_categorization"]
# columns for session comparisons
group_session = ["performance_classification_session", "s_Risk_Guisty_categorization_average","s_Risk_Smith_categorization_average", "s_Risk_Kirchler_categorization_average", "Vol_Classification_between_session", "Category_SMITH", "Category_Guisty", "Category_KIRCHLER" ]

#EDA signals which are used for the statistical tests
eda_sign = ["scr_auc", "scr_per_sec", "scl_mean", "scl_std"]
sess_or_part = ["participant", "session"]

#applying the mannwhitney U test
for group in group_participant:
    print("\n",group)
    for eda in eda_sign: 
        print("\n",eda)
        group_values = df_whitney[group].dropna().unique()      #retrieve unique values of group-columns
        #creating pariwise combiantions
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(combinations_)
        res_part = cal_whitneyU_test(df_whitney, eda, group, combinations_, 'two-sided', "participant")
        all_res.append(res_part)

#session-wise comaprison
for group in group_session:
    print("\n",group)
    for eda in eda_sign: 
        print("\n",eda)
        group_values = df_whitney[group].dropna().unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(combinations_)
        res_sess = cal_whitneyU_test(df_whitney, eda, group, combinations_, 'two-sided', "session")
        all_res.append(res_sess)
#concatenate each results
df_man_whitneyU = pd.concat(all_res, ignore_index=True)

#transoform results into a matrix dataframe and save them as a latex and csv file
matrirx_manU = make_man_Whithney_U_matrix(df_man_whitneyU, only_significant=True)
matrirx_manU_all = make_man_Whithney_U_matrix(df_man_whitneyU, only_significant=None)
saving_matrix_csv_latex(matrirx_manU, file_name ="MannWhitneyU_two_sided", caption="Mann Whitney U test - two sided", label ="tab:ManWhitneyU_test_2sided" )
saving_matrix_csv_latex(matrirx_manU_all, file_name ="MannWhitneyU_two_sided_all_values", caption="Mann Whitney U test - two sided - All data", label ="tab:ManWhitneyU_test_2sided_all_data" )
matrirx_manU

#separating the full results into SCR and SCL tables
base_cols = ["Group_label","combination"]
scl_cols = [c for c in ["scl_mean", "scl_std"] if c  in matrirx_manU_all.columns ]
matrix_scl = matrirx_manU_all[base_cols + scl_cols].copy()
saving_matrix_csv_latex(matrix_scl, file_name ="MannWhitneyU_two_sided_SCL", caption="Mann Whitney U test - two sided - SCL all", label ="tab:ManWhitneyU_test_2sided_SCL" )
scr_cols = [c for c in ["scr_auc", "scr_per_sec"] if c  in matrirx_manU_all.columns ]
matrix_scr = matrirx_manU_all[base_cols + scr_cols].copy()
saving_matrix_csv_latex(matrix_scr, file_name ="MannWhitneyU_two_sided_SCR", caption="Mann Whitney U test - two sided - SCR all", label ="tab:ManWhitneyU_test_2sided_SCR" )


In [None]:
matrirx_manU_all

In [None]:
df_man_whitneyU[df_man_whitneyU["p_value"] < 0.05 ]
   

### Man-Whitney-U test - greater

Applying one sided ManWhitneyU test.

In [None]:

print("======================= greater ========================")
#participant wise
group_participant = ["gender", "performance_classification", "performance_classification_session_wise", "p_Risk_Guisty_categorization_average","p_Risk_Smith_categorization_average", "p_Risk_Kirchler_categorization_average", "Vol_Classification_Global", "rd_Smith_categorization", "Category_Guisty_round", "rd_Kirchler_categorization"]
#session-wise
group_session = ["performance_classification_session", "s_Risk_Guisty_categorization_average","s_Risk_Smith_categorization_average", "s_Risk_Kirchler_categorization_average", "Vol_Classification_between_session", "Category_SMITH", "Category_Guisty", "Category_KIRCHLER" ]

eda_sign = ["scr_auc", "scr_per_sec", "scl_mean", "scl_std"]
sess_or_part = ["participant", "session"]

#participant
all_res_greater = []
for group in group_participant:
    print("\n",group)
    for eda in eda_sign: 
        print("\n",eda)
        group_values = df_whitney[group].dropna().unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(combinations_)
        res_part_great = cal_whitneyU_test(df_whitney, eda, group, combinations_, 'greater', "participant")
        all_res_greater.append(res_part_great)
#session-wise
for group in group_session:
    print("\n",group)
    for eda in eda_sign: 
        print("\n",eda)
        group_values = df_whitney[group].dropna().unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(combinations_)
        res_sess_great = cal_whitneyU_test(df_whitney, eda, group, combinations_, 'greater', "session")
        all_res_greater.append(res_sess_great)

df_man_whitneyU_greater = pd.concat(all_res_greater, ignore_index=True)

#creating matrix
matrix_manU_greater = make_man_Whithney_U_matrix(df_man_whitneyU_greater, only_significant=True)
saving_matrix_csv_latex(matrix_manU_greater, file_name ="MannWhitneyU_greater", caption="Mann Whitney U test - greater", label ="tab:ManWhitneyU_test_greater" )
matrix_manU_greater

#separating the full results into 2 table groups
base_cols = ["Group_label","combination"]
scl_cols = [c for c in ["scl_mean", "scl_std"] if c  in matrirx_manU_all.columns ]
matrix_scl_gr = matrix_manU_greater[base_cols + scl_cols].copy()
saving_matrix_csv_latex(matrix_scl_gr, file_name ="MannWhitneyU_greater_SCL", caption="Mann Whitney U test - greater - SCL all", label ="tab:ManWhitneyU_test_greater_SCL" )
scr_cols = [c for c in ["scr_auc", "scr_per_sec"] if c  in matrirx_manU_all.columns ]
matrix_scr_gr = matrix_manU_greater[base_cols + scr_cols].copy()
saving_matrix_csv_latex(matrix_scr_gr, file_name ="MannWhitneyU_greater_SCR", caption="Mann Whitney U test -greater - SCR all", label ="tab:ManWhitneyU_test_greater_SCR" )


### Man-Whitney-U test - less

Applying one sided ManWhitneyU test.

In [None]:
print("======================= less ========================")
group_participant = ["gender", "performance_classification", "performance_classification_session_wise", "p_Risk_Guisty_categorization_average","p_Risk_Smith_categorization_average", "p_Risk_Kirchler_categorization_average", "Vol_Classification_Global"]
group_session = ["performance_classification_session", "s_Risk_Guisty_categorization_average","s_Risk_Smith_categorization_average", "s_Risk_Kirchler_categorization_average", "Vol_Classification_between_session"]

eda_sign = ["scr_auc", "scr_per_sec", "scl_mean", "scl_std"]
sess_or_part = ["participant", "session"]

#participated
all_res_less = []
for group in group_participant:
    print("\n",group)
    for eda in eda_sign: 
        print("\n",eda)
        group_values = df_whitney[group].unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(combinations_)
        res_part_less = cal_whitneyU_test(df_whitney, eda, group, combinations_, 'less', "participant")     #direction : A less B
        all_res_less.append(res_part_less)

#session-wise
for group in group_session:
    print("\n",group)
    for eda in eda_sign: 
        print("\n",eda)
        group_values = df_whitney[group].unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(combinations_)
        res_sess_less = cal_whitneyU_test(df_whitney, eda, group, combinations_, 'less', "session")        #direction : A less B
        all_res_less.append(res_sess_less)

df_man_whitneyU_less = pd.concat(all_res_less, ignore_index=True)

#creating matrix
matrix_manU_less = make_man_Whithney_U_matrix(df_man_whitneyU_less, only_significant=True)
saving_matrix_csv_latex(matrix_manU_greater, file_name ="MannWhitneyU_less", caption="Mann Whitney U test - less", label ="tab:ManWhitneyU_test_less" )
matrix_manU_less

#separating into 2 tables 
base_cols = ["Group_label","combination"]
scl_cols = [c for c in ["scl_mean", "scl_std"] if c  in matrirx_manU_all.columns ]
matrix_scl_less = matrix_manU_less[base_cols + scl_cols].copy()
saving_matrix_csv_latex(matrix_scl_less, file_name ="MannWhitneyU_less_SCL", caption="Mann Whitney U test - less - SCL all", label ="tab:ManWhitneyU_test_less_SCL" )
scr_cols = [c for c in ["scr_auc", "scr_per_sec"] if c  in matrirx_manU_all.columns ]
matrix_scr_less = matrix_manU_less[base_cols + scr_cols].copy()
saving_matrix_csv_latex(matrix_scr_less, file_name ="MannWhitneyU_less_SCR", caption="Mann Whitney U test - less - SCR all", label ="tab:ManWhitneyU_test_less_SCR" )




In [None]:
df_man_whitneyU_less

## Wilcoxon 

After applying the nonparametric independent test, now the dependent Wilcoxon test will be applied.

In [None]:
df_all.columns

In [None]:
df_wilcoxon = df_all.copy()

def cal_wilcoxon(df_wilcoxon, y_value, group_column, combinations_group, side, sess_or_part):
    """
        Main test method for the wilcoxon test.
    """
    results = []
    for groupA, groupB in combinations_group:
        #seperating into 2 groups, Calculating the median based on "participant" and "session"
        group_A = df_wilcoxon[df_wilcoxon[group_column] == groupA]
        #group by "session_or_part" and calculate the mean
        a_mean = (group_A.groupby([sess_or_part])[y_value].median().reset_index(name = f"mean_{y_value}_A"))
        group_B = df_wilcoxon[df_wilcoxon[group_column] == groupB]
        b_mean = (group_B.groupby([sess_or_part])[y_value].median().reset_index(name = f"mean_{y_value}_B"))

        #find pairs by merging the 2 groups
        merged = pd.merge(a_mean, b_mean, on=sess_or_part, how='inner')

        x = merged[f"mean_{y_value}_A"]
        y = merged[f"mean_{y_value}_B"]
        print(f"{groupA} vs. {groupB}: n_pairs = {len(merged)}")
        #calc statistiscs
        statistic, p_value = stats.wilcoxon(x, y,  alternative=side, method = 'auto')
    
        print(f"\n {groupA} vs. {groupB}")
        print(f"Wilcoxon statistic: {statistic}")
        print(f"P-Value: {p_value}")

        N = len(merged)
        mean_W = N * (N + 1) / 4
        var_W = N * (N + 1) * (2 * N + 1) / 24
        std_W = math.sqrt(var_W)

        z = (statistic - mean_W) / std_W
        r = abs(z) / math.sqrt(N)
        print(f"z: {z:.3f}, r: {r:.3f}")


        #calcualte effect size r 
        N = len(a_mean) + len(b_mean)
        print(N)
        r = (abs(z) / math.sqrt(N))
        print(f"Effect size r: {r:.3f}")

        if np.isnan(p_value):
            meaning = "p_value is None"
        elif p_value > 0.05:
            meaning = "not significant"
        elif r >= 0.5:
            meaning = "strong effect"
        elif r >= 0.3:
            meaning = "medium effect"
        else:
            meaning = "weak effect difference"

        results.append({"sess_or_part": sess_or_part, "Group_label": group_column, "y_value": y_value, "combination": f"{groupA} vs. {groupB}", "statistic": statistic, "p_value": p_value,"effect_size_r": r,  "meaning": meaning})

    return pd.DataFrame(results)



def make_Wilcoxon_matrix(df_manU, group_label = None, value_col= "statistic", add_stars= True, only_significant= False ):
    """
        Creating Wilcoxon matrix for easier visulaization.
    """
    
    df = df_manU.copy()

    if group_label is not None:
        df = df[df["Group_label"] == group_label]

    if only_significant:
        sig_combos = (
            df[df["p_value"] <= 0.05]
            .groupby(["Group_label", "sess_or_part", "combination"])
            .size()
            .reset_index()[["Group_label", "sess_or_part", "combination"]]
        )
        if sig_combos.empty:
            return pd.DataFrame([])

        df = df.merge(sig_combos,
                      on=["Group_label", "sess_or_part", "combination"],
                      how="inner")

    df["__cell__"] = df.apply(
        lambda r: cell_format(r, p_col="p_value", val=value_col, show_r=True) if add_stars
                  else cell_format(r, p_col=None, val=value_col, show_r=True),
        axis=1
    )
    matrix = df.pivot_table(
                index = ["Group_label","sess_or_part","combination"], 
                columns = "y_value",
                values= "__cell__",
                aggfunc = "first",
                fill_value="-"
                )
 
    matrix = matrix.reindex(sorted(matrix.columns),axis=1)
    matrix = matrix.reset_index()

    

    
    first = [ "Group_label"] + [c for c in matrix.columns if c != "Group_label"]
    matrix = matrix[first]

    return matrix



all_res = []        #result list
#list of colums (categories) that are suppossed to be tested
session = ["Risk_Guisty", "Risk_Kirchler", "Risk_Smith", "Vol_Classification_Global", "Vol_Classification_intra_session", "window_type", "transaction"]
#EDA variables
eda_sign = ["scr_amplitude", "scr_auc", "scr_per_sec", "scl_mean", "scl_std"]
#applying on participant level
for group in session:
    for eda in eda_sign: 
        group_values = df_wilcoxon[group].dropna().unique()
        #create combinations
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(f"{eda} vs. {group}")
        res_part = cal_wilcoxon(df_wilcoxon, eda, group, combinations_,"two-sided", "participant")
        all_res.append(res_part)
#applying on session level
for group in session:
    for eda in eda_sign: 
        group_values = df_wilcoxon[group].dropna().unique()
        #create combinations
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        print(f"{eda} vs. {group}")
        res_sess = cal_wilcoxon(df_wilcoxon, eda, group, combinations_,"two-sided", "session")
        all_res.append(res_sess)

df_wilcoxon_res = pd.concat(all_res, ignore_index=True)

#creating matrix
matrix_wilcoxon = make_Wilcoxon_matrix(df_wilcoxon_res, only_significant=True)
#saving matrix
saving_matrix_csv_latex(matrix_wilcoxon, file_name ="Wilcoxon_two_sided", caption="Wilcoxon test - two-sided", label ="tab:Wilcoxon_2_sided" )

matrix_wilcoxon

base_cols = ["Group_label", "sess_or_part","combination"]
scl_cols = [c for c in ["scl_mean", "scl_std"] if c  in matrix_wilcoxon.columns ]
matrix_scl_2s_wilc = matrix_wilcoxon[base_cols + scl_cols].copy()
saving_matrix_csv_latex(matrix_scl_2s_wilc, file_name ="Wilcoxon_2s_SCR_2s_SCL", caption="Wilcoxon test - 2sided - SCL", label ="tab:Wilcoxon_test_2s_SCL" )
scr_cols = [c for c in ["scr_auc", "scr_per_sec"] if c  in matrix_wilcoxon.columns ]
matrix_scr_2s_wilc = matrix_wilcoxon[base_cols + scr_cols].copy()
saving_matrix_csv_latex(matrix_scr_2s_wilc, file_name ="Wilcoxon_2s_SCR", caption="Wilcoxon test - 2sided - SCR", label ="tab:Wilconxon_test_2s_SCR" )

In [None]:
df_wilcoxon_res

In [None]:
matrix_wilcoxon

### One sided Wilcoxon test - less

Testing if Group A has a lower median EDA value than Group B

In [None]:
all_res_less = []
session = ["Risk_Guisty", "Risk_Kirchler", "Risk_Smith", "Vol_Classification_Global", "Vol_Classification_intra_session", "window_type", "transaction"]

eda_sign = ["scr_amplitude", "scr_auc", "scr_per_sec", "scl_mean", "scl_std"]
#testing on participant level
for group in session:
    for eda in eda_sign: 
        group_values = df_wilcoxon[group].dropna().unique()
        #create combinations
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        res_part = cal_wilcoxon(df_wilcoxon, eda, group, combinations_,"less", "participant")
        all_res_less.append(res_part)
#testing on session level
for group in session:
    for eda in eda_sign: 
        group_values = df_wilcoxon[group].dropna().unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        res_sess = cal_wilcoxon(df_wilcoxon, eda, group, combinations_,"less", "session")
        all_res_less.append(res_sess)
#concatenate the results to a lust
df_wilcoxon_less = pd.concat(all_res_less, ignore_index=True)
#create matriy and save it
matrix_wilcoxon_less = make_Wilcoxon_matrix(df_wilcoxon_less, only_significant=True)
saving_matrix_csv_latex(matrix_wilcoxon_less, file_name ="Wilcoxon_less", caption="Wilcoxon test - less", label ="tab:Wilcoxon_less" )


#Separting results into two groups (SCL and SCR), Saving it as a latex table
base_cols = ["Group_label", "sess_or_part","combination"]
scl_cols = [c for c in ["scl_mean", "scl_std"] if c  in matrix_wilcoxon_less.columns ]
matrix_scl_less_wilc = matrix_wilcoxon_less[base_cols + scl_cols].copy()
saving_matrix_csv_latex(matrix_scl_less_wilc, file_name ="Wilcoxon_less_SCL", caption="Wilcoxon test - less - SCL", label ="tab:Wilcoxon_test_less_SCL" )
scr_cols = [c for c in ["scr_auc", "scr_per_sec"] if c  in matrix_wilcoxon_less.columns ]
matrix_scr_less_wilc = matrix_wilcoxon_less[base_cols + scr_cols].copy()
saving_matrix_csv_latex(matrix_scr_less_wilc, file_name ="Wilcoxon_less_SCR", caption="Wilcoxon test - less - SCR", label ="tab:Wilconxon_test_less_SCR" )

matrix_wilcoxon_less

### One sided Wilcoxon test - greater

Testing if Group A has a higher median EDA value than Group B

In [None]:
all_res_greater_wilc = []
session = ["Risk_Guisty", "Risk_Kirchler", "Risk_Smith", "Vol_Classification_Global", "Vol_Classification_intra_session", "window_type", "transaction"]

eda_sign = ["scr_amplitude", "scr_auc", "scr_per_sec", "scl_mean", "scl_std"]
#participant level
for group in session:
    for eda in eda_sign: 

        group_values = df_wilcoxon[group].dropna().unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        res_part = cal_wilcoxon(df_wilcoxon, eda, group, combinations_,"greater", "participant")
        all_res_greater_wilc.append(res_part)
#session level
for group in session:
    for eda in eda_sign: 
        group_values = df_wilcoxon[group].dropna().unique()
        combinations_ = list(list(itertools.combinations(group_values, 2)))
        res_sess = cal_wilcoxon(df_wilcoxon, eda, group, combinations_,"greater", "session")
        all_res_greater_wilc.append(res_sess)

df_wilcoxon_greater = pd.concat(all_res_greater_wilc, ignore_index=True)

matrix_wilcoxon_greater = make_Wilcoxon_matrix(df_wilcoxon_greater, only_significant=True)
saving_matrix_csv_latex(matrix_wilcoxon_greater, file_name ="Wilcoxon_greater", caption="Wilcoxon test - greater", label ="tab:Wilcoxon_greater" )


#seperating and saving into two LaTex tables (SCR and SCL)
base_cols = ["Group_label", "sess_or_part","combination"]
scl_cols = [c for c in ["scl_mean", "scl_std"] if c  in matrix_wilcoxon_greater.columns ]
matrix_scl_greater_wilc = matrix_wilcoxon_greater[base_cols + scl_cols].copy()
saving_matrix_csv_latex(matrix_scl_greater_wilc, file_name ="Wilcoxon_greater_SCL", caption="Wilcoxon test - greater - SCL", label ="tab:Wilcoxon_test_greater_SCL" )
#SCR
scr_cols = [c for c in ["scr_auc", "scr_per_sec"] if c  in matrix_wilcoxon_greater.columns ]
matrix_scr_greater_wilc = matrix_wilcoxon_greater[base_cols + scr_cols].copy()
saving_matrix_csv_latex(matrix_scr_greater_wilc, file_name ="Wilcoxon_greater_SCR", caption="Wilcoxon test - greater - SCR", label ="tab:Wilconxon_test_greater_SCR" )

matrix_wilcoxon_greater

## Friedman test

Applying the Friedman Test, which is a non-parametric test. Tests multiple groups together.

Group_categories: "Risk_Guisty", "Risk_Kirchler", "Risk_Smith", "Vol_Classification_Global", "Vol_Classification_intra_session", "window_type", "transaction"

eda_sign = ["scr_amplitude", "scr_auc", "scr_per_sec", "scl_mean", "scl_std"]

Testing on participant and session level

In [None]:
df_friedman = df_all.copy()

def cal_friedman(df_friedman, y_value, Group_label, group_category, sess_or_part):
    """
    Method for applying the Friedmanchisquare
    """
    results = []
    for group_combo in group_category:
        group_values = []
        min_len = None

        for group in group_combo:
            #Separating values based on group  and calculating the median EDA signal
            group_df = df_friedman[df_friedman[Group_label] == group]
            mean_group = (group_df.groupby([sess_or_part])[y_value].median().reset_index(name = f"median_{y_value}")).dropna()
            # 
            group_values.append(mean_group[f"median_{y_value}"])
            print(group_values)
            if min_len is None or len(mean_group) < min_len:
                min_len = len(mean_group)

        #trimming for getting a groups of the same length
        trimmed_arrays = [gv.iloc[:min_len].to_numpy(dtype=float) for gv in group_values]
        #calling test method 
        statistic, p_value = stats.friedmanchisquare(*trimmed_arrays)
    
        print(f"Mann-Whitney U statistic: {statistic}")
        print(f"P-Value: {p_value}")


        results.append({"sess_or_part": sess_or_part, "Group_label": Group_label, "y_value": y_value,"combination": group_combo,  "statistic": statistic, "p_value": p_value })

    return pd.DataFrame(results)


all_res_friedman = []
session = ["Risk_Guisty", "Risk_Kirchler", "Risk_Smith", "Vol_Classification_Global", "Vol_Classification_intra_session", "window_type", "transaction"]

eda_sign = ["scr_amplitude", "scr_auc", "scr_per_sec", "scl_mean", "scl_std"]
for group in session:
    for eda in eda_sign: 
        #find unique group values
        group_values = df_friedman[group].dropna().unique()
        len_group_columns = len(group_values)
        #find the combinations 
        combinations_ = list(list(itertools.combinations(group_values, len_group_columns)))
        res_part_friedman = cal_friedman(df_friedman, eda, group, combinations_, "participant")
        all_res_friedman.append(res_part_friedman)

        #test at the session-level
        res_sess_friedman = cal_friedman(df_friedman, eda, group, combinations_, "session")
        all_res_friedman.append(res_sess_friedman)
 
#saving the matrix as a LaTex table
df_friedman_res = pd.concat(all_res_friedman, ignore_index=True)
friedman_matrix = make_man_Whithney_U_matrix(df_friedman_res, only_significant=True)
saving_matrix_csv_latex(friedman_matrix, file_name ="Friedman_test", caption="Friedman-test", label ="tab:Friedman" )

friedman_matrix

## Mixed LM

This code chapter tests if Market Mechanism (MM) have an impact on EDA signals.

Model 1: EDA ~ MM 

Model 2: EDA ~ MM * C(transaction)

Model 3: EDA ~ MM * C(window-type)

Model 4: EDA ~ MM * (transaction) * C(window-type)

Model 5: EDA ~ Risk

Variables that are used (for M1, M2, M3) for broader investigations
- "FV_GUISTY", "FV_KIRCHLER", "rd_Kirchler_per_round", "rad_Kirchler_per_round", "rd_Smith_per_round", "rad_Smith_per_round", "rd_GUISTY_per_round", "rad_GUISTY_per_round","V_t", "MAD_t", "AMAD_t"
-  "scl_mean", "scl_std", "scr_peak", "scr_auc", "scr_per_sec"


In [None]:
#Drop NaN values
df_mixed_lm = df_all.dropna().copy()



from scipy.stats import zscore

# standardize signals for LMM:
vars_to_z = [ "FV_GUISTY", "FV_KIRCHLER", "rd_Kirchler_per_round", "rad_Kirchler_per_round", "rd_Smith_per_round", "rad_Smith_per_round", "rd_GUISTY_per_round", "rad_GUISTY_per_round","V_t", "MAD_t", "AMAD_t"]

df_mixed_lm[vars_to_z] = df_mixed_lm[vars_to_z].apply(zscore)

In [None]:

def parse_eda(formula: str):
    """ Split the string on ~. """
    lhs, rhs = [s.strip() for s in formula.split("~", 1)]
    return lhs

def extraction_values_based_of_summary_fixed_effects(model, list_of_results, formula, model_name, market_mechanism):
    """ Extract fixed and interaction effects from the LMM summary model """
    # Extract fixed-effect coefficients from the model
    fixed_effects = model.fe_params
    # Extract standard errors of the fixed effects
    base_effects = model.bse_fe
    # Compute z-values (coefficient / standard error)
    zvals = fixed_effects / base_effects
    # Extract p-values corresponding to the fixed effects
    pvals = model.pvalues.loc[fixed_effects.index]
    eda = parse_eda(formula)
    mech_pattern = rf"(?:\b{re.escape(market_mechanism)}\b)"
    rows = []
    for term in fixed_effects.index:
        row = {
            "model_name": model_name,
            "formula": formula,
            "eda": eda,
            "mechanism": market_mechanism,
            "term": term,
            "coef": float(fixed_effects.loc[term]),
            "std_err": float(base_effects.loc[term]),
            "z_value": float(zvals.loc[term]),
            "p_value": float(pvals.loc[term]),
            "is_interaction": (":" in term),
            "involves_mechanism": bool(re.search(mech_pattern, term)),
            "aic": getattr(model, "aic", None),
            "bic": getattr(model, "bic", None),
            "llf": getattr(model, "llf", None),
            "nobs": getattr(model, "nobs", None)
        }
        # Adding values to a list 
        rows.append(row)
    return rows

def extraction_significant_results(dataframe_results, alpha=0.05, only_main_effects=False):
    """
        Filter and extract significant fixed-effect terms from a results Dataframe.
        Extract only significant fixed effects if only_main_effects is true.
    """
    df = dataframe_results.copy()
    # Check if Dataframe is empty
    if df.empty:
        return df
    # Create column for significance: True if  p-value < 0.05
    df["significant"] = df["p_value"] < alpha
    # Filter significance results
    df = df[df["significant"] == True]
    if only_main_effects:
        #creating masks for non main-effects variables
        mask_intercept = df["term"].str.contains("Intercept")
        mask_c = (
            df["term"].str.contains("C\\(transaction\\)") |
            df["term"].str.contains("C\\(window_type\\)")

        )
        mask_mech = df["involves_mechanism"]
        # Keep rows that are not intercepts and not categorical or the term involves a mechanism
        df = df[~mask_intercept & (~mask_c | mask_mech)]

    #sorting values
    df = df.sort_values(["formula", "p_value"])
    return df

def make_effect_pivot_table(df_summary):
    """
        Create a formatted pivot table summarizing effects sizes, standard errors, and significance levels across different models.
    """
    df = df_summary.copy()
    # Add stars for significance (*** = 0.005. ** = 0.01)
    df["stars"] = df["p_value"].apply(significance_marks)
    # Create cell of the matrix by combining the effect string with the significance stars.
    df["effect"] = df.apply(lambda r: f"{r['coef']:.3f} ({r['std_err']:.3f}){r['stars']}", axis=1)
    #Create absolute coeff so that the strongest effect can be displayed in the matrix
    df["abs_coef"] = df["coef"].abs()
    df = df.sort_values(by="abs_coef", ascending=False)
    # Create pivot table: Index = EDA and mechanism, columns reoresenting the model names (e.g. Model 1, Model 2, Model 3)
    pivot = df.pivot_table(index=["eda", "mechanism"], columns="model_name", values="effect", aggfunc=lambda x: x.iloc[0] ).fillna("-")
    #return matrix
    return pivot.reset_index()


def shorten_variable(name):
    """
        Shorten the formular varibale 
        rad_* -> rad
        rd_* -> rd
        C(transaction)[T.] -> Trans[]
        C(window_type)[T.] -> Win[]
        pre_decision     -> pre_d
        during_feedback  -> during_f
        post_feedback.   -> post_f
    """


    name = re.sub(r"C\(window_type\)\[T\.([^\]]+)\]", r"Win[\1]", name)
    name = re.sub(r"C\(transaction\)\[T\.([^\]]+)\]", r"Tran[\1]", name)

    name = name.replace("C(transaction)", "Tran")
    name = name.replace("C(window_type)", "Win")

    name = name.replace("pre_decision", "pre_d")
    name = name.replace("during_feedback", "df")
    name = name.replace("post_feedback", "pf")

    name = name.replace("rd_Kirchler_per_round", "rd_Kirchler")
    name = name.replace("rad_Kirchler_per_round", "rad_Kirchler")
    name = name.replace("rd_GUISTY_per_round", "rd_Giusti")
    name = name.replace("rad_GUISTY_per_round", "rad_Giusti")
    name = name.replace("rd_Smith_per_round", "rd_Smith")
    name = name.replace("rad_Smith_per_round", "rad_Smith")

    name = re.sub(r"C\(Risk_Guisty\)\[T\.([^\]]+)\]", r"Risk_G[\1]", name)
    name = re.sub(r"C\(Risk_Smith\)\[T\.([^\]]+)\]", r"Risk_S[\1]", name)
    name = re.sub(r"C\(Risk_Kirchler\)\[T\.([^\]]+)\]", r"Risk_K[\1]", name)
    name = re.sub(r"C\(Vol_Classification_Global\)\[T\.([^\]]+)\]", r"Risk_Vol_G[\1]", name)

    name = re.sub(r"C\(Risk_Guisty\)", r"Risk_G", name)
    name = re.sub(r"C\(Risk_Smith\)", r"Risk_S", name)
    name = re.sub(r"C\(Risk_Kirchler\)", r"Risk_K", name)
    name = re.sub(r"C\(Vol_Classification_Global\)", r"Risk_Vol_G", name)

    return name


def saving_summary_of_model(summary, path, latex_lable, latex_caption):
    """
        Saving summary of the LMM model in latex
    """

    table = summary.tables[1]

    df_summary = table.reset_index()

    df_summary.rename(columns= {"index": "variable"}, inplace=True)
    #shorting the term variables with the method: shorten_variable
    df_summary["term"] = df_summary["variable"].apply(shorten_variable)

    # drop the Confidential interval -> summary to big
    df_summary = df_summary.drop(
        columns=[col for col in df_summary.columns if "0.025" in col or "0.975" in col],
        errors="ignore"
    )
    # renaming columns
    df_summary = df_summary[["term", "Coef.", "Std.Err.", "z", "P>|z|"]].rename(columns={
        "Coef.": "Estimate",
        "Std.Err.": "Std. Error",
        "P>|z|": "p-value"
    })

    # creating latex code based of the sumamry
    latex_code = df_summary.to_latex(
        index=False,
        float_format="%.3f",
        caption=latex_caption,
        label=latex_lable
    )

    #saving the latex code
    with open(path, "w") as f:
        f.write(latex_code)

    if (path.exists()):
        print(f"saved in {path}")
    else: print(f"Could not be saved.")


def saving_matrix_csv_latex(matrix, file_name, caption, label, FOLDER = TESTS_RESULT):
    """
        Saving the matrix of the results in csv and latex format
    """

    #check if matrix table is none
    if matrix is None:
        raise ValueError("No matrix available")

    #saving path
    tex_path = FOLDER / f"{file_name}.tex"
    csv_path = FOLDER / f"{file_name}.csv"
    matrix.to_csv(csv_path, index=False, encoding="utf-8", sep=";")

    with open(tex_path, "w", encoding="utf-8") as f:
        f.write(matrix.to_latex(
            index=False,
            escape=True,
            float_format="%.3f",  
            caption=f"{caption}",
            label=f"{label}",
            column_format="l" + "r" * (len(matrix.columns) - 1)  
        ))


    print(f"‚úÖ Exported Tables:\n- {tex_path}\n- {csv_path}")



### Overview Model 1,2,3

In [None]:
"""
Applying the Linear mixed Model test. Goal: Investigate the influence of Market Mechanims during Decision phases or with transaction on the EDA signal."
- Model 1: "M1_General", "formula" : EDA ~ market_mechanism" -> Influence of RD, Vol on EDA
- Model 2: "M2_MM_Transaction", "formula" :EDA ~ {market_mechanism} * C(transaction)" ->  Influence of RD, Vol on EDA with different transaction types
- Model 3: "M3_MM_WindowType", "formula": EDA~ {market_mechanism} * C(window_type)"} -> Influence of RD, Vol on EDA with transaction  during different decision-phases

Printing the result
Summarize the results of the 3 models with different variable combinations
Identify which combination has a significant results with a coeff > 0.1
Based on this furher investigations
"""



summary_all  = []
count = 0

for eda in ["scr_auc", "scr_amplitude", "scr_per_sec", "scl_mean", "scl_std"]:
    for market_mechanism in ["FV_GUISTY", "FV_KIRCHLER", "rd_Kirchler_per_round", "rad_Kirchler_per_round", "rd_Smith_per_round", "rad_Smith_per_round", "rd_GUISTY_per_round", "rad_GUISTY_per_round", "V_t", "MAD_t", "AMAD_t"]:
        formel = [ 
            {"model": "M1_General", "formula" :f"{eda} ~ {market_mechanism}"},
            {"model": "M2_MM_Transaction", "formula" :f"{eda} ~ {market_mechanism} * C(transaction)"},
            {"model": "M3_MM_WindowType", "formula": f"{eda} ~ {market_mechanism} * C(window_type)"}, 
            
            ]
        for entry in formel:
                count +=1
                print(f"\n\n\n============================================= {entry["formula"]} =============================================\n")
                #variance component = session
                vc = {"session": "0 + C(session)"}  
                model = smf.mixedlm(entry["formula"],
                    data=df_mixed_lm,
                    groups=df_mixed_lm["participant"], vc_formula=vc,
                ).fit(reml= False)  #fitting the model
                print(model.summary())
                #adding the summary to the list summary_all
                summary_all.extend(extraction_values_based_of_summary_fixed_effects(model, summary_all ,entry["formula"], entry["model"], market_mechanism))



df_summary = pd.DataFrame(summary_all)     
# Extract significiant results
df_significant = extraction_significant_results(df_summary, alpha=0.05, only_main_effects=True)
#filter results with an effect > 0.05
df_bigger_0dot1 = df_significant[abs(df_significant["coef"]) >= 0.05]

# shorten variables for a better visualisation, and saving them as a LaTex table
df_significant["term"] = df_significant["term"].apply(shorten_variable)
df_significant["mechanism"] = df_significant["mechanism"].apply(shorten_variable)
result_table = make_effect_pivot_table(df_significant)
saving_matrix_csv_latex(result_table, file_name ="LMM_M1_to_M3_MATRIX", caption = "LMM Result Matrix of M1, M2, M3", label="LMM_RESULT_MATRIX", FOLDER=TESTS_RESULT )


print(count)


result_table
 

In [None]:
df_bigger_0dot1.sort_values("coef", ascending = False)

### Model 2

In [None]:
"""
    Filtering the significant results (use the variables who have a coeff >=0.1)
    Visualize explizit the  Model 2 f"{eda} ~ {mechanims} * C(transaction)" for better understanding

"""
results_M2 = []
for eda in ["scr_auc", "scl_std"]:
    for mechanims in ["rd_Kirchler_per_round", "rad_Kirchler_per_round", "rd_Smith_per_round", "rad_Smith_per_round", "rd_GUISTY_per_round", "rad_GUISTY_per_round", "V_t", "MAD_t", "AMAD_t"]:
        print(eda)
        print(mechanims)

        #### Model 2
        new_formula = f"{eda} ~ {mechanims} * C(transaction)"
        print(f"Modell M2:  {eda} und {mechanims}")
        model = smf.mixedlm(new_formula,
                                data=df_mixed_lm,
                                groups=df_mixed_lm["participant"]).fit(reml=False)
        print(model.summary())
        results_M2.extend(extraction_values_based_of_summary_fixed_effects(model, results_M2 ,new_formula, "M2_MM_TRAN", mechanims))
        
        #saving the summary of this test
        summary = model.summary()
        FOLDER_LMM_M2= TESTS_RESULT / "LMM_M2"
        os.makedirs(FOLDER_LMM_M2, exist_ok=True)
        path = FOLDER_LMM_M2 / f"MLM_MODEL_2_{eda}_{mechanims}.tex"
        
        saving_summary_of_model(summary, path=path , latex_lable =f"tab:mlm_m2_{eda}_{mechanims}".lower(), latex_caption ="Coeffizienten of LMM  Model 2 for {eda} and {mechanims}")

df_summary_M2 = pd.DataFrame(results_M2)     
# Extract significant results
df_significant_M2 = extraction_significant_results(df_summary_M2, alpha=0.05, only_main_effects=True)
# Shorten term and mechanism variables
df_significant_M2["term"] = df_significant_M2["term"].apply(shorten_variable)
df_significant_M2["mechanism"] = df_significant_M2["mechanism"].apply(shorten_variable)
df_bigger_0dot1_M2 = df_significant_M2[abs(df_significant_M2["coef"]) >= 0.05]


#new result matrix
result_table_M2 = make_effect_pivot_table(df_significant_M2)


df_bigger_0dot1_M2


In [None]:
 # Helper function to create readable, generalized term names
def clean_term_name(row):
    term = str(row["term"])
    mech = str(row["mechanism"])

    # 1) Intercept
    if term == "Intercept":
        return "Intercept"

    # 2) Mechanism - Main effect
    if term == mech:
        return "Mechanism"

    # 3) Transaction-Level: C(transaction)[T.xxx]
    m = re.match(r"C\(transaction\)\[T\.(.+)\]", term)
    if m:
        level = m.group(1)
        return f"Transaction_{level}"

    # 4) Interaction Mechanism √ó Transaction
    if "C(transaction)" in term and mech in term:
        m2 = re.search(r"C\(transaction\)\[T\.(.+)\]", term)
        if m2:
            level = m2.group(1)
            return f"Mechanism:Transaction_{level}"
        return "Mechanism:Transaction"

    
    return term




def make_effect_matrix_significant(df_summary):
    """
        Method for creating a matrix of significant results. 
    """
    df = df_summary.copy()

    # Create a formatted effect string: "coef (std_err)"
    df["effect"] = df.apply(
        lambda r: f"{r['coef']:.3f} ({r['std_err']:.3f})",
        axis=1
    )

    # Keep only significant effects based on a 5% threshold
    df["significant"] = df["p_value"] < 0.05

    df["term_clean"] = df.apply(clean_term_name, axis=1)

    # Display only significant results
    df["effect_or_blank"] = df.apply(
        lambda r: r["effect"] if r["significant"] else "-",
        axis=1
    )

    # Create matrix where the terms are the columns and the values (cells) are coef. effects. 
    pivot = df.pivot_table(
        index=["eda", "mechanism"],
        columns="term_clean",
        values="effect_or_blank",
        aggfunc="first"
    )

    # Remove rows where *all* terms are non-significant
    pivot = pivot.replace("-", np.nan).dropna(how="all").fillna("-")

    return pivot.reset_index()


In [None]:
df_summary_M2[df_summary_M2["p_value"] < 0.05]

In [None]:
# Creatig a matrix based on signidicant results and saving it as a latex table

df_summary_M2[df_summary_M2["p_value"] < 0.05]
pivot = make_effect_matrix_significant(df_summary_M2)
#pivot["term"] = pivot["term"].apply(shorten_variable)
pivot["mechanism"] = pivot["mechanism"].apply(shorten_variable)
saving_matrix_csv_latex(pivot, "m2_transaction_matrix_result", caption = "M2: EDA ~ MM * C(transaction)", label = "tab:M2_trans", FOLDER = TESTS_RESULT)
pivot

In [None]:
#Plotting effect size heatmap
heat = df_significant_M2.copy()
heat["sign"] = np.sign(heat["coef"])
heat["abs_beta"] = heat["coef"].abs()
heat_pivot = heat.pivot_table(index="eda", columns="mechanism", values="abs_beta", aggfunc="max")

fig = plt.figure(figsize=(10,6))
sns.heatmap(heat_pivot, annot=False, cmap="coolwarm", center=0, vmax=1)
plt.title("Effect Size Heatmap (Œ≤ across mechanisms and EDAs)")
plt.show()
path = TESTS_RESULT / "Effect_size_heatmap_m2_eda_mm_transaction.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()

if path.exists():
    print(f"Saved: {path}")
else: print("Heatmap colud not be saved")

In [None]:
#Plot of effect coefficients
fig = plt.figure(figsize=(12, 6))
sns.barplot(data=df_significant_M2, x="mechanism", y="coef", hue="eda", ci=None)
plt.axhline(0, color="grey", linestyle="--")
plt.title("effects (Coefficient) mit p < 0.05 und |coef| > 0.1 im Modell 4")
plt.xlabel("market mechanism")
plt.ylabel("Coefficient")
plt.legend(title="EDA signal")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
path = TESTS_RESULT / "Effect_size_Barplot_m2_eda_mm_transaction.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()

if path.exists():
    print(f"Saved: {path}")
else: print("Heatmap colud not be saved")

### Modell 3: EDA ~ MM * C(window-type)

In [None]:
"""
    Filtering the significant results (use the variables who have a coeff >=0.1)
    Visualize explizit the  Model 3 f"{eda} ~ {mechanims} * C(window_type)" for better understanding

"""
results_M3 = []
for eda in ["scr_auc", "scl_std"]:
    for mechanims in ["rd_Kirchler_per_round", "rad_Kirchler_per_round", "rd_Smith_per_round", "rad_Smith_per_round", "rd_GUISTY_per_round", "rad_GUISTY_per_round", "V_t", "MAD_t", "AMAD_t"]:
        print(eda)
        print(mechanims)

        #### Model 2
        new_formula = f"{eda} ~ {mechanims} * C(window_type)"
        print(f"Modell M3:  {eda} und {mechanims}")
        model = smf.mixedlm(new_formula,
                                data=df_mixed_lm,
                                groups=df_mixed_lm["participant"]).fit(reml=False)
        print(model.summary())



        results_M3.extend(extraction_values_based_of_summary_fixed_effects(model, results_M3 ,new_formula, "M3_MM_TRAN", mechanims))
        
        #saving the summary of this test
        summary = model.summary()
        FOLDER_LMM_M3= TESTS_RESULT / "LMM_M3"
        os.makedirs(FOLDER_LMM_M3, exist_ok=True)
        path = FOLDER_LMM_M3 / f"MLM_MODEL_3_{eda}_{mechanims}.tex"
        
        saving_summary_of_model(summary, path=path , latex_lable =f"tab:mlm_m3_{eda}_{mechanims}".lower(), latex_caption ="Coeffizienten of LMM  Model 3 for {eda} and {mechanims}")
# Creating Dataframe
df_summary_M3 = pd.DataFrame(results_M3)
# Ectracting singnificant results     
df_significant_M3 = extraction_significant_results(df_summary_M3, alpha=0.05, only_main_effects=True)
# Shorting variables
df_significant_M3["term"] = df_significant_M3["term"].apply(shorten_variable)
df_significant_M3["mechanism"] = df_significant_M3["mechanism"].apply(shorten_variable)
df_bigger_0dot1_M3 = df_significant_M3[abs(df_significant_M3["coef"]) >= 0.05]


#new result matrix
result_table_M3 = make_effect_pivot_table(df_bigger_0dot1_M3)


df_summary_M3


In [None]:
df_summary_M3[(df_summary_M3["p_value"]<0.05) &(df_summary_M3["term"] != "Intercept") ][["formula", "mechanism", "coef", "std_err", "p_value"]]

### Model 4: EDA ~ MM * C(transaction) * C(window-type)

In [None]:
"""
Applying the Linear mixed Model test. Goal: Invertsigate the influence of Market Mechanims during multiple decision phases and with transaction on the EDA signal."
- Model 4: "M4_Trans_Window", "formula" : "{eda} ~ {mechanims} * C(transaction) * C(window_type)" -> Influence of RD, Vol during a decision phases with transaction on EDA

Using the variables of the significant extraction
"""


results_M4 = []
for eda in ["scr_auc", "scl_std"]:
    for mechanims in ["rd_Kirchler_per_round", "rad_Kirchler_per_round", "rd_Smith_per_round", "rad_Smith_per_round", "rd_GUISTY_per_round", "rad_GUISTY_per_round", "V_t", "MAD_t", "AMAD_t"]:
        print(eda)
        print(mechanims)
        #executing modell test 4
        new_formula = f"{eda} ~ {mechanims} * C(transaction) * C(window_type)"
        print(f"Modell M4:  {eda} und {mechanims}")
        model = smf.mixedlm(new_formula,
                                data=df_mixed_lm,
                                groups=df_mixed_lm["participant"]).fit(reml=False)
        print(model.summary())
        results_M4.extend(extraction_values_based_of_summary_fixed_effects(model, results_M4 ,new_formula, "M4_MM_TRAN_DEC", mechanims))

        #Saving each summary 
        summary = model.summary()
        FOLDER_LMM_M4= TESTS_RESULT / "LMM_M4"
        os.makedirs(FOLDER_LMM_M4, exist_ok=True)
        path = FOLDER_LMM_M4 / f"MLM_MODEL_4_{eda}_{mechanims}.tex"
        
        saving_summary_of_model(summary, path=path , latex_lable =f"tab:mlm_m4_{eda}_{mechanims}".lower(), latex_caption ="Coeffizienten of LMM for {eda} and {mechanims}")



df_summary_M4 = pd.DataFrame(results_M4) 
# Extracting significant variables
df_significant_M4 = extraction_significant_results(df_summary_M4, alpha=0.05, only_main_effects=True)
# Shorten variables
df_significant_M4["term"] = df_significant_M4["term"].apply(shorten_variable)
df_significant_M4["mechanism"] = df_significant_M4["mechanism"].apply(shorten_variable)
#Create matrix
matrix_M4 = make_effect_matrix_significant(df_significant_M4)   
df_bigger_0dot1_M4 = df_significant_M4[abs(df_significant_M4["coef"]) >= 0.1]

#saving Results in latex and csv
path_latex = FOLDER_LMM_M4 / f"MLM_MODEL_4_RESULTS.tex"
latex = df_bigger_0dot1_M4.to_latex(index=False, float_format="%.3f", caption="Model 4: EDA ~ MM * Transaction * Dec", label="tab:LMM_model4")
with open(path_latex, "w") as f:
    f.write(latex)
path_csv = FOLDER_LMM_M4 / f"MLM_MODEL_4_RESULTS.csv"
df_bigger_0dot1_M4.to_excel(path_csv, index=False)

df_significant_M4


In [None]:
#Separatung EDA table into two tables (SCL and SCRs)

x = df_significant_M4[["eda", "term", "coef", "std_err", "z_value", "p_value"]]
x_scl = x[x["eda"]== "scl_std"].round(3)
x_scr = x[x["eda"]== "scr_auc"].round(3)

path_latex = FOLDER_LMM_M4 / f"MLM_MODEL_4_SIGN_SCL.tex"
latex = x_scl.to_latex(index=False, float_format="%.3f", caption="Model 4: SCL ~ MM * Transaction * Dec", label="tab:LMM_model4_SCL", escape=True)
with open(path_latex, "w") as f:
    f.write(latex)
if path_latex.exists():
    print(path_latex)

path_latex = FOLDER_LMM_M4 / f"MLM_MODEL_4_SIGN_SCR.tex"
latex = x_scr.to_latex(index=False, float_format="%.3f", caption="Model 4: SCR ~ MM * Transaction * Dec", label="tab:LMM_model4_SCR", escape=True)
with open(path_latex, "w") as f:
    f.write(latex)
if path_latex.exists():
    print(path_latex)


x_scr

In [None]:

#visualize effectsize with barplot
fig = plt.figure(figsize=(12, 6))
sns.barplot(data=df_significant_M4, x="mechanism", y="coef", hue="eda", ci=None)
plt.axhline(0, color="grey", linestyle="--")
plt.title("Effekte (Koeffizienten) mit p < 0.05 und |coef| > 0.1 im Modell 4")
plt.xlabel("Marktmechanismus")
plt.ylabel("Koeffizient")
plt.legend(title="EDA-Signal")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
path = FOLDER_LMM_M4 / "barchart_m4_effectsize.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()

In [None]:
#visualize effectsize with heatmap
heat = df_significant_M4.copy()
heat["sign"] = np.sign(heat["coef"])
heat["abs_beta"] = heat["coef"].abs()
heat_pivot = heat.pivot_table(index="eda", columns="mechanism", values="abs_beta", aggfunc="max")

plt.figure(figsize=(10,6))
sns.heatmap(heat_pivot, annot=False, cmap="coolwarm", center=0, vmax=1)
plt.title("Effect Size Heatmap (Œ≤ across mechanisms and EDAs)")
plt.show()
path = FOLDER_LMM_M4 / "heatmap_m4_effectsize.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()

In [None]:
df_bigger_0dot1_M4

In [None]:
def classify(row):
    """
        Classify term variables and Mechanism into categories
        Goal: For a better visualization
    """
    term = str(row["term"])
    mech = str(row["mechanism"])

    if term == "Intercept":
        return "Intercept"
    if term == mech:
        return "Mechanism"
    if ":" in term and mech in term:
        return "Mechanism:Vol"
    if ":" not in term and term != mech:
        return "Volatility"
    return None


def summarize_mechanism_volatility(df):
    """
        Creating a a matrix based on a dataframe with the results.
    """

    df = df.copy()

    # Adding significance mareks to coefficient value
    df["stars"] = df["p_value"].apply(significance_marks)
    # Combine coeef value with star marks
    df["eff"] = df.apply(lambda r: f"{r['coef']:.3f} ({r['std_err']:.3f}){r['stars']}", axis=1)
    df["sig"] = df["p_value"] < 0.05

    # Classify term type into categories 
    df["term_type"] = df.apply(classify, axis=1)


    df = df[df["term_type"].notna()].copy()


    # Keep sifnificant resulzs
    df["value"] = df.apply(lambda r: r["eff"] if r["sig"] else "-", axis=1)

    # Create matrix
    pivot = df.pivot_table(
        index=["formula"],
        columns="term_type",
        values="value",
        aggfunc="first"
    )

    # Ensure all expected columns exist (fill missing ones with "-")
    for col in ["Intercept", "Mechanism", "Volatility", "Mechanism:Vol"]:
        if col not in pivot.columns:
            pivot[col] = "-"

    # Order the columns in a logical structure
    pivot = pivot[["Intercept", "Mechanism", "Volatility", "Mechanism:Vol"]]

    return pivot.reset_index()


### Model 5: EDA ~ FV_D * Volatility

In [None]:
"""
    Investigating if there is an interaction effect  between rd/rad and volatility on EDA

"""

result_model_5 = []
for eda in ["scr_auc", "scl_std"]:

    for market_mechanism in ["rd_Kirchler_per_round", "rad_Kirchler_per_round","rad_Smith_per_round", "rd_GUISTY_per_round", "rad_GUISTY_per_round", "rd_Smith_per_round"]:
        for vol in ["V_t", "MAD_t", "AMAD_t"]:   
            formel = f"{eda} ~ {market_mechanism} * {vol}"
            
            print(f"\n\n\n============================================= {formel} =============================================\n")
            vc = {"session": "0 + C(session)"}  
            model = smf.mixedlm(formel,
                    data=df_mixed_lm,
                    groups=df_mixed_lm["participant"], vc_formula=vc,
                ).fit(reml= False)
            print(model.summary())
            result_model_5.extend(extraction_values_based_of_summary_fixed_effects(model, result_model_5 ,formel, "M5_VOL_MM", market_mechanism))

df_summary_M5 = pd.DataFrame(result_model_5)
# Shorten variables
df_summary_M5["term"] = df_summary_M5["term"].apply(shorten_variable)
df_summary_M5["mechanism"] = df_summary_M5["mechanism"].apply(shorten_variable)  
df_summary_M5["formula"] = df_summary_M5["formula"].apply(shorten_variable)  
# Extract significant variables
df_significant_M5 = extraction_significant_results(df_summary_M5, alpha=0.05, only_main_effects=True)
df_significant_M5[abs(df_significant_M5["coef"]) > 0.05]
# Create matrix
matrix_m5 = summarize_mechanism_volatility(df_summary_M5)


#saving Results in latex and csv
path_latex = TESTS_RESULT / f"MLM_MODEL_5_RESULTS.tex"
latex = df_significant_M5.to_latex(index=False, float_format="%.3f", caption="Model 5: EDA ~ RD * Vol", label="tab:LMM_model5")
with open(path_latex, "w") as f:
    f.write(latex)


In [None]:
df_significant_M5

In [None]:
#Sava matrix as a LATEX file
path_latex = TESTS_RESULT / f"MLM_MODEL_5_SIGN_Matrix.tex"
latex = matrix_m5.to_latex(index=False, float_format="%.3f", caption="Model 5: EDA ~ FV * VOL", label="tab:LMM_model5_Matrix", escape=True)
with open(path_latex, "w") as f:
    f.write(latex)
if path_latex.exists():
    print(path_latex)

matrix_m5

In [None]:
df_summary_M5[(df_summary_M5["p_value"] < 0.05) & (df_summary_M5["term"] != "Intercept")]

In [None]:
#visualize effectsize with heatmap
heat = df_significant_M5.copy()
heat["sign"] = np.sign(heat["coef"])
heat["abs_beta"] = heat["coef"].abs()
heat_pivot = heat.pivot_table(index="eda", columns="mechanism", values="abs_beta", aggfunc="max")

fig = plt.figure(figsize=(10,6))
sns.heatmap(heat_pivot, annot=False, cmap="coolwarm", center=0, vmax=0.25)
plt.title("Effect Size Heatmap (Œ≤ across mechanisms and EDAs)")
plt.show()
path = TESTS_RESULT / "heatmap_m5_effectsize.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()

In [None]:
#visualize effectsize with barplot
fig = plt.figure(figsize=(12, 6))
sns.barplot(data=df_significant_M5, x="mechanism", y="coef", hue="eda", ci=None)
plt.axhline(0, color="grey", linestyle="--")
plt.title("Effekte (Koeffizienten) mit p < 0.05 und |coef| > 0.1 im Modell 5")
plt.xlabel("Marktmechanismus")
plt.ylabel("Koeffizient")
plt.legend(title="EDA-Signal")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
path = TESTS_RESULT / "barplot_m5_effectsize.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()

In [None]:
### Model 5B: EDA ~ FV_D * Volatility * C(transaction) * C(window-type)

In [None]:
"""
    Investigating if there is an interaction effect  between rd/rad and volatility on EDA

"""

result_model_5_b = []
for eda in ["scr_auc", "scl_std"]:

    for market_mechanism in ["rd_Kirchler_per_round", "rad_Kirchler_per_round"]:
        for vol in ["V_t", "MAD_t", "AMAD_t"]:   
            formel = f"{eda} ~ {market_mechanism} * {vol} * C(transaction) * C(window_type) "
            
            print(f"\n\n\n============================================= {formel} =============================================\n")
            vc = {"session": "0 + C(session)"}  
            model = smf.mixedlm(formel,
                    data=df_mixed_lm,
                    groups=df_mixed_lm["participant"], vc_formula=vc,
                ).fit(reml= False)
            print(model.summary())
            result_model_5_b.extend(extraction_values_based_of_summary_fixed_effects(model, result_model_5_b ,formel, "M5b_VOL_MM", market_mechanism))

df_summary_M5_b = pd.DataFrame(result_model_5_b)
#shorten variables
df_summary_M5_b["term"] = df_summary_M5_b["term"].apply(shorten_variable)
df_summary_M5_b["mechanism"] = df_summary_M5_b["mechanism"].apply(shorten_variable)  
df_summary_M5_b["formula"] = df_summary_M5_b["formula"].apply(shorten_variable)  
#extract significant results
df_significant_M5_b = extraction_significant_results(df_summary_M5_b, alpha=0.05, only_main_effects=True)


saving = df_significant_M5_b[df_significant_M5_b["p_value"] < 0.05][["formula", "term", "coef", "std_err", "p_value"]]
#saving Results in latex and csv
path_latex = TESTS_RESULT / f"MLM_MODEL_5_RESULTS_significant.tex"
latex = saving.to_latex(index=False, float_format="%.3f", caption="Model 5b: EDA ~ RD * Vol * C(transaction) * C(window_type)", label="tab:LMM_model5b",escape=True)
with open(path_latex, "w") as f:
    f.write(latex)
if path_latex.exists():
    print(path_latex)
path_csv = TESTS_RESULT / f"MLM_MODEL_5_RESULTS_significant.csv"
saving.to_excel(path_csv, index=False)

saving



### EDA ~ Risk

- 1. EDA ~ Risk
- 2. EDA ~ Risk * C(transaction)
- 3. EDA ~ Risk * C(window-type)
- 4. EDA ~ Risk * C(transaction) * C(window-type)

Risk variables: ["Risk_Guisty", "Risk_Kirchler", "Vol_Classification_Global"]

In [None]:
"""
    Testing if Risk categeories can influence the EDA-Signal (using scr_auc and scl_std)
"""


result_M6=[]
for eda in ["scr_auc", "scl_std"]:
    for RISK in ["Risk_Guisty", "Risk_Kirchler", "Vol_Classification_Global"]:

        formel = f"{eda} ~ C({RISK})"
                    
        print(f"\n\n\n============================================= {formel} =============================================\n")
        vc = {"session": "0 + C(session)"}  
        model = smf.mixedlm(formel,
                            data=df_mixed_lm,
                            groups=df_mixed_lm["participant"], vc_formula=vc,
                        ).fit(reml= False)
        print(model.summary())
        result_M6.extend(extraction_values_based_of_summary_fixed_effects(model, result_M6 ,formel, "M6_EDA_RISK", RISK))

        formel = f"{eda} ~ C({RISK}) * C(transaction)"
                    
        print(f"\n\n\n============================================= {formel} =============================================\n")
        vc = {"session": "0 + C(session)"}  
        model = smf.mixedlm(formel,
                            data=df_mixed_lm,
                            groups=df_mixed_lm["participant"], vc_formula=vc,
                        ).fit(reml= False)
        print(model.summary())
        result_M6.extend(extraction_values_based_of_summary_fixed_effects(model, result_M6 ,formel, "M6_EDA_RISK_transaction", RISK))

        formel = f"{eda} ~ C({RISK}) * C(window_type)"
                    
        print(f"\n\n\n============================================= {formel} =============================================\n")
        vc = {"session": "0 + C(session)"}  
        model = smf.mixedlm(formel,
                            data=df_mixed_lm,
                            groups=df_mixed_lm["participant"], vc_formula=vc,
                        ).fit(reml= False)
        print(model.summary())

        result_M6.extend(extraction_values_based_of_summary_fixed_effects(model, result_M6 ,formel, "M7_EDA_RISK_window", RISK))

        formel = f"{eda} ~ C({RISK}) * C(transaction) * C(window_type)"

        print(f"\n\n\n============================================= {formel} =============================================\n")
        vc = {"session": "0 + C(session)"}  
        model = smf.mixedlm(formel,
                            data=df_mixed_lm,
                            groups=df_mixed_lm["participant"], vc_formula=vc,
                        ).fit(reml= False)
        print(model.summary())



df_summary_M6 = pd.DataFrame(result_M6)  
df_summary_M6["term"] = df_summary_M6["term"].apply(shorten_variable) 
df_summary_M6["formula"] = df_summary_M6["formula"].apply(shorten_variable) 
df_significant_M6 = extraction_significant_results(df_summary_M6, alpha=0.05, only_main_effects=True)

# Saving matrix table
result_table_M6 = make_effect_pivot_table(df_significant_M6)
path = TESTS_RESULT / "Modell_Risk_Category_Results"
saving_matrix_csv_latex(result_table_M6, file_name=path, caption = "LMM - EDA ~¬†Risk Categorization", label = "tab:LMM_RISK_Categorization", FOLDER = TESTS_RESULT)
result_table_M6



In [None]:
# Saving matrix table
x_M6= df_summary_M6[(df_summary_M6["p_value"] < 0.05) & (df_summary_M6["term"] != "Intercept")][["formula","term", "coef", "std_err", "p_value"]].sort_values(["formula"])
path = TESTS_RESULT / "Modell_Risk_Results"
saving_matrix_csv_latex(x_M6, file_name=path, caption = "LMM - EDA ~¬†Risk Categorization", label = "tab:LMM_RISK_Categorization", FOLDER = TESTS_RESULT)

In [None]:
#visualize effectsize with heatmap
heat = df_summary_M6.copy()
heat["sign"] = np.sign(heat["coef"])
heat["abs_beta"] = heat["coef"].abs()
heat_pivot = heat.pivot_table(index="eda", columns="mechanism", values="abs_beta", aggfunc="max")

fig = plt.figure(figsize=(10,6))
sns.heatmap(heat_pivot, annot=False, cmap="coolwarm", center=0, vmax=1)
plt.title("Effect Size Heatmap (Œ≤ across mechanisms and EDAs)")
plt.show()
path = TESTS_RESULT / "heatmap_m_risk_classification_effectsize.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()


In [None]:
#visualize effectsize with barplot
fig = plt.figure(figsize=(12, 6))
sns.barplot(data=df_summary_M6, x="mechanism", y="coef", hue="eda", ci=None)
plt.axhline(0, color="grey", linestyle="--")
plt.title("Effekte (Koeffizienten) mit p < 0.05 und |coef| > 0.1 im Modell 2")
plt.xlabel("Marktmechanismus")
plt.ylabel("Koeffizient")
plt.legend(title="EDA-Signal")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
path = TESTS_RESULT / "barplot_m_risk_classification_effectsize.png"
fig.savefig(path, dpi=300, bbox_inches="tight")
plt.close()

### EDA ~ Risk * C(Vol_Classification_Global)



Risk variables: ["Risk_Guisty", "Risk_Kirchler"]

EDA variables: ["scr_auc", "scl_std"]

In [None]:
result_M11=[]
for eda in ["scr_auc", "scl_std"]:
    for RISK in ["Risk_Guisty", "Risk_Kirchler"]:
        formel = f"{eda} ~ C({RISK}) * C(Vol_Classification_Global)" 
                    
        print(f"\n\n\n============================================= {formel} =============================================\n")
        vc = {"session": "0 + C(session)"}  
        model = smf.mixedlm(formel,
                            data=df_mixed_lm,
                            groups=df_mixed_lm["participant"], vc_formula=vc,
                        ).fit(reml= False)
        print(model.summary())
        result_M11.extend(extraction_values_based_of_summary_fixed_effects(model, result_M11 ,formel, "M11_EDA_RISK_VOL", RISK))

        formel = f"{eda} ~ C({RISK}) * C(Vol_Classification_Global) * C(transaction) * C(window_type)"

        print(f"\n\n\n============================================= {formel} =============================================\n")
        vc = {"session": "0 + C(session)"}  
        model = smf.mixedlm(formel,
                            data=df_mixed_lm,
                            groups=df_mixed_lm["participant"], vc_formula=vc,
                        ).fit(reml= False)
        print(model.summary())
        result_M11.extend(extraction_values_based_of_summary_fixed_effects(model, result_M11 ,formel, "M12_EDA_RISK_VOL_TRAN_WIND", RISK))



df_summary_M11 = pd.DataFrame(result_M11)  
df_summary_M11["term"] = df_summary_M11["term"].apply(shorten_variable) 
df_summary_M11["formula"] = df_summary_M11["formula"].apply(shorten_variable)  
df_significant_M11 = extraction_significant_results(df_summary_M11, alpha=0.05, only_main_effects=True)
x_M11 = df_significant_M11[["formula","term", "coef", "std_err", "p_value"]].round(3)
path = TESTS_RESULT / "Modell_Risk_VOL_FV_Results"
saving_matrix_csv_latex(x_M11, file_name=path, caption = "LMM - EDA ~¬†Risk * VOL Categorization", label = "tab:LMM_RISK_VOL_Categorization", FOLDER = TESTS_RESULT)

df_significant_M11

In [None]:
df_significant_M11[["formula","term", "coef", "std_err", "p_value"]].round(3)

# Prediction

This chapter applies a Random Forest Classifier with a training split ratio of 70:30. The goal is to predict the risky, medium risk, and low (no) risk categories based on EDA. First, the feature set starts with only EDA variables. Then, trading variables are added, and finally, market mechanism values (RD Kirchler, V_t) are added.

In [None]:
df_prediction = df_all.copy()

In [None]:
df_prediction["Risk_Kirchler"]

### Model: Predict Volatility Classification with EDA signals

In [None]:
#feature set
features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc", "window_type", "session"]
target = ["Vol_Classification_Global"]          #target
mapping = {0.0: 0, 0.5: 1, 1.0: 2}
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()        #mapping the catgories of risk to intgers (0 (Lower Risk) = 0, 0.5 (Medium-Risk) -> 1, 1 (Risky) -> 2)
df_target

df_features =df_prediction[features].copy()
# One-Hot Encoder for the window-type and session (e.g., pre-decision -> gets its own column. All session of this )
df_features = pd.get_dummies(df_features, 
                       columns=["window_type", "session"], 
                       drop_first=True)

# Splitting the data in training and test set
X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)

# create and fit model
model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)

# Predcition of the test sample
y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)

#Calculating AUC, Accuracy and the classification report
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
class_report = classification_report(y_test, y_pred, output_dict=True)
# saving classifcation report as latex table
print(class_report)
df_class = pd.DataFrame(class_report).transpose()
path = TESTS_RESULT / "RFC_CLass_report_V_t_EDA"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report",  label="tab:classification_report")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)



print(y_test.value_counts())

# Creating confusion matrix 
cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])

# Saving confusion matrix as a figure
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_V_t_RISK.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)

# Printing the feature importance of each feature
print("=============== Featuere Importance ===============================")
importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")

### Model: Predict Riks Kirchler Classification with EDA signals

In [None]:
features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc", "window_type", "session"]
target = ["Risk_Kirchler"]
mapping = {0.0: 0, 0.5: 1, 1.0: 2}
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()
df_target

# One hot encoder
df_features =df_prediction[features].copy()
df_features = pd.get_dummies(df_features, 
                       columns=["window_type", "session"], 
                       drop_first=True)

# Splitting datset: 70: 30%
X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)

#Create and fit model
model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)

# Predict the test sample
y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)

#Creating and saving assessment metrics in latex
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
class_report = classification_report(y_test, y_pred, output_dict=True)
print(class_report)
df_class = pd.DataFrame(class_report).transpose()
path = TESTS_RESULT / "RFC_CLass_report_Risk_Kirchler_EDA"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report",  label="tab:classification_report")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)



print(y_test.value_counts())

#Create confusion matrix and save it as a figure
cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])


disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_EDA_RISK.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)

# Printing the feature importance of each variable
print("=============== Feature Importance ===============================")
importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")

### Model: Predict Risk Kirchler with EDA variables and transaction variables

In [None]:
#new feature set = EDA + Trading variables

features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc","price", "transaction", "window_type", "session"]
target = ["Risk_Kirchler"]
mapping = {0.0: 0, 0.5: 1, 1.0: 2}
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()
df_target
# one hot encoder for transaction, window_tyoe, session
df_features =df_prediction[features].copy()
df_features = pd.get_dummies(df_features, 
                       columns=["transaction", "window_type", "session"], 
                       drop_first=True)


X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)

model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)

#predict test output
y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)

#Assess prediction and saving it as a LaTex file
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
class_report = classification_report(y_test, y_pred, output_dict=True)
print(class_report)
df_class = pd.DataFrame(class_report).transpose()
path = TESTS_RESULT / "RFC_CLass_report_Risk_Kirchler_EDA_TV"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report",  label="tab:classification_report")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)



print(y_test.value_counts())

#Create Confusion Matrix
cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])


disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_EDA_RISK_K_TV.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)

#Return feature importance
print("=============== Featuere Importance ===============================")

importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")



### Model: Predict Volatility Classification based on EDA and trading variables

In [None]:
# Feature = EDA + trading variables
df_prediction = df_all.copy()
df_prediction = df_prediction.dropna(subset=["Vol_Classification_Global"])
features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc", "price", "transaction", "window_type", "session"]
target = ["Vol_Classification_Global"]
mapping = {0.0: 0, 0.5: 1, 1.0: 2} #transform of target values
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()
df_target
#One Hot Encoder
df_features =df_prediction[features].copy()
df_features = pd.get_dummies(df_features, 
                       columns=["transaction", "window_type", "session"], 
                       drop_first=True)

# split dataset
X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)

#Create Classifier and fit the classifer
model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)

# Predict test dataset
y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)

print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
cp = classification_report(y_test, y_pred, output_dict=True)
print(cp)
df_class = pd.DataFrame(cp).transpose()
path = TESTS_RESULT / "RFC_CLass_report_Risk_Vol_EDA_TV"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report Vol",  label="tab:classification_report_Vol")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)


print(y_test.value_counts())

# Confusion Matrix
cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_EDA_RISK_Vol_TV.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)


print("=============== Featuere Importance ===============================")
importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")

### Model: Predict Volatility Classification with Market Mechanism

In [None]:
df_prediction = df_all.copy()
df_prediction = df_prediction.dropna(subset=["Vol_Classification_Global"])
# new feature set = EDA + trading variables + market mechanism (RD, V_t)
features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc", "price", "transaction", "window_type", "session", "rd_Kirchler_per_round", "V_t"]
target = ["Vol_Classification_Global"]
mapping = {0.0: 0, 0.5: 1, 1.0: 2}
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()
df_target

df_features =df_prediction[features].copy()
df_features = pd.get_dummies(df_features, 
                       columns=["transaction", "window_type", "session"], 
                       drop_first=True)

# split dataset
X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)

# create RF classifier and fit the classifier
model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)

# predict
y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)

# Assessment of the prediction
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
cp = classification_report(y_test, y_pred, output_dict=True)
print(cp)
df_class = pd.DataFrame(cp).transpose()
path = TESTS_RESULT / "RFC_CLass_report_Vol_with_V_TV.png"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report Vol",  label="tab:classification_report_Vol")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)


print(y_test.value_counts())

#Confusuon Matrix
cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_EDA_RISK_Vol_with_V_TV.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)

print("=============== Featuere Importance ===============================")
importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")

### Model: Predict Volatility Classification based on EDA + RD + trading variables

In [None]:
df_prediction = df_all.copy()
df_prediction = df_prediction.dropna(subset=["Vol_Classification_Global"])
# new feature = EDA + trading variables + RD
features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc", "price", "transaction", "window_type", "session", "rd_Kirchler_per_round"]
target = ["Vol_Classification_Global"]
mapping = {0.0: 0, 0.5: 1, 1.0: 2}
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()
df_target

df_features =df_prediction[features].copy()
df_features = pd.get_dummies(df_features, 
                       columns=["transaction", "window_type", "session"], 
                       drop_first=True)

#split
X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)
# Create and fit
model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)

#Predict
y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)
# Report
print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
cp = classification_report(y_test, y_pred, output_dict=True)
print(cp)
df_class = pd.DataFrame(cp).transpose()
path = TESTS_RESULT / "RFC_CLass_report_Risk_with_riskK_TV"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report Vol",  label="tab:classification_report_Vol")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)


print(y_test.value_counts())

#Confusion Matrix
cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_EDA_RISK_Vol__with_riskK_TV.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)

print("=============== Featuere Importance ===============================")
importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")

### Model: Predict RD Classification based on EDA + trading variables + FV-Kirchler

In [None]:
df_prediction = df_full.copy()
df_prediction = df_prediction.dropna(subset=["Vol_Classification_Global"])
features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc", "price", "transaction", "window_type", "session", "FV_KIRCHLER"]
target = ["Risk_Kirchler"]
mapping = {0.0: 0, 0.5: 1, 1.0: 2}
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()
df_target

df_features =df_prediction[features].copy()
df_features = pd.get_dummies(df_features, 
                       columns=["transaction", "window_type", "session"], 
                       drop_first=True)


X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)

model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)


y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)

print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
cp = classification_report(y_test, y_pred, output_dict=True)
print(cp)
df_class = pd.DataFrame(cp).transpose()
path = TESTS_RESULT / "RFC_CLass_report_Risk_K_with_K_TV"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report Vol",  label="tab:classification_report_Vol")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)


print(y_test.value_counts())


cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_EDA_RISK_K_with_K.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)

print("=============== Featuere Importance ===============================")
importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")

### Predict Risk Kirchler Classification with EDA + trading variables + V_t

In [None]:
df_prediction = df_full.copy()
df_prediction = df_prediction.dropna(subset=["Vol_Classification_Global"])
features = ["scl_mean", "scl_std", "scr_amplitude", "scr_auc", "price", "transaction", "window_type", "session", "V_t"]
target = ["Risk_Kirchler"]
mapping = {0.0: 0, 0.5: 1, 1.0: 2}
df_target = df_prediction[target].iloc[:, 0].map(mapping).astype(int).copy()
df_target
# One Hot Encoder
df_features =df_prediction[features].copy()
df_features = pd.get_dummies(df_features, 
                       columns=["transaction", "window_type", "session"], 
                       drop_first=True)

#split dataset
X_train, X_test, y_train, y_test = train_test_split(df_features,df_target, test_size= 0.3, random_state=42, stratify = df_target)
#create and fit classifier
model_rdf = RandomForestClassifier(n_estimators = 800, random_state=42)
model_rdf.fit(X_train, y_train)

#predict
y_pred = model_rdf.predict(X_test)
y_prob = model_rdf.predict_proba(X_test)

print(f"Accuracy: {accuracy_score(y_test, y_pred)}")
print(f"AUC: {roc_auc_score(y_test, y_prob, multi_class="ovr", average = "macro")}")
cp = classification_report(y_test, y_pred, output_dict=True)
print(cp)
df_class = pd.DataFrame(cp).transpose()
path = TESTS_RESULT / "RFC_CLass_report_risk_K_wih_V_TV"
#saving_matrix_csv_latex(class_report, file_name=path, caption = "Random Foreest Classifer - Classification Report of Risk based on EDA and trading variables", label = "tab:RFC_EDA_Giusti_TV", FOLDER = TESTS_RESULT)
latex = df_class.to_latex(  float_format="%.3f", index=True, bold_rows=True, column_format="lcccc", caption="Classification Report Vol",  label="tab:classification_report_Vol")
with open(path, "w") as f:
    f.write(latex)
if path.exists():
    print(path)


print(y_test.value_counts())


cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1, 2])
disp.plot()
path = TESTS_RESULT / "RFC_CM_EDA_RISK_K_wih_V_TV.png"
plt.savefig(path, dpi=300, bbox_inches="tight")
plt.close()
if path.exists():
    print(path)


print("=============== Featuere Importance ===============================")
importances = model_rdf.feature_importances_

for name, imp in sorted(zip(df_features.columns, importances), key=lambda x: -x[1]):
    print(f"{name}: {imp:.3f}")