# Forensic analysis tool

This tool compares the log files produced by the benign agent and the triggered agent.
It performs four analyses:
- Steps analysis: Step duration, reasoning size, answer size, total steps
- Tools analysis: Tool calls, command uses, sizes of files written (write size)
- States: Total changes, total file changes, total folder changes
- Errors: Total errors, repeated errors, repeated errors right after each other (max streak)

In [None]:
# Import relevant libraries
import os, glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, Markdown

In [None]:
# Shared parsing and metrics helpers
def parse_csv(path: str, required_cols: list) -> pd.DataFrame:
    """Read a CSV file and verify that the expected columns are present.

    Args:
        path: Location of the CSV file; handed directly to ``pd.read_csv``.
        required_cols: Column names that must appear in the file.

    Returns:
        The parsed DataFrame, with no further processing applied.

    Raises:
        ValueError: If at least one required column is absent.
    """
    frame = pd.read_csv(path)
    missing = set(required_cols) - set(frame.columns)
    if not missing:
        return frame
    raise ValueError(f"Missing columns in {path}: {missing}")

def safe_stat(series, func, default=None):
    """Apply an aggregate to the numeric values found in ``series``.

    Values that cannot be converted to a number are discarded before the
    aggregate is applied.

    Args:
        series: A pandas Series or any list-like of values.
        func: Callable that receives the cleaned numeric Series.
        default: Value returned when no numeric value remains.

    Returns:
        ``func(values)`` when at least one numeric value exists, otherwise
        ``default``.
    """
    # pd.to_numeric returns a bare ndarray (no .dropna) for list input, so
    # normalise to a Series first; an existing Series passes through as-is.
    numeric = pd.to_numeric(pd.Series(series), errors="coerce").dropna()
    return func(numeric) if len(numeric) > 0 else default

### Steps analysis

In [None]:
# Columns that load_steps() requires in a steps log.
STEP_COLS = [
    "step_count",
    "step_start_ts",
    "step_end_ts",
    "reasoning_content",
    "reasoning_size",
    "answer_content",
    "answer_size",
]

def load_steps(path: str) -> pd.DataFrame:
    """Load a steps log and derive the duration of every step.

    Both timestamp columns are parsed as datetimes (unparseable values become
    ``NaT``) and a ``duration_sec`` column holding ``end - start`` in seconds
    is appended.

    Args:
        path: Location of the steps CSV file.

    Returns:
        The steps DataFrame including the ``duration_sec`` column.

    Raises:
        ValueError: Propagated from ``parse_csv`` when a column is missing.
    """
    steps = parse_csv(path, STEP_COLS)
    for ts_col in ("step_start_ts", "step_end_ts"):
        steps[ts_col] = pd.to_datetime(steps[ts_col], errors="coerce")
    elapsed = steps["step_end_ts"] - steps["step_start_ts"]
    steps["duration_sec"] = elapsed.dt.total_seconds()
    return steps

def steps_metrics(df: pd.DataFrame) -> dict:
    """Summarise step count, reasoning/answer sizes and step durations.

    Args:
        df: Steps DataFrame with ``step_count``, ``reasoning_size``,
            ``answer_size`` and ``duration_sec`` columns.

    Returns:
        A dict with ``total_steps`` and min/avg/max of both size columns
        (``None`` when a column holds no numeric value). When at least one
        duration is available, min/avg/max duration and the ``step_count``
        values of the fastest and slowest step are included as well.
    """

    def _size_stats(column: str) -> dict:
        # Coerce first so an empty or all-NaN column yields None instead of
        # crashing in int(nan).
        values = pd.to_numeric(df[column], errors="coerce").dropna()
        if values.empty:
            return {f"min_{column}": None, f"avg_{column}": None, f"max_{column}": None}
        return {
            f"min_{column}": int(values.min()),
            f"avg_{column}": float(values.mean()),
            f"max_{column}": int(values.max()),
        }

    m = {"total_steps": len(df)}
    m.update(_size_stats("reasoning_size"))
    m.update(_size_stats("answer_size"))

    dur = df["duration_sec"].dropna()
    if len(dur) > 0:
        m.update({
            "min_duration_sec": float(dur.min()),
            "avg_duration_sec": float(dur.mean()),
            "max_duration_sec": float(dur.max()),
            # idxmin/idxmax return index labels, which are then mapped back
            # to the logged step number.
            "min_duration_step": int(df.loc[dur.idxmin(), "step_count"]),
            "max_duration_step": int(df.loc[dur.idxmax(), "step_count"]),
        })
    return m

