This notebook computes statistics about the evaluated systems: experiment durations, system timing, scoring metrics, and visualizations.

In [None]:
import collections
import dataclasses
import json
import re
from dataclasses import dataclass
from pathlib import Path

from rapidfuzz import fuzz

from evaluation.models import LabelledSuggestion, SuggestionLog
from evaluation.utils import (
    get_all_suggestions,
    get_annotations,
    get_experiment_by_id,
    get_experiment_infos,
    suggestions_are_same,
)
from experiments.models import ALL_MODELS
from experiments.utils import REPO_ROOT
from overhearing_agents.utils import read_jsonl

# Load the metadata for every experiment; later cells read `.id`, `.name`, and `.pcm_fp` from these entries.
experiments = get_experiment_infos()
experiments

## get experiment durations

In [None]:
# Print each experiment's audio duration and the number of suggestions logged for it.
for experiment in experiments:
    pcm_fp = REPO_ROOT / experiment.pcm_fp
    # file size in bytes / 48000 -> seconds; presumably the PCM stream is 48000 bytes per second
    # (e.g. 16-bit mono at 24 kHz) -- TODO confirm against the recording format
    duration = pcm_fp.stat().st_size / 48000
    all_suggestions = get_all_suggestions(experiment.id)
    print(f"{experiment.name}: {duration:.2f}s, {len(all_suggestions)} suggestions")

# System Timing

In [None]:
# Print every model id, a blank line, then every experiment id (one per line).
print("\n".join(ALL_MODELS))
print()
print("\n".join(experiment.id for experiment in experiments))

In [None]:
def system_timing_from_event_logs(system_id: str, experiment_id: str):
    """
    Sum the time span of every event log recorded for a system on one experiment.

    Looks in the experiment's log directory for subdirectories named exactly ``system_id`` or
    ``system_id__until-<digits>``. For each one, the span is the difference between the largest
    and the smallest ``timestamp`` found in its ``events.jsonl``. The total is printed alongside
    the two ids and then returned.
    """
    log_root = REPO_ROOT / Path(get_experiment_by_id(experiment_id).log_dir)
    # the glob below is only a prefix match, so this pattern rejects other systems sharing the prefix
    run_dir_pattern = re.compile(rf"{re.escape(system_id)}(__until-\d+)?$")
    experiment_duration = 0

    for run_dir in log_root.glob(f"{system_id}*"):
        if not (run_dir.is_dir() and run_dir_pattern.match(run_dir.name)):
            continue
        events_fp = run_dir / "events.jsonl"
        assert events_fp.exists()
        timestamps = [event["timestamp"] for event in read_jsonl(events_fp)]
        start = min(timestamps)
        end = max(timestamps)
        experiment_duration += end - start
    print(system_id, experiment_id, experiment_duration)
    return experiment_duration


# Transcription times looked up by experiment name; they are added to the timing of text-based systems.
# JSON is UTF-8 by specification, so do not depend on the platform's default encoding.
with open(REPO_ROOT / "data/starless/transcription-timing.json", encoding="utf-8") as f:
    transcript_timing = json.load(f)

In [None]:
# Spot-check the timing for a single system/experiment pair.
system_timing_from_event_logs("text.spans", "starless-lands-s17")

In [None]:
# Build one record per model: {"name": <model id>, <experiment id>: <total time>, ...}.
timings = []
for model_id in ALL_MODELS:
    model_timing = {"name": model_id}
    for experiment in experiments:
        time_taken = system_timing_from_event_logs(model_id, experiment.id)
        # any model id containing "text" also gets the experiment's transcription time added
        # (presumably because text systems need a transcript to be produced first -- confirm)
        if "text" in model_id:
            time_taken += transcript_timing[experiment.name]
        model_timing[experiment.id] = time_taken
    timings.append(model_timing)

In [None]:
import pandas as pd

# One row per model, one column per experiment id (plus "name").
# NOTE: the timing visualization cell later rebinds `timing_df` to data read from a CSV file.
timing_df = pd.DataFrame.from_records(timings)
timing_df

