In [None]:
import pandas as pd
from analysis import *
from statsmodels.stats.proportion import proportions_ztest
from scipy.stats import binom_test

import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Load the two annotation tables of the Surge evaluation.
# `data` is not defined in this notebook; presumably it comes from
# `from analysis import *` — TODO confirm.
surge_annotations = data.surge_evaluation.annotation_dataframe()
# NOTE(review): the comparative table is not referenced by any cell visible here.
surge_annotations_comparative = data.surge_evaluation.comparative_annotation_dataframe()

surge_annotations

# 8 Comprehensive Analysis

### Metric Sensitivity

In [None]:
from itertools import combinations
from scipy.stats import ttest_ind

def p_vals(df: pd.DataFrame, test='t'):
    """
    Run a significance test on every unordered pair of bots.

    :param df: (bot, data point) x 1 -> score; the first index level identifies the bot
    :param test: which test to run: 't' for a t test with unequal variances,
                 'p' for a two-sample proportions z test, 's' for a sign test
    :return: p values of test on each bot pair (pd.Series indexed by (bot a, bot b))
    :raises ValueError: if ``test`` is not one of 't', 'p', 's'
    """
    # Sort the bot names: iterating a bare set gives no stable order, so the
    # pairs (and which bot comes first in each pair) could change between runs.
    bots = sorted(set(df.index.get_level_values(0)))
    result = {}
    for ba, bb in combinations(bots, 2):
        a = df.xs(ba).to_numpy().squeeze()
        b = df.xs(bb).to_numpy().squeeze()
        if test == 't':
            t, p = ttest_ind(a, b, equal_var=False)
        elif test == 'p':
            z, p = proportions_ztest(
                count=[sum(a), sum(b)],
                nobs=[len(a), len(b)]
            )
        elif test == 's':
            # sign test: keep only the entries equal to 1 for each bot, then
            # test whether bot a's share of those entries differs from 0.5
            a = a[a==1]
            b = b[b==1]
            # NOTE(review): binom_test is deprecated in recent SciPy releases
            # in favour of binomtest — confirm against the pinned version.
            p = binom_test(sum(a), sum(a)+sum(b), p=0.5)
        else:
            raise ValueError('invalid arg for param "test"')
        result[(ba, bb)] = p
    result_series = pd.Series(list(result.values()), index=list(result))
    return result_series

@to_file
def t_test_p_values_comparing_bots(annotations):
    """
    Pairwise bot-comparison p values for every (category, label) group.

    Despite the name, three tests are used: behavior annotations get the
    proportions test, comparative annotations get the sign test, and every
    other category gets the t test (see ``p_vals``).

    :param annotations: annotation dataframe; it is first passed through
                        ``get_singly_annotated``
    :return: the three sets of p values concatenated (behavior, other, comparative)
    """
    annotations = get_singly_annotated(annotations)
    group_keys = [sym.category, sym.label]

    def _pairwise(frame, test):
        # p_vals is applied separately to each (category, label) group
        return frame.groupby(group_keys).apply(lambda group: p_vals(group, test=test))

    behavior_rows = annotations.xs(
        category.behavior, level=sym.category, drop_level=False
    )
    comparative_rows = annotations.xs(
        category.comparative, level=sym.category, drop_level=False
    )
    # everything that is neither behavior nor comparative
    other_rows = annotations.drop(index=category.behavior, level=sym.category)
    other_rows = other_rows.drop(index=category.comparative, level=sym.category)

    mean_ps = _pairwise(other_rows, 't')
    prop_ps = _pairwise(behavior_rows, 'p')
    comp_ps = _pairwise(comparative_rows, 's')
    return pd.concat([prop_ps, mean_ps, comp_ps], axis=0)

# Pairwise bot-comparison p values over the Surge annotations.
# `load=` is not a parameter of the function itself, so it is presumably consumed
# by the @to_file decorator (defined outside this notebook) — TODO confirm.
t_test_p_values_comparing_bots(surge_annotations, load='results/t_test_p_values_comparing_bots')

### Predictive Validity

In [None]:
from statsmodels.miscmodels.ordinal_model import OrderedModel
from statsmodels.regression.linear_model import OLS as LinearModel
from statsmodels.tools.tools import add_constant

