1 change: 1 addition & 0 deletions changelog/565.fixed.md
@@ -0,0 +1 @@
Disable rich console print markup, which was causing regex patterns in error output to be reformatted
29 changes: 28 additions & 1 deletion docs/docs/python-sdk/topics/object_file.mdx
@@ -60,12 +60,24 @@ apiVersion: infrahub.app/v1
kind: Object
spec:
  kind: <NamespaceName>
  strategy: <normal|range_expand> # Optional, defaults to normal
  data:
    - [...]
```

> Multiple documents in a single YAML file are also supported; each document is loaded separately. Documents are separated by `---`.

### Data Processing Strategies

The `strategy` field controls how the data in the object file is processed before loading into Infrahub:

| Strategy | Description | Default |
|----------|-------------|---------|
| `normal` | No data manipulation is performed. Objects are loaded as-is. | Yes |
| `range_expand` | Range patterns (e.g., `[1-5]`) in string fields are expanded into multiple objects. | No |

When `strategy` is not specified, it defaults to `normal`.
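
For illustration, here is a sketch of a single object file that combines both strategies across two YAML documents (the `BuiltinTag` kind and its `name` attribute are assumptions used only for this example): the first document omits `strategy` and is loaded as-is, while the second opts into range expansion.

```yaml
---
apiVersion: infrahub.app/v1
kind: Object
spec:
  kind: BuiltinTag
  # strategy omitted: defaults to normal, objects are loaded as-is
  data:
    - name: blue
---
apiVersion: infrahub.app/v1
kind: Object
spec:
  kind: BuiltinLocation
  strategy: range_expand
  data:
    - name: AMS[1-3]
      type: Country
```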

### Relationship of cardinality one

A relationship of cardinality one can either reference an existing node via its HFID or create a new node if it doesn't exist.
@@ -198,7 +210,19 @@ Metadata support is planned for future releases. Currently, the Object file does

## Range Expansion in Object Files

The Infrahub Python SDK supports **range expansion** for string fields in object files. This feature allows you to specify a range pattern (e.g., `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing.
The Infrahub Python SDK supports **range expansion** for string fields in object files when the `strategy` is set to `range_expand`. This feature allows you to specify a range pattern (e.g., `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing.

```yaml
---
apiVersion: infrahub.app/v1
kind: Object
spec:
  kind: BuiltinLocation
  strategy: range_expand # Enable range expansion
  data:
    - name: AMS[1-3]
      type: Country
```
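
The same behaviour can also be exercised programmatically through the SDK's spec model. A minimal sketch, assuming a reachable Infrahub instance at the given address; the import path mirrors `infrahub_sdk/spec/object.py`:

```python
from infrahub_sdk import InfrahubClient
from infrahub_sdk.spec.object import InfrahubObjectFileData, ObjectStrategy


async def load_locations() -> None:
    client = InfrahubClient(address="http://localhost:8000")  # assumed local instance
    spec = InfrahubObjectFileData(
        kind="BuiltinLocation",
        strategy=ObjectStrategy.RANGE_EXPAND,
        data=[{"name": "AMS[1-3]", "type": "Country"}],
    )
    # validate_format applies the strategy (expanding AMS[1-3] into AMS1..AMS3)
    # and returns a list of validation errors, empty when everything is valid.
    errors = await spec.validate_format(client=client)
    if not errors:
        # process creates one node per expanded item
        await spec.process(client=client)
```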

### How Range Expansion Works

@@ -213,6 +237,7 @@ The Infrahub Python SDK supports **range expansion** for string fields in object
```yaml
spec:
  kind: BuiltinLocation
  strategy: range_expand
  data:
    - name: AMS[1-3]
      type: Country
@@ -234,6 +259,7 @@ This will expand to:
```yaml
spec:
  kind: BuiltinLocation
  strategy: range_expand
  data:
    - name: AMS[1-3]
      description: Datacenter [A-C]
@@ -261,6 +287,7 @@ If you use ranges of different lengths in multiple fields:
```yaml
spec:
  kind: BuiltinLocation
  strategy: range_expand
  data:
    - name: AMS[1-3]
      description: "Datacenter [10-15]"
8 changes: 6 additions & 2 deletions infrahub_sdk/ctl/schema.py
@@ -73,7 +73,9 @@ def display_schema_load_errors(response: dict[str, Any], schemas_data: list[Sche
loc_type = loc_path[-1]
input_str = error.get("input", None)
error_message = f"{loc_type} ({input_str}) | {error['msg']} ({error['type']})"
console.print(f" Node: {node.get('namespace', None)}{node.get('name', None)} | {error_message}")
console.print(
f" Node: {node.get('namespace', None)}{node.get('name', None)} | {error_message}", markup=False
)

elif len(loc_path) > 6:
loc_type = loc_path[5]
@@ -91,7 +93,9 @@ def display_schema_load_errors(response: dict[str, Any], schemas_data: list[Sche

input_str = error.get("input", None)
error_message = f"{loc_type[:-1].title()}: {input_label} ({input_str}) | {error['msg']} ({error['type']})"
console.print(f" Node: {node.get('namespace', None)}{node.get('name', None)} | {error_message}")
console.print(
f" Node: {node.get('namespace', None)}{node.get('name', None)} | {error_message}", markup=False
)


def handle_non_detail_errors(response: dict[str, Any]) -> None:
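
The `markup=False` argument added above addresses the changelog entry at the top of this diff: rich treats square brackets as console markup, so regex fragments such as `[A-Z]` in schema error messages could be reformatted when printed. A standalone sketch of the call pattern (not the SDK's own code):

```python
from rich.console import Console

console = Console()
error_message = "namespace (OuT) | String should match pattern '^[A-Z][a-z0-9]+$' (string_pattern_mismatch)"

# markup=False tells rich to print the string as literal text, so bracketed
# regex character classes like [A-Z] and [a-z0-9] are rendered exactly as written.
console.print(f" Node: OuTDevice | {error_message}", markup=False)
```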
94 changes: 84 additions & 10 deletions infrahub_sdk/spec/object.py
@@ -2,8 +2,9 @@

import copy
import re
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar

from pydantic import BaseModel, Field

@@ -45,6 +46,11 @@ class RelationshipDataFormat(str, Enum):
MANY_REF = "many_ref_list"


class ObjectStrategy(str, Enum):
NORMAL = "normal"
RANGE_EXPAND = "range_expand"


class RelationshipInfo(BaseModel):
name: str
rel_schema: RelationshipSchema
@@ -168,7 +174,7 @@ async def get_relationship_info(


def expand_data_with_ranges(data: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Expand any item in self.data with range pattern in any value. Supports multiple fields, requires equal expansion length."""
"""Expand any item in data with range pattern in any value. Supports multiple fields, requires equal expansion length."""
range_pattern = re.compile(MATCH_PATTERN)
expanded = []
for item in data:
@@ -198,16 +204,69 @@ def expand_data_with_ranges(data: list[dict[str, Any]]) -> list[dict[str, Any]]:
return expanded


class DataProcessor(ABC):
"""Abstract base class for data processing strategies"""

@abstractmethod
def process_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Process the data according to the strategy"""


class SingleDataProcessor(DataProcessor):
"""Process data without any expansion"""

def process_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]:
return data


class RangeExpandDataProcessor(DataProcessor):
"""Process data with range expansion"""

def process_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]:
return expand_data_with_ranges(data)


class DataProcessorFactory:
"""Factory to create appropriate data processor based on strategy"""

_processors: ClassVar[dict[ObjectStrategy, type[DataProcessor]]] = {
ObjectStrategy.NORMAL: SingleDataProcessor,
ObjectStrategy.RANGE_EXPAND: RangeExpandDataProcessor,
}

@classmethod
def get_processor(cls, strategy: ObjectStrategy) -> DataProcessor:
processor_class = cls._processors.get(strategy)
if not processor_class:
raise ValueError(
f"Unknown strategy: {strategy} - no processor found. Valid strategies are: {list(cls._processors.keys())}"
)
return processor_class()

@classmethod
def register_processor(cls, strategy: ObjectStrategy, processor_class: type[DataProcessor]) -> None:
"""Register a new processor for a strategy - useful for future extensions"""
cls._processors[strategy] = processor_class


class InfrahubObjectFileData(BaseModel):
kind: str
strategy: ObjectStrategy = ObjectStrategy.NORMAL
data: list[dict[str, Any]] = Field(default_factory=list)

def _get_processed_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Get data processed according to the strategy"""
processor = DataProcessorFactory.get_processor(self.strategy)
return processor.process_data(data)

async def validate_format(self, client: InfrahubClient, branch: str | None = None) -> list[ObjectValidationError]:
errors: list[ObjectValidationError] = []
schema = await client.schema.get(kind=self.kind, branch=branch)
expanded_data = expand_data_with_ranges(self.data)
self.data = expanded_data
for idx, item in enumerate(expanded_data):

processed_data = self._get_processed_data(data=self.data)
self.data = processed_data

for idx, item in enumerate(processed_data):
errors.extend(
await self.validate_object(
client=client,
@@ -216,14 +275,16 @@ async def validate_object(
data=item,
branch=branch,
default_schema_kind=self.kind,
strategy=self.strategy, # Pass strategy down
)
)
return errors

async def process(self, client: InfrahubClient, branch: str | None = None) -> None:
schema = await client.schema.get(kind=self.kind, branch=branch)
expanded_data = expand_data_with_ranges(self.data)
for idx, item in enumerate(expanded_data):
processed_data = self._get_processed_data(data=self.data)

for idx, item in enumerate(processed_data):
await self.create_node(
client=client,
schema=schema,
@@ -243,6 +304,7 @@ async def validate_object(
context: dict | None = None,
branch: str | None = None,
default_schema_kind: str | None = None,
strategy: ObjectStrategy = ObjectStrategy.NORMAL,
) -> list[ObjectValidationError]:
errors: list[ObjectValidationError] = []
context = context.copy() if context else {}
@@ -292,6 +354,7 @@ async def validate_object(
context=context,
branch=branch,
default_schema_kind=default_schema_kind,
strategy=strategy,
)
)

@@ -307,6 +370,7 @@ async def validate_related_nodes(
context: dict | None = None,
branch: str | None = None,
default_schema_kind: str | None = None,
strategy: ObjectStrategy = ObjectStrategy.NORMAL,
) -> list[ObjectValidationError]:
context = context.copy() if context else {}
errors: list[ObjectValidationError] = []
@@ -348,7 +412,10 @@ async def validate_related_nodes(
rel_info.find_matching_relationship(peer_schema=peer_schema)
context.update(rel_info.get_context(value="placeholder"))

expanded_data = expand_data_with_ranges(data=data["data"])
# Use strategy-aware data processing
processor = DataProcessorFactory.get_processor(strategy)
expanded_data = processor.process_data(data["data"])

for idx, peer_data in enumerate(expanded_data):
context["list_index"] = idx
errors.extend(
@@ -360,6 +427,7 @@ async def validate_related_nodes(
context=context,
branch=branch,
default_schema_kind=default_schema_kind,
strategy=strategy,
)
)
return errors
@@ -633,14 +701,20 @@ class ObjectFile(InfrahubFile):
@property
def spec(self) -> InfrahubObjectFileData:
if not self._spec:
self._spec = InfrahubObjectFileData(**self.data.spec)
try:
self._spec = InfrahubObjectFileData(**self.data.spec)
except Exception as exc:
raise ValidationError(identifier=str(self.location), message=str(exc))
return self._spec

def validate_content(self) -> None:
super().validate_content()
if self.kind != InfrahubFileKind.OBJECT:
raise ValueError("File is not an Infrahub Object file")
self._spec = InfrahubObjectFileData(**self.data.spec)
try:
self._spec = InfrahubObjectFileData(**self.data.spec)
except Exception as exc:
raise ValidationError(identifier=str(self.location), message=str(exc))

async def validate_format(self, client: InfrahubClient, branch: str | None = None) -> None:
self.validate_content()
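
The `DataProcessorFactory` above dispatches on the `strategy` field, and `register_processor` leaves room for additional strategies later. A short sketch of exercising the factory directly (the import path follows `infrahub_sdk/spec/object.py`, and the expected expansion follows the documentation above):

```python
from infrahub_sdk.spec.object import (
    DataProcessorFactory,
    ObjectStrategy,
    RangeExpandDataProcessor,
)

# The factory returns the processor registered for the requested strategy.
processor = DataProcessorFactory.get_processor(ObjectStrategy.RANGE_EXPAND)
assert isinstance(processor, RangeExpandDataProcessor)

# process_data applies the same expansion used by validate_format and process:
# "AMS[1-3]" should yield three items named AMS1, AMS2 and AMS3.
expanded = processor.process_data([{"name": "AMS[1-3]", "type": "Country"}])
print(len(expanded))  # expected: 3
```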
2 changes: 1 addition & 1 deletion infrahub_sdk/spec/range_expansion.py
@@ -1,7 +1,7 @@
import itertools
import re

MATCH_PATTERN = r"(\[[\w,-]+\])"
MATCH_PATTERN = r"(\[[\w,-]*[-,][\w,-]*\])"


def _escape_brackets(s: str) -> str:
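
The tightened `MATCH_PATTERN` now requires at least one `-` or `,` inside the brackets, so a lone bracketed token is no longer treated as a range. A quick sketch comparing the old and new patterns:

```python
import re

OLD_PATTERN = r"(\[[\w,-]+\])"
NEW_PATTERN = r"(\[[\w,-]*[-,][\w,-]*\])"

for sample in ["AMS[1-3]", "vlan[10,20,30]", "eth[0]"]:
    print(sample, re.findall(OLD_PATTERN, sample), re.findall(NEW_PATTERN, sample))

# AMS[1-3]        ['[1-3]']       ['[1-3]']       -> ranges with '-' still match
# vlan[10,20,30]  ['[10,20,30]']  ['[10,20,30]']  -> comma lists still match
# eth[0]          ['[0]']         []              -> plain tokens no longer match
```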
2 changes: 1 addition & 1 deletion tests/unit/ctl/test_schema_app.py
@@ -116,7 +116,7 @@ def test_schema_load_notvalid_namespace(httpx_mock: HTTPXMock) -> None:
clean_output = remove_ansi_color(result.stdout.replace("\n", ""))
expected_result = (
"Unable to load the schema: Node: OuTDevice | "
"namespace (OuT) | String should match pattern '^[A-Z]+$' (string_pattern_mismatch) "
"namespace (OuT) | String should match pattern '^[A-Z][a-z0-9]+$' (string_pattern_mismatch) "
" Node: OuTDevice | Attribute: name (NotValid) | Value error, Only valid Attribute Kind "
"are : ['ID', 'Dropdown'] (value_error)"
)