# metrics

In [None]:
@dataclass
class Score:
    """
    Hit/miss counts for one scoring run, with derived precision, recall and F1.

    Scores combine field-wise with ``+`` and can also be totalled with the builtin ``sum()``,
    so pooled (micro-averaged) metrics can be read off an accumulated total.
    """

    tp: int  # true positives
    fp: int  # false positives
    fn: int  # false negatives

    @property
    def precision(self):
        """tp / (tp + fp), or 0.0 when there are no positives predicted at all."""
        if self.tp + self.fp == 0:
            return 0.0
        return self.tp / (self.tp + self.fp)

    @property
    def recall(self):
        """tp / (tp + fn), or 0.0 when there is nothing to recall."""
        if self.tp + self.fn == 0:
            return 0.0
        return self.tp / (self.tp + self.fn)

    @property
    def f1(self):
        """Harmonic mean of precision and recall, or 0.0 when both are zero."""
        if self.precision + self.recall == 0:
            return 0.0
        return (2 * self.precision * self.recall) / (self.precision + self.recall)

    def __repr__(self):
        return f"{self.tp=}, {self.fp=}, {self.fn=}, {self.precision=:.3}, {self.recall=:.3}, {self.f1=:.3}"

    def __add__(self, other):
        """Return a new Score whose counts are the field-wise sums of both operands."""
        return Score(tp=self.tp + other.tp, fp=self.fp + other.fp, fn=self.fn + other.fn)

    def __radd__(self, other):
        """Support ``sum(scores)``: the builtin starts from the int 0, which acts as the empty score."""
        if isinstance(other, int) and other == 0:
            return Score(tp=self.tp, fp=self.fp, fn=self.fn)
        return NotImplemented

    def to_dict(self, key_prefix=""):
        """Return the counts and the three derived metrics as a dict, each key prefixed with ``key_prefix``."""
        data = {**dataclasses.asdict(self), "precision": self.precision, "recall": self.recall, "f1": self.f1}
        return {f"{key_prefix}{k}": v for k, v in data.items()}


@dataclass
class StopwatchSuggestion:
    """Gold-labelled suggestions from stopwatch annotation"""

    time: float  # the time the annotation happened
    matches: list[str]  # the list of strings the suggestion must match to satisfy this label
    antimatches: list[str] | None = None  # a list of strings that cannot match


def length_aware_ratio(haystack: str, needle: str, **kwargs):
    """
    Fuzzy similarity between ``needle`` and ``haystack`` that accounts for their lengths.

    - A needle shorter than two characters always scores 0.
    - A needle longer than the haystack is scored with the full ``fuzz.ratio``.
    - Otherwise the needle is scored with ``fuzz.partial_ratio``.

    Extra keyword arguments are forwarded to the chosen rapidfuzz scorer.
    """
    needle_len = len(needle)
    if needle_len <= 1:
        return 0
    scorer = fuzz.ratio if needle_len > len(haystack) else fuzz.partial_ratio
    return scorer(needle, haystack, **kwargs)


def satisfies(prediction: SuggestionLog, label: LabelledSuggestion | StopwatchSuggestion, *, tolerance=300) -> bool:
    """
    Decide whether ``prediction`` counts as a hit for ``label``.

    For a LabelledSuggestion, a negatively-scored label never matches; otherwise the decision is
    delegated to ``suggestions_are_same`` using ``length_aware_ratio`` with a threshold of 80.

    For a StopwatchSuggestion, the prediction's ``end`` must fall between 30 before the label time
    and ``tolerance`` after it, the stringified suggestion must contain every entry of
    ``label.matches`` and none of ``label.antimatches`` (both compared case-insensitively).
    """
    if isinstance(label, LabelledSuggestion):
        if label.score < 0:
            return False
        return suggestions_are_same(
            label.entry,
            prediction,
            tolerance=tolerance,
            npc_speech_similarity_ratio=length_aware_ratio,
            npc_speech_similarity_threshold=80,
        )

    # stopwatch suggestion
    in_window = (label.time - 30) <= prediction.end <= (label.time + tolerance)
    suggestion_text = str(prediction.suggestion).lower()
    has_all_required = all(required.lower() in suggestion_text for required in label.matches)
    has_forbidden = any(forbidden.lower() in suggestion_text for forbidden in label.antimatches or [])
    return in_window and has_all_required and not has_forbidden


