Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions netbox_diode_plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class NetBoxDiodePluginConfig(PluginConfig):
"secrets_path": "/run/secrets/",
"netbox_to_diode_client_secret_name": "netbox_to_diode",
"diode_max_auth_retries": 3,

# List of audiences to require for the diode-to-netbox token.
# If empty, no audience is required.
"required_token_audience": [],
}


Expand Down
11 changes: 11 additions & 0 deletions netbox_diode_plugin/api/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from netbox_diode_plugin.plugin_config import (
get_diode_auth_introspect_url,
get_diode_user,
get_required_token_audience,
)

logger = logging.getLogger("netbox.diode_data")
Expand Down Expand Up @@ -65,6 +66,16 @@ def _introspect_token(self, token: str):
return None

if data.get("active"):
# if the plugin is configured to require specific token audience(s),
# reject the token if any are missing.
required_audience = get_required_token_audience()
if required_audience:
token_audience = set(data.get("aud", []))
missing_audience = set(required_audience) - token_audience
if missing_audience:
logger.error(f"Token audience(s) {missing_audience} not found in {token_audience}")
return None

diode_user = SimpleNamespace(
user=get_diode_user(),
token_scopes=data.get("scope", "").split(),
Expand Down
4 changes: 4 additions & 0 deletions netbox_diode_plugin/plugin_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ def get_diode_user():
diode_user = User.objects.create(username=diode_username, is_active=True)

return diode_user

def get_required_token_audience():
"""Returns the require token audience."""
return get_plugin_config("netbox_diode_plugin", "required_token_audience")
43 changes: 43 additions & 0 deletions netbox_diode_plugin/tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ def setUp(self):
)
self.introspect_url_patcher.start()

self.required_audience_patcher = mock.patch(
'netbox_diode_plugin.api.authentication.get_required_token_audience',
return_value=[]
)
self.required_audience_mock = self.required_audience_patcher.start()

def tearDown(self):
"""Clean up after tests."""
self.cache_patcher.stop()
self.cache_set_patcher.stop()
self.requests_patcher.stop()
self.introspect_url_patcher.stop()
self.required_audience_patcher.stop()

def test_authenticate_no_auth_header(self):
"""Test authentication with no Authorization header."""
Expand Down Expand Up @@ -103,6 +110,42 @@ def test_authenticate_token_with_required_scope(self):
self.assertEqual(user, self.diode_user.user)
self.cache_set_mock.assert_called_once()

def test_authenticate_token_with_required_audience(self):
"""Test authentication with token having required audience."""
self.cache_get_mock.return_value = None
self.requests_mock.return_value.json.return_value = {
'active': True,
'scope': 'netbox:read netbox:write',
'exp': 1000,
'iat': 500
}

request = self.factory.get('/', HTTP_AUTHORIZATION=f'Bearer {self.token_with_scope}')

self.cache_get_mock.return_value = None
self.required_audience_mock.return_value = ['netbox']
try:
# should fail if the token does not have the required audience
with self.assertRaises(AuthenticationFailed):
self.auth.authenticate(request)
self.required_audience_mock.assert_called_once()
self.cache_set_mock.assert_not_called()

# should succeed if the token has the required audience
self.requests_mock.return_value.json.return_value = {
'active': True,
'aud': ['netbox', 'api', 'other'],
'scope': 'netbox:read netbox:write',
'exp': 1000,
'iat': 500
}

user, _ = self.auth.authenticate(request)
self.assertEqual(user, self.diode_user.user)
self.cache_set_mock.assert_called_once()
finally:
self.required_audience_patcher.return_value = []

def test_authenticate_token_introspection_failure(self):
"""Test authentication when token introspection fails."""
self.cache_get_mock.return_value = None
Expand Down