# BenchFS IOR Result Visualizer

`results` ディレクトリ配下の `**/ior_results/ior_result_*.json`（例: `results/benchfs/*/ior_results/ior_result_*.json`）から IOR のサマリ JSON を読み込み、操作別の帯域やレイテンシを可視化します。

In [None]:
from pathlib import Path
import json
import re

import polars as pl
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from IPython.display import display

# Plot/table display defaults: 14pt base font, and up to 100 rows when
# polars prints a table.
plt.rcParams["font.size"] = 14
pl.Config.set_tbl_rows(100)

# Output locations, anchored at the current working directory and created
# eagerly so later cells can write into them without checking.
_cwd = Path.cwd()
DATA_DIR = (_cwd / "processed" / "benchfsd").resolve()
FIG_DIR = (_cwd / "fig" / "benchfsd").resolve()
for _out_dir in (DATA_DIR, FIG_DIR):
    _out_dir.mkdir(parents=True, exist_ok=True)

def save(fig, name: str) -> None:
    """Write ``fig`` into ``FIG_DIR`` as both ``<name>.png`` and ``<name>.pdf``.

    Each file is saved with ``bbox_inches="tight"``.
    """
    targets = [FIG_DIR / f"{name}.{suffix}" for suffix in ("png", "pdf")]
    for target in targets:
        fig.savefig(target, bbox_inches="tight")

def comma_formatter(value, _):
    """Format a tick value with thousands separators and no decimals.

    The second positional argument (the tick position handed over by
    ``FuncFormatter``) is ignored.
    """
    return format(value, ",.0f")

def slugify(value: str) -> str:
    """Turn ``value`` into a token that is safe to embed in a file name.

    Every run of characters outside ``A-Z a-z 0-9 . _ -`` becomes a single
    underscore, leading/trailing underscores are removed, and ``"unknown"``
    is returned when nothing is left.
    """
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "_", value)
    trimmed = collapsed.strip("_")
    if trimmed:
        return trimmed
    return "unknown"

: 

In [None]:
def resolve_results_root() -> Path:
    """Locate the ``results`` directory relative to the working directory.

    Checks ``results``, ``../results`` and ``../../results`` in that order
    and returns the first one that exists, resolved to an absolute path.

    Raises:
        FileNotFoundError: if none of the three locations exists.
    """
    search_order = (
        Path("results"),
        Path("..", "results"),
        Path("../..", "results"),
    )
    found = next((entry for entry in search_order if entry.exists()), None)
    if found is None:
        raise FileNotFoundError("results ディレクトリが見つかりません。ノートブックの位置を確認してください。")
    return found.resolve()

# Resolve the results directory once; later cells load IOR output from it.
results_root = resolve_results_root()
print(f"Using results directory: {results_root}")

In [None]:
# Matches names ending in "ior_result_<digits>.json"; group 1 captures the digits.
IOR_JSON_PATTERN = re.compile(r"ior_result_(\d+)\.json$")

def parse_number(value):
    """Best-effort conversion of ``value`` to ``float``.

    Ints and floats are converted directly. Strings have commas removed and
    surrounding whitespace stripped before conversion. Anything else, an
    empty string, or a string ``float()`` rejects yields ``None``.
    """
    if isinstance(value, (int, float)):
        return float(value)
    if not isinstance(value, str):
        return None
    text = value.replace(",", "").strip()
    if not text:
        return None
    try:
        return float(text)
    except ValueError:
        return None

def bytes_to_mib(value):
    """Convert a byte count to MiB; ``None`` when ``parse_number`` cannot parse it."""
    parsed = parse_number(value)
    if parsed is None:
        return None
    return parsed / (1024 ** 2)

def kib_to_mib(value):
    """Convert a KiB count to MiB; ``None`` when ``parse_number`` cannot parse it."""
    parsed = parse_number(value)
    if parsed is None:
        return None
    return parsed / 1024.0