def steps_analysis(a: str, b: str):
    """Display per-file step metrics for two steps logs.

    Args:
        a: Path to the first steps CSV.
        b: Path to the second steps CSV.

    Returns:
        ``{"file_a": metrics_a, "file_b": metrics_b}`` where each value is
        the dict produced by ``steps_metrics``.
    """
    paths = (a, b)
    frames = [load_steps(p) for p in paths]
    labels = [os.path.basename(p) for p in paths]
    per_file = [steps_metrics(frame) for frame in frames]

    rows = []
    for stats, label in zip(per_file, labels):
        row = dict(stats)
        row["file"] = label
        rows.append(row)
    summary = pd.DataFrame(rows)

    display(Markdown("## Steps Analysis"))
    display(Markdown("### Per-file metrics"))
    display(summary)
    return {"file_a": per_file[0], "file_b": per_file[1]}

In [None]:
# Steps visualisation

def _bar(ax, labels, va, vb, title, a_name, b_name, anns=None):
    """Draw a two-series grouped bar chart on ``ax``.

    Args:
        ax: Axes object to draw on.
        labels: Tick label for every bar group.
        va: Bar heights of the first series, one per label.
        vb: Bar heights of the second series, one per label.
        title: Title set on the axes.
        a_name: Legend entry of the first series.
        b_name: Legend entry of the second series.
        anns: Optional per-group annotation texts; falsy entries are skipped.
    """
    bar_width = 0.38
    half = bar_width / 2
    positions = np.arange(len(labels))

    # The two series sit side by side around each group position.
    for offset, heights, series_name in ((-half, va, a_name), (half, vb, b_name)):
        ax.bar(positions + offset, heights, bar_width, label=series_name)

    ax.set_xticks(positions)
    ax.set_xticklabels(labels)
    ax.set_title(title)
    ax.legend()
    ax.grid(True, axis='y', alpha=0.25)

    if not anns:
        return
    for idx, note in enumerate(anns):
        if not note:
            continue
        # Anchor the text on top of the taller bar, never below zero.
        top = max(va[idx], vb[idx], 0)
        ax.text(positions[idx], top, note, ha='center', va='bottom', fontsize=8)

def steps_plots(a: str, b: str):
    """Draw a 2x2 grid of bar charts comparing two steps logs.

    The panels show step duration, reasoning size, answer size (each as
    min/avg/max) and the total number of steps.

    Args:
        a: Path to the first steps CSV.
        b: Path to the second steps CSV.

    Returns:
        The matplotlib figure holding the four panels.
    """
    dfs = [load_steps(a), load_steps(b)]
    names = [os.path.basename(a), os.path.basename(b)]
    met = [steps_metrics(df) for df in dfs]

    stat_labels = ["min", "avg", "max"]

    def _triple(metric: str) -> list:
        # One [min, avg, max] list per file; a statistic that is absent or
        # None is drawn as a zero-height bar.
        return [[m.get(f"{stat}_{metric}") or 0 for stat in stat_labels] for m in met]

    def _step_note(key: str):
        # Test against None explicitly: step number 0 is a valid step and
        # must not suppress the annotation.
        steps = [m.get(key) for m in met]
        if any(step is None for step in steps):
            return None
        return f"step {steps[0]} vs {steps[1]}"

    dur_a, dur_b = _triple("duration_sec")
    reason_vals = _triple("reasoning_size")
    answer_vals = _triple("answer_size")
    steps_vals = [[m["total_steps"]] for m in met]
    # Only the min and max bars name the step they belong to.
    anns = [_step_note("min_duration_step"), None, _step_note("max_duration_step")]

    fig, axs = plt.subplots(2, 2, figsize=(12, 8))
    _bar(axs[0,0], stat_labels, dur_a, dur_b, "Step Duration (sec)", *names, anns)
    _bar(axs[0,1], stat_labels, *reason_vals, "Reasoning Size (chars)", *names)
    _bar(axs[1,0], stat_labels, *answer_vals, "Answer Size (chars)", *names)
    _bar(axs[1,1], ["total"], *steps_vals, "Total Steps", *names)
    fig.suptitle("Steps Comparison", fontsize=14)
    fig.tight_layout()
    return fig

