Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions python/dftracer/analyzer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,25 @@ class AnalyzerPresetConfigDLIOAILogging(AnalyzerPresetConfigDLIO):
}
)

@dc.dataclass
class AnalyzerPresetConfigDynamic(AnalyzerPresetConfig):
    """Preset whose real layers are discovered from the trace at read time."""

    # Single placeholder entry; when this preset is active the analyzer
    # replaces it with one layer definition per distinct trace category.
    layer_defs: Dict[str, Optional[str]] = dc.field(default_factory=lambda: {'dynamic': None})
    name: str = "dynamic"


@dc.dataclass
class AnalyzerPresetConfigStack(AnalyzerPresetConfig):
    """Preset that analyzes events along their call-stack hierarchy."""

    # One catch-all layer; the stack structure itself is derived from the
    # events rather than from layer definitions.
    layer_defs: Dict[str, Optional[str]] = dc.field(default_factory=lambda: {'stack': None})
    name: str = "stack"


@dc.dataclass
class AnalyzerConfig:
Expand All @@ -222,6 +241,11 @@ class DarshanAnalyzerConfig(AnalyzerConfig):
time_granularity: Optional[float] = 1
time_resolution: Optional[float] = 1e3

@dc.dataclass
class DataCrumbsAnalyzerConfig(AnalyzerConfig):
    """Hydra config instantiating ``dftracer.analyzer.datacrumbs.DataCrumbsAnalyzer``."""

    _target_: str = "dftracer.analyzer.datacrumbs.DataCrumbsAnalyzer"
    time_granularity: Optional[float] = 1
    # 1e6 ticks per second — presumably microsecond timestamps; confirm
    # against the DataCrumbs trace format.
    time_resolution: Optional[float] = 1e6

@dc.dataclass
class DFTracerAnalyzerConfig(AnalyzerConfig):
Expand Down Expand Up @@ -389,11 +413,14 @@ def init_hydra_config_store() -> ConfigStore:
cs.store(group="hydra/job", name="custom", node=CustomJobConfig)
cs.store(name="config", node=Config)
cs.store(group="analyzer", name="darshan", node=DarshanAnalyzerConfig)
cs.store(group="analyzer", name="datacrumbs", node=DataCrumbsAnalyzerConfig)
cs.store(group="analyzer", name="dftracer", node=DFTracerAnalyzerConfig)
cs.store(group="analyzer", name="recorder", node=RecorderAnalyzerConfig)
cs.store(group="analyzer/preset", name="posix", node=AnalyzerPresetConfigPOSIX)
cs.store(group="analyzer/preset", name="dlio-prev", node=AnalyzerPresetConfigDLIO)
cs.store(group="analyzer/preset", name="dlio", node=AnalyzerPresetConfigDLIOAILogging)
cs.store(group="analyzer/preset", name="dlio-old", node=AnalyzerPresetConfigDLIO)
cs.store(group="analyzer/preset", name="dynamic", node=AnalyzerPresetConfigDynamic)
cs.store(group="analyzer/preset", name="posix", node=AnalyzerPresetConfigPOSIX)
cs.store(group="analyzer/preset", name="stack", node=AnalyzerPresetConfigStack)
cs.store(group="cluster", name="external", node=ExternalClusterConfig)
cs.store(group="cluster", name="local", node=LocalClusterConfig)
cs.store(group="cluster", name="lsf", node=LSFClusterConfig)
Expand Down
190 changes: 190 additions & 0 deletions python/dftracer/analyzer/datacrumbs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import dask.dataframe as dd
import numpy as np
import pandas as pd
from typing import List

from .analysis_utils import fix_dtypes, fix_std_cols, set_unique_counts
from .constants import COL_COUNT, COL_PROC_NAME, COL_TIME
from .dftracer import DFTracerAnalyzer
from .metrics import set_main_metrics
from .types import ViewType
from .utils.dask_utils import flatten_column_names
from .utils.log_utils import log_block
from .utils.stack_utils import (
add_stack_time_context,
assign_hierarchy,
compute_self_time,
set_stack_metrics,
)


