Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-33634: Add pipetask purge and cleanup subcommands #178

Merged
merged 3 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-33634.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add "pipetask" CLI commands "purge" and "cleanup".
4 changes: 2 additions & 2 deletions python/lsst/ctrl/mpexec/cli/cmd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

__all__ = ["build", "qgraph", "run"]
__all__ = ["build", "cleanup", "purge", "qgraph", "run"]


from .commands import build, qgraph, run
from .commands import build, cleanup, purge, qgraph, run
41 changes: 40 additions & 1 deletion python/lsst/ctrl/mpexec/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from functools import partial

import click
import lsst.pipe.base.cli.opt as pipeBaseOpts
from lsst.daf.butler.cli.opt import config_file_option, config_option, options_file_option
from lsst.daf.butler.cli.opt import config_file_option, config_option, confirm_option, options_file_option
from lsst.daf.butler.cli.utils import MWCtxObj, catch_and_exit, option_section, unwrap

from .. import opt as ctrlMpExecOpts
from .. import script
from ..script import confirmable
from ..utils import _ACTION_CONFIG, _ACTION_CONFIG_FILE, PipetaskCommand, makePipelineActions

epilog = unwrap(
Expand Down Expand Up @@ -125,3 +128,39 @@ def run(ctx, **kwargs):
pipeline = script.build(**kwargs)
qgraph = script.qgraph(pipelineObj=pipeline, **kwargs)
script.run(qgraphObj=qgraph, **kwargs)


@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.butler_config_option()
@ctrlMpExecOpts.collection_argument()
@confirm_option()
@ctrlMpExecOpts.recursive_option(
    help="""If the parent CHAINED collection has child CHAINED collections,
    search the children until nested chains that start with the parent's name
    are removed."""
)
def purge(confirm, **kwargs):
    """Remove a CHAINED collection and its contained collections.

    COLLECTION is the name of the chained collection to purge. It must not be
    a child of any other CHAINED collections.

    Child collections must be members of exactly one collection.

    The collections that will be removed will be printed, there will be an
    option to continue or abort (unless using --no-confirm).
    """
    # NOTE: the docstring above is the command's --help text, so it is written
    # for CLI users rather than for callers of this function.
    # Bind the remaining CLI options now: confirmable.confirm calls the
    # callable it is given with no arguments.
    confirmable.confirm(partial(script.purge, **kwargs), confirm)


@click.command(cls=PipetaskCommand)
@ctrlMpExecOpts.butler_config_option()
@ctrlMpExecOpts.collection_argument()
@confirm_option()
def cleanup(confirm, **kwargs):
    """Remove non-members of CHAINED collections.

    Removes collections that start with the same name as a CHAINED
    collection but are not members of that collection.
    """
    # Freeze the CLI options into a zero-argument callable, which is the form
    # confirmable.confirm expects.
    deferred_cleanup = partial(script.cleanup, **kwargs)
    confirmable.confirm(deferred_cleanup, confirm)
1 change: 1 addition & 0 deletions python/lsst/ctrl/mpexec/cli/opt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.


from .arguments import *
from .optionGroups import *
from .options import *
25 changes: 25 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


from lsst.daf.butler.cli.utils import MWArgumentDecorator

# Decorator factory for the positional "collection" argument of a command.
collection_argument = MWArgumentDecorator("collection")
6 changes: 6 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,3 +425,9 @@
),
type=MWPath(dir_okay=False, file_okay=True, writable=True),
)


# Decorator factory for a boolean "--recursive" flag. No help text is set
# here, so a command applying it can supply its own via ``help=``.
recursive_option = MWOptionDecorator(
"--recursive",
is_flag=True,
)
2 changes: 2 additions & 0 deletions python/lsst/ctrl/mpexec/cli/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@


from .build import build
from .cleanup import cleanup
from .purge import PurgeResult, purge
from .qgraph import qgraph
from .run import run
118 changes: 118 additions & 0 deletions python/lsst/ctrl/mpexec/cli/script/cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