def dialogue_metrics(ev):
    """
    Build per-dialogue metric scores paired with a dialogue quality target.

    Turn-level Likert and behavior annotations are averaged per (label, dialogue);
    dialogue-level Likert annotations are used as they are.

    :param ev: evaluation object exposing ``annotation_dataframe()``
    :return: (features joined with dialogue-level Likert quality,
              features joined with mean turn-level Likert quality);
             both are indexed by (category, label, dialogue) and hold the
             columns 'score' and 'quality'
    """
    annotations: pd.DataFrame = get_singly_annotated(ev.annotation_dataframe(), seed=123)
    flat = annotations.reset_index()
    # An item that is a tuple contributes its first element as the dialogue key;
    # any other item is used as the dialogue key directly.
    flat['dialogue'] = [
        item[0] if isinstance(item, tuple) else item
        for item in flat[sym.item]
    ]
    indexed = flat.set_index(
        [sym.bot, sym.category, sym.label, 'dialogue', sym.item],
        verify_integrity=True
    )

    def _quality_of(scores):
        # rows of the quality label, with the value column renamed to 'quality'
        quality = scores.xs(scale.quality, level=sym.label)
        quality.columns = ['quality']
        return quality

    def _mean_per_dialogue(which):
        # average all annotations of one category per (label, dialogue)
        averaged = indexed.xs(which, level=sym.category)
        averaged = averaged.groupby([sym.label, 'dialogue']).mean()
        averaged.columns = ['score']
        return averaged

    ld = indexed.xs(category.likert_dialogue, level=sym.category)
    ld = ld.droplevel(sym.bot).droplevel(sym.item)
    ld.columns = ['score']
    ldq = _quality_of(ld)

    lt = _mean_per_dialogue(category.likert_turn)
    ltq = _quality_of(lt)

    be = _mean_per_dialogue(category.behavior)

    ds = pd.concat(
        [lt, be, ld],
        keys=[category.likert_turn, category.behavior, category.likert_dialogue],
        names=[sym.category, sym.label, 'dialogue']
    )
    return ds.join(ldq, on='dialogue'), ds.join(ltq, on='dialogue')


def regressions(df, quality_column_name=None, model='linear'):
    """
    Fit a regression predicting the quality column from all other columns.

    :param df: dialogue x (*features, quality) -> value
    :param quality_column_name: column to predict; defaults to the last column of df
    :param model: 'linear' (OLS with an added constant) or 'ordinal'
                  (OrderedModel with a logit link)
    :return: pd.Series. For 'linear': one coefficient per feature, followed by
             R^2 and the p value of the F-test. For 'ordinal': the pseudo R^2
             and the p value of the likelihood-ratio test (no coefficients).
    :raises ValueError: if ``model`` is neither 'linear' nor 'ordinal'
    """
    if not quality_column_name:
        quality_column_name = df.columns[-1]
    qualities = df[quality_column_name]
    features = [f for f in df.columns if f != quality_column_name]
    if model == 'ordinal':
        fitted = OrderedModel(qualities, df[features], distr='logit').fit()
        result = {
            stat.mcfad_r2: fitted.prsquared,
            stat.p_of_llr_test: fitted.llr_pvalue
        }
    elif model == 'linear':
        # OLS has no implicit intercept, hence the explicit constant column
        fitted = LinearModel(qualities, add_constant(df[features])).fit()
        coefs = {f: fitted.params[f] for f in features}
        result = {**coefs, stat.r2: fitted.rsquared, stat.p_of_f_test: fitted.f_pvalue}
    else:
        raise ValueError('Param "model" must be one of {"linear", "ordinal"}')
    return pd.Series(result.values(), result)

