diff --git a/netbox_diode_plugin/__init__.py b/netbox_diode_plugin/__init__.py index 3b3b48f..549605d 100644 --- a/netbox_diode_plugin/__init__.py +++ b/netbox_diode_plugin/__init__.py @@ -31,6 +31,10 @@ class NetBoxDiodePluginConfig(PluginConfig): "secrets_path": "/run/secrets/", "netbox_to_diode_client_secret_name": "netbox_to_diode", "diode_max_auth_retries": 3, + + # List of audiences to require for the diode-to-netbox token. + # If empty, no audience is required. + "required_token_audience": [], } diff --git a/netbox_diode_plugin/api/authentication.py b/netbox_diode_plugin/api/authentication.py index a3dc325..1379289 100644 --- a/netbox_diode_plugin/api/authentication.py +++ b/netbox_diode_plugin/api/authentication.py @@ -14,6 +14,7 @@ from netbox_diode_plugin.plugin_config import ( get_diode_auth_introspect_url, get_diode_user, + get_required_token_audience, ) logger = logging.getLogger("netbox.diode_data") @@ -65,6 +66,16 @@ def _introspect_token(self, token: str): return None if data.get("active"): + # if the plugin is configured to require specific token audience(s), + # reject the token if any are missing. + required_audience = get_required_token_audience() + if required_audience: + token_audience = set(data.get("aud", [])) + missing_audience = set(required_audience) - token_audience + if missing_audience: + logger.error(f"Token audience(s) {missing_audience} not found in {token_audience}") + return None + diode_user = SimpleNamespace( user=get_diode_user(), token_scopes=data.get("scope", "").split(), diff --git a/netbox_diode_plugin/plugin_config.py b/netbox_diode_plugin/plugin_config.py index 3c15c0e..eb1cf56 100644 --- a/netbox_diode_plugin/plugin_config.py +++ b/netbox_diode_plugin/plugin_config.py @@ -89,3 +89,7 @@ def get_diode_user(): diode_user = User.objects.create(username=diode_username, is_active=True) return diode_user + +def get_required_token_audience(): + """Returns the require token audience.""" + return get_plugin_config("netbox_diode_plugin", "required_token_audience") diff --git a/netbox_diode_plugin/tests/test_authentication.py b/netbox_diode_plugin/tests/test_authentication.py index 528d107..1bd754f 100644 --- a/netbox_diode_plugin/tests/test_authentication.py +++ b/netbox_diode_plugin/tests/test_authentication.py @@ -49,12 +49,19 @@ def setUp(self): ) self.introspect_url_patcher.start() + self.required_audience_patcher = mock.patch( + 'netbox_diode_plugin.api.authentication.get_required_token_audience', + return_value=[] + ) + self.required_audience_mock = self.required_audience_patcher.start() + def tearDown(self): """Clean up after tests.""" self.cache_patcher.stop() self.cache_set_patcher.stop() self.requests_patcher.stop() self.introspect_url_patcher.stop() + self.required_audience_patcher.stop() def test_authenticate_no_auth_header(self): """Test authentication with no Authorization header.""" @@ -103,6 +110,42 @@ def test_authenticate_token_with_required_scope(self): self.assertEqual(user, self.diode_user.user) self.cache_set_mock.assert_called_once() + def test_authenticate_token_with_required_audience(self): + """Test authentication with token having required audience.""" + self.cache_get_mock.return_value = None + self.requests_mock.return_value.json.return_value = { + 'active': True, + 'scope': 'netbox:read netbox:write', + 'exp': 1000, + 'iat': 500 + } + + request = self.factory.get('/', HTTP_AUTHORIZATION=f'Bearer {self.token_with_scope}') + + self.cache_get_mock.return_value = None + self.required_audience_mock.return_value = ['netbox'] + try: + # should fail if the token does not have the required audience + with self.assertRaises(AuthenticationFailed): + self.auth.authenticate(request) + self.required_audience_mock.assert_called_once() + self.cache_set_mock.assert_not_called() + + # should succeed if the token has the required audience + self.requests_mock.return_value.json.return_value = { + 'active': True, + 'aud': ['netbox', 'api', 'other'], + 'scope': 'netbox:read netbox:write', + 'exp': 1000, + 'iat': 500 + } + + user, _ = self.auth.authenticate(request) + self.assertEqual(user, self.diode_user.user) + self.cache_set_mock.assert_called_once() + finally: + self.required_audience_patcher.return_value = [] + def test_authenticate_token_introspection_failure(self): """Test authentication when token introspection fails.""" self.cache_get_mock.return_value = None