Skip to content
This repository has been archived by the owner on Jul 9, 2020. It is now read-only.

Commit

Permalink
Simplified REGISTRATION_ENABLED flag + added disabled test
Browse files Browse the repository at this point in the history
  • Loading branch information
nemesifier committed May 13, 2016
1 parent 6f22de8 commit 3a48279
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 74 deletions.
11 changes: 3 additions & 8 deletions django_netjsonconfig/controller/urls.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from django.conf.urls import url

from . import views
from ..settings import REGISTRATION_ENABLED

urlpatterns = [
url(r'^controller/checksum/(?P<pk>[^/]+)/$',
Expand All @@ -13,11 +12,7 @@
url(r'^controller/report-status/(?P<pk>[^/]+)/$',
views.report_status,
name='report_status'),
url(r'^controller/register/$',
views.register,
name='register'),
]

if REGISTRATION_ENABLED:
urlpatterns += [
url(r'^controller/register/$',
views.register,
name='register'),
]
99 changes: 50 additions & 49 deletions django_netjsonconfig/controller/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,52 +52,53 @@ def report_status(request, pk):
content_type='text/plain')


if settings.REGISTRATION_ENABLED:
@csrf_exempt
@require_http_methods(['POST'])
def register(request):
"""
registers new config
"""
# ensure request is well formed and authorized
allowed_backends = [path for path, name in settings.BACKENDS]
required_params = [('secret', settings.SHARED_SECRET),
('name', None),
('backend', allowed_backends)]
# valid required params or forbid
for key, value in required_params:
bad_response = forbid_unallowed(request, 'POST', key, value)
if bad_response:
return bad_response
key = None
last_ip = request.META.get('REMOTE_ADDR')
if settings.CONSISTENT_REGISTRATION:
key = request.POST.get('key')
# try retrieving existing Config first
# (key is filled only if CONSISTENT_REGISTRATION is enabled)
try:
config = Config.objects.get(key=key)
# create new Config otherwise
except Config.DoesNotExist:
new = True
options = dict(name=request.POST.get('name'),
backend=request.POST.get('backend'),
last_ip=last_ip)
# do not specify key if ``None``, would cause exception
if key:
options['key'] = key
config = Config.objects.create(**options)
# update last_ip on existing configs
else:
new = False
config.last_ip = last_ip
config.save()
# return id and key in response
s = 'registration-result: success\n' \
'uuid: {id}\n' \
'key: {key}\n' \
'hostname: {name}\n'
s += 'is-new: %s\n' % (int(new))
return ControllerResponse(s.format(**config.__dict__),
content_type='text/plain',
status=201)
@csrf_exempt
@require_http_methods(['POST'])
def register(request):
"""
registers new config
"""
if not settings.REGISTRATION_ENABLED:
return ControllerResponse(status=404)
# ensure request is well formed and authorized
allowed_backends = [path for path, name in settings.BACKENDS]
required_params = [('secret', settings.SHARED_SECRET),
('name', None),
('backend', allowed_backends)]
# valid required params or forbid
for key, value in required_params:
bad_response = forbid_unallowed(request, 'POST', key, value)
if bad_response:
return bad_response
key = None
last_ip = request.META.get('REMOTE_ADDR')
if settings.CONSISTENT_REGISTRATION:
key = request.POST.get('key')
# try retrieving existing Config first
# (key is filled only if CONSISTENT_REGISTRATION is enabled)
try:
config = Config.objects.get(key=key)
# create new Config otherwise
except Config.DoesNotExist:
new = True
options = dict(name=request.POST.get('name'),
backend=request.POST.get('backend'),
last_ip=last_ip)
# do not specify key if ``None``, would cause exception
if key:
options['key'] = key
config = Config.objects.create(**options)
# update last_ip on existing configs
else:
new = False
config.last_ip = last_ip
config.save()
# return id and key in response
s = 'registration-result: success\n' \
'uuid: {id}\n' \
'key: {key}\n' \
'hostname: {name}\n'
s += 'is-new: %s\n' % (int(new))
return ControllerResponse(s.format(**config.__dict__),
content_type='text/plain',
status=201)
48 changes: 31 additions & 17 deletions django_netjsonconfig/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@
TEST_MACADDR = '00:11:22:33:44:55'
mac_plus_secret = '%s+%s' % (TEST_MACADDR, settings.NETJSONCONFIG_SHARED_SECRET)
TEST_CONSISTENT_KEY = md5(mac_plus_secret.encode()).hexdigest()
REGISTER_URL = reverse('controller:register')