In [None]:
@to_file
def dialogue_quality_regressions(ev):
    """
    Regress dialogue quality on each (category, label) metric separately.

    Three fits are made per metric: linear and ordinal regressions against the
    dialogue-level Likert quality, and a linear regression against the mean
    turn-level Likert quality.

    :param ev: evaluation object, forwarded to ``dialogue_metrics``
    :return: dataframe indexed by (category, label) with two-level columns
             ('Predicted', 'Metric'), rounded to 5 decimals
    """
    ldq, ltq = dialogue_metrics(ev)
    group_keys = [sym.category, sym.label]
    dialogue_groups = ldq.groupby(group_keys)
    turn_groups = ltq.groupby(group_keys)

    linear_metrics = ['LR Coefficient', 'LR R-Squared', stat.p_of_f_test]
    ordinal_metrics = ['OR Pseudo R-Squared', stat.p_of_llr_test]

    def _with_header(table, predicted, metrics):
        # relabel the columns as (predicted quantity, metric name)
        table.columns = pd.MultiIndex.from_arrays(
            [[predicted] * len(metrics), metrics],
            names=['Predicted', 'Metric']
        )
        return table

    linear_result = _with_header(
        dialogue_groups.apply(lambda group: regressions(group, model='linear')),
        'Likert Dialogue Quality', linear_metrics
    )
    ordinal_result = _with_header(
        dialogue_groups.apply(lambda group: regressions(group, model='ordinal')),
        'Likert Dialogue Quality', ordinal_metrics
    )
    linear_turn_result = _with_header(
        turn_groups.apply(regressions),
        'Likert Turn Quality', linear_metrics
    )
    combined = pd.concat((linear_turn_result, linear_result, ordinal_result), axis=1)
    return combined.round(5)

# Single-metric regressions for the Surge evaluation; `regs` is read by the
# following cell to build `to_plot_regs`.
# `load=` is not a parameter of dialogue_quality_regressions itself, so it is
# presumably consumed by the @to_file decorator — TODO confirm.
regs = dialogue_quality_regressions(
    data.surge_evaluation,
    load='results/dialogue_quality_regressions'
)
regs

In [None]:
# Keep only the linear-regression R^2 and F-test p value against dialogue-level
# Likert quality, drop the row of the target metric itself, and turn the
# (category, label) index into ordinary columns for plotting.
to_plot_regs = (
    regs[[("Likert Dialogue Quality", "LR R-Squared"), ("Likert Dialogue Quality", "P value of F-test")]]
    .drop(("likert dialogue", "quality"))
    .reset_index()
)
to_plot_regs

In [None]:
# Build the plot
SMALL_SIZE = 16
MEDIUM_SIZE = 20
BIGGER_SIZE = 24

# Global matplotlib defaults for every figure drawn from here on.
plt.rcParams.update({
    'font.size': SMALL_SIZE,            # default text size
    'axes.titlesize': SMALL_SIZE,       # axes title
    'axes.labelsize': MEDIUM_SIZE,      # x and y axis labels
    'xtick.labelsize': SMALL_SIZE,      # x tick labels
    'ytick.labelsize': SMALL_SIZE,      # y tick labels
    'legend.fontsize': SMALL_SIZE,      # legend text
    'figure.titlesize': BIGGER_SIZE,    # figure title
    'figure.figsize': (20, 10),
})

fig, ax = plt.subplots()

def plot_by_category(ax, df, category, color, xaxis_start):
    """
    Draw one bar per row of ``df`` whose "category" column equals ``category``.

    Bar heights are read from the ("Likert Dialogue Quality", "LR R-Squared")
    column and the bars occupy consecutive integer x positions starting at
    ``xaxis_start``. Note that the ``category`` parameter shadows any
    module-level name ``category`` inside this function.

    :param ax: axes to draw on (only ``ax.bar`` is used)
    :param df: dataframe with a "category" column and the R-squared column
    :param category: category value to select
    :param color: bar color
    :param xaxis_start: x position of the first bar
    :return: x position just past the last bar (start for the next category)
    """
    in_category = df["category"] == category
    heights = df[in_category][("Likert Dialogue Quality", "LR R-Squared")]
    xaxis_end = xaxis_start + len(heights)
    positions = np.arange(xaxis_start, xaxis_end)
    ax.bar(positions, heights, color=color)
    return xaxis_end

likert_turn_color = "blue"
likert_dialogue_color = "red"
comparative_color = "green"
behavior_color = "orange"

# Categories in the left-to-right order in which their bars are drawn.
plotted_categories = [
    ("likert turn", likert_turn_color),
    ("behavior", behavior_color),
    ("likert dialogue", likert_dialogue_color),
]