import re

from lsst.daf.butler import Butler, CollectionType
from lsst.daf.butler.registry import CollectionTypeError, MissingCollectionError

from .confirmable import ConfirmableResult


class NoSuchCollectionFailure:
    """Failure recorded when the requested collection does not exist.

    Parameters
    ----------
    collection : str
        Name of the collection that could not be found.
    """

    def __init__(self, collection):
        self.collection = collection

    def __str__(self):
        # The text is shown to the user as the reason the command stopped.
        message = f'Did not find a collection named "{self.collection}"'
        return message


class NotChainedCollectionFailure:
    """Failure recorded when the requested collection is not CHAINED.

    Parameters
    ----------
    collection : str
        Name of the collection that was requested.
    type : str
        Name of the type the collection actually has; it is interpolated
        into the message as-is.
    """

    def __init__(self, collection, type):
        # ``type`` shadows the builtin, but it is part of the constructor's
        # signature, so the name is kept for callers passing it by keyword.
        self.collection = collection
        self.type = type

    def __str__(self):
        # Spelling of "COLLECTION" fixed (was "COLLETION").
        return f'COLLECTION must be a CHAINED collection, "{self.collection}" is a "{self.type}" collection.'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: COLLETION



class CleanupResult(ConfirmableResult):
    """Collections selected for removal, and the action that removes them.

    Parameters
    ----------
    butler_config : str
        Butler configuration; used to open a writeable butler when the
        removal is carried out.
    """

    def __init__(self, butler_config):
        self.butler_config = butler_config
        # Names of collections to delete with ``Butler.removeRuns``.
        self.runs_to_remove = []
        # Names of collections to delete with ``registry.removeCollection``.
        self.others_to_remove = []
        # Set to an object whose ``str()`` explains why nothing can be done.
        self.failure = None

    def describe(self, will: bool) -> str:
        """Return a message listing what will be, or was, removed.

        Parameters
        ----------
        will : bool
            `True` to phrase the message in the future tense, `False` for
            the past tense.

        Returns
        -------
        msg : str
            The removal summary, or a notice that nothing was found.
        """
        if not self.can_continue:
            return "Did not find any collections to remove."
        msg = "Will remove:" if will else "Removed:"
        msg += "\n"
        msg += f" runs: {', '.join(self.runs_to_remove)}\n"
        msg += f" others: {', '.join(self.others_to_remove)}"
        return msg

    def on_confirmation(self) -> None:
        """Remove the selected collections from the repository."""
        butler = Butler(self.butler_config, writeable=True)
        # Group every removal in one butler transaction so that they are
        # applied together rather than piecemeal.
        with butler.transaction():
            for collection in self.others_to_remove:
                butler.registry.removeCollection(collection)
            butler.removeRuns(self.runs_to_remove)

    @property
    def failed(self) -> bool:
        """Whether a failure was recorded while preparing this result."""
        return self.failure is not None

    @property
    def describe_failure(self) -> str:
        """The recorded failure rendered as text."""
        return str(self.failure)

    @property
    def can_continue(self) -> bool:
        """Whether there is at least one collection to remove."""
        # bool() so the property returns a real bool, as annotated, instead
        # of leaking one of the internal lists.
        return bool(self.runs_to_remove) or bool(self.others_to_remove)


