From afd29e5dad42d2ad64fc9f97064baae47207783b Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Fri, 10 Jan 2020 20:55:28 +0700 Subject: [PATCH 1/5] summon: protect chdir/import/call with a thread lock --- dvc/api.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index ffb044a027..7ba7528e29 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -4,7 +4,9 @@ import copy from urllib.parse import urlparse from contextlib import contextmanager, _GeneratorContextManager as GCM +import threading +from funcy import wrap_with import ruamel.yaml from voluptuous import Schema, Required, Invalid @@ -154,14 +156,8 @@ def _pull_dependencies(repo, deps): out.checkout() +@wrap_with(threading.Lock()) def _invoke_method(call, args, path): - # XXX: Some issues with this approach: - # * Not thread safe - # * Import will pollute sys.modules - # * Weird errors if there is a name clash within sys.modules - - # XXX: sys.path manipulation is "theoretically" not needed - # but tests are failing for an unknown reason. cwd = os.getcwd() try: From c548b0792d6233f4b009f83247f8e38a5d859f06 Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Fri, 10 Jan 2020 20:59:17 +0700 Subject: [PATCH 2/5] summon: fix open() shadowing --- dvc/api.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index 7ba7528e29..22dccb5e11 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -1,3 +1,4 @@ +from builtins import open as builtin_open import importlib import os import sys @@ -11,7 +12,7 @@ from voluptuous import Schema, Required, Invalid from dvc.repo import Repo -from dvc.exceptions import DvcException, FileMissingError +from dvc.exceptions import DvcException from dvc.external_repo import external_repo @@ -123,8 +124,8 @@ def _get_object_from_summon_file(name, path): and return its description. """ try: - with open(path, "r") as fobj: - content = SUMMON_SCHEMA(ruamel.yaml.safe_load(fobj.read())) + with builtin_open(path, "r") as fd: + content = SUMMON_SCHEMA(ruamel.yaml.safe_load(fd.read())) objects = [x for x in content["objects"] if x["name"] == name] if not objects: @@ -136,8 +137,8 @@ def _get_object_from_summon_file(name, path): return objects[0] - except FileMissingError: - raise SummonError("Summon file not found") + except FileNotFoundError as exc: + raise SummonError("Summon file not found") from exc except ruamel.yaml.YAMLError as exc: raise SummonError("Failed to parse summon file") from exc except Invalid as exc: From 4a3dd63720b051f183032ef10c424b623db8125d Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Fri, 10 Jan 2020 22:02:58 +0700 Subject: [PATCH 3/5] summon: separate prepare_summon() --- dvc/api.py | 84 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index 22dccb5e11..44999af754 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -2,7 +2,6 @@ import importlib import os import sys -import copy from urllib.parse import urlparse from contextlib import contextmanager, _GeneratorContextManager as GCM import threading @@ -16,22 +15,29 @@ from dvc.external_repo import external_repo -SUMMON_SCHEMA = Schema( +SUMMON_FILE_SCHEMA = Schema( { Required("objects"): [ { Required("name"): str, "meta": dict, Required("summon"): { - Required("type"): "python", - Required("call"): str, - "args": dict, + Required("type"): str, "deps": [str], + str: object, }, } ] } ) +SUMMON_PYTHON_SCHEMA = Schema( + { + Required("type"): "python", + Required("call"): str, + "args": dict, + "deps": [str], + } +) class SummonError(DvcException): @@ -99,33 +105,67 @@ def _make_repo(repo_url, rev=None): def summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml", args=None): + """Instantiate an object described in the summon file.""" + with prepare_summon( + name, repo=repo, rev=rev, summon_file=summon_file + ) as desc: + try: + summon = SUMMON_PYTHON_SCHEMA(desc.obj["summon"]) + except Invalid as exc: + raise SummonError(str(exc)) from exc + + _args = {**summon.get("args", {}), **(args or {})} + return _invoke_method(summon["call"], _args, path=desc.repo.root_dir) + + +@contextmanager +def prepare_summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml"): """Instantiate an object described in the summon file.""" with _make_repo(repo, rev=rev) as _repo: try: path = os.path.join(_repo.root_dir, summon_file) - obj = _get_object_from_summon_file(name, path) - info = obj["summon"] + obj = _get_object_desc(name, path) + yield SummonDesc(_repo, obj) except SummonError as exc: raise SummonError( str(exc) + " at '{}' in '{}'".format(summon_file, repo) - ) from exc + ) from exc.__cause__ - _pull_dependencies(_repo, info.get("deps", [])) - _args = copy.deepcopy(info.get("args", {})) - _args.update(args or {}) +class SummonDesc: + def __init__(self, repo, obj): + self.repo = repo + self.obj = obj + self._pull_deps() - return _invoke_method(info["call"], _args, path=_repo.root_dir) + @property + def deps(self): + return [os.path.join(self.repo.root_dir, d) for d in self._deps] + @property + def _deps(self): + return self.obj["summon"].get("deps", []) -def _get_object_from_summon_file(name, path): + def _pull_deps(self): + if not self._deps: + return + + outs = [self.repo.find_out_by_relpath(d) for d in self._deps] + + with self.repo.state: + for out in outs: + self.repo.cloud.pull(out.get_used_cache()) + out.checkout() + + +def _get_object_desc(name, path): """ Given a summonable object's name, search for it on the given file and return its description. """ try: - with builtin_open(path, "r") as fd: - content = SUMMON_SCHEMA(ruamel.yaml.safe_load(fd.read())) + with builtin_open(path, "r") as fobj: + content = SUMMON_FILE_SCHEMA(ruamel.yaml.safe_load(fobj.read())) objects = [x for x in content["objects"] if x["name"] == name] if not objects: @@ -142,19 +182,7 @@ def _get_object_from_summon_file(name, path): except ruamel.yaml.YAMLError as exc: raise SummonError("Failed to parse summon file") from exc except Invalid as exc: - raise SummonError(str(exc)) - - -def _pull_dependencies(repo, deps): - if not deps: - return - - outs = [repo.find_out_by_relpath(dep) for dep in deps] - - with repo.state: - for out in outs: - repo.cloud.pull(out.get_used_cache()) - out.checkout() + raise SummonError(str(exc)) from exc @wrap_with(threading.Lock()) From d0de9d0ed0a7df3284aabe6ec24665e1c6187cd3 Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Mon, 13 Jan 2020 15:19:54 +0700 Subject: [PATCH 4/5] summon: revert dvc.api._invoke_mehtod() considerations It's now thread-safe, but the rest is still relevant. --- dvc/api.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dvc/api.py b/dvc/api.py index 44999af754..525459b7ef 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -187,6 +187,10 @@ def _get_object_desc(name, path): @wrap_with(threading.Lock()) def _invoke_method(call, args, path): + # XXX: Some issues with this approach: + # * Import will pollute sys.modules + # * sys.path manipulation is "theoretically" not needed, + # but tests are failing for an unknown reason. cwd = os.getcwd() try: From be90c38182d630517563df2670bc8ab573b1fe48 Mon Sep 17 00:00:00 2001 From: Alexander Schepanovski Date: Mon, 13 Jan 2020 15:22:16 +0700 Subject: [PATCH 5/5] summon: nice up summon code - proper doc-string for prepare_summon() - summon -> summon_dict in summon() to not overwrite a glo bal name. This was not a bug, but a bad practice, which might have caused some issues in the future. - better name for _get_object_spec() --- dvc/api.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index 525459b7ef..655594bf1c 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -110,21 +110,29 @@ def summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml", args=None): name, repo=repo, rev=rev, summon_file=summon_file ) as desc: try: - summon = SUMMON_PYTHON_SCHEMA(desc.obj["summon"]) + summon_dict = SUMMON_PYTHON_SCHEMA(desc.obj["summon"]) except Invalid as exc: raise SummonError(str(exc)) from exc - _args = {**summon.get("args", {}), **(args or {})} - return _invoke_method(summon["call"], _args, path=desc.repo.root_dir) + _args = {**summon_dict.get("args", {}), **(args or {})} + return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir) @contextmanager def prepare_summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml"): - """Instantiate an object described in the summon file.""" + """Does a couple of things every summon needs as a prerequisite: + clones the repo, parses the summon file and pulls the deps. + + Calling code is expected to complete the summon logic following + instructions stated in "summon" dict of the object spec. + + Returns a SummonDesc instance, which contains references to a Repo object, + named object specification and resolved paths to deps. + """ with _make_repo(repo, rev=rev) as _repo: try: path = os.path.join(_repo.root_dir, summon_file) - obj = _get_object_desc(name, path) + obj = _get_object_spec(name, path) yield SummonDesc(_repo, obj) except SummonError as exc: raise SummonError( @@ -158,7 +166,7 @@ def _pull_deps(self): out.checkout() -def _get_object_desc(name, path): +def _get_object_spec(name, path): """ Given a summonable object's name, search for it on the given file and return its description.