Add methods to machine-generate pipelines for gathering resource stats.
The actual CLI script will be going into drp_pipe, because the intent
is to only run it inside the SCons scripts there.
TallJimbo committed Mar 14, 2022
1 parent 036e580 commit d2f13e5
Showing 1 changed file with 174 additions and 1 deletion.
175 changes: 174 additions & 1 deletion python/lsst/analysis/drp/gatherResourceStatistics.py
@@ -27,17 +27,24 @@
"GatherResourceStatisticsTask",
)

import argparse
import sys
from collections import defaultdict

import numpy as np
import pandas as pd

from lsst.daf.butler import Butler, DimensionUniverse
from lsst.pex.config import Config, ConfigDictField, Field, ListField
from lsst.pipe.base import (
Pipeline,
PipelineTask,
PipelineTaskConfig,
PipelineTaskConnections,
Struct,
)
from lsst.pipe.base.pipelineIR import ConfigIR, LabeledSubset, PipelineIR, TaskIR
from lsst.pipe.base import connectionTypes as ct
from lsst.utils.introspection import get_full_type_name

# It's not great to be importing a private symbol, but this is a temporary
# workaround for the fact that prior to w.2022.10, the units for memory values
@@ -187,12 +194,146 @@ class GatherResourceStatisticsTask(PipelineTask):
It is expected that this task will be configured to run multiple times in
most pipelines, with different output labels and output dataset type names,
for each of the sets of dimensions whose tasks are prominent enough to be
    worth aggregating resource usage statistics.  The `make_pipeline` method
    can be used to create a `Pipeline` that runs this task on all metadata
    datasets produced by some other pipeline.
"""

ConfigClass = GatherResourceStatisticsConfig
_DefaultName = "gatherResourceStatistics"

@classmethod
def make_pipeline(cls, input_pipeline, universe):
"""Create a new pipeline that gathers resource statistics from the
metadata produced by another pipeline.
Parameters
----------
input_pipeline : `Iterable` [ `lsst.pipe.base.TaskDef` ]
An `lsst.pipe.base.Pipeline` or other iterable over
`lsst.pipe.base.TaskDef` instances, containing the tasks whose
resource usage statistics should be gathered.
universe : `lsst.daf.butler.DimensionUniverse`
Object managing all dimensions (data ID keys) recognized by the
butler; used to standardize task and dataset type dimensions by
expanding out required and implied dependencies.
Returns
-------
output_pipeline : `lsst.pipe.base.Pipeline`
An `lsst.pipe.base.Pipeline` that runs
`GatherResourceStatisticsTask` once for each distinct set of
dimensions in ``input_pipeline``, on all of the tasks with those
dimensions.
"""
this_tasks_name = get_full_type_name(cls)
# Process the input pipeline, grouping its tasks by their dimensions.
input_labels_by_dimensions = defaultdict(list)
for task_def in input_pipeline:
if task_def.taskName == this_tasks_name:
# Don't gather resource statistics on existing configurations
# of this task.
continue
dimensions = universe.extract(task_def.connections.dimensions)
input_labels_by_dimensions[dimensions].append(task_def.label)
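        # At this point, the grouping might look like this (the task labels
        # here are illustrative only, not taken from any real pipeline):
        #
        #     {
        #         {visit, detector, ...}: ["isr", "calibrate"],
        #         {tract, patch, band, ...}: ["assembleCoadd"],
        #     }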
        # Convert the dict we just built into the new Pipeline, with one task
        # configuration for each key.
output_pipeline_ir = PipelineIR(
{
"description": "A pipeline that aggregates resource usage statistics.",
"tasks": {},
}
)
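        # Group all generated task labels into a single labeled subset, so
        # the entire gathering step can be addressed by one name when running
        # the output pipeline.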
output_subset = LabeledSubset(
"gatherResourceStatistics",
set(),
"Gather resource usage statistics from metadata into tables.",
)
output_pipeline_ir.labeled_subsets[output_subset.label] = output_subset
for dimensions, input_labels in input_labels_by_dimensions.items():
dimensions_name = _compact_dimensions_name(dimensions)
output_label = f"{cls._DefaultName}_{dimensions_name}"
            # This string is a snippet of Python code: the body of a dict
            # literal, not including the curly braces.  It only makes sense
            # in the context of the multi-line block it is inserted into
            # below.
labels_config_str = "".join(
f' "{label}": C(),\n' for label in input_labels
)
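            # For input labels ["isr", "calibrate"] (illustrative only), the
            # generated snippet would read:
            #
            #     "isr": C(),
            #     "calibrate": C(),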
config_ir = ConfigIR(
rest={
"dimensions": list(dimensions.names),
"connections.output_table": f"resource_statistics_{dimensions_name}",
},
python=(
f"from {ExtractResourceUsageConfig.__module__} import ExtractResourceUsageConfig as C\n"
"config.labels = {\n"
f"{labels_config_str}}}"
),
)
task_ir = TaskIR(
label=output_label,
klass=this_tasks_name,
config=[config_ir],
)
output_pipeline_ir.tasks[output_label] = task_ir
output_subset.subset.add(output_label)
return Pipeline.fromIR(output_pipeline_ir)
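
    # A minimal usage sketch for `make_pipeline` (the pipeline URI below is
    # hypothetical, not part of this package):
    #
    #     from lsst.daf.butler import DimensionUniverse
    #     from lsst.pipe.base import Pipeline
    #
    #     input_pipeline = Pipeline.from_uri("pipelines/DRP.yaml")
    #     output_pipeline = GatherResourceStatisticsTask.make_pipeline(
    #         input_pipeline, DimensionUniverse()
    #     )
    #     print(output_pipeline)  # YAML with one gathering task per dimension set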

@classmethod
def make_pipeline_cli(cls, argv):
"""A command-line entry point for `make_pipeline`.
Parameters
----------
argv : `Sequence` [ `str` ]
Command-line arguments to parse; usually just ``sys.argv[1:]``.
Notes
-----
This is intentionally not runnable via ``python -m`` on this module,
because this sets ``cls.__module__`` to ``__main__``, making it
impossible to embed the task's full class name in a `Pipeline` without
substantial workarounds.
"""
parser = argparse.ArgumentParser(
description="Generate a pipeline that gathers resource usage statistics for another pipeline"
)
parser.add_argument(
"input",
type=str,
help="URI to the pipeline whose tasks' metadata will be processed for resource usage.",
)
parser.add_argument(
"-o",
"--output",
type=str,
help="URI to write the resource-usage-gathering pipeline to. Default is STDOUT.",
default=None,
)
parser.add_argument(
"--data-repository",
type=str,
help=(
"URI to a butler data repository whose dimensions should be used "
"to standardize those of tasks, instead of the defaults in daf_butler."
),
default=None,
)
arguments = parser.parse_args(argv)
input_pipeline = Pipeline.from_uri(arguments.input)
if arguments.data_repository is not None:
universe = Butler(arguments.data_repository).registry.dimensions
else:
universe = DimensionUniverse()
output_pipeline = cls.make_pipeline(
input_pipeline, universe=universe
)
if arguments.output is not None:
output_pipeline.write_to_uri(arguments.output)
else:
sys.stdout.write(str(output_pipeline))
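
    # Example invocation of `make_pipeline_cli`, assuming a hypothetical
    # wrapper script (per the commit message, the real CLI script is intended
    # to live in drp_pipe):
    #
    #     build_gather_resource_statistics_pipeline.py \
    #         pipelines/DRP.yaml \
    #         -o gatherResourceStatistics.yaml \
    #         --data-repository /repo/main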

def runQuantum(
self,
butlerQC,
@@ -357,3 +498,35 @@ def _dtype_from_field_spec(field_spec):
return np.dtype((str, field_spec.length))
else:
return np.dtype(python_type)


def _compact_dimensions_name(dimensions):
"""Return a compact string that uniquely identifies a set of dimensions.
For example, given "(instrument, visit, detector)", this returns
"visit_detector_patch", because "instrument" is always present in data IDs
with either detector or visit.
Parameters
----------
dimension : `lsst.daf.butler.DimensionGraph`
Dimensions to generate a name for.
Returnsx
-------
name : `str`
Compact name for the set of dimensions. Guaranteed to have only
alphanumeric characters and underscores (providing that is also true
of all dimension names), and for
"""
terms = set(dimensions.required.names)
while True:
for name in list(terms):
if any(
name in dimensions[other].required.names and name != other
for other in terms
):
terms.remove(name)
break
else:
return "_".join(terms)
