Skip to content
6 changes: 3 additions & 3 deletions dvc/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
STAGES = "stages"
SINGLE_STAGE_SCHEMA = {
StageParams.PARAM_MD5: output.CHECKSUM_SCHEMA,
StageParams.PARAM_CMD: Any(str, None),
StageParams.PARAM_CMD: Any(str, list, None),
StageParams.PARAM_WDIR: Any(str, None),
StageParams.PARAM_DEPS: Any([dependency.SCHEMA], None),
StageParams.PARAM_OUTS: Any([output.SCHEMA], None),
Expand All @@ -27,7 +27,7 @@
HashInfo.PARAM_NFILES: int,
}
LOCK_FILE_STAGE_SCHEMA = {
Required(StageParams.PARAM_CMD): str,
Required(StageParams.PARAM_CMD): Any(str, list),
StageParams.PARAM_DEPS: [DATA_SCHEMA],
StageParams.PARAM_PARAMS: {str: {str: object}},
StageParams.PARAM_OUTS: [DATA_SCHEMA],
Expand Down Expand Up @@ -63,7 +63,7 @@
VARS_SCHEMA = [str, dict]

STAGE_DEFINITION = {
StageParams.PARAM_CMD: str,
StageParams.PARAM_CMD: Any(str, list),
Optional(SET_KWD): dict,
Optional(StageParams.PARAM_WDIR): str,
Optional(StageParams.PARAM_DEPS): [str],
Expand Down
49 changes: 25 additions & 24 deletions dvc/stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class CheckpointKilledError(StageCmdFailedError):
pass


def _nix_cmd(executable, cmd):
def _make_cmd(executable, cmd):
if executable is None:
return cmd
opts = {"zsh": ["--no-rcs"], "bash": ["--noprofile", "--norc"]}
name = os.path.basename(executable).lower()
return [executable] + opts.get(name, []) + ["-c", cmd]
Expand All @@ -46,13 +48,14 @@ def warn_if_fish(executable):
@unlocked_repo
def cmd_run(stage, *args, checkpoint_func=None, **kwargs):
kwargs = {"cwd": stage.wdir, "env": fix_env(None), "close_fds": True}
cmd = stage.cmd if isinstance(stage.cmd, list) else [stage.cmd]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue you linked to is also about multiline string.
Right now, it's not officially supported, but works depending on the shell.
What do you think of it? Should we support it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about your question: the issue mentions:

  • piping: this shout stay as is as stated by @efiop
  • &&: the purpose of this pr: officially allowing the user to define sequential commands in a single stage from its yaml definition.

I don't see much space to think about something else

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ClementWalter, as you are proposing to close the issue related to multiline/multiple commands, I just wanted to know what your opinion is on that, and if we should support it, as this list and the multiline string seems to be allowing the user the same thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I don't understand what you are trying to ask me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok you mean @skshetry if all this work is worth doing because eventually it is only syntactic sugar for yaml?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I think I got it, just re-reading from the beginning. You mean: should we also support the parallel case, ie multi line string where every line should a command that runs in parallel?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ClementwWalter I believe @skshetry was wondering about your opinion on whether or not we should treat multiline string as a list of commands when running it. #4373 (comment) I've mentioned that it should probably stay as it is, but there is an idea that maybe we should handle that better, similar as gha yaml does it. Let's keep that issue opened for that last question.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efiop yes! actually re-reading this morning and re-reading yaml parsing rules it became 100% clear to me what was @skshetry's point, sorry about that yesterday. My guess is that we should not try to parse user input. Maybe we could add a flag in the stage definition about the desired behavior with a list of command: stop on fail or keep going?

if checkpoint_func:
# indicate that checkpoint cmd is being run inside DVC
kwargs["env"].update(_checkpoint_env(stage))

if os.name == "nt":
kwargs["shell"] = True
cmd = stage.cmd
executable = None
else:
# NOTE: when you specify `shell=True`, `Popen` [1] will default to
# `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command.
Expand All @@ -70,32 +73,33 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs):
kwargs["shell"] = False
executable = os.getenv("SHELL") or "/bin/sh"
warn_if_fish(executable)
cmd = _nix_cmd(executable, stage.cmd)

main_thread = isinstance(
threading.current_thread(),
threading._MainThread, # pylint: disable=protected-access
)
old_handler = None
p = None
for _cmd in cmd:
logger.info("$ %s", _cmd)
old_handler = None
p = None

try:
p = subprocess.Popen(cmd, **kwargs)
if main_thread:
old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
try:
p = subprocess.Popen(_make_cmd(executable, _cmd), **kwargs)
if main_thread:
old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

killed = threading.Event()
with checkpoint_monitor(stage, checkpoint_func, p, killed):
p.communicate()
finally:
if old_handler:
signal.signal(signal.SIGINT, old_handler)
killed = threading.Event()
with checkpoint_monitor(stage, checkpoint_func, p, killed):
p.communicate()
Comment on lines +92 to +93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmrowla Could you please confirm that this will work as expected with checkpoints?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work as expected

finally:
if old_handler:
signal.signal(signal.SIGINT, old_handler)

retcode = None if not p else p.returncode
if retcode != 0:
if killed.is_set():
raise CheckpointKilledError(stage.cmd, retcode)
raise StageCmdFailedError(stage.cmd, retcode)
retcode = None if not p else p.returncode
if retcode != 0:
if killed.is_set():
raise CheckpointKilledError(_cmd, retcode)
raise StageCmdFailedError(_cmd, retcode)


def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs):
Expand All @@ -110,11 +114,8 @@ def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs):

