Skip to content

Commit

Permalink
Support including port part in trusted-host (#6909)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostming authored and cjerdonek committed Aug 25, 2019
1 parent d2b7082 commit 8ac2214
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 43 deletions.
1 change: 1 addition & 0 deletions news/6886.feature
@@ -0,0 +1 @@
Support including a port number in ``--trusted-host`` for both HTTP and HTTPS.
4 changes: 2 additions & 2 deletions src/pip/_internal/cli/cmdoptions.py
Expand Up @@ -391,8 +391,8 @@ def trusted_host():
action="append",
metavar="HOSTNAME",
default=[],
help="Mark this host as trusted, even though it does not have valid "
"or any HTTPS.",
help="Mark this host or host:port pair as trusted, even though it "
"does not have valid or any HTTPS.",
)


Expand Down
17 changes: 9 additions & 8 deletions src/pip/_internal/download.py
Expand Up @@ -50,7 +50,7 @@
format_size,
get_installed_version,
hide_url,
netloc_has_port,
parse_netloc,
path_to_display,
path_to_url,
remove_auth_from_url,
Expand All @@ -77,7 +77,7 @@
from pip._internal.vcs.versioncontrol import AuthInfo, VersionControl

Credentials = Tuple[str, str, str]
SecureOrigin = Tuple[str, str, Optional[str]]
SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]

