diff --git a/redis/ocsp.py b/redis/ocsp.py index 666c7dcd08..4753434fba 100644 --- a/redis/ocsp.py +++ b/redis/ocsp.py @@ -56,9 +56,14 @@ def _check_certificate(issuer_cert, ocsp_bytes, validate=True): raise AuthorizationError("you are not authorized to view this ocsp certificate") if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL: if ocsp_response.certificate_status != ocsp.OCSPCertStatus.GOOD: - return False + raise ConnectionError( + f'Received an {str(ocsp_response.certificate_status).split(".")[1]} ' + "ocsp certificate status" + ) else: - return False + raise ConnectionError( + "failed to retrieve a sucessful response from the ocsp responder" + ) if ocsp_response.this_update >= datetime.datetime.now(): raise ConnectionError("ocsp certificate was issued in the future") diff --git a/tests/test_ssl.py b/tests/test_ssl.py index 0ae7440daf..ab5d47f293 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -107,7 +107,7 @@ def test_ssl_ocsp_called_withcrypto(self, request): def test_valid_ocsp_cert_http(self): from redis.ocsp import OCSPVerifier - hostnames = ["github.com", "aws.amazon.com", "ynet.co.il", "microsoft.com"] + hostnames = ["github.com", "aws.amazon.com", "ynet.co.il"] for hostname in hostnames: context = ssl.create_default_context() with socket.create_connection((hostname, 443)) as sock: @@ -124,7 +124,9 @@ def test_revoked_ocsp_certificate(self): with socket.create_connection((hostname, 443)) as sock: with context.wrap_socket(sock, server_hostname=hostname) as wrapped: ocsp = OCSPVerifier(wrapped, hostname, 443) - assert ocsp.is_valid() is False + with pytest.raises(ConnectionError) as e: + assert ocsp.is_valid() + assert "REVOKED" in str(e) @skip_if_nocryptography() def test_unauthorized_ocsp(self): @@ -147,7 +149,9 @@ def test_ocsp_not_present_in_response(self): with socket.create_connection((hostname, 443)) as sock: with context.wrap_socket(sock, server_hostname=hostname) as wrapped: ocsp = OCSPVerifier(wrapped, hostname, 443) - assert ocsp.is_valid() is False + with pytest.raises(ConnectionError) as e: + assert ocsp.is_valid() + assert "from the" in str(e) @skip_if_nocryptography() def test_unauthorized_then_direct(self):