def _ior_summary_rows(data: dict, base: dict) -> list[dict]:
    """Flatten the ``summary`` list of one IOR JSON document into row dicts."""
    shared = base | {
        "command": data.get("Command line", ""),
        "began": data.get("Began", ""),
        "finished": data.get("Finished", ""),
    }
    return [
        shared | {
            "operation": summary.get("operation"),
            "bw_max_mib": parse_number(summary.get("bwMaxMIB")),
            "bw_min_mib": parse_number(summary.get("bwMinMIB")),
            "bw_mean_mib": parse_number(summary.get("bwMeanMIB")),
            "bw_std_mib": parse_number(summary.get("bwStdMIB")),
            "ops_max": parse_number(summary.get("OPsMax")),
            "ops_min": parse_number(summary.get("OPsMin")),
            "ops_mean": parse_number(summary.get("OPsMean")),
            "ops_std": parse_number(summary.get("OPsSD")),
            "mean_time": parse_number(summary.get("MeanTime")),
            "xsize_mib": parse_number(summary.get("xsizeMiB")),
            "block_size_bytes": parse_number(summary.get("blockSize")),
            "transfer_size_bytes": parse_number(summary.get("transferSize")),
            "segment_count": summary.get("segmentCount"),
            "num_tasks": summary.get("numTasks"),
            "tasks_per_node": summary.get("tasksPerNode"),
            "repetitions": summary.get("repetitions"),
            "file_per_proc": summary.get("filePerProc"),
        }
        for summary in data.get("summary", []) or []
    ]


def _ior_detail_rows(data: dict, base: dict) -> list[dict]:
    """Flatten every ``tests[*].Results[*]`` entry of one IOR JSON document."""
    command = data.get("Command line", "")
    rows = []
    for test in data.get("tests", []) or []:
        parameters = test.get("Parameters", {}) or {}
        for result in test.get("Results", []) or []:
            rows.append(
                base | {
                    "command": command,
                    "test_id": test.get("TestID"),
                    "start_time": test.get("StartTime"),
                    "path": test.get("Path"),
                    "operation": result.get("access"),
                    "bw_mib": parse_number(result.get("bwMiB")),
                    "iops": parse_number(result.get("iops")),
                    "latency": parse_number(result.get("latency")),
                    "open_time": parse_number(result.get("openTime")),
                    "transfer_time": parse_number(result.get("wrRdTime")),
                    "close_time": parse_number(result.get("closeTime")),
                    "total_time": parse_number(result.get("totalTime")),
                    "block_kib": parse_number(result.get("blockKiB")),
                    "xfer_kib": parse_number(result.get("xferKiB")),
                    "segment_count": parameters.get("segmentCount"),
                    "transfer_size_bytes": parameters.get("transferSize"),
                    "block_size_bytes": parameters.get("blockSize"),
                }
            )
    return rows


def _finalize_ior_summary(frame: pl.DataFrame) -> pl.DataFrame:
    """Cast count columns, add MiB/GiB columns and the ``config`` label, then sort."""

    def as_float(name: str) -> pl.Expr:
        # Explicit cast so arithmetic also works when a column is entirely null.
        return pl.col(name).cast(pl.Float64, strict=False)

    mib = 1024 ** 2
    return (
        frame
        .with_columns(
            pl.col("num_tasks").cast(pl.Int64, strict=False),
            pl.col("tasks_per_node").cast(pl.Int64, strict=False),
            pl.col("repetitions").cast(pl.Int64, strict=False),
            pl.col("file_per_proc").cast(pl.Int64, strict=False),
            pl.col("segment_count").cast(pl.Int64, strict=False),
            (as_float("block_size_bytes") / mib).alias("block_size_mib"),
            (as_float("transfer_size_bytes") / mib).alias("transfer_size_mib"),
            (as_float("bw_mean_mib") / 1024.0).alias("bw_mean_gib"),
            (as_float("bw_max_mib") / 1024.0).alias("bw_max_gib"),
            (as_float("bw_min_mib") / 1024.0).alias("bw_min_gib"),
        )
        .with_columns(
            pl.when(
                pl.col("block_size_mib").is_not_null() & pl.col("transfer_size_mib").is_not_null()
            )
            .then(
                # pl.format only understands bare "{}" placeholders, so the
                # intended 1- and 2-decimal precision is applied by rounding.
                pl.format(
                    "blk {} MiB / xfer {} MiB",
                    pl.col("block_size_mib").round(1),
                    pl.col("transfer_size_mib").round(2),
                )
            )
            # A bare string here would be read as a column name; pl.lit makes
            # it a literal.
            .otherwise(pl.lit("unknown"))
            .alias("config"),
        )
        .sort(["backend", "experiment", "run", "run_index", "operation"])
    )


