Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 19 additions & 52 deletions netbox_diode_plugin/api/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,21 @@


import logging
from dataclasses import dataclass, field

from django.apps import apps
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from rest_framework.exceptions import ValidationError as ValidationError

from .differ import Change, ChangeSet, ChangeType
from .common import NON_FIELD_ERRORS, Change, ChangeSet, ChangeSetException, ChangeSetResult, ChangeType
from .plugin_utils import get_object_type_model, legal_fields
from .supported_models import get_serializer_for_model

logger = logging.getLogger(__name__)


@dataclass
class ApplyChangeSetResult:
"""A result of applying a change set."""

id: str
success: bool
errors: dict | None = field(default=None)

def to_dict(self) -> dict:
"""Convert the result to a dictionary."""
return {
"id": self.id,
"success": self.success,
"errors": self.errors,
}


class ApplyChangeSetException(Exception):
"""ApplyChangeSetException is raised when an error occurs while applying a change set."""

def __init__(self, message, errors=None):
"""Initialize the exception."""
super().__init__(message)
self.message = message
self.errors = errors or {}

def __str__(self):
"""Return the string representation of the exception."""
if self.errors:
return f"{self.message}: {self.errors}"
return self.message


def apply_changeset(change_set: ChangeSet) -> ApplyChangeSetResult:
def apply_changeset(change_set: ChangeSet) -> ChangeSetResult:
"""Apply a change set."""
_validate_change_set(change_set)

Expand All @@ -71,14 +37,12 @@ def apply_changeset(change_set: ChangeSet) -> ApplyChangeSetResult:
except ValidationError as e:
raise _err_from_validation_error(e, f"changes[{i}]")
except ObjectDoesNotExist:
raise _err(f"{object_type} with id {change.object_id} does not exist", f"changes[{i}].object_id")
raise _err(f"{object_type} with id {change.object_id} does not exist", f"changes[{i}]", "object_id")
# ConstraintViolationError ?
# ...

