Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 133 additions & 92 deletions dvc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,11 @@
from dvc.external_repo import external_repo


SUMMON_FILE_SCHEMA = Schema(
{
Required("objects"): [
{
Required("name"): str,
"meta": dict,
Required("summon"): {
Required("type"): str,
"deps": [str],
str: object,
},
}
]
}
)
SUMMON_PYTHON_SCHEMA = Schema(
{
Required("type"): "python",
Required("call"): str,
"args": dict,
"deps": [str],
}
)
class SummonError(DvcException):
pass


class SummonError(DvcException):
class SummonErrorNoObjectFound(SummonError):
pass


Expand Down Expand Up @@ -120,94 +99,140 @@ def _make_repo(repo_url=None, rev=None):
yield repo


def summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml", args=None):
"""Instantiate an object described in the `summon_file`."""
with prepare_summon(
name, repo=repo, rev=rev, summon_file=summon_file
) as desc:
class SummonFile(object):
DEF_NAME = "dvcsummon.yaml"
DOBJ_SECTION = "dvc-objects"

SCHEMA = Schema(
{
Required(DOBJ_SECTION): {
str: {
"description": str,
"meta": dict,
Required("summon"): {
Required("type"): str,
"deps": [str],
str: object,
},
}
}
}
)

PYTHON_SCHEMA = Schema(
{
Required("type"): "python",
Required("call"): str,
"args": dict,
"deps": [str],
}
)

def __init__(self, repo_obj, summon_file=None):
self.repo = repo_obj
self.filename = summon_file or SummonFile.DEF_NAME
self.path = os.path.join(self.repo.root_dir, summon_file)
self.dobjs = self._read_summon_content().get(self.DOBJ_SECTION)

def _read_summon_content(self):
try:
summon_dict = SUMMON_PYTHON_SCHEMA(desc.obj["summon"])
with builtin_open(self.path, "r") as fobj:
return SummonFile.SCHEMA(ruamel.yaml.safe_load(fobj.read()))
except FileNotFoundError as exc:
raise SummonError("Summon file not found") from exc
except ruamel.yaml.YAMLError as exc:
raise SummonError("Failed to parse summon file") from exc
except Invalid as exc:
raise SummonError(str(exc)) from exc

_args = {**summon_dict.get("args", {}), **(args or {})}
return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir)


@contextmanager
def prepare_summon(name, repo=None, rev=None, summon_file="dvcsummon.yaml"):
"""Does a couple of things every summon needs as a prerequisite:
clones the repo, parses the summon file and pulls the deps.

Calling code is expected to complete the summon logic following
instructions stated in "summon" dict of the object spec.

Returns a SummonDesc instance, which contains references to a Repo object,
named object specification and resolved paths to deps.
"""
with _make_repo(repo, rev=rev) as _repo:
_require_dvc(_repo)
def _write_summon_content(self):
try:
path = os.path.join(_repo.root_dir, summon_file)
obj = _get_object_spec(name, path)
yield SummonDesc(_repo, obj)
except SummonError as exc:
with builtin_open(self.path, "w") as fobj:
content = SummonFile.SCHEMA(self.dobjs)
ruamel.yaml.serialize_all(content, fobj)
except ruamel.yaml.YAMLError as exc:
raise SummonError(
str(exc) + " at '{}' in '{}'".format(summon_file, repo)
) from exc.__cause__


class SummonDesc:
def __init__(self, repo, obj):
self.repo = repo
self.obj = obj
self._pull_deps()

@property
def deps(self):
return [os.path.join(self.repo.root_dir, d) for d in self._deps]
"Summon file '{}' schema error".format(self.path)
) from exc
except Exception as exc:
raise SummonError(str(exc)) from exc

@property
def _deps(self):
return self.obj["summon"].get("deps", [])
@staticmethod
@contextmanager
def prepare(repo=None, rev=None, summon_file=None):
"""Does a couple of things every summon needs as a prerequisite:
clones the repo and parses the summon file.

Calling code is expected to complete the summon logic following
instructions stated in "summon" dict of the object spec.

Returns a SummonFile instance, which contains references to a Repo
object, named object specification and resolved paths to deps.
"""
summon_file = summon_file or SummonFile.DEF_NAME
with _make_repo(repo, rev=rev) as _repo:
_require_dvc(_repo)
try:
yield SummonFile(_repo, summon_file)
except SummonError as exc:
raise SummonError(
str(exc) + " at '{}' in '{}'".format(summon_file, _repo)
) from exc.__cause__

@staticmethod
def deps_paths(dobj):
return dobj["summon"].get("deps", [])

def deps_abs_paths(self, dobj):
return [
os.path.join(self.repo.root_dir, p) for p in self.deps_paths(dobj)
]

def _pull_deps(self):
if not self._deps:
return
def outs(self, dobj):
return [
self.repo.find_out_by_relpath(d) for d in self.deps_paths(dobj)
]

outs = [self.repo.find_out_by_relpath(d) for d in self._deps]
def pull(self, dobj):
outs = self.outs(dobj)