# Each call returns the x position where the next category starts.
behavior_start = plot_by_category(ax, to_plot_regs, "likert turn", likert_turn_color, 0)
likert_dialogue_start = plot_by_category(ax, to_plot_regs, "behavior", behavior_color, behavior_start)
misc_start = plot_by_category(ax, to_plot_regs, "likert dialogue", likert_dialogue_color, likert_dialogue_start)

# Collect tick labels and tick colors in the SAME order as the bars. Taking the
# labels straight from to_plot_regs["label"] would put them under the wrong bars
# whenever the rows are not already ordered likert turn -> behavior -> likert
# dialogue. Building the colors per position (instead of a dict keyed by range
# end) also stays correct when a category has no rows.
xaxis_labels = []
xaxis_colors = {}
for plotted_category, color in plotted_categories:
    category_rows = to_plot_regs[to_plot_regs["category"] == plotted_category]
    for tick_text in np.asarray(category_rows["label"]).ravel():
        xaxis_colors[len(xaxis_labels)] = color
        xaxis_labels.append(tick_text)

ax.set_ylabel("R-squared", labelpad=20)
# One tick per plotted bar (rows of other categories are not drawn).
xpos = np.arange(len(xaxis_labels))
ax.set_xlabel("Evaluation Label")
ax.set_xticks(xpos)
ax.set_xticklabels(xaxis_labels, rotation=90)
for tickloc, ticklabel in zip(ax.get_xticks(), ax.get_xticklabels()):
    ticklabel.set_color(xaxis_colors[int(tickloc)])

# Lay out and show the figure (nothing is written to disk here)
plt.tight_layout()
plt.show()

In [None]:
def drop_column_level_duplication(df: pd.DataFrame, columns, levels=None):
    """
    Replace every column found under ``columns`` by a single column.

    The columns selected by ``columns`` at ``levels`` are treated as copies of
    one another: only the first is kept, it is named ``columns``, and it is
    appended after the remaining columns.

    :param df: dataframe with multi-level columns
    :param columns: column key to collapse (a scalar, or a tuple with one
                    entry per level)
    :param levels: column level(s) that ``columns`` refers to; defaults to
                   level 0 for a scalar key and to the first ``len(columns)``
                   levels otherwise
    :return: new dataframe with the remaining columns followed by the single
             collapsed column
    """
    if levels is None:
        # A string key is ONE label, not a sequence of per-level labels, so its
        # length must not be used to count levels.
        scalar_key = isinstance(columns, (str, bytes)) or not hasattr(columns, '__len__')
        levels = 0 if scalar_key else list(range(len(columns)))
    level_columns = df.xs(columns, axis=1, level=levels)
    unique = level_columns.iloc[:,0].to_frame()
    unique.columns = [columns]
    dropped = df.drop(columns=columns, level=levels)
    result = pd.concat([dropped, unique], axis=1)
    return result

def multivariate_regression(df: pd.DataFrame, model='linear'):
    """
    Regress quality on all (category, label) metrics in ``df`` at once.

    :param df: long-format frame; its ``sym.category`` and ``sym.label`` index
               levels are unstacked into columns, with a 'quality' column
    :param model: forwarded to ``regressions`` ('linear' or 'ordinal')
    :return: the series returned by ``regressions``, rounded to 5 decimals;
             tuple index entries are replaced by their second element
    """
    # one column per (category, label) metric
    wide = df.unstack([sym.category, sym.label])
    # unstacking repeats 'quality' once per metric; keep a single copy
    wide = drop_column_level_duplication(wide, 'quality', 0)
    fitted = regressions(wide, quality_column_name='quality', model=model)
    flat_index = []
    for entry in fitted.index.values:
        flat_index.append(entry[1] if isinstance(entry, tuple) else entry)
    fitted.index = flat_index
    return fitted.round(5)