def get_gold_labels(experiment_id: str) -> list[LabelledSuggestion | StopwatchSuggestion]:
    """
    Load the gold labels for an experiment from ``gold/<experiment_id>.jsonl``.

    The path is relative to the current working directory. Blank lines are skipped. A record that
    has a ``"matches"`` key becomes a StopwatchSuggestion; any other record is validated as a
    LabelledSuggestion.
    """
    out = []
    # JSON is UTF-8 by specification; do not depend on the platform's default text encoding
    with open(f"gold/{experiment_id}.jsonl", encoding="utf-8") as f:
        # with open(f"annotations/to-dedup-{experiment_id}.jsonl") as f:  # todo
        for line in f:
            if not line.strip():
                continue
            data = json.loads(line)
            if "matches" in data:
                out.append(StopwatchSuggestion(**data))
            else:
                out.append(LabelledSuggestion.model_validate(data))
    return out


def get_predictions(experiment_id: str, model_id: str) -> list[SuggestionLog]:
    """Return the experiment's suggestions whose ``model_key`` equals ``model_id``, in their original order."""
    selected = []
    for logged in get_all_suggestions(experiment_id):
        if logged.model_key == model_id:
            selected.append(logged)
    return selected


def label_is_type(l, suggestion_type):
    """
    Whether gold label ``l`` belongs to the given suggestion type.

    A LabelledSuggestion is typed by its entry's ``suggestion["suggest_type"]``; a
    StopwatchSuggestion is of a type when that type string is one of its ``matches``.
    Any other object is of no type.
    """
    if isinstance(l, LabelledSuggestion):
        return l.entry.suggestion["suggest_type"] == suggestion_type
    if isinstance(l, StopwatchSuggestion):
        return suggestion_type in l.matches
    return False


def score(predictions: list[SuggestionLog], gold_labels: list[LabelledSuggestion | StopwatchSuggestion]) -> Score:
    """
    Greedily match predictions against gold labels and count hits and misses.

    Labels are visited in order, and for each label the predictions are scanned in order. True
    positives count matched predictions, false positives count unmatched predictions, and false
    negatives count labels that no prediction satisfied.

    :param predictions: the suggestions to evaluate
    :param gold_labels: All the positively-annotated labels
    """

    # this block temporarily only considers suggestions up to the latest labelled gold
    # latest_gold = max((l.time if isinstance(l, StopwatchSuggestion) else l.entry.end for l in gold_labels), default=0)
    # predictions = [p for p in predictions if p.end <= latest_gold]
    # end

    hit_predictions = set()
    hit_labels = set()

    for label_pos, gold in enumerate(gold_labels):
        for pred_pos, candidate in enumerate(predictions):
            # a prediction already credited to a label cannot be credited again
            if pred_pos in hit_predictions:
                continue
            # gamedata labels get a 300 tolerance, everything else 30
            window = 300 if label_is_type(gold, "gamedata") else 30
            if not satisfies(candidate, gold, tolerance=window):
                continue
            hit_predictions.add(pred_pos)
            hit_labels.add(label_pos)
            # foundry labels may absorb several predictions; every other label stops at its first hit
            if label_is_type(gold, "foundry"):
                continue
            break

    n_hits = len(hit_predictions)
    return Score(
        tp=n_hits,
        fp=len(predictions) - n_hits,
        fn=len(gold_labels) - len(hit_labels),
    )

In [None]:
# Spot-check: score one model on one experiment.
score(get_predictions("starless-lands-s23", "openai.audio-zeroshot"), get_gold_labels("starless-lands-s23"))

In [None]:
# Total number of gold labels across all experiments.
sum(len(get_gold_labels(e.id)) for e in experiments)

