DM-41978: Switch logic used to exclude tasks #25

Merged: 2 commits, Dec 20, 2023

Changes from 1 commit
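Summary of the change for readers skimming the diff: task exclusion now builds a LabelSpecifier from the labels to keep and calls Pipeline.subsetFromLabels, rather than removing each excluded task (and its subset memberships) one by one. Below is a hedged usage sketch of the entry point this touches; the `excluded_tasks` argument and the module path are taken from the diff, while the other argument names and example values are assumptions for illustration only.

```python
# Hedged usage sketch (not part of this PR): exercising the excluded_tasks option.
# Only `excluded_tasks` and the module path are confirmed by the diff below; the
# other argument names and the example values are assumptions.
from lsst.source.injection.utils.make_injection_pipeline import make_injection_pipeline

pipeline = make_injection_pipeline(
    dataset_type_name="postISRCCD",  # assumed argument name and value
    reference_pipeline="$DRP_PIPE_DIR/pipelines/HSC/DRP-RC2.yaml",  # assumed path
    excluded_tasks="jointcal,gbdesAstrometricFit",  # comma-separated string or a set
)
```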
28 changes: 9 additions & 19 deletions — python/lsst/source/injection/utils/make_injection_pipeline.py
@@ -26,7 +26,7 @@
 import logging

 from lsst.analysis.tools.interfaces import AnalysisPipelineTask
-from lsst.pipe.base import Pipeline
+from lsst.pipe.base import LabelSpecifier, Pipeline


 def _get_dataset_type_names(conns, fields):
@@ -155,27 +155,17 @@ def make_injection_pipeline(
     # Remove all tasks which are not to be included in the injection pipeline.
     if isinstance(excluded_tasks, str):
         excluded_tasks = set(excluded_tasks.split(","))
-    not_excluded_tasks = set()
-    for task_label in excluded_tasks:
-        # First remove tasks from their host subsets, if present.
-        try:
-            host_subsets = pipeline.findSubsetsWithLabel(task_label)
-        except ValueError:
-            pass
-        else:
-            for host_subset in host_subsets:
-                pipeline.removeLabelFromSubset(host_subset, task_label)
-        # Then remove the task from the pipeline.
-        try:
-            pipeline.removeTask(task_label)
-        except KeyError:
-            not_excluded_tasks.add(task_label)
-    if len(not_excluded_tasks) > 0:
-        grammar = "Task" if len(not_excluded_tasks) == 1 else "Tasks"
+    all_tasks = {taskDef.label for taskDef in pipeline.toExpandedPipeline()}
Collaborator Author: Does this need to be toExpandedPipeline? (A question for @natelust)

Collaborator Author: For context: here we're just looping over all tasks in the pipeline to get a set of task names. No actual task/config manipulation is performed in this block.

Collaborator Author: It's never quite clear to me when I need to expand my pipeline and when I can get away with using the base pipeline object. I.e., I want to be sure that I'm including all the tasks from imported pipeline YAMLs as well.
+    preserved_tasks = all_tasks - excluded_tasks
+    label_specifier = LabelSpecifier(labels=preserved_tasks)
+    # EDIT mode removes tasks from parent subsets but keeps the subset itself.
+    pipeline = pipeline.subsetFromLabels(label_specifier, pipeline.PipelineSubsetCtrl.EDIT)
+    if len(not_found_tasks := excluded_tasks - all_tasks) > 0:
+        grammar = "Task" if len(not_found_tasks) == 1 else "Tasks"
         logger.warning(
             "%s marked for exclusion not found in the reference pipeline: %s.",
             grammar,
-            ", ".join(sorted(not_excluded_tasks)),
+            ", ".join(sorted(not_found_tasks)),
         )

     # Determine the set of dataset type names affected by source injection.
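To make the new control flow easier to follow outside the diff view, here is a minimal sketch of the exclusion step pulled out as a standalone helper. It uses only calls that appear in the change above (toExpandedPipeline, LabelSpecifier, subsetFromLabels, PipelineSubsetCtrl.EDIT); the helper name `exclude_tasks` and the wrapper function itself are illustrative, not part of the package.

```python
# Minimal sketch of the exclusion logic introduced above, pulled out as a helper.
# The helper name `exclude_tasks` is illustrative; the calls mirror the diff.
import logging

from lsst.pipe.base import LabelSpecifier, Pipeline

logger = logging.getLogger(__name__)


def exclude_tasks(pipeline: Pipeline, excluded_tasks: set[str]) -> Pipeline:
    """Return a pipeline with the given task labels removed, keeping named subsets."""
    # Expand the pipeline so tasks contributed by imported pipeline YAMLs are counted.
    all_tasks = {task_def.label for task_def in pipeline.toExpandedPipeline()}
    preserved_tasks = all_tasks - excluded_tasks
    label_specifier = LabelSpecifier(labels=preserved_tasks)
    # EDIT mode strips excluded labels from their parent subsets but keeps the
    # (now smaller) subsets themselves rather than dropping them from the pipeline.
    pipeline = pipeline.subsetFromLabels(label_specifier, pipeline.PipelineSubsetCtrl.EDIT)
    if not_found_tasks := excluded_tasks - all_tasks:
        logger.warning(
            "Tasks marked for exclusion not found in the reference pipeline: %s.",
            ", ".join(sorted(not_found_tasks)),
        )
    return pipeline
```

Compared with the old per-task removeTask/removeLabelFromSubset loop, subsetFromLabels handles subset membership in a single call, and the EDIT control value keeps labeled subsets that lose members, which is the behaviour called out in the in-diff comment.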