### Tools analysis

In [None]:
# Tools analysis

# Columns that load_tools() requires in a tools log.
TOOL_COLS = [
    "ts",
    "tool_name",
    "command",
    "parameters_json",
    "affected_path",
    "size_bytes",
]

def load_tools(path: str) -> pd.DataFrame:
    """Load a tools log, validating that all tool columns are present.

    Args:
        path: Location of the tools CSV file.

    Returns:
        The DataFrame returned by ``parse_csv``.

    Raises:
        ValueError: Propagated from ``parse_csv`` when a column is missing.
    """
    tool_calls = parse_csv(path, TOOL_COLS)
    return tool_calls

def tools_metrics(df: pd.DataFrame) -> dict:
    """Summarise tool usage and the sizes recorded for ``write`` commands.

    Args:
        df: Tools DataFrame with ``tool_name``, ``command`` and
            ``size_bytes`` columns.

    Returns:
        A dict with the number of calls, the number of distinct tools and
        commands, per-tool and per-command counts (sorted by label), and a
        ``write_metrics`` dict with count/min/avg/max of the numeric write
        sizes (min/avg/max are ``None`` when there is none).
    """
    is_write = df["command"] == "write"
    # Non-numeric sizes are coerced to NaN and left out of the statistics.
    sizes = pd.to_numeric(df.loc[is_write, "size_bytes"], errors="coerce").dropna()
    n_writes = len(sizes)

    if n_writes > 0:
        smallest, mean_size, largest = int(sizes.min()), float(sizes.mean()), int(sizes.max())
    else:
        smallest = mean_size = largest = None

    tool_col = df["tool_name"]
    command_col = df["command"]
    return {
        "total_calls": len(df),
        "unique_tools": tool_col.nunique(),
        "unique_commands": command_col.nunique(),
        "tool_counts": tool_col.value_counts().sort_index(),
        "command_counts": command_col.value_counts().sort_index(),
        "write_metrics": {
            "count": n_writes,
            "min": smallest,
            "avg": mean_size,
            "max": largest,
        },
    }

def tools_analysis(a: str, b: str):
    """Display tool, command and write-size comparisons for two tools logs.

    Args:
        a: Path to the first tools CSV.
        b: Path to the second tools CSV.

    Returns:
        ``{"file_a": metrics_a, "file_b": metrics_b}`` where each value is
        the dict produced by ``tools_metrics``.
    """
    paths = (a, b)
    frames = [load_tools(p) for p in paths]
    names = [os.path.basename(p) for p in paths]
    met = [tools_metrics(frame) for frame in frames]
    first, second = met

    def _side_by_side(key):
        # Labels seen in only one file get a count of 0 in the other.
        table = pd.DataFrame({names[0]: first[key], names[1]: second[key]})
        return table.fillna(0).astype(int)

    summary_rows = []
    for stats, label in zip(met, names):
        summary_rows.append({
            "file": label,
            "total_calls": stats["total_calls"],
            "unique_tools": stats["unique_tools"],
            "unique_commands": stats["unique_commands"],
        })
    summary = pd.DataFrame(summary_rows)

    tool_comp = _side_by_side("tool_counts")
    cmd_comp = _side_by_side("command_counts")
    write_sum = pd.DataFrame([{"file": n, **m["write_metrics"]} for m, n in zip(met, names)])

    display(Markdown("## Tools Analysis"))
    sections = (
        ("### Summary", summary),
        ("### Tool Calls", tool_comp),
        ("### Commands", cmd_comp),
        ("### Write Size (bytes)", write_sum),
    )
    for heading, table in sections:
        display(Markdown(heading))
        display(table)

    return {"file_a": first, "file_b": second}

