diff --git a/src/scmrepo/git/__init__.py b/src/scmrepo/git/__init__.py index bbdb9c81..f33fede8 100644 --- a/src/scmrepo/git/__init__.py +++ b/src/scmrepo/git/__init__.py @@ -330,13 +330,6 @@ def fetch_refspecs( progress: Optional[Callable[["GitProgressEvent"], None]] = None, **kwargs, ) -> typing.Mapping[str, SyncStatus]: - from .credentials import get_matching_helper_commands - - if "dulwich" in kwargs.get("backends", self.backends.backends) and any( - get_matching_helper_commands(url, self.dulwich.repo.get_config_stack()) - ): - kwargs["backends"] = ["dulwich"] - return self._fetch_refspecs( url, refspecs, diff --git a/src/scmrepo/git/backend/dulwich/client.py b/src/scmrepo/git/backend/dulwich/client.py index 616553bf..d5f678e5 100644 --- a/src/scmrepo/git/backend/dulwich/client.py +++ b/src/scmrepo/git/backend/dulwich/client.py @@ -1,4 +1,3 @@ -import os from typing import Any, Dict, Optional from dulwich.client import HTTPUnauthorized, Urllib3HttpGitClient @@ -22,21 +21,7 @@ def __init__( config=config, **kwargs, ) - self._store_credentials: Optional["Credential"] = None - if not username: - import base64 - - try: - creds = Credential(url=base_url).fill() - except CredentialNotFoundError: - return - encoded = base64.b64encode( - f"{creds.username}:{creds.password}".encode() - ).decode("ascii") - basic_auth = {"authorization": f"Basic {encoded}"} - self.pool_manager.headers.update(basic_auth) - self._store_credentials = creds def _http_request( self, @@ -60,8 +45,6 @@ def _http_request( return result def _get_auth(self) -> Dict[str, str]: - from getpass import getpass - from urllib3.util import make_headers try: @@ -69,20 +52,4 @@ def _get_auth(self) -> Dict[str, str]: self._store_credentials = creds return make_headers(basic_auth=f"{creds.username}:{creds.password}") except CredentialNotFoundError: - pass - - if os.environ.get("GIT_TERMINAL_PROMPT") == "0": - return {} - - try: - if self._username: - username = self._username - else: - username = input(f"Username for '{self._base_url}': ") - if self._password: - password = self._password - else: - password = getpass(f"Password for '{self._base_url}': ") - return make_headers(basic_auth=f"{username}:{password}") - except KeyboardInterrupt: return {} diff --git a/src/scmrepo/git/backend/pygit2/callbacks.py b/src/scmrepo/git/backend/pygit2/callbacks.py index 8f6f4680..5ddf5ed1 100644 --- a/src/scmrepo/git/backend/pygit2/callbacks.py +++ b/src/scmrepo/git/backend/pygit2/callbacks.py @@ -1,4 +1,3 @@ -import os from contextlib import AbstractContextManager from types import TracebackType from typing import TYPE_CHECKING, Callable, Optional, Type, Union @@ -45,8 +44,6 @@ def sideband_progress(self, string: str): def credentials( self, url: str, username_from_url: Optional[str], allowed_types: int ) -> "_Pygit2Credential": - from getpass import getpass - from pygit2 import GitError, Passthrough from pygit2.credentials import GIT_CREDENTIAL_USERPASS_PLAINTEXT, UserPass @@ -59,23 +56,11 @@ def credentials( if self._store_credentials: creds = self._store_credentials else: - Credential(username=username_from_url, url=url).fill() + creds = Credential(username=username_from_url, url=url).fill() self._store_credentials = creds return UserPass(creds.username, creds.password) except CredentialNotFoundError: pass - - if os.environ.get("GIT_TERMINAL_PROMPT") != "0": - try: - if username_from_url: - username = username_from_url - else: - username = input(f"Username for '{url}': ") - password = getpass(f"Password for '{url}': ") - if username and password: - return UserPass(username, password) - except KeyboardInterrupt: - pass raise Passthrough def _approve_credentials(self): diff --git a/src/scmrepo/git/credentials.py b/src/scmrepo/git/credentials.py index d74988f2..963c1c07 100644 --- a/src/scmrepo/git/credentials.py +++ b/src/scmrepo/git/credentials.py @@ -33,7 +33,18 @@ import shutil import subprocess # nosec B404 import sys -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Union +from abc import ABC, abstractmethod +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + NamedTuple, + Optional, + Tuple, + Union, +) from urllib.parse import urlparse, urlunparse from dulwich.config import StackedConfig @@ -54,17 +65,38 @@ class CredentialNotFoundError(SCMError): """Error occurred while retrieving credentials/no credentials available.""" -class CredentialHelper: - """Helper for retrieving credentials for http/https git remotes +class CredentialHelper(ABC): + """Base git-credential helper.""" + + @abstractmethod + def get(self, **kwargs) -> "Credential": + """Get a matching credential from this helper. + + Raises: + CredentialNotFoundError: No matching credential was found. + """ + + @abstractmethod + def store(self, **kwargs): + """Store the credential, if applicable to the helper""" + + @abstractmethod + def erase(self, **kwargs): + """Remove a matching credential, if any, from the helper’s storage""" + + +class GitCredentialHelper(CredentialHelper): + """Helper for retrieving credentials through git-credential- commands. Usage: - >>> helper = CredentialHelper("store") # Use `git credential-store` + >>> helper = GitCredentialHelper("store") # Use `git credential-store` >>> credentials = helper.get("https://github.com/dtrifiro/aprivaterepo") >>> username = credentials["username"] >>> password = credentials["password"] """ def __init__(self, command: str): + super().__init__() self._command = command self._run_kwargs: Dict[str, Any] = {} if self._command[0] == "!": @@ -102,10 +134,7 @@ def _prepare_command(self, action: Optional[str] = None) -> Union[str, List[str] return [executable, *argv[1:]] - def get( - self, - **kwargs, - ) -> "Credential": + def get(self, **kwargs) -> "Credential": if kwargs.get("protocol", kwargs.get("hostname")) is None: raise ValueError("One of protocol, hostname must be provided") cmd = self._prepare_command("get") @@ -175,26 +204,111 @@ def erase(self, **kwargs): except FileNotFoundError: logger.debug("Helper not found", exc_info=True) + @staticmethod + def get_matching_commands( + base_url: str, config: Optional[Union["ConfigDict", "StackedConfig"]] = None + ): + config = config or StackedConfig.default() + if isinstance(config, StackedConfig): + backends: Iterable["ConfigDict"] = config.backends + else: + backends = [config] + + for conf in backends: + # We will try to match credential sections' url with the given url, + # falling back to the generic section if there's no match + for section in urlmatch_credential_sections(conf, base_url): + try: + command = conf.get(section, "helper") + except KeyError: + # no helper configured + continue + yield command.decode(conf.encoding or sys.getdefaultencoding()) + + +class _CredentialKey(NamedTuple): + protocol: str + host: Optional[str] + path: Optional[str] -def get_matching_helper_commands( - base_url: str, config: Optional[Union["ConfigDict", "StackedConfig"]] = None -): - config = config or StackedConfig.default() - if isinstance(config, StackedConfig): - backends: Iterable["ConfigDict"] = config.backends - else: - backends = [config] - for conf in backends: - # We will try to match credential sections' url with the given url, - # falling back to the generic section if there's no match - for section in urlmatch_credential_sections(conf, base_url): +class MemoryCredentialHelper(CredentialHelper): + """Memory credential helper that supports optional interactive input.""" + + def __init__(self): + super().__init__() + self._credentials: Dict["_CredentialKey", "Credential"] = {} + + def get(self, *, interactive: bool = False, **kwargs) -> "Credential": + """Get a matching credential from this helper. + + Raises: + CredentialNotFoundError: No matching credential was found. + """ + from getpass import getpass + + key = self._key(**kwargs) + if key.path: + try_keys = [key, _CredentialKey(key.protocol, key.host, None)] + else: + try_keys = [key] + for try_key in try_keys: try: - command = conf.get(section, "helper") + return self._credentials[try_key] except KeyError: - # no helper configured - continue - yield command.decode(conf.encoding or sys.getdefaultencoding()) + pass + if not interactive or os.environ.get("GIT_TERMINAL_PROMPT") == "0": + raise CredentialNotFoundError("Interactive input is disabled") + + scheme = f"{key.protocol}://" if key.protocol else "" + netloc = f"{key.host}" if key.host else "" + url = f"{scheme}{netloc}" + try: + username = kwargs.get("username", "") + if not username: + username = input(f"Username for '{url}': ") + password = kwargs.get("password", "") + if not password: + url = f"{scheme}{username}@{netloc}" + password = getpass(f"Password for '{url}': ") + except KeyboardInterrupt: + raise CredentialNotFoundError("User cancelled prompt") + return Credential( + protocol=key.protocol, + host=key.host, + path=key.path, + username=username, + password=password, + memory_only=True, + ) + + def store(self, **kwargs): + """Store the credential, if applicable to the helper""" + cred = Credential(**kwargs) + cred.memory_only = True + key = self._key(**kwargs) + self._credentials[key] = cred + + def erase(self, **kwargs): + """Remove a matching credential, if any, from the helper’s storage""" + key = self._key(**kwargs) + try: + del self._credentials[key] + except KeyError: + pass + + @staticmethod + def _key( + *, + protocol: str = "", + host: Optional[str] = None, + path: Optional[str] = None, + **kwargs, + ) -> _CredentialKey: + return _CredentialKey(protocol, host, path) + + +memory_helper = MemoryCredentialHelper() class Credential: @@ -232,6 +346,7 @@ def __init__( password: Optional[str] = None, password_expiry_utc: Optional[int] = None, url: Optional[str] = None, + memory_only: bool = False, ): self.protocol = protocol self.host = host @@ -239,6 +354,7 @@ def __init__( self.username = username self.password = password self.password_expiry_utc = password_expiry_utc + self.memory_only = memory_only self._approved = False if url: parsed = urlparse(url) @@ -281,28 +397,43 @@ def _helper_kwargs(self) -> Dict[str, str]: def helpers(self) -> List["CredentialHelper"]: url = self.url return [ - CredentialHelper(command) for command in get_matching_helper_commands(url) + GitCredentialHelper(command) + for command in GitCredentialHelper.get_matching_commands(url) ] def fill(self) -> "Credential": """Return a new credential with filled username and password.""" + try: + return memory_helper.get(interactive=False, **self._helper_kwargs) + except CredentialNotFoundError: + pass + for helper in self.helpers: try: return helper.get(**self._helper_kwargs) except CredentialNotFoundError: continue + + try: + return memory_helper.get(interactive=True, **self._helper_kwargs) + except CredentialNotFoundError: + pass + raise CredentialNotFoundError(f"No available credentials for '{self.url}'") def approve(self): """Store this credential in available helpers.""" if self._approved or not (self.username and self.password): return - for helper in self.helpers: - helper.store(**self._helper_kwargs) + if not self.memory_only: + for helper in self.helpers: + helper.store(**self._helper_kwargs) + memory_helper.store(**self._helper_kwargs) self._approved = True def reject(self): """Remove this credential from available helpers.""" for helper in self.helpers: helper.erase(**self._helper_kwargs) + memory_helper.erase(**self._helper_kwargs) self._approved = False