From 9bc97d58ae126a82c0f1ba7111dc102d07ca539c Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Sat, 21 Nov 2020 19:10:34 +0100 Subject: [PATCH 01/10] Allow list type in STAGE_DEFINITION scheme and join(cmd) if list --- dvc/schema.py | 2 +- dvc/stage/__init__.py | 2 +- tests/unit/stage/test_loader_pipeline_file.py | 7 +++++++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/dvc/schema.py b/dvc/schema.py index 6ca5390fdd..bb5f3cf281 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -63,7 +63,7 @@ VARS_SCHEMA = [str, dict] STAGE_DEFINITION = { - StageParams.PARAM_CMD: str, + StageParams.PARAM_CMD: Any(str, list), Optional(SET_KWD): dict, Optional(StageParams.PARAM_WDIR): str, Optional(StageParams.PARAM_DEPS): [str], diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 533e995fe7..dd697b4d9b 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -143,7 +143,7 @@ def __init__( self.repo = repo self._path = path - self.cmd = cmd + self.cmd = " && ".join(cmd) if isinstance(cmd, list) else cmd self.wdir = wdir self.outs = outs self.deps = deps diff --git a/tests/unit/stage/test_loader_pipeline_file.py b/tests/unit/stage/test_loader_pipeline_file.py index a2c476a22e..bbb24f43f0 100644 --- a/tests/unit/stage/test_loader_pipeline_file.py +++ b/tests/unit/stage/test_loader_pipeline_file.py @@ -160,6 +160,13 @@ def test_load_stage(dvc, stage_data, lock_data): assert stage.outs[0].hash_info == HashInfo("md5", "bar_checksum") +def test_load_stage_cmd_with_list(dvc, stage_data, lock_data): + stage_data["cmd"] = ["cmd-0", "cmd-1"] + dvcfile = Dvcfile(dvc, PIPELINE_FILE) + stage = StageLoader.load_stage(dvcfile, "stage-1", stage_data, lock_data) + assert stage.cmd == "cmd-0 && cmd-1" + + def test_load_stage_outs_with_flags(dvc, stage_data, lock_data): stage_data["outs"] = [{"foo": {"cache": False}}] dvcfile = Dvcfile(dvc, PIPELINE_FILE) From 84fd314d4fac68fe6ba819784c65f9939b9216ef Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Wed, 25 Nov 2020 18:16:52 +0100 Subject: [PATCH 02/10] Remove join in Stage.__init__ and update test --- dvc/schema.py | 4 ++-- dvc/stage/__init__.py | 2 +- tests/unit/stage/test_loader_pipeline_file.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dvc/schema.py b/dvc/schema.py index bb5f3cf281..9a8785b0b9 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -9,7 +9,7 @@ STAGES = "stages" SINGLE_STAGE_SCHEMA = { StageParams.PARAM_MD5: output.CHECKSUM_SCHEMA, - StageParams.PARAM_CMD: Any(str, None), + StageParams.PARAM_CMD: Any(str, list, None), StageParams.PARAM_WDIR: Any(str, None), StageParams.PARAM_DEPS: Any([dependency.SCHEMA], None), StageParams.PARAM_OUTS: Any([output.SCHEMA], None), @@ -27,7 +27,7 @@ HashInfo.PARAM_NFILES: int, } LOCK_FILE_STAGE_SCHEMA = { - Required(StageParams.PARAM_CMD): str, + Required(StageParams.PARAM_CMD): Any(str, list), StageParams.PARAM_DEPS: [DATA_SCHEMA], StageParams.PARAM_PARAMS: {str: {str: object}}, StageParams.PARAM_OUTS: [DATA_SCHEMA], diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index dd697b4d9b..533e995fe7 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -143,7 +143,7 @@ def __init__( self.repo = repo self._path = path - self.cmd = " && ".join(cmd) if isinstance(cmd, list) else cmd + self.cmd = cmd self.wdir = wdir self.outs = outs self.deps = deps diff --git a/tests/unit/stage/test_loader_pipeline_file.py b/tests/unit/stage/test_loader_pipeline_file.py index bbb24f43f0..fc9c2e0e59 100644 --- a/tests/unit/stage/test_loader_pipeline_file.py +++ b/tests/unit/stage/test_loader_pipeline_file.py @@ -164,7 +164,7 @@ def test_load_stage_cmd_with_list(dvc, stage_data, lock_data): stage_data["cmd"] = ["cmd-0", "cmd-1"] dvcfile = Dvcfile(dvc, PIPELINE_FILE) stage = StageLoader.load_stage(dvcfile, "stage-1", stage_data, lock_data) - assert stage.cmd == "cmd-0 && cmd-1" + assert stage.cmd == ["cmd-0", "cmd-1"] def test_load_stage_outs_with_flags(dvc, stage_data, lock_data): From 3df4daab22223a636417bc8fe68de3c9b496d637 Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Wed, 25 Nov 2020 18:42:56 +0100 Subject: [PATCH 03/10] Loop over commands in cmd_run --- dvc/stage/run.py | 43 ++++++++++++++++++------------------ scripts/schema/dvc-yaml.json | 4 ++-- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 68abc80c86..0a859a0ad2 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -46,13 +46,13 @@ def warn_if_fish(executable): @unlocked_repo def cmd_run(stage, *args, checkpoint_func=None, **kwargs): kwargs = {"cwd": stage.wdir, "env": fix_env(None), "close_fds": True} + cmd = stage.cmd if isinstance(stage.cmd, list) else [stage.cmd] if checkpoint_func: # indicate that checkpoint cmd is being run inside DVC kwargs["env"].update(_checkpoint_env(stage)) if os.name == "nt": kwargs["shell"] = True - cmd = stage.cmd else: # NOTE: when you specify `shell=True`, `Popen` [1] will default to # `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command. @@ -70,32 +70,33 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs): kwargs["shell"] = False executable = os.getenv("SHELL") or "/bin/sh" warn_if_fish(executable) - cmd = _nix_cmd(executable, stage.cmd) + cmd = [_nix_cmd(executable, _cmd) for _cmd in cmd] main_thread = isinstance( threading.current_thread(), threading._MainThread, # pylint: disable=protected-access ) - old_handler = None - p = None + for _cmd in cmd: + old_handler = None + p = None - try: - p = subprocess.Popen(cmd, **kwargs) - if main_thread: - old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) - - killed = threading.Event() - with checkpoint_monitor(stage, checkpoint_func, p, killed): - p.communicate() - finally: - if old_handler: - signal.signal(signal.SIGINT, old_handler) - - retcode = None if not p else p.returncode - if retcode != 0: - if killed.is_set(): - raise CheckpointKilledError(stage.cmd, retcode) - raise StageCmdFailedError(stage.cmd, retcode) + try: + p = subprocess.Popen(_cmd, **kwargs) + if main_thread: + old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + + killed = threading.Event() + with checkpoint_monitor(stage, checkpoint_func, p, killed): + p.communicate() + finally: + if old_handler: + signal.signal(signal.SIGINT, old_handler) + + retcode = None if not p else p.returncode + if retcode != 0: + if killed.is_set(): + raise CheckpointKilledError(stage.cmd, retcode) + raise StageCmdFailedError(stage.cmd, retcode) def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs): diff --git a/scripts/schema/dvc-yaml.json b/scripts/schema/dvc-yaml.json index fa3ff0e73f..b22e533adf 100644 --- a/scripts/schema/dvc-yaml.json +++ b/scripts/schema/dvc-yaml.json @@ -102,8 +102,8 @@ "type": "object", "properties": { "cmd": { - "type": "string", - "description": "Command to run" + "type": ["string", "array"], + "description": "Command(s) to run" }, "wdir": { "type": "string", From 12ac2b078951c0076242035501f2f788d26e8dab Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Sun, 29 Nov 2020 13:02:59 +0100 Subject: [PATCH 04/10] Improve logging of stage.run with detailed by command --- dvc/stage/run.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 0a859a0ad2..49fea3195a 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -21,7 +21,9 @@ class CheckpointKilledError(StageCmdFailedError): pass -def _nix_cmd(executable, cmd): +def _make_cmd(executable, cmd): + if executable is None: + return cmd opts = {"zsh": ["--no-rcs"], "bash": ["--noprofile", "--norc"]} name = os.path.basename(executable).lower() return [executable] + opts.get(name, []) + ["-c", cmd] @@ -53,6 +55,7 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs): if os.name == "nt": kwargs["shell"] = True + executable = None else: # NOTE: when you specify `shell=True`, `Popen` [1] will default to # `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command. @@ -70,18 +73,18 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs): kwargs["shell"] = False executable = os.getenv("SHELL") or "/bin/sh" warn_if_fish(executable) - cmd = [_nix_cmd(executable, _cmd) for _cmd in cmd] main_thread = isinstance( threading.current_thread(), threading._MainThread, # pylint: disable=protected-access ) for _cmd in cmd: + logger.info("\trunning %s", _cmd) old_handler = None p = None try: - p = subprocess.Popen(_cmd, **kwargs) + p = subprocess.Popen(_make_cmd(executable, _cmd), **kwargs) if main_thread: old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -95,8 +98,8 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs): retcode = None if not p else p.returncode if retcode != 0: if killed.is_set(): - raise CheckpointKilledError(stage.cmd, retcode) - raise StageCmdFailedError(stage.cmd, retcode) + raise CheckpointKilledError(_cmd, retcode) + raise StageCmdFailedError(_cmd, retcode) def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs): @@ -111,11 +114,8 @@ def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs): callback_str = "callback " if stage.is_callback else "" logger.info( - "Running %s" "stage '%s' with command:", - callback_str, - stage.addressing, + "Running %s" "stage '%s':", callback_str, stage.addressing, ) - logger.info("\t%s", stage.cmd) if not dry: cmd_run(stage, checkpoint_func=checkpoint_func) From 1d9d8c640bf7643707b58d45bd80c00b4ee47f44 Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Sun, 29 Nov 2020 13:25:10 +0100 Subject: [PATCH 05/10] Fix test based on log message --- tests/unit/stage/test_run.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/stage/test_run.py b/tests/unit/stage/test_run.py index d5a88460b9..bfc607cf57 100644 --- a/tests/unit/stage/test_run.py +++ b/tests/unit/stage/test_run.py @@ -9,6 +9,5 @@ def test_run_stage_dry(caplog): stage = Stage(None, "stage.dvc", cmd="mycmd arg1 arg2") run_stage(stage, dry=True) assert caplog.messages == [ - "Running callback stage 'stage.dvc' with command:", - "\t" + "mycmd arg1 arg2", + "Running callback stage 'stage.dvc':", ] From 3e77a80a78b001b7380c2442ce99fec61184010b Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Wed, 2 Dec 2020 16:55:13 +0100 Subject: [PATCH 06/10] Update command logging with > --- dvc/stage/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 49fea3195a..837da82d5c 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -79,7 +79,7 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs): threading._MainThread, # pylint: disable=protected-access ) for _cmd in cmd: - logger.info("\trunning %s", _cmd) + logger.info("> %s", _cmd) old_handler = None p = None From dfb9e5b420c1798b1c20a2f645bd1209196683aa Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Wed, 2 Dec 2020 19:49:34 +0100 Subject: [PATCH 07/10] Add tests for multi lines command --- tests/func/test_repro_multistage.py | 48 ++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index e658c76087..7ae91c443b 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -1,12 +1,13 @@ import os from copy import deepcopy +from pathlib import Path from textwrap import dedent import pytest from funcy import lsplit from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK -from dvc.exceptions import CyclicGraphError +from dvc.exceptions import CyclicGraphError, ReproductionError from dvc.main import main from dvc.stage import PipelineStage from dvc.utils.serialize import dump_yaml, load_yaml @@ -382,8 +383,6 @@ def test_repro_when_new_out_overlaps_others_stage_outs(tmp_dir, dvc): def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc): - from dvc.exceptions import ReproductionError - tmp_dir.gen("copy.py", COPY_SCRIPT) tmp_dir.gen("foo", "foo") dump_yaml( @@ -403,8 +402,6 @@ def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc): def test_repro_when_new_outs_added_does_not_exist(tmp_dir, dvc): - from dvc.exceptions import ReproductionError - tmp_dir.gen("copy.py", COPY_SCRIPT) tmp_dir.gen("foo", "foo") dump_yaml( @@ -518,3 +515,44 @@ def test_repro_multiple_params(tmp_dir, dvc): dump_yaml(tmp_dir / "params.yaml", params) assert dvc.reproduce(stage.addressing) == [stage] + + +def test_repro_list_of_commands_in_order(tmp_dir, dvc): + dvcfile_content = { + "stages": { + "multi_commands": { + "cmd": [ + """echo "First command" > foo""", + """echo "Second command" >> foo""", + ], + "outs": ["foo"], + } + } + } + dump_yaml(PIPELINE_FILE, dvcfile_content) + dvc.reproduce(target="multi_commands") + with open(Path(tmp_dir) / "foo", "r") as foo: + lines = foo.read().splitlines() + assert lines == ["First command", "Second command"] + + +def test_repro_list_of_commands_raise_and_stops_after_failure(tmp_dir, dvc): + dvcfile_content = { + "stages": { + "multi_commands": { + "cmd": [ + """echo "First command" > foo""", + """ehco "Second command" >> foo""", + """echo "Third command" >> foo""", + ], + "outs": ["foo"], + } + } + } + dump_yaml(PIPELINE_FILE, dvcfile_content) + + with pytest.raises(ReproductionError): + dvc.reproduce(target="multi_commands") + with open(Path(tmp_dir) / "foo", "r") as foo: + lines = foo.read().splitlines() + assert lines == ["First command"] From 4aed8b42b9eb8fe8376e4c7f1c76773f0452e23a Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Wed, 2 Dec 2020 20:24:57 +0100 Subject: [PATCH 08/10] Update > to $ in stage command log to prevent confusion with > operator --- dvc/stage/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dvc/stage/run.py b/dvc/stage/run.py index 837da82d5c..ad0d0b833d 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -79,7 +79,7 @@ def cmd_run(stage, *args, checkpoint_func=None, **kwargs): threading._MainThread, # pylint: disable=protected-access ) for _cmd in cmd: - logger.info("> %s", _cmd) + logger.info("$ %s", _cmd) old_handler = None p = None From 8bb3279a39b3cf51d312a1eb12427182cc10dee5 Mon Sep 17 00:00:00 2001 From: Clement Walter Date: Wed, 2 Dec 2020 21:02:24 +0100 Subject: [PATCH 09/10] Fix PR review about test: single quote and read_text --- tests/func/test_repro_multistage.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index 7ae91c443b..f5c74e887d 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -1,6 +1,5 @@ import os from copy import deepcopy -from pathlib import Path from textwrap import dedent import pytest @@ -522,8 +521,8 @@ def test_repro_list_of_commands_in_order(tmp_dir, dvc): "stages": { "multi_commands": { "cmd": [ - """echo "First command" > foo""", - """echo "Second command" >> foo""", + 'echo "First command" > foo', + 'echo "Second command" >> foo', ], "outs": ["foo"], } @@ -531,9 +530,7 @@ def test_repro_list_of_commands_in_order(tmp_dir, dvc): } dump_yaml(PIPELINE_FILE, dvcfile_content) dvc.reproduce(target="multi_commands") - with open(Path(tmp_dir) / "foo", "r") as foo: - lines = foo.read().splitlines() - assert lines == ["First command", "Second command"] + assert (tmp_dir / "foo").read_text() == "First command\nSecond command\n" def test_repro_list_of_commands_raise_and_stops_after_failure(tmp_dir, dvc): @@ -553,6 +550,4 @@ def test_repro_list_of_commands_raise_and_stops_after_failure(tmp_dir, dvc): with pytest.raises(ReproductionError): dvc.reproduce(target="multi_commands") - with open(Path(tmp_dir) / "foo", "r") as foo: - lines = foo.read().splitlines() - assert lines == ["First command"] + assert (tmp_dir / "foo").read_text() == "First command\n" From a62f7cc583e3c2c6691ed058d254fbeb3693a1c3 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Thu, 3 Dec 2020 12:24:42 +0200 Subject: [PATCH 10/10] tests: unify/clarify tests --- tests/func/test_repro_multistage.py | 59 ++++++++++++++--------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index f5c74e887d..e51e577441 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -517,37 +517,36 @@ def test_repro_multiple_params(tmp_dir, dvc): def test_repro_list_of_commands_in_order(tmp_dir, dvc): - dvcfile_content = { - "stages": { - "multi_commands": { - "cmd": [ - 'echo "First command" > foo', - 'echo "Second command" >> foo', - ], - "outs": ["foo"], - } - } - } - dump_yaml(PIPELINE_FILE, dvcfile_content) - dvc.reproduce(target="multi_commands") - assert (tmp_dir / "foo").read_text() == "First command\nSecond command\n" + (tmp_dir / "dvc.yaml").write_text( + dedent( + """\ + stages: + multi: + cmd: + - echo foo>foo + - echo bar>bar + """ + ) + ) + dvc.reproduce(target="multi") + assert (tmp_dir / "foo").read_text() == "foo\n" + assert (tmp_dir / "bar").read_text() == "bar\n" def test_repro_list_of_commands_raise_and_stops_after_failure(tmp_dir, dvc): - dvcfile_content = { - "stages": { - "multi_commands": { - "cmd": [ - """echo "First command" > foo""", - """ehco "Second command" >> foo""", - """echo "Third command" >> foo""", - ], - "outs": ["foo"], - } - } - } - dump_yaml(PIPELINE_FILE, dvcfile_content) - + (tmp_dir / "dvc.yaml").write_text( + dedent( + """\ + stages: + multi: + cmd: + - echo foo>foo + - failed_command + - echo baz>bar + """ + ) + ) with pytest.raises(ReproductionError): - dvc.reproduce(target="multi_commands") - assert (tmp_dir / "foo").read_text() == "First command\n" + dvc.reproduce(target="multi") + assert (tmp_dir / "foo").read_text() == "foo\n" + assert not (tmp_dir / "bar").exists()