Address review comments.
TallJimbo committed Oct 23, 2023
1 parent 56c257b commit 0de5e69
Showing 1 changed file with 49 additions and 24 deletions.
73 changes: 49 additions & 24 deletions python/lsst/analysis/tools/tasks/gatherResourceUsage.py
@@ -39,8 +39,8 @@

import numpy as np
import pandas as pd
-from lsst.daf.butler import Butler, DatasetRef
-from lsst.daf.butler.core.utils import globToRegex
+from lsst.daf.butler import Butler, DatasetRef, DatasetType
+from lsst.daf.butler.utils import globToRegex
from lsst.pex.config import Field, ListField
from lsst.pipe.base import (
Instrument,
@@ -568,9 +568,9 @@ def __init__(
# Start by querying for metadata datasets, since we'll need to know
# which dataset types exist in the input collections in order to
# build the pipeline.
+base_dataset_type_filter = re.compile(r"\w+_metadata")
input_dataset_types: Any
if not dataset_type_names:
-base_dataset_type_filter = re.compile(r"\w+_metadata")
input_dataset_types = base_dataset_type_filter
else:
input_dataset_types = [globToRegex(expr) for expr in dataset_type_names]
@@ -586,25 +586,11 @@ def __init__(
input_metadata_dataset_type = results.parentDatasetType
refs_for_type = set(results)
if refs_for_type:
-if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None:
-continue
-elif "gatherResourceUsage" in input_metadata_dataset_type.name:
-continue
-else:
-input_task_label = m.group(1)
-gather_task_label = f"{input_task_label}_gatherResourceUsage"
+gather_task_label, gather_dataset_type_name = self._add_gather_task(
+pipeline_graph, input_metadata_dataset_type
+)
metadata_refs[gather_task_label] = refs_for_type
-gather_dataset_type_name = f"{input_task_label}_resource_usage"
-gather_config = GatherResourceUsageConfig()
-gather_config.dimensions = input_metadata_dataset_type.dimensions.names
-gather_config.connections.input_metadata = input_metadata_dataset_type.name
-gather_config.connections.output_table = gather_dataset_type_name
consolidate_config.input_names.append(gather_dataset_type_name)
-pipeline_graph.add_task(
-label=gather_task_label,
-task_class=GatherResourceUsageTask,
-config=gather_config,
-)
pipeline_graph.add_task(
task_class=ConsolidateResourceUsageTask,
config=consolidate_config,
@@ -631,6 +617,48 @@ def __init__(
gather_inputs_for_task.append(dataset_key)
self.gather_inputs[gather_task_label] = gather_inputs_for_task

+@classmethod
+def _add_gather_task(
+cls, pipeline_graph: PipelineGraph, input_metadata_dataset_type: DatasetType
+) -> tuple[str, str]:
+"""Add a single configuration of `GatherResourceUsageTask` to a
+pipeline graph.
+
+Parameters
+----------
+pipeline_graph : `lsst.pipe.base.PipelineGraph`
+Pipeline graph to modify in-place.
+input_metadata_dataset_type : `lsst.daf.butler.DatasetType`
+Dataset type for the task's input dataset, which is the metadata
+output of the task whose resource usage information is being
+extracted.
+
+Returns
+-------
+gather_task_label : `str`
+Label of the new task in the pipeline.
+gather_dataset_type_name : `str`
+Name of the task's output table dataset type.
+"""
+if (m := re.fullmatch(r"^(\w+)_metadata$", input_metadata_dataset_type.name)) is None:
+return
+elif "gatherResourceUsage" in input_metadata_dataset_type.name:
+return
+else:
+input_task_label = m.group(1)
+gather_task_label = f"{input_task_label}_gatherResourceUsage"
+gather_dataset_type_name = f"{input_task_label}_resource_usage"
+gather_config = GatherResourceUsageConfig()
+gather_config.dimensions = input_metadata_dataset_type.dimensions.names
+gather_config.connections.input_metadata = input_metadata_dataset_type.name
+gather_config.connections.output_table = gather_dataset_type_name
+pipeline_graph.add_task(
+label=gather_task_label,
+task_class=GatherResourceUsageTask,
+config=gather_config,
+)
+return gather_task_label, gather_dataset_type_name

def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
skeleton = QuantumGraphSkeleton(subgraph.tasks.keys())
consolidate_inputs = []
@@ -689,10 +689,7 @@ def make_argument_parser(cls) -> argparse.ArgumentParser:
"--dataset-types",
type=str,
action="extend",
-help=(
-"Glob-style patterns for input metadata dataset types. If a pattern matches a "
-"non-metadata dataset type, non-metadata matches will be ignored."
-),
help="Glob-style patterns for input metadata dataset types.",
)
parser.add_argument(
"--where",
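For reference, here is a standalone sketch of the naming convention that the new _add_gather_task helper encodes. The function name derive_gather_names and the example dataset type names are illustrative only and are not part of the commit:

import re

def derive_gather_names(metadata_dataset_type_name: str) -> tuple[str, str] | None:
    # Mirror the check in _add_gather_task: only names of the form
    # "<task>_metadata" qualify, and metadata produced by resource-usage
    # gather tasks themselves is skipped.
    m = re.fullmatch(r"^(\w+)_metadata$", metadata_dataset_type_name)
    if m is None or "gatherResourceUsage" in metadata_dataset_type_name:
        return None
    task_label = m.group(1)
    return f"{task_label}_gatherResourceUsage", f"{task_label}_resource_usage"

print(derive_gather_names("calibrate_metadata"))
# ('calibrate_gatherResourceUsage', 'calibrate_resource_usage')
print(derive_gather_names("calibrate_gatherResourceUsage_metadata"))
# None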