In [None]:
def score_all(*, prediction_filter=lambda p: True, label_filter=lambda l: True, col_prefix="", debug=False):
    """
    Score every model in ALL_MODELS over all experiments and tabulate the pooled results.

    :param prediction_filter: only predictions for which this returns true are scored
    :param label_filter: only gold labels for which this returns true are scored against
    :param col_prefix: prepended to every metric column name
    :param debug: when true, print each model id followed by its pooled score
    :return: a DataFrame indexed by ``model_id`` with the columns of ``Score.to_dict(col_prefix)``
    """
    rows = []
    for model_id in ALL_MODELS:
        pooled = Score(tp=0, fp=0, fn=0)
        for exp in experiments:
            kept_predictions = [pred for pred in get_predictions(exp.id, model_id) if prediction_filter(pred)]
            # NOTE(review): an experiment with no (filtered) predictions for this model is skipped
            # entirely, so its gold labels are not counted as false negatives -- confirm intended
            if kept_predictions:
                kept_labels = [gold for gold in get_gold_labels(exp.id) if label_filter(gold)]
                pooled = pooled + score(kept_predictions, kept_labels)
        if debug:
            print(model_id)
            print(pooled)
        rows.append({"model_id": model_id, **pooled.to_dict(col_prefix)})
    return pd.DataFrame.from_records(rows, index="model_id")

In [None]:
# Overall scores (all predictions against all gold labels).
agg_df = score_all(debug=True)
# Per-type scores: keep only predictions and gold labels of one suggestion type, and prefix the
# resulting columns so the tables can be joined side by side.
gamedata_df = score_all(
    prediction_filter=lambda p: p.suggestion["suggest_type"] == "gamedata",
    label_filter=lambda l: label_is_type(l, "gamedata"),
    col_prefix="gamedata-",
    debug=True,
)
foundry_df = score_all(
    prediction_filter=lambda p: p.suggestion["suggest_type"] == "foundry",
    label_filter=lambda l: label_is_type(l, "foundry"),
    col_prefix="foundry-",
)
improv_df = score_all(
    prediction_filter=lambda p: p.suggestion["suggest_type"] == "improvised_npc",
    label_filter=lambda l: label_is_type(l, "improvised_npc"),
    col_prefix="improv-",
)
# One wide table, indexed by model id: unprefixed overall columns plus the three prefixed groups.
everything_df = agg_df.join([gamedata_df, foundry_df, improv_df])
everything_df

# Visualization

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

### Main Results

text vs audio, per model

In [None]:
# Pandas code hates automatic formatting
# @formatter:off
# fmt:off

# Apply the theme BEFORE building the figure: set_theme changes matplotlib's global defaults, which
# only affect artists created afterwards. Calling it after catplot left a fresh run unstyled.
sns.set_theme("paper", "whitegrid", font="serif", font_scale=1.7)

# The index holds keys shaped like "<model>.<modality>[-<prompt type>]"; split them into columns.
everything_df["model_id"] = [splat[0] for splat in everything_df.index.str.split(".")]
everything_df["modality"] = [splat[1].split("-")[0] for splat in everything_df.index.str.split(".")]
everything_df["prompt_type"] = [splat[1].split("-", 1)[1] if "-" in splat[1] else splat[1] for splat in everything_df.index.str.split(".")]

# openai models are shown with their zeroshot prompt, all other models (except ultravox-tiny) with fewshot
openai_df = everything_df.loc[(everything_df["model_id"].str.contains("openai")) & (everything_df["prompt_type"] == "zeroshot")].copy()
other_df = everything_df.loc[(~everything_df["model_id"].str.contains("openai")) & (everything_df["prompt_type"] == "fewshot") & (everything_df["model_id"] != "ultravox-tiny")].copy()
# rows whose key starts with "text." are relabelled as the text-modality "baseline" model
baseline_df = everything_df.loc[everything_df["model_id"] == "text"].copy()
baseline_df["modality"] = "text"
baseline_df["model_id"] = "baseline"
new_df = pd.concat([openai_df, other_df, baseline_df], ignore_index=True)

