diff --git a/docs/docs/python-sdk/topics/object_file.mdx b/docs/docs/python-sdk/topics/object_file.mdx index 8de01740..f3e426ae 100644 --- a/docs/docs/python-sdk/topics/object_file.mdx +++ b/docs/docs/python-sdk/topics/object_file.mdx @@ -60,23 +60,23 @@ apiVersion: infrahub.app/v1 kind: Object spec: kind: - strategy: # Optional, defaults to normal + parameters: + expand_range: # Optional, defaults to false data: - [...] ``` > Multiple documents in a single YAML file are also supported, each document will be loaded separately. Documents are separated by `---` -### Data Processing Strategies +### Data Processing Parameters -The `strategy` field controls how the data in the object file is processed before loading into Infrahub: +The `parameters` field controls how the data in the object file is processed before loading into Infrahub: -| Strategy | Description | Default | -|----------|-------------|---------| -| `normal` | No data manipulation is performed. Objects are loaded as-is. | Yes | -| `range_expand` | Range patterns (e.g., `[1-5]`) in string fields are expanded into multiple objects. | No | +| Parameter | Description | Default | +|-----------|-------------|---------| +| `expand_range` | When set to `true`, range patterns (e.g., `[1-5]`) in string fields are expanded into multiple objects. | `false` | -When `strategy` is not specified, it defaults to `normal`. +When `expand_range` is not specified, it defaults to `false`. ### Relationship of cardinality one @@ -210,7 +210,7 @@ Metadata support is planned for future releases. Currently, the Object file does ## Range Expansion in Object Files -The Infrahub Python SDK supports **range expansion** for string fields in object files when the `strategy` is set to `range_expand`. This feature allows you to specify a range pattern (e.g., `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing. +The Infrahub Python SDK supports **range expansion** for string fields in object files when the `parameters > expand_range` is set to `true`. This feature allows you to specify a range pattern (e.g., `[1-5]`) in any string value, and the SDK will automatically expand it into multiple objects during validation and processing. ```yaml --- @@ -218,7 +218,8 @@ apiVersion: infrahub.app/v1 kind: Object spec: kind: BuiltinLocation - strategy: range_expand # Enable range expansion + parameters: + expand_range: true # Enable range expansion data: - name: AMS[1-3] type: Country @@ -237,7 +238,8 @@ spec: ```yaml spec: kind: BuiltinLocation - strategy: range_expand + parameters: + expand_range: true data: - name: AMS[1-3] type: Country @@ -259,7 +261,8 @@ This will expand to: ```yaml spec: kind: BuiltinLocation - strategy: range_expand + parameters: + expand_range: true data: - name: AMS[1-3] description: Datacenter [A-C] @@ -287,7 +290,8 @@ If you use ranges of different lengths in multiple fields: ```yaml spec: kind: BuiltinLocation - strategy: range_expand + parameters: + expand_range: true data: - name: AMS[1-3] description: "Datacenter [10-15]" diff --git a/infrahub_sdk/spec/models.py b/infrahub_sdk/spec/models.py new file mode 100644 index 00000000..9020720b --- /dev/null +++ b/infrahub_sdk/spec/models.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from pydantic import BaseModel + + +class InfrahubObjectParameters(BaseModel): + expand_range: bool = False diff --git a/infrahub_sdk/spec/object.py b/infrahub_sdk/spec/object.py index 16992b1a..3bc738c3 100644 --- a/infrahub_sdk/spec/object.py +++ b/infrahub_sdk/spec/object.py @@ -1,17 +1,15 @@ from __future__ import annotations -import copy -import re -from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Any, ClassVar +from typing import TYPE_CHECKING, Any from pydantic import BaseModel, Field from ..exceptions import ObjectValidationError, ValidationError from ..schema import GenericSchemaAPI, RelationshipKind, RelationshipSchema from ..yaml import InfrahubFile, InfrahubFileKind -from .range_expansion import MATCH_PATTERN, range_expansion +from .models import InfrahubObjectParameters +from .processors.factory import DataProcessorFactory if TYPE_CHECKING: from ..client import InfrahubClient @@ -46,11 +44,6 @@ class RelationshipDataFormat(str, Enum): MANY_REF = "many_ref_list" -class ObjectStrategy(str, Enum): - NORMAL = "normal" - RANGE_EXPAND = "range_expand" - - class RelationshipInfo(BaseModel): name: str rel_schema: RelationshipSchema @@ -173,97 +166,21 @@ async def get_relationship_info( return info -def expand_data_with_ranges(data: list[dict[str, Any]]) -> list[dict[str, Any]]: - """Expand any item in data with range pattern in any value. Supports multiple fields, requires equal expansion length.""" - range_pattern = re.compile(MATCH_PATTERN) - expanded = [] - for item in data: - # Find all fields to expand - expand_fields = {} - for key, value in item.items(): - if isinstance(value, str) and range_pattern.search(value): - try: - expand_fields[key] = range_expansion(value) - except Exception: - # If expansion fails, treat as no expansion - expand_fields[key] = [value] - if not expand_fields: - expanded.append(item) - continue - # Check all expanded lists have the same length - lengths = [len(v) for v in expand_fields.values()] - if len(set(lengths)) > 1: - raise ValidationError(f"Range expansion mismatch: fields expanded to different lengths: {lengths}") - n = lengths[0] - # Zip expanded values and produce new items - for i in range(n): - new_item = copy.deepcopy(item) - for key, values in expand_fields.items(): - new_item[key] = values[i] - expanded.append(new_item) - return expanded - - -class DataProcessor(ABC): - """Abstract base class for data processing strategies""" - - @abstractmethod - def process_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]: - """Process the data according to the strategy""" - - -class SingleDataProcessor(DataProcessor): - """Process data without any expansion""" - - def process_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]: - return data - - -class RangeExpandDataProcessor(DataProcessor): - """Process data with range expansion""" - - def process_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]: - return expand_data_with_ranges(data) - - -class DataProcessorFactory: - """Factory to create appropriate data processor based on strategy""" - - _processors: ClassVar[dict[ObjectStrategy, type[DataProcessor]]] = { - ObjectStrategy.NORMAL: SingleDataProcessor, - ObjectStrategy.RANGE_EXPAND: RangeExpandDataProcessor, - } - - @classmethod - def get_processor(cls, strategy: ObjectStrategy) -> DataProcessor: - processor_class = cls._processors.get(strategy) - if not processor_class: - raise ValueError( - f"Unknown strategy: {strategy} - no processor found. Valid strategies are: {list(cls._processors.keys())}" - ) - return processor_class() - - @classmethod - def register_processor(cls, strategy: ObjectStrategy, processor_class: type[DataProcessor]) -> None: - """Register a new processor for a strategy - useful for future extensions""" - cls._processors[strategy] = processor_class - - class InfrahubObjectFileData(BaseModel): kind: str - strategy: ObjectStrategy = ObjectStrategy.NORMAL + parameters: InfrahubObjectParameters = Field(default_factory=InfrahubObjectParameters) data: list[dict[str, Any]] = Field(default_factory=list) - def _get_processed_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]: + async def _get_processed_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]: """Get data processed according to the strategy""" - processor = DataProcessorFactory.get_processor(self.strategy) - return processor.process_data(data) + + return await DataProcessorFactory.process_data(kind=self.kind, parameters=self.parameters, data=data) async def validate_format(self, client: InfrahubClient, branch: str | None = None) -> list[ObjectValidationError]: errors: list[ObjectValidationError] = [] schema = await client.schema.get(kind=self.kind, branch=branch) - processed_data = self._get_processed_data(data=self.data) + processed_data = await self._get_processed_data(data=self.data) self.data = processed_data for idx, item in enumerate(processed_data): @@ -275,14 +192,14 @@ async def validate_format(self, client: InfrahubClient, branch: str | None = Non data=item, branch=branch, default_schema_kind=self.kind, - strategy=self.strategy, # Pass strategy down + parameters=self.parameters, ) ) return errors async def process(self, client: InfrahubClient, branch: str | None = None) -> None: schema = await client.schema.get(kind=self.kind, branch=branch) - processed_data = self._get_processed_data(data=self.data) + processed_data = await self._get_processed_data(data=self.data) for idx, item in enumerate(processed_data): await self.create_node( @@ -304,8 +221,9 @@ async def validate_object( context: dict | None = None, branch: str | None = None, default_schema_kind: str | None = None, - strategy: ObjectStrategy = ObjectStrategy.NORMAL, + parameters: InfrahubObjectParameters | None = None, ) -> list[ObjectValidationError]: + parameters = parameters or InfrahubObjectParameters() errors: list[ObjectValidationError] = [] context = context.copy() if context else {} @@ -354,7 +272,7 @@ async def validate_object( context=context, branch=branch, default_schema_kind=default_schema_kind, - strategy=strategy, + parameters=parameters, ) ) @@ -370,8 +288,9 @@ async def validate_related_nodes( context: dict | None = None, branch: str | None = None, default_schema_kind: str | None = None, - strategy: ObjectStrategy = ObjectStrategy.NORMAL, + parameters: InfrahubObjectParameters | None = None, ) -> list[ObjectValidationError]: + parameters = parameters or InfrahubObjectParameters() context = context.copy() if context else {} errors: list[ObjectValidationError] = [] @@ -399,6 +318,7 @@ async def validate_related_nodes( context=context, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) ) return errors @@ -412,11 +332,11 @@ async def validate_related_nodes( rel_info.find_matching_relationship(peer_schema=peer_schema) context.update(rel_info.get_context(value="placeholder")) - # Use strategy-aware data processing - processor = DataProcessorFactory.get_processor(strategy) - expanded_data = processor.process_data(data["data"]) + processed_data = await DataProcessorFactory.process_data( + kind=peer_kind, data=data["data"], parameters=parameters + ) - for idx, peer_data in enumerate(expanded_data): + for idx, peer_data in enumerate(processed_data): context["list_index"] = idx errors.extend( await cls.validate_object( @@ -427,7 +347,7 @@ async def validate_related_nodes( context=context, branch=branch, default_schema_kind=default_schema_kind, - strategy=strategy, + parameters=parameters, ) ) return errors @@ -452,6 +372,7 @@ async def validate_related_nodes( context=context, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) ) return errors @@ -478,7 +399,9 @@ async def create_node( context: dict | None = None, branch: str | None = None, default_schema_kind: str | None = None, + parameters: InfrahubObjectParameters | None = None, ) -> InfrahubNode: + parameters = parameters or InfrahubObjectParameters() context = context.copy() if context else {} errors = await cls.validate_object( @@ -489,6 +412,7 @@ async def create_node( context=context, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) if errors: messages = [str(error) for error in errors] @@ -534,6 +458,7 @@ async def create_node( data=value, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) clean_data[key] = nodes[0] @@ -545,6 +470,7 @@ async def create_node( data=value, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) clean_data[key] = nodes @@ -583,6 +509,7 @@ async def create_node( context=context, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) return node @@ -598,7 +525,9 @@ async def create_related_nodes( context: dict | None = None, branch: str | None = None, default_schema_kind: str | None = None, + parameters: InfrahubObjectParameters | None = None, ) -> list[InfrahubNode]: + parameters = parameters or InfrahubObjectParameters() nodes: list[InfrahubNode] = [] context = context.copy() if context else {} @@ -618,6 +547,7 @@ async def create_related_nodes( context=context, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) return [new_node] @@ -631,7 +561,10 @@ async def create_related_nodes( rel_info.find_matching_relationship(peer_schema=peer_schema) context.update(rel_info.get_context(value=parent_node.id)) - expanded_data = expand_data_with_ranges(data=data["data"]) + expanded_data = await DataProcessorFactory.process_data( + kind=peer_kind, data=data["data"], parameters=parameters + ) + for idx, peer_data in enumerate(expanded_data): context["list_index"] = idx if isinstance(peer_data, dict): @@ -643,6 +576,7 @@ async def create_related_nodes( context=context, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) nodes.append(node) return nodes @@ -668,6 +602,7 @@ async def create_related_nodes( context=context, branch=branch, default_schema_kind=default_schema_kind, + parameters=parameters, ) nodes.append(node) diff --git a/infrahub_sdk/spec/processors/__init__.py b/infrahub_sdk/spec/processors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/infrahub_sdk/spec/processors/data_processor.py b/infrahub_sdk/spec/processors/data_processor.py new file mode 100644 index 00000000..0b007fec --- /dev/null +++ b/infrahub_sdk/spec/processors/data_processor.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod +from typing import Any + + +class DataProcessor(ABC): + """Abstract base class for data processing strategies""" + + @abstractmethod + async def process_data(self, data: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Process the data according to the strategy""" diff --git a/infrahub_sdk/spec/processors/factory.py b/infrahub_sdk/spec/processors/factory.py new file mode 100644 index 00000000..80f80efc --- /dev/null +++ b/infrahub_sdk/spec/processors/factory.py @@ -0,0 +1,34 @@ +from collections.abc import Sequence +from typing import Any + +from ..models import InfrahubObjectParameters +from .data_processor import DataProcessor +from .range_expand_processor import RangeExpandDataProcessor + +PROCESSOR_PER_KIND: dict[str, DataProcessor] = {} + + +class DataProcessorFactory: + """Factory to create appropriate data processor based on strategy""" + + @classmethod + def get_processors(cls, kind: str, parameters: InfrahubObjectParameters) -> Sequence[DataProcessor]: + processors: list[DataProcessor] = [] + if parameters.expand_range: + processors.append(RangeExpandDataProcessor()) + if kind in PROCESSOR_PER_KIND: + processors.append(PROCESSOR_PER_KIND[kind]) + + return processors + + @classmethod + async def process_data( + cls, + kind: str, + data: list[dict[str, Any]], + parameters: InfrahubObjectParameters, + ) -> list[dict[str, Any]]: + processors = cls.get_processors(kind=kind, parameters=parameters) + for processor in processors: + data = await processor.process_data(data=data) + return data diff --git a/infrahub_sdk/spec/processors/range_expand_processor.py b/infrahub_sdk/spec/processors/range_expand_processor.py new file mode 100644 index 00000000..53e159e4 --- /dev/null +++ b/infrahub_sdk/spec/processors/range_expand_processor.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import copy +import logging +import re +from typing import Any + +from ...exceptions import ValidationError +from ..range_expansion import MATCH_PATTERN, range_expansion +from .data_processor import DataProcessor + +log = logging.getLogger("infrahub_sdk") + + +class RangeExpandDataProcessor(DataProcessor): + """Process data with range expansion""" + + @classmethod + async def process_data( + cls, + data: list[dict[str, Any]], + ) -> list[dict[str, Any]]: + """Expand any item in data with range pattern in any value. Supports multiple fields, requires equal expansion length.""" + range_pattern = re.compile(MATCH_PATTERN) + expanded = [] + for item in data: + # Find all fields to expand + expand_fields = {} + for key, value in item.items(): + if isinstance(value, str) and range_pattern.search(value): + try: + expand_fields[key] = range_expansion(value) + except (ValueError, TypeError, KeyError): + # If expansion fails, treat as no expansion + log.debug( + f"Range expansion failed for value '{value}' in key '{key}'. Treating as no expansion." + ) + expand_fields[key] = [value] + if not expand_fields: + expanded.append(item) + continue + # Check all expanded lists have the same length + lengths = [len(v) for v in expand_fields.values()] + if len(set(lengths)) > 1: + raise ValidationError( + identifier="range_expansion", + message=f"Range expansion mismatch: fields expanded to different lengths: {lengths}", + ) + n = lengths[0] + # Zip expanded values and produce new items + for i in range(n): + new_item = copy.deepcopy(item) + for key, values in expand_fields.items(): + new_item[key] = values[i] + expanded.append(new_item) + return expanded diff --git a/tests/unit/sdk/conftest.py b/tests/unit/sdk/conftest.py index 0d249012..451a2adb 100644 --- a/tests/unit/sdk/conftest.py +++ b/tests/unit/sdk/conftest.py @@ -1878,6 +1878,12 @@ async def mock_schema_query_01(httpx_mock: HTTPXMock, schema_query_01_data: dict return httpx_mock +@pytest.fixture +async def client_with_schema_01(client: InfrahubClient, schema_query_01_data: dict) -> InfrahubClient: + client.schema.set_cache(schema=schema_query_01_data, branch="main") + return client + + @pytest.fixture async def mock_schema_query_02(httpx_mock: HTTPXMock, schema_query_02_data: dict) -> HTTPXMock: httpx_mock.add_response( diff --git a/tests/unit/sdk/spec/test_object.py b/tests/unit/sdk/spec/test_object.py index faf862b0..570f14c5 100644 --- a/tests/unit/sdk/spec/test_object.py +++ b/tests/unit/sdk/spec/test_object.py @@ -5,11 +5,9 @@ import pytest from infrahub_sdk.exceptions import ValidationError -from infrahub_sdk.spec.object import ObjectFile, ObjectStrategy, RelationshipDataFormat, get_relationship_info +from infrahub_sdk.spec.object import ObjectFile, RelationshipDataFormat, get_relationship_info if TYPE_CHECKING: - from pytest_httpx import HTTPXMock - from infrahub_sdk.client import InfrahubClient @@ -40,7 +38,7 @@ def location_bad_syntax02(root_location: dict) -> dict: data = [{"name": "Mexico", "notvalidattribute": "notvalidattribute", "type": "Country"}] location = root_location.copy() location["spec"]["data"] = data - location["spec"]["strategy"] = ObjectStrategy.RANGE_EXPAND + location["spec"]["parameters"] = {"expand_range": True} return location @@ -54,7 +52,7 @@ def location_expansion(root_location: dict) -> dict: ] location = root_location.copy() location["spec"]["data"] = data - location["spec"]["strategy"] = ObjectStrategy.RANGE_EXPAND + location["spec"]["parameters"] = {"expand_range": True} return location @@ -68,7 +66,7 @@ def no_location_expansion(root_location: dict) -> dict: ] location = root_location.copy() location["spec"]["data"] = data - location["spec"]["strategy"] = ObjectStrategy.NORMAL + location["spec"]["parameters"] = {"expand_range": False} return location @@ -83,7 +81,7 @@ def location_expansion_multiple_ranges(root_location: dict) -> dict: ] location = root_location.copy() location["spec"]["data"] = data - location["spec"]["strategy"] = ObjectStrategy.RANGE_EXPAND + location["spec"]["parameters"] = {"expand_range": True} return location @@ -98,11 +96,12 @@ def location_expansion_multiple_ranges_bad_syntax(root_location: dict) -> dict: ] location = root_location.copy() location["spec"]["data"] = data - location["spec"]["strategy"] = ObjectStrategy.RANGE_EXPAND + location["spec"]["parameters"] = {"expand_range": True} return location -async def test_validate_object(client: InfrahubClient, mock_schema_query_01: HTTPXMock, location_mexico_01) -> None: +async def test_validate_object(client: InfrahubClient, schema_query_01_data: dict, location_mexico_01) -> None: + client.schema.set_cache(schema=schema_query_01_data, branch="main") obj = ObjectFile(location="some/path", content=location_mexico_01) await obj.validate_format(client=client) @@ -110,8 +109,9 @@ async def test_validate_object(client: InfrahubClient, mock_schema_query_01: HTT async def test_validate_object_bad_syntax01( - client: InfrahubClient, mock_schema_query_01: HTTPXMock, location_bad_syntax01 + client: InfrahubClient, schema_query_01_data: dict, location_bad_syntax01 ) -> None: + client.schema.set_cache(schema=schema_query_01_data, branch="main") obj = ObjectFile(location="some/path", content=location_bad_syntax01) with pytest.raises(ValidationError) as exc: await obj.validate_format(client=client) @@ -119,21 +119,17 @@ async def test_validate_object_bad_syntax01( assert "name" in str(exc.value) -async def test_validate_object_bad_syntax02( - client: InfrahubClient, mock_schema_query_01: HTTPXMock, location_bad_syntax02 -) -> None: +async def test_validate_object_bad_syntax02(client_with_schema_01: InfrahubClient, location_bad_syntax02) -> None: obj = ObjectFile(location="some/path", content=location_bad_syntax02) with pytest.raises(ValidationError) as exc: - await obj.validate_format(client=client) + await obj.validate_format(client=client_with_schema_01) assert "notvalidattribute" in str(exc.value) -async def test_validate_object_expansion( - client: InfrahubClient, mock_schema_query_01: HTTPXMock, location_expansion -) -> None: +async def test_validate_object_expansion(client_with_schema_01: InfrahubClient, location_expansion) -> None: obj = ObjectFile(location="some/path", content=location_expansion) - await obj.validate_format(client=client) + await obj.validate_format(client=client_with_schema_01) assert obj.spec.kind == "BuiltinLocation" assert len(obj.spec.data) == 5 @@ -141,22 +137,20 @@ async def test_validate_object_expansion( assert obj.spec.data[4]["name"] == "AMS5" -async def test_validate_no_object_expansion( - client: InfrahubClient, mock_schema_query_01: HTTPXMock, no_location_expansion -) -> None: +async def test_validate_no_object_expansion(client_with_schema_01: InfrahubClient, no_location_expansion) -> None: obj = ObjectFile(location="some/path", content=no_location_expansion) - await obj.validate_format(client=client) + await obj.validate_format(client=client_with_schema_01) assert obj.spec.kind == "BuiltinLocation" - assert obj.spec.strategy == ObjectStrategy.NORMAL + assert not obj.spec.parameters.expand_range assert len(obj.spec.data) == 1 assert obj.spec.data[0]["name"] == "AMS[1-5]" async def test_validate_object_expansion_multiple_ranges( - client: InfrahubClient, mock_schema_query_01: HTTPXMock, location_expansion_multiple_ranges + client_with_schema_01: InfrahubClient, location_expansion_multiple_ranges ) -> None: obj = ObjectFile(location="some/path", content=location_expansion_multiple_ranges) - await obj.validate_format(client=client) + await obj.validate_format(client=client_with_schema_01) assert obj.spec.kind == "BuiltinLocation" assert len(obj.spec.data) == 5 @@ -167,11 +161,11 @@ async def test_validate_object_expansion_multiple_ranges( async def test_validate_object_expansion_multiple_ranges_bad_syntax( - client: InfrahubClient, mock_schema_query_01: HTTPXMock, location_expansion_multiple_ranges_bad_syntax + client_with_schema_01: InfrahubClient, location_expansion_multiple_ranges_bad_syntax ) -> None: obj = ObjectFile(location="some/path", content=location_expansion_multiple_ranges_bad_syntax) with pytest.raises(ValidationError) as exc: - await obj.validate_format(client=client) + await obj.validate_format(client=client_with_schema_01) assert "Range expansion mismatch" in str(exc.value) @@ -217,41 +211,13 @@ async def test_validate_object_expansion_multiple_ranges_bad_syntax( @pytest.mark.parametrize("data,is_valid,format", get_relationship_info_testdata) async def test_get_relationship_info_tags( - client: InfrahubClient, - mock_schema_query_01: HTTPXMock, + client_with_schema_01: InfrahubClient, data: dict | list, is_valid: bool, format: RelationshipDataFormat, ) -> None: - location_schema = await client.schema.get(kind="BuiltinLocation") + location_schema = await client_with_schema_01.schema.get(kind="BuiltinLocation") - rel_info = await get_relationship_info(client, location_schema, "tags", data) + rel_info = await get_relationship_info(client_with_schema_01, location_schema, "tags", data) assert rel_info.is_valid == is_valid assert rel_info.format == format - - -async def test_invalid_object_expansion_processor( - client: InfrahubClient, mock_schema_query_01: HTTPXMock, location_expansion -) -> None: - obj = ObjectFile(location="some/path", content=location_expansion) - - from infrahub_sdk.spec.object import DataProcessorFactory, ObjectStrategy # noqa: PLC0415 - - # Patch _processors to remove the invalid strategy - original_processors = DataProcessorFactory._processors.copy() - try: - DataProcessorFactory._processors[ObjectStrategy.RANGE_EXPAND] = None - with pytest.raises(ValueError) as exc: - await obj.validate_format(client=client) - assert "Unknown strategy" in str(exc.value) - finally: - DataProcessorFactory._processors = original_processors - - -async def test_invalid_object_expansion_strategy(client: InfrahubClient, location_expansion) -> None: - location_expansion["spec"]["strategy"] = "InvalidStrategy" - obj = ObjectFile(location="some/path", content=location_expansion) - - with pytest.raises(ValidationError) as exc: - await obj.validate_format(client=client) - assert "Input should be" in str(exc.value)