Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dvc/commands/experiments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dvc.commands.experiments import (
apply,
branch,
clean,
diff,
exec_run,
gc,
Expand All @@ -21,6 +22,7 @@
SUB_COMMANDS = [
apply,
branch,
clean,
diff,
exec_run,
gc,
Expand Down
25 changes: 25 additions & 0 deletions dvc/commands/experiments/clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import argparse
import logging

from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link

logger = logging.getLogger(__name__)


class CmdExperimentsClean(CmdBase):
def run(self):
self.repo.experiments.clean()
return 0


def add_parser(experiments_subparsers, parent_parser):
EXPERIMENTS_CLEAN_HELP = "Cleanup dvc exp internal tempfiles."
experiments_clean_parser = experiments_subparsers.add_parser(
"clean",
parents=[parent_parser],
description=append_doc_link(EXPERIMENTS_CLEAN_HELP, "exp/clean"),
help=EXPERIMENTS_CLEAN_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
experiments_clean_parser.set_defaults(func=CmdExperimentsClean)
9 changes: 8 additions & 1 deletion dvc/commands/experiments/queue_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ class CmdQueueWorker(CmdBase):
"""Run the exp queue worker."""

def run(self):
self.repo.experiments.celery_queue.worker.start(self.args.name)
self.repo.experiments.celery_queue.worker.start(
self.args.name, fsapp_clean=self.args.clean
)
return 0


Expand All @@ -22,4 +24,9 @@ def add_parser(experiments_subparsers, parent_parser):
add_help=False,
)
parser.add_argument("name", help="Celery worker name.")
parser.add_argument(
"--clean",
action="store_true",
help="Automatically cleanup celery broker on shutdown.",
)
parser.set_defaults(func=CmdQueueWorker)
5 changes: 5 additions & 0 deletions dvc/repo/experiments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,8 @@ def remove(self, *args, **kwargs):
from dvc.repo.experiments.remove import remove

return remove(self.repo, *args, **kwargs)

def clean(self, *args, **kwargs):
from dvc.repo.experiments.clean import clean

return clean(self.repo, *args, **kwargs)
12 changes: 12 additions & 0 deletions dvc/repo/experiments/clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import TYPE_CHECKING

from dvc.ui import ui

if TYPE_CHECKING:
from dvc.repo import Repo


def clean(repo: "Repo"):
ui.write("Cleaning up dvc-task messages...")
repo.experiments.celery_queue.celery.clean()
ui.write("Done!")
5 changes: 4 additions & 1 deletion dvc/repo/experiments/queue/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def celery(self) -> "FSApp":
"dvc_task.proc.tasks",
],
)
app.conf.update({"task_acks_late": True})
app.conf.update({"task_acks_late": True, "result_expires": None})
return app

@cached_property
Expand Down Expand Up @@ -138,6 +138,9 @@ def _spawn_worker(self, num: int = 1):
wdir_hash = hashlib.sha256(self.wdir.encode("utf-8")).hexdigest()[:6]
node_name = f"dvc-exp-{wdir_hash}-{num}@localhost"
cmd = ["exp", "queue-worker", node_name]
if num == 1:
# automatically run celery cleanup when primary worker shuts down
cmd.append("--clean")
name = f"dvc-exp-worker-{num}"

logger.debug("start a new worker: %s, node: %s", name, node_name)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies = [
"dvc-http",
"dvc-render>=0.1.2",
"dvc-studio-client>=0.1.1",
"dvc-task>=0.1.11,<1",
"dvc-task>=0.2.0,<1",
"dvclive>=1.2.2",
"flatten_dict<1,>=0.4.1",
"flufl.lock>=5",
Expand Down
6 changes: 6 additions & 0 deletions tests/func/experiments/test_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,3 +633,9 @@ def test_experiment_run_dry(tmp_dir, scm, dvc, exp_stage):
dvc.experiments.run(exp_stage.addressing, dry=True)

assert len(dvc.experiments.ls()["master"]) == 0


def test_clean(tmp_dir, scm, dvc, mocker):
clean = mocker.spy(dvc.experiments.celery_queue.celery, "clean")
dvc.experiments.clean()
clean.assert_called_once_with()
13 changes: 13 additions & 0 deletions tests/unit/command/test_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dvc.cli import DvcParserError, parse_args
from dvc.commands.experiments.apply import CmdExperimentsApply
from dvc.commands.experiments.branch import CmdExperimentsBranch
from dvc.commands.experiments.clean import CmdExperimentsClean
from dvc.commands.experiments.diff import CmdExperimentsDiff
from dvc.commands.experiments.gc import CmdExperimentsGC
from dvc.commands.experiments.init import CmdExperimentsInit
Expand Down Expand Up @@ -926,3 +927,15 @@ def test_experiments_save(dvc, scm, mocker):
m.assert_called_once_with(
cmd.repo, name="exp-name", force=True, include_untracked=[]
)


def test_experiments_clean(dvc, scm, mocker):
cli_args = parse_args(["experiments", "clean"])
assert cli_args.func == CmdExperimentsClean

cmd = cli_args.func(cli_args)
m = mocker.patch("dvc.repo.experiments.clean.clean", return_value={})

assert cmd.run() == 0

m.assert_called_once_with(cmd.repo)