# Fine Metrics Demo: Complete Guide to Dask Spans Integration

This notebook demonstrates all fine-grained performance metrics available through Dask Spans in roastcoffea.

## What are Fine Metrics?

Fine metrics provide detailed performance breakdowns beyond wall time:
- **CPU vs I/O time**: How much time is spent computing versus waiting on I/O
- **Compression overhead**: Time spent compressing/decompressing data
- **Serialization overhead**: Time spent serializing/deserializing Python objects
- **Disk/Memory I/O**: Bytes read from disk or memory
- **Real compression ratios**: Actual uncompressed vs compressed bytes
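
As a rough illustration of how two of these quantities are derived, the sketch below computes a CPU fraction and a compression ratio from made-up numbers. The function names and input values are hypothetical and do not reflect roastcoffea's actual output schema.

```python
def cpu_fraction(cpu_seconds: float, noncpu_seconds: float) -> float:
    """Share of task time spent computing rather than waiting."""
    total = cpu_seconds + noncpu_seconds
    return cpu_seconds / total if total else 0.0


def compression_ratio(uncompressed_bytes: int, compressed_bytes: int) -> float:
    """Uncompressed size divided by compressed size (higher means smaller files)."""
    return uncompressed_bytes / compressed_bytes if compressed_bytes else 0.0


# Made-up inputs, for illustration only
frac = cpu_fraction(cpu_seconds=6.0, noncpu_seconds=2.0)
ratio = compression_ratio(uncompressed_bytes=1000, compressed_bytes=250)

print(f"CPU fraction: {frac:.2f}")
print(f"Compression ratio: {ratio:.1f}x")
```

A CPU fraction well below 1 suggests the workflow is waiting on I/O rather than computing.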

## Data Sources

Fine metrics come from **Dask Spans** and are available at multiple granularities:
1. **Cumulative (workflow-level)**: Aggregated across all tasks and workers
2. **Per-task**: Broken down by individual tasks (task prefix)
3. **Per-worker** (planned): Broken down by individual worker
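
The relationship between the first two granularities can be sketched with plain Python. The key layout below is modeled on the `(context, task_prefix, activity, unit)` tuples used by Dask's fine performance metrics. Treat the exact key shape, the activity names, and all sample values as assumptions made for illustration.

```python
from collections import defaultdict

# Made-up sample: (context, task_prefix, activity, unit) -> value
sample_metrics = {
    ("execute", "read-events", "thread-cpu", "seconds"): 1.5,
    ("execute", "read-events", "thread-noncpu", "seconds"): 0.5,
    ("execute", "read-events", "disk-read", "bytes"): 2048,
    ("execute", "process", "thread-cpu", "seconds"): 3.0,
    ("execute", "process", "thread-noncpu", "seconds"): 1.0,
}


def cumulative(metrics):
    """Workflow-level totals keyed by (activity, unit)."""
    totals = defaultdict(float)
    for (_context, _prefix, activity, unit), value in metrics.items():
        totals[(activity, unit)] += value
    return dict(totals)


def per_task(metrics):
    """Totals keyed by task prefix, then by (activity, unit)."""
    out = defaultdict(lambda: defaultdict(float))
    for (_context, prefix, activity, unit), value in metrics.items():
        out[prefix][(activity, unit)] += value
    return {prefix: dict(vals) for prefix, vals in out.items()}


workflow_totals = cumulative(sample_metrics)
task_totals = per_task(sample_metrics)

print(workflow_totals[("thread-cpu", "seconds")])
print(task_totals["process"])
```

The cumulative view is simply the per-task view summed over task prefixes, so the two should always agree in total.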

## Setup: Imports

In [1]:
# Standard imports

import awkward as ak
from coffea import processor
from coffea.nanoevents import NanoAODSchema
from dask.distributed import Client, LocalCluster

# Get a test file from scikit-hep-testdata
from skhep_testdata import data_path

test_file = data_path("nanoAOD_2015_CMS_Open_Data_ttbar.root")
print(f"Using test file: {test_file}")

Using test file: /Users/moaly/.local/skhepdata/nanoAOD_2015_CMS_Open_Data_ttbar.root


## Define a Simple Coffea Processor

We'll create a processor that does some real work: filtering jets, computing dijet invariant masses, and returning summary statistics.

In [2]:
class JetAnalysisProcessor(processor.ProcessorABC):
    """Simple jet analysis processor for testing fine metrics."""

    def process(self, events):
        # Select jets with pT > 30 GeV
        jets = events.Jet[events.Jet.pt > 30]

        # Select events with at least 2 jets
        two_jet_events = events[ak.num(jets) >= 2]
        two_jets = jets[ak.num(jets) >= 2]

        # Calculate dijet invariant mass for leading two jets
        if len(two_jets) > 0:
            j1 = two_jets[:, 0]
            j2 = two_jets[:, 1]
            dijet_mass = (j1 + j2).mass
        else:
            dijet_mass = ak.Array([])

        return {
            "nevents": len(events),
            "njets_total": ak.sum(ak.num(jets)),
            "nevents_2jet": len(two_jet_events),
            "dijet_mass_mean": ak.mean(dijet_mass) if len(dijet_mass) > 0 else 0,
            "jet_pt_sum": ak.sum(jets.pt),
        }

    def postprocess(self, accumulator):
        return accumulator

## Part 1: Collecting Fine Metrics

Let's run the processor with metrics collection enabled. Fine metrics are collected automatically when Dask Spans are available.
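
One way to check whether the Spans API is present in the current environment is to try importing it. This is a minimal sketch that assumes "available" simply means `distributed` exposes `span`. The helper name `spans_available` is hypothetical, and roastcoffea may apply its own, different checks.

```python
def spans_available() -> bool:
    """Return True if the installed distributed package exposes the span API."""
    try:
        from distributed import span  # noqa: F401
    except ImportError:
        # distributed is missing or too old to provide spans
        return False
    return True


print(f"Dask Spans available: {spans_available()}")
```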

In [8]:
# Create fileset (the dataset key is an arbitrary label; the test file is a ttbar sample)
fileset = {
    "ttbar": {
        "files": {test_file: "Events"},
    },
}

# Start Dask cluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)

print(f"Dashboard: {client.dashboard_link}")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 60701 instead


Dashboard: http://127.0.0.1:60701/status


In [21]:
# Run with metrics collection
executor = processor.DaskExecutor(client=client)
runner = processor.Runner(
    executor=executor,
    savemetrics=True,
    schema=NanoAODSchema,
)

output, report = runner(
    fileset,
    treename="Events",
    processor_instance=JetAnalysisProcessor(),
)


# discover workers + useful metadata
sch = client.scheduler_info()
workers = list(sch["workers"].keys())  # e.g. ['tcp://10.0.0.5:12345', ...]


def worker_meta(addr):
    w = sch["workers"][addr]
    return {
        "address": addr,
        "name": w.get("name"),
        "host": w.get("host"),
        "nthreads": w.get("nthreads"),
        "memory_limit": w.get("memory_limit"),
        "versions": w.get("versions", {}),
    }


profiles_by_worker = {
    addr: {
        "meta": worker_meta(addr),
        # Statistical profile for this worker only: a nested call tree of sampled frames
        "profile": client.profile(workers=[addr]),
    }
    for addr in workers
}

# Save the raw profiles so they can be inspected offline
import json

with open("dask_profiles.json", "w") as f:
    json.dump(profiles_by_worker, f, indent=2)


In [27]:
from __future__ import annotations

from collections.abc import Iterable
from typing import Any

# ---------- helpers ----------


def _iter_children(node: dict[str, Any]) -> Iterable[dict[str, Any]]:
    ch = node.get("children") or {}
    if isinstance(ch, dict):
        return ch.values()
    if isinstance(ch, list):
        return ch
    return ()


def _label_from_node(node: dict[str, Any]) -> tuple[str, str, int]:
    """
    Return a normalized (func, file, line) using identifier/description.
    """
    func = file = ""
    line = 0
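    ident = node.get("identifier")
    if isinstance(ident, str) and ";" in ident:
        try:
            # Identifier is expected to look like "name;filename;lineno"
            f, p, ln = ident.split(";", 2)
            func, file, line = f, p, int(ln)
        except ValueError:
            # Malformed identifier; fall back to the description fields below
            pass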
    desc = node.get("description") or {}
    func = (desc.get("name") or func or "").strip()
    file = (desc.get("filename") or file or "").strip()
    line = int(desc.get("line_number") or line or 0)
    return func, file, line


def _node_label_string(func: str, file: str, line: int) -> str:
    # Concise but mostly unique; use os.path.basename(file) instead to shorten long paths
    return f"{func}@{file}:{line}"


# ---------- core traversal ----------


def _build_tree_and_stacks(
    node: dict[str, Any],
    *,
    interval_ms: float,
    path_labels: list[str],
    stacks_exclusive: dict[tuple[str, ...], int],
    root_total_count: int,
    max_depth: int = 2000,
) -> tuple[dict[str, Any] | None, int]:
    """
    DFS:
      - returns (clean_tree_node, inclusive_count)
      - fills stacks_exclusive with exclusive counts per stack path
    """
    if max_depth <= 0:
        return None, 0

    count_incl = int(node.get("count") or 0)
    func, file, line = _label_from_node(node)
    here_label = _node_label_string(func, file, line)
    new_path = path_labels + [here_label]

    # Recurse children
    children_clean = []
    children_count_sum = 0
    for ch in _iter_children(node):
        child_clean, child_incl = _build_tree_and_stacks(
            ch,
            interval_ms=interval_ms,
            path_labels=new_path,
            stacks_exclusive=stacks_exclusive,
            root_total_count=root_total_count,
            max_depth=max_depth - 1,
        )
        if child_clean is not None:
            children_clean.append(child_clean)
            children_count_sum += child_incl

    # Exclusive count for THIS frame
    count_excl = max(count_incl - children_count_sum, 0)
    if count_excl > 0:
        stacks_exclusive[tuple(new_path)] = (
            stacks_exclusive.get(tuple(new_path), 0) + count_excl
        )

    # Build lean tree node
    time_ms_incl = count_incl * interval_ms
    time_ms_excl = count_excl * interval_ms
    pct_incl = (100.0 * count_incl / root_total_count) if root_total_count else 0.0
    pct_excl = (100.0 * count_excl / root_total_count) if root_total_count else 0.0

    clean = {
        "label": here_label,
        "func": func,
        "file": file,
        "line": line,
        "count_incl": count_incl,
        "time_ms_incl": time_ms_incl,
        "pct_incl": pct_incl,
        "count_excl": count_excl,
        "time_ms_excl": time_ms_excl,
        "pct_excl": pct_excl,
        "children": children_clean,
    }
    return clean, count_incl


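def _sum_all_counts(node: dict[str, Any]) -> int:
    # Counts are inclusive (a parent's count already covers its children), so
    # adding every node's count would double count. Use the node's own count
    # when it is positive; otherwise fall back to the sum over its children.
    own = int(node.get("count") or 0)
    if own > 0:
        return own
    return sum(_sum_all_counts(ch) for ch in _iter_children(node))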


# ---------- public APIs ----------


def parse_profile_to_stacks_and_tree(
    profile_tree: dict[str, Any],
    *,
    interval_ms: float = 10.0,
    top_n_stacks: int = 50,
) -> dict[str, Any]:
    """
    Convert one Dask profile tree to:
      - clean hierarchical tree with incl/excl time
      - stack list (exclusive), good for flame graphs
    """
    # Some profiles report root=0; fall back to total over subtree so percentages are meaningful
    root_count = int(profile_tree.get("count") or 0)
    fallback_total = _sum_all_counts(profile_tree)
    total_count_used = root_count if root_count > 0 else fallback_total
    total_time_ms = total_count_used * interval_ms

    stacks_exclusive: dict[tuple[str, ...], int] = {}
    clean_tree, _ = _build_tree_and_stacks(
        profile_tree,
        interval_ms=interval_ms,
        path_labels=[],
        stacks_exclusive=stacks_exclusive,
        root_total_count=total_count_used,
    )

    # Build stack rows
    stack_rows = []
    for stack, cnt in stacks_exclusive.items():
        t_ms = cnt * interval_ms
        stack_rows.append(
            {
                "stack": list(stack),  # ["root@...", "...", "leaf@..."]
                "depth": len(stack),
                "count_excl": cnt,
                "time_ms_excl": t_ms,
                "time_s_excl": t_ms / 1000.0,
                "pct_of_total": (100.0 * cnt / total_count_used)
                if total_count_used
                else 0.0,
            }
        )
    stack_rows.sort(key=lambda r: r["count_excl"], reverse=True)
    top_rows = stack_rows[:top_n_stacks]

    # Optional: a ready-to-write flamegraph TSV (exclusive)
    flamegraph_tsv = "\n".join(
        [";".join(row["stack"]) + f"\t{row['count_excl']}" for row in stack_rows]
    )

    return {
        "meta": {
            "interval_ms": interval_ms,
            "total_count_used": total_count_used,
            "total_time_ms": total_time_ms,
            "empty_tree": (total_count_used == 0),
        },
        "tree": clean_tree,
        "stacks": {
            "rows": stack_rows,
            "top": top_rows,
            "flamegraph_tsv": flamegraph_tsv,  # can be big; drop if you don't need it
        },
    }


def parse_profiles_by_worker_to_stacks(
    profiles_by_worker: dict[str, dict[str, Any]],
    *,
    interval_ms: float = 10.0,
    top_n_stacks: int = 50,
) -> dict[str, Any]:
    """
    Apply parse_profile_to_stacks_and_tree to each worker and bundle results.
    """
    workers_out = []
    for addr, payload in profiles_by_worker.items():
        meta = payload.get("meta", {}) or {"address": addr}
        tree = payload.get("profile", {}) or {}
        parsed = parse_profile_to_stacks_and_tree(
            tree, interval_ms=interval_ms, top_n_stacks=top_n_stacks
        )
        workers_out.append(
            {
                "worker": {
                    "address": meta.get("address", addr),
                    "name": meta.get("name"),
                    "host": meta.get("host"),
                    "nthreads": meta.get("nthreads"),
                    "memory_limit": meta.get("memory_limit"),
                },
                "profile": parsed,
            }
        )
    return {
        "interval_ms": interval_ms,
        "workers": workers_out,
    }

In [28]:
# 1) Collect per-worker profiles (you already have this)
# profiles_by_worker = { addr: {"meta": {...}, "profile": client.profile(workers=[addr])} ... }

# 2) Parse into clean tree + per-stack (exclusive) aggregation
parsed = parse_profiles_by_worker_to_stacks(profiles_by_worker, interval_ms=10.0, top_n_stacks=40)

# 3) Inspect a worker's top stacks
w0 = parsed["workers"][0]
print(w0["worker"])
for row in w0["profile"]["stacks"]["top"][:10]:
    print(f"{row['pct_of_total']:5.1f}%  {row['time_ms_excl']:8.1f} ms  depth={row['depth']}  {row['stack'][-1]}")

# 4) If you want a flame graph, write the TSV (one per worker)
# with open("worker0.flame.tsv", "w") as f:
#     f.write(w0["profile"]["stacks"]["flamegraph_tsv"])
# Then use flamegraph.pl or speedscope to visualize.

{'address': 'tcp://127.0.0.1:60713', 'name': 3, 'host': '127.0.0.1', 'nthreads': 1, 'memory_limit': None}
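
The inclusive and exclusive counts used by the parser above can be checked by hand on a toy tree. The sketch below is a simplified stand-in: its nodes carry a plain `name` and a list of children, which is an assumption made for brevity and not the exact shape of a Dask profile. All counts are made up.

```python
def exclusive_counts(node, path=()):
    """Yield (stack_path, exclusive_count) for each frame that has self time."""
    here = path + (node["name"],)
    children = node.get("children", [])
    # Exclusive = samples in this frame that were not inside any child frame
    excl = max(node["count"] - sum(ch["count"] for ch in children), 0)
    if excl > 0:
        yield here, excl
    for ch in children:
        yield from exclusive_counts(ch, here)


# Toy profile: 10 samples at the root, 7 inside `process`, 4 of those inside `read`
toy = {
    "name": "root",
    "count": 10,
    "children": [
        {
            "name": "process",
            "count": 7,
            "children": [{"name": "read", "count": 4, "children": []}],
        }
    ],
}

interval_ms = 10.0  # assumed sampling interval, matching the default used above
stacks = dict(exclusive_counts(toy))
for stack, count in stacks.items():
    # Same "frame;frame;frame<TAB>count" layout as the flame graph export
    print(";".join(stack) + f"\t{count}\t{count * interval_ms:.0f} ms")
```

Because every sample is attributed to exactly one stack, the exclusive counts sum back to the root's inclusive count, which is a handy sanity check on real profiles too.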


## Cleanup

In [None]:
# Close cluster
client.close()
cluster.close()
print("Cluster closed")

Cluster closed