# long format: one row per (model, modality, task) with the F1 in "value"
melted_f1 = pd.melt(new_df[["model_id", "f1", "gamedata-f1", "foundry-f1", "improv-f1", "modality"]], id_vars=["model_id", "modality"])
melted_f1 = melted_f1.rename({"variable": "task"}, axis=1)
melted_f1 = melted_f1.replace({"f1": "overall", "gamedata-f1": "game\ndata", "foundry-f1": "stage\ndirect", "improv-f1": "gen\nnpcs"})
melted_f1["model_id"] = melted_f1["model_id"].str.replace("openai", "gpt-4o")

g = sns.catplot(
    data=melted_f1,
    x="task",
    y="value",
    col="model_id",
    hue="modality",
    kind="bar",
    height=5,
    aspect=0.65,
    palette="husl",
    legend_out=False,
)
#sns.displot(data=melted_f1,x="model_id",y="value",hue="f1_type",)

# aesthetics
g.set_axis_labels("", "F1 score")
g.set_titles(col_template="{col_name}")
g.tick_params(axis='x')
g.figure.subplots_adjust(wspace=0.03, hspace=0)
sns.move_legend(g, "center right", bbox_to_anchor=(.995, .58), frameon=False)

# the "figs" directory must already exist relative to the working directory
g.savefig("figs/performance.pdf")
# @formatter:on
# fmt:on

### Input Variation

per model, text-noreason/audio-noreason/audio-transcribe

In [None]:
# Display the combined score table.
everything_df

In [None]:
# Pandas code hates automatic formatting
# @formatter:off
# fmt:off

# Apply the theme BEFORE building the figure: set_theme changes matplotlib's global defaults, which
# only affect artists created afterwards.
sns.set_theme("paper", "whitegrid", font="serif", font_scale=1.7)

# "variation" is the part of the key after the dot with the "zeroshot-"/"fewshot-" prompt marker removed.
# This cell also reads the "modality" column added to everything_df by the main-results cell.
everything_df["model_id"] = [splat[0] for splat in everything_df.index.str.split(".")]
everything_df["variation"] = [splat[1].replace("zeroshot-", "").replace("fewshot-", "") for splat in everything_df.index.str.split(".")]

variations_df = everything_df.loc[everything_df["variation"].str.contains("noreason") | everything_df["variation"].str.contains("transcribe")].copy()
# the unmodified audio runs (zeroshot for openai models, fewshot for the rest) are the "audio" reference
main_df = everything_df.loc[
    (everything_df["model_id"].str.contains("openai") & everything_df["variation"].str.endswith("audio-zeroshot"))
    | (~everything_df["model_id"].str.contains("openai") & everything_df["variation"].str.endswith("audio-fewshot"))
].copy()
main_df["variation"] = "audio"
# NOTE(review): baseline_df is built here but not included in the concat below -- confirm whether
# the baseline is meant to appear in this figure
baseline_df = everything_df.loc[everything_df["model_id"] == "text"].copy()
baseline_df["variation"] = "baseline"
new_df = pd.concat([variations_df, main_df], ignore_index=True)

# long format: one row per (model, modality, variation, task) with the F1 in "value"
var_melted_f1 = pd.melt(new_df[["model_id", "f1", "gamedata-f1", "foundry-f1", "improv-f1", "modality", "variation"]], id_vars=["model_id", "modality", "variation"])
var_melted_f1 = var_melted_f1.rename({"variable": "task"}, axis=1)
var_melted_f1 = var_melted_f1.replace({"f1": "overall", "gamedata-f1": "game\ndata", "foundry-f1": "stage\ndirect", "improv-f1": "gen\nnpcs"})
var_melted_f1 = var_melted_f1.loc[var_melted_f1["model_id"] != "ultravox-tiny"]

g = sns.catplot(
    data=var_melted_f1,
    x="task",
    y="value",
    col="model_id",
    hue="variation",
    kind="bar",
    height=5,
    aspect=0.65,
    palette="husl",
    legend_out=False,
)
#sns.displot(data=melted_f1,x="model_id",y="value",hue="f1_type",)