In [None]:
# Tools visualisation

def align_series(s1, s2, top_n=10):
    """Align two count series on their labels and rank them by combined total.

    Args:
        s1: Counts of the first file, indexed by label.
        s2: Counts of the second file, indexed by label.
        top_n: Keep only this many highest-total labels; ``None`` keeps all.

    Returns:
        ``(labels, vals_a, vals_b)`` as lists ordered by descending combined
        count, or three empty lists when nothing is left to show.
    """
    df = pd.DataFrame({"A": s1, "B": s2})
    # Labels missing from one side and infinite values both count as 0.
    # Infinities are replaced explicitly rather than through the deprecated
    # 'mode.use_inf_as_na' option.
    df = df.replace([np.inf, -np.inf], np.nan).fillna(0).astype(int)

    ranked = (
        df.assign(total=df["A"] + df["B"])
        .sort_values("total", ascending=False)
        .drop(columns=["total"])
    )
    if top_n is not None:
        ranked = ranked.head(top_n)

    if len(ranked) == 0:
        return [], [], []

    return ranked.index.tolist(), ranked["A"].tolist(), ranked["B"].tolist()
    

def tools_plots(a: str, b: str, top_n=10):
    """Draw three stacked bar charts comparing two tools logs.

    The panels show the most used tools, the most used commands and the
    min/avg/max size of ``write`` commands.

    Args:
        a: Path to the first tools CSV.
        b: Path to the second tools CSV.
        top_n: Maximum number of tools/commands shown per panel.

    Returns:
        The matplotlib figure holding the three panels.
    """
    dfs = [load_tools(a), load_tools(b)]
    names = [os.path.basename(a), os.path.basename(b)]
    met = [tools_metrics(df) for df in dfs]

    # Top tools
    t_labels, t_a, t_b = align_series(met[0]["tool_counts"], met[1]["tool_counts"], top_n)
    # Top commands
    c_labels, c_a, c_b = align_series(met[0]["command_counts"], met[1]["command_counts"], top_n)
    # Write size: look the statistics up by key instead of relying on dict
    # order, and draw a missing statistic (None when a file has no writes)
    # as a zero-height bar so plotting does not fail.
    w_labels = ["min", "avg", "max"]
    w_a = [met[0]["write_metrics"][k] or 0 for k in w_labels]
    w_b = [met[1]["write_metrics"][k] or 0 for k in w_labels]

    fig, axs = plt.subplots(3, 1, figsize=(12, 9))
    _bar(axs[0], t_labels, t_a, t_b, "Tool Calls", *names)
    _bar(axs[1], c_labels, c_a, c_b, "Command Uses", *names)
    _bar(axs[2], w_labels, w_a, w_b, "Write Size (bytes)", *names)
    fig.suptitle("Tools Comparison", fontsize=14)
    fig.tight_layout()
    return fig

### States analysis

In [None]:
# Columns that load_states() requires in a states log.
STATE_COLS = [
    "ts",
    "state",
    "count_folders",
    "count_files",
]

def load_states(path: str) -> pd.DataFrame:
    """Load a states log with integer folder and file counts.

    Count values that cannot be parsed as numbers are replaced by 0.

    Args:
        path: Location of the states CSV file.

    Returns:
        The states DataFrame with ``count_folders`` and ``count_files``
        converted to integers.

    Raises:
        ValueError: Propagated from ``parse_csv`` when a column is missing.
    """
    states = parse_csv(path, STATE_COLS)
    for count_col in ("count_folders", "count_files"):
        numeric = pd.to_numeric(states[count_col], errors="coerce")
        states[count_col] = numeric.fillna(0).astype(int)
    return states