def _add_hierarchy_columns(traces: dd.DataFrame) -> dd.DataFrame:
    """Add event_id/parent_id/root_id/depth columns per (pid, tid) group.

    Delegates the actual stack reconstruction to ``assign_hierarchy``; this
    wrapper only supplies the dask ``meta`` describing the new columns.
    """
    # ``assign`` returns a new frame, so the original _meta is untouched.
    hierarchy_meta = traces._meta.assign(
        event_id=pd.Series(dtype="string"),
        parent_id=pd.Series(dtype="string"),
        root_id=pd.Series(dtype="string"),
        depth=pd.Series(dtype="int64"),
    )
    grouped = traces.groupby(["pid", "tid"]).apply(assign_hierarchy, meta=hierarchy_meta)
    return grouped.reset_index(drop=True)


class DataCrumbsAnalyzer(DFTracerAnalyzer):
    """DFTracer-based analyzer for DataCrumbs traces.

    Adds two preset-specific behaviors on top of ``DFTracerAnalyzer``:

    - ``dynamic`` preset: layer definitions are discovered from the trace's
      ``cat`` column at read time instead of being declared up front.
    - ``stack`` preset: events carry call-stack hierarchy columns
      (``parent_id``/``root_id``/``depth``) and metrics are aggregated along
      that hierarchy.

    For any other preset every overridden method falls through to the
    superclass implementation unchanged.
    """

    def read_trace(self, trace_path, extra_columns, extra_columns_fn):
        """Read traces; for the ``dynamic`` preset, derive one layer per ``cat`` value.

        NOTE(review): mutates ``self.preset`` (derived_metrics, layer_defs,
        layer_deps) in place — presumably each analyzer owns its preset;
        confirm presets are never shared between analyzers.
        """
        traces = super().read_trace(trace_path, extra_columns, extra_columns_fn)
        if self.preset.name == "dynamic":
            with log_block("set_dynamic_layer_defs"):
                # Materializes the distinct categories (forces a dask compute).
                layers = traces['cat'].unique().compute().tolist()
                self.layers = layers
                for layer in layers:
                    self.preset.derived_metrics[layer] = {}
                    # Each discovered category becomes a layer selected by
                    # category equality.
                    self.preset.layer_defs[layer] = f"cat == '{layer}'"
                    self.preset.layer_deps[layer] = None
        return traces

    def postread_trace(self, traces, view_types):
        """Post-process traces; for the ``stack`` preset, attach hierarchy and self time."""
        traces = super().postread_trace(traces, view_types)
        if self.preset.name == "stack":
            traces = _add_hierarchy_columns(traces)
            traces = compute_self_time(traces)
            # Kept for later joins (parent/root times, job time) in the
            # main-view and view computations below.
            self._stack_traces = traces
        return traces

    def _compute_high_level_metrics(
        self,
        traces: dd.DataFrame,
        view_types: list,
        partition_size: str,
    ) -> dd.DataFrame:
        """Group traces into high-level metrics.

        For the ``stack`` preset the grouping keys additionally include the
        category, function name, and hierarchy columns (when present) so the
        stack structure survives aggregation.
        """
        if self.preset.name != "stack":
            return super()._compute_high_level_metrics(traces, view_types, partition_size)

        extra_cols = ["cat", "func_name", "parent_id", "root_id", "depth"]
        present_extra = [col for col in extra_cols if col in traces.columns]
        # dict.fromkeys keeps order while dropping duplicates between
        # view_types and the extra columns.
        group_cols = list(dict.fromkeys(list(view_types) + present_extra))

        # Sum the known time/count/size columns plus any binned (*_bin_*)
        # metric columns that are not grouping keys.
        agg: dict[str, str] = {}
        if COL_TIME in traces.columns:
            agg[COL_TIME] = "sum"
        if "self_time" in traces.columns:
            agg["self_time"] = "sum"
        if "child_time" in traces.columns:
            agg["child_time"] = "sum"
        if COL_COUNT in traces.columns:
            agg[COL_COUNT] = "sum"
        if "size" in traces.columns:
            agg["size"] = "sum"
        for col in traces.columns:
            if col in group_cols:
                continue
            if "_bin_" in col:
                agg[col] = "sum"

        # Heuristic: shrink the shuffle output to ~sqrt of the input partitions.
        split_out = max(1, int(np.sqrt(traces.npartitions)))
        return (
            traces.groupby(group_cols)
            .agg(agg, split_out=split_out)
            .repartition(partition_size=partition_size)
        )

    def _compute_main_view(
        self,
        layer,
        hlm: dd.DataFrame,
        view_types: List[ViewType],
        partition_size: str,
    ) -> dd.DataFrame:
        """Build the per-layer main view.

        For the ``stack`` preset: applies layer metrics, re-aggregates by the
        view types plus the hierarchy keys, then joins parent/root times from
        the retained stack traces.
        """
        if self.preset.name != "stack":
            return super()._compute_main_view(layer, hlm, view_types, partition_size)

        with log_block("drop_and_set_metrics", layer=layer):
            hlm = hlm.map_partitions(self.set_layer_metrics, derived_metrics=self.preset.derived_metrics.get(layer, {}))

        stack_keys = ["parent_id", "root_id", "depth"]
        group_keys = list(view_types) + stack_keys
        # Numeric columns are summed; everything else keeps its first value.
        main_view_agg = {}
        for col in hlm.columns:
            if col in group_keys:
                continue
            if pd.api.types.is_numeric_dtype(hlm[col].dtype):
                main_view_agg[col] = "sum"
            else:
                main_view_agg[col] = "first"

        main_view = hlm.groupby(group_keys).agg(main_view_agg, split_out=hlm.npartitions)
        # Derived main metrics need both a *count and a *size column to exist.
        if any(col.endswith("count") for col in main_view.columns) and any(
            col.endswith("size") for col in main_view.columns
        ):
            main_view = main_view.map_partitions(set_main_metrics)
        # Zeros become NA before dtype fixing so they are treated as missing.
        main_view = (
            main_view.replace(0, pd.NA)
            .map_partitions(fix_dtypes, time_sliced=self.time_sliced)
            .persist()
        )
        return add_stack_time_context(main_view, traces=self._stack_traces)

    def _compute_view(
        self,
        layer,
        records: dd.DataFrame,
        view_key,
        view_type: str,
        view_types: List[ViewType],
    ) -> dd.DataFrame:
        """Build one per-view-type view.

        For the ``stack`` preset: aggregates by the view type plus the
        hierarchy keys, producing sum/min/max/mean/std for numeric metrics,
        and finally derives the stack time-fraction metrics.
        """
        if self.preset.name != "stack":
            return super()._compute_view(layer, records, view_key, view_type, view_types)

        stack_keys = ["parent_id", "root_id", "depth"]
        group_keys = [view_type] + stack_keys
        # Hierarchy keys may currently live in the index; flatten if so.
        if not set(stack_keys).issubset(records.columns):
            records = records.reset_index()

        # parent/root times are constants per group, so "first" is enough;
        # other numeric columns get the full stats spread.
        view_agg = {}
        for col in records.columns:
            if col in group_keys:
                continue
            if col in ["parent_time", "root_time"]:
                view_agg[col] = ["first"]
            elif pd.api.types.is_numeric_dtype(records[col].dtype):
                view_agg[col] = ["sum", "min", "max", "mean", "std"]
            else:
                view_agg[col] = ["first"]

        std_cols = [col for col, aggs in view_agg.items() if isinstance(aggs, list) and "std" in aggs]
        records = records.map_partitions(fix_std_cols, std_cols=std_cols)

        # Pre-aggregate per process first (unless the view itself is the
        # process view) so per-process duplicates collapse before the stats.
        pre_view = records
        if view_type != COL_PROC_NAME and COL_PROC_NAME in pre_view.columns:
            pre_view = pre_view.groupby(group_keys + [COL_PROC_NAME]).sum().reset_index()

        view = pre_view.groupby(group_keys).agg(view_agg)
        view = flatten_column_names(view)

        # Identifier-like columns must not have their zeros NA'ed below.
        stack_key_cols = {"parent_id", "root_id", "depth", "parent_id_first", "root_id_first", "depth_first"}

        def replace_zero_metrics(pdf: pd.DataFrame) -> pd.DataFrame:
            # Replace 0 with NA in numeric metric columns only.
            metric_cols = [
                c
                for c in pdf.columns
                if c not in stack_key_cols and pd.api.types.is_numeric_dtype(pdf[c])
            ]
            pdf = pdf.copy()
            pdf[metric_cols] = pdf[metric_cols].replace(0, pd.NA)
            return pdf

        # Job time is a scalar shared by all partitions; compute it once here.
        job_time = self.get_job_time(self._stack_traces).compute()
        view = (
            view.map_partitions(replace_zero_metrics)
            .map_partitions(set_unique_counts, layer=layer)
            .map_partitions(fix_dtypes, time_sliced=self.time_sliced)
            .map_partitions(set_stack_metrics, job_time=job_time)
            .persist()
        )
        return view