return ApplyChangeSetResult(
return ChangeSetResult(
id=change_set.id,
success=True,
errors=None,
)

def _apply_change(data: dict, model_class: models.Model, change: Change, created: dict):
Expand Down Expand Up @@ -129,27 +93,30 @@ def _pre_apply(model_class: models.Model, change: Change, created: dict):

def _validate_change_set(change_set: ChangeSet):
if not change_set.id:
raise _err("Change set ID is required", "id")
raise _err("Change set ID is required", "changeset","id")
if not change_set.changes:
raise _err("Changes are required", "changes")
raise _err("Changes are required", "changeset", "changes")

for i, change in enumerate(change_set.changes):
if change.object_id is None and change.ref_id is None:
raise _err("Object ID or Ref ID must be provided", f"changes[{i}]")
raise _err("Object ID or Ref ID must be provided", f"changes[{i}]", NON_FIELD_ERRORS)
if change.change_type not in ChangeType:
raise _err(f"Unsupported change type '{change.change_type}'", f"changes[{i}].change_type")
raise _err(f"Unsupported change type '{change.change_type}'", f"changes[{i}]", "change_type")

def _err(message, field):
return ApplyChangeSetException(message, errors={field: [message]})
def _err(message, object_name, field):
return ChangeSetException(message, errors={object_name: {field: [message]}})

def _err_from_validation_error(e, prefix):
def _err_from_validation_error(e, object_name):
errors = {}
if e.detail:
if isinstance(e.detail, dict):
for k, v in e.detail.items():
errors[f"{prefix}.{k}"] = v
errors[object_name] = e.detail
elif isinstance(e.detail, (list, tuple)):
errors[prefix] = e.detail
errors[object_name] = {
NON_FIELD_ERRORS: e.detail
}
else:
errors[prefix] = [e.detail]
return ApplyChangeSetException("validation error", errors=errors)
errors[object_name] = {
NON_FIELD_ERRORS: [e.detail]
}
return ChangeSetException("validation error", errors=errors)
157 changes: 155 additions & 2 deletions netbox_diode_plugin/api/common.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
#!/usr/bin/env python
# Copyright 2024 NetBox Labs Inc
# Copyright 2025 NetBox Labs Inc
"""Diode NetBox Plugin - API - Common types and utilities."""

from dataclasses import dataclass
from collections import defaultdict
import logging
import uuid
from dataclasses import dataclass, field
from enum import Enum

from django.apps import apps
from django.contrib.contenttypes.fields import GenericRelation, GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from rest_framework import status

Check failure on line 15 in netbox_diode_plugin/api/common.py

View workflow job for this annotation

GitHub Actions / tests (3.10)

Ruff (I001)

netbox_diode_plugin/api/common.py:5:1: I001 Import block is un-sorted or un-formatted

logger = logging.getLogger("netbox.diode_data")

NON_FIELD_ERRORS = "__all__"

@dataclass
class UnresolvedReference:
Expand All @@ -29,3 +42,143 @@
def __lt__(self, other):
"""Less than operator."""
return self.object_type < other.object_type or (self.object_type == other.object_type and self.uuid < other.uuid)


class ChangeType(Enum):
"""Change type enum."""

CREATE = "create"
UPDATE = "update"
NOOP = "noop"


@dataclass
class Change:
"""A change to a model instance."""

change_type: ChangeType
object_type: str
object_id: int | None = field(default=None)
object_primary_value: str | None = field(default=None)
ref_id: str | None = field(default=None)
id: str = field(default_factory=lambda: str(uuid.uuid4()))
before: dict | None = field(default=None)
data: dict | None = field(default=None)
new_refs: list[str] = field(default_factory=list)

def to_dict(self) -> dict:
"""Convert the change to a dictionary."""
return {
"id": self.id,
"change_type": self.change_type.value,
"object_type": self.object_type,
"object_id": self.object_id,
"ref_id": self.ref_id,
"object_primary_value": self.object_primary_value,
"before": self.before,
"data": self.data,
"new_refs": self.new_refs,
}


@dataclass
class ChangeSet:
"""A set of changes to a model instance."""

id: str = field(default_factory=lambda: str(uuid.uuid4()))
changes: list[Change] = field(default_factory=list)
branch: dict[str, str] | None = field(default=None) # {"id": str, "name": str}

def to_dict(self) -> dict:
"""Convert the change set to a dictionary."""
return {
"id": self.id,
"changes": [change.to_dict() for change in self.changes],
"branch": self.branch,
}

def validate(self) -> dict[str, list[str]]:

Check failure on line 100 in netbox_diode_plugin/api/common.py

View workflow job for this annotation

GitHub Actions / tests (3.10)

Ruff (C901)

netbox_diode_plugin/api/common.py:100:9: C901 `validate` is too complex (11 > 10)
"""Validate basics of the change set data."""
errors = defaultdict(dict)

for change in self.changes:
model = apps.get_model(change.object_type)

change_data = change.data.copy()
if change.before:
change_data.update(change.before)

# check that there is some value for every required
# reference field, but don't validate the actual reference.
excluded_relation_fields = []
rel_errors = defaultdict(list)
for f in model._meta.get_fields():
if isinstance(f, (GenericRelation, GenericForeignKey)):
excluded_relation_fields.append(f.name)
continue
if not f.is_relation:
continue
field_name = f.name
excluded_relation_fields.append(field_name)

if hasattr(f, "related_model") and f.related_model == ContentType:
change_data.pop(field_name, None)
base_field = field_name[:-5]
excluded_relation_fields.append(base_field + "_id")
value = change_data.pop(base_field + "_id", None)
else:
value = change_data.pop(field_name, None)

if not f.null and not f.blank and not f.many_to_many:
# this field is a required relation...
if value is None:
rel_errors[f.name].append(f"Field {f.name} is required")
if rel_errors:
errors[change.object_type] = rel_errors

try:
instance = model(**change_data)
instance.clean_fields(exclude=excluded_relation_fields)
except ValidationError as e:
errors[change.object_type].update(e.error_dict)

return errors or None


@dataclass
class ChangeSetResult:
"""A result of applying a change set."""

id: str | None = field(default_factory=lambda: str(uuid.uuid4()))
change_set: ChangeSet | None = field(default=None)
errors: dict | None = field(default=None)

def to_dict(self) -> dict:
"""Convert the result to a dictionary."""
if self.change_set:
return self.change_set.to_dict()

return {
"id": self.id,
"errors": self.errors,
}

def get_status_code(self) -> int:
"""Get the status code for the result."""
return status.HTTP_200_OK if not self.errors else status.HTTP_400_BAD_REQUEST


class ChangeSetException(Exception):
"""ChangeSetException is raised when an error occurs while generating or applying a change set."""

def __init__(self, message, errors=None):
"""Initialize the exception."""
super().__init__(message)
self.message = message
self.errors = errors or {}

def __str__(self):
"""Return the string representation of the exception."""
if self.errors:
return f"{self.message}: {self.errors}"
return self.message
71 changes: 11 additions & 60 deletions netbox_diode_plugin/api/differ.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
#!/usr/bin/env python
# Copyright 2024 NetBox Labs Inc
# Copyright 2025 NetBox Labs Inc
"""Diode NetBox Plugin - API - Differ."""

import copy
import json
import logging
import uuid
from dataclasses import dataclass, field
from enum import Enum

from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from utilities.data import shallow_compare_dict

from .common import Change, ChangeSet, ChangeSetException, ChangeSetResult, ChangeType
from .plugin_utils import get_primary_value, legal_fields
from .supported_models import extract_supported_models
from .transformer import cleanup_unresolved_references, transform_proto_json
Expand All @@ -21,58 +18,6 @@

SUPPORTED_MODELS = extract_supported_models()

class ChangeType(Enum):
"""Change type enum."""

CREATE = "create"
UPDATE = "update"
NOOP = "noop"


@dataclass
class Change:
"""A change to a model instance."""

change_type: ChangeType
object_type: str
object_id: int | None = field(default=None)
object_primary_value: str | None = field(default=None)
ref_id: str | None = field(default=None)
id: str = field(default_factory=lambda: str(uuid.uuid4()))
before: dict | None = field(default=None)
data: dict | None = field(default=None)
new_refs: list[str] = field(default_factory=list)

def to_dict(self) -> dict:
"""Convert the change to a dictionary."""
return {
"id": self.id,
"change_type": self.change_type.value,
"object_type": self.object_type,
"object_id": self.object_id,
"ref_id": self.ref_id,
"object_primary_value": self.object_primary_value,
"before": self.before,
"data": self.data,
"new_refs": self.new_refs,
}


@dataclass
class ChangeSet:
"""A set of changes to a model instance."""

id: str = field(default_factory=lambda: str(uuid.uuid4()))
changes: list[Change] = field(default_factory=list)
branch: dict[str, str] | None = field(default=None) # {"id": str, "name": str}

def to_dict(self) -> dict:
"""Convert the change set to a dictionary."""
return {
"id": self.id,
"changes": [change.to_dict() for change in self.changes],
"branch": self.branch,
}

def prechange_data_from_instance(instance) -> dict: # noqa: C901
"""Convert model instance data to a dictionary format for comparison."""
Expand Down Expand Up @@ -193,7 +138,7 @@ def sort_dict_recursively(d):
return d


def generate_changeset(entity: dict, object_type: str) -> ChangeSet:
def generate_changeset(entity: dict, object_type: str) -> ChangeSetResult:
"""Generate a changeset for an entity."""
change_set = ChangeSet()

Expand Down Expand Up @@ -227,5 +172,11 @@ def generate_changeset(entity: dict, object_type: str) -> ChangeSet:
new_refs,
)
change_set.changes.append(change)
logger.error(f"change_set: {json.dumps(change_set.to_dict(), default=str, indent=4)}")
return change_set

if errors := change_set.validate():
raise ChangeSetException("Invalid change set", errors)

return ChangeSetResult(
id=change_set.id,
change_set=change_set,
)
Loading
Loading