def _finalize_ior_detail(frame: pl.DataFrame) -> pl.DataFrame:
    """Add MiB-converted size columns to the per-result frame and sort it."""

    def as_float(name: str) -> pl.Expr:
        return pl.col(name).cast(pl.Float64, strict=False)

    mib = 1024 ** 2
    return (
        frame
        .with_columns(
            (as_float("block_kib") / 1024.0).alias("block_mib"),
            (as_float("xfer_kib") / 1024.0).alias("xfer_mib"),
            (as_float("transfer_size_bytes") / mib).alias("transfer_size_mib"),
            (as_float("block_size_bytes") / mib).alias("block_size_mib"),
        )
        .sort(["backend", "experiment", "run", "run_index", "operation", "test_id"])
    )


def load_ior_json(root: Path) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Load every ``**/ior_results/ior_result_*.json`` under ``root``.

    Args:
        root: Directory that is searched recursively.

    Returns:
        ``(summary_df, detail_df)``. ``summary_df`` has one row per entry of
        each file's ``summary`` list; ``detail_df`` has one row per
        ``tests[*].Results[*]`` entry. Either frame is empty when no matching
        rows were found.

    Files that cannot be read or decoded, or whose top-level JSON value is
    not an object, are reported with ``print`` and skipped.
    """
    summary_records = []
    detail_records = []
    for path in sorted(root.glob("**/ior_results/ior_result_*.json")):
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"⚠️ Failed to parse {path}: {exc}")
            continue
        if not isinstance(data, dict):
            print(f"⚠️ Failed to parse {path}: top-level JSON value is not an object")
            continue
        rel_path = path.relative_to(root)
        rel_parts = rel_path.parts
        match = IOR_JSON_PATTERN.search(path.name)
        # NOTE(review): backend/experiment/run are simply the first three
        # path components below ``root``; for a layout like
        # <backend>/<experiment>/ior_results/<file>, "run" is "ior_results".
        base = {
            "ior_file": str(rel_path),
            "collection": "/".join(rel_parts[:-1]),
            "backend": rel_parts[0] if len(rel_parts) > 0 else "",
            "experiment": rel_parts[1] if len(rel_parts) > 1 else "",
            "run": rel_parts[2] if len(rel_parts) > 2 else "",
            "run_index": int(match.group(1)) if match else None,
        }
        summary_records.extend(_ior_summary_rows(data, base))
        detail_records.extend(_ior_detail_rows(data, base))

    # infer_schema_length=None scans all rows, so a column that is null in
    # the leading rows still gets the right dtype.
    if summary_records:
        summary_df = _finalize_ior_summary(
            pl.DataFrame(summary_records, infer_schema_length=None)
        )
    else:
        summary_df = pl.DataFrame()
    if detail_records:
        detail_df = _finalize_ior_detail(
            pl.DataFrame(detail_records, infer_schema_length=None)
        )
    else:
        detail_df = pl.DataFrame()
    return summary_df, detail_df

# Load everything once, then preview the head of each frame (or say why
# there is nothing to preview).
ior_summary_df, ior_detail_df = load_ior_json(results_root)
for _frame, _empty_message in (
    (ior_summary_df, "No ior_result_*.json files found under results."),
    (ior_detail_df, "No detailed IOR test entries available."),
):
    if _frame.is_empty():
        print(_empty_message)
    else:
        display(_frame.head().to_pandas())

## Summary Overview

IOR JSONサマリを実験 × 実行 × 操作単位で整形したテーブルを確認します。

In [None]:
# Show a trimmed, sorted view of the summary frame.
if not ior_summary_df.is_empty():
    _overview_sort_keys = ["backend", "experiment", "run", "run_index", "operation"]
    _overview_columns = _overview_sort_keys + [
        "config",
        "bw_mean_mib",
        "bw_mean_gib",
        "bw_max_mib",
        "bw_min_mib",
        "mean_time",
        "xsize_mib",
        "num_tasks",
        "tasks_per_node",
    ]
    overview = ior_summary_df.select(_overview_columns).sort(_overview_sort_keys)
    display(overview.to_pandas())
else:
    print("No IOR summary rows to show.")

## Bandwidth Per Run

実験・ジョブごとにブロック/転送サイズ別の平均帯域を可視化します。

In [None]:
def _draw_bandwidth_bars(table, experiment, run):
    """Draw and save one grouped bar chart (rows: config, columns: operation)."""
    fig, ax = plt.subplots(figsize=(9, 5), dpi=120)
    table.plot(kind="bar", ax=ax, width=0.7)
    ax.set(
        xlabel="block / transfer size",
        ylabel="Bandwidth (MiB/s)",
        title=f"{experiment} / {run} bandwidth",
    )
    ax.grid(axis="y", alpha=0.7, linestyle="--", linewidth=1)
    ax.yaxis.set_major_formatter(FuncFormatter(comma_formatter))
    ax.legend(title="operation", fontsize=10)
    plt.xticks(rotation=45, ha="right")
    plt.tight_layout()
    save(fig, f"bandwidth_{slugify(experiment)}_{slugify(run)}")
    plt.show()
    plt.close(fig)


# One chart per (experiment, run): mean bandwidth per config, one bar per operation.
if ior_summary_df.is_empty():
    print("No data to plot.")
else:
    for experiment_df in ior_summary_df.partition_by("experiment", maintain_order=True):
        experiment = experiment_df[0, "experiment"] or "unknown"
        for run_df in experiment_df.partition_by("run", maintain_order=True):
            run = run_df[0, "run"] or "unknown"
            per_config = (
                run_df
                .select("config", "operation", "bw_mean_mib")
                .group_by(["config", "operation"], maintain_order=True)
                .agg(pl.col("bw_mean_mib").mean())
            )
            tidy = per_config.pivot(
                values="bw_mean_mib", index="config", columns="operation"
            ).sort("config")
            if tidy.is_empty():
                continue
            pdf = tidy.to_pandas().set_index("config")
            display(pdf)
            _draw_bandwidth_bars(pdf, experiment, run)

## Latency vs Bandwidth

各操作の平均所要時間（サマリの `MeanTime`）と帯域の関係を散布図で確認します。

In [None]:
# Scatter of summary mean time against mean bandwidth, one colour per
# operation, marker area scaled by xsize_mib, each point labelled with its
# experiment and config.
if ior_summary_df.is_empty():
    print("No data to plot.")
else:
    _scatter_columns = [
        "experiment",
        "run",
        "operation",
        "config",
        "bw_mean_mib",
        "mean_time",
        "xsize_mib",
    ]
    pdf = ior_summary_df.select(_scatter_columns).to_pandas()
    if pdf.empty:
        print("No summary rows available for plotting.")
    else:
        pdf["point_size"] = 40 + 5 * pdf["xsize_mib"].fillna(0)
        fig, ax = plt.subplots(figsize=(8, 6), dpi=120)
        _marker_style = dict(alpha=0.75, edgecolors="black", linewidths=0.6)
        for operation, group in pdf.groupby("operation"):
            ax.scatter(
                group["bw_mean_mib"],
                group["mean_time"],
                s=group["point_size"],
                label=operation,
                **_marker_style,
            )
        for _exp, _cfg, _bw, _time in zip(
            pdf["experiment"], pdf["config"], pdf["bw_mean_mib"], pdf["mean_time"]
        ):
            ax.annotate(
                f"{_exp}\n{_cfg}",
                (_bw, _time),
                textcoords="offset points",
                xytext=(5, 5),
                fontsize=8,
                alpha=0.7,
            )
        ax.set(
            xlabel="Bandwidth (MiB/s)",
            ylabel="Mean time (s)",
            title="Mean time vs bandwidth",
        )
        ax.grid(alpha=0.5, linestyle="--", linewidth=0.8)
        ax.xaxis.set_major_formatter(FuncFormatter(comma_formatter))
        ax.legend(title="operation", fontsize=9)
        plt.tight_layout()
        save(fig, "latency_vs_bandwidth")
        plt.show()
        plt.close(fig)

## Detailed Test Results

テスト内の各アクセス結果（write/read）を抽出し、ブロック/転送サイズごとの統計を確認します。

In [None]:
# Aggregate the per-result rows by run and block/transfer size and show
# sample count, bandwidth min/mean/max, mean latency and mean total time.
if ior_detail_df.is_empty():
    print("No detailed IOR test entries to summarize.")
else:
    _detail_keys = [
        "backend",
        "experiment",
        "run",
        "run_index",
        "operation",
        "block_kib",
        "xfer_kib",
    ]
    detail_summary = (
        ior_detail_df
        .with_columns(
            # Integer KiB values make cleaner group keys and labels.
            pl.col("block_kib").cast(pl.Int64, strict=False),
            pl.col("xfer_kib").cast(pl.Int64, strict=False),
        )
        .group_by(_detail_keys, maintain_order=True)
        .agg(
            pl.len().alias("samples"),
            pl.col("bw_mib").mean().alias("bw_mib_mean"),
            pl.col("bw_mib").max().alias("bw_mib_max"),
            pl.col("bw_mib").min().alias("bw_mib_min"),
            pl.col("latency").mean().alias("latency_mean"),
            pl.col("total_time").mean().alias("total_time_mean"),
        )
        .with_columns(
            (pl.col("bw_mib_mean") / 1024.0).alias("bw_gib_mean"),
            # pl.format accepts only bare "{}" placeholders; indexed ones
            # such as "{0}" make it raise.
            pl.format(
                "blk {} KiB / xfer {} KiB",
                pl.col("block_kib"),
                pl.col("xfer_kib"),
            ).alias("config"),
        )
        .sort(_detail_keys)
    )
    display(detail_summary.to_pandas())

## Bandwidth by Run Index

同一ジョブ内の run_index ごとの帯域推移を確認します。

In [None]:
def _draw_run_index_sweep(table, experiment, run):
    """Draw and save one line per operation: mean bandwidth against run_index."""
    fig, ax = plt.subplots(figsize=(9, 5), dpi=120)
    for operation, subset in table.groupby("operation"):
        ax.plot(
            subset["run_index"],
            subset["bw_mib"],
            marker="o",
            linewidth=2,
            label=operation,
        )
    ax.set(
        xlabel="run index",
        ylabel="Bandwidth (MiB/s)",
        title=f"{experiment} / {run} run index sweep",
    )
    ax.grid(alpha=0.6, linestyle="--", linewidth=0.9)
    ax.yaxis.set_major_formatter(FuncFormatter(comma_formatter))
    ax.legend(title="operation", fontsize=10)
    plt.tight_layout()
    save(fig, f"bandwidth_run_index_{slugify(experiment)}_{slugify(run)}")
    plt.show()
    plt.close(fig)


# One chart per (experiment, run), built from the per-result detail rows.
if ior_detail_df.is_empty():
    print("No IOR data to plot.")
else:
    for experiment_df in ior_detail_df.partition_by("experiment", maintain_order=True):
        experiment = experiment_df[0, "experiment"] or "unknown"
        for run_df in experiment_df.partition_by("run", maintain_order=True):
            run = run_df[0, "run"] or "unknown"
            measured = run_df.select("run_index", "operation", "bw_mib").drop_nulls("bw_mib")
            tidy = (
                measured
                .group_by(["run_index", "operation"], maintain_order=True)
                .agg(pl.col("bw_mib").mean())
                .sort(["operation", "run_index"])
            )
            if tidy.is_empty():
                continue
            pdf = tidy.to_pandas()
            _draw_run_index_sweep(pdf, experiment, run)

## Export Processed JSON

整形済みのサマリと詳細データをJSONとして保存します。

In [None]:
def _write_records_json(frame, filename):
    """Dump ``frame`` as a JSON list of row objects to ``DATA_DIR / filename``.

    Rows come from ``DataFrame.to_dicts()`` so polars nulls are written as
    JSON ``null``; the previous pandas round-trip turned them into ``NaN``,
    which ``json.dumps`` emits as a token that is not valid JSON. The file is
    written as UTF-8 because ``ensure_ascii=False`` can produce non-ASCII
    output. Returns the path that was written.
    """
    out_path = DATA_DIR / filename
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(
        json.dumps(frame.to_dicts(), ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"Wrote {out_path}")
    return out_path


# Export whichever of the two frames has rows.
if ior_summary_df.is_empty() and ior_detail_df.is_empty():
    print("No IOR results to export.")
else:
    if not ior_summary_df.is_empty():
        summary_out = _write_records_json(ior_summary_df, "ior_summary.json")
    else:
        print("No summary records to export.")
    if not ior_detail_df.is_empty():
        detail_out = _write_records_json(ior_detail_df, "ior_detail.json")
    else:
        print("No detailed records to export.")