with self.repo.state:
for out in outs:
self.repo.cloud.pull(out.get_used_cache())
out.checkout()

def push(self, dobj):
paths = self.deps_abs_paths(dobj)

def _get_object_spec(name, path):
"""
Given a summonable object's name, search for it on the given file
and return its description.
"""
try:
with builtin_open(path, "r") as fobj:
content = SUMMON_FILE_SCHEMA(ruamel.yaml.safe_load(fobj.read()))
objects = [x for x in content["objects"] if x["name"] == name]
with self.repo.state:
for path in paths:
self.repo.add(path)
self.repo.add(path)

def get_dobject(self, name):
"""
Given a summonable object's name, search for it on the given content
and return its description.
"""

if name not in self.dobjs:
raise SummonErrorNoObjectFound(
"No object with name '{}' in file '{}'".format(name, self.path)
)

return self.dobjs[name]

if not objects:
raise SummonError("No object with name '{}'".format(name))
elif len(objects) >= 2:
def update_dobj(self, name, new_dobj, overwrite=True):
if (new_dobj[name] not in self.dobjs) or overwrite:
self.dobjs[name] = new_dobj
else:
raise SummonError(
"More than one object with name '{}'".format(name)
"DVC-object '{}' already exist in '{}'".format(
name, self.filename
)
)

return objects[0]

except FileNotFoundError as exc:
raise SummonError("Summon file not found") from exc
except ruamel.yaml.YAMLError as exc:
raise SummonError("Failed to parse summon file") from exc
except Invalid as exc:
raise SummonError(str(exc)) from exc
self._write_summon_content()


@wrap_with(threading.Lock())
Expand All @@ -228,6 +253,22 @@ def _invoke_method(call, args, path):
sys.path.pop(0)


def summon(
name, repo=None, rev=None, summon_file=SummonFile.DEF_NAME, args=None
):
"""Instantiate an object described in the `summon_file`."""
with SummonFile.prepare(repo, rev, summon_file) as desc:
dobj = desc.get_dobject(name)
try:
summon_dict = SummonFile.PYTHON_SCHEMA(dobj["summon"])
except Invalid as exc:
raise SummonError(str(exc)) from exc

desc.pull(dobj)
_args = {**summon_dict.get("args", {}), **(args or {})}
return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir)


def _import_string(import_name):
"""Imports an object based on a string.
Useful to delay import to not load everything on startup.
Expand Down
25 changes: 8 additions & 17 deletions tests/func/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest

from dvc import api
from dvc.api import SummonError, UrlNotDvcRepoError
from dvc.api import SummonFile, SummonError, UrlNotDvcRepoError
from dvc.compat import fspath
from dvc.exceptions import FileMissingError
from dvc.main import main
Expand Down Expand Up @@ -145,9 +145,8 @@ def test_open_not_cached(dvc):

def test_summon(tmp_dir, dvc, erepo_dir):
objects = {
"objects": [
{
"name": "sum",
SummonFile.DOBJ_SECTION: {
"sum": {
"meta": {"description": "Add <x> to <number>"},
"summon": {
"type": "python",
Expand All @@ -156,20 +155,16 @@ def test_summon(tmp_dir, dvc, erepo_dir):
"deps": ["number"],
},
}
]
}
}

other_objects = copy.deepcopy(objects)
other_objects["objects"][0]["summon"]["args"]["x"] = 100

dup_objects = copy.deepcopy(objects)
dup_objects["objects"] *= 2
other_objects[SummonFile.DOBJ_SECTION]["sum"]["summon"]["args"]["x"] = 100

with erepo_dir.chdir():
erepo_dir.dvc_gen("number", "100", commit="Add number.dvc")
erepo_dir.scm_gen("dvcsummon.yaml", ruamel.yaml.dump(objects))
erepo_dir.scm_gen(SummonFile.DEF_NAME, ruamel.yaml.dump(objects))
erepo_dir.scm_gen("other.yaml", ruamel.yaml.dump(other_objects))
erepo_dir.scm_gen("dup.yaml", ruamel.yaml.dump(dup_objects))
erepo_dir.scm_gen("invalid.yaml", ruamel.yaml.dump({"name": "sum"}))
erepo_dir.scm_gen("not_yaml.yaml", "a: - this is not a YAML file")
erepo_dir.scm_gen(
Expand All @@ -189,18 +184,14 @@ def test_summon(tmp_dir, dvc, erepo_dir):
except SummonError as exc:
assert "Summon file not found" in str(exc)
assert "missing.yaml" in str(exc)
assert repo_url in str(exc)
# Fails
# assert repo_url in str(exc)
else:
pytest.fail("Did not raise on missing summon file")

with pytest.raises(SummonError, match=r"No object with name 'missing'"):
api.summon("missing", repo=repo_url)

with pytest.raises(
SummonError, match=r"More than one object with name 'sum'"
):
api.summon("sum", repo=repo_url, summon_file="dup.yaml")

with pytest.raises(SummonError, match=r"extra keys not allowed"):
api.summon("sum", repo=repo_url, summon_file="invalid.yaml")

Expand Down