def states_metrics(df: pd.DataFrame) -> dict:
    """Summarise how file and folder counts evolve across state records.

    Args:
        df: States DataFrame with ``state``, ``count_files`` and
            ``count_folders`` columns.

    Returns:
        A dict with the number of records (``total_changes``), the summed,
        smallest and largest row-to-row difference of the file and folder
        counts, and the per-label record counts (``state_counts``). The
        difference statistics are 0 for an empty log.
    """

    def _delta_stats(column: str) -> tuple:
        # The first row has no predecessor, so its difference counts as 0.
        delta = df[column].diff().fillna(0).astype(int)
        if delta.empty:
            # min()/max() of an empty series are NaN and cannot become int.
            return 0, 0, 0
        return int(delta.sum()), int(delta.min()), int(delta.max())

    files_sum, files_min, files_max = _delta_stats("count_files")
    dirs_sum, dirs_min, dirs_max = _delta_stats("count_folders")
    return {
        "total_changes": len(df),
        # Sum of signed differences, i.e. the net change over the log.
        "total_files_changes": files_sum,
        "min_files_changes_per_step": files_min,
        "max_files_changes_per_step": files_max,
        "total_folders_changes": dirs_sum,
        "min_folders_changes_per_step": dirs_min,
        "max_folders_changes_per_step": dirs_max,
        "state_counts": df["state"].value_counts().sort_index(),
    }

def states_analysis(a: str, b: str):
    """Display state metrics and state-label counts for two states logs.

    Args:
        a: Path to the first states CSV.
        b: Path to the second states CSV.

    Returns:
        ``{"summary": ..., "states_compare": ...}`` holding the two
        DataFrames that were displayed.
    """
    paths = (a, b)
    frames = [load_states(p) for p in paths]
    names = [os.path.basename(p) for p in paths]
    met = [states_metrics(frame) for frame in frames]

    rows = []
    for stats, label in zip(met, names):
        # The label counts get their own table, so keep them out of the summary.
        row = {key: value for key, value in stats.items() if key != "state_counts"}
        row["file"] = label
        rows.append(row)
    summary = pd.DataFrame(rows)

    label_counts = pd.DataFrame({names[0]: met[0]["state_counts"], names[1]: met[1]["state_counts"]})
    states_comp = label_counts.fillna(0).astype(int)

    display(Markdown("## States Analysis"))
    for heading, table in (("### Summary", summary), ("### State Label Counts", states_comp)):
        display(Markdown(heading))
        display(table)

    return {"summary": summary, "states_compare": states_comp}

In [None]:
# States visualisation

def states_plots(a: str, b: str):
    """Draw one bar chart comparing the state totals of two states logs.

    Args:
        a: Path to the first states CSV.
        b: Path to the second states CSV.

    Returns:
        The matplotlib figure holding the chart.
    """
    paths = (a, b)
    frames = [load_states(p) for p in paths]
    names = [os.path.basename(p) for p in paths]
    met = [states_metrics(frame) for frame in frames]

    # The metric keys double as the tick labels.
    labels = ["total_changes", "total_files_changes", "total_folders_changes"]
    vals_a, vals_b = ([stats[key] for key in labels] for stats in met)

    fig, ax = plt.subplots(figsize=(8, 4))
    _bar(ax, labels, vals_a, vals_b, "States Comparison", *names)
    fig.suptitle("States Comparison", fontsize=14)
    fig.tight_layout()
    return fig

### Errors analysis

In [None]:
# Columns that load_errors() requires in an errors log.
ERROR_COLS = [
    "ts",
    "tool_name",
    "command",
    "parameters_json",
    "description",
]