1 change: 1 addition & 0 deletions python/dftracer/analyzer/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ py.install_sources(
'config.py',
'constants.py',
'darshan.py',
'datacrumbs.py',
'dftracer.py',
'metrics.py',
'output.py',
Expand Down
1 change: 1 addition & 0 deletions python/dftracer/analyzer/utils/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ py.install_sources(
'json_encoders.py',
'log_utils.py',
'notebook_utils.py',
'stack_utils.py',
'warning_utils.py',
],
subdir: 'dftracer/analyzer/utils',
Expand Down
126 changes: 126 additions & 0 deletions python/dftracer/analyzer/utils/stack_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import dask.dataframe as dd
import pandas as pd
from typing import List

from ..constants import COL_TIME, COL_TIME_END, COL_TIME_START


def assign_hierarchy(pdf: pd.DataFrame) -> pd.DataFrame:
    """Reconstruct the call-stack hierarchy for one (pid, tid) group.

    Events are ordered by start time (ties broken by later end first, so an
    enclosing span precedes the spans it contains), then swept once while
    maintaining a stack of currently-open spans.  Each event receives:

    - ``event_id``: "<pid>-<tid>-<position>" within the sorted group,
    - ``parent_id``: id of the innermost enclosing span ("" for roots),
    - ``root_id``: id of the outermost enclosing span (itself for roots),
    - ``depth``: nesting level, 0 for roots.

    A span that overlaps but is not fully contained by the top of the stack
    is treated as a new root.
    """
    ordered = pdf.sort_values(
        [COL_TIME_START, COL_TIME_END],
        ascending=[True, False],
    ).reset_index(drop=True)

    ids: List[str] = []
    parents: List[str] = []
    roots: List[str] = []
    levels: List[int] = []

    # Open spans as (end_time, event_id, root_id, depth) tuples.
    open_spans: List[tuple] = []
    for idx, row in ordered.iterrows():
        current_id = f"{row['pid']}-{row['tid']}-{idx}"
        span_start = row[COL_TIME_START]
        span_end = row[COL_TIME_END]

        # Pop spans that finished before this one starts.
        while open_spans and span_start >= open_spans[-1][0]:
            open_spans.pop()

        if open_spans and span_end <= open_spans[-1][0]:
            # Fully nested inside the innermost open span.
            _, parent_id, root_id, parent_depth = open_spans[-1]
            level = parent_depth + 1
        else:
            parent_id, root_id, level = "", current_id, 0

        ids.append(current_id)
        parents.append(parent_id)
        roots.append(root_id)
        levels.append(level)

        open_spans.append((span_end, current_id, root_id, level))

    result = ordered.copy()
    result["event_id"] = ids
    result["parent_id"] = parents
    result["root_id"] = roots
    result["depth"] = levels
    return result


def compute_self_time(traces: dd.DataFrame) -> dd.DataFrame:
    """Add ``child_time`` and ``self_time`` columns to hierarchy-annotated traces.

    ``child_time`` is the summed time of an event's direct children (0 for
    leaves); ``self_time`` is the event's own time minus that sum.
    """
    # Total child time keyed by the parent's event id.
    per_parent = (
        traces.groupby("parent_id")[COL_TIME]
        .sum()
        .rename("child_time")
        .to_frame()
    )
    enriched = traces.merge(
        per_parent,
        left_on="event_id",
        right_index=True,
        how="left",
    )
    # Events without children get no match above; treat that as zero.
    enriched["child_time"] = enriched["child_time"].fillna(0)
    enriched["self_time"] = enriched[COL_TIME] - enriched["child_time"]
    return enriched


def set_stack_metrics(df: pd.DataFrame, job_time: float) -> pd.DataFrame:
    """Add time-fraction metrics relating each row to its parent, root, and job.

    Metric columns are located by name, preferring the aggregated variants
    (``*_sum``/``*_first``) over the raw names.  For every combination that is
    present, a ``<metric>_frac_<scope>`` column is derived.  Zero denominators
    are masked to ``pd.NA`` so fractions become NA rather than ``inf``.

    Parameters
    ----------
    df : pd.DataFrame
        One partition of an aggregated stack view.
    job_time : float
        Total job wall time, used for the ``*_frac_job`` metrics.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with the derived fraction columns added.
    """
    df = df.copy()

    def pick(pref: list[str]) -> str:
        # First candidate column that exists wins; "" means "not available".
        for name in pref:
            if name in df.columns:
                return name
        return ""

    def safe_divide(numer, denom):
        # Series denominators never raise ZeroDivisionError (they produce
        # inf/NA instead); this guards the scalar-denominator case.
        try:
            return numer / denom
        except ZeroDivisionError:
            return pd.Series([pd.NA] * len(numer), index=numer.index)

    def masked(col: str):
        # Denominator with zeros masked to NA so division yields NA, not inf.
        return df[col].where(df[col] != 0, pd.NA) if col else None

    time_col = pick(["time_sum", "time"])
    self_col = pick(["self_time_sum", "self_time"])
    child_col = pick(["child_time_sum", "child_time"])
    parent_time_col = pick(["parent_time_first", "parent_time"])
    root_time_col = pick(["root_time_first", "root_time"])

    # FIX: the previous version wrapped these divisions in
    # pd.option_context("mode.use_inf_as_na", True), which is deprecated
    # (removed in pandas 3.0) and had no observable effect here because the
    # denominators are masked; it has been dropped.  child_time_frac_self now
    # also uses a masked denominator, consistent with the other fractions.
    parent_denom = masked(parent_time_col)
    root_denom = masked(root_time_col)
    time_denom = masked(time_col)
    if time_col and parent_denom is not None:
        df["time_frac_parent"] = safe_divide(df[time_col], parent_denom)
    if self_col and parent_denom is not None:
        df["self_time_frac_parent"] = safe_divide(df[self_col], parent_denom)
    if child_col and parent_denom is not None:
        df["child_time_frac_parent"] = safe_divide(df[child_col], parent_denom)
    if time_col and child_col:
        df["child_time_frac_self"] = safe_divide(df[child_col], time_denom)
    if time_col and root_denom is not None:
        df["time_frac_root"] = safe_divide(df[time_col], root_denom)
    if self_col and root_denom is not None:
        df["self_time_frac_root"] = safe_divide(df[self_col], root_denom)
    if child_col and root_denom is not None:
        df["child_time_frac_root"] = safe_divide(df[child_col], root_denom)
    if root_time_col and job_time != 0:
        df["root_time_frac_job"] = safe_divide(df[root_time_col], job_time)
    if time_col and job_time != 0:
        df["time_frac_job"] = safe_divide(df[time_col], job_time)
    return df


def add_stack_time_context(main_view: dd.DataFrame, traces: dd.DataFrame) -> dd.DataFrame:
    """Join ``parent_time`` and ``root_time`` onto an aggregated stack view.

    ``parent_time`` comes from matching each row's ``parent_id`` against the
    trace event ids (roots, whose parent_id is "", get no match and stay NA);
    ``root_time`` comes from the depth-0 trace rows keyed by ``root_id``.
    """
    # Hierarchy keys may still live in the index after aggregation.
    if "root_id" not in main_view.columns:
        main_view = main_view.reset_index()

    parent_times = traces[["event_id", COL_TIME]].rename(
        columns={"event_id": "parent_id", COL_TIME: "parent_time"}
    )
    roots_only = traces[traces["depth"] == 0]
    root_times = (
        roots_only[["root_id", COL_TIME]]
        .rename(columns={COL_TIME: "root_time"})
        .drop_duplicates(subset=["root_id"])
    )

    with_parent = main_view.merge(parent_times, on="parent_id", how="left")
    return with_parent.merge(root_times, on="root_id", how="left")
Binary file added tests/data/datacrumbs-hdf5.tar.gz
Binary file not shown.
Binary file added tests/data/datacrumbs-ior.tar.gz
Binary file not shown.
Loading