Skip to content

Commit

Permalink
feat: Option to require basic authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
Ecno92 committed Oct 13, 2020
1 parent 8d1853e commit 548a5a4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 21 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ Install the app::
HEALTHPOINT_BASICAUTH_USERNAME = 'john'
HEALTHPOINT_BASICAUTH_PASSWORD = 'doe'

# If set to True authentication is required for executing the
# health checks.
HEALTHPOINT_AUTH_REQUIRED = True

# urls.py
urlpatterns = [
...
Expand Down
31 changes: 30 additions & 1 deletion healthpoint/tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.core.urlresolvers import reverse
from django.test import TestCase
import base64
from django.test import TestCase, override_settings
from django.utils.http import urlencode


Expand All @@ -8,26 +9,54 @@ class HealthTestCase(TestCase):
def test_health(self):
resp = self.client.get(reverse('healthpoint_health'))
self.assertEqual(resp.status_code, 503)
self.assertEqual(resp.json(), {})

def test_health_failure(self):
resp = self._test_func('healthpoint.tests.health.failure')
self.assertEqual(resp.status_code, 503)
self.assertEqual(resp.json(), {})

def test_health_success(self):
resp = self._test_func('healthpoint.tests.health.success')
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json(), {})

def test_unknown_health(self):
resp = self._test_func('does.not.exist')
self.assertEqual(resp.status_code, 404)
self.assertEqual(resp.json(), {})

def test_unknown_health_and_success(self):
resp = self._test_func('does.not.exist', 'healthpoint.tests.health.success')
self.assertEqual(resp.status_code, 404)
self.assertEqual(resp.json(), {})

def test_unknown_health_and_failure(self):
resp = self._test_func('does.not.exist', 'healthpoint.tests.health.failure')
self.assertEqual(resp.status_code, 503)
self.assertEqual(resp.json(), {})

@override_settings(
HEALTHPOINT_BASICAUTH_USERNAME='john',
HEALTHPOINT_BASICAUTH_PASSWORD='doe'
)
def test_health_with_basic_auth(self):
headers = {
'HTTP_AUTHORIZATION': 'Basic ' +
base64.b64encode(b'john:doe').decode("ascii")
}
resp = self.client.get(reverse('healthpoint_health'), **headers)
self.assertEqual(resp.status_code, 503)
self.assertEqual(
resp.json(),
{'success': {'healthpoint.tests.health.success': 'This check always succeeds'},
'error': {'healthpoint.tests.health.failure': 'This check always fails'}}
)

@override_settings(HEALTHPOINT_AUTH_REQUIRED=True)
def test_health_with_auth_required(self):
resp = self.client.get(reverse('healthpoint_health'))
self.assertEqual(resp.status_code, 403)

def _test_func(self, *func):
params = urlencode({'test': func}, doseq=True)
Expand Down
58 changes: 38 additions & 20 deletions healthpoint/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from healthpoint.registry import get_health_checks


def _show_health_details(request):
# Only staff members are allowed to see details...
def _is_authenticated_request(request):
# Superusers and staff members are always correctly authenticated.
user = getattr(request, 'user', None)
if user is not None and (user.is_staff or user.is_superuser):
return True
# Requests with basic authentication credentials are only
# authenticated when they match the settings.
ba_username = getattr(settings, 'HEALTHPOINT_BASICAUTH_USERNAME', None)
ba_password = getattr(settings, 'HEALTHPOINT_BASICAUTH_PASSWORD', None)
authorization = request.META.get('HTTP_AUTHORIZATION')
Expand All @@ -27,23 +29,39 @@ def health(request):
tests_left = set(tests)
data = {'success': {}, 'error': {}}
status = 200
for health_check in get_health_checks():
func = '.'.join([
health_check.__module__,
health_check.__qualname__])
if tests and func not in tests:
continue
tests_left.discard(func)
success, detail = health_check()
data['success' if success else 'error'][func] = detail
if not success:
status = 503
if tests_left:
if status == 200:
status = 404
for test in tests_left:
data['error'][test] = 'Unknown health check'
# Only staff members are allowed to see details...
if not _show_health_details(request):

is_authenticated_request = _is_authenticated_request(request)
authentication_required = getattr(settings, 'HEALTHPOINT_AUTH_REQUIRED', False)

if authentication_required and not is_authenticated_request:
# The settings require a succesfully authenticated request
# Abort the health checks if this is not the case...
status = 403
data = {}
else:
# Perform the actual health checks if the authentication is successful
# or not required.
for health_check in get_health_checks():
func = '.'.join([
health_check.__module__,
health_check.__qualname__])
if tests and func not in tests:
continue
tests_left.discard(func)
success, detail = health_check()
data['success' if success else 'error'][func] = detail
if not success:
status = 503
if tests_left:
if status == 200:
status = 404
for test in tests_left:
data['error'][test] = 'Unknown health check'

# Only successfully authenticated requests are allowed to see the
# full details of the results.
if not is_authenticated_request:
data = {}

# Return the response
return JsonResponse(data, status=status)

0 comments on commit 548a5a4

Please sign in to comment.