class TestController(TestCase):
"""
tests for django_netjsonconfig.controller
"""
register_url = reverse('controller:register')

def _create_config(self):
d = Config(name='test',
backend='netjsonconfig.OpenWrt',
Expand Down Expand Up @@ -93,7 +92,7 @@ def test_download_config_405(self):
self.assertEqual(response.status_code, 405)

def test_register(self):
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'name': TEST_MACADDR,
'backend': 'netjsonconfig.OpenWrt'
Expand All @@ -109,19 +108,19 @@ def test_register(self):

def test_register_400(self):
# missing secret
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'name': TEST_MACADDR,
'backend': 'netjsonconfig.OpenWrt'
})
self.assertContains(response, 'secret', status_code=400)
# missing name
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'backend': 'netjsonconfig.OpenWrt'
})
self.assertContains(response, 'name', status_code=400)
# missing backend
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'name': TEST_MACADDR,
})
Expand All @@ -130,14 +129,14 @@ def test_register_400(self):

def test_register_403(self):
# wrong secret
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': 'WRONG',
'name': TEST_MACADDR,
'backend': 'netjsonconfig.OpenWrt'
})
self.assertContains(response, 'wrong secret', status_code=403)
# wrong backend
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'name': TEST_MACADDR,
'backend': 'wrong'
Expand All @@ -146,11 +145,11 @@ def test_register_403(self):
self._check_header(response)

def test_register_405(self):
response = self.client.get(self.register_url)
response = self.client.get(REGISTER_URL)
self.assertEqual(response.status_code, 405)

def test_consistent_registration_new(self):
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'name': TEST_MACADDR,
'key': TEST_CONSISTENT_KEY,
Expand All @@ -172,7 +171,7 @@ def test_consistent_registration_existing(self):
c = self._create_config()
c.key = TEST_CONSISTENT_KEY
c.save()
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'name': TEST_MACADDR,
'key': TEST_CONSISTENT_KEY,
Expand Down Expand Up @@ -245,11 +244,6 @@ def test_report_status_405(self):


class TestConsistentRegistrationDisabled(TestCase):
"""
tests for django_netjsonconfig.controller
"""
register_url = reverse('controller:register')

@classmethod
def setUpClass(cls):
super(TestConsistentRegistrationDisabled, cls).setUpClass()
Expand All @@ -261,7 +255,7 @@ def tearDownClass(cls):
app_settings.CONSISTENT_REGISTRATION = True

def test_consistent_registration_disabled(self):
response = self.client.post(self.register_url, {
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'name': TEST_MACADDR,
'key': TEST_CONSISTENT_KEY,
Expand All @@ -276,3 +270,23 @@ def test_consistent_registration_disabled(self):
self.assertNotEqual(key, TEST_CONSISTENT_KEY)
self.assertEqual(Config.objects.filter(key=TEST_CONSISTENT_KEY).count(), 0)
self.assertEqual(Config.objects.filter(key=key).count(), 1)


class TestRegistrationDisabled(TestCase):
@classmethod
def setUpClass(cls):
super(TestRegistrationDisabled, cls).setUpClass()
app_settings.REGISTRATION_ENABLED = False

@classmethod
def tearDownClass(cls):
super(TestRegistrationDisabled, cls).tearDownClass()
app_settings.REGISTRATION_ENABLED = True

def test_register_404(self):
response = self.client.post(REGISTER_URL, {
'secret': settings.NETJSONCONFIG_SHARED_SECRET,
'name': TEST_MACADDR,
'backend': 'netjsonconfig.OpenWrt'
})
self.assertEqual(response.status_code, 404)

0 comments on commit 3a48279

Please sign in to comment.