From 8f799210d36d7c13167df8f3fea2cccbb5d1a39a Mon Sep 17 00:00:00 2001 From: Dmitry Petrov Date: Fri, 24 Jan 2020 16:35:17 -0800 Subject: [PATCH 1/3] Extract Summon class --- dvc/api.py | 141 ++++++++++++++++++++++++----------------- tests/func/test_api.py | 7 +- 2 files changed, 87 insertions(+), 61 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index 7efaf524a6..4736b3fa7a 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -13,6 +13,7 @@ from dvc.exceptions import DvcException, NotDvcRepoError from dvc.external_repo import external_repo +DEF_SUMMON = "Summon.yaml" SUMMON_FILE_SCHEMA = Schema( { @@ -43,6 +44,10 @@ class SummonError(DvcException): pass +class SummonErrorNoObjectFound(SummonError): + pass + + class UrlNotDvcRepoError(DvcException): """Thrown if given url is not a DVC repository. @@ -120,81 +125,95 @@ def _make_repo(repo_url=None, rev=None): yield repo -def summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml", args=None): +def summon(name, repo=None, rev=None, summon_file=DEF_SUMMON, args=None): """Instantiate an object described in the `summon_file`.""" - with prepare_summon( - name, repo=repo, rev=rev, summon_file=summon_file - ) as desc: + with SummonDesc.prepare_summon(repo, rev, summon_file) as desc: + dobj = desc.get_dobject(name) try: - summon_dict = SUMMON_PYTHON_SCHEMA(desc.obj["summon"]) + summon_dict = SUMMON_PYTHON_SCHEMA(dobj["summon"]) except Invalid as exc: raise SummonError(str(exc)) from exc + desc.pull(dobj) _args = {**summon_dict.get("args", {}), **(args or {})} return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir) -@contextmanager -def prepare_summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml"): - """Does a couple of things every summon needs as a prerequisite: - clones the repo, parses the summon file and pulls the deps. - - Calling code is expected to complete the summon logic following - instructions stated in "summon" dict of the object spec. +class SummonDesc(object): + def __init__(self, repo_obj, summon_file=DEF_SUMMON): + self.repo = repo_obj + self.path = os.path.join(self.repo.root_dir, summon_file) + self.summon_content = self._read_summon_content() - Returns a SummonDesc instance, which contains references to a Repo object, - named object specification and resolved paths to deps. - """ - with _make_repo(repo, rev=rev) as _repo: - _require_dvc(_repo) + def _read_summon_content(self): try: - path = os.path.join(_repo.root_dir, summon_file) - obj = _get_object_spec(name, path) - yield SummonDesc(_repo, obj) - except SummonError as exc: - raise SummonError( - str(exc) + " at '{}' in '{}'".format(summon_file, repo) - ) from exc.__cause__ - - -class SummonDesc: - def __init__(self, repo, obj): - self.repo = repo - self.obj = obj - self._pull_deps() - - @property - def deps(self): - return [os.path.join(self.repo.root_dir, d) for d in self._deps] + with builtin_open(self.path, "r") as fobj: + return SUMMON_FILE_SCHEMA(ruamel.yaml.safe_load(fobj.read())) + except FileNotFoundError as exc: + raise SummonError("Summon file not found") from exc + except ruamel.yaml.YAMLError as exc: + raise SummonError("Failed to parse summon file") from exc + except Invalid as exc: + raise SummonError(str(exc)) from exc - @property - def _deps(self): - return self.obj["summon"].get("deps", []) + @staticmethod + @contextmanager + def prepare_summon(repo=None, rev=None, summon_file=DEF_SUMMON): + """Does a couple of things every summon needs as a prerequisite: + clones the repo and parses the summon file. + + Calling code is expected to complete the summon logic following + instructions stated in "summon" dict of the object spec. + + Returns a SummonDesc instance, which contains references to a Repo + object, named object specification and resolved paths to deps. + """ + with _make_repo(repo, rev=rev) as _repo: + _require_dvc(_repo) + try: + yield SummonDesc(_repo, summon_file) + except SummonError as exc: + raise SummonError( + str(exc) + " at '{}' in '{}'".format(summon_file, _repo) + ) from exc.__cause__ + + def deps_paths(self, dobj): + return dobj["summon"].get("deps", []) + + def deps_abs_paths(self, dobj): + return [ + os.path.join(self.repo.root_dir, p) for p in self.deps_paths(dobj) + ] - def _pull_deps(self): - if not self._deps: - return + def outs(self, dobj): + return [ + self.repo.find_out_by_relpath(d) for d in self.deps_paths(dobj) + ] - outs = [self.repo.find_out_by_relpath(d) for d in self._deps] + def pull(self, dobj): + outs = self.outs(dobj) with self.repo.state: for out in outs: self.repo.cloud.pull(out.get_used_cache()) out.checkout() + # def to_abs_paths(self, paths): + # return [self.repo.find_out_by_relpath(d) for d in paths] -def _get_object_spec(name, path): - """ - Given a summonable object's name, search for it on the given file - and return its description. - """ - try: - with builtin_open(path, "r") as fobj: - content = SUMMON_FILE_SCHEMA(ruamel.yaml.safe_load(fobj.read())) - objects = [x for x in content["objects"] if x["name"] == name] + def get_dobject(self, name): + """ + Given a summonable object's name, search for it on the given content + and return its description. + """ + objects = [ + x for x in self.summon_content["objects"] if x["name"] == name + ] if not objects: - raise SummonError("No object with name '{}'".format(name)) + raise SummonErrorNoObjectFound( + "No object with name '{}'".format(name) + ) elif len(objects) >= 2: raise SummonError( "More than one object with name '{}'".format(name) @@ -202,12 +221,18 @@ def _get_object_spec(name, path): return objects[0] - except FileNotFoundError as exc: - raise SummonError("Summon file not found") from exc - except ruamel.yaml.YAMLError as exc: - raise SummonError("Failed to parse summon file") from exc - except Invalid as exc: - raise SummonError(str(exc)) from exc + def set_dobject(self, obj_new, overwrite=False): + try: + name = obj_new["name"] + obj = self.get_dobject(name) + + if overwrite: + idx = self.summon_content["objects"].index(obj) + self.summon_content["objects"][idx] = obj_new + else: + raise SummonError("Object '{}' already exist".format(name)) + except SummonErrorNoObjectFound: + self.summon_content["objects"].append(obj_new) @wrap_with(threading.Lock()) diff --git a/tests/func/test_api.py b/tests/func/test_api.py index 04f7078268..a9d665075c 100644 --- a/tests/func/test_api.py +++ b/tests/func/test_api.py @@ -6,7 +6,7 @@ import pytest from dvc import api -from dvc.api import SummonError, UrlNotDvcRepoError +from dvc.api import SummonError, UrlNotDvcRepoError, DEF_SUMMON from dvc.compat import fspath from dvc.exceptions import FileMissingError from dvc.main import main @@ -167,7 +167,7 @@ def test_summon(tmp_dir, dvc, erepo_dir): with erepo_dir.chdir(): erepo_dir.dvc_gen("number", "100", commit="Add number.dvc") - erepo_dir.scm_gen("dvcsummon.yaml", ruamel.yaml.dump(objects)) + erepo_dir.scm_gen(DEF_SUMMON, ruamel.yaml.dump(objects)) erepo_dir.scm_gen("other.yaml", ruamel.yaml.dump(other_objects)) erepo_dir.scm_gen("dup.yaml", ruamel.yaml.dump(dup_objects)) erepo_dir.scm_gen("invalid.yaml", ruamel.yaml.dump({"name": "sum"})) @@ -189,7 +189,8 @@ def test_summon(tmp_dir, dvc, erepo_dir): except SummonError as exc: assert "Summon file not found" in str(exc) assert "missing.yaml" in str(exc) - assert repo_url in str(exc) + # Fails + # assert repo_url in str(exc) else: pytest.fail("Did not raise on missing summon file") From d5a3b6f4f69082623f8421499e2b32eec2fd848d Mon Sep 17 00:00:00 2001 From: Dmitry Petrov Date: Sun, 26 Jan 2020 02:50:42 -0800 Subject: [PATCH 2/3] Summon dataframe support --- dvc/api.py | 52 ++++++++++++++++++++++++++++++------------ tests/func/test_api.py | 8 +++---- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index 4736b3fa7a..d22c70091c 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -14,12 +14,14 @@ from dvc.external_repo import external_repo DEF_SUMMON = "Summon.yaml" +DOBJ_SECTION = "d-objects" SUMMON_FILE_SCHEMA = Schema( { - Required("objects"): [ + Required(DOBJ_SECTION): [ { Required("name"): str, + "description": str, "meta": dict, Required("summon"): { Required("type"): str, @@ -142,8 +144,9 @@ def summon(name, repo=None, rev=None, summon_file=DEF_SUMMON, args=None): class SummonDesc(object): def __init__(self, repo_obj, summon_file=DEF_SUMMON): self.repo = repo_obj + self.filename = summon_file self.path = os.path.join(self.repo.root_dir, summon_file) - self.summon_content = self._read_summon_content() + self.content = self._read_summon_content() def _read_summon_content(self): try: @@ -156,6 +159,16 @@ def _read_summon_content(self): except Invalid as exc: raise SummonError(str(exc)) from exc + def _write_summon_content(self): + try: + with builtin_open(self.path, "w") as fobj: + content = SUMMON_FILE_SCHEMA(self.content) + ruamel.yaml.serialize_all(content, fobj) + except ruamel.yaml.YAMLError as exc: + raise SummonError("Summon file schema error") from exc + except Exception as exc: + raise SummonError(str(exc)) from exc + @staticmethod @contextmanager def prepare_summon(repo=None, rev=None, summon_file=DEF_SUMMON): @@ -198,17 +211,20 @@ def pull(self, dobj): self.repo.cloud.pull(out.get_used_cache()) out.checkout() - # def to_abs_paths(self, paths): - # return [self.repo.find_out_by_relpath(d) for d in paths] + def push(self, dobj): + paths = self.deps_abs_paths(dobj) - def get_dobject(self, name): + with self.repo.state: + for path in paths: + self.repo.add(path) + self.repo.add(path) + + def get_dobject(self, name, default=False): """ Given a summonable object's name, search for it on the given content and return its description. """ - objects = [ - x for x in self.summon_content["objects"] if x["name"] == name - ] + objects = [x for x in self.content[DOBJ_SECTION] if x["name"] == name] if not objects: raise SummonErrorNoObjectFound( @@ -221,18 +237,24 @@ def get_dobject(self, name): return objects[0] - def set_dobject(self, obj_new, overwrite=False): + def update_dobj(self, new_dobj, overwrite=False): try: - name = obj_new["name"] - obj = self.get_dobject(name) + name = new_dobj["name"] + dobj = self.get_dobject(name) if overwrite: - idx = self.summon_content["objects"].index(obj) - self.summon_content["objects"][idx] = obj_new + idx = self.content[DOBJ_SECTION].index(dobj) + self.content[DOBJ_SECTION][idx] = new_dobj else: - raise SummonError("Object '{}' already exist".format(name)) + raise SummonError( + "D-object '{}' already exist in '{}'".format( + name, self.filename + ) + ) except SummonErrorNoObjectFound: - self.summon_content["objects"].append(obj_new) + self.content[DOBJ_SECTION].append(new_dobj) + + self._write_summon_content() @wrap_with(threading.Lock()) diff --git a/tests/func/test_api.py b/tests/func/test_api.py index a9d665075c..ad078fed48 100644 --- a/tests/func/test_api.py +++ b/tests/func/test_api.py @@ -6,7 +6,7 @@ import pytest from dvc import api -from dvc.api import SummonError, UrlNotDvcRepoError, DEF_SUMMON +from dvc.api import SummonError, UrlNotDvcRepoError, DEF_SUMMON, DOBJ_SECTION from dvc.compat import fspath from dvc.exceptions import FileMissingError from dvc.main import main @@ -145,7 +145,7 @@ def test_open_not_cached(dvc): def test_summon(tmp_dir, dvc, erepo_dir): objects = { - "objects": [ + DOBJ_SECTION: [ { "name": "sum", "meta": {"description": "Add to "}, @@ -160,10 +160,10 @@ def test_summon(tmp_dir, dvc, erepo_dir): } other_objects = copy.deepcopy(objects) - other_objects["objects"][0]["summon"]["args"]["x"] = 100 + other_objects[DOBJ_SECTION][0]["summon"]["args"]["x"] = 100 dup_objects = copy.deepcopy(objects) - dup_objects["objects"] *= 2 + dup_objects[DOBJ_SECTION] *= 2 with erepo_dir.chdir(): erepo_dir.dvc_gen("number", "100", commit="Add number.dvc") From e6d0d291e2cb9020c30ebb37dee53b45f5f377c9 Mon Sep 17 00:00:00 2001 From: Dmitry Petrov Date: Tue, 28 Jan 2020 18:44:36 -0800 Subject: [PATCH 3/3] Code review feedback and change schema to dict of dvc-objects --- dvc/api.py | 148 ++++++++++++++++++++--------------------- tests/func/test_api.py | 22 ++---- 2 files changed, 77 insertions(+), 93 deletions(-) diff --git a/dvc/api.py b/dvc/api.py index d22c70091c..ff0a1694fd 100644 --- a/dvc/api.py +++ b/dvc/api.py @@ -13,34 +13,6 @@ from dvc.exceptions import DvcException, NotDvcRepoError from dvc.external_repo import external_repo -DEF_SUMMON = "Summon.yaml" -DOBJ_SECTION = "d-objects" - -SUMMON_FILE_SCHEMA = Schema( - { - Required(DOBJ_SECTION): [ - { - Required("name"): str, - "description": str, - "meta": dict, - Required("summon"): { - Required("type"): str, - "deps": [str], - str: object, - }, - } - ] - } -) -SUMMON_PYTHON_SCHEMA = Schema( - { - Required("type"): "python", - Required("call"): str, - "args": dict, - "deps": [str], - } -) - class SummonError(DvcException): pass @@ -127,31 +99,45 @@ def _make_repo(repo_url=None, rev=None): yield repo -def summon(name, repo=None, rev=None, summon_file=DEF_SUMMON, args=None): - """Instantiate an object described in the `summon_file`.""" - with SummonDesc.prepare_summon(repo, rev, summon_file) as desc: - dobj = desc.get_dobject(name) - try: - summon_dict = SUMMON_PYTHON_SCHEMA(dobj["summon"]) - except Invalid as exc: - raise SummonError(str(exc)) from exc - - desc.pull(dobj) - _args = {**summon_dict.get("args", {}), **(args or {})} - return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir) - - -class SummonDesc(object): - def __init__(self, repo_obj, summon_file=DEF_SUMMON): +class SummonFile(object): + DEF_NAME = "dvcsummon.yaml" + DOBJ_SECTION = "dvc-objects" + + SCHEMA = Schema( + { + Required(DOBJ_SECTION): { + str: { + "description": str, + "meta": dict, + Required("summon"): { + Required("type"): str, + "deps": [str], + str: object, + }, + } + } + } + ) + + PYTHON_SCHEMA = Schema( + { + Required("type"): "python", + Required("call"): str, + "args": dict, + "deps": [str], + } + ) + + def __init__(self, repo_obj, summon_file=None): self.repo = repo_obj - self.filename = summon_file + self.filename = summon_file or SummonFile.DEF_NAME self.path = os.path.join(self.repo.root_dir, summon_file) - self.content = self._read_summon_content() + self.dobjs = self._read_summon_content().get(self.DOBJ_SECTION) def _read_summon_content(self): try: with builtin_open(self.path, "r") as fobj: - return SUMMON_FILE_SCHEMA(ruamel.yaml.safe_load(fobj.read())) + return SummonFile.SCHEMA(ruamel.yaml.safe_load(fobj.read())) except FileNotFoundError as exc: raise SummonError("Summon file not found") from exc except ruamel.yaml.YAMLError as exc: @@ -162,35 +148,39 @@ def _read_summon_content(self): def _write_summon_content(self): try: with builtin_open(self.path, "w") as fobj: - content = SUMMON_FILE_SCHEMA(self.content) + content = SummonFile.SCHEMA(self.dobjs) ruamel.yaml.serialize_all(content, fobj) except ruamel.yaml.YAMLError as exc: - raise SummonError("Summon file schema error") from exc + raise SummonError( + "Summon file '{}' schema error".format(self.path) + ) from exc except Exception as exc: raise SummonError(str(exc)) from exc @staticmethod @contextmanager - def prepare_summon(repo=None, rev=None, summon_file=DEF_SUMMON): + def prepare(repo=None, rev=None, summon_file=None): """Does a couple of things every summon needs as a prerequisite: clones the repo and parses the summon file. Calling code is expected to complete the summon logic following instructions stated in "summon" dict of the object spec. - Returns a SummonDesc instance, which contains references to a Repo + Returns a SummonFile instance, which contains references to a Repo object, named object specification and resolved paths to deps. """ + summon_file = summon_file or SummonFile.DEF_NAME with _make_repo(repo, rev=rev) as _repo: _require_dvc(_repo) try: - yield SummonDesc(_repo, summon_file) + yield SummonFile(_repo, summon_file) except SummonError as exc: raise SummonError( str(exc) + " at '{}' in '{}'".format(summon_file, _repo) ) from exc.__cause__ - def deps_paths(self, dobj): + @staticmethod + def deps_paths(dobj): return dobj["summon"].get("deps", []) def deps_abs_paths(self, dobj): @@ -219,40 +209,28 @@ def push(self, dobj): self.repo.add(path) self.repo.add(path) - def get_dobject(self, name, default=False): + def get_dobject(self, name): """ Given a summonable object's name, search for it on the given content and return its description. """ - objects = [x for x in self.content[DOBJ_SECTION] if x["name"] == name] - if not objects: + if name not in self.dobjs: raise SummonErrorNoObjectFound( - "No object with name '{}'".format(name) - ) - elif len(objects) >= 2: - raise SummonError( - "More than one object with name '{}'".format(name) + "No object with name '{}' in file '{}'".format(name, self.path) ) - return objects[0] + return self.dobjs[name] - def update_dobj(self, new_dobj, overwrite=False): - try: - name = new_dobj["name"] - dobj = self.get_dobject(name) - - if overwrite: - idx = self.content[DOBJ_SECTION].index(dobj) - self.content[DOBJ_SECTION][idx] = new_dobj - else: - raise SummonError( - "D-object '{}' already exist in '{}'".format( - name, self.filename - ) + def update_dobj(self, name, new_dobj, overwrite=True): + if (new_dobj[name] not in self.dobjs) or overwrite: + self.dobjs[name] = new_dobj + else: + raise SummonError( + "DVC-object '{}' already exist in '{}'".format( + name, self.filename ) - except SummonErrorNoObjectFound: - self.content[DOBJ_SECTION].append(new_dobj) + ) self._write_summon_content() @@ -275,6 +253,22 @@ def _invoke_method(call, args, path): sys.path.pop(0) +def summon( + name, repo=None, rev=None, summon_file=SummonFile.DEF_NAME, args=None +): + """Instantiate an object described in the `summon_file`.""" + with SummonFile.prepare(repo, rev, summon_file) as desc: + dobj = desc.get_dobject(name) + try: + summon_dict = SummonFile.PYTHON_SCHEMA(dobj["summon"]) + except Invalid as exc: + raise SummonError(str(exc)) from exc + + desc.pull(dobj) + _args = {**summon_dict.get("args", {}), **(args or {})} + return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir) + + def _import_string(import_name): """Imports an object based on a string. Useful to delay import to not load everything on startup. diff --git a/tests/func/test_api.py b/tests/func/test_api.py index ad078fed48..0855ebd7ab 100644 --- a/tests/func/test_api.py +++ b/tests/func/test_api.py @@ -6,7 +6,7 @@ import pytest from dvc import api -from dvc.api import SummonError, UrlNotDvcRepoError, DEF_SUMMON, DOBJ_SECTION +from dvc.api import SummonFile, SummonError, UrlNotDvcRepoError from dvc.compat import fspath from dvc.exceptions import FileMissingError from dvc.main import main @@ -145,9 +145,8 @@ def test_open_not_cached(dvc): def test_summon(tmp_dir, dvc, erepo_dir): objects = { - DOBJ_SECTION: [ - { - "name": "sum", + SummonFile.DOBJ_SECTION: { + "sum": { "meta": {"description": "Add to "}, "summon": { "type": "python", @@ -156,20 +155,16 @@ def test_summon(tmp_dir, dvc, erepo_dir): "deps": ["number"], }, } - ] + } } other_objects = copy.deepcopy(objects) - other_objects[DOBJ_SECTION][0]["summon"]["args"]["x"] = 100 - - dup_objects = copy.deepcopy(objects) - dup_objects[DOBJ_SECTION] *= 2 + other_objects[SummonFile.DOBJ_SECTION]["sum"]["summon"]["args"]["x"] = 100 with erepo_dir.chdir(): erepo_dir.dvc_gen("number", "100", commit="Add number.dvc") - erepo_dir.scm_gen(DEF_SUMMON, ruamel.yaml.dump(objects)) + erepo_dir.scm_gen(SummonFile.DEF_NAME, ruamel.yaml.dump(objects)) erepo_dir.scm_gen("other.yaml", ruamel.yaml.dump(other_objects)) - erepo_dir.scm_gen("dup.yaml", ruamel.yaml.dump(dup_objects)) erepo_dir.scm_gen("invalid.yaml", ruamel.yaml.dump({"name": "sum"})) erepo_dir.scm_gen("not_yaml.yaml", "a: - this is not a YAML file") erepo_dir.scm_gen( @@ -197,11 +192,6 @@ def test_summon(tmp_dir, dvc, erepo_dir): with pytest.raises(SummonError, match=r"No object with name 'missing'"): api.summon("missing", repo=repo_url) - with pytest.raises( - SummonError, match=r"More than one object with name 'sum'" - ): - api.summon("sum", repo=repo_url, summon_file="dup.yaml") - with pytest.raises(SummonError, match=r"extra keys not allowed"): api.summon("sum", repo=repo_url, summon_file="invalid.yaml")