In [None]:
import itertools
import math
import pickle
from collections import Counter
from collections.abc import Callable
from pprint import pprint

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd

In [None]:
# Load the pickled cache stored at ../artifacts/human_eval_cache.pkl.
# NOTE(review): pickle.load can run arbitrary code while unpickling — only open this
# file if it comes from a trusted source.
with open("../artifacts/human_eval_cache.pkl", "rb") as f:
    cache = pickle.load(f)

# Number of entries in the cache.
len(cache)

In [None]:
# Take the first cache entry apart to learn which types the cache holds.
# Assumes the cache is non-empty and that its values are dict-like (both are
# indexed / asked for .keys() below).
keys = list(cache.keys())
values = list(cache.values())
value_keys = list(values[0].keys())
value_values = list(values[0].values())

# Report the type of the first key, of each element inside that key, of the first
# value, and of the first key/value pair stored inside that value.
print(
    f"Key: {type(keys[0])}",
    f"Key elements: {[type(k) for k in keys[0]]}",
    f"Value: {type(values[0])}",
    f"Value key: {type(value_keys[0])}",
    f"Value value: {type(value_values[0])}",
    sep="\n",
)

In [None]:
# Pretty-print the first key and the first value to see their actual contents.
print("Key:")
pprint(keys[0])
print()
print("Value:")
pprint(values[0])

In [None]:
# Hash every cache key and compare the total count with the number of distinct
# hashes; equal numbers mean no two keys share a hash value.
hashes = [hash(t) for t in cache]
len(hashes), len(set(hashes))

In [None]:
# Flatten the cache into a frame with one row per entry.
# Each key unpacks into three parts (q, a, n) — presumably question, answer and a
# count or index; TODO confirm against whatever code writes the cache.
# The entry's "val_annotations" are stored in ascending order; later cells compare
# them against sorted patterns such as [0, 2, 4].
df = pd.DataFrame(
    [
        {
            "q": q,
            "a": a,
            "n": n,
            "ann": sorted(ann["val_annotations"]),
        }
        for (q, a, n), ann in cache.items()
    ],
)

In [None]:
# Preview the first rows of the freshly built frame.
df.head()

In [None]:
# Shift every rating by +2 (presumably moving a -2..2 scale onto 0..4 — TODO confirm
# against the annotation guidelines).
#
# The unshifted ratings are stashed in "ann_raw" the first time this cell runs and
# the shift is always computed from that copy, so re-running the cell does not
# shift the ratings a second time.
if "ann_raw" not in df.columns:
    df["ann_raw"] = df["ann"]
df["ann"] = df["ann_raw"].map(lambda ratings: [r + 2 for r in ratings])

# Sanity check: smallest and largest rating after the shift.
df["ann"].explode().agg(["min", "max"])

In [None]:
# Spread of each item's ratings: np.std of the rating list (NumPy's default, i.e.
# the population standard deviation).
df["std"] = df["ann"].map(np.std)
# Summary statistics of the per-item spread.
df["std"].describe()

In [None]:
# Order the items by rating spread, least dispersed first.
df.sort_values("std")

In [None]:
# Show the items whose sorted ratings are exactly [0, 2, 4].
df[df["ann"].map(lambda x: x == [0, 2, 4])]

In [None]:
# Scratch check: successive differences of a sample rating list.
lst = [0, 2, 4]
np.diff(lst)

In [None]:
def listeq(lst: list[int]) -> Callable[[list[int]], bool]:
    """Build a predicate that reports whether its argument equals ``lst``.

    Useful with ``Series.map`` to select the rows whose cell is a given list.

    Args:
        lst: The reference list to compare against.

    Returns:
        A one-argument function returning ``lst == el`` for the list it is given.
    """
    return lambda el: lst == el


def entropy(data: list[int]) -> float:
    """Shannon entropy, in bits, of the empirical distribution of ``data``.

    Category frequencies are taken from ``np.unique``, so the ratings may be any
    sortable values, including negative integers (``np.bincount`` would reject
    those).

    Args:
        data: The ratings given to a single item.

    Returns:
        ``-sum(p * log2(p))`` over the categories that occur in ``data``; ``0.0``
        when every rating is the same.
    """
    # np.unique only reports categories that actually occur, so every p is > 0 and
    # log2 never sees a zero.
    _, counts = np.unique(np.asarray(data), return_counts=True)
    p = counts / len(data)
    # Adding 0.0 turns the "-0.0" produced for unanimous ratings into a plain 0.0.
    return float(-np.sum(p * np.log2(p))) + 0.0


