Skip to content

Commit

Permalink
Implement RFC7617-compliant multi-domain basic authentication
Browse files Browse the repository at this point in the history
The https://datatracker.ietf.org/doc/html/rfc7617#section-2.2
defines multi-domain authentication behaviour and authentication
scopes for basic authentication. This change improves the
implementation of the multi-domain matching to be RC7617 compliant

* path matching (including longest match)
* scheme validation matching

Closes: pypa#10902
  • Loading branch information
potiuk committed Feb 13, 2022
1 parent ec8edbf commit fed5ab8
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 31 deletions.
67 changes: 57 additions & 10 deletions src/pip/_internal/network/auth.py
Expand Up @@ -5,7 +5,7 @@
"""

import urllib.parse
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, List, Optional, Tuple

from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth
from pip._vendor.requests.models import Request, Response
Expand Down Expand Up @@ -76,7 +76,7 @@ def __init__(
) -> None:
self.prompting = prompting
self.index_urls = index_urls
self.passwords: Dict[str, AuthInfo] = {}
self.passwords: List[Tuple[str, AuthInfo]] = []
# When the user is prompted to enter credentials and keyring is
# available, we will offer to save them. If the user accepts,
# this value is set to the credentials they entered. After the
Expand Down Expand Up @@ -162,6 +162,51 @@ def _get_new_credentials(

return username, password

def _get_matching_credentials(self, original_url) -> Optional[AuthInfo]:
"""
Find matching credentials based on the longest matching prefix found.
According to https://datatracker.ietf.org/doc/html/rfc7617#section-2.2
the authentication scope is defined by removing the last part after
the last "/" of the resource - but in case of `pip` the end of path is always
"/project", so we can treat the `original_url` as full `authentication scope'
for the request. The RFC specifies that prefix matching of the scope is
also within the protection scope specified by the realm value, so we can
safely assume that if we find any match, the longest matching prefix is
the right authentication information we should use.
The specification does not decide which of the matching scopes should be
used if there are more of them. Our decision is to choose the longest
matching one. In case exactly the same prefix will be added several times,
the authentication information from the first one is used.
According to the RFC, the authentication scope includes both scheme and netloc,
Both have to match in order to share the authentication scope.
:param original_url: original URL of the request
:return: Stored Authentication info matching the authentication scope or
None if not found
"""
max_matched_prefix_length = 0
matched_auth_info = None
no_auth_url = remove_auth_from_url(original_url)
parsed_original = urllib.parse.urlparse(no_auth_url)
for prefix, auth_info in self.passwords:
parsed_stored = urllib.parse.urlparse(prefix)
if (
parsed_stored.netloc != parsed_original.netloc
or parsed_stored.scheme != parsed_original.scheme
):
# only consider match when both scheme and netloc match
continue
current_matching_prefix_length = sum(
a == b for a, b in zip(prefix, no_auth_url)
)
if current_matching_prefix_length > max_matched_prefix_length:
matched_auth_info = auth_info
max_matched_prefix_length = current_matching_prefix_length
return matched_auth_info

def _get_url_and_credentials(
self, original_url: str
) -> Tuple[str, Optional[str], Optional[str]]:
Expand All @@ -183,12 +228,14 @@ def _get_url_and_credentials(
# Do this if either the username or the password is missing.
# This accounts for the situation in which the user has specified
# the username in the index url, but the password comes from keyring.
if (username is None or password is None) and netloc in self.passwords:
un, pw = self.passwords[netloc]
# It is possible that the cached credentials are for a different username,
# in which case the cache should be ignored.
if username is None or username == un:
username, password = un, pw
if username is None or password is None:
matched_credentials = self._get_matching_credentials(original_url)
if matched_credentials:
un, pw = matched_credentials
# It is possible that the cached credentials are for a
# different username, in which case the cache should be ignored.
if username is None or username == un:
username, password = un, pw

if username is not None or password is not None:
# Convert the username and password if they're None, so that
Expand All @@ -199,7 +246,7 @@ def _get_url_and_credentials(
password = password or ""

# Store any acquired credentials.
self.passwords[netloc] = (username, password)
self.passwords.append((remove_auth_from_url(url), (username, password)))

assert (
# Credentials were found
Expand Down Expand Up @@ -272,7 +319,7 @@ def handle_401(self, resp: Response, **kwargs: Any) -> Response:
# Store the new username and password to use for future requests
self._credentials_to_save = None
if username is not None and password is not None:
self.passwords[parsed.netloc] = (username, password)
self.passwords.append((resp.url, (username, password)))

# Prompt to save the password to keyring
if save and self._should_save_password_to_keyring():
Expand Down
90 changes: 69 additions & 21 deletions tests/unit/test_network_auth.py
Expand Up @@ -12,26 +12,26 @@
["input_url", "url", "username", "password"],
[
(
"http://user%40email.com:password@example.com/path",
"http://example.com/path",
"https://user%40email.com:password@example.com/path",
"https://example.com/path",
"user@email.com",
"password",
),
(
"http://username:password@example.com/path",
"http://example.com/path",
"https://username:password@example.com/path",
"https://example.com/path",
"username",
"password",
),
(
"http://token@example.com/path",
"http://example.com/path",
"https://token@example.com/path",
"https://example.com/path",
"token",
"",
),
(
"http://example.com/path",
"http://example.com/path",
"https://example.com/path",
"https://example.com/path",
None,
None,
),
Expand All @@ -50,43 +50,91 @@ def test_get_credentials_parses_correctly(
(username is None and password is None)
or
# Credentials were found and "cached" appropriately
auth.passwords["example.com"] == (username, password)
(url, (username, password)) in auth.passwords
)


def test_get_credentials_not_to_uses_cached_credentials() -> None:
auth = MultiDomainBasicAuth()
auth.passwords["example.com"] = ("user", "pass")
auth.passwords.append(("https://example.com", ("user", "pass")))

got = auth._get_url_and_credentials("http://foo:bar@example.com/path")
expected = ("http://example.com/path", "foo", "bar")
got = auth._get_url_and_credentials("https://foo:bar@example.com/path")
expected = ("https://example.com/path", "foo", "bar")
assert got == expected


def test_get_credentials_not_to_uses_cached_credentials_only_username() -> None:
def test_get_credentials_not_to_use_cached_credentials_only_username() -> None:
auth = MultiDomainBasicAuth()
auth.passwords["example.com"] = ("user", "pass")
auth.passwords.append(("https://example.com", ("user", "pass")))

got = auth._get_url_and_credentials("http://foo@example.com/path")
expected = ("http://example.com/path", "foo", "")
got = auth._get_url_and_credentials("https://foo@example.com/path")
expected = ("https://example.com/path", "foo", "")
assert got == expected


def test_multi_domain_credentials_match() -> None:
auth = MultiDomainBasicAuth()
auth.passwords.append(("https://example.com", ("user", "pass")))
auth.passwords.append(("https://example.com/path", ("user", "pass2")))

got = auth._get_url_and_credentials("https://user@example.com/path")
expected = ("https://example.com/path", "user", "pass2")
assert got == expected


def test_multi_domain_credentials_longest_match() -> None:
auth = MultiDomainBasicAuth()
auth.passwords.append(("https://example.com", ("user", "pass")))
auth.passwords.append(("https://example.com/path", ("user", "pass2")))
auth.passwords.append(("https://example.com/path/subpath", ("user", "pass3")))

got = auth._get_url_and_credentials("https://user@example.com/path")
expected = ("https://example.com/path", "user", "pass2")
assert got == expected


def test_get_credentials_uses_cached_credentials() -> None:
auth = MultiDomainBasicAuth()
auth.passwords["example.com"] = ("user", "pass")
auth.passwords.append(("https://example.com", ("user", "pass")))

got = auth._get_url_and_credentials("https://example.com/path")
expected = ("https://example.com/path", "user", "pass")
assert got == expected


def test_get_credentials_not_uses_cached_credentials_different_scheme_http() -> None:
auth = MultiDomainBasicAuth()
auth.passwords.append(("http://example.com", ("user", "pass")))

got = auth._get_url_and_credentials("https://example.com/path")
expected = ("https://example.com/path", None, None)
assert got == expected


def test_get_credentials_not_uses_cached_credentials_different_scheme_https() -> None:
auth = MultiDomainBasicAuth()
auth.passwords.append(("https://example.com", ("user", "pass")))

got = auth._get_url_and_credentials("http://example.com/path")
expected = ("http://example.com/path", "user", "pass")
expected = ("http://example.com/path", None, None)
assert got == expected


def test_get_credentials_uses_cached_credentials_only_username() -> None:
auth = MultiDomainBasicAuth()
auth.passwords["example.com"] = ("user", "pass")
auth.passwords.append(("https://example.com", ("user", "pass")))

got = auth._get_url_and_credentials("https://user@example.com/path")
expected = ("https://example.com/path", "user", "pass")
assert got == expected


def test_get_credentials_uses_cached_credentials_wrong_username() -> None:
auth = MultiDomainBasicAuth()
auth.passwords.append(("https://example.com", ("user", "pass")))

got = auth._get_url_and_credentials("http://user@example.com/path")
expected = ("http://example.com/path", "user", "pass")
got = auth._get_url_and_credentials("https://user2@example.com/path")
expected = ("https://example.com/path", "user2", "")
assert got == expected


Expand Down

0 comments on commit fed5ab8

Please sign in to comment.