if PY2:
CopytreeKwargs = TypedDict(
Expand Down Expand Up @@ -586,7 +586,7 @@ def __init__(self, *args, **kwargs):

# Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class.
self.pip_trusted_hosts = [] # type: List[str]
self.pip_trusted_origins = [] # type: List[Tuple[str, Optional[int]]]

# Attach our User Agent to the request
self.headers["User-Agent"] = user_agent()
Expand Down Expand Up @@ -670,11 +670,12 @@ def add_trusted_host(self, host, source=None, suppress_logging=False):
msg += ' (from {})'.format(source)
logger.info(msg)

if host not in self.pip_trusted_hosts:
self.pip_trusted_hosts.append(host)
host_port = parse_netloc(host)
if host_port not in self.pip_trusted_origins:
self.pip_trusted_origins.append(host_port)

self.mount(build_url_from_netloc(host) + '/', self._insecure_adapter)
if not netloc_has_port(host):
if not host_port[1]:
# Mount wildcard ports for the same host.
self.mount(
build_url_from_netloc(host) + ':',
Expand All @@ -685,8 +686,8 @@ def iter_secure_origins(self):
# type: () -> Iterator[SecureOrigin]
for secure_origin in SECURE_ORIGINS:
yield secure_origin
for host in self.pip_trusted_hosts:
yield ('*', host, '*')
for host, port in self.pip_trusted_origins:
yield ('*', host, '*' if port is None else port)

def is_secure_origin(self, location):
# type: (Link) -> bool
Expand Down
4 changes: 3 additions & 1 deletion src/pip/_internal/index.py
Expand Up @@ -38,6 +38,7 @@
ARCHIVE_EXTENSIONS,
SUPPORTED_EXTENSIONS,
WHEEL_EXTENSION,
build_netloc,
path_to_url,
redact_password_from_url,
)
Expand Down Expand Up @@ -947,7 +948,8 @@ def index_urls(self):
@property
def trusted_hosts(self):
# type: () -> Iterable[str]
return iter(self.session.pip_trusted_hosts)
for host_port in self.session.pip_trusted_origins:
yield build_netloc(*host_port)

@property
def allow_all_prereleases(self):
Expand Down
21 changes: 17 additions & 4 deletions src/pip/_internal/utils/misc.py
Expand Up @@ -1129,6 +1129,19 @@ def path_to_url(path):
return url


def build_netloc(host, port):
# type: (str, Optional[int]) -> str
"""
Build a netloc from a host-port pair
"""
if port is None:
return host
if ':' in host:
# Only wrap host with square brackets when it is IPv6
host = '[{}]'.format(host)
return '{}:{}'.format(host, port)


def build_url_from_netloc(netloc, scheme='https'):
# type: (str, str) -> str
"""
Expand All @@ -1140,14 +1153,14 @@ def build_url_from_netloc(netloc, scheme='https'):
return '{}://{}'.format(scheme, netloc)


def netloc_has_port(netloc):
# type: (str) -> bool
def parse_netloc(netloc):
# type: (str) -> Tuple[str, Optional[int]]
"""
Return whether the netloc has a port part.
Return the host-port pair from a netloc.
"""
url = build_url_from_netloc(netloc)
parsed = urllib_parse.urlparse(url)
return bool(parsed.port)
return parsed.hostname, parsed.port


def split_auth_from_netloc(netloc):
Expand Down
42 changes: 34 additions & 8 deletions tests/unit/test_download.py
Expand Up @@ -634,24 +634,39 @@ def test_add_trusted_host(self):
insecure_adapter = session._insecure_adapter
prefix2 = 'https://host2/'
prefix3 = 'https://host3/'
prefix3_wildcard = 'https://host3:'

# Confirm some initial conditions as a baseline.
assert session.pip_trusted_hosts == ['host1', 'host3']
assert session.pip_trusted_origins == [
('host1', None), ('host3', None)
]
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix3_wildcard] is insecure_adapter

assert prefix2 not in session.adapters

# Test adding a new host.
session.add_trusted_host('host2')
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2']
assert session.pip_trusted_origins == [
('host1', None), ('host3', None), ('host2', None)
]
# Check that prefix3 is still present.
assert session.adapters[prefix3] is insecure_adapter
assert session.adapters[prefix2] is insecure_adapter

# Test that adding the same host doesn't create a duplicate.
session.add_trusted_host('host3')
assert session.pip_trusted_hosts == ['host1', 'host3', 'host2'], (
'actual: {}'.format(session.pip_trusted_hosts)
)
assert session.pip_trusted_origins == [
('host1', None), ('host3', None), ('host2', None)
], 'actual: {}'.format(session.pip_trusted_origins)

session.add_trusted_host('host4:8080')
prefix4 = 'https://host4:8080/'
assert session.pip_trusted_origins == [
('host1', None), ('host3', None),
('host2', None), ('host4', 8080)
]
assert session.adapters[prefix4] is insecure_adapter

def test_add_trusted_host__logging(self, caplog):
"""
Expand All @@ -676,16 +691,17 @@ def test_add_trusted_host__logging(self, caplog):
assert actual == expected

def test_iter_secure_origins(self):
trusted_hosts = ['host1', 'host2']
trusted_hosts = ['host1', 'host2', 'host3:8080']
session = PipSession(trusted_hosts=trusted_hosts)

actual = list(session.iter_secure_origins())
assert len(actual) == 8
assert len(actual) == 9
# Spot-check that SECURE_ORIGINS is included.
assert actual[0] == ('https', '*', '*')
assert actual[-2:] == [
assert actual[-3:] == [
('*', 'host1', '*'),
('*', 'host2', '*'),
('*', 'host3', 8080)
]

def test_iter_secure_origins__trusted_hosts_empty(self):
Expand Down Expand Up @@ -713,6 +729,16 @@ def test_iter_secure_origins__trusted_hosts_empty(self):
("http://example.com/something/", ["example.com"], True),
# Try changing the case.
("http://eXample.com/something/", ["example.cOm"], True),
# Test hosts with port.
("http://example.com:8080/something/", ["example.com"], True),
# Test a trusted_host with a port.
("http://example.com:8080/something/", ["example.com:8080"], True),
("http://example.com/something/", ["example.com:8080"], False),
(
"http://example.com:8888/something/",
["example.com:8080"],
False
),
],
)
def test_is_secure_origin(self, caplog, location, trusted, expected):
Expand Down
20 changes: 12 additions & 8 deletions tests/unit/test_req_file.py
Expand Up @@ -345,19 +345,23 @@ def test_set_finder_extra_index_urls(self, finder):
def test_set_finder_trusted_host(self, caplog, session, finder):
with caplog.at_level(logging.INFO):
list(process_line(
"--trusted-host=host", "file.txt", 1, finder=finder,
session=session,
"--trusted-host=host1 --trusted-host=host2:8080",
"file.txt", 1, finder=finder, session=session,
))
assert list(finder.trusted_hosts) == ['host']
assert list(finder.trusted_hosts) == ['host1', 'host2:8080']
session = finder.session
assert session.adapters['https://host/'] is session._insecure_adapter
assert session.adapters['https://host1/'] is session._insecure_adapter
assert (
session.adapters['https://host2:8080/']
is session._insecure_adapter
)

