Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions src/scmrepo/git/backend/dulwich/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# Temporarily added while waiting for upstream PR to be merged.
# See https://github.com/jelmer/dulwich/pull/976
from typing import Optional

from dulwich.client import Urllib3HttpGitClient
from dulwich.config import StackedConfig

from scmrepo.git.credentials import CredentialNotFoundError, get_credentials_from_helper
from scmrepo.git.credentials import Credential, CredentialNotFoundError


class GitCredentialsHTTPClient(Urllib3HttpGitClient): # pylint: disable=abstract-method
Expand All @@ -24,17 +22,23 @@ def __init__(
**kwargs,
)

self._store_credentials: Optional["Credential"] = None
if not username:
import base64

try:
helper_username, helper_password = get_credentials_from_helper(
base_url, config or StackedConfig.default()
)
creds = Credential(url=base_url).fill()
except CredentialNotFoundError:
pass
else:
credentials = helper_username + b":" + helper_password
import base64
return
encoded = base64.b64encode(
f"{creds.username}:{creds.password}".encode()
).decode("ascii")
basic_auth = {"authorization": f"Basic {encoded}"}
self.pool_manager.headers.update(basic_auth)
self._store_credentials = creds

encoded = base64.b64encode(credentials).decode("ascii")
basic_auth = {"authorization": f"Basic {encoded}"}
self.pool_manager.headers.update(basic_auth)
def _http_request(self, *args, **kwargs):
result = super()._http_request(*args, **kwargs)
if self._store_credentials is not None:
self._store_credentials.approve()
return result
239 changes: 176 additions & 63 deletions src/scmrepo/git/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,31 @@
Currently Dulwich supports only the `get` operation

"""
import locale
import logging
import os
import shlex
import shutil
import subprocess # nosec B404
import sys
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import urlparse, urlunparse

from dulwich.config import StackedConfig
from dulwich.credentials import urlmatch_credential_sections
from funcy import cached_property

from scmrepo.exceptions import SCMError

if TYPE_CHECKING:
from dulwich.config import ConfigDict

logger = logging.getLogger(__name__)

SectionLike = Union[bytes, str, Tuple[Union[bytes, str], ...]]


class CredentialNotFoundError(Exception):
class CredentialNotFoundError(SCMError):
"""Error occurred while retrieving credentials/no credentials available."""


Expand All @@ -60,10 +70,11 @@ def __init__(self, command: str):
if self._command[0] == "!":
# On Windows this will only work in git-bash and/or WSL2
self._run_kwargs["shell"] = True
self._encoding = locale.getpreferredencoding()

def _prepare_command(self) -> Union[str, List[str]]:
def _prepare_command(self, action: Optional[str] = None) -> Union[str, List[str]]:
if self._command[0] == "!":
return self._command[1:]
return self._command[1:] + (f" {action}" if action else "")

if sys.platform != "win32":
argv = shlex.split(self._command)
Expand All @@ -72,6 +83,8 @@ def _prepare_command(self) -> Union[str, List[str]]:
# join arguments when providing a list, so we can just split
# using whitespace.
argv = self._command.split()
if action:
argv.append(action)

if os.path.isabs(argv[0]):
return argv
Expand All @@ -91,77 +104,84 @@ def _prepare_command(self) -> Union[str, List[str]]:

def get(
self,
*,
protocol: Optional[str] = None,
hostname: Optional[str] = None,
port: Optional[int] = None,
username: Optional[str] = None,
) -> Tuple[bytes, bytes]:
cmd = self._prepare_command()
if isinstance(cmd, str):
cmd += " get"
else:
cmd.append("get")

helper_input = []
if protocol:
helper_input.append(f"protocol={protocol}")
if hostname:
helper_input.append(
f"host={hostname}{':' + str(port) if port is not None else ''}"
)
if username:
helper_input.append(f"username={username}")

if not helper_input:
**kwargs,
) -> "Credential":
if kwargs.get("protocol", kwargs.get("hostname")) is None:
raise ValueError("One of protocol, hostname must be provided")

cmd = self._prepare_command("get")
helper_input = [f"{key}={value}" for key, value in kwargs.items()]
helper_input.append("")

try:
res = subprocess.run( # type: ignore # nosec B603 # breaks on 3.6
cmd,
check=True,
capture_output=True,
input=os.linesep.join(helper_input).encode("ascii"),
input=os.linesep.join(helper_input),
encoding=self._encoding,
**self._run_kwargs,
)
except subprocess.CalledProcessError as exc:
raise CredentialNotFoundError(exc.stderr) from exc
except FileNotFoundError as exc:
raise CredentialNotFoundError("Helper not found") from exc
if res.stderr:
logger.debug(res.stderr)

credentials = {}
for line in res.stdout.strip().splitlines():
try:
key, value = line.split(b"=")
key, value = line.split("=")
credentials[key] = value
except ValueError:
continue
return Credential(**credentials)

if not all(
(
credentials,
b"username" in credentials,
b"password" in credentials,
)
):
raise CredentialNotFoundError("Could not get credentials from helper")

return credentials[b"username"], credentials[b"password"]

def store(self, *args, **kwargs):
def store(self, **kwargs):
"""Store the credential, if applicable to the helper"""
raise NotImplementedError
cmd = self._prepare_command("store")
helper_input = [f"{key}={value}" for key, value in kwargs.items()]
helper_input.append("")

try:
res = subprocess.run( # type: ignore # nosec B603 # pylint: disable=W1510
cmd,
capture_output=True,
input=os.linesep.join(helper_input),
encoding=self._encoding,
**self._run_kwargs,
)
if res.stderr:
logger.debug(res.stderr)
except FileNotFoundError:
logger.debug("Helper not found", exc_info=True)

def erase(self, *args, **kwargs):
def erase(self, **kwargs):
"""Remove a matching credential, if any, from the helper’s storage"""
raise NotImplementedError
cmd = self._prepare_command("erase")
helper_input = [f"{key}={value}" for key, value in kwargs.items()]
helper_input.append("")

try:
res = subprocess.run( # type: ignore # nosec B603 # pylint: disable=W1510
cmd,
capture_output=True,
input=os.linesep.join(helper_input),
encoding=self._encoding,
**self._run_kwargs,
)
if res.stderr:
logger.debug(res.stderr)
except FileNotFoundError:
logger.debug("Helper not found", exc_info=True)


def get_matching_helper_commands(base_url: str, config):
def get_matching_helper_commands(
base_url: str, config: Optional[Union["ConfigDict", "StackedConfig"]] = None
):
config = config or StackedConfig.default()
if isinstance(config, StackedConfig):
backends = config.backends
backends: Iterable["ConfigDict"] = config.backends
else:
backends = [config]

Expand All @@ -177,19 +197,112 @@ def get_matching_helper_commands(base_url: str, config):
yield command.decode(conf.encoding or sys.getdefaultencoding())


def get_credentials_from_helper(base_url: str, config) -> Tuple[bytes, bytes]:
"""Retrieves credentials for the given url from git credential helpers"""
class Credential:
"""Git credentials, equivalent to CGit git-credential API.

