diff --git a/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.pdf b/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.pdf index 173da41..444d31a 100644 Binary files a/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.pdf and b/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.pdf differ diff --git a/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.png b/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.png index 4651bbe..7a2d498 100644 Binary files a/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.png and b/doc/lsst.source.injection/_assets/DRP_with_injected_coadd.png differ diff --git a/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.pdf b/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.pdf index a71ecc0..38dc759 100644 Binary files a/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.pdf and b/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.pdf differ diff --git a/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.png b/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.png index f6253b3..29db525 100644 Binary files a/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.png and b/doc/lsst.source.injection/_assets/DRP_with_injected_exposure.png differ diff --git a/doc/lsst.source.injection/_assets/DRP_with_injected_visit.pdf b/doc/lsst.source.injection/_assets/DRP_with_injected_visit.pdf index 47fd428..2fb65f4 100644 Binary files a/doc/lsst.source.injection/_assets/DRP_with_injected_visit.pdf and b/doc/lsst.source.injection/_assets/DRP_with_injected_visit.pdf differ diff --git a/doc/lsst.source.injection/_assets/DRP_with_injected_visit.png b/doc/lsst.source.injection/_assets/DRP_with_injected_visit.png index 78e7c8c..7e0eb75 100644 Binary files a/doc/lsst.source.injection/_assets/DRP_with_injected_visit.png and b/doc/lsst.source.injection/_assets/DRP_with_injected_visit.png differ diff --git 
a/doc/lsst.source.injection/reference/11_make_injection_pipeline.rst b/doc/lsst.source.injection/reference/11_make_injection_pipeline.rst index 38a013f..ae2a8e4 100644 --- a/doc/lsst.source.injection/reference/11_make_injection_pipeline.rst +++ b/doc/lsst.source.injection/reference/11_make_injection_pipeline.rst @@ -15,43 +15,38 @@ The reference pipeline file must be a complete pipeline definition YAML file, ty Either the :doc:`make_injection_pipeline <../scripts/make_injection_pipeline>` command line script or the associated :py:func:`~lsst.source.injection.make_injection_pipeline` Python function may be used to generate a fully qualified injection pipeline. Examples on this page illustrate the use of both methods. -.. note:: - - Two legacy dynamic source injection pipelines are automatically generated inside the ``drp_pipe`` repository. - These pipelines are located in the ``$DRP_PIPE_DIR/pipelines/HSC`` directory, facilitating source injection data reductions for the Hyper Suprime-Cam RC2 and RC2 subset datasets: ``DRP-RC2+injected_deepCoadd.yaml`` and ``DRP-RC2_subset+injected_deepCoadd.yaml``, respectively. - As indicated by the appended name, synthetic sources are injected into the ``deepCoadd`` dataset type. - .. _lsst.source.injection-ref-make-stubs: Injection Pipeline Stubs ========================= A number of different source injection pipeline stubs have been constructed in the ``$SOURCE_INJECTION_DIR/pipelines`` directory. -Each of these pipeline stubs contain a single task that is used to inject sources into a particular dataset type. +Each of these pipeline stubs contains a single task that is used to inject sources into a particular dataset type. 
Although these injection pipeline YAML stubs can be used directly, it is recommended that the :doc:`make_injection_pipeline <../scripts/make_injection_pipeline>` command line script or the associated :py:func:`~lsst.source.injection.make_injection_pipeline` Python function be used to generate a complete source injection pipeline definition YAML file for subsequent use. -A complete injection pipeline definition file will contain the pipeline stub as a subtask alongside any additional tasks required to complete the source injection process. -Tasks from the reference pipeline may either be removed or have specific configuration overrides applied as necessary to support subsequent injected source image data reduction. +Tasks from a reference pipeline may either be removed or have specific configuration overrides applied as necessary to support subsequent injected source image data reduction. .. note:: - When using the above utilities to construct a fully qualified injection pipeline, any existing subsets will also be updated to include the injection task where appropriate. - Furthermore, a series of ``injected_*`` subsets will be constructed. - These ``injected_*`` subsets are copies of existent subsets, but with any tasks not directly impacted by source injection removed. + When using the above utilities to construct a fully qualified injection pipeline, only the immediate consuming tasks of the injected dataset type have their input connections modified to accept the injected (prefixed) dataset type. + All further downstream dataset type names are left unchanged, and any tasks consuming those downstream dataset types are also left unchanged, unless explicitly reconfigured by the user. - For example, if the ``inject_exposure.yaml`` pipeline stub is used to inject sources into a ``post_isr_image`` dataset type, the subset of the reference pipeline containing the ``isr`` task will be updated to also include the ``injectExposure`` task. 
+ Any existing subsets containing the task that produces the dataset type being injected into will be updated to also include the injection task. + In addition, injected variants of existing subsets are constructed with the ``injected_`` prefix; these variants contain the injection task and all tasks downstream of the point of source injection. + This behavior can be disabled by passing ``--no-update-subsets`` on the command line, or setting ``update_subsets`` to ``False`` in Python. - This behavior can be disabled by passing the ``-e`` argument on the command line, or setting ``exclude_subsets`` to ``True`` in Python. - Additionally, a new subset, ``injected_[MY_SUBSET]``, will also be created containing all tasks from the ``[MY_SUBSET]`` subset but with the ``isr`` task removed (as sources will be injected after this task has run). + For example, if the ``inject_exposure.yaml`` pipeline stub is used to inject sources into a ``post_isr_image`` dataset type, the subset of the reference pipeline containing the ``isr`` task will be updated to also include the ``injectExposure`` task. .. note:: After a fully qualified injection pipeline has been generated, a check is performed to ensure that all reference :ref:`pipeline contracts ` (if any) are satisfied. Pipeline contracts are a means by which to ensure that certain configuration values are set in a predictable manner. - When generating an injection pipeline, it's possible that some of these contracts will become invalid. - For example, if a contract specifies that the dataset type produced by a task prior to source injection matches the dataset type consumed by a task after source injection, this contract may become invalid if the tasks downstream of source injection have been modified to instead consume the new source injected input. 
- The :doc:`make_injection_pipeline <../scripts/make_injection_pipeline>` command line script and the :py:func:`~lsst.source.injection.make_injection_pipeline` Python function will check for this and warn if any contracts are invalid. - Invalid contracts will be removed from the final output pipeline YAML. + + When generating an injection pipeline, it's possible that some contracts are invalidated. + For example, if a contract specifies that the dataset type produced by a task prior to source injection matches the dataset type consumed by a task after source injection, this contract may become invalid if the task after source injection has been modified to instead consume the new source injected input. + + The :doc:`make_injection_pipeline <../scripts/make_injection_pipeline>` command line script and the :py:func:`~lsst.source.injection.make_injection_pipeline` Python function will check for this and warn if any contracts are invalid when the pipeline is generated. + Invalid contracts will be removed from the final output pipeline YAML, with a warning. The table below lists the available pipeline YAML stubs inside the ``$SOURCE_INJECTION_DIR/pipelines`` directory and the dataset types they are designed to inject sources into: @@ -182,7 +177,7 @@ Visualize an Injection Pipeline Any pipeline YAML, including an injection pipeline, can be visualized to clarify exactly what the pipeline does. In this section we provide instructions for visualizing the ``DRP-injection.yaml`` pipeline generated in the above examples. Options for text-based outputs on the command line and rich rendered outputs are presented. -The tasks and dataset types printed below are accurate as of ``w_2025_37`` of the LSST Science Pipelines. +The tasks and dataset types printed below are accurate as of ``w_2026_17`` of the LSST Science Pipelines. .. tip:: @@ -232,70 +227,57 @@ returning: .. 
code-block:: shell - ○ flat: {detector, physical_filter} ExposureF - │ - ○ │ bfk: {detector} BrighterFatterKernel - │ │ - ○ │ │ camera: {instrument} Camera - │ │ │ - ○ │ │ │ crosstalk: {detector} CrosstalkCalib - │ │ │ │ - ○ │ │ │ │ cti: {detector} IsrCalib - │ │ │ │ │ - ◍ │ │ │ │ │ dark, bias: {detector} ExposureF - │ │ │ │ │ │ - ○ │ │ │ │ │ │ defects: {detector} Defects - │ │ │ │ │ │ │ - ○ │ │ │ │ │ │ │ linearizer: {detector} Linearizer - │ │ │ │ │ │ │ │ - ○ │ │ │ │ │ │ │ │ ptc: {detector} PhotonTransferCurveDataset - │ │ │ │ │ │ │ │ │ - ○ │ │ │ │ │ │ │ │ │ raw: {detector, exposure} Exposure - ╰─┴─┴─┴─┴─┴─┴─┴─┴─┤ - ■ isr: {detector, exposure} - ╭─┤ - ○ │ isrStatistics: {detector, exposure} StructuredDataDict - │ - ○ post_isr_image: {detector, exposure} Exposure - │ - ○ │ injection_catalog: {band, htm7} ArrowAstropy - │ │ - ○ │ │ visit_summary: {visit} ExposureCatalog - ╰─┴─┤ - ■ injectExposure: {detector, exposure} - ╭─┤ - ○ │ injected_post_isr_image_catalog: {detector, exposure}...[1] - │ - ○ injected_post_isr_image: {detector, exposure} Exposure - │ - ○ │ the_monster_20250219: {htm7} SimpleCatalog - ╰─┤ - ■ calibrateImage: {detector, visit} - ╭─┤ - ○ │ injected_preliminary_visit_image: {detector, visit} E...[2] - ╭─┤ - ○ │ injected_preliminary_visit_image_background: {detecto...[3] - ╭─┤ - ◍ │ injected_single_visit_star_footprints, injected_singl...[4] - ╭─┤ - ◍ │ injected_single_visit_star_unstandardized, injected_s...[5] - │ - ◍ injected_initial_photometry_match_detector, injected_...[6] - [1] - injected_post_isr_image_catalog: {detector, exposure} ArrowAstropy - [2] - injected_preliminary_visit_image: {detector, visit} ExposureF - [3] - injected_preliminary_visit_image_background: {detector, visit} Background - [4] - injected_single_visit_star_footprints, - injected_single_visit_psf_star_footprints: {detector, visit} SourceCatalog - [5] - injected_single_visit_star_unstandardized, injected_single_visit_psf_star: - {detector, visit} ArrowAstropy - [6] - 
injected_initial_photometry_match_detector, - injected_initial_astrometry_match_detector: {detector, visit} Catalog + ○ ptc: {detector} PhotonTransferCurveDataset + │ + ○ │ camera: {instrument} Camera + │ │ + ○ │ │ crosstalk: {detector} CrosstalkCalib + │ │ │ + ◍ │ │ │ dark, bias: {detector} ExposureF + │ │ │ │ + ○ │ │ │ │ defects: {detector} Defects + │ │ │ │ │ + ○ │ │ │ │ │ flat: {detector, physical_filter} ExposureF + │ │ │ │ │ │ + ◍ │ │ │ │ │ │ gain_correction, electroBfDistortionMatrix, cti: {detector} IsrCalib + │ │ │ │ │ │ │ + ○ │ │ │ │ │ │ │ linearizer: {detector} Linearizer + │ │ │ │ │ │ │ │ + ○ │ │ │ │ │ │ │ │ raw: {detector, exposure} Exposure + ╰─┴─┴─┴─┴─┴─┴─┴─┤ + ■ isr: {detector, exposure} + ╭─┤ + ○ │ isrStatistics: {detector, exposure} StructuredDataDict + │ + ○ post_isr_image: {detector, exposure} Exposure + │ + ○ │ injection_catalog: {band, htm7} ArrowAstropy + │ │ + ○ │ │ visit_summary: {visit} ExposureCatalog + ╰─┴─┤ + ■ injectExposure: {detector, exposure} + ╭─┤ + ○ │ injected_post_isr_image_catalog: {detector, exposure} ArrowAstropy + │ + ○ injected_post_isr_image: {detector, exposure} Exposure + │ + ○ │ astrometry_camera: {physical_filter} Camera + │ │ + ○ │ │ the_monster_20250219: {htm7} SimpleCatalog + ╰─┴─┤ + ■ calibrateImage: {detector, visit} + ╭─┤ + ○ │ preliminary_visit_image: {detector, visit} ExposureF + ╭─┤ + ○ │ preliminary_visit_image_background: {detector, visit} Background + ╭─┤ + ○ │ preliminary_visit_mask: {detector, visit} Mask + ╭─┤ + ◍ │ single_visit_star_footprints, single_visit_psf_star_footprints: {detector, visit} SourceCatalog + ╭─┤ + ◍ │ single_visit_star_unstandardized, single_visit_psf_star: {detector, visit} ArrowAstropy + │ + ◍ initial_photometry_match_detector, initial_astrometry_match_detector: {detector, visit} Catalog .. 
_lsst.source.injection-ref-make-visualize-render: diff --git a/doc/lsst.source.injection/reference/21_inject_synthetic_sources.rst b/doc/lsst.source.injection/reference/21_inject_synthetic_sources.rst index b8c96a1..9cc4674 100644 --- a/doc/lsst.source.injection/reference/21_inject_synthetic_sources.rst +++ b/doc/lsst.source.injection/reference/21_inject_synthetic_sources.rst @@ -75,7 +75,7 @@ The ``injectExposure`` task (:lsst-task:`~lsst.source.injection.ExposureInjectTa .. note:: Existing subsets will have injection variants denoted by the prefix ``injected_``. - These subsets only include the injection task (where relevant) and any tasks after the point of source injection. + These subsets include the injection task and any tasks after the point of source injection. The ``injected_`` subsets can save memory and runtime if the tasks prior to injection have already been run. The image plane of the ``injected_post_isr_image`` will be modified from the original by the addition of a light profile for every injected object. @@ -87,8 +87,7 @@ The variance plane gains additional estimated variance consistent with the amoun Setting ``inject_variance`` to ``False`` in the injection task config will prevent any changes to the variance plane. Not modifying the variance plane is likely to bias any downstream measurements and should normally never be done, unless such bias is the object of study. -Assuming processing completes successfully, the ``injected_post_isr_image`` and associated ``injected_post_isr_image_catalog`` will be written to the butler repository. -Various downstream data products should also exist, including the ``injected_preliminary_visit_image`` dataset type (see example images below). +Assuming processing completes successfully, the ``injected_post_isr_image`` and associated ``injected_post_isr_image_catalog`` will be written to the butler repository (see example images below). 
Standard log messages that get printed as part of a successful run may include lines similar to: @@ -112,8 +111,7 @@ An example injected output produced by the above snippet is shown below. .. - Calibrated exposure (``preliminary_visit_image`` and - ``injected_preliminary_visit_image``) data for LSSTCam visit 2025050300351, detector 94, showcasing the injection of a series of synthetic Sérsic sources. + Calibrated exposure data for LSSTCam visit 2025050300351, detector 94, showcasing the injection of a series of synthetic Sérsic sources before and after injection. Images are asinh scaled. .. list-table:: diff --git a/python/lsst/source/injection/bin/make_injection_pipeline.py b/python/lsst/source/injection/bin/make_injection_pipeline.py index 3591948..5969cb0 100644 --- a/python/lsst/source/injection/bin/make_injection_pipeline.py +++ b/python/lsst/source/injection/bin/make_injection_pipeline.py @@ -35,17 +35,14 @@ def build_argparser(): parser = ArgumentParser( description="""Make an expanded source injection pipeline. -This script takes a reference pipeline definition file in YAML format and -prefixes all post-injection dataset type names with the injected prefix. If an -optional injection pipeline definition YAML file is also provided, the -injection task will be merged into the pipeline. +This command takes a reference pipeline definition file in YAML format and +prefixes the input connections for all immediate consuming tasks of an injected +dataset with the injected prefix. If an optional injection pipeline definition +YAML file is also provided, the injection task will be merged into the pipeline. -Unless explicitly excluded, all subsets from the reference pipeline containing -the task which generates the injection dataset type will also be updated to -include the injection task. A series of new injection subsets will also be -constructed. These new subsets are copies of existent subsets, but with tasks -not directly impacted by source injection removed. 
Injected subsets will be the -original existent subset name with the 'injected_' prefix prepended. +By default, all subsets from the reference pipeline containing the task which +generates the injection dataset type will also be updated to include the +injection task. When the injection pipeline is constructed, a check on all existing pipeline contracts is performed. If any contracts are violated, they are removed from @@ -82,10 +79,10 @@ def build_argparser(): metavar="FILE", ) parser.add_argument( - "-e", - "--exclude-subsets", + "--no-update-subsets", help="Do not update pipeline subsets to include the injection task.", - action="store_true", + dest="update_subsets", + action="store_false", ) parser.add_argument( "-x", @@ -167,22 +164,20 @@ def main(): logger.setLevel(logging.DEBUG) args = build_argparser().parse_args() + kwargs = {k: v for k, v in vars(args).items() if k not in ["filename", "overwrite"]} + if hasattr(args, "filename"): if os.path.exists(args.filename): if not hasattr(args, "overwrite"): raise RuntimeError(f"File {args.filename} already exists; use --overwrite to write anyway.") else: logger.warning("File %s already exists; overwriting.", args.filename) - pipeline = make_injection_pipeline( - **{k: v for k, v in vars(args).items() if k not in ["filename", "overwrite"]} - ) + pipeline = make_injection_pipeline(**kwargs) pipeline.write_to_uri(args.filename) logger.info( "Modified pipeline definition YAML file saved at %s.", os.path.realpath(args.filename), ) else: - pipeline = make_injection_pipeline( - **{k: v for k, v in vars(args).items() if k not in ["filename", "overwrite"]} - ) + pipeline = make_injection_pipeline(**kwargs) print("\n", pipeline, sep="") diff --git a/python/lsst/source/injection/utils/_make_injection_pipeline.py b/python/lsst/source/injection/utils/_make_injection_pipeline.py index 67f3a65..3c2918b 100644 --- a/python/lsst/source/injection/utils/_make_injection_pipeline.py +++ 
b/python/lsst/source/injection/utils/_make_injection_pipeline.py @@ -23,14 +23,113 @@ __all__ = ["make_injection_pipeline"] -import itertools import logging +import warnings -from lsst.analysis.tools.interfaces import AnalysisPipelineTask -from lsst.pipe.base import LabelSpecifier, Pipeline +from lsst.pipe.base import LabelSpecifier, Pipeline, PipelineGraph from lsst.pipe.base.pipelineIR import ContractError +def _infer_injection_pipeline(dataset_type_name: str, logger: logging.Logger) -> str | None: + """Infer the injection pipeline from the dataset type name. + + Parameters + ---------- + dataset_type_name : `str` + Name of the dataset type being injected into. + logger : `~logging.Logger` + Logger for warning and info messages. + + Returns + ------- + injection_pipeline : `str` | `None` + Location of an injection pipeline definition YAML file stub, or None if + no suitable injection pipeline could be inferred. + """ + injection_pipeline = None + match dataset_type_name: + case "postISRCCD" | "post_isr_image": + injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_exposure.yaml" + case "icExp" | "calexp" | "initial_pvi" | "pvi" | "preliminary_visit_image" | "visit_image": + injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_visit.yaml" + case ( + "deepCoadd" + | "deepCoadd_calexp" + | "goodSeeingCoadd" + | "deep_coadd_predetection" + | "deep_coadd" + | "deep_coadd_cell_predetection" + | "template_coadd" + ): + injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_coadd.yaml" + case _: + # Print a warning rather than a raise, as the user may wish to + # edit connection names without merging an injection pipeline. + logger.warning( + "Unable to infer injection pipeline stub from dataset type name '%s' and none was " + "provided. 
No injection pipeline will be merged into the output pipeline.", + dataset_type_name, + ) + if injection_pipeline: + logger.info( + "Injected dataset type '%s' used to infer injection pipeline: %s", + dataset_type_name, + injection_pipeline, + ) + return injection_pipeline + + +def _merge_injection_pipeline( + pipeline: Pipeline, + injection_pipeline: Pipeline | str | None, + dataset_type_name: str, + prefix: str, +) -> None | str: + """Merge an injection pipeline into an existing pipeline. + + Parameters + ---------- + pipeline : `~lsst.pipe.base.Pipeline` + Pipeline to merge the injection pipeline into. + injection_pipeline : `~lsst.pipe.base.Pipeline` | `str` | `None` + Injection pipeline to merge, or location of an injection pipeline + definition YAML file stub. If None, no injection pipeline is merged. + dataset_type_name : `str` + Name of the dataset type being injected into. + prefix : `str` + Prefix to prepend to each affected post-injection dataset type name. + + Returns + ------- + injection_task_label : `str` | `None` + Label of the injection task, if an injection pipeline was merged, or + None if no injection pipeline was merged. + + Notes + ----- + This function modifies the input pipeline in place. + """ + if injection_pipeline is None: + return None + if isinstance(injection_pipeline, str): + injection_pipeline = Pipeline.fromFile(injection_pipeline) + if len(injection_pipeline) != 1: + raise RuntimeError( + f"The injection pipeline contains {len(injection_pipeline)} tasks; only 1 task is allowed." 
+ ) + pipeline.mergePipeline(injection_pipeline) + + injection_task_label = next(iter(injection_pipeline.task_labels)) + pipeline.addConfigOverride(injection_task_label, "connections.input_exposure", dataset_type_name) + pipeline.addConfigOverride( + injection_task_label, "connections.output_exposure", prefix + dataset_type_name + ) + pipeline.addConfigOverride( + injection_task_label, "connections.output_catalog", prefix + dataset_type_name + "_catalog" + ) + return injection_task_label + + def _parse_config_override(config_override: str) -> tuple[str, str, str]: """Parse a config override string into a label, a key and a value. @@ -70,11 +169,415 @@ def _parse_config_override(config_override: str) -> tuple[str, str, str]: return label, key, value +def _configure_injection_pipeline( + pipeline: Pipeline, + config: str | list[str], + logger: logging.Logger, +) -> None: + """Apply user-supplied config overrides to the pipeline. + + Parameters + ---------- + pipeline : `~lsst.pipe.base.Pipeline` + Pipeline to apply config overrides to. Pipeline is modified in place. + config : `str` | `list` [`str`] + Config override(s) to apply, in the format 'label:key=value'. + logger : `~logging.Logger` + Logger for warning and info messages. + + Notes + ----- + This function modifies the input pipeline in place. + """ + if isinstance(config, str): + config = [config] + for conf in config: + config_label, config_key, config_value = _parse_config_override(conf) + try: + pipeline.addConfigOverride(config_label, config_key, config_value) + except LookupError: + logger.debug( + "Config override '%s' for label '%s' not found in the reference " + "pipeline, either due to a typo or the label not existing in " + "the reference pipeline.", + conf, + config_label, + ) + + +def _remove_excluded_tasks( + pipeline: Pipeline, + excluded_tasks: set[str] | str, + logger: logging.Logger, +) -> Pipeline: + """Remove excluded tasks from the pipeline and any subsets, + and remove any empty subsets. 
+ + Parameters + ---------- + pipeline : `~lsst.pipe.base.Pipeline` + Pipeline to remove tasks from. This pipeline is modified in place. + excluded_tasks : `set` [`str`] | `str` + Task labels to exclude from the injection pipeline. + logger : `~logging.Logger` + Logger for warning and info messages. + + Returns + ------- + pipeline : `~lsst.pipe.base.Pipeline` + The input pipeline with excluded tasks and empty subsets removed. + """ + if isinstance(excluded_tasks, str): + excluded_tasks = set(excluded_tasks.split(",")) + all_tasks = set(pipeline.task_labels) + preserved_tasks = all_tasks - excluded_tasks + + preserved_task_labels = LabelSpecifier(labels=preserved_tasks) + # EDIT mode removes tasks from parent subsets but keeps the subset itself. + pipeline = pipeline.subsetFromLabels(preserved_task_labels, pipeline.PipelineSubsetCtrl.EDIT) + + if len(found_tasks := excluded_tasks & all_tasks) > 0: + grammar = "task" if len(found_tasks) == 1 else "tasks" + logger.info( + "%d %s excluded from the output pipeline: %s", + len(found_tasks), + grammar, + ", ".join(sorted(found_tasks)), + ) + + removed_subsets = set() + for subset_label, subset_tasks in pipeline.subsets.items(): + if not subset_tasks: + removed_subsets.add(subset_label) + pipeline.removeLabeledSubset(subset_label) + if (removed_subsets_count := len(removed_subsets)) > 0: + grammar = "subset" if removed_subsets_count == 1 else "subsets" + logger.warning( + "Removed %d empty %s from the pipeline: %s.", + removed_subsets_count, + grammar, + ", ".join(sorted(removed_subsets)), + ) + + return pipeline + + +def _get_pipeline_graph(pipeline: Pipeline, logger: logging.Logger) -> PipelineGraph: + """Get the pipeline graph, handling any contract errors. + + Pipeline contracts that are violated by any modifications made to the + pipeline will be removed, with a warning logged for each contract that's + removed. + + Parameters + ---------- + pipeline : `~lsst.pipe.base.Pipeline` + Pipeline to validate contracts for. 
+ logger : `~logging.Logger` + Logger for warning and info messages. + + Returns + ------- + pipeline_graph : `~lsst.pipe.base.PipelineGraph` + The pipeline graph for the input pipeline, with any violated contracts + removed from the input pipeline. + + Notes + ----- + This function modifies the input pipeline in place, removing any violated + contracts. + """ + try: + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message=r".*formatted like a Pipeline parameter but was not found within the Pipeline.*", + category=UserWarning, + ) + pipeline_graph = pipeline.to_graph() + except ContractError: + contracts_initial = pipeline._pipelineIR.contracts + pipeline._pipelineIR.contracts = [] + contracts_passed = [] + contracts_failed = [] + + for contract in contracts_initial: + pipeline._pipelineIR.contracts = [contract] + try: + _ = pipeline.to_graph() + except ContractError: + contracts_failed.append(contract) + continue + contracts_passed.append(contract) + + pipeline._pipelineIR.contracts = contracts_passed + pipeline_graph = pipeline.to_graph() + + if contracts_failed: + logger.warning( + "The following contracts were violated and have been removed: \n%s", + "\n".join([str(contract) for contract in contracts_failed]), + ) + return pipeline_graph + + +def _collect_injected_task_labels( + pipeline_graph: PipelineGraph, + dataset_type_name: str, +) -> set[str]: + """Collect tasks downstream of the injection point. + + Parameters + ---------- + pipeline_graph : `~lsst.pipe.base.PipelineGraph` + Pipeline graph to inspect. + dataset_type_name : `str` + Name of the dataset type being injected into. + + Returns + ------- + injected_task_labels : `set` [`str`] + Labels of all tasks that consume the injected dataset type directly or + indirectly, including the injection task itself if present. 
+ """ + injected_task_labels = set() + + dataset_type_frontier = {dataset_type_name} + seen_dataset_types = set(dataset_type_frontier) + + # Note: here we opt to walk the pipeline graph instead of using + # `pipeline_graph._xgraph.successors`. The `_xgraph` attribute is a private + # implementation detail and therefore not a guaranteed interface. + + while dataset_type_frontier: + next_frontier = set() + for current_dataset_type in dataset_type_frontier: + for task_node in pipeline_graph.consumers_of(current_dataset_type): + if task_node.label in injected_task_labels: + continue + + injected_task_labels.add(task_node.label) + + output_edges = task_node.iter_all_outputs() + + for edge in output_edges: + output_dataset_type = edge.parent_dataset_type_name + if output_dataset_type not in seen_dataset_types: + seen_dataset_types.add(output_dataset_type) + next_frontier.add(output_dataset_type) + dataset_type_frontier = next_frontier + + return injected_task_labels + + +def _add_injected_subsets( + pipeline: Pipeline, + injected_task_labels: set[str], + prefix: str, + logger: logging.Logger, +) -> int: + """Create injected variants of existing subsets. + + Parameters + ---------- + pipeline : `~lsst.pipe.base.Pipeline` + Pipeline to modify in place. + injected_task_labels : `set` [`str`] + Labels of tasks downstream of the injection point. + prefix : `str` + Prefix to prepend to the subset names. + logger : `~logging.Logger` + Logger for warning and info messages. + + Returns + ------- + subset_count : `int` + Number of injected subsets created. 
+ """ + if not injected_task_labels: + return 0 + + injected_label_specifier = LabelSpecifier(labels=injected_task_labels) + injected_pipeline = pipeline.subsetFromLabels(injected_label_specifier, pipeline.PipelineSubsetCtrl.EDIT) + + injected_subset_labels = set() + for subset_label, subset_tasks in injected_pipeline.subsets.items(): + if not subset_tasks: + continue + injected_subset_label = prefix + subset_label + injected_subset_description = ( + f"All tasks from the '{subset_label}' subset impacted by source injection." + ) + pipeline.addLabeledSubset(injected_subset_label, injected_subset_description, subset_tasks) + injected_subset_labels.add(injected_subset_label) + + return len(injected_subset_labels) + + +def _reconfigure_injection_pipeline( + pipeline: Pipeline, + dataset_type_name: str, + prefix: str, + injection_task_label: str | None, + update_subsets: bool, + logger: logging.Logger, +) -> None: + """Reconfigure the injection pipeline by prefixing post-injection dataset + type names and updating subsets. + + Parameters + ---------- + pipeline : `~lsst.pipe.base.Pipeline` + Pipeline to configure. This pipeline is modified in place. + dataset_type_name : `str` + Name of the dataset type being injected into. + prefix : `str` + Prefix to prepend to each affected post-injection dataset type name. + injection_task_label : `str` | `None` + Label of the injection task. + update_subsets : `bool` + If True, update pipeline subsets to include the injection task. + logger : `~logging.Logger` + Logger for warning and info messages. + + Notes + ----- + This function modifies the input pipeline in place. 
+ """ + # Use pipeline graph to determine tasks with connections to be modified + pipeline_graph = _get_pipeline_graph(pipeline, logger) + injected_task_labels = _collect_injected_task_labels(pipeline_graph, dataset_type_name) + post_injection_tasks = pipeline_graph.consumers_of(dataset_type_name) + if len(post_injection_tasks) == 0: + logger.warning( + "Dataset type '%s' not found in the reference pipeline; no input connection edits to be made.", + dataset_type_name, + ) + if post_injection_tasks: + post_injection_tasks = [task for task in post_injection_tasks if task.label != injection_task_label] + else: + post_injection_tasks = [] + + # Loop over each post injection task; prefix input connections only + for task_node in post_injection_tasks: + input_edges = task_node.iter_all_inputs() + + for edge in input_edges: + if hasattr(task_node.config.connections.ConnectionsClass, edge.connection_name): + if edge.parent_dataset_type_name == dataset_type_name: + pipeline.addConfigOverride( + task_node.label, + "connections." 
+ edge.connection_name, + prefix + edge.dataset_type_name, + ) + + # Update subsets to include the injection task + if ( + update_subsets + and injection_task_label is not None + and (pre_injection_task := pipeline_graph.producer_of(dataset_type_name)) is not None + ): + precursor_subsets = pipeline.findSubsetsWithLabel(pre_injection_task.label) + for subset in precursor_subsets: + pipeline.addLabelToSubset(subset, injection_task_label) + + injected_subset_count = 0 + if update_subsets: + injected_subset_count = _add_injected_subsets(pipeline, injected_task_labels, prefix, logger) + + logger.info( + "Made an injection pipeline containing %d task%s and %d injected subset%s.", + len(pipeline), + "" if len(pipeline) == 1 else "s", + injected_subset_count, + "" if injected_subset_count == 1 else "s", + ) + + +def _add_additional_pipelines( + pipeline: Pipeline, + additional_pipelines: list[Pipeline] | list[str], + additional_subset: list[str] | str | None, + logger: logging.Logger, +) -> None: + """Add additional pipelines to the injection pipeline, and optionally add + all additional tasks to existing or new subsets. + + Parameters + ---------- + pipeline : `~lsst.pipe.base.Pipeline` + Pipeline to add additional pipelines to. Pipeline is modified in place. + additional_pipelines : `list` [`~lsst.pipe.base.Pipeline`] | `list` [`str`] + Additional pipelines to merge, or locations of additional pipeline + definition YAML file stubs. + additional_subset : `list` [`str`] | `str` | `None` + A list of subset definitions in the form + "subset_name[:subset_description]". + These subsets will be created if they don't already exist. All tasks + from the additional pipelines will be added to these subsets. + If None, additional tasks will not be added to any subsets. + logger : `~logging.Logger` + Logger for warning and info messages. + + Notes + ----- + This function modifies the input pipeline in place. 
+ """ + # Merge all additional pipelines into the main pipeline + additional_tasks: set[str] = set() + for additional_pipeline in additional_pipelines: + if isinstance(additional_pipeline, str): + additional_pipeline = Pipeline.fromFile(additional_pipeline) + additional_tasks.update(additional_pipeline.task_labels) + pipeline.mergePipeline(additional_pipeline) + + # Add all tasks to subset_name; create the subset if it does not exist + subset_text = "" + if additional_subset is not None: + if not isinstance(additional_subset, list): + additional_subset = [additional_subset] + subset_names_old = [] + subset_names_new = [] + for subset in additional_subset: + # Parse the subset definition + if ":" in subset: + subset_name, subset_description = subset.split(":", 1) + else: + subset_name = subset + subset_description = "" + # Add or create the subset with all additional tasks + if subset_name in pipeline.subsets: + subset_names_old.append(subset_name) + for additional_task in additional_tasks: + pipeline.addLabelToSubset(subset_name, additional_task) + else: + subset_names_new.append(subset_name) + pipeline.addLabeledSubset(subset_name, subset_description, additional_tasks) + if subset_names_old: + subset_text += f", and to existing subset{'s' if len(subset_names_old) > 1 else ''} " + subset_text += f"{', '.join(sorted(subset_names_old))}" + if subset_names_new: + subset_text += f", and to new subset{'s' if len(subset_names_new) > 1 else ''} " + subset_text += f"{', '.join(sorted(subset_names_new))}" + + # Revalidate the pipeline graph + _ = _get_pipeline_graph(pipeline, logger) + + grammar = "task" if len(additional_tasks) == 1 else "tasks" + logger.info( + "Added %d %s to the pipeline%s: %s", + len(additional_tasks), + grammar, + subset_text, + ", ".join(sorted(additional_tasks)), + ) + + def make_injection_pipeline( dataset_type_name: str, reference_pipeline: Pipeline | str, injection_pipeline: Pipeline | str | None = None, - exclude_subsets: bool = False, + 
update_subsets: bool = True, excluded_tasks: set[str] | str = { "jointcal", "gbdesAstrometricFit", @@ -86,26 +589,22 @@ def make_injection_pipeline( instrument: str | None = None, config: str | list[str] | None = None, additional_pipelines: list[Pipeline] | list[str] | None = None, - additional_subset: list[str] | None = None, + additional_subset: list[str] | str | None = None, log_level: int = logging.INFO, ) -> Pipeline: """Make an expanded source injection pipeline. - This function takes a reference pipeline definition file in YAML format and - prefixes all post-injection dataset type names with the injected prefix. If + This function takes a reference pipeline definition file and prefixes all + immediately post-injection dataset type names with the injected prefix. If an optional injection pipeline definition YAML file is also provided, the injection task will be merged into the pipeline. - Unless explicitly excluded, all subsets from the reference pipeline - containing the task which generates the injection dataset type will also be - updated to include the injection task. A series of new injected subsets - will also be created. These new subsets are copies of existent subsets, but - containing only the tasks which are affected by source injection. New - injected subsets will be the original subset name with the prefix - 'injected_' prepended. + Unless subset updates are explicitly disabled, all subsets containing + the task which generates the injection dataset type will be updated to + include the injection task, and new injected subsets will be created. When the injection pipeline is constructed, a check on all existing - pipeline contracts is performed. If any contracts are violated, they are + pipeline contracts is performed. If any contracts are violated, they are removed from the pipeline. A warning is logged for each contract that is removed. @@ -119,21 +618,20 @@ Location of an injection pipeline definition YAML file stub. 
If not provided, an attempt to infer the injection pipeline will be made based on the injected dataset type name. - exclude_subsets : `bool`, optional - If True, do not update pipeline subsets to include the injection task. + update_subsets : `bool`, optional + If True, update pipeline subsets to include the injection task. excluded_tasks : `set` [`str`] | `str` - Set or comma-separated string of task labels to exclude from the - injection pipeline. + Set of task labels to exclude, or a comma-separated string of labels. prefix : `str`, optional Prefix to prepend to each affected post-injection dataset type name. instrument : `str`, optional Add instrument overrides. Must be a fully qualified class name. config : `str` | `list` [`str`], optional Config override for a task, in the format 'label:key=value'. - additional_pipelines: `list`[Pipeline] | `list`[`str`] - Location(s) of additional input pipeline definition YAML file(s). - Tasks from these additional pipelines will be added to the output - injection pipeline. + additional_pipelines : `list`[Pipeline] | `list`[`str`], optional + Additional pipelines to merge into the output pipeline, or their YAML + file locations. Tasks from these additional pipelines will be added to + the output injection pipeline. additional_subset: `list`[`str`] | `str`, optional A list of subset definitions in the form "subset_name[:subset_description]". @@ -147,288 +645,43 @@ pipeline : `lsst.pipe.base.Pipeline` An expanded source injection pipeline. """ - # Instantiate logger. logger = logging.getLogger(__name__) logger.setLevel(log_level) - retry_config_overrides = [] - # Load the pipeline and apply config overrides, if supplied. 
+ # Get the main reference pipeline if isinstance(reference_pipeline, str): pipeline = Pipeline.fromFile(reference_pipeline) else: pipeline = reference_pipeline - if config: - if isinstance(config, str): - config = [config] - for conf in config: - config_label, config_key, config_value = _parse_config_override(conf) - try: - pipeline.addConfigOverride(config_label, config_key, config_value) - except LookupError: - logger.debug( - "Config override '%s' for label '%s' not found in the reference " - "pipeline, either due to a typo or the label not existing in " - "the reference pipeline. Retrying after the injection task is added.", - conf, - config_label, - ) - retry_config_overrides.append([config_label, config_key, config_value]) - - # Add an instrument override, if provided. + + # Add an instrument override if instrument: pipeline.addInstrument(instrument) - # Remove all tasks which are not to be included in the injection pipeline. - if isinstance(excluded_tasks, str): - excluded_tasks = set(excluded_tasks.split(",")) - all_tasks = set(pipeline.task_labels) - preserved_tasks = all_tasks - excluded_tasks - label_specifier = LabelSpecifier(labels=preserved_tasks) - # EDIT mode removes tasks from parent subsets but keeps the subset itself. - pipeline = pipeline.subsetFromLabels(label_specifier, pipeline.PipelineSubsetCtrl.EDIT) - if len(not_found_tasks := excluded_tasks - all_tasks) > 0: - grammar = "Task" if len(not_found_tasks) == 1 else "Tasks" - logger.warning( - "%s marked for exclusion not found in the reference pipeline: %s.", - grammar, - ", ".join(sorted(not_found_tasks)), - ) - - # Check for any empty subsets and remove them. 
- removed_subsets = set() - for subset_label, subset_tasks in pipeline.subsets.items(): - if not subset_tasks: - removed_subsets.add(subset_label) - pipeline.removeLabeledSubset(subset_label) - if (removed_subsets_count := len(removed_subsets)) > 0: - grammar = "subset" if removed_subsets_count == 1 else "subsets" - logger.warning( - "Removed %d empty %s from the pipeline: %s.", - removed_subsets_count, - grammar, - ", ".join(sorted(removed_subsets)), + # Infer the injection pipeline if not provided, and where possible + if not injection_pipeline: + injection_pipeline = _infer_injection_pipeline( + dataset_type_name, + logger, ) - # Determine the set of dataset type names affected by source injection. - injected_tasks = set() - all_connection_type_names = set() - injected_types = {dataset_type_name} - precursor_injection_task_labels = set() - # Loop over all tasks in the pipeline. - for task_node in pipeline.to_graph().tasks.values(): - # Add override for Analysis Tools task outputs (but not inputs). - # Connections in Analysis Tools are dynamically assigned, and so are - # not able to be modified in the same way as a static connection. - # Instead, we add an override to the connections.outputName field. - # This field is prepended to all Analysis Tools connections, and so - # will prepend the injection prefix to all plot/metric outputs. 
- if isAnalysisPipelineTask := issubclass(task_node.task_class, AnalysisPipelineTask): - pipeline.addConfigOverride( - task_node.label, - "connections.outputName", - prefix + task_node.config.connections.outputName, - ) + # Merge the injection pipeline into the main pipeline + injection_task_label = _merge_injection_pipeline(pipeline, injection_pipeline, dataset_type_name, prefix) - input_types = { - read_edge.parent_dataset_type_name - for read_edge in itertools.chain(task_node.inputs.values(), task_node.init.inputs.values()) - } - output_types = { - write_edge.parent_dataset_type_name - for write_edge in itertools.chain(task_node.outputs.values(), task_node.init.outputs.values()) - } - - all_connection_type_names |= input_types | output_types - # Identify the precursor task: allows appending inject task to subset. - if dataset_type_name in output_types: - precursor_injection_task_labels.add(task_node.label) - # If the task has any injected dataset type names as inputs, add the - # task to a set of tasks touched by injection, and add all of the - # outputs of this task to the set of injected types. - if len(input_types & injected_types) > 0: - injected_tasks |= {task_node.label} - injected_types |= output_types - # Add the injection prefix to all affected dataset type names. - for edge in itertools.chain( - task_node.init.inputs.values(), - task_node.inputs.values(), - task_node.init.outputs.values(), - task_node.outputs.values(), - ): - # Continue if this is an analysis task and edge is an output. - if isAnalysisPipelineTask and ( - edge in set(task_node.init.outputs.values()) | set(task_node.outputs.values()) - ): - continue - if hasattr(task_node.config.connections.ConnectionsClass, edge.connection_name): - # If the connection type is not dynamic, modify as usual. - if edge.parent_dataset_type_name in injected_types: - pipeline.addConfigOverride( - task_node.label, - "connections." 
+ edge.connection_name, - prefix + edge.dataset_type_name, - ) - else: - # Add log warning if the connection type is dynamic. - logger.warning( - "Dynamic connection %s in task %s is not supported here. This connection will " - "neither be modified nor merged into the output injection pipeline.", - edge.connection_name, - task_node.label, - ) - # Raise if the injected dataset type does not exist in the pipeline. - if dataset_type_name not in all_connection_type_names: - raise RuntimeError( - f"Dataset type '{dataset_type_name}' not found in the reference pipeline; " - "no connection type edits to be made." - ) + # Apply all user-supplied config overrides + if config is not None: + _configure_injection_pipeline(pipeline, config, logger) - # Attempt to infer the injection pipeline from the dataset type name. - if not injection_pipeline: - match dataset_type_name: - case "postISRCCD" | "post_isr_image": - injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_exposure.yaml" - case "icExp" | "calexp" | "initial_pvi" | "pvi" | "preliminary_visit_image" | "visit_image": - injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_visit.yaml" - case ( - "deepCoadd" - | "deepCoadd_calexp" - | "goodSeeingCoadd" - | "deep_coadd_predetection" - | "deep_coadd" - | "template_coadd" - ): - injection_pipeline = "$SOURCE_INJECTION_DIR/pipelines/inject_coadd.yaml" - case _: - # Print a warning rather than a raise, as the user may wish to - # edit connection names without merging an injection pipeline. - logger.warning( - "Unable to infer injection pipeline stub from dataset type name '%s' and none was " - "provided. 
No injection pipeline will be merged into the output pipeline.", - dataset_type_name, - ) - if injection_pipeline: - logger.info( - "Injected dataset type '%s' used to infer injection pipeline: %s", - dataset_type_name, - injection_pipeline, - ) + # Remove excluded tasks from the pipeline, and remove any empty subsets + pipeline = _remove_excluded_tasks(pipeline, excluded_tasks, logger) - # Merge the injection pipeline to the modified pipeline, if provided. - if injection_pipeline: - if isinstance(injection_pipeline, str): - injection_pipeline = Pipeline.fromFile(injection_pipeline) - if len(injection_pipeline) != 1: - raise RuntimeError( - f"The injection pipeline contains {len(injection_pipeline)} tasks; only 1 task is allowed." - ) - pipeline.mergePipeline(injection_pipeline) - # Loop over all injection tasks and modify the connection names. - for injection_task_label in injection_pipeline.task_labels: - injected_tasks.add(injection_task_label) - pipeline.addConfigOverride(injection_task_label, "connections.input_exposure", dataset_type_name) - pipeline.addConfigOverride( - injection_task_label, "connections.output_exposure", prefix + dataset_type_name - ) - pipeline.addConfigOverride( - injection_task_label, "connections.output_catalog", prefix + dataset_type_name + "_catalog" - ) - # Optionally update subsets to include the injection task. - if not exclude_subsets: - for label in precursor_injection_task_labels: - precursor_subsets = pipeline.findSubsetsWithLabel(label) - for subset in precursor_subsets: - pipeline.addLabelToSubset(subset, injection_task_label) - if retry_config_overrides: - # Retry config overrides that were not found in the pipeline before - # the injection task was added. - for config_label, config_key, config_value in retry_config_overrides: - pipeline.addConfigOverride(config_label, config_key, config_value) - - # Create injected subsets. 
- injected_label_specifier = LabelSpecifier(labels=injected_tasks) - injected_pipeline = pipeline.subsetFromLabels(injected_label_specifier, pipeline.PipelineSubsetCtrl.EDIT) - injected_subset_labels = set() - for injected_subset in injected_pipeline.subsets.keys(): - injected_subset_label = "injected_" + injected_subset - injected_subset_description = ( - "All tasks from the '" + injected_subset + "' subset impacted by source injection." - ) - if len(injected_subset_tasks := injected_pipeline.subsets[injected_subset]) > 0: - injected_subset_labels |= {injected_subset_label} - pipeline.addLabeledSubset( - injected_subset_label, injected_subset_description, injected_subset_tasks - ) - - grammar1 = "task" if len(pipeline) == 1 else "tasks" - grammar2 = "subset" if len(injected_subset_labels) == 1 else "subsets" - logger.info( - "Made an injection pipeline containing %d %s and %d new injected %s.", - len(pipeline), - grammar1, - len(injected_subset_labels), - grammar2, + # Prefix post-injection dataset type name connections and update subsets + _reconfigure_injection_pipeline( + pipeline, dataset_type_name, prefix, injection_task_label, update_subsets, logger ) # Optionally include additional tasks in the injection pipeline. if additional_pipelines: - additional_tasks: set[str] = set() - # Record all input task labels and merge all input pipelines into the - # injection pipeline. - for additional_pipeline in additional_pipelines: - if isinstance(additional_pipeline, str): - additional_pipeline = Pipeline.fromFile(additional_pipeline) - additional_tasks.update(additional_pipeline.task_labels) - pipeline.mergePipeline(additional_pipeline) - - # Add all tasks to subset_name. If the subset does not exist create it. 
- if not isinstance(additional_subset, list) and additional_subset is not None: - additional_subset = [additional_subset] - for subset in additional_subset: - if ":" in subset: - subset_name, subset_description = subset.split(":", 1) - else: - subset_name = subset - subset_description = "" - - if subset_name in pipeline.subsets.keys(): - for additional_task in additional_tasks: - pipeline.addLabelToSubset(subset_name, additional_task) - subset_grammar = f"the existing subset {subset_name}" - else: - pipeline.addLabeledSubset(subset_name, subset_description, additional_tasks) - subset_grammar = f"a new subset {subset_name}" - - # Logging info. - task_grammar = "task" if len(additional_tasks) == 1 else "tasks" - logger.info( - "Added %d %s to %s", - len(additional_tasks), - task_grammar, - subset_grammar, - ) - - # Validate contracts, and remove any that are violated - try: - _ = pipeline.to_graph() - except ContractError: - contracts_initial = pipeline._pipelineIR.contracts - pipeline._pipelineIR.contracts = [] - contracts_passed = [] - contracts_failed = [] - for contract in contracts_initial: - pipeline._pipelineIR.contracts = [contract] - try: - _ = pipeline.to_graph() - except ContractError: - contracts_failed.append(contract) - continue - contracts_passed.append(contract) - pipeline._pipelineIR.contracts = contracts_passed - if contracts_failed: - logger.warning( - "The following contracts were violated and have been removed: \n%s", - "\n".join([str(contract) for contract in contracts_failed]), - ) + _add_additional_pipelines(pipeline, additional_pipelines, additional_subset, logger) return pipeline diff --git a/tests/test_utils.py b/tests/test_utils.py index 3099817..ff3cc6d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -120,12 +120,12 @@ def test_make_injection_pipeline(self): dataset_type_name="postISRCCD", # Unchanged to match task default reference_pipeline=self.reference_pipeline, injection_pipeline=injection_pipeline, - 
exclude_subsets=False, + update_subsets=True, excluded_tasks={"calibrate"}, prefix="injected_", instrument="lsst.obs.subaru.HyperSuprimeCam", additional_pipelines=[additional_pipeline], - additional_subset=["newSubset:newSubset description"], + additional_subset=["additional_subset:Additional subset description"], log_level=logging.DEBUG, ) @@ -137,7 +137,10 @@ def test_make_injection_pipeline(self): # Test that all surviving tasks are still in a subset. surviving_task_subsets = [merged_pipeline.findSubsetsWithLabel(x) for x in surviving_task_labels] self.assertEqual(sum(1 for s in surviving_task_subsets if s), len(surviving_task_labels)) - self.assertIn("newSubset", merged_pipeline.findSubsetsWithLabel("additional_task")) + self.assertIn("additional_subset", merged_pipeline.findSubsetsWithLabel("additional_task")) + self.assertNotIn("injected_test_subset", merged_pipeline.findSubsetsWithLabel("isr")) + self.assertIn("injected_test_subset", merged_pipeline.findSubsetsWithLabel("inject_exposure")) + self.assertIn("injected_test_subset", merged_pipeline.findSubsetsWithLabel("characterizeImage")) # Test that connection names have been properly configured. 
for t in merged_pipeline.to_graph().tasks.values(): @@ -149,9 +152,9 @@ def test_make_injection_pipeline(self): self.assertEqual(t.outputs["output_catalog"].dataset_type_name, "injected_postISRCCD_catalog") elif t.label == "characterizeImage": self.assertEqual(t.inputs["exposure"].dataset_type_name, "injected_postISRCCD") - self.assertEqual(t.outputs["characterized"].dataset_type_name, "injected_icExp") - self.assertEqual(t.outputs["backgroundModel"].dataset_type_name, "injected_icExpBackground") - self.assertEqual(t.outputs["sourceCat"].dataset_type_name, "injected_icSrc") + self.assertEqual(t.outputs["characterized"].dataset_type_name, "icExp") + self.assertEqual(t.outputs["backgroundModel"].dataset_type_name, "icExpBackground") + self.assertEqual(t.outputs["sourceCat"].dataset_type_name, "icSrc") def test_ingest_injection_catalog(self): input_dataset_refs = ingest_injection_catalog(