diff --git a/python/dftracer/analyzer/config.py b/python/dftracer/analyzer/config.py index b882864..378dce3 100644 --- a/python/dftracer/analyzer/config.py +++ b/python/dftracer/analyzer/config.py @@ -203,6 +203,25 @@ class AnalyzerPresetConfigDLIOAILogging(AnalyzerPresetConfigDLIO): } ) +@dc.dataclass +class AnalyzerPresetConfigDynamic(AnalyzerPresetConfig): + layer_defs: Dict[str, Optional[str]] = dc.field( + default_factory=lambda: { + 'dynamic': None, + } + ) + name: str = "dynamic" + + +@dc.dataclass +class AnalyzerPresetConfigStack(AnalyzerPresetConfig): + layer_defs: Dict[str, Optional[str]] = dc.field( + default_factory=lambda: { + 'stack': None, + } + ) + name: str = "stack" + @dc.dataclass class AnalyzerConfig: @@ -222,6 +241,11 @@ class DarshanAnalyzerConfig(AnalyzerConfig): time_granularity: Optional[float] = 1 time_resolution: Optional[float] = 1e3 +@dc.dataclass +class DataCrumbsAnalyzerConfig(AnalyzerConfig): + _target_: str = "dftracer.analyzer.datacrumbs.DataCrumbsAnalyzer" + time_granularity: Optional[float] = 1 + time_resolution: Optional[float] = 1e6 @dc.dataclass class DFTracerAnalyzerConfig(AnalyzerConfig): @@ -389,11 +413,14 @@ def init_hydra_config_store() -> ConfigStore: cs.store(group="hydra/job", name="custom", node=CustomJobConfig) cs.store(name="config", node=Config) cs.store(group="analyzer", name="darshan", node=DarshanAnalyzerConfig) + cs.store(group="analyzer", name="datacrumbs", node=DataCrumbsAnalyzerConfig) cs.store(group="analyzer", name="dftracer", node=DFTracerAnalyzerConfig) cs.store(group="analyzer", name="recorder", node=RecorderAnalyzerConfig) - cs.store(group="analyzer/preset", name="posix", node=AnalyzerPresetConfigPOSIX) - cs.store(group="analyzer/preset", name="dlio-prev", node=AnalyzerPresetConfigDLIO) cs.store(group="analyzer/preset", name="dlio", node=AnalyzerPresetConfigDLIOAILogging) + cs.store(group="analyzer/preset", name="dlio-old", node=AnalyzerPresetConfigDLIO) + cs.store(group="analyzer/preset", name="dynamic", node=AnalyzerPresetConfigDynamic) + cs.store(group="analyzer/preset", name="posix", node=AnalyzerPresetConfigPOSIX) + cs.store(group="analyzer/preset", name="stack", node=AnalyzerPresetConfigStack) cs.store(group="cluster", name="external", node=ExternalClusterConfig) cs.store(group="cluster", name="local", node=LocalClusterConfig) cs.store(group="cluster", name="lsf", node=LSFClusterConfig) diff --git a/python/dftracer/analyzer/datacrumbs.py b/python/dftracer/analyzer/datacrumbs.py new file mode 100644 index 0000000..523b248 --- /dev/null +++ b/python/dftracer/analyzer/datacrumbs.py @@ -0,0 +1,190 @@ +import dask.dataframe as dd +import numpy as np +import pandas as pd +from typing import List + +from .analysis_utils import fix_dtypes, fix_std_cols, set_unique_counts +from .constants import COL_COUNT, COL_PROC_NAME, COL_TIME +from .dftracer import DFTracerAnalyzer +from .metrics import set_main_metrics +from .types import ViewType +from .utils.dask_utils import flatten_column_names +from .utils.log_utils import log_block +from .utils.stack_utils import ( + add_stack_time_context, + assign_hierarchy, + compute_self_time, + set_stack_metrics, +) + + +def _add_hierarchy_columns(traces: dd.DataFrame) -> dd.DataFrame: + meta = traces._meta.copy() + meta = meta.assign( + event_id=pd.Series(dtype="string"), + parent_id=pd.Series(dtype="string"), + root_id=pd.Series(dtype="string"), + depth=pd.Series(dtype="int64"), + ) + return ( + traces.groupby(["pid", "tid"]) + .apply(assign_hierarchy, meta=meta) + .reset_index(drop=True) + ) + + +class DataCrumbsAnalyzer(DFTracerAnalyzer): + + def read_trace(self, trace_path, extra_columns, extra_columns_fn): + traces = super().read_trace(trace_path, extra_columns, extra_columns_fn) + if self.preset.name == "dynamic": + with log_block("set_dynamic_layer_defs"): + layers = traces['cat'].unique().compute().tolist() + self.layers = layers + for layer in layers: + self.preset.derived_metrics[layer] = {} + self.preset.layer_defs[layer] = f"cat == '{layer}'" + self.preset.layer_deps[layer] = None + return traces + + def postread_trace(self, traces, view_types): + traces = super().postread_trace(traces, view_types) + if self.preset.name == "stack": + traces = _add_hierarchy_columns(traces) + traces = compute_self_time(traces) + self._stack_traces = traces + return traces + + def _compute_high_level_metrics( + self, + traces: dd.DataFrame, + view_types: list, + partition_size: str, + ) -> dd.DataFrame: + if self.preset.name != "stack": + return super()._compute_high_level_metrics(traces, view_types, partition_size) + + extra_cols = ["cat", "func_name", "parent_id", "root_id", "depth"] + present_extra = [col for col in extra_cols if col in traces.columns] + group_cols = list(dict.fromkeys(list(view_types) + present_extra)) + + agg: dict[str, str] = {} + if COL_TIME in traces.columns: + agg[COL_TIME] = "sum" + if "self_time" in traces.columns: + agg["self_time"] = "sum" + if "child_time" in traces.columns: + agg["child_time"] = "sum" + if COL_COUNT in traces.columns: + agg[COL_COUNT] = "sum" + if "size" in traces.columns: + agg["size"] = "sum" + for col in traces.columns: + if col in group_cols: + continue + if "_bin_" in col: + agg[col] = "sum" + + split_out = max(1, int(np.sqrt(traces.npartitions))) + return ( + traces.groupby(group_cols) + .agg(agg, split_out=split_out) + .repartition(partition_size=partition_size) + ) + + def _compute_main_view( + self, + layer, + hlm: dd.DataFrame, + view_types: List[ViewType], + partition_size: str, + ) -> dd.DataFrame: + if self.preset.name != "stack": + return super()._compute_main_view(layer, hlm, view_types, partition_size) + + with log_block("drop_and_set_metrics", layer=layer): + hlm = hlm.map_partitions(self.set_layer_metrics, derived_metrics=self.preset.derived_metrics.get(layer, {})) + + stack_keys = ["parent_id", "root_id", "depth"] + group_keys = list(view_types) + stack_keys + main_view_agg = {} + for col in hlm.columns: + if col in group_keys: + continue + if pd.api.types.is_numeric_dtype(hlm[col].dtype): + main_view_agg[col] = "sum" + else: + main_view_agg[col] = "first" + + main_view = hlm.groupby(group_keys).agg(main_view_agg, split_out=hlm.npartitions) + if any(col.endswith("count") for col in main_view.columns) and any( + col.endswith("size") for col in main_view.columns + ): + main_view = main_view.map_partitions(set_main_metrics) + main_view = ( + main_view.replace(0, pd.NA) + .map_partitions(fix_dtypes, time_sliced=self.time_sliced) + .persist() + ) + return add_stack_time_context(main_view, traces=self._stack_traces) + + def _compute_view( + self, + layer, + records: dd.DataFrame, + view_key, + view_type: str, + view_types: List[ViewType], + ) -> dd.DataFrame: + if self.preset.name != "stack": + return super()._compute_view(layer, records, view_key, view_type, view_types) + + stack_keys = ["parent_id", "root_id", "depth"] + group_keys = [view_type] + stack_keys + if not set(stack_keys).issubset(records.columns): + records = records.reset_index() + + view_agg = {} + for col in records.columns: + if col in group_keys: + continue + if col in ["parent_time", "root_time"]: + view_agg[col] = ["first"] + elif pd.api.types.is_numeric_dtype(records[col].dtype): + view_agg[col] = ["sum", "min", "max", "mean", "std"] + else: + view_agg[col] = ["first"] + + std_cols = [col for col, aggs in view_agg.items() if isinstance(aggs, list) and "std" in aggs] + records = records.map_partitions(fix_std_cols, std_cols=std_cols) + + pre_view = records + if view_type != COL_PROC_NAME and COL_PROC_NAME in pre_view.columns: + pre_view = pre_view.groupby(group_keys + [COL_PROC_NAME]).sum().reset_index() + + view = pre_view.groupby(group_keys).agg(view_agg) + view = flatten_column_names(view) + + stack_key_cols = {"parent_id", "root_id", "depth", "parent_id_first", "root_id_first", "depth_first"} + + def replace_zero_metrics(pdf: pd.DataFrame) -> pd.DataFrame: + metric_cols = [ + c + for c in pdf.columns + if c not in stack_key_cols and pd.api.types.is_numeric_dtype(pdf[c]) + ] + pdf = pdf.copy() + pdf[metric_cols] = pdf[metric_cols].replace(0, pd.NA) + return pdf + + job_time = self.get_job_time(self._stack_traces).compute() + view = ( + view.map_partitions(replace_zero_metrics) + .map_partitions(set_unique_counts, layer=layer) + .map_partitions(fix_dtypes, time_sliced=self.time_sliced) + .map_partitions(set_stack_metrics, job_time=job_time) + .persist() + ) + return view + + \ No newline at end of file diff --git a/python/dftracer/analyzer/meson.build b/python/dftracer/analyzer/meson.build index af90145..76eeabd 100644 --- a/python/dftracer/analyzer/meson.build +++ b/python/dftracer/analyzer/meson.build @@ -10,6 +10,7 @@ py.install_sources( 'config.py', 'constants.py', 'darshan.py', + 'datacrumbs.py', 'dftracer.py', 'metrics.py', 'output.py', diff --git a/python/dftracer/analyzer/utils/meson.build b/python/dftracer/analyzer/utils/meson.build index 179be24..e25e7a2 100644 --- a/python/dftracer/analyzer/utils/meson.build +++ b/python/dftracer/analyzer/utils/meson.build @@ -12,6 +12,7 @@ py.install_sources( 'json_encoders.py', 'log_utils.py', 'notebook_utils.py', + 'stack_utils.py', 'warning_utils.py', ], subdir: 'dftracer/analyzer/utils', diff --git a/python/dftracer/analyzer/utils/stack_utils.py b/python/dftracer/analyzer/utils/stack_utils.py new file mode 100644 index 0000000..235eac7 --- /dev/null +++ b/python/dftracer/analyzer/utils/stack_utils.py @@ -0,0 +1,126 @@ +import dask.dataframe as dd +import pandas as pd +from typing import List + +from ..constants import COL_TIME, COL_TIME_END, COL_TIME_START + + +def assign_hierarchy(pdf: pd.DataFrame) -> pd.DataFrame: + pdf = pdf.sort_values( + [COL_TIME_START, COL_TIME_END], + ascending=[True, False], + ).reset_index(drop=True) + + event_ids: List[str] = [] + parent_ids: List[str] = [] + root_ids: List[str] = [] + depths: List[int] = [] + + stack: List[dict] = [] + for i, row in pdf.iterrows(): + event_id = f"{row['pid']}-{row['tid']}-{i}" + start = row[COL_TIME_START] + end = row[COL_TIME_END] + + while stack and start >= stack[-1]["end"]: + stack.pop() + + if stack and end <= stack[-1]["end"]: + parent = stack[-1]["id"] + root = stack[-1]["root"] + depth = stack[-1]["depth"] + 1 + else: + parent = "" + root = event_id + depth = 0 + + event_ids.append(event_id) + parent_ids.append(parent) + root_ids.append(root) + depths.append(depth) + + stack.append({"end": end, "id": event_id, "root": root, "depth": depth}) + + pdf = pdf.copy() + pdf["event_id"] = event_ids + pdf["parent_id"] = parent_ids + pdf["root_id"] = root_ids + pdf["depth"] = depths + return pdf + + +def compute_self_time(traces: dd.DataFrame) -> dd.DataFrame: + child_time = traces.groupby("parent_id")[COL_TIME].sum().rename("child_time") + traces = traces.merge( + child_time.to_frame(), + left_on="event_id", + right_index=True, + how="left", + ) + traces["child_time"] = traces["child_time"].fillna(0) + traces["self_time"] = traces[COL_TIME] - traces["child_time"] + return traces + + +def set_stack_metrics(df: pd.DataFrame, job_time: float) -> pd.DataFrame: + df = df.copy() + + def pick(pref: list[str]) -> str: + for name in pref: + if name in df.columns: + return name + return "" + + def safe_divide(numer, denom): + try: + return numer / denom + except ZeroDivisionError: + return pd.Series([pd.NA] * len(numer), index=numer.index) + + time_col = pick(["time_sum", "time"]) + self_col = pick(["self_time_sum", "self_time"]) + child_col = pick(["child_time_sum", "child_time"]) + parent_time_col = pick(["parent_time_first", "parent_time"]) + root_time_col = pick(["root_time_first", "root_time"]) + + with pd.option_context("mode.use_inf_as_na", True): + parent_denom = ( + df[parent_time_col].where(df[parent_time_col] != 0, pd.NA) if parent_time_col else None + ) + root_denom = df[root_time_col].where(df[root_time_col] != 0, pd.NA) if root_time_col else None + if time_col and parent_denom is not None: + df["time_frac_parent"] = safe_divide(df[time_col], parent_denom) + if self_col and parent_denom is not None: + df["self_time_frac_parent"] = safe_divide(df[self_col], parent_denom) + if child_col and parent_denom is not None: + df["child_time_frac_parent"] = safe_divide(df[child_col], parent_denom) + if time_col and child_col: + df["child_time_frac_self"] = df[child_col] / df[time_col] + if time_col and root_denom is not None: + df["time_frac_root"] = safe_divide(df[time_col], root_denom) + if self_col and root_denom is not None: + df["self_time_frac_root"] = safe_divide(df[self_col], root_denom) + if child_col and root_denom is not None: + df["child_time_frac_root"] = safe_divide(df[child_col], root_denom) + if root_time_col and job_time != 0: + df["root_time_frac_job"] = safe_divide(df[root_time_col], job_time) + if time_col and job_time != 0: + df["time_frac_job"] = safe_divide(df[time_col], job_time) + return df + + +def add_stack_time_context(main_view: dd.DataFrame, traces: dd.DataFrame) -> dd.DataFrame: + if "root_id" not in main_view.columns: + main_view = main_view.reset_index() + + parent_time_df = traces[["event_id", COL_TIME]].rename( + columns={"event_id": "parent_id", COL_TIME: "parent_time"} + ) + root_time_df = ( + traces[traces["depth"] == 0][["root_id", COL_TIME]] + .rename(columns={COL_TIME: "root_time"}) + .drop_duplicates(subset=["root_id"]) + ) + main_view = main_view.merge(parent_time_df, on="parent_id", how="left") + main_view = main_view.merge(root_time_df, on="root_id", how="left") + return main_view diff --git a/tests/data/datacrumbs-hdf5.tar.gz b/tests/data/datacrumbs-hdf5.tar.gz new file mode 100644 index 0000000..1e8b043 Binary files /dev/null and b/tests/data/datacrumbs-hdf5.tar.gz differ diff --git a/tests/data/datacrumbs-ior.tar.gz b/tests/data/datacrumbs-ior.tar.gz new file mode 100644 index 0000000..42e6c53 Binary files /dev/null and b/tests/data/datacrumbs-ior.tar.gz differ diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 32c20c8..214f308 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -1,4 +1,5 @@ import os +import pandas as pd import pathlib import pytest import random @@ -11,6 +12,8 @@ full_analyzer_trace_params = [ ("darshan", "posix", "tests/data/extracted/darshan-posix"), ("darshan", "posix", "tests/data/extracted/darshan-posix-dxt"), + ("datacrumbs", "stack", "tests/data/extracted/datacrumbs-hdf5"), + ("datacrumbs", "stack", "tests/data/extracted/datacrumbs-ior"), ("dftracer", "dlio", "tests/data/extracted/dftracer-dlio"), ("dftracer", "posix", "tests/data/extracted/dftracer-posix"), ("recorder", "posix", "tests/data/extracted/recorder-posix-parquet"), @@ -73,7 +76,9 @@ def _test_e2e( scheduler_address = dask_cluster.scheduler_address view_types = ["proc_name", "time_range"] - if trace_path.endswith("darshan-posix"): + if analyzer == "datacrumbs": + view_types = ["proc_name", "func_name"] + elif trace_path.endswith("darshan-posix"): view_types = ["file_name", "proc_name"] hydra_overrides = [ @@ -116,9 +121,83 @@ def _test_e2e( assert len(result.layers) == len(dfa.hydra_config.analyzer.preset.layer_defs), ( f"Expected {len(dfa.hydra_config.analyzer.preset.layer_defs)} layers, got {len(result.layers)}" ) + if checkpoint: assert any(glob(f"{result.checkpoint_dir}/*.parquet")), "No checkpoint found" + if preset == "stack": + stack_layer = next(iter(result.layers), None) + assert stack_layer is not None, "Expected a stack layer to be present" + + # Schema sanity + hierarchy invariants for layer views + for view_key, view_df in result.views[stack_layer].items(): + view = view_df.reset_index() + for col in ("parent_id", "root_id", "depth"): + assert col in view.columns, f"Missing {col} in stack view {view_key}" + assert view["root_id"].ne("").all(), f"Empty root_id values in stack view {view_key}" + depth_zero = view["depth"] == 0 + if depth_zero.any(): + assert view.loc[depth_zero, "parent_id"].isin(["", None]).all(), ( + f"Root rows should not have parent_id in stack view {view_key}" + ) + if (~depth_zero).any(): + assert (view.loc[~depth_zero, "parent_id"] != "").all(), ( + f"Non-root rows missing parent_id in stack view {view_key}" + ) + + # Basic invariants for time metrics when present + time_sum = view["time_sum"] + self_sum = view["self_time_sum"] + child_sum = view["child_time_sum"] + assert (time_sum.dropna() >= 0).all(), f"Negative time_sum in {view_key}" + assert (self_sum.dropna() >= 0).all(), f"Negative self_time_sum in {view_key}" + assert (child_sum.dropna() >= 0).all(), f"Negative child_time_sum in {view_key}" + assert (time_sum.dropna() >= self_sum.dropna()).all(), ( + f"time_sum < self_time_sum in {view_key}" + ) + delta = (self_sum + child_sum) - time_sum + assert (delta.abs().dropna() < 1e-6).all(), ( + f"self_time_sum + child_time_sum != time_sum in {view_key}" + ) + + # Deterministic top child selection for roots with a strict max + if "func_name" in view.columns: + candidate = view[view["depth"] > 0].copy() + if not candidate.empty: + candidate = candidate.dropna(subset=["time_frac_root", "root_id"]) + if not candidate.empty: + time_sort = "time_sum" if "time_sum" in candidate.columns else None + sort_cols = ["root_id", "time_frac_root"] + sort_asc = [True, False] + if time_sort: + sort_cols.append(time_sort) + sort_asc.append(False) + sort_cols.append("func_name") + sort_asc.append(True) + top = candidate.sort_values(sort_cols, ascending=sort_asc).groupby("root_id").head(2) + for root_id, group in top.groupby("root_id"): + if len(group) < 2: + continue + first, second = group.iloc[0], group.iloc[1] + if pd.isna(first["time_frac_root"]) or pd.isna(second["time_frac_root"]): + continue + if first["time_frac_root"] > second["time_frac_root"]: + max_row = candidate[candidate["root_id"] == root_id].loc[ + candidate[candidate["root_id"] == root_id]["time_frac_root"].idxmax() + ] + assert first["func_name"] == max_row["func_name"], ( + f"Non-deterministic top child in root {root_id} for {view_key}" + ) + + # Fraction bounds for flat views + for view_key, flat_view in result.flat_views.items(): + frac_cols = [col for col in flat_view.columns if "_frac_" in col] + assert frac_cols, f"Expected fraction columns in flat view {view_key}" + for col in frac_cols: + series = flat_view[col] + in_range = series.between(0, 1) | series.isna() + assert in_range.all(), f"Out-of-range fraction in {view_key}.{col}" + # Shutdown the Dask client and cluster dfa.shutdown() diff --git a/tests/utils/test_stack_utils.py b/tests/utils/test_stack_utils.py new file mode 100644 index 0000000..ba971ef --- /dev/null +++ b/tests/utils/test_stack_utils.py @@ -0,0 +1,127 @@ +import pandas as pd +import dask.dataframe as dd + +from dftracer.analyzer.constants import COL_TIME, COL_TIME_END, COL_TIME_START +from dftracer.analyzer.utils.stack_utils import ( + add_stack_time_context, + assign_hierarchy, + compute_self_time, + set_stack_metrics, +) + + +def test_assign_hierarchy_nested_stack(): + pdf = pd.DataFrame( + { + "pid": [0, 0, 0, 0, 0], + "tid": [0, 0, 0, 0, 0], + COL_TIME_START: [0, 1, 2, 6, 11], + COL_TIME_END: [10, 8, 4, 7, 12], + } + ) + + out = assign_hierarchy(pdf) + + # Event IDs are assigned after sorting; verify consistent stack structure. + root_id = out.loc[0, "event_id"] + assert out.loc[0, "depth"] == 0 + assert out.loc[0, "parent_id"] == "" + assert out.loc[0, "root_id"] == root_id + + assert out.loc[1, "parent_id"] == root_id + assert out.loc[1, "root_id"] == root_id + assert out.loc[1, "depth"] == 1 + + assert out.loc[2, "parent_id"] == out.loc[1, "event_id"] + assert out.loc[2, "root_id"] == root_id + assert out.loc[2, "depth"] == 2 + + assert out.loc[3, "parent_id"] == out.loc[1, "event_id"] + assert out.loc[3, "root_id"] == root_id + assert out.loc[3, "depth"] == 2 + + assert out.loc[4, "depth"] == 0 + assert out.loc[4, "parent_id"] == "" + assert out.loc[4, "root_id"] == out.loc[4, "event_id"] + + +def test_compute_self_time_rollup(): + pdf = pd.DataFrame( + { + "event_id": ["r", "c1", "c2"], + "parent_id": ["", "r", "r"], + COL_TIME: [10.0, 3.0, 2.0], + } + ) + traces = dd.from_pandas(pdf, npartitions=1) + result = compute_self_time(traces).compute() + + root = result.loc[result["event_id"] == "r"].iloc[0] + child1 = result.loc[result["event_id"] == "c1"].iloc[0] + child2 = result.loc[result["event_id"] == "c2"].iloc[0] + + assert root["child_time"] == 5.0 + assert root["self_time"] == 5.0 + assert child1["child_time"] == 0.0 + assert child1["self_time"] == 3.0 + assert child2["child_time"] == 0.0 + assert child2["self_time"] == 2.0 + + +def test_add_stack_time_context(): + traces_pdf = pd.DataFrame( + { + "event_id": ["r", "c1"], + "root_id": ["r", "r"], + "depth": [0, 1], + COL_TIME: [10.0, 4.0], + } + ) + traces = dd.from_pandas(traces_pdf, npartitions=1) + + main_view_pdf = pd.DataFrame( + { + "parent_id": ["", "r"], + "root_id": ["r", "r"], + } + ) + main_view = dd.from_pandas(main_view_pdf, npartitions=1) + out = add_stack_time_context(main_view, traces).compute() + + root_row = out.iloc[0] + child_row = out.iloc[1] + assert pd.isna(root_row["parent_time"]) + assert root_row["root_time"] == 10.0 + assert child_row["parent_time"] == 10.0 + assert child_row["root_time"] == 10.0 + + +def test_set_stack_metrics_fractions(): + df = pd.DataFrame( + { + "time_sum": [5.0, 1.0], + "self_time_sum": [3.0, 1.0], + "child_time_sum": [2.0, 0.0], + "parent_time": [10.0, 0.0], + "root_time": [20.0, 0.0], + } + ) + + out = set_stack_metrics(df, job_time=40.0) + + assert out.loc[0, "time_frac_parent"] == 0.5 + assert out.loc[0, "self_time_frac_parent"] == 0.3 + assert out.loc[0, "child_time_frac_parent"] == 0.2 + assert out.loc[0, "child_time_frac_self"] == 0.4 + assert out.loc[0, "time_frac_root"] == 0.25 + assert out.loc[0, "self_time_frac_root"] == 0.15 + assert out.loc[0, "child_time_frac_root"] == 0.1 + assert out.loc[0, "root_time_frac_job"] == 0.5 + assert out.loc[0, "time_frac_job"] == 0.125 + + assert pd.isna(out.loc[1, "time_frac_parent"]) + assert pd.isna(out.loc[1, "self_time_frac_parent"]) + assert pd.isna(out.loc[1, "child_time_frac_parent"]) + assert pd.isna(out.loc[1, "time_frac_root"]) + assert pd.isna(out.loc[1, "self_time_frac_root"]) + assert pd.isna(out.loc[1, "child_time_frac_root"]) \ No newline at end of file