for command in get_matching_helper_commands(base_url, config):
helper = CredentialHelper(command)
parsed = urlparse(base_url)
try:
return helper.get(
protocol=parsed.scheme,
hostname=parsed.hostname,
port=parsed.port,
username=parsed.username,
)
except CredentialNotFoundError:
continue
raise CredentialNotFoundError
Usage:

1. Generate a credential based on context

>>> generated = Credential(url="https://github.com/dtrifiro/aprivaterepo")

2. Ask git-credential to give username/password for this context

>>> credential = generated.fill()

3. Use the credential from (2) in Git operation
4. If the operation in (3) was successful, approve it for re-use in subsequent
operations

>>> credential.approve()

See also:
https://git-scm.com/docs/git-credential#_typical_use_of_git_credential
https://github.com/git/git/blob/master/credential.h

"""

def __init__(
self,
*,
protocol: Optional[str] = None,
host: Optional[str] = None, # host with optional ":<port>" included
path: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
password_expiry_utc: Optional[int] = None,
url: Optional[str] = None,
):
self.protocol = protocol
self.host = host
self.path = path
self.username = username
self.password = password
self.password_expiry_utc = password_expiry_utc
self._approved = False
if url:
parsed = urlparse(url)
self.protocol = self.protocol or parsed.scheme
if not self.protocol:
raise ValueError("protocol must be specified when using URL")
port = f":{parsed.port}" if parsed.port is not None else ""
hostname = parsed.hostname or ""
self.host = self.host or f"{hostname}{port}"
self.username = self.username or parsed.username
self.password = self.password or parsed.password

@property
def url(self) -> str:
if self.username or self.password:
username = self.username or ""
password = self.password or ""
netloc = f"{username}:{password}@{self.host}"
else:
netloc = self.host or ""
return urlunparse((self.protocol or "", netloc, self.path or "", "", "", ""))

@property
def _helper_kwargs(self) -> Dict[str, str]:
kwargs = {}
for attr in (
"protocol",
"host",
"path",
"username",
"password",
"password_expiry_utc",
):
value = getattr(self, attr)
if value is not None:
kwargs[attr] = str(value)
return kwargs

@cached_property
def helpers(self) -> List["CredentialHelper"]:
url = self.url
return [
CredentialHelper(command) for command in get_matching_helper_commands(url)
]

def fill(self) -> "Credential":
"""Return a new credential with filled username and password."""
for helper in self.helpers:
try:
return helper.get(**self._helper_kwargs)
except CredentialNotFoundError:
continue
raise CredentialNotFoundError(f"No available credentials for '{self.url}'")

def approve(self):
"""Store this credential in available helpers."""
if self._approved or not (self.username and self.password):
return
for helper in self.helpers:
helper.store(**self._helper_kwargs)
self._approved = True

def reject(self):
"""Remove this credential from available helpers."""
for helper in self.helpers:
helper.erase(**self._helper_kwargs)
self._approved = False