-
Notifications
You must be signed in to change notification settings - Fork 1.2k
tests: func: convert test_get.py test_utils.py to new fixtures #3014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+176
β178
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,183 +1,193 @@ | ||
| from __future__ import unicode_literals | ||
|
|
||
| import filecmp | ||
| import logging | ||
| import os | ||
|
|
||
| import pytest | ||
|
|
||
| from dvc.cache import Cache | ||
| from dvc.config import Config | ||
| from dvc.exceptions import UrlNotDvcRepoError | ||
| from dvc.repo.get import GetDVCFileError, PathMissingError | ||
| from dvc.repo import Repo | ||
| from dvc.system import System | ||
| from dvc.utils import makedirs | ||
| from dvc.utils.compat import fspath | ||
| from dvc.utils import fspath_py35 | ||
| from tests.utils import trees_equal | ||
|
|
||
|
|
||
| def test_get_repo_file(erepo): | ||
| src = erepo.FOO | ||
| dst = erepo.FOO + "_imported" | ||
| def test_get_repo_file(tmp_dir, erepo_dir, monkeypatch): | ||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.dvc_gen("file", "contents", commit="create file") | ||
|
|
||
| Repo.get(erepo.root_dir, src, dst) | ||
| Repo.get(fspath(erepo_dir), "file", "file_imported") | ||
|
|
||
| assert os.path.exists(dst) | ||
| assert os.path.isfile(dst) | ||
| assert filecmp.cmp(erepo.FOO, dst, shallow=False) | ||
| assert os.path.isfile("file_imported") | ||
| assert (tmp_dir / "file_imported").read_text() == "contents" | ||
|
|
||
|
|
||
| def test_get_repo_dir(erepo): | ||
| src = erepo.DATA_DIR | ||
| dst = erepo.DATA_DIR + "_imported" | ||
| def test_get_repo_dir(tmp_dir, erepo_dir, monkeypatch): | ||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.dvc_gen({"dir": {"file": "contents"}}, commit="create dir") | ||
|
|
||
| Repo.get(erepo.root_dir, src, dst) | ||
| Repo.get(fspath(erepo_dir), "dir", "dir_imported") | ||
|
|
||
| assert os.path.exists(dst) | ||
| assert os.path.isdir(dst) | ||
| trees_equal(src, dst) | ||
| assert os.path.isdir("dir_imported") | ||
| trees_equal(fspath(erepo_dir / "dir"), "dir_imported") | ||
|
|
||
|
|
||
| def test_get_regular_file(erepo): | ||
| def test_get_git_file(tmp_dir, erepo_dir): | ||
| src = "some_file" | ||
| dst = "some_file_imported" | ||
|
|
||
| src_path = os.path.join(erepo.root_dir, src) | ||
| erepo.create(src_path, "hello") | ||
| erepo.dvc.scm.add([src_path]) | ||
| erepo.dvc.scm.commit("add a regular file") | ||
| Repo.get(erepo.root_dir, src, dst) | ||
| erepo_dir.scm_gen({src: "hello"}, commit="add a regular file") | ||
|
|
||
| assert os.path.exists(dst) | ||
| assert os.path.isfile(dst) | ||
| assert filecmp.cmp(src_path, dst, shallow=False) | ||
| Repo.get(fspath(erepo_dir), src, dst) | ||
|
|
||
| assert (tmp_dir / dst).is_file() | ||
| assert (tmp_dir / dst).read_text() == "hello" | ||
|
|
||
| def test_get_regular_dir(erepo): | ||
|
|
||
| def test_get_git_dir(tmp_dir, erepo_dir): | ||
| src = "some_directory" | ||
| dst = "some_directory_imported" | ||
|
|
||
| src_file_path = os.path.join(erepo.root_dir, src, "file.txt") | ||
| erepo.create(src_file_path, "hello") | ||
| erepo.dvc.scm.add([src_file_path]) | ||
| erepo.dvc.scm.commit("add a regular dir") | ||
| Repo.get(erepo.root_dir, src, dst) | ||
| erepo_dir.scm_gen({src: {"file.txt": "hello"}}, commit="add a regular dir") | ||
|
|
||
| Repo.get(fspath(erepo_dir), src, dst) | ||
|
|
||
| assert os.path.exists(dst) | ||
| assert os.path.isdir(dst) | ||
| trees_equal(os.path.join(erepo.root_dir, src), dst) | ||
| assert (tmp_dir / dst).is_dir() | ||
| trees_equal(fspath(erepo_dir / src), fspath(tmp_dir / dst)) | ||
|
|
||
|
|
||
| def test_cache_type_is_properly_overridden(erepo): | ||
| erepo.dvc.config.set( | ||
| Config.SECTION_CACHE, Config.SECTION_CACHE_TYPE, "symlink" | ||
| ) | ||
| erepo.dvc.scm.add([erepo.dvc.config.config_file]) | ||
| erepo.dvc.scm.commit("set cache type to symlinks") | ||
| def test_cache_type_is_properly_overridden(tmp_dir, erepo_dir, monkeypatch): | ||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.dvc.config.set( | ||
| Config.SECTION_CACHE, Config.SECTION_CACHE_TYPE, "symlink" | ||
| ) | ||
| erepo_dir.dvc.cache = Cache(erepo_dir.dvc) | ||
| erepo_dir.scm_add( | ||
| [erepo_dir.dvc.config.config_file], "set cache type to symlinks" | ||
| ) | ||
| erepo_dir.dvc_gen("file", "contents", "create file") | ||
| assert System.is_symlink(erepo_dir / "file") | ||
|
|
||
| src = erepo.FOO | ||
| dst = erepo.FOO + "_imported" | ||
| Repo.get(fspath(erepo_dir), "file", "file_imported") | ||
|
|
||
| Repo.get(erepo.root_dir, src, dst) | ||
| assert not System.is_symlink("file_imported") | ||
| assert (tmp_dir / "file_imported").read_text() == "contents" | ||
|
|
||
| assert not System.is_symlink(dst) | ||
| assert os.path.exists(dst) | ||
| assert os.path.isfile(dst) | ||
|
|
||
| def test_get_repo_rev(tmp_dir, erepo_dir, monkeypatch): | ||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.scm.checkout("new_branch", create_new=True) | ||
| erepo_dir.dvc_gen("file", "contents", commit="create file on branch") | ||
| erepo_dir.scm.checkout("master") | ||
|
|
||
| def test_get_repo_rev(erepo): | ||
| src = "version" | ||
| dst = src | ||
| Repo.get(fspath(erepo_dir), "file", "file_imported", rev="new_branch") | ||
|
|
||
| Repo.get(erepo.root_dir, src, dst, rev="branch") | ||
| assert (tmp_dir / "file_imported").read_text() == "contents" | ||
|
|
||
| assert os.path.exists(dst) | ||
| assert os.path.isfile(dst) | ||
| with open(dst, "r+") as fobj: | ||
| assert fobj.read() == "branch" | ||
|
|
||
| def test_get_from_non_dvc_repo(tmp_dir, erepo_dir): | ||
| erepo_dir.scm.repo.index.remove([erepo_dir.dvc.dvc_dir], r=True) | ||
| erepo_dir.scm.commit("remove dvc") | ||
|
|
||
| def test_get_from_non_dvc_repo(git_erepo): | ||
| with pytest.raises(UrlNotDvcRepoError): | ||
| Repo.get(git_erepo.root_dir, "some_file.zip") | ||
| Repo.get(fspath(erepo_dir), "some_file.zip") | ||
|
|
||
|
|
||
| def test_get_a_dvc_file(erepo): | ||
| def test_get_a_dvc_file(tmp_dir, erepo_dir): | ||
| with pytest.raises(GetDVCFileError): | ||
| Repo.get(erepo.root_dir, "some_file.dvc") | ||
| Repo.get(fspath(erepo_dir), "some_file.dvc") | ||
|
|
||
|
|
||
| # https://github.com/iterative/dvc/pull/2837#discussion_r352123053 | ||
| def test_get_full_dvc_path(erepo): | ||
| external_data_dir = erepo.mkdtemp() | ||
| external_data = os.path.join(external_data_dir, "ext_data") | ||
| with open(external_data, "w+") as fobj: | ||
| fobj.write("ext_data") | ||
|
|
||
| cur_dir = os.getcwd() | ||
| os.chdir(erepo.root_dir) | ||
| erepo.dvc.add(external_data) | ||
| erepo.dvc.scm.add(["ext_data.dvc"]) | ||
| erepo.dvc.scm.commit("add external data") | ||
| os.chdir(cur_dir) | ||
|
|
||
| Repo.get(erepo.root_dir, external_data, "ext_data_imported") | ||
| assert os.path.isfile("ext_data_imported") | ||
| assert filecmp.cmp(external_data, "ext_data_imported", shallow=False) | ||
|
|
||
|
|
||
| def test_non_cached_output(tmp_path, erepo): | ||
| os.chdir(erepo.root_dir) | ||
| erepo.dvc.run( | ||
| outs_no_cache=["non_cached_file"], cmd="echo hello > non_cached_file" | ||
| ) | ||
| erepo.dvc.scm.add(["non_cached_file", "non_cached_file.dvc"]) | ||
| erepo.dvc.scm.commit("add non-cached output") | ||
| os.chdir(fspath_py35(tmp_path)) | ||
| Repo.get(erepo.root_dir, "non_cached_file") | ||
|
|
||
| src = os.path.join(erepo.root_dir, "non_cached_file") | ||
| assert os.path.isfile("non_cached_file") | ||
| assert filecmp.cmp(src, "non_cached_file", shallow=False) | ||
| def test_get_full_dvc_path(tmp_dir, erepo_dir, tmp_path_factory, monkeypatch): | ||
| path = tmp_path_factory.mktemp("ext") | ||
| external_data = path / "ext_data" | ||
| external_data.write_text("ext_data") | ||
|
|
||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.dvc.add(fspath(external_data)) | ||
| erepo_dir.scm_add(["ext_data.dvc"], commit="add external data") | ||
|
|
||
| Repo.get(fspath(erepo_dir), fspath(external_data), "ext_data_imported") | ||
| assert (tmp_dir / "ext_data_imported").is_file() | ||
| assert (tmp_dir / "ext_data_imported").read_text() == "ext_data" | ||
|
|
||
|
|
||
| def test_non_cached_output(tmp_dir, erepo_dir, monkeypatch): | ||
| src = "non_cached_file" | ||
| dst = src + "_imported" | ||
|
|
||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.dvc.run( | ||
| outs_no_cache=[src], cmd="echo hello > non_cached_file" | ||
| ) | ||
| erepo_dir.scm.add([src, src + ".dvc"]) | ||
| erepo_dir.scm.commit("add non-cached output") | ||
|
|
||
| Repo.get(fspath(erepo_dir), src, dst) | ||
|
|
||
| assert (tmp_dir / dst).is_file() | ||
| # NOTE: using strip() to account for `echo` differences on win and *nix | ||
| assert (tmp_dir / dst).read_text().strip() == "hello" | ||
|
|
||
|
|
||
| # https://github.com/iterative/dvc/pull/2837#discussion_r352123053 | ||
| def test_absolute_file_outside_repo(erepo): | ||
| def test_absolute_file_outside_repo(tmp_dir, erepo_dir): | ||
efiop marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| with pytest.raises(PathMissingError): | ||
| Repo.get(erepo.root_dir, "/root/") | ||
| Repo.get(fspath(erepo_dir), "/root/") | ||
|
|
||
|
|
||
| def test_unknown_path(erepo): | ||
| def test_unknown_path(tmp_dir, erepo_dir): | ||
| with pytest.raises(PathMissingError): | ||
| Repo.get(erepo.root_dir, "a_non_existing_file") | ||
| Repo.get(fspath(erepo_dir), "a_non_existing_file") | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("dname", [".", "dir", "dir/subdir"]) | ||
| def test_get_to_dir(dname, erepo): | ||
| src = erepo.FOO | ||
| def test_get_to_dir(tmp_dir, erepo_dir, monkeypatch, dname): | ||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.dvc_gen("file", "contents", commit="create file") | ||
|
|
||
| makedirs(dname, exist_ok=True) | ||
|
|
||
| Repo.get(erepo.root_dir, src, dname) | ||
| Repo.get(fspath(erepo_dir), "file", dname) | ||
|
|
||
| dst = os.path.join(dname, os.path.basename(src)) | ||
| assert (tmp_dir / dname).is_dir() | ||
| assert (tmp_dir / dname / "file").read_text() == "contents" | ||
|
|
||
| assert os.path.isdir(dname) | ||
| assert filecmp.cmp(erepo.FOO, dst, shallow=False) | ||
|
|
||
| def test_get_from_non_dvc_master( | ||
| tmp_dir, erepo_dir, tmp_path, monkeypatch, caplog | ||
| ): | ||
| with monkeypatch.context() as m: | ||
| m.chdir(fspath(erepo_dir)) | ||
| erepo_dir.scm.checkout("new_branch", create_new=True) | ||
| erepo_dir.scm_gen( | ||
| {"some_file": "some_contents"}, commit="create some file" | ||
| ) | ||
| erepo_dir.scm.checkout("master") | ||
|
|
||
| def test_get_from_non_dvc_master(erepo, tmp_path, monkeypatch, caplog): | ||
| monkeypatch.chdir(fspath(tmp_path)) | ||
| erepo.dvc.scm.repo.index.remove([".dvc"], r=True) | ||
| erepo.dvc.scm.commit("remove .dvc") | ||
| erepo_dir.dvc.scm.repo.index.remove([".dvc"], r=True) | ||
| erepo_dir.dvc.scm.commit("remove .dvc") | ||
|
|
||
| # sanity check | ||
| with pytest.raises(UrlNotDvcRepoError): | ||
| Repo.get(fspath(erepo_dir), "some_file") | ||
|
|
||
| caplog.clear() | ||
| imported_file = "foo_imported" | ||
| dst = "file_imported" | ||
| with caplog.at_level(logging.INFO, logger="dvc"): | ||
| Repo.get(erepo._root_dir, erepo.FOO, out=imported_file, rev="branch") | ||
| Repo.get(fspath(erepo_dir), "some_file", out=dst, rev="new_branch") | ||
|
|
||
| assert caplog.text == "" | ||
efiop marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| assert filecmp.cmp( | ||
| os.path.join(erepo._root_dir, erepo.FOO), imported_file, shallow=False | ||
| ) | ||
| assert (tmp_dir / dst).read_text() == "some_contents" | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
π