@to_file
def incremental_regression(df: pd.DataFrame, categories, model='linear', exclude_quality=True):
    """
    Greedy forward selection of metrics by regression fit.

    Starting from no features, each round fits one multivariate regression per
    remaining candidate (already selected features plus that candidate) and
    keeps the candidate with the highest R^2; ties keep the candidate seen
    first. This repeats until every feature in the pool has been added.

    :param df: frame whose index tuples start with (category, label)
    :param categories: categories whose metrics may be selected
    :param model: 'linear' uses R^2 / F-test p value; anything else uses the
                  McFadden R^2 / LLR-test p value names
    :param exclude_quality: when true, skip index entries containing ``scale.quality``
    :return: dataframe with one row per selected (category, label), in selection
             order, holding the R^2 and p value of the model at that step
    """
    if model == 'linear':
        r2_name, p_name = stat.r2, stat.p_of_f_test
    else:
        r2_name, p_name = stat.mcfad_r2, stat.p_of_llr_test

    def _eligible(key):
        # not an excluded quality row, and from one of the requested categories
        return (not (exclude_quality and scale.quality in key)) and key[0] in categories

    feature_pool = {key[:2] for key in df.index.values if _eligible(key)}
    selected = []
    steps = []
    while feature_pool - set(selected):
        best = None
        for candidate in feature_pool - set(selected):
            trial = selected + [candidate]
            row_mask = [
                key[:2] in trial and _eligible(key)
                for key in df.index.values
            ]
            fit = multivariate_regression(df.loc[row_mask, :], model=model)
            r2 = fit[r2_name].item()
            p = fit[p_name]
            # strict comparison: an equal R^2 does not displace the current best
            if not best or best[1] < r2:
                best = (candidate, r2, p)
        selected.append(best[0])
        steps.append(best)
    result = {feature: {r2_name: r2, p_name: p} for feature, r2, p in steps}
    return pd.DataFrame(result.values(), result)

# ldq pairs each metric with dialogue-level Likert quality, ltq with the mean
# turn-level Likert quality (see dialogue_metrics).
ldq, ltq = dialogue_metrics(data.surge_evaluation)
# Forward selection over the turn-level Likert and behavior metrics together.
# NOTE: this rebinds `regs`, which earlier held the single-metric regressions.
# `reload=` is not a parameter of incremental_regression itself, so it is
# presumably consumed by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.likert_turn, category.behavior),
    reload='results/dialogue_incremental_regressions'
)
regs

In [None]:
# Forward selection restricted to the behavior metrics (rebinds `regs`).
# `reload=` is presumably handled by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.behavior,),
    reload='results/behavior_incremental_regressions'
)
regs

In [None]:
# Forward selection restricted to the turn-level Likert metrics (rebinds `regs`).
# `reload=` is presumably handled by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.likert_turn,),
    reload='results/likert_turn_incremental_regressions'
)
regs

In [None]:
# Forward selection restricted to the dialogue-level Likert metrics (rebinds
# `regs`). With the default exclude_quality=True the quality label itself is
# left out of the feature pool.
# `reload=` is presumably handled by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.likert_dialogue,),
    reload='results/likert_dialogue_incremental_regressions'
)
regs

In [None]:
# Build the plot
# NOTE(review): an earlier cell of this notebook performs the same setup.
SMALL_SIZE = 16
MEDIUM_SIZE = 20
BIGGER_SIZE = 24

# Global matplotlib defaults for every figure drawn from here on.
plt.rcParams.update({
    'font.size': SMALL_SIZE,            # default text size
    'axes.titlesize': SMALL_SIZE,       # axes title
    'axes.labelsize': MEDIUM_SIZE,      # x and y axis labels
    'xtick.labelsize': SMALL_SIZE,      # x tick labels
    'ytick.labelsize': SMALL_SIZE,      # y tick labels
    'legend.fontsize': SMALL_SIZE,      # legend text
    'figure.titlesize': BIGGER_SIZE,    # figure title
    'figure.figsize': (20, 10),
})

fig, ax = plt.subplots()

def plot_by_category(ax, df, category, color, xaxis_start):
    """
    Draw one bar per row of ``df`` whose "category" column equals ``category``.

    Bar heights are read from the ("Likert Dialogue Quality", "LR R-Squared")
    column and the bars occupy consecutive integer x positions starting at
    ``xaxis_start``. Note that the ``category`` parameter shadows any
    module-level name ``category`` inside this function.

    NOTE(review): a function of the same name is also defined in an earlier
    cell of this notebook; after Run All this later definition is in effect.

    :param ax: axes to draw on (only ``ax.bar`` is used)
    :param df: dataframe with a "category" column and the R-squared column
    :param category: category value to select
    :param color: bar color
    :param xaxis_start: x position of the first bar
    :return: x position just past the last bar (start for the next category)
    """
    in_category = df["category"] == category
    heights = df[in_category][("Likert Dialogue Quality", "LR R-Squared")]
    xaxis_end = xaxis_start + len(heights)
    positions = np.arange(xaxis_start, xaxis_end)
    ax.bar(positions, heights, color=color)
    return xaxis_end