callback_str = "callback " if stage.is_callback else ""
logger.info(
"Running %s" "stage '%s' with command:",
callback_str,
stage.addressing,
"Running %s" "stage '%s':", callback_str, stage.addressing,
)
logger.info("\t%s", stage.cmd)
if not dry:
cmd_run(stage, checkpoint_func=checkpoint_func)

Expand Down
4 changes: 2 additions & 2 deletions scripts/schema/dvc-yaml.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@
"type": "object",
"properties": {
"cmd": {
"type": "string",
"description": "Command to run"
"type": ["string", "array"],
"description": "Command(s) to run"
},
"wdir": {
"type": "string",
Expand Down
42 changes: 37 additions & 5 deletions tests/func/test_repro_multistage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from funcy import lsplit

from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK
from dvc.exceptions import CyclicGraphError
from dvc.exceptions import CyclicGraphError, ReproductionError
from dvc.main import main
from dvc.stage import PipelineStage
from dvc.utils.serialize import dump_yaml, load_yaml
Expand Down Expand Up @@ -382,8 +382,6 @@ def test_repro_when_new_out_overlaps_others_stage_outs(tmp_dir, dvc):


def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc):
from dvc.exceptions import ReproductionError

tmp_dir.gen("copy.py", COPY_SCRIPT)
tmp_dir.gen("foo", "foo")
dump_yaml(
Expand All @@ -403,8 +401,6 @@ def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc):


def test_repro_when_new_outs_added_does_not_exist(tmp_dir, dvc):
from dvc.exceptions import ReproductionError

tmp_dir.gen("copy.py", COPY_SCRIPT)
tmp_dir.gen("foo", "foo")
dump_yaml(
Expand Down Expand Up @@ -518,3 +514,39 @@ def test_repro_multiple_params(tmp_dir, dvc):
dump_yaml(tmp_dir / "params.yaml", params)

assert dvc.reproduce(stage.addressing) == [stage]


def test_repro_list_of_commands_in_order(tmp_dir, dvc):
(tmp_dir / "dvc.yaml").write_text(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, I was not sure if it was better to write yaml directly or to use the dump_yaml func indeed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the PIPELINE_FILE variable be used instead of direct dvc.yaml ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't really matter in the test.

dedent(
"""\
stages:
multi:
cmd:
- echo foo>foo
- echo bar>bar
Comment on lines +526 to +527
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much better because actually my version crashed on windows, thanks

"""
)
)
dvc.reproduce(target="multi")
assert (tmp_dir / "foo").read_text() == "foo\n"
assert (tmp_dir / "bar").read_text() == "bar\n"


def test_repro_list_of_commands_raise_and_stops_after_failure(tmp_dir, dvc):
(tmp_dir / "dvc.yaml").write_text(
dedent(
"""\
stages:
multi:
cmd:
- echo foo>foo
- failed_command
- echo baz>bar
"""
)
)
with pytest.raises(ReproductionError):
dvc.reproduce(target="multi")
assert (tmp_dir / "foo").read_text() == "foo\n"
assert not (tmp_dir / "bar").exists()
7 changes: 7 additions & 0 deletions tests/unit/stage/test_loader_pipeline_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ def test_load_stage(dvc, stage_data, lock_data):
assert stage.outs[0].hash_info == HashInfo("md5", "bar_checksum")


def test_load_stage_cmd_with_list(dvc, stage_data, lock_data):
stage_data["cmd"] = ["cmd-0", "cmd-1"]
dvcfile = Dvcfile(dvc, PIPELINE_FILE)
stage = StageLoader.load_stage(dvcfile, "stage-1", stage_data, lock_data)
assert stage.cmd == ["cmd-0", "cmd-1"]


def test_load_stage_outs_with_flags(dvc, stage_data, lock_data):
stage_data["outs"] = [{"foo": {"cache": False}}]
dvcfile = Dvcfile(dvc, PIPELINE_FILE)
Expand Down
3 changes: 1 addition & 2 deletions tests/unit/stage/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ def test_run_stage_dry(caplog):
stage = Stage(None, "stage.dvc", cmd="mycmd arg1 arg2")
run_stage(stage, dry=True)
assert caplog.messages == [
"Running callback stage 'stage.dvc' with command:",
"\t" + "mycmd arg1 arg2",
"Running callback stage 'stage.dvc':",
]