def gini_coefficient(data: list[int]) -> float:
    """Gini coefficient of the values in ``data``.

    Args:
        data: A non-empty sequence of numbers (list or array).

    Returns:
        ``0.0`` when all values are identical or their sum is (close to) zero,
        otherwise ``2 * sum(rank_i * x_i) / (n * sum(x)) - (n + 1) / n`` with the
        values sorted ascending and ranks starting at 1.

    Raises:
        IndexError: If ``data`` is empty.
    """
    # Convert first: comparing a plain list with a scalar (`data == data[0]`) is a
    # single False, not an element-wise comparison, so the guard would never fire.
    values = np.asarray(data)
    if np.all(values == values[0]) or np.isclose(np.sum(values), 0):
        return 0.0

    ordered = np.sort(values)
    n = len(ordered)
    ranks = np.arange(1, n + 1)

    return float(2 * np.sum(ranks * ordered) / (n * np.sum(ordered)) - (n + 1) / n)


def calculate_alpha(ratings: list[int]) -> float:
    """Krippendorff-style alpha for one item rated on a nominal scale.

    Observed disagreement is the share of ordered rater pairs that picked different
    categories; expected disagreement is one minus the sum of squared category
    proportions. Alpha is ``1 - observed / expected``.

    :param ratings: Ratings (integers or strings) given to one item by the raters.
    :return: The alpha value; ``1`` when the expected disagreement is zero (for
        example when every rater chose the same category).
    """
    values = np.array(ratings)
    n = len(values)
    counts = np.unique(values, return_counts=True)[1]

    # Observed disagreement: with at most one rating there is no pair to disagree.
    if n <= 1:
        observed = 0
    else:
        agreeing_pairs = sum(c * (c - 1) for c in counts)
        observed = 1 - agreeing_pairs / (n * (n - 1))

    # Expected disagreement under the item's own category proportions.
    expected = 1 - sum((counts / n) ** 2)

    # A single category makes the ratio undefined; report perfect agreement.
    if expected == 0:
        return 1
    return 1 - observed / expected


def observed_agreement_single_item(coder_codes: list[int]) -> float:
    """Share of ordered coder pairs that assigned the same category to one item.

    For category counts ``n_k`` and ``n`` coders this is
    ``sum(n_k * (n_k - 1)) / (n * (n - 1))``.

    Args:
        coder_codes: The code assigned by each coder. At least two are required.

    Returns:
        The observed agreement as a float in ``[0, 1]``; ``1.0`` when all coders
        agree and ``0.0`` when no two coders agree.

    Raises:
        ValueError: If fewer than two codes are given, because there is no pair of
            coders to compare (this would otherwise be a division by zero).
    """
    n_coders = len(coder_codes)
    if n_coders < 2:
        raise ValueError("At least two coder codes are required to measure agreement.")

    category_counts = Counter(coder_codes)
    agreeing_pairs = sum(n_k * (n_k - 1) for n_k in category_counts.values())

    # Normalise by the total number of ordered coder pairs.
    return agreeing_pairs / (n_coders * (n_coders - 1))


def calculate_agreement(
    data: list[int], *, labels: list[int], weight_type: str
) -> float:
    """Calculate the agreement between multiple raters on a single item.

    Every pair of ratings in `data` is given a penalty that depends on
    `weight_type`; the result is one minus the mean penalty, after dividing by the
    largest penalty a single pair can receive on the scale described by `labels`.

    Args:
        data:
            The ratings from the different raters. At least two are required.
        labels:
            All potential rating values. Only the smallest and largest are used,
            to normalise the "quadratic" and "linear" penalties.
        weight_type:
            The penalty for a pair ``(x, y)``: "quadratic" uses ``(x - y) ** 2``,
            "linear" uses ``abs(x - y)`` and "binary" uses ``x != y``.

    Returns:
        The normalised agreement. 1 means all raters gave the same rating; lower
        values mean more disagreement. The value stays within [0, 1] as long as
        every rating lies between ``min(labels)`` and ``max(labels)``.

    Raises:
        ValueError:
            If `data` contains fewer than two values or if `weight_type` is not one
            of the supported schemes.
    """
    if len(data) < 2:
        raise ValueError("Data must contain at least two values.")

    # Per scheme: (penalty for one pair, largest penalty one pair can receive).
    # The normaliser is a thunk so that `labels` is only read when it is needed.
    schemes = {
        "quadratic": (
            lambda a, b: (a - b) ** 2,
            lambda: (max(labels) - min(labels)) ** 2,
        ),
        "linear": (
            lambda a, b: abs(a - b),
            lambda: abs(max(labels) - min(labels)),
        ),
        "binary": (lambda a, b: a != b, lambda: 1),
    }
    if weight_type not in schemes:
        raise ValueError(f"Invalid weight type: {weight_type}")
    penalty, max_penalty = schemes[weight_type]

    pairs = list(itertools.combinations(data, 2))
    total_penalty = sum(penalty(a, b) for a, b in pairs)
    return 1 - total_penalty / max_penalty() / len(pairs)