# aesthetics
g.set_axis_labels("", "F1 score")
g.set_titles(col_template="{col_name}")
g.tick_params(axis='x')
g.figure.subplots_adjust(wspace=0.03, hspace=0)
sns.move_legend(g, "center right", bbox_to_anchor=(.995, .58), frameon=False)

# the "figs" directory must already exist relative to the working directory
g.savefig("figs/variations.pdf")
# @formatter:on
# fmt:on

## timing visualization

In [None]:
# The model keys kept for the timing figure (the text.spans baseline is deliberately commented out).
core_splits = [
    "openai.text-zeroshot",
    "openai.audio-zeroshot",
    "openai-mini.text-zeroshot",
    "openai-mini.audio-zeroshot",
    "ultravox.text-fewshot",
    "ultravox.audio-fewshot",
    "qwen-25.text-fewshot",
    "qwen-25.audio-fewshot",
    "phi-4.text-fewshot",
    "phi-4.audio-fewshot",
    #'text.spans',
]
# NOTE: this rebinds `timing_df` to data read from a CSV, replacing the frame built from event logs earlier.
timing_df = pd.read_csv("annotations/Starless Lands Data - system timing.csv")
# split the "name" key at the dot: the model part (with "openai" shown as "gpt-4o") and the
# modality part with the prompt-type suffix removed
timing_df["model_id"] = [splat[0].replace("openai", "gpt-4o") for splat in timing_df["name"].str.split(".")]
timing_df["modality"] = [
    splat[1].replace("-zeroshot", "").replace("-fewshot", "") for splat in timing_df["name"].str.split(".")
]
# the full key with the prompt-type suffix removed
timing_df["model_key_clean"] = [splat.replace("-zeroshot", "").replace("-fewshot", "") for splat in timing_df["name"]]
timing_df = timing_df.loc[timing_df["name"].isin(core_splits)]

In [None]:
# Bar chart of the "relative" column per model, split by modality.
sns.set_theme("paper", "ticks", font="serif", font_scale=1.2)
g = sns.catplot(
    data=timing_df,
    x="model_id",
    # plot relative - 1 so that bars grow up or down from the reference line drawn at 0 below
    y=timing_df["relative"] - 1,
    kind="bar",
    hue="modality",
    palette="husl",
    height=4,
    aspect=1.3,
    # legend=False,
)

# g.ax.plot(2.8, 0.57, "^", markersize=10, color="k")

# reference line at 0, i.e. where `relative` equals 1
plt.axhline(0, color="k")
# plt.ylim(-0.25, 1.5)
g.set_axis_labels("", "Times Realtime")
g.set_titles(col_template="{col_name}")
g.tick_params(axis="x")
g.figure.subplots_adjust(wspace=0.03, hspace=0)
# relabel the ticks with the un-shifted value (tick + 1) followed by "x"
# NOTE(review): the labels are fixed text computed from the current tick positions; they will not
# follow if the ticks change afterwards -- consider a tick formatter instead
g.ax.set_yticklabels([f"{label + 1}x" for label in g.ax.get_yticks()])
sns.move_legend(g, "upper left", bbox_to_anchor=(0.2, 0.9), frameon=False)
g.savefig("figs/timing.pdf")

## what if we only score recall on stopwatch annotations

In [None]:
# Re-score everything against stopwatch annotations only: the same tables as the main scoring cell,
# but with every label filter additionally requiring a StopwatchSuggestion.
agg_df_stopwatch = score_all(label_filter=lambda l: isinstance(l, StopwatchSuggestion))
gamedata_df_stopwatch = score_all(
    prediction_filter=lambda p: p.suggestion["suggest_type"] == "gamedata",
    label_filter=lambda l: label_is_type(l, "gamedata") and isinstance(l, StopwatchSuggestion),
    col_prefix="gamedata-",
)
foundry_df_stopwatch = score_all(
    prediction_filter=lambda p: p.suggestion["suggest_type"] == "foundry",
    label_filter=lambda l: label_is_type(l, "foundry") and isinstance(l, StopwatchSuggestion),
    col_prefix="foundry-",
)

