diff --git a/.gitignore b/.gitignore index b5821bd..53cce25 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,5 @@ dist/ # Docker docker/coverage !docker/netbox/env +docker/oauth2/secrets/* +!docker/oauth2/secrets/.gitkeep diff --git a/README.md b/README.md index 96f93b5..0908329 100644 --- a/README.md +++ b/README.md @@ -52,10 +52,18 @@ PLUGINS_CONFIG = { # Username associated with changes applied via plugin "diode_username": "diode", + + # netbox-to-diode client_secret created during diode bootstrap. + "netbox_to_diode_client_secret": "..." }, } ``` +If you are running diode locally via the quickstart, the `netbox-to-diode` client_secret may be found in `/path/to/diode/oauth2/client/client-credentials.json`. eg: +``` +echo $(jq -r '.[] | select(.client_id == "netbox-to-diode") | .client_secret' /path/to/diode/oauth2/client/client-credentials.json) +``` + Note: Once you customise usernames with PLUGINS_CONFIG during first installation, you should not change or remove them later on. Doing so will cause the plugin to stop working properly. diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 88b6702..94677dd 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -20,7 +20,8 @@ services: volumes: - ./netbox/docker-entrypoint.sh:/opt/netbox/docker-entrypoint.sh:z,ro - ./netbox/nginx-unit.json:/opt/netbox/nginx-unit.json:z,ro - - ../netbox_diode_plugin:/opt/netbox/netbox/netbox_diode_plugin:ro + - ../netbox_diode_plugin:/opt/netbox/netbox/netbox_diode_plugin:z,rw + - ./oauth2/secrets:/run/secrets:z,ro - ./netbox/launch-netbox.sh:/opt/netbox/launch-netbox.sh:z,ro - ./netbox/plugins_dev.py:/etc/netbox/config/plugins.py:z,ro - ./coverage:/opt/netbox/netbox/coverage:z,rw diff --git a/docker/netbox/env/netbox.env b/docker/netbox/env/netbox.env index 2215733..4920b18 100644 --- a/docker/netbox/env/netbox.env +++ b/docker/netbox/env/netbox.env @@ -36,6 +36,6 @@ SUPERUSER_EMAIL= SUPERUSER_NAME=admin SUPERUSER_PASSWORD=admin WEBHOOKS_ENABLED=true -RELOAD_NETBOX_ON_DIODE_PLUGIN_CHANGE=false +RELOAD_NETBOX_ON_DIODE_PLUGIN_CHANGE=true BASE_PATH=netbox/ DEBUG=False \ No newline at end of file diff --git a/docker/oauth2/secrets/.gitkeep b/docker/oauth2/secrets/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/netbox_diode_plugin/__init__.py b/netbox_diode_plugin/__init__.py index 16f718d..3b3b48f 100644 --- a/netbox_diode_plugin/__init__.py +++ b/netbox_diode_plugin/__init__.py @@ -22,6 +22,15 @@ class NetBoxDiodePluginConfig(PluginConfig): # Default username associated with changes applied via plugin "diode_username": "diode", + + # client_id and client_secret for communication with Diode server. + # By default, the secret is read from a file /run/secrets/netbox_to_diode + # but may be specified directly as a string in netbox_to_diode_client_secret + "netbox_to_diode_client_id": "netbox-to-diode", + "netbox_to_diode_client_secret": None, + "secrets_path": "/run/secrets/", + "netbox_to_diode_client_secret_name": "netbox_to_diode", + "diode_max_auth_retries": 3, } diff --git a/netbox_diode_plugin/client.py b/netbox_diode_plugin/client.py new file mode 100644 index 0000000..5d9e785 --- /dev/null +++ b/netbox_diode_plugin/client.py @@ -0,0 +1,36 @@ +# !/usr/bin/env python +# Copyright 2025 NetBox Labs, Inc. +"""Diode NetBox Plugin - Client.""" + +import logging + +from netbox_diode_plugin.diode.clients import get_api_client + +logger = logging.getLogger("netbox.diode_data") + + +def create_client(request, client_name: str, scope: str): + """Create client.""" + logger.info(f"Creating client {client_name} with scope {scope}") + return get_api_client().create_client(client_name, scope) + + +def delete_client(request, client_id: str): + """Delete client.""" + sanitized_client_id = client_id.replace("\n", "").replace("\r", "") + logger.info(f"Deleting client {sanitized_client_id}") + return get_api_client().delete_client(client_id) + + +def list_clients(request): + """List clients.""" + logger.info("Listing clients") + response = get_api_client().list_clients() + return response["data"] + + +def get_client(request, client_id: str): + """Get client.""" + sanitized_client_id = client_id.replace("\n", "").replace("\r", "") + logger.info(f"Getting client {sanitized_client_id}") + return get_api_client().get_client(client_id) diff --git a/netbox_diode_plugin/diode/clients.py b/netbox_diode_plugin/diode/clients.py new file mode 100644 index 0000000..32a3670 --- /dev/null +++ b/netbox_diode_plugin/diode/clients.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python +# Copyright 2025 NetBox Labs Inc +"""Diode NetBox Plugin - Diode - Auth.""" + +import datetime +import json +import logging +import re +import threading +from dataclasses import dataclass +from urllib.parse import urlencode + +import requests + +from netbox_diode_plugin.plugin_config import ( + get_diode_auth_base_url, + get_diode_credentials, + get_diode_max_auth_retries, +) + +SCOPE_DIODE_READ = "diode:read" +SCOPE_DIODE_WRITE = "diode:write" + +logger = logging.getLogger("netbox.diode_data") + +valid_client_id_re = re.compile(r"^[a-zA-Z0-9_-]{1,64}$") + +_client = None +_client_lock = threading.Lock() +def get_api_client(): + """Get the client API client.""" + global _client + global _client_lock + + with _client_lock: + if _client is None: + client_id, client_secret = get_diode_credentials() + if not client_id: + raise ClientAPIError( + "Please update the plugin configuration to access this feature.\nMissing netbox to diode client id.", 500) + if not client_secret: + raise ClientAPIError( + "Please update the plugin configuration to access this feature.\nMissing netbox to diode client secret.", 500) + max_auth_retries = get_diode_max_auth_retries() + _client = ClientAPI( + base_url=get_diode_auth_base_url(), + client_id=client_id, + client_secret=client_secret, + max_auth_retries=max_auth_retries, + ) + return _client + + +class ClientAPIError(Exception): + """Client API Error.""" + + def __init__(self, message: str, status_code: int = 500): + """Initialize the ClientAPIError.""" + self.message = message + self.status_code = status_code + super().__init__(self.message) + + def is_auth_error(self) -> bool: + """Check if the error is an authentication error.""" + return self.status_code == 401 or self.status_code == 403 + +class ClientAPI: + """Manages Diode Clients.""" + + def __init__(self, base_url: str, client_id: str, client_secret: str, max_auth_retries: int = 2): + """Initialize the ClientAPI.""" + self.base_url = base_url + self.client_id = client_id + self.client_secret = client_secret + + self._max_auth_retries = max_auth_retries + self._client_auth_token = None + self._client_auth_token_lock = threading.Lock() + + def create_client(self, name: str, scope: str) -> dict: + """Create a client.""" + for attempt in range(self._max_auth_retries): + token = None + try: + token = self._get_token() + url = self.base_url + "/clients" + headers = {"Authorization": f"Bearer {token}"} + data = { + "client_name": name, + "scope": scope, + } + response = requests.post(url, json=data, headers=headers) + if response.status_code != 201: + raise ClientAPIError("Failed to create client", response.status_code) + return response.json() + except ClientAPIError as e: + if e.is_auth_error() and attempt < self._max_auth_retries - 1: + logger.info(f"Retrying create_client due to unauthenticated error, attempt {attempt + 1}") + self._mark_client_auth_token_invalid(token) + continue + raise + raise ClientAPIError("Failed to create client: unexpected state", 500) + + def get_client(self, client_id: str) -> dict: + """Get a client.""" + if not valid_client_id_re.match(client_id): + raise ValueError(f"Invalid client ID: {client_id}") + + for attempt in range(self._max_auth_retries): + token = None + try: + token = self._get_token() + url = self.base_url + f"/clients/{client_id}" + headers = {"Authorization": f"Bearer {token}"} + response = requests.get(url, headers=headers) + if response.status_code == 401 or response.status_code == 403: + raise ClientAPIError(f"Failed to get client {client_id}", response.status_code) + if response.status_code != 200: + raise ClientAPIError(f"Failed to get client {client_id}", response.status_code) + return response.json() + except ClientAPIError as e: + if e.is_auth_error() and attempt < self._max_auth_retries - 1: + logger.info(f"Retrying delete_client due to unauthenticated error, attempt {attempt + 1}") + self._mark_client_auth_token_invalid(token) + continue + raise + raise ClientAPIError(f"Failed to get client {client_id}: unexpected state") + + def delete_client(self, client_id: str) -> None: + """Delete a client.""" + if not valid_client_id_re.match(client_id): + raise ValueError(f"Invalid client ID: {client_id}") + + for attempt in range(self._max_auth_retries): + token = None + try: + token = self._get_token() + url = self.base_url + f"/clients/{client_id}" + headers = {"Authorization": f"Bearer {token}"} + response = requests.delete(url, headers=headers) + if response.status_code != 204: + raise ClientAPIError(f"Failed to delete client {client_id}", response.status_code) + return + except ClientAPIError as e: + if e.is_auth_error() and attempt < self._max_auth_retries - 1: + logger.info(f"Retrying delete_client due to unauthenticated error, attempt {attempt + 1}") + self._mark_client_auth_token_invalid(token) + continue + raise + raise ClientAPIError(f"Failed to delete client {client_id}: unexpected state") + + + def list_clients(self, page_token: str | None = None, page_size: int | None = None) -> list[dict]: + """List all clients.""" + for attempt in range(self._max_auth_retries): + token = None + try: + token = self._get_token() + url = self.base_url + "/clients" + headers = {"Authorization": f"Bearer {token}"} + params = {} + if page_token: + params["page_token"] = page_token + if page_size: + params["page_size"] = page_size + response = requests.get(url, headers=headers, params=params) + if response.status_code != 200: + raise ClientAPIError("Failed to get clients", response.status_code) + return response.json() + except ClientAPIError as e: + if e.is_auth_error() and attempt < self._max_auth_retries - 1: + logger.info(f"Retrying list_clients due to unauthenticated error, attempt {attempt + 1}") + self._mark_client_auth_token_invalid(token) + continue + raise + raise ClientAPIError("Failed to list clients: unexpected state") + + + def _get_token(self) -> str: + """Get a token for the Diode Auth Service.""" + with self._client_auth_token_lock: + if self._client_auth_token: + return self._client_auth_token + self._client_auth_token = self._authenticate() + return self._client_auth_token + + def _mark_client_auth_token_invalid(self, token: str): + """Mark a client auth token as invalid.""" + with self._client_auth_token_lock: + self._client_auth_token = None + + def _authenticate(self) -> str: + """Get a new access token for the Diode Auth Service.""" + headers = {"Content-Type": "application/x-www-form-urlencoded"} + data = urlencode( + { + "grant_type": "client_credentials", + "client_id": self.client_id, + "client_secret": self.client_secret, + "scope": f"{SCOPE_DIODE_READ} {SCOPE_DIODE_WRITE}", + } + ) + url = self.base_url + "/token" + try: + response = requests.post(url, data=data, headers=headers) + except Exception as e: + raise ClientAPIError(f"Failed to obtain access token: {e}", 401) from e + if response.status_code != 200: + raise ClientAPIError(f"Failed to obtain access token: {response.reason}", 401) + + try: + token_info = response.json() + except Exception as e: + raise ClientAPIError(f"Failed to parse access token response: {e}", 401) from e + + access_token = token_info.get("access_token") + if not access_token: + raise ClientAPIError(f"Failed to obtain access token for client {self._client_id}", 401) + + return access_token + diff --git a/netbox_diode_plugin/forms.py b/netbox_diode_plugin/forms.py index 5bec310..05de3bd 100644 --- a/netbox_diode_plugin/forms.py +++ b/netbox_diode_plugin/forms.py @@ -1,6 +1,8 @@ # !/usr/bin/env python # Copyright 2025 NetBox Labs, Inc. """Diode NetBox Plugin - Forms.""" +from django import forms +from django.utils.translation import gettext_lazy as _ from netbox.forms import NetBoxModelForm from netbox.plugins import get_plugin_config from utilities.forms.rendering import FieldSet @@ -9,6 +11,7 @@ __all__ = ( "SettingsForm", + "ClientCredentialForm", ) @@ -40,3 +43,14 @@ def __init__(self, *args, **kwargs): self.fields["diode_target"].help_text = ( "This field is not allowed to be modified." ) + + +class ClientCredentialForm(forms.Form): + """Form for adding client credentials.""" + + client_name = forms.CharField( + label=_("Client Name"), + required=True, + help_text=_("Enter a name for the client credential that will be created for authentication to the Diode ingestion service."), + widget=forms.TextInput(attrs={"class": "form-control"}), + ) diff --git a/netbox_diode_plugin/models.py b/netbox_diode_plugin/models.py index 9079f9e..ed9e6c7 100644 --- a/netbox_diode_plugin/models.py +++ b/netbox_diode_plugin/models.py @@ -40,4 +40,18 @@ def get_absolute_url(self): return reverse("plugins:netbox_diode_plugin:settings") +class ClientCredentials(models.Model): + """Dummy model to allow for permissions, saved filters, etc..""" + + class Meta: + """Meta class.""" + + managed = False + + default_permissions = () + + permissions = ( + ("view_clientcredentials", "Can view Client Credentials"), + ("add_clientcredentials", "Can perform actions on Client Credentials"), + ) diff --git a/netbox_diode_plugin/navigation.py b/netbox_diode_plugin/navigation.py index 4fb18ef..cdc64ce 100644 --- a/netbox_diode_plugin/navigation.py +++ b/netbox_diode_plugin/navigation.py @@ -2,24 +2,26 @@ # Copyright 2025 NetBox Labs, Inc. """Diode NetBox Plugin - Navigation.""" +from django.utils.translation import gettext as _ from netbox.plugins import PluginMenu, PluginMenuItem -settings = { - "link": "plugins:netbox_diode_plugin:settings", - "link_text": "Settings", - "staff_only": True, -} - +_diode_menu_items = ( + PluginMenuItem( + link="plugins:netbox_diode_plugin:settings", + link_text=_("Settings"), + staff_only= True, + ), + PluginMenuItem( + link="plugins:netbox_diode_plugin:client_credential_list", + link_text=_("Client Credentials"), + staff_only= True, + ), +) menu = PluginMenu( label="Diode", groups=( - ( - "Diode", - ( - PluginMenuItem(**settings), - ), - ), + (_("Diode"), _diode_menu_items), ), icon_class="mdi mdi-upload", ) diff --git a/netbox_diode_plugin/plugin_config.py b/netbox_diode_plugin/plugin_config.py index 60dbcad..3c15c0e 100644 --- a/netbox_diode_plugin/plugin_config.py +++ b/netbox_diode_plugin/plugin_config.py @@ -2,6 +2,8 @@ # Copyright 2025 NetBox Labs, Inc. """Diode NetBox Plugin - Plugin Settings.""" +import logging +import os from urllib.parse import urlparse from django.contrib.auth import get_user_model @@ -14,6 +16,7 @@ User = get_user_model() +logger = logging.getLogger("netbox.diode_data") def _parse_diode_target(target: str) -> tuple[str, str, bool]: """Parse the target into authority, path and tls_verify.""" @@ -31,6 +34,11 @@ def _parse_diode_target(target: str) -> tuple[str, str, bool]: def get_diode_auth_introspect_url(): """Returns the Diode Auth introspect URL.""" + diode_auth_base_url = get_diode_auth_base_url() + return f"{diode_auth_base_url}/introspect" + +def get_diode_auth_base_url(): + """Returns the Diode Auth service base URL.""" diode_target = get_plugin_config("netbox_diode_plugin", "diode_target") diode_target_override = get_plugin_config( "netbox_diode_plugin", "diode_target_override" @@ -42,8 +50,34 @@ def get_diode_auth_introspect_url(): scheme = "https" if tls_verify else "http" path = path.rstrip("/") - return f"{scheme}://{authority}{path}/auth/introspect" + return f"{scheme}://{authority}{path}/auth" + +def get_diode_credentials(): + """Returns the Diode credentials.""" + client_id = get_plugin_config("netbox_diode_plugin", "netbox_to_diode_client_id") + secrets_path = get_plugin_config("netbox_diode_plugin", "secrets_path") + secret_name = get_plugin_config("netbox_diode_plugin", "netbox_to_diode_client_secret_name") + client_secret = get_plugin_config("netbox_diode_plugin", "netbox_to_diode_client_secret") + + if not client_secret: + secret_file = os.path.join(secrets_path, secret_name) + client_secret = _read_secret(secret_file, client_secret) + return client_id, client_secret + +def get_diode_max_auth_retries(): + """Returns the Diode max auth retries.""" + return get_plugin_config("netbox_diode_plugin", "diode_max_auth_retries") + +# Read secret from file +def _read_secret(secret_file: str, default: str | None = None) -> str | None: + try: + f = open(secret_file, encoding='utf-8') + except OSError: + return default + else: + with f: + return f.readline().strip() def get_diode_user(): """Returns the Diode user.""" diff --git a/netbox_diode_plugin/tables.py b/netbox_diode_plugin/tables.py new file mode 100644 index 0000000..8e69d6a --- /dev/null +++ b/netbox_diode_plugin/tables.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# Copyright 2024 NetBox Labs Inc +"""Diode NetBox Plugin - Tables.""" +import logging + +import django_tables2 as tables +from django.urls import reverse +from django.utils.dateparse import parse_datetime +from django.utils.safestring import mark_safe +from django.utils.translation import gettext_lazy as _ +from netbox.tables import BaseTable, columns + + +class ClientCredentialsTable(BaseTable): + """Client credentials table.""" + + label = tables.Column( + verbose_name=_("Name"), + accessor="client_name", + orderable=False, + ) + client_id = tables.Column( + verbose_name=_("Client ID"), + accessor="client_id", + orderable=False, + ) + created_at = columns.DateTimeColumn( + verbose_name=_("Created"), + accessor="created_at", + orderable=False, + ) + client_secret = tables.Column( + verbose_name=_("Client Secret"), + empty_values=(), + orderable=False, + ) + actions = tables.Column( + verbose_name=_(""), + orderable=False, + empty_values=(), + attrs={ + "td": { + "class": "text-end", + } + }, + ) + + exempt_columns = ("actions") + embedded = False + + class Meta: + """Meta class.""" + + attrs = { + "class": "table table-hover object-list", + "td": {"class": "align-middle"}, + } + fields = None + default_columns = ( + "label", + "client_id", + "created_at", + "client_secret", + "actions", + ) + + empty_text = _("No Client Credentials to display") + footer = False + + def render_client_secret(self, value): + """Render client secret.""" + return "*****" + + def render_created_at(self, value): + """Render created at.""" + if value: + return parse_datetime(value) + return "-" + + def render_actions(self, record): + """Render actions.""" + delete_url = reverse( + "plugins:netbox_diode_plugin:client_credential_delete", + kwargs={"client_credential_id": record["client_id"]}, + ) + + buttons = f""" + + + + """ # noqa: E501 + + return mark_safe(buttons) diff --git a/netbox_diode_plugin/templates/diode/client_credential_add.html b/netbox_diode_plugin/templates/diode/client_credential_add.html new file mode 100644 index 0000000..30459f0 --- /dev/null +++ b/netbox_diode_plugin/templates/diode/client_credential_add.html @@ -0,0 +1,18 @@ +{% extends 'generic/object_edit.html' %} +{% load form_helpers %} +{% load i18n %} + +{% block title %}{% trans "Add Client Credential" %}{% endblock %} + +{% block form %} + {% render_form form %} +{% endblock %} + +{% block buttons %} +{% trans "Cancel" %} +
+ +
+{% endblock buttons %} diff --git a/netbox_diode_plugin/templates/diode/client_credential_delete.html b/netbox_diode_plugin/templates/diode/client_credential_delete.html new file mode 100644 index 0000000..2ae85d2 --- /dev/null +++ b/netbox_diode_plugin/templates/diode/client_credential_delete.html @@ -0,0 +1,30 @@ +{% extends 'generic/_base.html' %} +{% load helpers %} +{% load form_helpers %} +{% load i18n %} + +{% comment %} +Blocks: + - title: Page title + - content: Primary page content + +Context: + - object: Python instance of the object being deleted + - form: The delete confirmation form + - form_url: URL for form submission (optional; defaults to current path) + - return_url: The URL to which the user is redirected after submitting the form +{% endcomment %} + +{% block title %} + {% trans "Delete" %} {{ object.client_name }}? +{% endblock %} + +{% block content %} + +{% endblock %} diff --git a/netbox_diode_plugin/templates/diode/client_credential_list.html b/netbox_diode_plugin/templates/diode/client_credential_list.html new file mode 100644 index 0000000..8010982 --- /dev/null +++ b/netbox_diode_plugin/templates/diode/client_credential_list.html @@ -0,0 +1,79 @@ +{% extends 'generic/object_list.html' %} +{% load static %} +{% load buttons %} +{% load helpers %} +{% load humanize %} +{% load i18n %} + + +{% block page-header %} +
+
+ + {# Title #} +
+

{% block pagetitle %}{% trans 'Client Credentials' %}{% endblock %}

+
+ + {# Controls #} +
+ {% block controls %} + + {% endblock controls %} +
+ +
+
+{% endblock %} + +{% block title %}{% trans "Client Credentials" %}{% endblock %} + +{% block content %} + {# Object list tab #} +
+
+ {% csrf_token %} + {# "Select all" form #} + {% if table.paginator.num_pages > 1 %} +
+
+
+
+ + +
+
+
+
+
+
+ {% endif %} + +
+ {% csrf_token %} + + + {# Objects table #} +
+
+ {% include 'htmx/table.html' %} +
+
+ {# /Objects table #} + +
+
+ +
+ {# /Object list tab #} + +{% endblock content %} + diff --git a/netbox_diode_plugin/templates/diode/client_credential_secret.html b/netbox_diode_plugin/templates/diode/client_credential_secret.html new file mode 100644 index 0000000..74f43f3 --- /dev/null +++ b/netbox_diode_plugin/templates/diode/client_credential_secret.html @@ -0,0 +1,76 @@ +{% extends 'generic/_base.html' %} +{% load i18n %} +{% load helpers %} + +{% block title %}{% trans "Add Client Credential" %}{% endblock %} + +{% block tabs %} + +{% endblock tabs %} + +{% block content %} +
+ {% csrf_token %} +
+
+
+
+ +
+
+

{{ object.client_name }}

+
+
+ +
+
+ +
+
+
+ + +
+
+
+ +
+
+ +
+
+
+ + +
+ + {% trans "You can only view your secret once. Be sure to save it before leaving." %} + +
+
+ + +
+
+
+{% endblock %} \ No newline at end of file diff --git a/netbox_diode_plugin/templates/diode/htmx/delete_form.html b/netbox_diode_plugin/templates/diode/htmx/delete_form.html new file mode 100644 index 0000000..60f710a --- /dev/null +++ b/netbox_diode_plugin/templates/diode/htmx/delete_form.html @@ -0,0 +1,24 @@ +{% load form_helpers %} +{% load i18n %} + +
+ {% csrf_token %} + + + + +
diff --git a/netbox_diode_plugin/tests/test_diode_clients.py b/netbox_diode_plugin/tests/test_diode_clients.py new file mode 100644 index 0000000..993d752 --- /dev/null +++ b/netbox_diode_plugin/tests/test_diode_clients.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python +# Copyright 2025 NetBox Labs, Inc. +"""Diode NetBox Plugin - Diode Clients API Tests.""" + +from unittest import mock + +from django.test import TestCase + +from netbox_diode_plugin.diode.clients import ClientAPI, ClientAPIError + + +class DiodeClientsTestCase(TestCase): + """Test cases for Diode Clients API.""" + + def test_create_client(self): + """Test creating a client.""" + with mock.patch('requests.post') as mock_post: + client = ClientAPI( + base_url="http://test-diode-url", + client_id="test-client-id", + client_secret="test-client-secret" + ) + client._client_auth_token = "test-client-auth-token" + + mock_post.return_value.status_code = 201 + mock_post.return_value.json.return_value = { + "client_id": "test-client-id", + "client_secret": "test-client-secret", + "client_name": "test-client", + "scope": "test-scope" + } + + created = client.create_client( + name="test-client", + scope="test-scope" + ) + + self.assertEqual(created, { + "client_id": "test-client-id", + "client_secret": "test-client-secret", + "client_name": "test-client", + "scope": "test-scope" + }) + + mock_post.assert_called_once_with( + "http://test-diode-url/clients", + headers={ + "Authorization": "Bearer test-client-auth-token" + }, + json={ + "client_name": "test-client", + "scope": "test-scope" + } + ) + + def test_list_clients(self): + """Test listing clients.""" + with mock.patch('requests.get') as mock_get: + client = ClientAPI( + base_url="http://test-diode-url", + client_id="test-client-id", + client_secret="test-client-secret" + ) + client._client_auth_token = "test-client-auth-token" + + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = { + "data": [ + { + "client_id": "test-client-id", + "client_name": "test-client", + "scope": "test-scope" + } + ], + "next_page_token": "test-next-page-token", + "prev_page_token": "test-prev-page-token" + } + + result = client.list_clients(page_size=100) + + self.assertEqual(result["data"], [ + { + "client_id": "test-client-id", + "client_name": "test-client", + "scope": "test-scope" + } + ]) + + self.assertEqual(result["next_page_token"], "test-next-page-token") + self.assertEqual(result["prev_page_token"], "test-prev-page-token") + + mock_get.assert_called_once_with( + "http://test-diode-url/clients", + headers={ + "Authorization": "Bearer test-client-auth-token" + }, + params={ + "page_size": 100, + } + ) + + def test_get_client(self): + """Test getting a client.""" + with mock.patch('requests.get') as mock_get: + client = ClientAPI( + base_url="http://test-diode-url", + client_id="test-client-id", + client_secret="test-client-secret" + ) + client._client_auth_token = "test-client-auth-token" + + mock_get.return_value.status_code = 200 + mock_get.return_value.json.return_value = { + "client_id": "test-client-id", + "client_name": "test-client", + "scope": "test-scope" + } + + result = client.get_client("test-client-id") + + self.assertEqual(result, { + "client_id": "test-client-id", + "client_name": "test-client", + "scope": "test-scope" + }) + + mock_get.assert_called_once_with( + "http://test-diode-url/clients/test-client-id", + headers={ + "Authorization": "Bearer test-client-auth-token" + } + ) + + def test_get_client_raises_error_on_bad_id(self): + """Test getting a client raises an error on bad ID.""" + client = ClientAPI( + base_url="http://test-diode-url", + client_id="test-client-id", + client_secret="test-client-secret" + ) + with self.assertRaises(ValueError): + client.get_client("../bad/../client/id") + + def test_delete_client(self): + """Test deleting a client.""" + with mock.patch('requests.delete') as mock_delete: + client = ClientAPI( + base_url="http://test-diode-url", + client_id="test-client-id", + client_secret="test-client-secret" + ) + client._client_auth_token = "test-client-auth-token" + + mock_delete.return_value.status_code = 204 + mock_delete.return_value.raise_for_status = mock.Mock() + + client.delete_client("test-client-id") + + mock_delete.assert_called_once_with( + "http://test-diode-url/clients/test-client-id", + headers={ + "Authorization": "Bearer test-client-auth-token" + } + ) + + def test_delete_client_raises_error_on_bad_id(self): + """Test deleting a client raises an error on bad ID.""" + client = ClientAPI( + base_url="http://test-diode-url", + client_id="test-client-id", + client_secret="test-client-secret" + ) + with self.assertRaises(ValueError): + client.delete_client("../bad/../client/id") + + def test_authentication_retries(self): + """Test authentication retries.""" + with mock.patch('requests.post') as mock_post: + client = ClientAPI( + base_url="http://test-diode-url", + client_id="test-client-id", + client_secret="test-client-secret" + ) + client._client_auth_token = "test-client-auth-token" + + mock_post.side_effect = [ + ClientAPIError("Failed to create client", 401), + mock.Mock(status_code=200, json=lambda: {"access_token": "new-access-token"}), + mock.Mock(status_code=201, json=lambda: { + "client_id": "test-client-id", + "client_secret": "test-client-secret", + "client_name": "test-client", + "scope": "diode:read diode:write" + }), + ] + + result = client.create_client("test-client", "diode:read diode:write") + self.assertEqual(result, { + "client_id": "test-client-id", + "client_secret": "test-client-secret", + "client_name": "test-client", + "scope": "diode:read diode:write" + }) + + self.assertEqual(mock_post.call_count, 3) + + mock_post.assert_has_calls([ + mock.call("http://test-diode-url/clients", + headers={ + "Authorization": "Bearer test-client-auth-token" + }, + json={ + "client_name": "test-client", + "scope": "diode:read diode:write", + } + ), + mock.call("http://test-diode-url/token", + data='grant_type=client_credentials&client_id=test-client-id&client_secret=test-client-secret&scope=diode%3Aread+diode%3Awrite', + headers={'Content-Type': 'application/x-www-form-urlencoded'} + ), + mock.call("http://test-diode-url/clients", + headers={ + "Authorization": "Bearer new-access-token" + }, + json={ + "client_name": "test-client", + "scope": "diode:read diode:write", + } + ), + ]) + + diff --git a/netbox_diode_plugin/urls.py b/netbox_diode_plugin/urls.py index abe6a0b..6273f29 100644 --- a/netbox_diode_plugin/urls.py +++ b/netbox_diode_plugin/urls.py @@ -9,4 +9,8 @@ urlpatterns = ( path("settings/", views.SettingsView.as_view(), name="settings"), path("settings/edit/", views.SettingsEditView.as_view(), name="settings_edit"), + path("credentials/", views.ClientCredentialListView.as_view(), name="client_credential_list"), + path("credentials/add/", views.ClientCredentialAddView.as_view(), name="client_credential_add"), + path("credentials/secret/", views.ClientCredentialSecretView.as_view(), name="client_credential_secret"), + path("credentials/delete//", views.ClientCredentialDeleteView.as_view(), name="client_credential_delete"), ) diff --git a/netbox_diode_plugin/views.py b/netbox_diode_plugin/views.py index b0e4a7a..1143c5d 100644 --- a/netbox_diode_plugin/views.py +++ b/netbox_diode_plugin/views.py @@ -1,23 +1,34 @@ #!/usr/bin/env python # Copyright 2025 NetBox Labs, Inc. """Diode NetBox Plugin - Views.""" +import logging + from django.conf import settings as netbox_settings from django.contrib import messages from django.contrib.auth import get_user_model from django.http import HttpResponseRedirect from django.shortcuts import redirect, render +from django.urls import reverse from django.utils.http import url_has_allowed_host_and_scheme +from django.utils.translation import gettext as _ from django.views.generic import View from netbox.plugins import get_plugin_config from netbox.views import generic +from utilities.forms import ConfirmationForm +from utilities.htmx import htmx_partial +from utilities.permissions import get_permission_for_model from utilities.views import register_model_view -from netbox_diode_plugin.forms import SettingsForm -from netbox_diode_plugin.models import Setting +from netbox_diode_plugin.client import create_client, delete_client, get_client, list_clients +from netbox_diode_plugin.forms import ClientCredentialForm, SettingsForm +from netbox_diode_plugin.models import ClientCredentials, Setting +from netbox_diode_plugin.tables import ClientCredentialsTable User = get_user_model() +logger = logging.getLogger(__name__) + def redirect_to_login(request): """Redirect to login view.""" redirect_url = netbox_settings.LOGIN_URL @@ -109,3 +120,229 @@ def post(self, request, *args, **kwargs): kwargs["pk"] = settings.pk return super().post(request, *args, **kwargs) + + +class GetReturnURLMixin: + """Get return URL mixin.""" + + def get_return_url(self, request): + """Get return URL.""" + # First, see if `return_url` was specified as a query parameter or form data. Use this URL only if it's + # considered safe. + return_url = request.GET.get("return_url") or request.POST.get("return_url") + if return_url and url_has_allowed_host_and_scheme( + return_url, allowed_hosts=None + ): + return return_url + + return None + + +class BaseDiodeView(View): + """Base diode view.""" + + def check_authentication(self, request): + """Check authentication.""" + if not request.user.is_authenticated or not request.user.is_staff: + return redirect_to_login(request) + return None + + def get_required_permission(self): + """Get required permission.""" + return get_permission_for_model(self.model, "view") + +class ClientCredentialListView(BaseDiodeView): + """Client credential list view.""" + + table = ClientCredentialsTable + template_name = "diode/client_credential_list.html" + model = ClientCredentials + + def get_table_data(self, request): + """Get table data.""" + try: + data = list_clients(request) + total = len(data) + except Exception as e: + logger.debug(f"Error loading client credentials error: {str(e)}") + messages.error(self.request, str(e)) + data = [] + total = 0 + + return total, data + + def get(self, request): + """GET request handler.""" + if ret := self.check_authentication(request): + return ret + + total, data = self.get_table_data(request) + table = self.table(data=data) # Pass the data to the table + + # If this is an HTMX request, return only the rendered table HTML + if htmx_partial(request): + if request.GET.get("embedded", False): + table.embedded = True + # Hide selection checkboxes + if "pk" in table.base_columns: + table.columns.hide("pk") + return render( + request, + "htmx/table.html", + { + "model": ClientCredentials, + "table": table, + "total_count": len(data), + }, + ) + + context = { + "model": ClientCredentials, + "table": table, + "total_count": len(data), + } + + return render(request, self.template_name, context) + + +class ClientCredentialDeleteView(GetReturnURLMixin, BaseDiodeView): + """Client credential delete view.""" + + template_name = "diode/client_credential_delete.html" + default_return_url = "plugins:netbox_diode_plugin:client_credential_list" + + def get(self, request, client_credential_id): + """GET request handler.""" + if ret := self.check_authentication(request): + return ret + + data = get_client(request, client_credential_id) + + return render( + request, + self.template_name, + { + "object": data, + "object_type": "Client Credential", + "return_url": self.get_return_url(request) or reverse(self.default_return_url), + }, + ) + + def post(self, request, client_credential_id): + """POST request handler.""" + sanitized_client_credential_id = client_credential_id.replace('\n', '').replace('\r', '') + logger.info(f"Deleting client {sanitized_client_credential_id}") + if ret := self.check_authentication(request): + return ret + + form = ConfirmationForm(request.POST) + if form.is_valid(): + try: + delete_client(request, client_credential_id) + messages.success(request, _("Client deleted successfully")) + except Exception as e: + logger.error( + f"Error deleting client: {sanitized_client_credential_id} error: {str(e)}" + ) + messages.error(request, str(e)) + + return redirect( + reverse( + "plugins:netbox_diode_plugin:client_credential_list", + ) + ) + + +class ClientCredentialAddView(GetReturnURLMixin, BaseDiodeView): + """View for adding client credentials.""" + + template_name = "diode/client_credential_add.html" + form_class = ClientCredentialForm + default_return_url = "plugins:netbox_diode_plugin:client_credential_list" + + def get(self, request): + """GET request handler.""" + if ret := self.check_authentication(request): + return ret + + form = self.form_class() + return render( + request, + self.template_name, + { + "form": form, + "return_url": self.get_return_url(request) or reverse(self.default_return_url), + }, + ) + + def post(self, request): + """POST request handler.""" + if ret := self.check_authentication(request): + return ret + + form = self.form_class(request.POST) + if form.is_valid(): + try: + response = create_client(request, form.cleaned_data["client_name"], "diode:ingest") + # Store the client credentials in session + request.session['client_secret'] = response.get('client_secret') + request.session['client_name'] = form.cleaned_data["client_name"] + request.session['client_id'] = response.get('client_id') + return redirect( + reverse( + "plugins:netbox_diode_plugin:client_credential_secret", + ) + ) + except Exception as e: + logger.error(f"Error creating client: {str(e)}") + messages.error(request, str(e)) + + return render( + request, + self.template_name, + { + "form": form, + "return_url": self.get_return_url(request) or reverse(self.default_return_url), + }, + ) + + +class ClientCredentialSecretView(BaseDiodeView): + """View for displaying client secret.""" + + template_name = "diode/client_credential_secret.html" + + def get(self, request): + """Get request handler.""" + if ret := self.check_authentication(request): + return ret + + # Get the client secret from session + client_secret = request.session.get('client_secret') + client_name = request.session.get('client_name') + client_id = request.session.get('client_id') + + if not client_secret: + messages.error(request, _("No client secret found. Please create a new client.")) + return redirect( + reverse( + "plugins:netbox_diode_plugin:client_credential_list", + ) + ) + + # Clear the session data after retrieving it + request.session.pop('client_secret', None) + request.session.pop('client_name', None) + request.session.pop('client_id', None) + + return render( + request, + self.template_name, + { + "object": { + "client_name": client_name, + "client_id": client_id, + "client_secret": client_secret, + } + }, + )