Skip to content

Commit

Permalink
Create injected subsets
Browse files Browse the repository at this point in the history
  • Loading branch information
leeskelvin committed Dec 20, 2023
1 parent 6eba1e9 commit 7570323
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ Tasks from the reference pipeline may either be removed or have specific configu

.. note::

When using the above utilities to construct a fully qualified injection pipeline, any subsets will also be updated to include the injection task where appropriate.
When using the above utilities to construct a fully qualified injection pipeline, any existing subsets will also be updated to include the injection task where appropriate.
Furthermore, a series of ``injected_*`` subsets will be constructed.
These ``injected_*`` subsets are copies of existent subsets, but with any tasks not directly impacted by source injection removed.

For example, if the ``inject_exposure.yaml`` pipeline stub is used to inject sources into a ``postISRCCD`` dataset type, the ``step1`` subset of the reference pipeline will be updated to also include the ``inject_exposure`` task.
This behavior can be disabled by passing the ``-e`` argument on the command line, or setting ``exclude_subsets`` to ``True`` in Python.
Additionally, a new subset, ``injected_step1``, will also be created containing all tasks from the ``step1`` subset but with the ``isr`` task removed (as sources will be injected after this task has run).

The table below lists the available pipeline YAML stubs inside the ``$SOURCE_INJECTION_DIR/pipelines`` directory and the dataset types they are designed to inject sources into:

Expand Down
45 changes: 35 additions & 10 deletions python/lsst/source/injection/utils/make_injection_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def make_injection_pipeline(
)

# Determine the set of dataset type names affected by source injection.
injected_tasks = set()
all_connection_type_names = set()
injected_types = {dataset_type_name}
precursor_injection_task_labels = set()
Expand All @@ -194,9 +195,11 @@ def make_injection_pipeline(
# Identify the precursor task: allows appending inject task to subset.
if dataset_type_name in output_types:
precursor_injection_task_labels.add(taskDef.label)
# If the task has any injected dataset type names as inputs, add all of
# its outputs to the set of injected types.
# If the task has any injected dataset type names as inputs, add the
# task to a set of tasks touched by injection, and add all of the
# outputs of this task to the set of injected types.
if len(input_types & injected_types) > 0:
injected_tasks |= {taskDef.label}
injected_types |= output_types
# Add the injection prefix to all affected dataset type names.
for field in conns.initInputs | conns.inputs | conns.initOutputs | conns.outputs:
Expand Down Expand Up @@ -246,16 +249,15 @@ def make_injection_pipeline(
# Merge the injection pipeline to the modified pipeline, if provided.
if injection_pipeline:
if isinstance(injection_pipeline, str):
pipeline2 = Pipeline.fromFile(injection_pipeline)
else:
pipeline2 = injection_pipeline
if len(pipeline2) != 1:
injection_pipeline = Pipeline.fromFile(injection_pipeline)
if len(injection_pipeline) != 1:
raise RuntimeError(
f"The injection pipeline contains {len(pipeline2)} tasks; only one task is allowed."
f"The injection pipeline contains {len(injection_pipeline)} tasks; only 1 task is allowed."
)
pipeline.mergePipeline(pipeline2)
pipeline.mergePipeline(injection_pipeline)
# Loop over all injection tasks and modify the connection names.
for injection_taskDef in pipeline2.toExpandedPipeline():
for injection_taskDef in injection_pipeline.toExpandedPipeline():
injected_tasks |= {injection_taskDef.label}
conns = injection_taskDef.connections
pipeline.addConfigOverride(
injection_taskDef.label, "connections.input_exposure", dataset_type_name
Expand All @@ -270,5 +272,28 @@ def make_injection_pipeline(
for subset in precursor_subsets:
pipeline.addLabelToSubset(subset, injection_taskDef.label)

logger.info("Made an injection pipeline containing %d tasks.", len(pipeline))
# Create injected subsets.
injected_label_specifier = LabelSpecifier(labels=injected_tasks)
injected_pipeline = pipeline.subsetFromLabels(injected_label_specifier, pipeline.PipelineSubsetCtrl.EDIT)
injected_subset_labels = set()
for injected_subset in injected_pipeline.subsets.keys():
injected_subset_label = "injected_" + injected_subset
injected_subset_description = (
"All tasks from the '" + injected_subset + "' subset impacted by source injection."
)
if len(injected_subset_tasks := injected_pipeline.subsets[injected_subset]) > 0:
injected_subset_labels |= {injected_subset_label}
pipeline.addLabeledSubset(
injected_subset_label, injected_subset_description, injected_subset_tasks
)

grammar1 = "task" if len(pipeline) == 1 else "tasks"
grammar2 = "subset" if len(injected_subset_labels) == 1 else "subsets"
logger.info(
"Made an injection pipeline containing %d %s and %d new injected %s.",
len(pipeline),
grammar1,
len(injected_subset_labels),
grammar2,
)
return pipeline

0 comments on commit 7570323

Please sign in to comment.