diff --git a/hyperlink/_url.py b/hyperlink/_url.py index e1d115a0..cda28a5a 100644 --- a/hyperlink/_url.py +++ b/hyperlink/_url.py @@ -120,13 +120,21 @@ def __nonzero__(self): _UNRESERVED_CHARS = frozenset('~-._0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ' 'abcdefghijklmnopqrstuvwxyz') + # URL parsing regex (based on RFC 3986 Appendix B, with modifications) _URL_RE = re.compile(r'^((?P[^:/?#]+):)?' - r'((?P<_netloc_sep>//)(?P[^/?#]*))?' + r'((?P<_netloc_sep>//)' + r'(?P[^/?#]*))?' r'(?P[^?#]*)' r'(\?(?P[^#]*))?' - r'(#(?P.*))?') + r'(#(?P.*))?$') _SCHEME_RE = re.compile(r'^[a-zA-Z0-9+-.]*$') +_AUTHORITY_RE = re.compile(r'^(?:(?P[^@/?#]*)@)?' + r'(?P' + r'(?:\[(?P[^[\]/?#]*)\])' + r'|(?P[^:/?#[\]]*)' + r'|(?P.*?))?' + r'(?::(?P.*))?$') _HEX_CHAR_MAP = dict([((a + b).encode('ascii'), @@ -531,15 +539,14 @@ def parse_host(host): >>> parse_host('googlewebsite.com') == (None, 'googlewebsite.com') True - >>> parse_host('[::1]') == (socket.AF_INET6, '::1') + >>> parse_host('::1') == (socket.AF_INET6, '::1') True >>> parse_host('192.168.1.1') == (socket.AF_INET, '192.168.1.1') True """ if not host: return None, u'' - if u':' in host and u'[' == host[0] and u']' == host[-1]: - host = host[1:-1] + if u':' in host: try: inet_pton(socket.AF_INET6, host) except socket.error as se: @@ -602,9 +609,6 @@ class URL(object): rooted (bool): Whether or not the path begins with a slash. userinfo (unicode): The username or colon-separated username:password pair. - family: A socket module constant used when the host is an - IP constant to differentiate IPv4 and domain names, as - well as validate IPv6. uses_netloc (bool): Indicates whether two slashes appear between the scheme and the host (``http://eg.com`` vs ``mailto:e@g.com``). Set automatically based on scheme. @@ -617,7 +621,7 @@ class URL(object): """ def __init__(self, scheme=None, host=None, path=(), query=(), fragment=u'', - port=None, rooted=None, userinfo=u'', family=None, uses_netloc=None): + port=None, rooted=None, userinfo=u'', uses_netloc=None): if host is not None and scheme is None: scheme = u'http' # TODO: why if port is None: @@ -645,7 +649,8 @@ def __init__(self, scheme=None, host=None, path=(), query=(), fragment=u'', ' "-", and "." allowed. Did you meant to call' ' %s.from_text()?' % (self._scheme, self.__class__.__name__)) - self._host = _textcheck("host", host, '/?#@') + + _, self._host = parse_host(_textcheck('host', host, '/?#@')) if isinstance(path, unicode): raise TypeError("expected iterable of text for path, not: %r" % (path,)) @@ -660,14 +665,11 @@ def __init__(self, scheme=None, host=None, path=(), query=(), fragment=u'', self._port = _typecheck("port", port, int, NoneType) self._rooted = _typecheck("rooted", rooted, bool) self._userinfo = _textcheck("userinfo", userinfo, '/?#@') - self._family = _typecheck("family", family, - type(socket.AF_INET), NoneType) - if ':' in self._host and self._family != socket.AF_INET6: - raise ValueError('invalid ":" present in host: %r' % self._host) uses_netloc = scheme_uses_netloc(self._scheme, uses_netloc) self._uses_netloc = _typecheck("uses_netloc", uses_netloc, bool, NoneType) + return @property @@ -767,15 +769,6 @@ def userinfo(self): """ return self._userinfo - @property - def family(self): - """Set to a socket constant (:data:`socket.AF_INET` or - :data:`socket.AF_INET6`) when the :attr:`~hyperlink.URL.host` - is an IP address. Set to ``None`` if the host is a domain name or - not set. - """ - return self._family - @property def uses_netloc(self): """ @@ -811,8 +804,9 @@ def authority(self, with_password=False, **kw): with_password = kw.pop('includeSecrets', with_password) if kw: raise TypeError('got unexpected keyword arguments: %r' % kw.keys()) - if self.family == socket.AF_INET6: - hostport = ['[' + self.host + ']'] + host = self.host + if ':' in host: + hostport = ['[' + host + ']'] else: hostport = [self.host] if self.port != SCHEME_PORT_MAP.get(self.scheme): @@ -830,7 +824,7 @@ def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented for attr in ['scheme', 'userinfo', 'host', 'query', - 'fragment', 'port', 'family', 'uses_netloc']: + 'fragment', 'port', 'uses_netloc']: if getattr(self, attr) != getattr(other, attr): return False if self.path == other.path or (self.path in _ROOT_PATHS @@ -846,7 +840,7 @@ def __ne__(self, other): def __hash__(self): return hash((self.__class__, self.scheme, self.userinfo, self.host, self.path, self.query, self.fragment, self.port, - self.rooted, self.family, self.uses_netloc)) + self.rooted, self.uses_netloc)) @property def absolute(self): @@ -865,7 +859,7 @@ def absolute(self): def replace(self, scheme=_UNSET, host=_UNSET, path=_UNSET, query=_UNSET, fragment=_UNSET, port=_UNSET, rooted=_UNSET, userinfo=_UNSET, - family=_UNSET, uses_netloc=_UNSET): + uses_netloc=_UNSET): """:class:`URL` objects are immutable, which means that attributes are designed to be set only once, at construction. Instead of modifying an existing URL, one simply creates a copy with the @@ -886,9 +880,6 @@ def replace(self, scheme=_UNSET, host=_UNSET, path=_UNSET, query=_UNSET, rooted (bool): Whether or not the path begins with a slash. userinfo (unicode): The username or colon-separated username:password pair. - family: A socket module constant used when the host is an - IP constant to differentiate IPv4 and domain names, as - well as validate IPv6. uses_netloc (bool): Indicates whether two slashes appear between the scheme and the host (``http://eg.com`` vs ``mailto:e@g.com``) @@ -907,7 +898,6 @@ def replace(self, scheme=_UNSET, host=_UNSET, path=_UNSET, query=_UNSET, port=_optional(port, self.port), rooted=_optional(rooted, self.rooted), userinfo=_optional(userinfo, self.userinfo), - family=_optional(family, self.family), uses_netloc=_optional(uses_netloc, self.uses_netloc) ) @@ -948,29 +938,27 @@ def from_text(cls, text): except AttributeError: raise URLParseError('could not parse url: %r' % text) - au_text = gs['authority'] - userinfo, hostinfo = u'', au_text + au_text = gs['authority'] or u'' + au_m = _AUTHORITY_RE.match(au_text) + try: + au_gs = au_m.groupdict() + except AttributeError: + raise URLParseError('invalid authority %r in url: %r' + % (au_text, text)) + if au_gs['bad_host']: + raise URLParseError('invalid host %r in url: %r') - if au_text: - userinfo, sep, hostinfo = au_text.rpartition('@') + userinfo = au_gs['userinfo'] or u'' - host, port = None, None - if hostinfo: - host, sep, port_str = hostinfo.rpartition(u':') - if not sep: - host = port_str - else: - if u']' in port_str: - host = hostinfo # wrong split, was an ipv6 - else: - try: - port = int(port_str) - except ValueError: - if not port_str: # TODO: excessive? - raise URLParseError('port must not be empty') - raise URLParseError('expected integer for port, not %r' - % port_str) - family, host = parse_host(host) + host = au_gs['ipv6_host'] or au_gs['plain_host'] + port = au_gs['port'] + if port is not None: + try: + port = int(port) + except ValueError: + if not port: # TODO: excessive? + raise URLParseError('port must not be empty: %r' % au_text) + raise URLParseError('expected integer for port, not %r' % port) scheme = gs['scheme'] or u'' fragment = gs['fragment'] or u'' @@ -985,14 +973,14 @@ def from_text(cls, text): rooted = False else: path = () - rooted = bool(hostinfo) + rooted = bool(au_text) if gs['query']: query = ((qe.split(u"=", 1) if u'=' in qe else (qe, None)) for qe in gs['query'].split(u"&")) else: query = () return cls(scheme, host, path, query, fragment, port, - rooted, userinfo, family, uses_netloc) + rooted, userinfo, uses_netloc) def child(self, *segments): """Make a new :class:`URL` where the given path segments are a child diff --git a/hyperlink/test/test_url.py b/hyperlink/test/test_url.py index a5af642a..f779c54a 100644 --- a/hyperlink/test/test_url.py +++ b/hyperlink/test/test_url.py @@ -10,7 +10,8 @@ from .common import HyperlinkTestCase from .. import URL, URLParseError # automatically import the py27 windows implementation when appropriate -from .._url import inet_pton, SCHEME_PORT_MAP +from .. import _url +from .._url import inet_pton, SCHEME_PORT_MAP, parse_host unicode = type(u'') @@ -856,9 +857,37 @@ def test_ipv6_with_port(self): url = URL.from_text(t) assert url.host == '2001:0db8:85a3:0000:0000:8a2e:0370:7334' assert url.port == 80 - assert url.family == socket.AF_INET6 assert SCHEME_PORT_MAP[url.scheme] != url.port + def test_basic(self): + text = 'https://user:pass@example.com/path/to/here?k=v#nice' + url = URL.from_text(text) + assert url.scheme == 'https' + assert url.userinfo == 'user:pass' + assert url.host == 'example.com' + assert url.path == ('path', 'to', 'here') + assert url.fragment == 'nice' + + text = 'https://user:pass@127.0.0.1/path/to/here?k=v#nice' + url = URL.from_text(text) + assert url.scheme == 'https' + assert url.userinfo == 'user:pass' + assert url.host == '127.0.0.1' + assert url.path == ('path', 'to', 'here') + + text = 'https://user:pass@[::1]/path/to/here?k=v#nice' + url = URL.from_text(text) + assert url.scheme == 'https' + assert url.userinfo == 'user:pass' + assert url.host == '::1' + assert url.path == ('path', 'to', 'here') + + def test_invalid_url(self): + self.assertRaises(URLParseError, URL.from_text, '#\n\n') + + def test_invalid_authority_url(self): + self.assertRaises(URLParseError, URL.from_text, 'http://abc:\n\n/#') + def test_invalid_ipv6(self): invalid_ipv6_ips = ['2001::0234:C1ab::A0:aabc:003F', '2001::1::3F', @@ -871,16 +900,6 @@ def test_invalid_ipv6(self): socket.AF_INET6, ip) self.assertRaises(URLParseError, URL.from_text, url_text) - def test_ip_family_detection(self): - u = URL.from_text('http://giggle.com') - self.assertEqual(u.family, None) - - u = URL.from_text('http://127.0.0.1/a/b/?c=d') - self.assertEqual(u.family, socket.AF_INET) - - u = URL.from_text('http://[::1]/a/b/?c=d') - self.assertEqual(u.family, socket.AF_INET6) - def test_invalid_port(self): self.assertRaises(URLParseError, URL.from_text, 'ftp://portmouth:smash') self.assertRaises(ValueError, URL.from_text, @@ -1067,3 +1086,17 @@ def test_from_text_type(self): assert URL.from_text(u'#ok').fragment == u'ok' # sanity self.assertRaises(TypeError, URL.from_text, b'bytes://x.y.z') self.assertRaises(TypeError, URL.from_text, object()) + + def test_from_text_bad_authority(self): + # bad ipv6 parentheses + self.assertRaises(URLParseError, URL.from_text, 'http://[::1/') + self.assertRaises(URLParseError, URL.from_text, 'http://::1]/') + self.assertRaises(URLParseError, URL.from_text, 'http://[[::1]/') + self.assertRaises(URLParseError, URL.from_text, 'http://[::1]]/') + + # empty port + self.assertRaises(URLParseError, URL.from_text, 'http://127.0.0.1:') + # non-integer port + self.assertRaises(URLParseError, URL.from_text, 'http://127.0.0.1:hi') + # extra port colon (makes for an invalid host) + self.assertRaises(URLParseError, URL.from_text, 'http://127.0.0.1::80')