diff --git a/dvc/schema.py b/dvc/schema.py index 6ca5390fdd..9a8785b0b9 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -9,7 +9,7 @@ STAGES = "stages" SINGLE_STAGE_SCHEMA = { StageParams.PARAM_MD5: output.CHECKSUM_SCHEMA, - StageParams.PARAM_CMD: Any(str, None), + StageParams.PARAM_CMD: Any(str, list, None), StageParams.PARAM_WDIR: Any(str, None), StageParams.PARAM_DEPS: Any([dependency.SCHEMA], None), StageParams.PARAM_OUTS: Any([output.SCHEMA], None), @@ -27,7 +27,7 @@ HashInfo.PARAM_NFILES: int, } LOCK_FILE_STAGE_SCHEMA = { - Required(StageParams.PARAM_CMD): str, + Required(StageParams.PARAM_CMD): Any(str, list), StageParams.PARAM_DEPS: [DATA_SCHEMA], StageParams.PARAM_PARAMS: {str: {str: object}}, StageParams.PARAM_OUTS: [DATA_SCHEMA], @@ -63,7 +63,7 @@ VARS_SCHEMA = [str, dict] STAGE_DEFINITION = { - StageParams.PARAM_CMD: str, + StageParams.PARAM_CMD: Any(str, list), Optional(SET_KWD): dict, Optional(StageParams.PARAM_WDIR): str, Optional(StageParams.PARAM_DEPS): [str], diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 68abc80c86..ad0d0b833d 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -21,7 +21,9 @@ class CheckpointKilledError(StageCmdFailedError): pass -def _nix_cmd(executable, cmd): +def _make_cmd(executable, cmd): + if executable is None: + return cmd opts = {"zsh": ["--no-rcs"], "bash": ["--noprofile", "--norc"]} name = os.path.basename(executable).lower() return [executable] + opts.get(name, []) + ["-c", cmd] @@ -46,13 +48,14 @@ def warn_if_fish(executable): @unlocked_repo def cmd_run(stage, *args, checkpoint_func=None, **kwargs): kwargs = {"cwd": stage.wdir, "env": fix_env(None), "close_fds": True} + cmd = stage.cmd if isinstance(stage.cmd, list) else [stage.cmd] if checkpoint_func: # indicate that checkpoint cmd is being run inside DVC kwargs["env"].update(_checkpoint_env(stage)) if os.name == "nt": kwargs["shell"] = True - cmd = stage.cmd + executable = None else: # NOTE: when you specify `shell=True`, `Popen` [1] will default to # `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command. @@ -70,32 +73,33 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs): kwargs["shell"] = False executable = os.getenv("SHELL") or "/bin/sh" warn_if_fish(executable) - cmd = _nix_cmd(executable, stage.cmd) main_thread = isinstance( threading.current_thread(), threading._MainThread, # pylint: disable=protected-access ) - old_handler = None - p = None + for _cmd in cmd: + logger.info("$ %s", _cmd) + old_handler = None + p = None - try: - p = subprocess.Popen(cmd, **kwargs) - if main_thread: - old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + try: + p = subprocess.Popen(_make_cmd(executable, _cmd), **kwargs) + if main_thread: + old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) - killed = threading.Event() - with checkpoint_monitor(stage, checkpoint_func, p, killed): - p.communicate() - finally: - if old_handler: - signal.signal(signal.SIGINT, old_handler) + killed = threading.Event() + with checkpoint_monitor(stage, checkpoint_func, p, killed): + p.communicate() + finally: + if old_handler: + signal.signal(signal.SIGINT, old_handler) - retcode = None if not p else p.returncode - if retcode != 0: - if killed.is_set(): - raise CheckpointKilledError(stage.cmd, retcode) - raise StageCmdFailedError(stage.cmd, retcode) + retcode = None if not p else p.returncode + if retcode != 0: + if killed.is_set(): + raise CheckpointKilledError(_cmd, retcode) + raise StageCmdFailedError(_cmd, retcode) def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs): @@ -110,11 +114,8 @@ def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs): callback_str = "callback " if stage.is_callback else "" logger.info( - "Running %s" "stage '%s' with command:", - callback_str, - stage.addressing, + "Running %s" "stage '%s':", callback_str, stage.addressing, ) - logger.info("\t%s", stage.cmd) if not dry: cmd_run(stage, checkpoint_func=checkpoint_func) diff --git a/scripts/schema/dvc-yaml.json b/scripts/schema/dvc-yaml.json index fa3ff0e73f..b22e533adf 100644 --- a/scripts/schema/dvc-yaml.json +++ b/scripts/schema/dvc-yaml.json @@ -102,8 +102,8 @@ "type": "object", "properties": { "cmd": { - "type": "string", - "description": "Command to run" + "type": ["string", "array"], + "description": "Command(s) to run" }, "wdir": { "type": "string", diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index e658c76087..e51e577441 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -6,7 +6,7 @@ from funcy import lsplit from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK -from dvc.exceptions import CyclicGraphError +from dvc.exceptions import CyclicGraphError, ReproductionError from dvc.main import main from dvc.stage import PipelineStage from dvc.utils.serialize import dump_yaml, load_yaml @@ -382,8 +382,6 @@ def test_repro_when_new_out_overlaps_others_stage_outs(tmp_dir, dvc): def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc): - from dvc.exceptions import ReproductionError - tmp_dir.gen("copy.py", COPY_SCRIPT) tmp_dir.gen("foo", "foo") dump_yaml( @@ -403,8 +401,6 @@ def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc): def test_repro_when_new_outs_added_does_not_exist(tmp_dir, dvc): - from dvc.exceptions import ReproductionError - tmp_dir.gen("copy.py", COPY_SCRIPT) tmp_dir.gen("foo", "foo") dump_yaml( @@ -518,3 +514,39 @@ def test_repro_multiple_params(tmp_dir, dvc): dump_yaml(tmp_dir / "params.yaml", params) assert dvc.reproduce(stage.addressing) == [stage] + + +def test_repro_list_of_commands_in_order(tmp_dir, dvc): + (tmp_dir / "dvc.yaml").write_text( + dedent( + """\ + stages: + multi: + cmd: + - echo foo>foo + - echo bar>bar + """ + ) + ) + dvc.reproduce(target="multi") + assert (tmp_dir / "foo").read_text() == "foo\n" + assert (tmp_dir / "bar").read_text() == "bar\n" + + +def test_repro_list_of_commands_raise_and_stops_after_failure(tmp_dir, dvc): + (tmp_dir / "dvc.yaml").write_text( + dedent( + """\ + stages: + multi: + cmd: + - echo foo>foo + - failed_command + - echo baz>bar + """ + ) + ) + with pytest.raises(ReproductionError): + dvc.reproduce(target="multi") + assert (tmp_dir / "foo").read_text() == "foo\n" + assert not (tmp_dir / "bar").exists() diff --git a/tests/unit/stage/test_loader_pipeline_file.py b/tests/unit/stage/test_loader_pipeline_file.py index a2c476a22e..fc9c2e0e59 100644 --- a/tests/unit/stage/test_loader_pipeline_file.py +++ b/tests/unit/stage/test_loader_pipeline_file.py @@ -160,6 +160,13 @@ def test_load_stage(dvc, stage_data, lock_data): assert stage.outs[0].hash_info == HashInfo("md5", "bar_checksum") +def test_load_stage_cmd_with_list(dvc, stage_data, lock_data): + stage_data["cmd"] = ["cmd-0", "cmd-1"] + dvcfile = Dvcfile(dvc, PIPELINE_FILE) + stage = StageLoader.load_stage(dvcfile, "stage-1", stage_data, lock_data) + assert stage.cmd == ["cmd-0", "cmd-1"] + + def test_load_stage_outs_with_flags(dvc, stage_data, lock_data): stage_data["outs"] = [{"foo": {"cache": False}}] dvcfile = Dvcfile(dvc, PIPELINE_FILE) diff --git a/tests/unit/stage/test_run.py b/tests/unit/stage/test_run.py index d5a88460b9..bfc607cf57 100644 --- a/tests/unit/stage/test_run.py +++ b/tests/unit/stage/test_run.py @@ -9,6 +9,5 @@ def test_run_stage_dry(caplog): stage = Stage(None, "stage.dvc", cmd="mycmd arg1 arg2") run_stage(stage, dry=True) assert caplog.messages == [ - "Running callback stage 'stage.dvc' with command:", - "\t" + "mycmd arg1 arg2", + "Running callback stage 'stage.dvc':", ]