likert_turn_color = "blue"
likert_dialogue_color = "red"
comparative_color = "green"
behavior_color = "orange"

# Categories in the left-to-right order in which their bars are drawn.
plotted_categories = [
    ("likert turn", likert_turn_color),
    ("behavior", behavior_color),
    ("likert dialogue", likert_dialogue_color),
]

# Each call returns the x position where the next category starts.
behavior_start = plot_by_category(ax, to_plot_regs, "likert turn", likert_turn_color, 0)
likert_dialogue_start = plot_by_category(ax, to_plot_regs, "behavior", behavior_color, behavior_start)
misc_start = plot_by_category(ax, to_plot_regs, "likert dialogue", likert_dialogue_color, likert_dialogue_start)

# Collect tick labels and tick colors in the SAME order as the bars. Taking the
# labels straight from to_plot_regs["label"] would put them under the wrong bars
# whenever the rows are not already ordered likert turn -> behavior -> likert
# dialogue. Building the colors per position (instead of a dict keyed by range
# end) also stays correct when a category has no rows.
xaxis_labels = []
xaxis_colors = {}
for plotted_category, color in plotted_categories:
    category_rows = to_plot_regs[to_plot_regs["category"] == plotted_category]
    for tick_text in np.asarray(category_rows["label"]).ravel():
        xaxis_colors[len(xaxis_labels)] = color
        xaxis_labels.append(tick_text)

ax.set_ylabel("R-squared", labelpad=20)
# One tick per plotted bar (rows of other categories are not drawn).
xpos = np.arange(len(xaxis_labels))
ax.set_xlabel("Evaluation Label")
ax.set_xticks(xpos)
ax.set_xticklabels(xaxis_labels, rotation=90)
for tickloc, ticklabel in zip(ax.get_xticks(), ax.get_xticklabels()):
    ticklabel.set_color(xaxis_colors[int(tickloc)])

# Lay out and show the figure (nothing is written to disk here)
plt.tight_layout()
plt.show()

In [None]:
def drop_column_level_duplication(df: pd.DataFrame, columns, levels=None):
    """
    Replace every column found under ``columns`` by a single column.

    The columns selected by ``columns`` at ``levels`` are treated as copies of
    one another: only the first is kept, it is named ``columns``, and it is
    appended after the remaining columns.

    NOTE(review): a function of the same name is also defined in an earlier
    cell of this notebook; after Run All this later definition is in effect.

    :param df: dataframe with multi-level columns
    :param columns: column key to collapse (a scalar, or a tuple with one
                    entry per level)
    :param levels: column level(s) that ``columns`` refers to; defaults to
                   level 0 for a scalar key and to the first ``len(columns)``
                   levels otherwise
    :return: new dataframe with the remaining columns followed by the single
             collapsed column
    """
    if levels is None:
        # A string key is ONE label, not a sequence of per-level labels, so its
        # length must not be used to count levels.
        scalar_key = isinstance(columns, (str, bytes)) or not hasattr(columns, '__len__')
        levels = 0 if scalar_key else list(range(len(columns)))
    level_columns = df.xs(columns, axis=1, level=levels)
    unique = level_columns.iloc[:,0].to_frame()
    unique.columns = [columns]
    dropped = df.drop(columns=columns, level=levels)
    result = pd.concat([dropped, unique], axis=1)
    return result

