In [None]:
import os

import numpy as np
import pandas as pd
import pingouin as pg

In [None]:
import matplotlib as mpl

# Figure typography: use the Linux Libertine typeface, with the matching
# LaTeX package registered in matplotlib's LaTeX preamble setting.
rcParams = dict()
rcParams['font.family'] = 'Linux Libertine O'
rcParams['text.latex.preamble'] = r'\usepackage{libertine}'

mpl.rcParams.update(rcParams)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# All data directories are resolved relative to the current working directory.
CURRENT_PATH = os.getcwd()

DATA_PATH = {'interim': os.path.join(CURRENT_PATH, 'data')}
# ANOVA artefacts live in a sub-folder of the interim data directory.
DATA_PATH['analyses'] = os.path.join(DATA_PATH['interim'], 'ANOVA')

In [None]:
# Input files, keyed by a short label.
INPUT_FNAMES = dict()
INPUT_FNAMES['experiment-data'] = os.path.join(DATA_PATH['interim'], 'experiment-data.csv')

## Analysis of Variance (ANOVA)

### 0. Preliminaries

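⚠️ Before running, please edit `MODEL_DICT` (the mapping from each construct to its indicator codes) and `NUM_PARTICIPANTS` (leave it as `None` to include every participant in the data) below as appropriate.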

In [None]:
# Read the experiment ratings from the CSV registered in INPUT_FNAMES.
EXPERIMENT_DATA = pd.read_csv(filepath_or_buffer=INPUT_FNAMES['experiment-data'])

In [None]:
# Reverse-score indicator E2 by negating its ratings.
# NOTE(review): negation presumably assumes a rating scale centred on 0 — confirm.
#
# The update mutates EXPERIMENT_DATA in place, so re-running this cell would
# silently flip the sign back. A flag stored in `DataFrame.attrs` makes the
# cell idempotent; reloading the CSV yields a fresh frame without the flag,
# so the reversal is applied again exactly once.
if not EXPERIMENT_DATA.attrs.get('E2_reversed', False):
    EXPERIMENT_DATA.loc[(EXPERIMENT_DATA['indicator'] == 'E2'), 'rating'] *= -1
    EXPERIMENT_DATA.attrs['E2_reversed'] = True

In [None]:
# Measurement model: construct name -> indicator codes whose ratings are
# analysed together for that construct.
MODEL_DICT = dict(
    effctv=["E1", "E2", "E3"],
    qualty=["Q1", "Q2", "Q3"],
    undrst=["P1", "P2", "P3"],
)

In [None]:
# Number of participants to include in the analysis. When left as None, the
# next cell replaces it with the number of unique `prolific_id` values found
# in EXPERIMENT_DATA.
NUM_PARTICIPANTS = None
# Threshold for the ANOVA's uncorrected p-value: post-hoc pairwise tests are
# run for a construct only when its p-value is at or below this level.
SIGNIFICANCE_LEVEL = 0.05

In [None]:
# No explicit cap configured: fall back to every distinct participant ID.
if NUM_PARTICIPANTS is None:
    NUM_PARTICIPANTS = EXPERIMENT_DATA['prolific_id'].nunique(dropna=False)

In [None]:
# Outputs are grouped per sample size: <analyses>/N=<participants>/figures
BASE_DIR = os.path.join(DATA_PATH['analyses'], f'N={NUM_PARTICIPANTS}')
FIG_DIR = os.path.join(BASE_DIR, 'figures')

# Creating FIG_DIR also creates BASE_DIR, its parent directory.
os.makedirs(FIG_DIR, exist_ok=True)

In [None]:
# Result tables written under BASE_DIR, keyed by a short label.
OUTPUT_FNAMES = {
    label: os.path.join(BASE_DIR, fname)
    for label, fname in (
        ('anova', 'rm-anova.csv'),
        ('post-hocs', 'post-hocs.csv'),
    )
}

In [None]:
def top_n_participants(df: pd.DataFrame, n: int) -> pd.DataFrame:
    """Keep only the rows belonging to the first `n` participants.

    Rows are sorted by ascending `completion_time` and reduced to the last
    row per `prolific_id`; the first `n` participants in that order are
    selected. All of their rows are returned in the original row order.

    Args:
        df: Data with at least `prolific_id` and `completion_time` columns.
        n: Number of participants to keep. `None` keeps everyone, because it
            is used directly as a slice bound.

    Returns:
        The subset of `df` whose `prolific_id` is among the selected IDs.
    """
    ordered = df.sort_values(by='completion_time', ignore_index=True)
    last_rows = ordered.drop_duplicates(
        subset=['prolific_id'], keep='last', ignore_index=True
    )

    # Select the ID column by name rather than by position, so the result
    # does not depend on `prolific_id` being the first column of `df`.
    ids = last_rows['prolific_id'].iloc[:n]

    return df.loc[df['prolific_id'].isin(ids.values)]

### 1. One-Way Repeated-Measures ANOVA

In [None]:
def statistical_test(
    df: pd.DataFrame,
    test: str,
):
    """Either pairwise comparisons (T-tests) or ANOVA.

    Ratings are first averaged per participant and condition; the chosen
    test is then run on those means with `condition` as the within-subject
    factor and `prolific_id` as the subject identifier.

    Args:
        df: Data with `prolific_id`, `condition` and `rating` columns.
        test: `'anova'` for `pingouin.rm_anova`, or `'pwc'` for
            `pingouin.pairwise_tests` with `padjust='fdr_bh'`.

    Returns:
        Whatever the selected pingouin function returns.

    Raises:
        ValueError: If `test` is neither `'anova'` nor `'pwc'`.

    Sources:
      <https://www.reneshbedre.com/blog/repeated-measure-anova.html#perform-one-way-repeated-measures-anova>
      <https://stats.stackexchange.com/a/377622>
    """
    # Validate up front so a typo fails before any aggregation work is done.
    if test not in ('anova', 'pwc'):
        raise ValueError(f"Unknown test {test!r}; expected 'anova' or 'pwc'")

    subject_means = (
        df
        .groupby(['prolific_id', 'condition'])['rating']
        .aggregate(['mean'])
        .reset_index()
    )

    params = {
        'data': subject_means,
        'dv': 'mean',  # name of the aggregated column produced above
        'within': ['condition'],
        'subject': 'prolific_id',
    }

    if test == 'pwc':
        return pg.pairwise_tests(padjust='fdr_bh', **params)

    return pg.rm_anova(**params)

In [None]:
def normalize_data(df: pd.DataFrame) -> pd.DataFrame:
    """Remove between-subject variability

    Each participant's mean rating per condition is shifted by
    `grand_average - subject_average`, so that every participant ends up
    with the same overall mean. Works for any number of conditions.

    Args:
        df: Data with `prolific_id`, `condition` and `rating` columns.

    Returns:
        Long-format frame with columns `prolific_id`, `condition` and
        `rating`, one row per participant and condition.

    Source:
        <https://www.cogsci.nl/blog/tutorials/156-an-easy-way-to-create-graphs-with-within-subject-error-bars>

    References:
        Cousineau, D. (2005). Confidence intervals in within-subject designs: A simpler solution to Loftus and Masson's method.
    """
    # One row per participant, one column per condition (cell = mean rating).
    wide = df.pivot_table(index='prolific_id', columns=['condition'], values='rating')

    # Averages are taken over every condition column that is present, rather
    # than over a fixed number of columns.
    subject_average = wide.mean(axis=1)
    grand_average = subject_average.mean()

    normalized = wide.sub(subject_average, axis=0).add(grand_average)

    return (
        normalized
        .reset_index()
        .rename_axis(None, axis=1)
        .melt(id_vars='prolific_id', var_name='condition', value_name='rating')
    )

In [None]:
def format_p_value(p_value: float) -> str:
    """Format p value according to APA Style 7th Edition.

    Values below .001 are reported as `< .001`; everything else is reported
    as `= ` followed by the value rounded to three decimals, without the
    leading zero.

    Args:
        p_value: The p value to format.

    Returns:
        A string such as `'= .023'` or `'< .001'`.

    Source:
        <https://apastyle.apa.org/instructional-aids/numbers-statistics-guide.pdf>
    """
    sign = '='
    if p_value < .001:
        sign = '<'
        p_value = .001

    # Only the leading zero is removed. Stripping both ends would also eat
    # trailing zeros and turn '0.500' into '.5' or '1.000' into '1.'.
    formatted = '{:.3f}'.format(p_value).lstrip('0')

    return f'{sign} {formatted}'

In [None]:
def plot_pairwise_tests(ax, pwc):
    """Draw one annotated bracket on `ax` per pairwise comparison in `pwc`.

    Each bracket is a thin horizontal line between the x positions of the two
    compared conditions, with a "|" marker at both ends and the formatted
    corrected p-value centred above it. Brackets are stacked on three fixed
    heights so that they do not overlap.

    Args:
        ax: Axes to draw on.
        pwc: Frame with columns `A`, `B` (condition names) and `p-corr`.

    Returns:
        The same `ax`, for chaining.
    """
    # Source: <https://stackoverflow.com/a/52743675>
    x_positions = {'BASE': 0, 'BASE_CONT': 1, 'HETG_CONT': 2}

    base_height = 1.475
    step = .375

    tick_style = dict(ha='center', va='top', size=6)

    for first, second, p_corr in pwc.loc[:, ['A', 'B', 'p-corr']].values.tolist():
        # Stacking level: BASE vs HETG_CONT sits one step up, any pair that
        # starts at BASE_CONT two steps up, everything else on the base line.
        if first == 'BASE' and second == 'HETG_CONT':
            level = 1
        elif first == 'BASE_CONT':
            level = 2
        else:
            level = 0
        height = base_height + step * level

        xs = [x_positions[first], x_positions[second]]

        bracket = ax.add_line(
            mpl.lines.Line2D(xs, [height, height], linewidth=.5, color="black")
        )

        # End caps of the bracket, in the same colour as the line.
        for x in xs:
            ax.annotate("|", xy=(x, height), color=bracket.get_color(), **tick_style)

        # Label positioned relative to the line artist: (.5, .5) is its centre.
        ax.annotate(
            r'$p {}$'.format(format_p_value(p_corr)), (.5, .5),
            xycoords=bracket, ha='center', va='bottom',
            fontsize=10
        )

    return ax

In [None]:
def draw_point_plot(df: pd.DataFrame, aov: dict = None):
    """Draw a point plot of the mean rating per condition with SE error bars.

    Args:
        df: Long-format data with `condition` and `rating` columns.
        aov: Optional ANOVA result record with keys `ddof1`, `ddof2`, `F`
            and `p-unc`. When given, the statistics are shown as the plot
            title; when `None` (the default) no title is drawn.

    Returns:
        The matplotlib Axes that seaborn drew on.
    """
    # NOTE(review): `errwidth` is reported as deprecated in recent seaborn
    # releases in favour of `err_kws` — confirm against the installed version.
    ax = sns.pointplot(
        data=df, x="condition", y="rating",
        estimator="mean", errorbar='se',
        capsize=.15, errwidth=.5, color="black", linestyles="none"
    )

    ax.set_xlabel(None)
    ax.set_ylabel(None)

    # Fixed y range and tick spacing so all construct figures share one scale.
    ax.set_ylim([0.65, 1.45])
    ax.set_yticks(np.arange(0.7, 1.5, 0.1))

    ax.yaxis.grid(color='gainsboro', alpha=.375)

    ax.tick_params(axis='both', which='major', labelsize=11)

    # The title is opt-in: it is only rendered when ANOVA results are passed.
    if aov is not None:
        plt_title = r'ANOVA, $F ({}, {}) = {:.2f}$, $p {}$'.format(
            aov['ddof1'],
            aov['ddof2'],
            aov['F'],
            format_p_value(aov['p-unc']),
        )

        ax.set_title(label=plt_title, fontsize=12)

    ax.margins(0.25)

    return ax

In [None]:
def one_way_anova(
    df: pd.DataFrame,
    model_dict: dict,
    num_participants: int = NUM_PARTICIPANTS,
    fig_dir: str = FIG_DIR
):
    """Run the ANOVA, plots and post-hoc tests for every construct.

    For each construct in `model_dict` this:
      1. runs the ANOVA on the ratings of the construct's indicators,
      2. saves a point plot of the normalized data to `<fig_dir>/<construct>.pdf`,
      3. runs pairwise comparisons when the ANOVA's uncorrected p-value is at
         or below `SIGNIFICANCE_LEVEL`,
      4. collects per-condition "M (SD)" strings plus the ANOVA p-value.

    Args:
        df: Experiment data with `prolific_id`, `completion_time`,
            `indicator`, `condition` and `rating` columns.
        model_dict: Mapping of construct name to its indicator codes.
        num_participants: Number of participants kept by `top_n_participants`.
        fig_dir: Directory the figures are written to.

    Returns:
        A `(statistics, post_hocs)` tuple. `statistics` has one row per
        construct. `post_hocs` has columns `construct`, `A`, `B`, `p-corr`
        and is an empty frame (not `None`) when no ANOVA was significant, so
        callers can write it out unconditionally.

    Raises:
        ValueError: If `model_dict` is empty.
    """
    if not model_dict:
        raise ValueError('model_dict must contain at least one construct')

    post_hoc_cols = ['construct', 'A', 'B', 'p-corr']

    df = top_n_participants(df, num_participants)

    # Per-construct frames are collected and concatenated once at the end.
    stats_frames = []
    post_hoc_frames = []

    for construct, indicators in model_dict.items():
        construct_data = df.loc[df['indicator'].isin(indicators)]

        # ANOVA (first and only result row, as a plain dict)
        aov = statistical_test(construct_data, 'anova') \
              .to_dict(orient='records')[0]

        # Point plot
        plot_data = normalize_data(construct_data)
        ax = draw_point_plot(plot_data, aov=None)

        # pairwise comparisons
        if aov['p-unc'] <= SIGNIFICANCE_LEVEL:
            pwc = statistical_test(construct_data, 'pwc')
            # Bracket annotations are currently disabled:
            # ax = plot_pairwise_tests(ax, pwc)

            pwc.insert(1, 'construct', construct)
            post_hoc_frames.append(pwc.loc[:, post_hoc_cols])

        # save figure
        fig = ax.get_figure()
        fname = os.path.join(fig_dir, f'{construct}.pdf')
        fig.savefig(fname, dpi=400, bbox_inches='tight', transparent=True)
        # Close instead of clearing, so no empty figure is left open; the
        # next construct's plot starts on a fresh figure.
        plt.close(fig)

        # statistics
        # NOTE(review): np.std defaults to ddof=0 (population SD), and it is
        # computed over all rating rows rather than per-participant means —
        # confirm this is the SD that should be reported.
        stats = construct_data \
                .groupby('condition')['rating'] \
                .aggregate(['mean', ('std', lambda x: np.std(x))]) \
                .reset_index()

        cols = ['mean', 'std']
        stats[cols] = stats[cols].map(lambda x: '{0:.3f}'.format(x))
        stats['M (SD)'] = stats['mean'] + ' (' + stats['std'] + ')'

        stats = stats.drop(columns=cols).melt(id_vars='condition', var_name='desc')
        # Extra pseudo-condition row carrying the ANOVA p-value.
        stats.loc[len(stats)] = ['ANOVA', 'p-unc', '{0:.3f}'.format(aov['p-unc'])]

        stats.insert(0, 'construct', construct)
        stats_frames.append(stats)

    # One row per construct, one column per (condition, desc) pair.
    statistics = pd.concat(stats_frames, ignore_index=True) \
                 .pivot(index=['construct'], columns=['condition', 'desc']) \
                 .reset_index()

    statistics.columns = statistics.columns.droplevel()

    if post_hoc_frames:
        post_hocs = pd.concat(post_hoc_frames, ignore_index=True)
    else:
        post_hocs = pd.DataFrame(columns=post_hoc_cols)

    return statistics, post_hocs

In [None]:
# The analysis is skipped when both result files already exist; delete them
# (or change NUM_PARTICIPANTS) to force a re-run.
if (
        not os.path.isfile(OUTPUT_FNAMES['anova']) or
        not os.path.isfile(OUTPUT_FNAMES['post-hocs'])
):
    statistics, post_hocs = one_way_anova(EXPERIMENT_DATA, MODEL_DICT)

    # No significant ANOVA means no post-hoc table. Write a header-only file
    # instead of failing on `None.to_csv`, so the existence check above
    # still recognises the run as complete.
    if post_hocs is None:
        post_hocs = pd.DataFrame(columns=['construct', 'A', 'B', 'p-corr'])

    statistics.to_csv(OUTPUT_FNAMES['anova'], index=False)
    post_hocs.to_csv(OUTPUT_FNAMES['post-hocs'], index=False, float_format='%.3f')