def load_errors(path: str) -> pd.DataFrame:
    """Load an errors log ordered by timestamp.

    Timestamps are parsed as datetimes; unparseable values become ``NaT``.

    Args:
        path: Location of the errors CSV file.

    Returns:
        The errors DataFrame sorted by ``ts`` with a fresh 0..n-1 index.

    Raises:
        ValueError: Propagated from ``parse_csv`` when a column is missing.
    """
    df = parse_csv(path, ERROR_COLS)
    df["ts"] = pd.to_datetime(df["ts"], errors="coerce")
    # Use a stable sort so rows sharing a timestamp keep their logged order;
    # streak detection depends on the order of consecutive rows.
    return df.sort_values("ts", kind="mergesort").reset_index(drop=True)

def errors_metrics(df: pd.DataFrame) -> dict:
    """Summarise errors and runs of consecutive identical failures.

    A run is a maximal block of consecutive rows sharing the same
    ``(tool_name, command)`` pair.

    Args:
        df: Errors DataFrame with ``tool_name``, ``command`` and
            ``description`` columns, in the order the errors occurred.

    Returns:
        A dict with the number of errors, counts per description (sorted by
        label), the number of rows that repeat their predecessor
        (``repeats_count``), the number of runs of length two or more
        (``sequences_count``) and the longest run (``max_streak``, 0 for an
        empty log).
    """
    prev_tool = df["tool_name"].shift()
    prev_command = df["command"].shift()
    continues_run = (df["tool_name"] == prev_tool) & (df["command"] == prev_command)

    # Every row that does not continue the previous one opens a new run, so
    # the running count of such rows serves as a run identifier.
    run_id = (~continues_run).cumsum()
    run_lengths = df.groupby(run_id).size()

    longest = int(run_lengths.max()) if len(run_lengths) else 0
    n_sequences = int(len(run_lengths[run_lengths >= 2]))
    n_repeats = int((run_lengths - 1).clip(0).sum())

    return {
        "total_errors": len(df),
        "description_counts": df["description"].value_counts().sort_index(),
        "repeats_count": n_repeats,
        "sequences_count": n_sequences,
        "max_streak": longest,
    }

def errors_analysis(a: str, b: str):
    """Display error totals and per-description counts for two errors logs.

    Args:
        a: Path to the first errors CSV.
        b: Path to the second errors CSV.

    Returns:
        ``{"summary": ..., "desc_compare": ...}`` holding the two DataFrames
        that were displayed.
    """
    paths = (a, b)
    frames = [load_errors(p) for p in paths]
    names = [os.path.basename(p) for p in paths]
    met = [errors_metrics(frame) for frame in frames]

    scalar_keys = ("total_errors", "repeats_count", "sequences_count", "max_streak")
    rows = []
    for stats, label in zip(met, names):
        row = {"file": label}
        for key in scalar_keys:
            row[key] = stats[key]
        rows.append(row)
    summary = pd.DataFrame(rows)

    # Descriptions seen in only one file get a count of 0 in the other.
    by_description = pd.DataFrame({
        names[0]: met[0]["description_counts"],
        names[1]: met[1]["description_counts"],
    })
    desc_comp = by_description.fillna(0).astype(int)

    display(Markdown("## Errors Analysis"))
    for heading, table in (("### Summary", summary), ("### Error Counts by Description", desc_comp)):
        display(Markdown(heading))
        display(table)

    return {"summary": summary, "desc_compare": desc_comp}

In [None]:
# Errors plots