# Test the log message.
actual = [(r.levelname, r.message) for r in caplog.records]
expected = [
('INFO', "adding trusted host: 'host' (from line 1 of file.txt)"),
]
assert actual == expected
expected = (
'INFO', "adding trusted host: 'host1' (from line 1 of file.txt)"
)
assert expected in actual

def test_noop_always_unzip(self, finder):
# noop, but confirm it can be set
Expand Down
45 changes: 33 additions & 12 deletions tests/unit/test_utils.py
Expand Up @@ -38,6 +38,7 @@
from pip._internal.utils.hashes import Hashes, MissingHashes
from pip._internal.utils.misc import (
HiddenText,
build_netloc,
build_url_from_netloc,
call_subprocess,
egg_link_path,
Expand All @@ -49,9 +50,9 @@
hide_value,
make_command,
make_subprocess_output_error,
netloc_has_port,
normalize_path,
normalize_version_info,
parse_netloc,
path_to_display,
path_to_url,
redact_netloc,
Expand Down Expand Up @@ -1232,29 +1233,49 @@ def test_path_to_url_win():
assert path_to_url('file') == 'file:' + urllib_request.pathname2url(path)


@pytest.mark.parametrize('netloc, expected_url, expected_has_port', [
@pytest.mark.parametrize('host_port, expected_netloc', [
# Test domain name.
('example.com', 'https://example.com', False),
('example.com:5000', 'https://example.com:5000', True),
(('example.com', None), 'example.com'),
(('example.com', 5000), 'example.com:5000'),
# Test IPv4 address.
('127.0.0.1', 'https://127.0.0.1', False),
('127.0.0.1:5000', 'https://127.0.0.1:5000', True),
(('127.0.0.1', None), '127.0.0.1'),
(('127.0.0.1', 5000), '127.0.0.1:5000'),
# Test bare IPv6 address.
('2001:DB6::1', 'https://[2001:DB6::1]', False),
(('2001:db6::1', None), '2001:db6::1'),
# Test IPv6 with port.
('[2001:DB6::1]:5000', 'https://[2001:DB6::1]:5000', True),
(('2001:db6::1', 5000), '[2001:db6::1]:5000'),
])
def test_build_netloc(host_port, expected_netloc):
assert build_netloc(*host_port) == expected_netloc


@pytest.mark.parametrize('netloc, expected_url, expected_host_port', [
# Test domain name.
('example.com', 'https://example.com', ('example.com', None)),
('example.com:5000', 'https://example.com:5000', ('example.com', 5000)),
# Test IPv4 address.
('127.0.0.1', 'https://127.0.0.1', ('127.0.0.1', None)),
('127.0.0.1:5000', 'https://127.0.0.1:5000', ('127.0.0.1', 5000)),
# Test bare IPv6 address.
('2001:db6::1', 'https://[2001:db6::1]', ('2001:db6::1', None)),
# Test IPv6 with port.
(
'[2001:db6::1]:5000',
'https://[2001:db6::1]:5000',
('2001:db6::1', 5000)
),
# Test netloc with auth.
(
'user:password@localhost:5000',
'https://user:password@localhost:5000',
True
('localhost', 5000)
)
])
def test_build_url_from_netloc_and_netloc_has_port(
netloc, expected_url, expected_has_port,
def test_build_url_from_netloc_and_parse_netloc(
netloc, expected_url, expected_host_port,
):
assert build_url_from_netloc(netloc) == expected_url
assert netloc_has_port(netloc) is expected_has_port
assert parse_netloc(netloc) == expected_host_port


@pytest.mark.parametrize('netloc, expected', [
Expand Down

0 comments on commit 8ac2214

Please sign in to comment.