def randolph(data: list[int], *, labels: list[int]) -> float:
    """Chance-corrected agreement for one item with a uniform chance model.

    The observed agreement (share of ordered rater pairs in the same category) is
    corrected by ``1 / len(labels)``, the agreement expected if raters picked
    categories uniformly at random. Presumably named after Randolph's free-marginal
    kappa, which uses the same correction.

    Args:
        data: The ratings given to the item.
        labels: All possible rating values. Ratings are shifted by ``min(labels)``
            and used as indices into a table of ``len(labels)`` counts, so the
            labels are assumed to be consecutive integers.

    Returns:
        ``(observed - chance) / (1 - chance)``; 1 when all raters agree.
    """
    # Re-base the ratings so the smallest label maps to index 0.
    lowest = min(labels)
    if lowest != 0:
        data = [x - lowest for x in data]

    n_categories = len(labels)
    counts = np.zeros(n_categories)
    np.add.at(counts, data, 1)

    n_raters = counts.sum()
    observed = ((counts**2).sum() - n_raters) / (n_raters * (n_raters - 1.0))

    # A uniform chance model keeps the statistic defined for a single item: with
    # the item's own category frequencies, unanimous ratings would give an expected
    # agreement of 1 and therefore a zero denominator.
    chance = 1 / n_categories
    return (observed - chance) / (1 - chance)


def consensus(data: list[int], *, labels: list[int]) -> float:
    """Consensus score of the ratings given to one item.

    From "Consensus and dissention: A measure of ordinal dispersion (2007)"
    by William J. Tastle and Mark J. Wierman (formula on page 8):
    ``1 + sum(p_i * log2(1 - |X_i - mean| / width))`` over the categories that were
    actually used, where ``width`` is ``max(labels) - min(labels)``.

    Args:
        data: The ratings given to the item.
        labels: All possible rating values; position ``i`` is matched with the
            count of ratings equal to ``min(labels) + i``, so the labels are
            assumed to be consecutive integers in ascending order.

    Returns:
        1 when all raters agree; 0 when they are split evenly between the two ends
        of the scale.
    """
    lowest, highest = min(labels), max(labels)

    # bincount needs non-negative integers, so count on ratings re-based to 0.
    rebased = [x - lowest for x in data]
    p = np.bincount(rebased, minlength=len(labels)) / len(data)

    width = highest - lowest
    mean = (p * labels).sum()

    total = 0
    for prob, label in zip(p, labels):
        # Unused categories contribute nothing (and would risk log2(0)).
        if prob > 0:
            total += prob * math.log2(1 - (abs(label - mean) / width))
    return 1 + total


# All rating values an item can receive; forwarded to the label-aware metrics.
labels = [0, 1, 2, 3, 4]


def _per_item(
    metric: Callable[..., float], **kwargs
) -> Callable[[pd.DataFrame], pd.Series]:
    """Wrap ``metric`` so it is applied to every rating list in the "ann" column.

    ``kwargs`` are captured when the wrapper is created, so rebinding a notebook
    global such as ``labels`` in another cell cannot change an existing metric.
    """

    def column(frame: pd.DataFrame) -> pd.Series:
        return frame["ann"].apply(metric, **kwargs)

    return column


# Column name -> callable computing that column, in the form DataFrame.assign expects.
metrics = dict(
    entropy=_per_item(entropy),
    gini=_per_item(gini_coefficient),
    alpha=_per_item(calculate_alpha),
    agr=_per_item(observed_agreement_single_item),
    agr_q=_per_item(calculate_agreement, weight_type="quadratic", labels=labels),
    agr_l=_per_item(calculate_agreement, weight_type="linear", labels=labels),
    agr_b=_per_item(calculate_agreement, weight_type="binary", labels=labels),
    randolph=_per_item(randolph, labels=labels),
    consensus=_per_item(consensus, labels=labels),
)

# One extra column per metric, computed for every item.
dd = df.assign(**metrics)

# Rating patterns worth comparing side by side.
examples = [
    [0, 0, 0],
    [1, 1, 1],
    [1, 2, 3],
    [0, 0, 1],
    [0, 0, 2],
    [0, 0, 3],
    [0, 0, 4],
    [0, 2, 4],
    [0, 4, 4, 4, 4, 4],
    [0, 1, 2, 3, 4, 4],
]

# Take the first item in the data that shows each pattern. A pattern that does not
# occur is reported and skipped instead of failing the cell with an IndexError.
example_rows = []
for pattern in examples:
    matches = dd[dd["ann"].map(listeq(pattern))]
    if matches.empty:
        print(f"No item with ratings {pattern}; skipping.")
        continue
    example_rows.append(matches.iloc[0])

if example_rows:
    example_table = pd.concat(example_rows, axis=1).transpose()[["ann", *metrics]]
else:
    example_table = pd.DataFrame(columns=["ann", *metrics])