improv_df_stopwatch = score_all(
    prediction_filter=lambda p: p.suggestion["suggest_type"] == "improvised_npc",
    label_filter=lambda l: label_is_type(l, "improvised_npc") and isinstance(l, StopwatchSuggestion),
    col_prefix="improv-",
)
# Join onto the stopwatch-only overall table. Joining onto `agg_df` would pair overall metrics
# computed on all labels with per-type metrics computed on stopwatch labels only.
everything_df_stopwatch = agg_df_stopwatch.join([gamedata_df_stopwatch, foundry_df_stopwatch, improv_df_stopwatch])

In [None]:
# Display the stopwatch-only score table.
everything_df_stopwatch

# raw annotations heatmap

In [None]:
# Model keys included in the raw-annotation label counts: the audio/text pair for each model
# family, plus the text.spans system.
MAIN_MODELS = [
    "openai.audio-zeroshot",
    "openai.text-zeroshot",
    "openai-mini.audio-zeroshot",
    "openai-mini.text-zeroshot",
    "ultravox.audio-fewshot",
    "ultravox.text-fewshot",
    "qwen-25.audio-fewshot",
    "qwen-25.text-fewshot",
    "phi-4.audio-fewshot",
    "phi-4.text-fewshot",
    "text.spans",
]


def get_annotation_index(experiment_id):
    """
    Group an experiment's human annotations by the suggestion they refer to.

    Returns a defaultdict mapping suggestion id to the list of its annotations. Suggestions for
    which every annotation's ``why`` contains "copied" are left out of the result.
    """
    grouped = collections.defaultdict(list)
    for annotation in get_annotations(experiment_id):
        grouped[annotation.suggestion_id].append(annotation)

    # collect the ids first so the dict is not mutated while it is being iterated
    copy_only_ids = [
        suggestion_id
        for suggestion_id, group in grouped.items()
        if all("copied" in a.why for a in group)
    ]
    for suggestion_id in copy_only_ids:
        del grouped[suggestion_id]

    return grouped


# Merge the per-experiment indexes into one dict keyed by suggestion id.
# If the same suggestion id appears in several experiments, the later experiment's entry wins.
full_annotation_index = {}
for experiment in experiments:
    full_annotation_index.update(get_annotation_index(experiment.id))

In [None]:
# Number of annotated suggestions that remain in the merged index.
len(full_annotation_index)

In [None]:
# Count, per (model key, label) pair, how many suggestions received that label.
label_counts_by_model = collections.Counter()

for suggestion_id, annotations in full_annotation_index.items():
    # union across annotations, so a label counts once per suggestion however many annotations carry it
    labels_merged = {label for annotation in annotations for label in annotation.labels}
    # taken from the first annotation; presumably all annotations of one suggestion share the model key
    model_key = annotations[0].entry.model_key
    for label in labels_merged:
        label_counts_by_model[(model_key, label)] += 1

In [None]:
# Flatten the counter into (model_id, label, count) rows, keeping only the main models and
# dropping the "appropriate"/"inappropriate" labels.
label_counts_by_model_records = [
    (model_id, label, count)
    for (model_id, label), count in label_counts_by_model.items()
    if model_id in MAIN_MODELS and label not in ("appropriate", "inappropriate")
]
label_counts_df = pd.DataFrame.from_records(label_counts_by_model_records, columns=["model_id", "label", "value"])

In [None]:
# Reshape to a model x label matrix of counts.
label_counts_df_pvt = label_counts_df.pivot(index="model_id", columns="label", values="value")
# fill holes
label_counts_df_pvt.fillna(0, inplace=True)
# normalize: divide each row by its own total so every model's row sums to 1
label_counts_df_pvt = label_counts_df_pvt.div(label_counts_df_pvt.sum(axis=1), axis=0)
label_counts_df_pvt

In [None]:
# Heatmap of the row-normalized label distribution per model.
sns.heatmap(label_counts_df_pvt)