Skip to content

Commit 19f98f5

Browse files
committed
Use SIGINT as the default signal in queue kill
fix: #8624 1. Add a new `--force` flag to `queue kill`. 2. Make `SIGINT` the default signal and send `SIGKILL` only when `--force` is given. 3. Add tests for `queue kill`. 4. Bump dvc-task to 0.1.7.
1 parent aa2e830 commit 19f98f5

File tree

6 files changed

+28
-12
lines changed

6 files changed

+28
-12
lines changed

dvc/commands/queue/kill.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,31 @@ class CmdQueueKill(CmdBase):
1111
"""Kill exp task in queue."""
1212

1313
def run(self):
14-
self.repo.experiments.celery_queue.kill(revs=self.args.task)
14+
self.repo.experiments.celery_queue.kill(
15+
revs=self.args.task, force=self.args.force
16+
)
1517

1618
return 0
1719

1820

1921
def add_parser(queue_subparsers, parent_parser):
20-
QUEUE_KILL_HELP = "Kill actively running experiment queue tasks."
22+
QUEUE_KILL_HELP = "Send SIGINT(Ctrl-C) to running experiment queue tasks."
2123
queue_kill_parser = queue_subparsers.add_parser(
2224
"kill",
2325
parents=[parent_parser],
2426
description=append_doc_link(QUEUE_KILL_HELP, "queue/kill"),
2527
help=QUEUE_KILL_HELP,
2628
formatter_class=argparse.RawDescriptionHelpFormatter,
2729
)
30+
queue_kill_parser.add_argument(
31+
"-f",
32+
"--force",
33+
action="store_true",
34+
default=False,
35+
help="Send SIGKILL (kill -9) instead to running experiment queue "
36+
"tasks. (The default `queue kill` will terminate more gracefully,"
37+
" collecting and cleaning up all resources)",
38+
)
2839
queue_kill_parser.add_argument(
2940
"task",
3041
nargs="*",

dvc/repo/experiments/queue/celery.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,15 @@ def _get_running_task_ids(self) -> Set[str]:
300300
return running_task_ids
301301

302302
def _try_to_kill_tasks(
303-
self, to_kill: Dict[QueueEntry, str]
303+
self, to_kill: Dict[QueueEntry, str], force: bool
304304
) -> Dict[QueueEntry, str]:
305305
fail_to_kill_entries: Dict[QueueEntry, str] = {}
306306
for queue_entry, rev in to_kill.items():
307307
try:
308-
self.proc.kill(queue_entry.stash_rev)
308+
if force:
309+
self.proc.kill(queue_entry.stash_rev)
310+
else:
311+
self.proc.interrupt(queue_entry.stash_rev)
309312
logger.debug(f"Task {rev} had been killed.")
310313
except ProcessLookupError:
311314
fail_to_kill_entries[queue_entry] = rev
@@ -333,7 +336,7 @@ def _mark_inactive_tasks_failure(self, remained_entries):
333336
if remained_revs:
334337
raise CannotKillTasksError(remained_revs)
335338

336-
def kill(self, revs: Collection[str]) -> None:
339+
def kill(self, revs: Collection[str], force: bool = False) -> None:
337340
name_dict: Dict[
338341
str, Optional[QueueEntry]
339342
] = self.match_queue_entry_by_name(set(revs), self.iter_active())
@@ -349,7 +352,7 @@ def kill(self, revs: Collection[str]) -> None:
349352
raise UnresolvedQueueExpNamesError(missing_revs)
350353

351354
fail_to_kill_entries: Dict[QueueEntry, str] = self._try_to_kill_tasks(
352-
to_kill
355+
to_kill, force
353356
)
354357

355358
if fail_to_kill_entries:

dvc/stage/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ def _run(stage, executable, cmd, checkpoint_func, **kwargs):
7979
threading.current_thread(),
8080
threading._MainThread, # pylint: disable=protected-access
8181
)
82+
old_handler = None
8283

8384
exec_cmd = _make_cmd(executable, cmd)
84-
old_handler = None
8585

8686
try:
8787
p = subprocess.Popen(exec_cmd, **kwargs)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ dependencies = [
5252
"typing-extensions>=3.7.4",
5353
"scmrepo==0.1.4",
5454
"dvc-render==0.0.14",
55-
"dvc-task==0.1.6",
55+
"dvc-task==0.1.7",
5656
"dvclive>=1.0",
5757
"dvc-data==0.28.3",
5858
"dvc-http==2.27.2",

tests/unit/command/test_queue.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def test_experiments_kill(dvc, scm, mocker):
9292
[
9393
"queue",
9494
"kill",
95+
"--force",
9596
"exp1",
9697
"exp2",
9798
]
@@ -105,7 +106,7 @@ def test_experiments_kill(dvc, scm, mocker):
105106
)
106107

107108
assert cmd.run() == 0
108-
m.assert_called_once_with(revs=["exp1", "exp2"])
109+
m.assert_called_once_with(revs=["exp1", "exp2"], force=True)
109110

110111

111112
def test_experiments_start(dvc, scm, mocker):

tests/unit/repo/experiments/queue/test_celery.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ def test_post_run_after_kill(test_queue):
8080
assert result_foo.get(timeout=10) == "foo"
8181

8282

83-
def test_celery_queue_kill(test_queue, mocker):
83+
@pytest.mark.parametrize("force", [True, False])
84+
def test_celery_queue_kill(test_queue, mocker, force):
8485

8586
mock_entry_foo = mocker.Mock(stash_rev="foo")
8687
mock_entry_bar = mocker.Mock(stash_rev="bar")
@@ -139,13 +140,13 @@ def kill_function(rev):
139140

140141
kill_mock = mocker.patch.object(
141142
test_queue.proc,
142-
"kill",
143+
"kill" if force else "interrupt",
143144
side_effect=mocker.MagicMock(side_effect=kill_function),
144145
)
145146
with pytest.raises(
146147
CannotKillTasksError, match="Task 'foobar' is initializing,"
147148
):
148-
test_queue.kill(["bar", "foo", "foobar"])
149+
test_queue.kill(["bar", "foo", "foobar"], force=force)
149150
assert kill_mock.called_once_with(mock_entry_foo.stash_rev)
150151
assert kill_mock.called_once_with(mock_entry_bar.stash_rev)
151152
assert kill_mock.called_once_with(mock_entry_foobar.stash_rev)

0 commit comments

Comments
 (0)