Skip to content

Commit

Permalink
Add task for summarizing resource usage into a single table.
Browse files Browse the repository at this point in the history
  • Loading branch information
yalsayyad authored and TallJimbo committed Sep 8, 2023
1 parent fadca26 commit 65aa428
Showing 1 changed file with 162 additions and 1 deletion.
163 changes: 162 additions & 1 deletion python/lsst/analysis/drp/gatherResourceUsage.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,109 @@
_LOG = logging.getLogger(__name__)


class ConsolidateResourceUsageConnections(PipelineTaskConnections, dimensions=()):
output_table = ct.Output(
name="ResourceUsageSummary",
storageClass="DataFrame",
dimensions=(),
doc="Consolidated table of resource usage statistics. One row per task label",
)

def __init__(self, *, config):
super().__init__(config=config)
for name in self.config.input_names:
setattr(
self,
name,
ct.Input(
name,
storageClass="DataFrame",
dimensions=(),
doc="Resource usage statistics for a task.",
),
)
self.inputs.add(name)


class ConsolidateResourceUsageConfig(
PipelineTaskConfig, pipelineConnections=ConsolidateResourceUsageConnections
):
input_names = ListField(
doc="Input resource usage dataset type names",
dtype=str,
default=[],
)


class ConsolidateResourceUsageTask(PipelineTask):
"""A `PipelineTask` that summarizes task resource usage into a single
table with per-task rows.
Notes
-----
This is an unusual `PipelineTask` in that its input connection has
dynamic dimensions, and its quanta are generally built via a custom
quantum-graph builder defined in the same module.
"""

ConfigClass = ConsolidateResourceUsageConfig
_DefaultName = "consolidateResourceUsage"

def run(self, **kwargs):
quantiles = []
for input_name, ru_table in kwargs.items():
if not input_name.endswith("resource_usage"):
continue
else:
df = ru_table.quantile(
[0.0, 0.01, 0.05, 0.32, 0.50, 0.68, 0.95, 0.99, 1.0],
numeric_only=True,
).reset_index()
df["task"] = input_name.replace("_resource_usage", "")
df["quanta"] = len(ru_table)
df["integrated_runtime"] = ru_table["run_time"].sum()

quantiles.append(
df[
[
"index",
"quanta",
"task",
"memory",
"init_time",
"run_time",
"integrated_runtime",
]
]
)

full_quantiles = pd.concat(quantiles)
full_quantiles["percentile"] = (full_quantiles["index"] * 100).astype(int)
full_quantiles["percentile_name"] = "p" + full_quantiles["percentile"].astype(str).str.zfill(3)
full_quantiles["memoryGB"] = full_quantiles["memory"] / 1024 / 1024 / 1024
full_quantiles["integrated_runtime_hrs"] = full_quantiles["integrated_runtime"] / 3600.0
memoryGB = pd.pivot_table(
full_quantiles, values="memoryGB", columns=["percentile_name"], index=["task"]
).add_prefix("mem_GB_")
runtime = pd.pivot_table(
full_quantiles, values="run_time", columns=["percentile_name"], index=["task"]
).add_prefix("runtime_s_")
memrun = pd.merge(
memoryGB.reset_index(),
runtime.reset_index(),
left_on="task",
right_on="task",
)
memrun = pd.merge(
full_quantiles[
["task", "quanta", "integrated_runtime_hrs"]
].drop_duplicates().sort_values("task"),
memrun,
)

return Struct(output_table=memrun)


class GatherResourceUsageConnections(
PipelineTaskConnections,
dimensions=(),
Expand Down Expand Up @@ -242,20 +345,31 @@ def build_quantum_graph(cls, metadata_refs, graph_metadata):
init_outputs = {}
empty_dimensions: DimensionGraph | None = None
data_id: DataCoordinate | None = None

consolidate_inputs = {}
consolidate_config = ConsolidateResourceUsageConfig()

for input_metadata_dataset_type, metadata_refs in metadata_refs_by_dataset_type.items():
if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None:
continue
elif "gatherResourceUsage" in input_metadata_dataset_type.name:
continue
else:
input_task_label = m.group(1)
_LOG.info(
"Creating GatherResourceUsage quantum for %s with %d input datasets.",
input_task_label,
len(metadata_refs),
)
dataset_type_name = f"{input_task_label}_resource_usage"

config = cls.ConfigClass()
config.dimensions = input_metadata_dataset_type.dimensions.names
config.connections.input_metadata = input_metadata_dataset_type.name
config.connections.output_table = f"{input_task_label}_resource_usage"
config.connections.output_table = dataset_type_name

consolidate_config.input_names.append(dataset_type_name)

task_def = TaskDef(
taskName=get_full_type_name(cls),
taskClass=cls,
Expand All @@ -275,6 +389,7 @@ def build_quantum_graph(cls, metadata_refs, graph_metadata):
DatasetRef(output_table_dataset_type, data_id, run=output_run)
],
}
consolidate_inputs.update(outputs)
if task_def.metadataDatasetName is not None:
output_metadata_dataset_type = DatasetType(
task_def.metadataDatasetName,
Expand Down Expand Up @@ -307,6 +422,52 @@ def build_quantum_graph(cls, metadata_refs, graph_metadata):
)
init_outputs[task_def] = [DatasetRef(config_dataset_type, data_id, run=output_run)]

# Now once more for the task to consolidate all the individual resource
# usage tables
task_def = TaskDef(
taskName=get_full_type_name(ConsolidateResourceUsageTask),
taskClass=ConsolidateResourceUsageTask,
config=consolidate_config,
label=ConsolidateResourceUsageTask._DefaultName,
)

output_table_dataset_type = DatasetType(
consolidate_config.connections.output_table,
dimensions=empty_dimensions,
storageClass=ConsolidateResourceUsageConnections.output_table.storageClass,
)

outputs = {
output_table_dataset_type: [DatasetRef(output_table_dataset_type, data_id, run=output_run)],
}

if task_def.metadataDatasetName is not None:
output_metadata_dataset_type = DatasetType(
task_def.metadataDatasetName,
dimensions=empty_dimensions,
storageClass="TaskMetadata",
)
outputs[output_metadata_dataset_type] = [
DatasetRef(output_metadata_dataset_type, data_id, run=output_run)
]

if task_def.logOutputDatasetName is not None:
log_dataset_type = DatasetType(
task_def.logOutputDatasetName,
dimensions=empty_dimensions,
storageClass="ButlerLogRecords",
)
outputs[log_dataset_type] = [DatasetRef(log_dataset_type, data_id, run=output_run)]

quantum = Quantum(
taskName=task_def.taskName,
taskClass=task_def.taskClass,
dataId=data_id,
inputs=consolidate_inputs,
outputs=outputs,
)
quanta_by_task_def[task_def] = {quantum}

global_init_outputs = []
if empty_dimensions is not None and data_id is not None:
packages_dataset_type = DatasetType(
Expand Down

0 comments on commit 65aa428

Please sign in to comment.