def multivariate_regression(df: pd.DataFrame, model='linear'):
    """
    Regress quality on all (category, label) metrics in ``df`` at once.

    NOTE(review): a function of the same name is also defined in an earlier
    cell of this notebook; after Run All this later definition is in effect.

    :param df: long-format frame; its ``sym.category`` and ``sym.label`` index
               levels are unstacked into columns, with a 'quality' column
    :param model: forwarded to ``regressions`` ('linear' or 'ordinal')
    :return: the series returned by ``regressions``, rounded to 5 decimals;
             tuple index entries are replaced by their second element
    """
    # one column per (category, label) metric
    wide = df.unstack([sym.category, sym.label])
    # unstacking repeats 'quality' once per metric; keep a single copy
    wide = drop_column_level_duplication(wide, 'quality', 0)
    fitted = regressions(wide, quality_column_name='quality', model=model)
    flat_index = []
    for entry in fitted.index.values:
        flat_index.append(entry[1] if isinstance(entry, tuple) else entry)
    fitted.index = flat_index
    return fitted.round(5)

@to_file
def incremental_regression(df: pd.DataFrame, categories, model='linear', exclude_quality=True):
    """
    Greedy forward selection of metrics by regression fit.

    Starting from no features, each round fits one multivariate regression per
    remaining candidate (already selected features plus that candidate) and
    keeps the candidate with the highest R^2; ties keep the candidate seen
    first. This repeats until every feature in the pool has been added.

    NOTE(review): a function of the same name is also defined in an earlier
    cell of this notebook; after Run All this later definition is in effect.

    :param df: frame whose index tuples start with (category, label)
    :param categories: categories whose metrics may be selected
    :param model: 'linear' uses R^2 / F-test p value; anything else uses the
                  McFadden R^2 / LLR-test p value names
    :param exclude_quality: when true, skip index entries containing ``scale.quality``
    :return: dataframe with one row per selected (category, label), in selection
             order, holding the R^2 and p value of the model at that step
    """
    if model == 'linear':
        r2_name, p_name = stat.r2, stat.p_of_f_test
    else:
        r2_name, p_name = stat.mcfad_r2, stat.p_of_llr_test

    def _eligible(key):
        # not an excluded quality row, and from one of the requested categories
        return (not (exclude_quality and scale.quality in key)) and key[0] in categories

    feature_pool = {key[:2] for key in df.index.values if _eligible(key)}
    selected = []
    steps = []
    while feature_pool - set(selected):
        best = None
        for candidate in feature_pool - set(selected):
            trial = selected + [candidate]
            row_mask = [
                key[:2] in trial and _eligible(key)
                for key in df.index.values
            ]
            fit = multivariate_regression(df.loc[row_mask, :], model=model)
            r2 = fit[r2_name].item()
            p = fit[p_name]
            # strict comparison: an equal R^2 does not displace the current best
            if not best or best[1] < r2:
                best = (candidate, r2, p)
        selected.append(best[0])
        steps.append(best)
    result = {feature: {r2_name: r2, p_name: p} for feature, r2, p in steps}
    return pd.DataFrame(result.values(), result)

# NOTE(review): the same call also appears in an earlier cell of this notebook.
# ldq pairs each metric with dialogue-level Likert quality, ltq with the mean
# turn-level Likert quality (see dialogue_metrics).
ldq, ltq = dialogue_metrics(data.surge_evaluation)
# Forward selection over the turn-level Likert and behavior metrics together.
# `reload=` is not a parameter of incremental_regression itself, so it is
# presumably consumed by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.likert_turn, category.behavior),
    reload='results/dialogue_incremental_regressions'
)
regs

In [None]:
# NOTE(review): the same call also appears in an earlier cell of this notebook.
# Forward selection restricted to the behavior metrics (rebinds `regs`).
# `reload=` is presumably handled by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.behavior,),
    reload='results/behavior_incremental_regressions'
)
regs

In [None]:
# NOTE(review): the same call also appears in an earlier cell of this notebook.
# Forward selection restricted to the turn-level Likert metrics (rebinds `regs`).
# `reload=` is presumably handled by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.likert_turn,),
    reload='results/likert_turn_incremental_regressions'
)
regs

In [None]:
# NOTE(review): the same call also appears in an earlier cell of this notebook.
# Forward selection restricted to the dialogue-level Likert metrics (rebinds
# `regs`). With the default exclude_quality=True the quality label itself is
# left out of the feature pool.
# `reload=` is presumably handled by the @to_file decorator — TODO confirm.
regs = incremental_regression(
    ldq, (category.likert_dialogue,),
    reload='results/likert_dialogue_incremental_regressions'
)
regs