Skip to content

Commit

Permalink
Merge branch 'tickets/DM-35494'
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulPrice committed Jul 14, 2022
2 parents 269e72b + d6a2db6 commit 1ad71ff
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 6 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-35494.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Added support for transferring files into execution butler.
9 changes: 9 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import click
import lsst.daf.butler.cli.opt as dafButlerOpts
import lsst.pipe.base.cli.opt as pipeBaseOpts
from lsst.daf.butler.cli.opt import transfer_option
from lsst.daf.butler.cli.utils import OptionGroup, option_section, unwrap

from . import options as ctrlMpExecOpts
Expand Down Expand Up @@ -88,6 +89,14 @@ def __init__(self) -> None:
ctrlMpExecOpts.qgraph_dot_option(),
ctrlMpExecOpts.save_execution_butler_option(),
ctrlMpExecOpts.clobber_execution_butler_option(),
ctrlMpExecOpts.target_datastore_root_option(),
transfer_option(
help=unwrap(
"""Data transfer mode for the execution butler datastore.
Defaults to "copy" if --target-datastore-root is provided.
"""
),
),
ctrlMpExecOpts.dataset_query_constraint(),
ctrlMpExecOpts.qgraph_header_data_option(),
]
Expand Down
10 changes: 10 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,16 @@
),
is_flag=True,
)

target_datastore_root_option = MWOptionDecorator(
"--target-datastore-root",
help=unwrap(
"""Root directory for datastore of execution butler.
Default is to use the original datastore.
"""
),
)

dataset_query_constraint = MWOptionDecorator(
"--dataset-query-constraint",
help=unwrap(
Expand Down
10 changes: 10 additions & 0 deletions python/lsst/ctrl/mpexec/cli/script/qgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def qgraph( # type: ignore
show,
save_execution_butler,
clobber_execution_butler,
target_datastore_root,
transfer,
clobber_outputs,
dataset_query_constraint,
show_qgraph_header=False,
Expand Down Expand Up @@ -135,6 +137,12 @@ def qgraph( # type: ignore
QuantumGraph.
clobber_execution_butler : `bool`
It True overwrite existing execution butler files if present.
target_datastore_root : `str` or `None`
URI location for the execution butler's datastore.
transfer : `str` or `None`
Transfer mode for execution butler creation. This should be a
``transfer`` string recognized by
:func:`lsst.resources.ResourcePath.transfer_from`.
clobber_outputs : `bool`
Remove outputs from previous execution of the same quantum before new
execution. If ``skip_existing`` is also passed, then only failed
Expand Down Expand Up @@ -178,6 +186,8 @@ def qgraph( # type: ignore
skip_existing=skip_existing,
execution_butler_location=save_execution_butler,
clobber_execution_butler=clobber_execution_butler,
target_datastore_root=target_datastore_root,
transfer=transfer,
clobber_outputs=clobber_outputs,
dataset_query_constraint=dataset_query_constraint,
show_qgraph_header=show_qgraph_header,
Expand Down
2 changes: 2 additions & 0 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,8 @@ def builderShim(butler: Butler) -> Butler:
butlerModifier=builderShim,
collections=all_inputs,
clobber=args.clobber_execution_butler,
datastoreRoot=args.target_datastore_root,
transfer=args.transfer,
)

return qgraph
Expand Down
6 changes: 3 additions & 3 deletions python/lsst/ctrl/mpexec/mock_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _checkMembership(self, ref: Union[List[DatasetRef], DatasetRef], inout: set)

class MockPipelineTaskConfig(PipelineTaskConfig, pipelineConnections=PipelineTaskConnections):

failCondition = Field(
failCondition: Field[str] = Field(
dtype=str,
default="",
doc=(
Expand All @@ -127,7 +127,7 @@ class MockPipelineTaskConfig(PipelineTaskConfig, pipelineConnections=PipelineTas
),
)

failException = Field(
failException: Field[str] = Field(
dtype=str,
default="builtins.ValueError",
doc=(
Expand Down Expand Up @@ -156,7 +156,7 @@ class MockPipelineTask(PipelineTask):

ConfigClass = MockPipelineTaskConfig

def __init__(self, *, config: Optional[PipelineTaskConfig] = None, **kwargs: Any):
def __init__(self, *, config: Optional[MockPipelineTaskConfig] = None, **kwargs: Any):
super().__init__(config=config, **kwargs)

self.failException: Optional[type] = None
Expand Down
5 changes: 2 additions & 3 deletions python/lsst/ctrl/mpexec/taskFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@

if TYPE_CHECKING:
from lsst.daf.butler import Butler
from lsst.pex.config import Config
from lsst.pipe.base import PipelineTask
from lsst.pipe.base import PipelineTask, PipelineTaskConfig
from lsst.pipe.base.configOverrides import ConfigOverrides

_LOG = logging.getLogger(__name__)
Expand All @@ -45,7 +44,7 @@ def makeTask(
self,
taskClass: type[PipelineTask],
label: Optional[str],
config: Optional[Config],
config: Optional[PipelineTaskConfig],
overrides: Optional[ConfigOverrides],
butler: Optional[Butler],
) -> PipelineTask:
Expand Down

0 comments on commit 1ad71ff

Please sign in to comment.