diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5c8b6c80..7a0f1fcc 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -16,7 +16,7 @@ jobs: - uses: actions/setup-python@v5 with: - python-version: '3.11' + python-version: '3.12' - uses: actions/cache@v4 with: @@ -42,7 +42,7 @@ jobs: - uses: actions/setup-python@v5 with: - python-version: "3.11" + python-version: "3.12" - uses: actions/cache@v4 with: diff --git a/docs/examples/digest_auth.py b/docs/examples/digest_auth.py new file mode 100644 index 00000000..ae590718 --- /dev/null +++ b/docs/examples/digest_auth.py @@ -0,0 +1,16 @@ +from twisted.internet.task import react +from _utils import print_response + +import treq +from treq.auth import HTTPDigestAuth + + +def main(reactor, *args): + d = treq.get( + 'http://httpbin.org/digest-auth/auth/treq/treq', + auth=HTTPDigestAuth('treq', 'treq') + ) + d.addCallback(print_response) + return d + +react(main, []) diff --git a/docs/howto.rst b/docs/howto.rst index 3f304983..90b3addf 100644 --- a/docs/howto.rst +++ b/docs/howto.rst @@ -92,6 +92,27 @@ The ``auth`` argument should be a tuple of the form ``('username', 'password')`` Full example: :download:`basic_auth.py ` +HTTP Digest authentication is supported by passing an instance of +:py:class:`treq.auth.HTTPDigestAuth` to any of the request functions by using the `auth` keyword argument. +We support only "auth" QoP as defined at `RFC 2617`_ +or simple `RFC 2069`_ without QoP at the moment. Treq takes care of +caching HTTP digest credentials — after authorizing any URL/method pair, +the library will use the initially received HTTP digest credentials on that endpoint +for subsequent requests, and will not perform any redundant requests to obtain the +credentials. + +:py:class:`treq.auth.HTTPDigestAuth` class accepts ``username`` and ``password`` +as constructor arguments. + +.. literalinclude:: examples/digest_auth.py + :linenos: + :lines: 5-14 + +Full example: :download:`digest_auth.py ` + +.. _RFC 2617: http://www.ietf.org/rfc/rfc2617.txt +.. _RFC 2069: http://www.ietf.org/rfc/rfc2069.txt + Redirects --------- diff --git a/docs/index.rst b/docs/index.rst index 059dfccb..fdacb19d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -71,7 +71,7 @@ Here is a list of `requests`_ features and their status in treq. +----------------------------------+----------+----------+ | Basic Authentication | yes | yes | +----------------------------------+----------+----------+ -| Digest Authentication | yes | no | +| Digest Authentication | yes | yes | +----------------------------------+----------+----------+ | Elegant Key/Value Cookies | yes | yes | +----------------------------------+----------+----------+ diff --git a/src/treq/auth.py b/src/treq/auth.py index ffae1ff4..88ac0fca 100644 --- a/src/treq/auth.py +++ b/src/treq/auth.py @@ -1,22 +1,225 @@ +# -*- test-case-name: treq.test.test_auth,treq.test.test_treq_integration -*- # Copyright 2012-2020 The treq Authors. # See LICENSE for details. -from __future__ import absolute_import, division, print_function +from __future__ import absolute_import, division, print_function, annotations +import re +import time +import hashlib import binascii -from typing import Union +from enum import Enum +from typing import Union, Optional, TypedDict +from urllib.parse import urlparse +from twisted.python.randbytes import secureRandom from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent +from twisted.web.iweb import IAgent, IBodyProducer, IResponse +from twisted.internet.defer import Deferred + from zope.interface import implementer +from requests.utils import parse_dict_header + + +_DIGEST_HEADER_PREFIX_REGEXP = re.compile(b"digest ", flags=re.IGNORECASE) + + +class _DIGEST_ALGO(str, Enum): + MD5 = "MD5" + MD5_SESS = "MD5-SESS" + SHA = "SHA" + SHA_256 = "SHA-256" + SHA_512 = "SHA-512" + + +def _generate_client_nonce(server_side_nonce: str) -> str: + return hashlib.sha1( + hashlib.sha1(server_side_nonce.encode("utf-8")).digest() + + secureRandom(16) + + time.ctime().encode("utf-8") + ).hexdigest()[:16] + + +def _md5_utf_digest(x: str) -> str: + return hashlib.md5(x.encode("utf-8")).hexdigest() + + +def _sha1_utf_digest(x: str) -> str: + return hashlib.sha1(x.encode("utf-8")).hexdigest() + + +def _sha256_utf_digest(x: str) -> str: + return hashlib.sha256(x.encode("utf-8")).hexdigest() + + +def _sha512_utf_digest(x: str) -> str: + return hashlib.sha512(x.encode("utf-8")).hexdigest() + + +class _DigestAuthCacheParams(TypedDict): + path: bytes + method: bytes + cached: bool + nonce: str + realm: str + qop: str | None + algorithm: _DIGEST_ALGO + opaque: str | None + + +class _DigestAuthCacheEntry(TypedDict): + c: int + p: _DigestAuthCacheParams + + +class HTTPDigestAuth(object): + """ + HTTP Digest authentication credentials. + + This container will cache digest auth parameters, + in order not to recompute these for each request. + """ + + def __init__(self, username: Union[str, bytes], password: Union[str, bytes]): + self._username: str = ( + username.decode("utf-8") if isinstance(username, bytes) else username + ) + self._password: str = ( + password.decode("utf-8") if isinstance(password, bytes) else password + ) + + # (method,uri) --> digest auth cache + self._digest_auth_cache: dict[tuple[bytes, bytes], _DigestAuthCacheEntry] = {} + + def _build_authentication_header( + self, + url: bytes, + method: bytes, + cached: bool, + nonce: str, + realm: str, + qop: Optional[str] = None, + algorithm: _DIGEST_ALGO = _DIGEST_ALGO.MD5, + opaque: Optional[str] = None, + ) -> str: + """ + Build the authorization header for credentials got from the server. + Algorithm is accurately ported from https://github.com/psf/requests + with small adjustments. + See + https://github.com/psf/requests/blob/v2.5.1/requests/auth.py#L72 + for details. + + :param algorithm: algorithm to be used for authentication, + defaults to MD5, supported values are + "MD5", "MD5-SESS" and "SHA" + :param realm: HTTP Digest authentication realm + :param nonce: "nonce" HTTP Digest authentication param + :param qop: Quality Of Protection HTTP Digest auth param + :param opaque: "opaque" HTTP Digest authentication param + (should be sent back to server unchanged) + :param cached: Identifies that authentication already have been + performed for URI/method, + and new request should use the same params as first + authenticated request + :param url: the URI path where we are authenticating + :param method: HTTP method to be used when requesting + + :return: HTTP Digest authentication string + """ + algo = algorithm.upper() + p_parsed = urlparse(url.decode("utf-8")) + # path is request-uri defined in RFC 2616 which should not be empty + path = p_parsed.path or "/" + if p_parsed.query: + path += f"?{p_parsed.query}" + + A1 = f"{self._username}:{realm}:{self._password}" + A2 = f"{method.decode('utf-8')}:{path}" + + if algo == _DIGEST_ALGO.MD5 or algo == _DIGEST_ALGO.MD5_SESS: + digest_hash_func = _md5_utf_digest + elif algo == _DIGEST_ALGO.SHA: + digest_hash_func = _sha1_utf_digest + elif algo == _DIGEST_ALGO.SHA_256: + digest_hash_func = _sha256_utf_digest + elif algo == _DIGEST_ALGO.SHA_512: + digest_hash_func = _sha512_utf_digest + else: + raise ValueError( + f"Unsupported Digest Auth algorithm identifier passed: {algo}" + ) + + KD = lambda s, d: digest_hash_func(f"{s}:{d}") # noqa:E731 + + HA1 = digest_hash_func(A1) + HA2 = digest_hash_func(A2) + + if cached: + self._digest_auth_cache[(method, url)]["c"] += 1 + nonce_count = self._digest_auth_cache[(method, url)]["c"] + else: + nonce_count = 1 + + ncvalue = "%08x" % nonce_count + + cnonce = _generate_client_nonce(nonce) + if algo == _DIGEST_ALGO.MD5_SESS: + HA1 = digest_hash_func(f"{HA1}:{nonce}:{cnonce}") + + if not qop: + respdig = KD(HA1, f"{HA2}:{nonce}") + elif qop == "auth" or "auth" in qop.split(","): + noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}" + respdig = KD(HA1, noncebit) + else: + raise UnknownQopForDigestAuth(qop) + + base = ( + f'username="{self._username}", realm="{realm}", nonce="{nonce}", ' + f'uri="{path}", response="{respdig}"' + ) + if opaque: + base += f', opaque="{opaque}"' + if algorithm: + base += f', algorithm="{algorithm.value}"' + if qop: + base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"' + + if not cached: + cache_params: _DigestAuthCacheParams = { + "path": url, + "method": method, + "cached": cached, + "nonce": nonce, + "realm": realm, + "qop": qop, + "algorithm": algorithm, + "opaque": opaque, + } + self._digest_auth_cache[(method, url)] = {"p": cache_params, "c": 1} + + return f"Digest {base}" + + def _cached_metadata_for( + self, method: bytes, uri: bytes + ) -> Optional[_DigestAuthCacheEntry]: + return self._digest_auth_cache.get((method, uri)) class UnknownAuthConfig(Exception): """ The authentication config provided couldn't be interpreted. """ + def __init__(self, config): + super(Exception, self).__init__("{0!r} not of a known type.".format(config)) + + +class UnknownQopForDigestAuth(Exception): + def __init__(self, qop: Optional[str]): super(Exception, self).__init__( - '{0!r} not of a known type.'.format(config)) + "Unsupported Quality Of Protection value passed: {qop}".format(qop=qop) + ) @implementer(IAgent) @@ -30,6 +233,7 @@ class _RequestHeaderSetterAgent: Headers to set on each request before forwarding it to the wrapped agent. """ + def __init__(self, agent, headers): self._agent = agent self._headers = headers @@ -43,7 +247,148 @@ def request(self, method, uri, headers=None, bodyProducer=None): requestHeaders.setRawHeaders(header, values) return self._agent.request( - method, uri, headers=requestHeaders, bodyProducer=bodyProducer) + method, uri, headers=requestHeaders, bodyProducer=bodyProducer + ) + + +@implementer(IAgent) +class _RequestDigestAuthenticationAgent: + def __init__(self, agent: IAgent, auth: HTTPDigestAuth): + self._agent = agent + self._auth = auth + + def _on_401_response( + self, + www_authenticate_response: IResponse, + method: bytes, + uri: bytes, + headers: Optional[Headers], + bodyProducer: Optional[IBodyProducer], + ) -> Deferred[IResponse]: + """ + Handle the server`s 401 response, that is capable with authentication + headers, build the Authorization header + for + + :param www_authenticate_response: t.w.client.Response object + :param method: HTTP method to be used to perform the request + :param uri: URI to be used + :param headers: Additional headers to be sent with the request, + instead of "Authorization" header + :param bodyProducer: IBodyProducer implementer instance that would be + used to fetch the response body + + :return: + """ + assert ( + www_authenticate_response.code == 401 + ), """Got invalid pre-authentication response code, probably URL + does not support Digest auth + """ + www_authenticate_header_string = ( + www_authenticate_response.headers.getRawHeaders( + b"www-authenticate", [b""] + )[0] + ) + digest_header = _DIGEST_HEADER_PREFIX_REGEXP.sub( + b"", www_authenticate_header_string, count=1 + ) + digest_authentication_params = parse_dict_header(digest_header.decode("utf-8")) + + digest_authentication_header = self._auth._build_authentication_header( + uri, + method, + False, + digest_authentication_params["nonce"], + digest_authentication_params["realm"], + qop=digest_authentication_params.get("qop", None), + algorithm=_DIGEST_ALGO( + digest_authentication_params.get("algorithm", "MD5") + ), + opaque=digest_authentication_params.get("opaque", None), + ) + return self._perform_request( + digest_authentication_header, method, uri, headers, bodyProducer + ) + + def _perform_request( + self, + digest_authentication_header: str, + method: bytes, + uri: bytes, + headers: Optional[Headers], + bodyProducer: Optional[IBodyProducer], + ) -> Deferred[IResponse]: + """ + Add Authorization header and perform the request with + actual credentials + + :param digest_authentication_header: HTTP Digest Authorization + header string + :param method: HTTP method to be used to perform the request + :param uri: URI to be used + :param headers: Headers to be sent with the request + :param bodyProducer: IBodyProducer implementer instance that would be + used to fetch the response body + + :return: t.i.defer.Deferred (holding the result of the request) + """ + if not headers: + headers = Headers( + {b"Authorization": [digest_authentication_header.encode("utf-8")]} + ) + else: + headers.addRawHeader( + b"Authorization", digest_authentication_header.encode("utf-8") + ) + return self._agent.request( + method, uri, headers=headers, bodyProducer=bodyProducer + ) + + def request( + self, + method: bytes, + uri: bytes, + headers: Optional[Headers] = None, + bodyProducer: Optional[IBodyProducer] = None, + ) -> Deferred[IResponse]: + """ + Wrap the agent with HTTP Digest authentication. + + :param method: HTTP method to be used to perform the request + :param uri: URI to be used + :param headers: Headers to be sent with the request + :param bodyProducer: IBodyProducer implementer instance that would be + used to fetch the response body + + :return: t.i.defer.Deferred (holding the result of the request) + """ + + digest_auth_metadata = self._auth._cached_metadata_for(method, uri) + + if digest_auth_metadata is None: + # Perform first request for getting the realm; + # the client awaits for 401 response code here + d = self._agent.request(b"GET", uri, headers=headers, bodyProducer=None) + d.addCallback(self._on_401_response, method, uri, headers, bodyProducer) + else: + # We have performed authentication on that URI already + digest_params_from_cache = digest_auth_metadata["p"] + digest_params_from_cache["cached"] = True + digest_authentication_header = self._auth._build_authentication_header( + digest_params_from_cache["path"], + digest_params_from_cache["method"], + digest_params_from_cache["cached"], + digest_params_from_cache["nonce"], + digest_params_from_cache["realm"], + qop=digest_params_from_cache["qop"], + algorithm=digest_params_from_cache["algorithm"], + opaque=digest_params_from_cache["opaque"], + ) + d = self._perform_request( + digest_authentication_header, method, uri, headers, bodyProducer + ) + return d def add_basic_auth( @@ -68,18 +413,18 @@ def add_basic_auth( :returns: :class:`~twisted.web.iweb.IAgent` """ if not isinstance(username, bytes): - username = username.encode('utf-8') + username = username.encode("utf-8") if not isinstance(password, bytes): - password = password.encode('utf-8') + password = password.encode("utf-8") - creds = binascii.b2a_base64(b'%s:%s' % (username, password)).rstrip(b'\n') + creds = binascii.b2a_base64(b"%s:%s" % (username, password)).rstrip(b"\n") return _RequestHeaderSetterAgent( agent, - Headers({b'Authorization': [b'Basic ' + creds]}), + Headers({b"Authorization": [b"Basic " + creds]}), ) -def add_auth(agent, auth_config): +def add_auth(agent: IAgent, auth_config: Union[tuple, HTTPDigestAuth]) -> IAgent: """ Wrap an agent to perform authentication @@ -95,5 +440,7 @@ def add_auth(agent, auth_config): """ if isinstance(auth_config, tuple): return add_basic_auth(agent, auth_config[0], auth_config[1]) + elif isinstance(auth_config, HTTPDigestAuth): + return _RequestDigestAuthenticationAgent(agent, auth_config) raise UnknownAuthConfig(auth_config) diff --git a/src/treq/test/test_auth.py b/src/treq/test/test_auth.py index 4ba169dc..db4f1e8e 100644 --- a/src/treq/test/test_auth.py +++ b/src/treq/test/test_auth.py @@ -5,7 +5,8 @@ from twisted.web.iweb import IAgent from treq._agentspy import agent_spy -from treq.auth import _RequestHeaderSetterAgent, add_auth, UnknownAuthConfig +from treq.auth import _RequestHeaderSetterAgent, add_auth, \ + UnknownAuthConfig, HTTPDigestAuth, _DIGEST_ALGO class RequestHeaderSetterAgentTests(SynchronousTestCase): @@ -130,6 +131,57 @@ def test_add_basic_auth_bytes(self): Headers({b'Authorization': [b'Basic AQ//Ov/wAQ==']}), ) + def test_add_digest_auth(self): + """ + add_auth() wraps the given agent with one that adds a ``Authorization: + Digest ...`` authentication handler. + """ + agent, requests = agent_spy() + username = 'spam' + password = 'eggs' + auth = HTTPDigestAuth(username, password) + authAgent = add_auth(agent, auth) + + authAgent.request(b'method', b'uri') + + self.assertEqual( + authAgent._auth, + auth, + ) + self.assertEqual( + authAgent._auth._username, + username, + ) + self.assertEqual( + authAgent._auth._password, + password, + ) + + def test_add_digest_auth_bytes(self): + """ + Digest auth can be passed as `bytes` which will be encoded as utf-8. + """ + agent, requests = agent_spy() + username = b'spam' + password = b'eggs' + auth = HTTPDigestAuth(username, password) + authAgent = add_auth(agent, auth) + + authAgent.request(b'method', b'uri') + + self.assertEqual( + authAgent._auth, + auth, + ) + self.assertEqual( + authAgent._auth._username, + username.decode('utf-8'), + ) + self.assertEqual( + authAgent._auth._password, + password.decode('utf-8'), + ) + def test_add_unknown_auth(self): """ add_auth() raises UnknownAuthConfig when given anything other than @@ -139,3 +191,116 @@ def test_add_unknown_auth(self): invalidAuth = 1234 self.assertRaises(UnknownAuthConfig, add_auth, agent, invalidAuth) + + +class HttpDigestAuthTests(SynchronousTestCase): + + def setUp(self): + self.maxDiff = None + self._auth = HTTPDigestAuth('spam', 'eggs') + + def test_digest_unknown_algorithm(self): + """ + _DIGEST_ALGO('UNKNOWN') raises ValueError when the algorithm is unknown. + """ + with self.assertRaises(ValueError) as e: + _DIGEST_ALGO('UNKNOWN') + self.assertIn("'UNKNOWN' is not a valid _DIGEST_ALGO", str(e.exception)) + + def test_build_authentication_header_md5_no_cache_no_qop(self): + """ + _build_authentication_header test vectors using the MD5 algo and without + qop parameter generate the expected digest header when cache is + uninitialized. + """ + auth_header = self._auth._build_authentication_header( + b'/spam/eggs', b'GET', False, + 'b7f36bc385a662ed615f27bd9e94eecd', + 'me@dragons', qop=None, + algorithm=_DIGEST_ALGO('MD5') + ) + self.assertEquals( + auth_header, + 'Digest username="spam", realm="me@dragons", ' + + 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + + 'uri="/spam/eggs", ' + + 'response="fc05d17c55156b278132a52dc0dca526", algorithm="MD5"', + ) + + def test_build_authentication_header_md5_sess_no_cache(self): + """ + _build_authentication_header test vectors using the MD5-SESS algo and + with qop parameter generate the expected digest header when cache is + uninitialized. + """ + auth_header = self._auth._build_authentication_header( + b'/spam/eggs?ham=bacon', b'GET', False, + 'b7f36bc385a662ed615f27bd9e94eecd', + 'me@dragons', qop='auth', + algorithm=_DIGEST_ALGO('MD5-SESS') + ) + self.assertRegex( + auth_header, + 'Digest username="spam", realm="me@dragons", ' + + 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + + 'uri="/spam/eggs\\?ham=bacon", ' + + 'response="([0-9a-f]{32})", ' + + 'algorithm="MD5-SESS", qop="auth", ' + + 'nc=00000001, cnonce="([0-9a-f]{16})"', + ) + + def test_build_authentication_header_sha_no_cache_no_qop(self): + """ + _build_authentication_header test vectors using the SHA(SHA-1) algo and + without the qop parameter generate the expected digest header when cache + is uninitialized. + """ + auth_header = self._auth._build_authentication_header( + b'/spam/eggs', b'GET', False, + 'b7f36bc385a662ed615f27bd9e94eecd', + 'me@dragons', qop=None, + algorithm=_DIGEST_ALGO('SHA') + ) + + self.assertEquals( + auth_header, + 'Digest username="spam", realm="me@dragons", ' + + 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + + 'uri="/spam/eggs", ' + + 'response="45420a4786287998bcb99dfde563c3a198109b31", ' + + 'algorithm="SHA"' + ) + + def test_build_authentication_header_sha512_cache(self): + """ + _build_authentication_header test vectors using the SHA-512 algo and + with the qop parameter generate the expected digest header when the + digest cache is used for the second request. + """ + # Emulate 1st request + self._auth._build_authentication_header( + b'/spam/eggs', b'GET', False, + 'b7f36bc385a662ed615f27bd9e94eecd', + 'me@dragons', qop='auth', + algorithm=_DIGEST_ALGO('SHA-512') + ) + # Get header after cached request + auth_header = self._auth._build_authentication_header( + b'/spam/eggs', b'GET', True, + 'b7f36bc385a662ed615f27bd9e94eecd', + 'me@dragons', qop='auth', + algorithm=_DIGEST_ALGO('SHA-512') + ) + + # Make sure metadata was cached + self.assertTrue(self._auth._cached_metadata_for(b'GET', b'/spam/eggs')) + + self.assertRegex( + auth_header, + 'Digest username="spam", realm="me@dragons", ' + + 'nonce="b7f36bc385a662ed615f27bd9e94eecd", ' + + 'uri="/spam/eggs", ' + + 'response="([0-9a-f]{128})", ' + + 'algorithm="SHA-512", qop="auth", ' + + 'nc=00000002, cnonce="([0-9a-f]+?)"', + ) diff --git a/src/treq/test/test_treq_integration.py b/src/treq/test/test_treq_integration.py index 1518fb46..8ab79e7c 100644 --- a/src/treq/test/test_treq_integration.py +++ b/src/treq/test/test_treq_integration.py @@ -1,4 +1,5 @@ from io import BytesIO +from typing import Optional from twisted.python.url import URL @@ -11,12 +12,14 @@ from twisted.web.client import (Agent, BrowserLikePolicyForHTTPS, HTTPConnectionPool, ResponseFailed) +from twisted.web.http_headers import Headers from treq.test.util import DEBUG, skip_on_windows_because_of_199 from .local_httpbin.parent import _HTTPBinProcess import treq +from treq.auth import HTTPDigestAuth, UnknownQopForDigestAuth skip = skip_on_windows_because_of_199() @@ -51,7 +54,7 @@ class TreqIntegrationTests(TestCase): head = with_baseurl(treq.head) post = with_baseurl(treq.post) put = with_baseurl(treq.put) - patch = with_baseurl(treq.patch) + patch_req = with_baseurl(treq.patch) delete = with_baseurl(treq.delete) _httpbin_process = _HTTPBinProcess(https=False) @@ -205,7 +208,7 @@ def test_put(self): @inlineCallbacks def test_patch(self): - response = yield self.patch('/patch', data=b'Hello!') + response = yield self.patch_req('/patch', data=b'Hello!') self.assertEqual(response.code, 200) yield self.assert_data(response, 'Hello!') yield print_response(response) @@ -241,6 +244,147 @@ def test_failed_basic_auth(self): self.assertEqual(response.code, 401) yield print_response(response) + @inlineCallbacks + def test_digest_auth(self): + """ + Digest authentication succeeds. + """ + response = yield self.get('/digest-auth/auth/treq/treq', + auth=HTTPDigestAuth('treq', 'treq')) + self.assertEqual(response.code, 200) + yield print_response(response) + json = yield treq.json_content(response) + self.assertTrue(json['authenticated']) + self.assertEqual(json['user'], 'treq') + + @inlineCallbacks + def test_digest_auth_multi_qop(self): + """ + Digest authentication with alternative qop type. + """ + response = yield self.get('/digest-auth/undefined/treq/treq', + auth=HTTPDigestAuth('treq', 'treq')) + self.assertEqual(response.code, 200) + yield print_response(response) + json = yield treq.json_content(response) + self.assertTrue(json['authenticated']) + self.assertEqual(json['user'], 'treq') + + @inlineCallbacks + def test_digest_auth_multiple_calls(self): + """ + Proper Digest authentication credentials caching works across + multiple requests. + """ + + calls = 0 + headers_for_second_request: Optional[Headers] = None + + # Original Agent request call + agent_request_orig = Agent.request + + def agent_request_patched(*args, **kwargs): + """ + Patched Agent.request function, + that increases call count on every HTTP request + and appends. + """ + nonlocal calls, headers_for_second_request + response_deferred = agent_request_orig(*args, **kwargs) + calls += 1 + if calls == 2: + headers_for_second_request = args[3] + return response_deferred + + self.patch(Agent, 'request', agent_request_patched) + + auth = HTTPDigestAuth('treq-digest-auth-multiple', 'treq') + + response1 = yield self.get( + '/digest-auth/auth/treq-digest-auth-multiple/treq', + auth=auth + ) + self.assertEqual(response1.code, 200) + yield print_response(response1) + json1 = yield treq.json_content(response1) + + # Assume we did two actual HTTP requests - one to obtain credentials + # and second is original request with authentication + self.assertEqual( + calls, + 2 + ) + self.assertIn( + b'Authorization', + dict(headers_for_second_request.getAllRawHeaders()) + ) + + response2 = yield self.get( + '/digest-auth/auth/treq-digest-auth-multiple/treq', + auth=auth, + cookies=response1.cookies() + ) + self.assertEqual(response2.code, 200) + yield print_response(response2) + json2 = yield treq.json_content(response2) + self.assertTrue(json1['authenticated']) + self.assertEqual(json1['user'], 'treq-digest-auth-multiple') + + # Assume that responses are the same + self.assertEqual(json1, json2) + + # Assume we need only one call to obtain second response + self.assertEqual( + calls, + 3 + ) + + @inlineCallbacks + def test_digest_auth_sha256(self): + """ + Digest authentication with sha256 works. + """ + response = yield self.get('/digest-auth/auth/treq/treq/SHA-256', + auth=HTTPDigestAuth('treq', 'treq')) + self.assertEqual(response.code, 200) + yield print_response(response) + json = yield treq.json_content(response) + self.assertTrue(json['authenticated']) + self.assertEqual(json['user'], 'treq') + + @inlineCallbacks + def test_digest_auth_sha512(self): + """ + Digest authentication with sha512 works. + """ + response = yield self.get('/digest-auth/auth/treq/treq/SHA-512', + auth=HTTPDigestAuth('treq', 'treq')) + self.assertEqual(response.code, 200) + yield print_response(response) + json = yield treq.json_content(response) + self.assertTrue(json['authenticated']) + self.assertEqual(json['user'], 'treq') + + @inlineCallbacks + def test_failed_digest_auth(self): + """ + Digest auth with invalid credentials fails. + """ + response = yield self.get('/digest-auth/auth/treq/treq', + auth=HTTPDigestAuth('not-treq', 'not-treq')) + self.assertEqual(response.code, 401) + yield print_response(response) + + @inlineCallbacks + def test_failed_digest_auth_int(self): + """ + Digest authentication when qop type is unsupported fails with + UnknownQopForDigestAuth. + """ + with self.assertRaises(UnknownQopForDigestAuth): + yield self.get('/digest-auth/auth-int/treq/treq', + auth=HTTPDigestAuth('treq', 'treq')) + @inlineCallbacks def test_timeout(self): """ diff --git a/tox.ini b/tox.ini index c43dcf0b..58a7ce6a 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ commands = {envbindir}/trial {posargs:treq} [testenv:mypy] -basepython = python3.8 +basepython = python3.12 deps = mypy==1.0.1 mypy-zope==0.9.1 @@ -67,7 +67,7 @@ commands = [testenv:docs] extras = docs changedir = docs -basepython = python3.8 +basepython = python3.12 commands = sphinx-build -b html . html