Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/scmrepo/git/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,6 @@ def fetch_refspecs(
progress: Optional[Callable[["GitProgressEvent"], None]] = None,
**kwargs,
) -> typing.Mapping[str, SyncStatus]:
from .credentials import get_matching_helper_commands

if "dulwich" in kwargs.get("backends", self.backends.backends) and any(
get_matching_helper_commands(url, self.dulwich.repo.get_config_stack())
):
kwargs["backends"] = ["dulwich"]

return self._fetch_refspecs(
url,
refspecs,
Expand Down
33 changes: 0 additions & 33 deletions src/scmrepo/git/backend/dulwich/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from typing import Any, Dict, Optional

from dulwich.client import HTTPUnauthorized, Urllib3HttpGitClient
Expand All @@ -22,21 +21,7 @@ def __init__(
config=config,
**kwargs,
)

self._store_credentials: Optional["Credential"] = None
if not username:
import base64

try:
creds = Credential(url=base_url).fill()
except CredentialNotFoundError:
return
encoded = base64.b64encode(
f"{creds.username}:{creds.password}".encode()
).decode("ascii")
basic_auth = {"authorization": f"Basic {encoded}"}
self.pool_manager.headers.update(basic_auth)
self._store_credentials = creds

def _http_request(
self,
Expand All @@ -60,29 +45,11 @@ def _http_request(
return result

def _get_auth(self) -> Dict[str, str]:
from getpass import getpass

from urllib3.util import make_headers

try:
creds = Credential(username=self._username, url=self._base_url).fill()
self._store_credentials = creds
return make_headers(basic_auth=f"{creds.username}:{creds.password}")
except CredentialNotFoundError:
pass

if os.environ.get("GIT_TERMINAL_PROMPT") == "0":
return {}

try:
if self._username:
username = self._username
else:
username = input(f"Username for '{self._base_url}': ")
if self._password:
password = self._password
else:
password = getpass(f"Password for '{self._base_url}': ")
return make_headers(basic_auth=f"{username}:{password}")
except KeyboardInterrupt:
return {}
17 changes: 1 addition & 16 deletions src/scmrepo/git/backend/pygit2/callbacks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from contextlib import AbstractContextManager
from types import TracebackType
from typing import TYPE_CHECKING, Callable, Optional, Type, Union
Expand Down Expand Up @@ -45,8 +44,6 @@ def sideband_progress(self, string: str):
def credentials(
self, url: str, username_from_url: Optional[str], allowed_types: int
) -> "_Pygit2Credential":
from getpass import getpass

from pygit2 import GitError, Passthrough
from pygit2.credentials import GIT_CREDENTIAL_USERPASS_PLAINTEXT, UserPass

Expand All @@ -59,23 +56,11 @@ def credentials(
if self._store_credentials:
creds = self._store_credentials
else:
Credential(username=username_from_url, url=url).fill()
creds = Credential(username=username_from_url, url=url).fill()
self._store_credentials = creds
return UserPass(creds.username, creds.password)
except CredentialNotFoundError:
pass

if os.environ.get("GIT_TERMINAL_PROMPT") != "0":
try:
if username_from_url:
username = username_from_url
else:
username = input(f"Username for '{url}': ")
password = getpass(f"Password for '{url}': ")
if username and password:
return UserPass(username, password)
except KeyboardInterrupt:
pass
raise Passthrough

def _approve_credentials(self):
Expand Down
185 changes: 158 additions & 27 deletions src/scmrepo/git/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@
import shutil
import subprocess # nosec B404
import sys
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Union
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Tuple,
Union,
)
from urllib.parse import urlparse, urlunparse

from dulwich.config import StackedConfig
Expand All @@ -54,17 +65,38 @@ class CredentialNotFoundError(SCMError):
"""Error occurred while retrieving credentials/no credentials available."""


class CredentialHelper:
"""Helper for retrieving credentials for http/https git remotes
class CredentialHelper(ABC):
"""Base git-credential helper."""

@abstractmethod
def get(self, **kwargs) -> "Credential":
"""Get a matching credential from this helper.

Raises:
CredentialNotFoundError: No matching credential was found.
"""

@abstractmethod
def store(self, **kwargs):
"""Store the credential, if applicable to the helper"""

@abstractmethod
def erase(self, **kwargs):
"""Remove a matching credential, if any, from the helper’s storage"""


class GitCredentialHelper(CredentialHelper):
"""Helper for retrieving credentials through git-credential-<helper> commands.

Usage:
>>> helper = CredentialHelper("store") # Use `git credential-store`
>>> helper = GitCredentialHelper("store") # Use `git credential-store`
>>> credentials = helper.get("https://github.com/dtrifiro/aprivaterepo")
>>> username = credentials["username"]
>>> password = credentials["password"]
"""

def __init__(self, command: str):
super().__init__()
self._command = command
self._run_kwargs: Dict[str, Any] = {}
if self._command[0] == "!":
Expand Down Expand Up @@ -102,10 +134,7 @@ def _prepare_command(self, action: Optional[str] = None) -> Union[str, List[str]

return [executable, *argv[1:]]

def get(
self,
**kwargs,
) -> "Credential":
def get(self, **kwargs) -> "Credential":
if kwargs.get("protocol", kwargs.get("hostname")) is None:
raise ValueError("One of protocol, hostname must be provided")
cmd = self._prepare_command("get")
Expand Down Expand Up @@ -175,26 +204,111 @@ def erase(self, **kwargs):
except FileNotFoundError:
logger.debug("Helper not found", exc_info=True)

@staticmethod
def get_matching_commands(
base_url: str, config: Optional[Union["ConfigDict", "StackedConfig"]] = None
):
config = config or StackedConfig.default()
if isinstance(config, StackedConfig):
backends: Iterable["ConfigDict"] = config.backends
else:
backends = [config]

for conf in backends:
# We will try to match credential sections' url with the given url,
# falling back to the generic section if there's no match
for section in urlmatch_credential_sections(conf, base_url):
try:
command = conf.get(section, "helper")
except KeyError:
# no helper configured
continue
yield command.decode(conf.encoding or sys.getdefaultencoding())


class _CredentialKey(NamedTuple):
protocol: str
host: Optional[str]
path: Optional[str]

def get_matching_helper_commands(
base_url: str, config: Optional[Union["ConfigDict", "StackedConfig"]] = None
):
config = config or StackedConfig.default()
if isinstance(config, StackedConfig):
backends: Iterable["ConfigDict"] = config.backends
else:
backends = [config]

for conf in backends:
# We will try to match credential sections' url with the given url,
# falling back to the generic section if there's no match
for section in urlmatch_credential_sections(conf, base_url):
class MemoryCredentialHelper(CredentialHelper):
"""Memory credential helper that supports optional interactive input."""

def __init__(self):
super().__init__()
self._credentials: Dict["_CredentialKey", "Credential"] = {}

def get(self, *, interactive: bool = False, **kwargs) -> "Credential":
"""Get a matching credential from this helper.

Raises:
CredentialNotFoundError: No matching credential was found.
"""
from getpass import getpass

key = self._key(**kwargs)
if key.path:
try_keys = [key, _CredentialKey(key.protocol, key.host, None)]
else:
try_keys = [key]
for try_key in try_keys:
try:
command = conf.get(section, "helper")
return self._credentials[try_key]
except KeyError:
# no helper configured
continue
yield command.decode(conf.encoding or sys.getdefaultencoding())
pass
if not interactive or os.environ.get("GIT_TERMINAL_PROMPT") == "0":
raise CredentialNotFoundError("Interactive input is disabled")

scheme = f"{key.protocol}://" if key.protocol else ""
netloc = f"{key.host}" if key.host else ""
url = f"{scheme}{netloc}"
try:
username = kwargs.get("username", "")
if not username:
username = input(f"Username for '{url}': ")
password = kwargs.get("password", "")
if not password:
url = f"{scheme}{username}@{netloc}"
password = getpass(f"Password for '{url}': ")
except KeyboardInterrupt:
raise CredentialNotFoundError("User cancelled prompt")
return Credential(
protocol=key.protocol,
host=key.host,
path=key.path,
username=username,
password=password,
memory_only=True,
)

def store(self, **kwargs):
"""Store the credential, if applicable to the helper"""
cred = Credential(**kwargs)
cred.memory_only = True
key = self._key(**kwargs)
self._credentials[key] = cred

def erase(self, **kwargs):
"""Remove a matching credential, if any, from the helper’s storage"""
key = self._key(**kwargs)
try:
del self._credentials[key]
except KeyError:
pass

@staticmethod
def _key(
*,
protocol: str = "",
host: Optional[str] = None,
path: Optional[str] = None,
**kwargs,
) -> _CredentialKey:
return _CredentialKey(protocol, host, path)


memory_helper = MemoryCredentialHelper()


class Credential:
Expand Down Expand Up @@ -232,13 +346,15 @@ def __init__(
password: Optional[str] = None,
password_expiry_utc: Optional[int] = None,
url: Optional[str] = None,
memory_only: bool = False,
):
self.protocol = protocol
self.host = host
self.path = path
self.username = username
self.password = password
self.password_expiry_utc = password_expiry_utc
self.memory_only = memory_only
self._approved = False
if url:
parsed = urlparse(url)
Expand Down Expand Up @@ -281,28 +397,43 @@ def _helper_kwargs(self) -> Dict[str, str]:
def helpers(self) -> List["CredentialHelper"]:
url = self.url
return [
CredentialHelper(command) for command in get_matching_helper_commands(url)
GitCredentialHelper(command)
for command in GitCredentialHelper.get_matching_commands(url)
]

def fill(self) -> "Credential":
"""Return a new credential with filled username and password."""
try:
return memory_helper.get(interactive=False, **self._helper_kwargs)
except CredentialNotFoundError:
pass

for helper in self.helpers:
try:
return helper.get(**self._helper_kwargs)
except CredentialNotFoundError:
continue

try:
return memory_helper.get(interactive=True, **self._helper_kwargs)
except CredentialNotFoundError:
pass

raise CredentialNotFoundError(f"No available credentials for '{self.url}'")

def approve(self):
"""Store this credential in available helpers."""
if self._approved or not (self.username and self.password):
return
for helper in self.helpers:
helper.store(**self._helper_kwargs)
if not self.memory_only:
for helper in self.helpers:
helper.store(**self._helper_kwargs)
memory_helper.store(**self._helper_kwargs)
self._approved = True

def reject(self):
"""Remove this credential from available helpers."""
for helper in self.helpers:
helper.erase(**self._helper_kwargs)
memory_helper.erase(**self._helper_kwargs)
self._approved = False