Skip to content

Commit

Permalink
Merge pull request #385 from lsst/tickets/DM-41117
Browse files Browse the repository at this point in the history
DM-41117: Use Registry caching context in QG builder
  • Loading branch information
andy-slac committed Nov 14, 2023
2 parents 58cb176 + 1701dfa commit b64b5b1
Showing 1 changed file with 45 additions and 42 deletions.
87 changes: 45 additions & 42 deletions python/lsst/pipe/base/quantum_graph_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def __init__(
self.metadata = TaskMetadata()
self._pipeline_graph = pipeline_graph
self.butler = butler
self._pipeline_graph.resolve(self.butler.registry)
if input_collections is None:
input_collections = butler.collections
if not input_collections:
Expand Down Expand Up @@ -233,11 +232,13 @@ def __init__(
packages_storage_class,
)
}
self._find_empty_dimension_datasets()
self.prerequisite_info = {
task_node.label: PrerequisiteInfo(task_node, self._pipeline_graph)
for task_node in pipeline_graph.tasks.values()
}
with self.butler.registry.caching_context():
self._pipeline_graph.resolve(self.butler.registry)
self._find_empty_dimension_datasets()
self.prerequisite_info = {
task_node.label: PrerequisiteInfo(task_node, self._pipeline_graph)
for task_node in pipeline_graph.tasks.values()
}

log: LsstLogAdapter
"""Logger to use for all quantum-graph generation messages.
Expand Down Expand Up @@ -339,42 +340,44 @@ def build(self, metadata: Mapping[str, Any] | None = None) -> QuantumGraph:
call this method exactly once. See class documentation for details on
what it does.
"""
full_skeleton = QuantumGraphSkeleton(self._pipeline_graph.tasks)
subgraphs = list(self._pipeline_graph.split_independent())
for i, subgraph in enumerate(subgraphs):
self.log.info(
"Processing pipeline subgraph %d of %d with %d task(s).",
i + 1,
len(subgraphs),
len(subgraph.tasks),
)
self.log.verbose("Subgraph tasks: [%s]", ", ".join(label for label in subgraph.tasks))
subgraph_skeleton = self.process_subgraph(subgraph)
full_skeleton.update(subgraph_skeleton)
# Loop over tasks. The pipeline graph must be topologically sorted,
# so a quantum is only processed after any quantum that provides its
# inputs has been processed.
for task_node in self._pipeline_graph.tasks.values():
self._resolve_task_quanta(task_node, full_skeleton)
# Add global init-outputs to the skeleton.
for dataset_type in self._global_init_output_types.values():
dataset_key = full_skeleton.add_dataset_node(
dataset_type.name, self.empty_data_id, is_global_init_output=True
)
ref = self.existing_datasets.outputs_in_the_way.get(dataset_key)
if ref is None:
ref = DatasetRef(dataset_type, self.empty_data_id, run=self.output_run)
full_skeleton[dataset_key]["ref"] = ref
# Remove dataset nodes with no edges that are not global init outputs,
# which are generally overall-inputs whose original quanta end up
# skipped or with no work to do (we can't remove these along with the
# quanta because no quantum knows if its the only consumer).
full_skeleton.remove_orphan_datasets()
self._attach_datastore_records(full_skeleton)
# TODO initialize most metadata here instead of in ctrl_mpexec.
if metadata is None:
metadata = {}
return self._construct_quantum_graph(full_skeleton, metadata)
with self.butler.registry.caching_context():
full_skeleton = QuantumGraphSkeleton(self._pipeline_graph.tasks)
subgraphs = list(self._pipeline_graph.split_independent())
for i, subgraph in enumerate(subgraphs):
self.log.info(
"Processing pipeline subgraph %d of %d with %d task(s).",
i + 1,
len(subgraphs),
len(subgraph.tasks),
)
self.log.verbose("Subgraph tasks: [%s]", ", ".join(label for label in subgraph.tasks))
subgraph_skeleton = self.process_subgraph(subgraph)
full_skeleton.update(subgraph_skeleton)
# Loop over tasks. The pipeline graph must be topologically
# sorted, so a quantum is only processed after any quantum that
# provides its inputs has been processed.
for task_node in self._pipeline_graph.tasks.values():
self._resolve_task_quanta(task_node, full_skeleton)
# Add global init-outputs to the skeleton.
for dataset_type in self._global_init_output_types.values():
dataset_key = full_skeleton.add_dataset_node(
dataset_type.name, self.empty_data_id, is_global_init_output=True
)
ref = self.existing_datasets.outputs_in_the_way.get(dataset_key)
if ref is None:
ref = DatasetRef(dataset_type, self.empty_data_id, run=self.output_run)
full_skeleton[dataset_key]["ref"] = ref
# Remove dataset nodes with no edges that are not global init
# outputs, which are generally overall-inputs whose original quanta
# end up skipped or with no work to do (we can't remove these along
# with the quanta because no quantum knows if its the only
# consumer).
full_skeleton.remove_orphan_datasets()
self._attach_datastore_records(full_skeleton)
# TODO initialize most metadata here instead of in ctrl_mpexec.
if metadata is None:
metadata = {}
return self._construct_quantum_graph(full_skeleton, metadata)

@abstractmethod
def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
Expand Down

0 comments on commit b64b5b1

Please sign in to comment.