diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..b019f32b --- /dev/null +++ b/.coveragerc @@ -0,0 +1,7 @@ +[run] +branch = True +source = scmrepo + +[report] +exclude_lines = + if TYPE_CHECKING: diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 0216a76a..9371bce9 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -2,9 +2,10 @@ name: Tests on: push jobs: tests: - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} strategy: matrix: + os: [ubuntu-20.04, windows-latest, macos-latest] python-version: ['3.7', '3.8', '3.9', '3.10'] name: Python ${{ matrix.python-version }} steps: diff --git a/pyproject.toml b/pyproject.toml index 47251167..7a86fdeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,11 @@ profile = "black" known_first_party = ["scmrepo"] line_length = 79 +[tool.pytest.ini_options] +markers = [ + "skip_git_backend: skip tests for given backend", +] + [tool.mypy] # Error output show_column_numbers = true @@ -37,7 +42,7 @@ check_untyped_defs = false warn_no_return = true warn_redundant_casts = true warn_unreachable = true -files = ["scmrepo"] +files = ["scmrepo", "tests"] [[tool.mypy.overrides]] module = [ @@ -59,7 +64,7 @@ extension-pkg-whitelist = ["pygit2"] disable = [ "format", "refactoring", "design", "no-self-use", "invalid-name", "duplicate-code", "missing-function-docstring", "missing-module-docstring", "missing-class-docstring", - "raise-missing-from", "import-outside-toplevel", "cyclic-import" + "raise-missing-from", "import-outside-toplevel", "cyclic-import", "fixme", ] enable = ["c-extension-no-member", "no-else-return"] diff --git a/scmrepo/git/__init__.py b/scmrepo/git/__init__.py index c4459cbf..7c738601 100644 --- a/scmrepo/git/__init__.py +++ b/scmrepo/git/__init__.py @@ -6,7 +6,7 @@ from collections.abc import Mapping from contextlib import contextmanager from functools import partialmethod -from typing import Dict, Iterable, Optional, Tuple, Type +from typing import Dict, Iterable, Optional, Tuple, Type, Union from funcy import cached_property, first from pathspec.patterns import GitWildMatchPattern @@ -260,6 +260,29 @@ def get_fs(self, rev: str): return GitFileSystem(scm=self, rev=rev) + @classmethod + def init( + cls, path: str, bare: bool = False, _backend: str = None + ) -> "Git": + for name, backend in GitBackends.DEFAULT.items(): + if _backend and name != _backend: + continue + try: + backend.init(path, bare=bare) + # TODO: reuse created object instead of initializing a new one. + return cls(path) + except NotImplementedError: + pass + raise NoGitBackendError("init") + + def add_commit( + self, + paths: Union[str, Iterable[str]], + message: str, + ) -> None: + self.add(paths) + self.commit(msg=message) + is_ignored = partialmethod(_backend_func, "is_ignored") add = partialmethod(_backend_func, "add") commit = partialmethod(_backend_func, "commit") diff --git a/scmrepo/git/backend/base.py b/scmrepo/git/backend/base.py index 4840eaa9..9877c6da 100644 --- a/scmrepo/git/backend/base.py +++ b/scmrepo/git/backend/base.py @@ -55,6 +55,11 @@ def clone( ): pass + @staticmethod + @abstractmethod + def init(path: str, bare: bool = False) -> None: + pass + @property @abstractmethod def dir(self) -> str: diff --git a/scmrepo/git/backend/dulwich/__init__.py b/scmrepo/git/backend/dulwich/__init__.py index c08042fa..fdcbc364 100644 --- a/scmrepo/git/backend/dulwich/__init__.py +++ b/scmrepo/git/backend/dulwich/__init__.py @@ -139,6 +139,12 @@ def clone( ): raise NotImplementedError + @staticmethod + def init(path: str, bare: bool = False) -> None: + from dulwich.porcelain import init + + init(path, bare=bare) + @property def dir(self) -> str: return self.repo.commondir() diff --git a/scmrepo/git/backend/gitpython.py b/scmrepo/git/backend/gitpython.py index 678372da..60cbc963 100644 --- a/scmrepo/git/backend/gitpython.py +++ b/scmrepo/git/backend/gitpython.py @@ -202,6 +202,21 @@ def clone( ) ) from exc + @staticmethod + def init(path: str, bare: bool = False) -> None: + from funcy import retry + from git import Repo + from git.exc import GitCommandNotFound + + # NOTE: handles EAGAIN error on BSD systems (osx in our case). + # Otherwise when running tests you might get this exception: + # + # GitCommandNotFound: Cmd('git') not found due to: + # OSError('[Errno 35] Resource temporarily unavailable') + method = retry(5, GitCommandNotFound)(Repo.init) + git = method(path, bare=bare) + git.close() + @staticmethod def is_sha(rev): import git diff --git a/scmrepo/git/backend/pygit2.py b/scmrepo/git/backend/pygit2.py index 5b7a1a2f..b7e2990f 100644 --- a/scmrepo/git/backend/pygit2.py +++ b/scmrepo/git/backend/pygit2.py @@ -156,6 +156,12 @@ def clone( ): raise NotImplementedError + @staticmethod + def init(path: str, bare: bool = False) -> None: + from pygit2 import init_repository + + init_repository(path, bare=bare) + @property def dir(self) -> str: raise NotImplementedError diff --git a/setup.cfg b/setup.cfg index 2a7b355a..90789736 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,11 +37,19 @@ install_requires= dev = pytest==6.2.5 pytest-sugar==0.9.4 + pytest-test-utils==0.0.5 + pytest-cov==3.0.0 + pytest-mock==3.6.1 pylint==2.11.1 mypy==0.910 types-certifi==2021.10.8.0 types-paramiko==2.8.1 +[options.packages.find] +exclude = + tests + tests.* + [flake8] ignore= E203, # Whitespace before ':' diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..33d956f6 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,46 @@ +import os +import sys + +import pygit2 +import pytest +from pytest_test_utils import TempDirFactory, TmpDir + +from scmrepo.git import Git + + +@pytest.fixture(autouse=True) +def isolate(tmp_dir_factory: TempDirFactory, monkeypatch: pytest.MonkeyPatch): + path = tmp_dir_factory.mktemp("mock") + home_dir = path / "home" + home_dir.mkdir() + + if sys.platform == "win32": + home_drive, home_path = os.path.splitdrive(home_dir) + monkeypatch.setenv("USERPROFILE", str(home_dir)) + monkeypatch.setenv("HOMEDRIVE", home_drive) + monkeypatch.setenv("HOMEPATH", home_path) + else: + monkeypatch.setenv("HOME", str(home_dir)) + + monkeypatch.setenv("GIT_CONFIG_NOSYSTEM", "1") + contents = b""" +[user] +name=DVC Tester +email=dvctester@example.com +[init] +defaultBranch=master +""" + (home_dir / ".gitconfig").write_bytes(contents) + pygit2.settings.search_path[pygit2.GIT_CONFIG_LEVEL_GLOBAL] = str(home_dir) + + +@pytest.fixture +def scm(tmp_dir: TmpDir): + git_ = Git.init(tmp_dir) + sig = git_.pygit2.default_signature + + assert sig.email == "dvctester@example.com" + assert sig.name == "DVC Tester" + + yield git_ + git_.close() diff --git a/tests/test_dulwich.py b/tests/test_dulwich.py new file mode 100644 index 00000000..fce0c20d --- /dev/null +++ b/tests/test_dulwich.py @@ -0,0 +1,28 @@ +import pytest +from pytest_mock import MockerFixture + + +@pytest.mark.parametrize( + "algorithm", [b"ssh-rsa", b"rsa-sha2-256", b"rsa-sha2-512"] +) +def test_dulwich_github_compat(mocker: MockerFixture, algorithm: bytes): + from asyncssh.misc import ProtocolError + + from scmrepo.git.backend.dulwich.asyncssh_vendor import ( + _process_public_key_ok_gh, + ) + + key_data = b"foo" + auth = mocker.Mock( + _keypair=mocker.Mock(algorithm=algorithm, public_data=key_data), + ) + packet = mocker.Mock() + + with pytest.raises(ProtocolError): + strings = iter((b"ed21556", key_data)) + packet.get_string = lambda: next(strings) + _process_public_key_ok_gh(auth, None, None, packet) + + strings = iter((b"ssh-rsa", key_data)) + packet.get_string = lambda: next(strings) + _process_public_key_ok_gh(auth, None, None, packet) diff --git a/tests/test_git.py b/tests/test_git.py new file mode 100644 index 00000000..58b520fc --- /dev/null +++ b/tests/test_git.py @@ -0,0 +1,609 @@ +import os +from typing import Iterator, Type + +import pytest +from pytest_test_utils import TempDirFactory, TmpDir +from pytest_test_utils.matchers import Matcher + +from scmrepo.exceptions import MergeConflictError, RevError, SCMError +from scmrepo.git import Git + +# pylint: disable=redefined-outer-name,unused-argument + + +@pytest.fixture(params=["gitpython", "dulwich", "pygit2"]) +def git_backend(request) -> str: + marker = request.node.get_closest_marker("skip_git_backend") + to_skip = marker.args if marker else [] + + backend = request.param + if backend in to_skip: + pytest.skip() + return backend + + +@pytest.fixture +def git(tmp_dir: TmpDir, git_backend: str) -> Iterator[Git]: + git_ = Git(tmp_dir, backends=[git_backend]) + yield git_ + git_.close() + + +@pytest.fixture +def remote_git_dir(tmp_dir_factory: TempDirFactory): + git_dir = tmp_dir_factory.mktemp("git-remote") + remote_git = Git.init(git_dir) + remote_git.close() + return git_dir + + +@pytest.mark.parametrize("backend", ["gitpython", "dulwich", "pygit2"]) +def test_git_init(tmp_dir: TmpDir, backend: str): + Git.init(".", _backend=backend) + assert (tmp_dir / ".git").is_dir() + Git(tmp_dir) + + +@pytest.mark.parametrize("backend", ["gitpython", "dulwich", "pygit2"]) +def test_git_init_bare(tmp_dir: TmpDir, backend: str): + Git.init(".", bare=True, _backend=backend) + assert list(tmp_dir.iterdir()) + Git(tmp_dir) + + +@pytest.mark.parametrize( + "path, expected", + [ + (os.path.join("path", "to", ".gitignore"), True), + (os.path.join("path", "to", ".git", "internal", "file"), True), + (os.path.join("some", "non-.git", "file"), False), + ], + ids=["gitignore_file", "git_internal_file", "non_git_file"], +) +def test_belongs_to_scm(scm: Git, git: Git, path: str, expected: str): + assert git.belongs_to_scm(path) == expected + + +def test_walk_with_submodules( + tmp_dir: Git, + scm: Git, + remote_git_dir: TmpDir, +): + remote_git = Git(remote_git_dir) + remote_git_dir.gen({"foo": "foo", "bar": "bar", "dir": {"data": "data"}}) + remote_git.add_commit(["foo", "bar", "dir"], message="add dir and files") + scm.gitpython.repo.create_submodule( + "submodule", "submodule", url=os.fspath(remote_git_dir) + ) + scm.commit("added submodule") + + files = [] + dirs = [] + fs = scm.get_fs("HEAD") + for _, dnames, fnames in fs.walk("."): + dirs.extend(dnames) + files.extend(fnames) + + # currently we don't walk through submodules + assert not dirs + assert set(files) == {".gitmodules", "submodule"} + + +def test_walk_onerror(tmp_dir: TmpDir, scm: Git): + def onerror(exc): + raise exc + + tmp_dir.gen("foo", "foo") + scm.add_commit("foo", message="init") + + fs = scm.get_fs("HEAD") + + # path does not exist + for _ in fs.walk("dir"): + pass + with pytest.raises(OSError): + for _ in fs.walk("dir", onerror=onerror): + pass + + # path is not a directory + for _ in fs.walk("foo"): + pass + with pytest.raises(OSError): + for _ in fs.walk("foo", onerror=onerror): + pass + + +@pytest.mark.skip_git_backend("pygit2") +def test_is_tracked(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen( + { + "tracked": "tracked", + "dir": {"data": "data", "subdir": {"subdata": "subdata"}}, + }, + ) + scm.add_commit(["tracked", "dir"], message="add dirs and files") + tmp_dir.gen({"untracked": "untracked", "dir": {"untracked": "untracked"}}) + + # sanity check + assert (tmp_dir / "untracked").exists() + assert (tmp_dir / "tracked").exists() + assert (tmp_dir / "dir" / "untracked").exists() + assert (tmp_dir / "dir" / "data").exists() + assert (tmp_dir / "dir" / "subdir" / "subdata").exists() + + assert not git.is_tracked("untracked") + assert not git.is_tracked(os.path.join("dir", "untracked")) + + assert git.is_tracked("tracked") + assert git.is_tracked("dir") + assert git.is_tracked(os.path.join("dir", "data")) + assert git.is_tracked(os.path.join("dir", "subdir")) + assert git.is_tracked(os.path.join("dir", "subdir", "subdata")) + + +@pytest.mark.skip_git_backend("pygit2") +def test_is_tracked_unicode(tmp_dir: TmpDir, scm: Git, git: Git): + files = tmp_dir.gen("ṭṝḁḉḵḗḋ", "tracked") + scm.add_commit(files, message="add unicode") + tmp_dir.gen("ṳṋṭṝḁḉḵḗḋ", "untracked") + + assert git.is_tracked("ṭṝḁḉḵḗḋ") + assert not git.is_tracked("ṳṋṭṝḁḉḵḗḋ") + + +@pytest.mark.skip_git_backend("pygit2") +def test_no_commits(tmp_dir: TmpDir, scm: Git, git: Git): + assert git.no_commits + + tmp_dir.gen("foo", "foo") + scm.add_commit(["foo"], message="foo") + + assert not git.no_commits + + +@pytest.mark.skip_git_backend("dulwich") +def test_branch_revs(tmp_dir: TmpDir, scm: Git, git: Git): + def _gen(i: int): + tmp_dir.gen({"file": f"{i}"}) + scm.add_commit("file", message=f"{i}") + return scm.get_rev() + + base, *others = [_gen(i) for i in range(5)] + branch_revs = list(git.branch_revs("master", base))[::-1] + assert branch_revs == others + + +def test_set_ref(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen("file", "0") + scm.add_commit("file", message="init") + init_rev = scm.get_rev() + + tmp_dir.gen({"file": "1"}) + scm.add_commit("file", message="commit") + commit_rev = scm.get_rev() + + git.set_ref("refs/foo/bar", init_rev) + assert ( + init_rev + == (tmp_dir / ".git" / "refs" / "foo" / "bar").read_text().strip() + ) + + with pytest.raises(SCMError): + git.set_ref("refs/foo/bar", commit_rev, old_ref=commit_rev) + git.set_ref("refs/foo/bar", commit_rev, old_ref=init_rev) + assert ( + commit_rev + == (tmp_dir / ".git" / "refs" / "foo" / "bar").read_text().strip() + ) + + git.set_ref("refs/foo/baz", "refs/heads/master", symbolic=True) + assert ( + tmp_dir / ".git" / "refs" / "foo" / "baz" + ).read_text().strip() == "ref: refs/heads/master" + + +def test_get_ref(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen({"file": "0"}) + scm.add_commit("file", message="init") + init_rev = scm.get_rev() + tmp_dir.gen( + { + os.path.join(".git", "refs", "foo", "bar"): init_rev, + os.path.join( + ".git", "refs", "foo", "baz" + ): "ref: refs/heads/master", + } + ) + + assert init_rev == git.get_ref("refs/foo/bar") + assert init_rev == git.get_ref("refs/foo/baz") + assert git.get_ref("refs/foo/baz", follow=False) == "refs/heads/master" + assert git.get_ref("refs/foo/qux") is None + + +def test_remove_ref(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen({"file": "0"}) + scm.add_commit("file", message="init") + init_rev = scm.get_rev() + + tmp_dir.gen(os.path.join(".git", "refs", "foo", "bar"), init_rev) + tmp_dir.gen({"file": "1"}) + scm.add_commit("file", message="commit") + commit_rev = scm.get_rev() + + with pytest.raises(SCMError): + git.remove_ref("refs/foo/bar", old_ref=commit_rev) + git.remove_ref("refs/foo/bar", old_ref=init_rev) + assert not (tmp_dir / ".git" / "refs" / "foo" / "bar").exists() + + +@pytest.mark.skip_git_backend("dulwich") +def test_refs_containing(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen({"file": "0"}) + scm.add_commit("file", message="init") + init_rev = scm.get_rev() + tmp_dir.gen( + { + os.path.join(".git", "refs", "foo", "bar"): init_rev, + os.path.join(".git", "refs", "foo", "baz"): init_rev, + } + ) + + expected = {"refs/foo/bar", "refs/foo/baz", "refs/heads/master"} + assert expected == set(git.get_refs_containing(init_rev)) + + +@pytest.mark.skip_git_backend("pygit2", "gitpython") +@pytest.mark.parametrize("use_url", [True, False]) +def test_push_refspec( + tmp_dir: TmpDir, + scm: Git, + git: Git, + remote_git_dir: TmpDir, + use_url: str, +): + tmp_dir.gen({"file": "0"}) + scm.add_commit("file", message="init") + init_rev = scm.get_rev() + tmp_dir.gen( + { + os.path.join(".git", "refs", "foo", "bar"): init_rev, + os.path.join(".git", "refs", "foo", "baz"): init_rev, + } + ) + + url = f"file://{remote_git_dir.resolve().as_posix()}" + remote_scm = Git(remote_git_dir) + scm.gitpython.repo.create_remote("origin", url) + + with pytest.raises(SCMError): + git.push_refspec("bad-remote", "refs/foo/bar", "refs/foo/bar") + + remote = url if use_url else "origin" + git.push_refspec(remote, "refs/foo/bar", "refs/foo/bar") + assert init_rev == remote_scm.get_ref("refs/foo/bar") + + remote_scm.checkout("refs/foo/bar") + assert init_rev == remote_scm.get_rev() + assert (remote_git_dir / "file").read_text() == "0" + + git.push_refspec(remote, "refs/foo/", "refs/foo/") + assert init_rev == remote_scm.get_ref("refs/foo/baz") + + git.push_refspec(remote, None, "refs/foo/baz") + assert remote_scm.get_ref("refs/foo/baz") is None + + +@pytest.mark.skip_git_backend("pygit2", "gitpython") +def test_fetch_refspecs( + scm: Git, + git: Git, + remote_git_dir: TmpDir, +): + url = f"file://{remote_git_dir.resolve().as_posix()}" + + remote_scm = Git(remote_git_dir) + remote_git_dir.gen("file", "0") + remote_scm.add_commit("file", message="init") + + init_rev = remote_scm.get_rev() + + remote_git_dir.gen( + { + os.path.join(".git", "refs", "foo", "bar"): init_rev, + os.path.join(".git", "refs", "foo", "baz"): init_rev, + } + ) + + git.fetch_refspecs( + url, ["refs/foo/bar:refs/foo/bar", "refs/foo/baz:refs/foo/baz"] + ) + assert init_rev == scm.get_ref("refs/foo/bar") + assert init_rev == scm.get_ref("refs/foo/baz") + + remote_scm.checkout("refs/foo/bar") + assert init_rev == remote_scm.get_rev() + assert (remote_git_dir / "file").read_text() == "0" + + +@pytest.mark.skip_git_backend("dulwich", "pygit2") +def test_list_all_commits( + tmp_dir: TmpDir, scm: Git, git: Git, matcher: Type[Matcher] +): + def _gen(s): + tmp_dir.gen(s, s) + scm.add_commit(s, message=s) + return scm.get_rev() + + rev_a = _gen("a") + rev_b = _gen("b") + scm.tag("tag") + rev_c = _gen("c") + scm.gitpython.git.reset(rev_a, hard=True) + scm.set_ref("refs/foo/bar", rev_c) + + assert git.list_all_commits() == matcher.unordered(rev_a, rev_b) + + +@pytest.mark.skip_git_backend("pygit2") +def test_ignore_remove_empty(tmp_dir: TmpDir, scm: Git, git: Git): + test_entries = [ + {"entry": "/foo1", "path": f"{tmp_dir}/foo1"}, + {"entry": "/foo2", "path": f"{tmp_dir}/foo2"}, + ] + + path_to_gitignore = tmp_dir / ".gitignore" + + with open(path_to_gitignore, "a", encoding="utf-8") as f: + for entry in test_entries: + f.write(entry["entry"] + "\n") + + assert path_to_gitignore.exists() + + git.ignore_remove(test_entries[0]["path"]) + assert path_to_gitignore.exists() + + git.ignore_remove(test_entries[1]["path"]) + assert not path_to_gitignore.exists() + + +@pytest.mark.skip_git_backend("pygit2") +@pytest.mark.skipif( + os.name == "nt", reason="Git hooks not supported on Windows" +) +@pytest.mark.parametrize("hook", ["pre-commit", "commit-msg"]) +def test_commit_no_verify(tmp_dir: TmpDir, scm: Git, git: Git, hook: str): + import stat + + hook_file = os.path.join(".git", "hooks", hook) + tmp_dir.gen(hook_file, "#!/usr/bin/env python\nimport sys\nsys.exit(1)") + os.chmod(hook_file, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) + + tmp_dir.gen("foo", "foo") + git.add(["foo"]) + with pytest.raises(SCMError): + git.commit("commit foo") + git.commit("commit foo", no_verify=True) + + +@pytest.mark.skip_git_backend("dulwich") +@pytest.mark.parametrize("squash", [True, False]) +def test_merge(tmp_dir: TmpDir, scm: Git, git: Git, squash: bool): + tmp_dir.gen("foo", "foo") + scm.add_commit("foo", message="init") + init_rev = scm.get_rev() + + scm.checkout("branch", create_new=True) + tmp_dir.gen("foo", "bar") + scm.add_commit("foo", message="bar") + branch = scm.resolve_rev("branch") + + scm.checkout("master") + + with pytest.raises(MergeConflictError): + tmp_dir.gen("foo", "baz") + scm.add_commit("foo", message="baz") + git.merge(branch, commit=not squash, squash=squash, msg="merge") + + scm.gitpython.git.reset(init_rev, hard=True) + merge_rev = git.merge( + branch, commit=not squash, squash=squash, msg="merge" + ) + assert (tmp_dir / "foo").read_text() == "bar" + if squash: + assert merge_rev is None + assert scm.get_rev() == init_rev + else: + assert scm.get_rev() == merge_rev + + +@pytest.mark.skip_git_backend("dulwich") +def test_checkout_index(tmp_dir: TmpDir, scm: Git, git: Git): + files = tmp_dir.gen({"foo": "foo", "bar": "bar", "dir": {"baz": "baz"}}) + scm.add_commit(files, message="init") + tmp_dir.gen({"foo": "baz", "dir": {"baz": "foo"}}) + + with (tmp_dir / "dir").chdir(): + git.checkout_index([os.path.join("..", "foo"), "baz"], force=True) + assert (tmp_dir / "foo").read_text() == "foo" + assert (tmp_dir / "dir" / "baz").read_text() == "baz" + + tmp_dir.gen({"foo": "baz", "bar": "baz", "dir": {"baz": "foo"}}) + git.checkout_index(force=True) + assert (tmp_dir / "foo").read_text() == "foo" + assert (tmp_dir / "bar").read_text() == "bar" + assert (tmp_dir / "dir" / "baz").read_text() == "baz" + + +@pytest.mark.skip_git_backend("dulwich") +@pytest.mark.parametrize( + "strategy, expected", [("ours", "baz"), ("theirs", "bar")] +) +def test_checkout_index_conflicts( + tmp_dir: TmpDir, scm: Git, git: Git, strategy: str, expected: str +): + tmp_dir.gen({"file": "foo"}) + scm.add_commit("file", message="init") + + scm.checkout("branch", create_new=True) + tmp_dir.gen({"file": "bar"}) + scm.add_commit("file", message="bar") + rev_bar = scm.get_rev() + + scm.checkout("master") + tmp_dir.gen({"file": "baz"}) + scm.add_commit("file", message="baz") + + with pytest.raises(MergeConflictError): + git.merge(rev_bar, commit=False, squash=True) + + git.checkout_index( + ours=strategy == "ours", + theirs=strategy == "theirs", + ) + assert (tmp_dir / "file").read_text() == expected + + +@pytest.mark.skip_git_backend("dulwich") +def test_resolve_rev( + tmp_dir: TmpDir, + scm: Git, + git: Git, + remote_git_dir: TmpDir, +): + url = f"file://{remote_git_dir.resolve().as_posix()}" + scm.gitpython.repo.create_remote("origin", url) + scm.gitpython.repo.create_remote("upstream", url) + + tmp_dir.gen({"file": "0"}) + scm.add_commit("file", message="init") + init_rev = scm.get_rev() + + tmp_dir.gen({"file": "1"}) + scm.add_commit("file", message="1") + rev = scm.get_rev() + + scm.checkout("branch", create_new=True) + tmp_dir.gen( + { + os.path.join(".git", "refs", "foo"): rev, + os.path.join(".git", "refs", "remotes", "origin", "bar"): rev, + os.path.join(".git", "refs", "remotes", "origin", "baz"): rev, + os.path.join( + ".git", "refs", "remotes", "upstream", "baz" + ): init_rev, + } + ) + + assert git.resolve_rev(rev) == rev + assert git.resolve_rev(rev[:7]) == rev + assert git.resolve_rev("HEAD") == rev + assert git.resolve_rev("branch") == rev + assert git.resolve_rev("refs/foo") == rev + assert git.resolve_rev("bar") == rev + assert git.resolve_rev("origin/baz") == rev + + with pytest.raises(RevError): + git.resolve_rev("qux") + + with pytest.raises(RevError): + git.resolve_rev("baz") + + +@pytest.mark.skip_git_backend("dulwich") +def test_checkout(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen({"foo": "foo"}) + scm.add_commit("foo", message="foo") + foo_rev = scm.get_rev() + + tmp_dir.gen("foo", "bar") + scm.add_commit("foo", message="bar") + bar_rev = scm.get_rev() + + git.checkout("branch", create_new=True) + assert git.get_ref("HEAD", follow=False) == "refs/heads/branch" + assert (tmp_dir / "foo").read_text() == "bar" + + git.checkout("master", detach=True) + assert git.get_ref("HEAD", follow=False) == bar_rev + + git.checkout("master") + assert git.get_ref("HEAD", follow=False) == "refs/heads/master" + + git.checkout(foo_rev[:7]) + assert git.get_ref("HEAD", follow=False) == foo_rev + assert (tmp_dir / "foo").read_text() == "foo" + + +@pytest.mark.skip_git_backend("dulwich") +def test_reset(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen({"foo": "foo", "dir": {"baz": "baz"}}) + scm.add_commit(["foo", "dir"], message="init") + + tmp_dir.gen({"foo": "bar", "dir": {"baz": "bar"}}) + scm.add(["foo", os.path.join("dir", "baz")]) + git.reset() + assert (tmp_dir / "foo").read_text() == "bar" + assert (tmp_dir / "dir" / "baz").read_text() == "bar" + staged, unstaged, _ = scm.status() + assert len(staged) == 0 + assert set(unstaged) == {"foo", "dir/baz"} + + scm.add(["foo", os.path.join("dir", "baz")]) + git.reset(hard=True) + assert (tmp_dir / "foo").read_text() == "foo" + assert (tmp_dir / "dir" / "baz").read_text() == "baz" + staged, unstaged, _ = scm.status() + assert len(staged) == 0 + assert len(unstaged) == 0 + + tmp_dir.gen({"foo": "bar", "bar": "bar", "dir": {"baz": "bar"}}) + scm.add(["foo", "bar", os.path.join("dir", "baz")]) + with (tmp_dir / "dir").chdir(): + git.reset(paths=[os.path.join("..", "foo"), os.path.join("baz")]) + assert (tmp_dir / "foo").read_text() == "bar" + assert (tmp_dir / "bar").read_text() == "bar" + assert (tmp_dir / "dir" / "baz").read_text() == "bar" + staged, unstaged, _ = scm.status() + assert len(staged) == 1 + assert len(unstaged) == 2 + + +@pytest.mark.skip_git_backend("pygit2") +def test_add(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen({"foo": "foo", "bar": "bar", "dir": {"baz": "baz"}}) + git.add(["foo", "dir"]) + staged, unstaged, untracked = scm.status() + assert set(staged["add"]) == {"foo", "dir/baz"} + assert len(unstaged) == 0 + assert len(untracked) == 1 + + scm.commit("commit") + tmp_dir.gen({"foo": "bar", "dir": {"baz": "bar"}}) + git.add([], update=True) + staged, unstaged, _ = scm.status() + assert set(staged["modify"]) == {"foo", "dir/baz"} + assert len(unstaged) == 0 + assert len(untracked) == 1 + + scm.reset() + git.add(["dir"], update=True) + staged, unstaged, _ = scm.status() + assert set(staged["modify"]) == {"dir/baz"} + assert len(unstaged) == 1 + assert len(untracked) == 1 + + +@pytest.mark.skip_git_backend("dulwich", "gitpython") +@pytest.mark.skipif(os.name != "nt", reason="Windows only") +def test_checkout_subdir(tmp_dir: TmpDir, scm: Git, git: Git): + tmp_dir.gen("foo", "foo") + scm.add_commit("foo", message="init") + rev = scm.get_rev() + + tmp_dir.gen({"dir": {"bar": "bar"}}) + scm.add_commit("dir", message="dir") + + with (tmp_dir / "dir").chdir(): + git.checkout(rev) + assert not (tmp_dir / "dir" / "bar").exists() diff --git a/tests/test_pygit2.py b/tests/test_pygit2.py new file mode 100644 index 00000000..2dbed34e --- /dev/null +++ b/tests/test_pygit2.py @@ -0,0 +1,32 @@ +import pygit2 +import pytest +from pytest_test_utils import TmpDir + +from scmrepo.git import Git +from scmrepo.git.backend.pygit2 import Pygit2Backend + + +@pytest.mark.parametrize("use_sha", [True, False]) +def test_pygit_resolve_refish(tmp_dir: TmpDir, scm: Git, use_sha: str): + backend = Pygit2Backend(tmp_dir) + tmp_dir.gen("foo", "foo") + scm.add_commit("foo", message="foo") + head = scm.get_rev() + tag = "my_tag" + scm.gitpython.git.tag("-a", tag, "-m", "create annotated tag") + + if use_sha: + # refish will be annotated tag SHA (not commit SHA) + ref = backend.repo.references.get(f"refs/tags/{tag}") + refish = str(ref.target) + else: + refish = tag + + assert refish != head + commit, ref = backend._resolve_refish( # pylint: disable=protected-access + refish + ) + assert isinstance(commit, pygit2.Commit) + assert str(commit.id) == head + if not use_sha: + assert ref.name == f"refs/tags/{tag}" diff --git a/tests/test_scmrepo.py b/tests/test_scmrepo.py index 26c4c99f..3d8eaf12 100644 --- a/tests/test_scmrepo.py +++ b/tests/test_scmrepo.py @@ -1,27 +1,18 @@ -import os +from typing import Type from unittest.mock import MagicMock +from pytest_test_utils import TmpDir +from pytest_test_utils.matchers import Matcher + from scmrepo.git import Git from scmrepo.progress import GitProgressEvent -class ANY: - def __init__(self, expected_type): - self.expected_type = expected_type - - def __repr__(self): - return "Any" + self.expected_type.__name__.capitalize() - - def __eq__(self, other): - return isinstance(other, self.expected_type) - - -def test_clone(tmp_path: os.PathLike): - os.chdir(tmp_path) +def test_clone(tmp_dir: TmpDir, matcher: Type[Matcher]): progress = MagicMock() url = "https://github.com/iterative/dvcyaml-schema" Git.clone(url, "dir", progress=progress) - progress.assert_called_with(ANY(GitProgressEvent)) - assert (tmp_path / "dir").exists() + progress.assert_called_with(matcher.instance_of(GitProgressEvent)) + assert (tmp_dir / "dir").exists()