def cleanup(
    butler_config: str,
    collection: str,
):
    """Remove collections that start with the same name as a CHAINED
    collection but are not members of that collection.

    Parameters
    ----------
    butler_config : str
        The path location of the gen3 butler/registry config file.
    collection : str
        The name of the CHAINED collection. Collections whose names begin
        with this name followed by at least one more character, and that are
        not in this collection's chain, are selected for removal.

    Returns
    -------
    result : CleanupResult
        The collections selected for removal, split into RUN collections and
        all others, or a recorded failure if ``collection`` does not exist
        or is not a CHAINED collection. Nothing is removed by this function
        itself; removal happens in ``CleanupResult.on_confirmation``.
    """
    butler = Butler(butler_config)
    result = CleanupResult(butler_config)
    try:
        to_keep = set(butler.registry.getCollectionChain(collection))
    except MissingCollectionError:
        result.failure = NoSuchCollectionFailure(collection)
        return result
    except CollectionTypeError:
        result.failure = NotChainedCollectionFailure(
            collection, butler.registry.getCollectionType(collection).name
        )
        return result
    # Escape the name so that regex metacharacters in it (for example "."
    # or "+") are matched literally instead of widening the search.
    pattern = re.compile(re.escape(collection) + ".+")
    candidates = set(butler.registry.queryCollections(pattern))
    for name in candidates - to_keep:
        # RUN collections are removed through a different butler call than
        # other collection types, so keep them in separate lists.
        if butler.registry.getCollectionType(name) == CollectionType.RUN:
            result.runs_to_remove.append(name)
        else:
            result.others_to_remove.append(name)
    return result
93 changes: 93 additions & 0 deletions python/lsst/ctrl/mpexec/cli/script/confirmable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# This file is part of ctrl_mpexec.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


from abc import ABC, abstractmethod, abstractproperty
from typing import Callable

import click


class ConfirmableResult(ABC):
    """Interface for a results class that can be used by the `confirm`
    function.
    """

    @abstractmethod
    def describe(self, will: bool) -> str:
        """Get a message describing what will be or was done. This is printed
        just before "Continue?" on the CLI, if confirming, or just before
        "Done." if confirmation is being skipped.

        Parameters
        ----------
        will : bool
            True if confirmation is being requested, False if --no-confirm was
            used, and the action is completed.

        Returns
        -------
        msg : str
            The message to print.
        """

    @abstractmethod
    def on_confirmation(self) -> None:
        """Perform the action that was returned from `describe`. This is
        called just after the user has confirmed (if needed).
        """

    # ``abc.abstractproperty`` is deprecated; stacking ``@property`` on
    # ``@abstractmethod`` is the supported spelling and behaves the same for
    # subclasses.
    @property
    @abstractmethod
    def failed(self) -> bool:
        """Query if there was a failure preparing the ConfirmableResult,
        before `on_confirmation` is called.
        """

    @property
    @abstractmethod
    def describe_failure(self) -> str:
        """Get a message describing the failure. This is used as the message
        when raising a `ClickException` to stop with exit code 1.
        """

    @property
    @abstractmethod
    def can_continue(self) -> bool:
        """Query if the ConfirmableResult can continue. Returns `False` if
        there is no work to be done.
        """


def confirm(script_func: Callable[[], "ConfirmableResult"], confirm: bool) -> "ConfirmableResult":
    """Run a script function and carry out its result, optionally asking
    the user for confirmation first.

    Parameters
    ----------
    script_func : callable
        Zero-argument callable that prepares and returns a
        `ConfirmableResult`.
    confirm : bool
        If `True`, print what will be done and prompt "Continue?" before
        acting. If `False`, act immediately and then print what was done.

    Returns
    -------
    result : ConfirmableResult
        The object returned by ``script_func``, whether or not its action
        was carried out.

    Raises
    ------
    click.ClickException
        Raised if the result reports a failure.
    """
    result = script_func()
    if result.failed:
        raise click.ClickException(result.describe_failure)
    if not result.can_continue:
        # Nothing to do: report that and stop without prompting.
        print(result.describe(will=True))
        return result
    if confirm:
        print(result.describe(will=True))
        do_continue = click.confirm("Continue?", default=False)
    else:
        do_continue = True
    if not do_continue:
        print("Aborted.")
        return result
    result.on_confirmation()
    if confirm:
        # The description was already printed before the prompt.
        print("Done.")
    else:
        print(result.describe(will=False))
    return result