def errors_plots(a: str, b: str, top_n=10):
    """Draw two bar charts comparing two errors logs.

    The left panel shows total errors, repeats and the longest streak; the
    right panel shows the most frequent error descriptions.

    Args:
        a: Path to the first errors CSV.
        b: Path to the second errors CSV.
        top_n: Maximum number of error descriptions shown.

    Returns:
        The matplotlib figure holding both panels.
    """
    dfs = [load_errors(a), load_errors(b)]
    names = [os.path.basename(a), os.path.basename(b)]
    met = [errors_metrics(df) for df in dfs]

    # Totals
    labels = ["total", "repeats", "max_streak"]
    total_keys = ["total_errors", "repeats_count", "max_streak"]
    vals_a = [met[0][k] for k in total_keys]
    vals_b = [met[1][k] for k in total_keys]

    # Top error descriptions. Fixed column keys keep both series apart even
    # when the two files share a basename.
    desc_df = pd.DataFrame({
        "A": met[0]["description_counts"],
        "B": met[1]["description_counts"],
    }).fillna(0)
    # Rank by the combined count so a description that dominates only the
    # second file is not pushed out by top_n; the stable sort keeps ties in
    # label order.
    order = desc_df.sum(axis=1).sort_values(ascending=False, kind="mergesort").index
    desc_df = desc_df.loc[order].head(top_n)
    d_labels = desc_df.index.tolist()
    d_a = desc_df["A"].tolist()
    d_b = desc_df["B"].tolist()

    fig, axs = plt.subplots(1, 2, figsize=(12, 4))
    _bar(axs[0], labels, vals_a, vals_b, "Errors Summary", *names)
    _bar(axs[1], d_labels, d_a, d_b, "Top Error Descriptions", *names)
    fig.suptitle("Errors Comparison", fontsize=14)
    fig.tight_layout()
    return fig

### Auto-run analysis

In [None]:
def find_logs_dir(hint="results/logs") -> str | None:
    if os.path.isabs(hint) and os.path.isdir(hint):
        return hint
    for p in [".", "..", "../..", "../../..", "../../../.."]:
        cand = os.path.abspath(os.path.join(p, hint))
        if os.path.isdir(cand):
            return cand
    return None

def latest(pattern: str) -> str | None:
    matches = glob.glob(pattern)
    return max(matches, key=os.path.getmtime) if matches else None

def find_pairs(exp: str, logs_dir="results/logs"):
    """Find the newest baseline/triggered log pair for each analysis type.

    Args:
        exp: Experiment name used as the file-name prefix.
        logs_dir: Hint passed to ``find_logs_dir``.

    Returns:
        A dict keyed by ``steps``, ``tools``, ``states`` and ``errors``; each
        value is a ``(baseline_path, triggered_path)`` tuple whose entries
        are ``None`` when no matching file exists.

    Raises:
        FileNotFoundError: If the logs directory cannot be located.
    """
    base = find_logs_dir(logs_dir)
    if not base:
        raise FileNotFoundError(f"Logs dir not found (hint: {logs_dir})")

    pairs = {}
    for kind in ("steps", "tools", "states", "errors"):
        baseline = latest(os.path.join(base, f"{exp}-{kind}-*.csv"))
        triggered = latest(os.path.join(base, f"{exp}-triggered-{kind}-*.csv"))
        pairs[kind] = (baseline, triggered)
    return pairs

def run_all(exp: str, logs_dir="results/logs"):
    """Run every analysis and plot for one experiment.

    An analysis is skipped, with a printed notice, when either file of its
    pair is missing.

    Args:
        exp: Experiment name used to look up the log files.
        logs_dir: Hint passed on to ``find_pairs``.

    Returns:
        A dict with the result of each analysis under ``steps``, ``tools``,
        ``states`` and ``errors`` (``None`` when skipped) plus the file pairs
        that were found under ``pairs``.
    """
    pairs = find_pairs(exp, logs_dir)
    display(Markdown(f"## Experiment: `{exp}`"))

    stages = (
        ("steps", "Steps", steps_analysis, steps_plots),
        ("tools", "Tools", tools_analysis, tools_plots),
        ("states", "States", states_analysis, states_plots),
        ("errors", "Errors", errors_analysis, errors_plots),
    )

    results = {}
    for key, title, analyze, plot in stages:
        first, second = pairs[key]
        if first and second:
            outcome = analyze(first, second)
            plot(first, second)
            plt.show()
        else:
            print(f"Skipping {title}: missing files")
            outcome = None
        results[key] = outcome

    results["pairs"] = pairs
    return results

In [None]:
# Run all four analyses for the "go-game" experiment, using the default logs-directory hint.
run_all("go-game")