diff --git a/src/scmrepo/git/backend/dulwich/client.py b/src/scmrepo/git/backend/dulwich/client.py index a15d481c..82f6ddad 100644 --- a/src/scmrepo/git/backend/dulwich/client.py +++ b/src/scmrepo/git/backend/dulwich/client.py @@ -1,10 +1,8 @@ -# Temporarily added while waiting for upstream PR to be merged. -# See https://github.com/jelmer/dulwich/pull/976 +from typing import Optional from dulwich.client import Urllib3HttpGitClient -from dulwich.config import StackedConfig -from scmrepo.git.credentials import CredentialNotFoundError, get_credentials_from_helper +from scmrepo.git.credentials import Credential, CredentialNotFoundError class GitCredentialsHTTPClient(Urllib3HttpGitClient): # pylint: disable=abstract-method @@ -24,17 +22,23 @@ def __init__( **kwargs, ) + self._store_credentials: Optional["Credential"] = None if not username: + import base64 + try: - helper_username, helper_password = get_credentials_from_helper( - base_url, config or StackedConfig.default() - ) + creds = Credential(url=base_url).fill() except CredentialNotFoundError: - pass - else: - credentials = helper_username + b":" + helper_password - import base64 + return + encoded = base64.b64encode( + f"{creds.username}:{creds.password}".encode() + ).decode("ascii") + basic_auth = {"authorization": f"Basic {encoded}"} + self.pool_manager.headers.update(basic_auth) + self._store_credentials = creds - encoded = base64.b64encode(credentials).decode("ascii") - basic_auth = {"authorization": f"Basic {encoded}"} - self.pool_manager.headers.update(basic_auth) + def _http_request(self, *args, **kwargs): + result = super()._http_request(*args, **kwargs) + if self._store_credentials is not None: + self._store_credentials.approve() + return result diff --git a/src/scmrepo/git/credentials.py b/src/scmrepo/git/credentials.py index f5b5abde..d74988f2 100644 --- a/src/scmrepo/git/credentials.py +++ b/src/scmrepo/git/credentials.py @@ -26,21 +26,31 @@ Currently Dulwich supports only the `get` operation """ +import locale +import logging import os import shlex import shutil import subprocess # nosec B404 import sys -from typing import Any, Dict, List, Optional, Tuple, Union -from urllib.parse import urlparse +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Union +from urllib.parse import urlparse, urlunparse from dulwich.config import StackedConfig from dulwich.credentials import urlmatch_credential_sections +from funcy import cached_property + +from scmrepo.exceptions import SCMError + +if TYPE_CHECKING: + from dulwich.config import ConfigDict + +logger = logging.getLogger(__name__) SectionLike = Union[bytes, str, Tuple[Union[bytes, str], ...]] -class CredentialNotFoundError(Exception): +class CredentialNotFoundError(SCMError): """Error occurred while retrieving credentials/no credentials available.""" @@ -60,10 +70,11 @@ def __init__(self, command: str): if self._command[0] == "!": # On Windows this will only work in git-bash and/or WSL2 self._run_kwargs["shell"] = True + self._encoding = locale.getpreferredencoding() - def _prepare_command(self) -> Union[str, List[str]]: + def _prepare_command(self, action: Optional[str] = None) -> Union[str, List[str]]: if self._command[0] == "!": - return self._command[1:] + return self._command[1:] + (f" {action}" if action else "") if sys.platform != "win32": argv = shlex.split(self._command) @@ -72,6 +83,8 @@ def _prepare_command(self) -> Union[str, List[str]]: # join arguments when providing a list, so we can just split # using whitespace. argv = self._command.split() + if action: + argv.append(action) if os.path.isabs(argv[0]): return argv @@ -91,31 +104,12 @@ def _prepare_command(self) -> Union[str, List[str]]: def get( self, - *, - protocol: Optional[str] = None, - hostname: Optional[str] = None, - port: Optional[int] = None, - username: Optional[str] = None, - ) -> Tuple[bytes, bytes]: - cmd = self._prepare_command() - if isinstance(cmd, str): - cmd += " get" - else: - cmd.append("get") - - helper_input = [] - if protocol: - helper_input.append(f"protocol={protocol}") - if hostname: - helper_input.append( - f"host={hostname}{':' + str(port) if port is not None else ''}" - ) - if username: - helper_input.append(f"username={username}") - - if not helper_input: + **kwargs, + ) -> "Credential": + if kwargs.get("protocol", kwargs.get("hostname")) is None: raise ValueError("One of protocol, hostname must be provided") - + cmd = self._prepare_command("get") + helper_input = [f"{key}={value}" for key, value in kwargs.items()] helper_input.append("") try: @@ -123,45 +117,71 @@ def get( cmd, check=True, capture_output=True, - input=os.linesep.join(helper_input).encode("ascii"), + input=os.linesep.join(helper_input), + encoding=self._encoding, **self._run_kwargs, ) except subprocess.CalledProcessError as exc: raise CredentialNotFoundError(exc.stderr) from exc except FileNotFoundError as exc: raise CredentialNotFoundError("Helper not found") from exc + if res.stderr: + logger.debug(res.stderr) credentials = {} for line in res.stdout.strip().splitlines(): try: - key, value = line.split(b"=") + key, value = line.split("=") credentials[key] = value except ValueError: continue + return Credential(**credentials) - if not all( - ( - credentials, - b"username" in credentials, - b"password" in credentials, - ) - ): - raise CredentialNotFoundError("Could not get credentials from helper") - - return credentials[b"username"], credentials[b"password"] - - def store(self, *args, **kwargs): + def store(self, **kwargs): """Store the credential, if applicable to the helper""" - raise NotImplementedError + cmd = self._prepare_command("store") + helper_input = [f"{key}={value}" for key, value in kwargs.items()] + helper_input.append("") + + try: + res = subprocess.run( # type: ignore # nosec B603 # pylint: disable=W1510 + cmd, + capture_output=True, + input=os.linesep.join(helper_input), + encoding=self._encoding, + **self._run_kwargs, + ) + if res.stderr: + logger.debug(res.stderr) + except FileNotFoundError: + logger.debug("Helper not found", exc_info=True) - def erase(self, *args, **kwargs): + def erase(self, **kwargs): """Remove a matching credential, if any, from the helper’s storage""" - raise NotImplementedError + cmd = self._prepare_command("erase") + helper_input = [f"{key}={value}" for key, value in kwargs.items()] + helper_input.append("") + + try: + res = subprocess.run( # type: ignore # nosec B603 # pylint: disable=W1510 + cmd, + capture_output=True, + input=os.linesep.join(helper_input), + encoding=self._encoding, + **self._run_kwargs, + ) + if res.stderr: + logger.debug(res.stderr) + except FileNotFoundError: + logger.debug("Helper not found", exc_info=True) -def get_matching_helper_commands(base_url: str, config): +def get_matching_helper_commands( + base_url: str, config: Optional[Union["ConfigDict", "StackedConfig"]] = None +): + config = config or StackedConfig.default() if isinstance(config, StackedConfig): - backends = config.backends + backends: Iterable["ConfigDict"] = config.backends else: backends = [config] @@ -177,19 +197,112 @@ def get_matching_helper_commands(base_url: str, config): yield command.decode(conf.encoding or sys.getdefaultencoding()) -def get_credentials_from_helper(base_url: str, config) -> Tuple[bytes, bytes]: - """Retrieves credentials for the given url from git credential helpers""" +class Credential: + """Git credentials, equivalent to CGit git-credential API. - for command in get_matching_helper_commands(base_url, config): - helper = CredentialHelper(command) - parsed = urlparse(base_url) - try: - return helper.get( - protocol=parsed.scheme, - hostname=parsed.hostname, - port=parsed.port, - username=parsed.username, - ) - except CredentialNotFoundError: - continue - raise CredentialNotFoundError + Usage: + + 1. Generate a credential based on context + + >>> generated = Credential(url="https://github.com/dtrifiro/aprivaterepo") + + 2. Ask git-credential to give username/password for this context + + >>> credential = generated.fill() + + 3. Use the credential from (2) in Git operation + 4. If the operation in (3) was successful, approve it for re-use in subsequent + operations + + >>> credential.approve() + + See also: + https://git-scm.com/docs/git-credential#_typical_use_of_git_credential + https://github.com/git/git/blob/master/credential.h + + """ + + def __init__( + self, + *, + protocol: Optional[str] = None, + host: Optional[str] = None, # host with optional ":" included + path: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + password_expiry_utc: Optional[int] = None, + url: Optional[str] = None, + ): + self.protocol = protocol + self.host = host + self.path = path + self.username = username + self.password = password + self.password_expiry_utc = password_expiry_utc + self._approved = False + if url: + parsed = urlparse(url) + self.protocol = self.protocol or parsed.scheme + if not self.protocol: + raise ValueError("protocol must be specified when using URL") + port = f":{parsed.port}" if parsed.port is not None else "" + hostname = parsed.hostname or "" + self.host = self.host or f"{hostname}{port}" + self.username = self.username or parsed.username + self.password = self.password or parsed.password + + @property + def url(self) -> str: + if self.username or self.password: + username = self.username or "" + password = self.password or "" + netloc = f"{username}:{password}@{self.host}" + else: + netloc = self.host or "" + return urlunparse((self.protocol or "", netloc, self.path or "", "", "", "")) + + @property + def _helper_kwargs(self) -> Dict[str, str]: + kwargs = {} + for attr in ( + "protocol", + "host", + "path", + "username", + "password", + "password_expiry_utc", + ): + value = getattr(self, attr) + if value is not None: + kwargs[attr] = str(value) + return kwargs + + @cached_property + def helpers(self) -> List["CredentialHelper"]: + url = self.url + return [ + CredentialHelper(command) for command in get_matching_helper_commands(url) + ] + + def fill(self) -> "Credential": + """Return a new credential with filled username and password.""" + for helper in self.helpers: + try: + return helper.get(**self._helper_kwargs) + except CredentialNotFoundError: + continue + raise CredentialNotFoundError(f"No available credentials for '{self.url}'") + + def approve(self): + """Store this credential in available helpers.""" + if self._approved or not (self.username and self.password): + return + for helper in self.helpers: + helper.store(**self._helper_kwargs) + self._approved = True + + def reject(self): + """Remove this credential from available helpers.""" + for helper in self.helpers: + helper.erase(**self._helper_kwargs) + self._approved = False