example_table

In [None]:
# Items rated by exactly six annotators, one row per distinct rating pattern, with
# the highest exact-match agreement ("agr") first.
# The "ann" cells are lists, which are unhashable, so `drop_duplicates("ann")`
# raises a TypeError; deduplicate on a tuple view of the ratings instead (the first
# occurrence of each pattern is kept, as drop_duplicates would).
six_raters = dd[dd["ann"].map(len) == 6]
first_of_pattern = ~six_raters["ann"].map(tuple).duplicated()
six_raters.loc[first_of_pattern, ["ann", *metrics]].sort_values(
    "agr", ascending=False
)

In [None]:
# Hand-written rating patterns on a 1..5 scale, to see how the label-aware metrics
# respond as two raters move further apart (plus one evenly split six-rater case).
# NOTE: this rebinds the notebook-level name `labels` to the 1-based scale; any code
# that reads `labels` after this cell sees [1, 2, 3, 4, 5].
labels = [1,2,3,4,5]
ratings = [
    [1, 1, 1, 5, 5, 5],
    [1, 5],
    [1, 4],
    [1, 3],
    [1, 2],
    [1, 1]
]
# One record per pattern with each metric evaluated directly on the ratings.
data = [
    {
        'ratings': r,
        'agr_q': calculate_agreement(r, weight_type='quadratic', labels=labels),
        'agr_l': calculate_agreement(r, weight_type='linear', labels=labels),
        'agr_b': calculate_agreement(r, weight_type='binary', labels=labels),
        'randolph': randolph(r, labels=labels),
        'consensus': consensus(r, labels=labels),
    }
    for r in ratings
]
pd.DataFrame(data)

In [None]:
# Summary statistics of the numeric columns of the metrics frame.
dd.describe()

In [None]:
# The five items with the lowest consensus score.
dd.sort_values('consensus').head()

In [None]:
def plot_dist(df: pd.DataFrame, col: str) -> None:
    """Show a 10-bin histogram of ``df[col]`` in its own small figure.

    Args:
        df: Frame holding the values to plot.
        col: Name of the column to plot; also used for the x label and the title.
    """
    _, ax = plt.subplots(figsize=(3.5, 2.5))
    ax.hist(df[col], bins=10, edgecolor="black")
    ax.set_xlabel(col)
    ax.set_ylabel("Frequency")
    ax.set_title(f"Distribution of {col}")
    ax.grid(True)
    plt.show()

In [None]:
# Draw one histogram per graded agreement metric.
# NOTE: this rebinds `metrics`, which an earlier cell defines as a dict of metric
# callables, to a plain list of column names.
metrics = ["consensus", "agr_q", "agr_l", "agr_b"]
for col in metrics:
    plot_dist(dd, col)

In [None]:
# Summary statistics restricted to the columns currently named in `metrics`.
dd[metrics].describe()

In [None]:
def plot_dist(df: pd.DataFrame, metrics: "str | list[str]") -> None:
    """Plot a 10-bin histogram for each column named in ``metrics`` on one figure.

    Panels are laid out row by row in a grid that is at most two columns wide. Any
    number of metrics is supported: a single name (given as a string or a one-item
    list) gets a single panel, and an odd count leaves the last grid cell hidden.

    Args:
        df: Frame holding the values to plot.
        metrics: A column name, or a list of column names, to plot.

    Raises:
        ValueError: If ``metrics`` is empty.
    """
    # Accept a bare column name so calls written as plot_dist(df, "col") still work.
    names = [metrics] if isinstance(metrics, str) else list(metrics)
    if not names:
        raise ValueError("metrics must name at least one column.")

    cols = min(2, len(names))
    rows = math.ceil(len(names) / cols)  # round up so an odd count keeps its last panel
    # squeeze=False keeps `axes` two-dimensional even with a single row or column.
    _, axes = plt.subplots(
        rows, cols, figsize=(3.5 * cols, 2.5 * rows), squeeze=False
    )

    panels = axes.ravel()  # row-major: panel i sits at (i // cols, i % cols)
    for ax, name in zip(panels, names):
        ax.hist(df[name], bins=10, edgecolor="black")
        ax.set_xlabel(name)
        ax.set_ylabel("Frequency")
        ax.set_title(f"Distribution of {name}")
        ax.grid(True)

    # Hide the grid cell left over when the number of metrics is odd.
    for ax in panels[len(names):]:
        ax.set_visible(False)

    plt.tight_layout()  # Prevent overlapping labels
    plt.show()

# Plot the four graded agreement metrics on one figure, then summarise them.
metrics = ["consensus", "agr_q", "agr_l", "agr_b"]
plot_dist(dd, metrics)
dd[metrics].describe()

In [None]:
# Sanity check: every name in `metrics` exists as a column of dd.
all(m in dd.columns for m in metrics)