From b6c24fb3efd46e71be0420bba35860279f55b91b Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Feb 2023 17:48:49 +0900 Subject: [PATCH 1/3] deps: bump dvc-task to >=0.2.0 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 19297cc5dc..39d6d930d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "dvc-http", "dvc-render>=0.1.2", "dvc-studio-client>=0.1.1", - "dvc-task>=0.1.11,<1", + "dvc-task>=0.2.0,<1", "dvclive>=1.2.2", "flatten_dict<1,>=0.4.1", "flufl.lock>=5", From 7690789ca072b9e48b1d3e8597dd94df2e4387fb Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Feb 2023 17:49:25 +0900 Subject: [PATCH 2/3] exp: auto cleanup celery on queue worker exit --- dvc/commands/experiments/queue_worker.py | 9 ++++++++- dvc/repo/experiments/queue/celery.py | 5 ++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/dvc/commands/experiments/queue_worker.py b/dvc/commands/experiments/queue_worker.py index 81b6372d9c..b85ff1d7a6 100644 --- a/dvc/commands/experiments/queue_worker.py +++ b/dvc/commands/experiments/queue_worker.py @@ -9,7 +9,9 @@ class CmdQueueWorker(CmdBase): """Run the exp queue worker.""" def run(self): - self.repo.experiments.celery_queue.worker.start(self.args.name) + self.repo.experiments.celery_queue.worker.start( + self.args.name, fsapp_clean=self.args.clean + ) return 0 @@ -22,4 +24,9 @@ def add_parser(experiments_subparsers, parent_parser): add_help=False, ) parser.add_argument("name", help="Celery worker name.") + parser.add_argument( + "--clean", + action="store_true", + help="Automatically cleanup celery broker on shutdown.", + ) parser.set_defaults(func=CmdQueueWorker) diff --git a/dvc/repo/experiments/queue/celery.py b/dvc/repo/experiments/queue/celery.py index e85b1cf144..8d5364429d 100644 --- a/dvc/repo/experiments/queue/celery.py +++ b/dvc/repo/experiments/queue/celery.py @@ -90,7 +90,7 @@ def celery(self) -> "FSApp": "dvc_task.proc.tasks", ], ) - app.conf.update({"task_acks_late": True}) + app.conf.update({"task_acks_late": True, "result_expires": None}) return app @cached_property @@ -138,6 +138,9 @@ def _spawn_worker(self, num: int = 1): wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6] node_name = f"dvc-exp-{wdir_hash}-{num}@localhost" cmd = ["exp", "queue-worker", node_name] + if num == 1: + # automatically run celery cleanup when primary worker shuts down + cmd.append("--clean") name = f"dvc-exp-worker-{num}" logger.debug("start a new worker: %s, node: %s", name, node_name) From e5edf0cb0c961fcf4b1b42e88439c92857f5b279 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Tue, 28 Feb 2023 18:05:02 +0900 Subject: [PATCH 3/3] exp: add `dvc exp clean` command --- dvc/commands/experiments/__init__.py | 2 ++ dvc/commands/experiments/clean.py | 25 ++++++++++++++++++++++ dvc/repo/experiments/__init__.py | 5 +++++ dvc/repo/experiments/clean.py | 12 +++++++++++ tests/func/experiments/test_experiments.py | 6 ++++++ tests/unit/command/test_experiments.py | 13 +++++++++++ 6 files changed, 63 insertions(+) create mode 100644 dvc/commands/experiments/clean.py create mode 100644 dvc/repo/experiments/clean.py diff --git a/dvc/commands/experiments/__init__.py b/dvc/commands/experiments/__init__.py index 7546b0ae74..d7e77d81e3 100644 --- a/dvc/commands/experiments/__init__.py +++ b/dvc/commands/experiments/__init__.py @@ -4,6 +4,7 @@ from dvc.commands.experiments import ( apply, branch, + clean, diff, exec_run, gc, @@ -21,6 +22,7 @@ SUB_COMMANDS = [ apply, branch, + clean, diff, exec_run, gc, diff --git a/dvc/commands/experiments/clean.py b/dvc/commands/experiments/clean.py new file mode 100644 index 0000000000..890eb34f3d --- /dev/null +++ b/dvc/commands/experiments/clean.py @@ -0,0 +1,25 @@ +import argparse +import logging + +from dvc.cli.command import CmdBase +from dvc.cli.utils import append_doc_link + +logger = logging.getLogger(__name__) + + +class CmdExperimentsClean(CmdBase): + def run(self): + self.repo.experiments.clean() + return 0 + + +def add_parser(experiments_subparsers, parent_parser): + EXPERIMENTS_CLEAN_HELP = "Cleanup dvc exp internal tempfiles." + experiments_clean_parser = experiments_subparsers.add_parser( + "clean", + parents=[parent_parser], + description=append_doc_link(EXPERIMENTS_CLEAN_HELP, "exp/clean"), + help=EXPERIMENTS_CLEAN_HELP, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + experiments_clean_parser.set_defaults(func=CmdExperimentsClean) diff --git a/dvc/repo/experiments/__init__.py b/dvc/repo/experiments/__init__.py index c4c74d95aa..d731086daf 100644 --- a/dvc/repo/experiments/__init__.py +++ b/dvc/repo/experiments/__init__.py @@ -494,3 +494,8 @@ def remove(self, *args, **kwargs): from dvc.repo.experiments.remove import remove return remove(self.repo, *args, **kwargs) + + def clean(self, *args, **kwargs): + from dvc.repo.experiments.clean import clean + + return clean(self.repo, *args, **kwargs) diff --git a/dvc/repo/experiments/clean.py b/dvc/repo/experiments/clean.py new file mode 100644 index 0000000000..653f2e2f39 --- /dev/null +++ b/dvc/repo/experiments/clean.py @@ -0,0 +1,12 @@ +from typing import TYPE_CHECKING + +from dvc.ui import ui + +if TYPE_CHECKING: + from dvc.repo import Repo + + +def clean(repo: "Repo"): + ui.write("Cleaning up dvc-task messages...") + repo.experiments.celery_queue.celery.clean() + ui.write("Done!") diff --git a/tests/func/experiments/test_experiments.py b/tests/func/experiments/test_experiments.py index fc340e0053..e9d5f83c9c 100644 --- a/tests/func/experiments/test_experiments.py +++ b/tests/func/experiments/test_experiments.py @@ -633,3 +633,9 @@ def test_experiment_run_dry(tmp_dir, scm, dvc, exp_stage): dvc.experiments.run(exp_stage.addressing, dry=True) assert len(dvc.experiments.ls()["master"]) == 0 + + +def test_clean(tmp_dir, scm, dvc, mocker): + clean = mocker.spy(dvc.experiments.celery_queue.celery, "clean") + dvc.experiments.clean() + clean.assert_called_once_with() diff --git a/tests/unit/command/test_experiments.py b/tests/unit/command/test_experiments.py index be667c5fd5..8fdbb31096 100644 --- a/tests/unit/command/test_experiments.py +++ b/tests/unit/command/test_experiments.py @@ -8,6 +8,7 @@ from dvc.cli import DvcParserError, parse_args from dvc.commands.experiments.apply import CmdExperimentsApply from dvc.commands.experiments.branch import CmdExperimentsBranch +from dvc.commands.experiments.clean import CmdExperimentsClean from dvc.commands.experiments.diff import CmdExperimentsDiff from dvc.commands.experiments.gc import CmdExperimentsGC from dvc.commands.experiments.init import CmdExperimentsInit @@ -926,3 +927,15 @@ def test_experiments_save(dvc, scm, mocker): m.assert_called_once_with( cmd.repo, name="exp-name", force=True, include_untracked=[] ) + + +def test_experiments_clean(dvc, scm, mocker): + cli_args = parse_args(["experiments", "clean"]) + assert cli_args.func == CmdExperimentsClean + + cmd = cli_args.func(cli_args) + m = mocker.patch("dvc.repo.experiments.clean.clean", return_value={}) + + assert cmd.run() == 0 + + m.assert_called_once_with(cmd.repo)