diff --git a/scim2_tester/checkers/patch_add.py b/scim2_tester/checkers/patch_add.py new file mode 100644 index 0000000..b642d0c --- /dev/null +++ b/scim2_tester/checkers/patch_add.py @@ -0,0 +1,134 @@ +"""PATCH add operation checkers for SCIM compliance testing.""" + +from typing import Any + +from scim2_client import SCIMClientError +from scim2_models import Mutability +from scim2_models import PatchOp +from scim2_models import PatchOperation +from scim2_models import Required +from scim2_models import Resource + +from ..filling import generate_random_value +from ..urns import get_annotation_by_urn +from ..urns import get_value_by_urn +from ..urns import iter_all_urns +from ..utils import CheckContext +from ..utils import CheckResult +from ..utils import Status +from ..utils import checker +from ..utils import compare_field + + +@checker("patch:add") +def check_add_attribute( + context: CheckContext, model: type[Resource[Any]] +) -> list[CheckResult]: + """Test PATCH add operation on all attributes (simple, complex, and extensions). + + Creates a minimal resource, then iterates over ALL possible URNs (base model, + extensions, and sub-attributes) to test PATCH add operations systematically. + Uses a unified approach that handles all attribute types consistently. + + **Tested Behavior:** + - Adding new attribute values (simple, complex, and extension attributes) + - Server accepts the PATCH request with correct URN paths for extensions + - Response contains the added attribute with correct values + + **Status:** + - :attr:`~scim2_tester.Status.SUCCESS`: Attribute successfully added + - :attr:`~scim2_tester.Status.ERROR`: Failed to add attribute + - :attr:`~scim2_tester.Status.SKIPPED`: No addable attributes found + + .. pull-quote:: :rfc:`RFC 7644 Section 3.5.2.1 - Add Operation <7644#section-3.5.2.1>` + + "The 'add' operation is used to add a new attribute and/or values to + an existing resource." + """ + results = [] + all_urns = list( + iter_all_urns( + model, + required=[Required.false], + mutability=[Mutability.read_write, Mutability.write_only], + # Not supported until filters are implemented in scim2_models + include_subattributes=False, + ) + ) + + if not all_urns: + return [ + CheckResult( + status=Status.SKIPPED, + reason=f"No addable attributes found for {model.__name__}", + resource_type=model.__name__, + ) + ] + + base_resource = context.resource_manager.create_and_register(model) + + for urn, source_model in all_urns: + patch_value = generate_random_value(context, urn=urn, model=source_model) + + patch_op = PatchOp[type(base_resource)]( + operations=[ + PatchOperation( + op=PatchOperation.Op.add, + path=urn, + value=patch_value, + ) + ] + ) + + try: + updated_resource = context.client.modify( + resource_model=type(base_resource), + id=base_resource.id, + patch_op=patch_op, + ) + except SCIMClientError as exc: + results.append( + CheckResult( + status=Status.ERROR, + reason=f"Failed to add attribute '{urn}': {exc}", + resource_type=model.__name__, + data={ + "urn": urn, + "error": exc, + "patch_value": patch_value, + }, + ) + ) + continue + + actual_value = get_value_by_urn(updated_resource, urn) + + if get_annotation_by_urn( + Mutability, urn, source_model + ) == Mutability.write_only or compare_field(patch_value, actual_value): + results.append( + CheckResult( + status=Status.SUCCESS, + reason=f"Successfully added attribute '{urn}'", + resource_type=model.__name__, + data={ + "urn": urn, + "value": patch_value, + }, + ) + ) + else: + results.append( + CheckResult( + status=Status.ERROR, + reason=f"Attribute '{urn}' was not added or has incorrect value", + resource_type=model.__name__, + data={ + "urn": urn, + "expected": patch_value, + "actual": actual_value, + }, + ) + ) + + return results diff --git a/scim2_tester/checkers/patch_remove.py b/scim2_tester/checkers/patch_remove.py new file mode 100644 index 0000000..1726dd0 --- /dev/null +++ b/scim2_tester/checkers/patch_remove.py @@ -0,0 +1,136 @@ +"""PATCH remove operation checkers for SCIM compliance testing.""" + +from typing import Any + +from scim2_client import SCIMClientError +from scim2_models import Mutability +from scim2_models import PatchOp +from scim2_models import PatchOperation +from scim2_models import Required +from scim2_models import Resource + +from ..urns import get_annotation_by_urn +from ..urns import get_value_by_urn +from ..urns import iter_all_urns +from ..utils import CheckContext +from ..utils import CheckResult +from ..utils import Status +from ..utils import checker + + +@checker("patch:remove") +def check_remove_attribute( + context: CheckContext, model: type[Resource[Any]] +) -> list[CheckResult]: + """Test PATCH remove operation on all attributes (simple, complex, and extensions). + + Creates a resource with initial values, then iterates over ALL possible URNs + (base model, extensions, and sub-attributes) to test PATCH remove operations + systematically. Uses a unified approach that handles all attribute types consistently. + + **Tested Behavior:** + - Removing attribute values (simple, complex, and extension attributes) + - Server accepts the PATCH request with correct URN paths for extensions + - Response contains the resource with removed attributes (null/missing) + + **Status:** + - :attr:`~scim2_tester.Status.SUCCESS`: Attribute successfully removed + - :attr:`~scim2_tester.Status.ERROR`: Failed to remove attribute or attribute still exists + - :attr:`~scim2_tester.Status.SKIPPED`: No removable attributes found + + .. pull-quote:: :rfc:`RFC 7644 Section 3.5.2.2 - Remove Operation <7644#section-3.5.2.2>` + + "The 'remove' operation removes the value at the target location specified + by the required attribute 'path'. The operation performs the following + functions, depending on the target location specified by 'path'." + """ + results = [] + all_urns = list( + iter_all_urns( + model, + required=[Required.false], + mutability=[Mutability.read_write, Mutability.write_only], + # Not supported until filters are implemented in scim2_models + include_subattributes=False, + ) + ) + + if not all_urns: + return [ + CheckResult( + status=Status.SKIPPED, + reason=f"No removable attributes found for {model.__name__}", + resource_type=model.__name__, + ) + ] + + full_resource = context.resource_manager.create_and_register(model, fill_all=True) + + for urn, source_model in all_urns: + initial_value = get_value_by_urn(full_resource, urn) + if initial_value is None: + continue + + remove_op = PatchOp[type(full_resource)]( + operations=[ + PatchOperation( + op=PatchOperation.Op.remove, + path=urn, + ) + ] + ) + + try: + updated_resource = context.client.modify( + resource_model=type(full_resource), + id=full_resource.id, + patch_op=remove_op, + ) + except SCIMClientError as exc: + results.append( + CheckResult( + status=Status.ERROR, + reason=f"Failed to remove attribute '{urn}': {exc}", + resource_type=model.__name__, + data={ + "urn": urn, + "error": exc, + "initial_value": initial_value, + }, + ) + ) + continue + + actual_value = get_value_by_urn(updated_resource, urn) + + if ( + get_annotation_by_urn(Mutability, urn, source_model) + == Mutability.write_only + or actual_value is None + ): + results.append( + CheckResult( + status=Status.SUCCESS, + reason=f"Successfully removed attribute '{urn}'", + resource_type=model.__name__, + data={ + "urn": urn, + "initial_value": initial_value, + }, + ) + ) + else: + results.append( + CheckResult( + status=Status.ERROR, + reason=f"Attribute '{urn}' was not removed", + resource_type=model.__name__, + data={ + "urn": urn, + "initial_value": initial_value, + "actual_value": actual_value, + }, + ) + ) + + return results diff --git a/scim2_tester/checkers/patch_replace.py b/scim2_tester/checkers/patch_replace.py new file mode 100644 index 0000000..82ac1a7 --- /dev/null +++ b/scim2_tester/checkers/patch_replace.py @@ -0,0 +1,133 @@ +"""PATCH replace operation checkers for SCIM compliance testing.""" + +from typing import Any + +from scim2_client import SCIMClientError +from scim2_models import Mutability +from scim2_models import PatchOp +from scim2_models import PatchOperation +from scim2_models import Resource + +from ..filling import generate_random_value +from ..urns import get_annotation_by_urn +from ..urns import get_value_by_urn +from ..urns import iter_all_urns +from ..utils import CheckContext +from ..utils import CheckResult +from ..utils import Status +from ..utils import checker +from ..utils import compare_field + + +@checker("patch:replace") +def check_replace_attribute( + context: CheckContext, model: type[Resource[Any]] +) -> list[CheckResult]: + """Test PATCH replace operation on all attributes (simple, complex, and extensions). + + Creates a resource with initial values, then iterates over ALL possible URNs + (base model, extensions, and sub-attributes) to test PATCH replace operations + systematically. Uses a unified approach that handles all attribute types consistently. + + **Tested Behavior:** + - Replacing existing attribute values (simple, complex, and extension attributes) + - Server accepts the PATCH request with correct URN paths for extensions + - Response contains the replaced attribute with correct new values + + **Status:** + - :attr:`~scim2_tester.Status.SUCCESS`: Attribute successfully replaced + - :attr:`~scim2_tester.Status.ERROR`: Failed to replace attribute + - :attr:`~scim2_tester.Status.SKIPPED`: No replaceable attributes found + + .. pull-quote:: :rfc:`RFC 7644 Section 3.5.2.3 - Replace Operation <7644#section-3.5.2.3>` + + "The 'replace' operation replaces the value at the target location + specified by the 'path'." + """ + results = [] + all_urns = list( + iter_all_urns( + model, + mutability=[Mutability.read_write, Mutability.write_only], + # Not supported until filters are implemented in scim2_models + include_subattributes=False, + ) + ) + + if not all_urns: + return [ + CheckResult( + status=Status.SKIPPED, + reason=f"No replaceable attributes found for {model.__name__}", + resource_type=model.__name__, + ) + ] + + base_resource = context.resource_manager.create_and_register(model) + + for urn, source_model in all_urns: + patch_value = generate_random_value(context, urn=urn, model=source_model) + + patch_op = PatchOp[type(base_resource)]( + operations=[ + PatchOperation( + op=PatchOperation.Op.replace_, + path=urn, + value=patch_value, + ) + ] + ) + + try: + # Perform the PATCH replace operation + updated_resource = context.client.modify( + resource_model=type(base_resource), + id=base_resource.id, + patch_op=patch_op, + ) + except SCIMClientError as exc: + results.append( + CheckResult( + status=Status.ERROR, + reason=f"Failed to replace attribute '{urn}': {exc}", + resource_type=model.__name__, + data={ + "urn": urn, + "error": exc, + "patch_value": patch_value, + }, + ) + ) + continue + + actual_value = get_value_by_urn(updated_resource, urn) + + if get_annotation_by_urn( + Mutability, urn, source_model + ) == Mutability.write_only or compare_field(patch_value, actual_value): + results.append( + CheckResult( + status=Status.SUCCESS, + reason=f"Successfully replaced attribute '{urn}'", + resource_type=model.__name__, + data={ + "urn": urn, + "value": patch_value, + }, + ) + ) + else: + results.append( + CheckResult( + status=Status.ERROR, + reason=f"Attribute '{urn}' was not replaced or has incorrect value", + resource_type=model.__name__, + data={ + "urn": urn, + "expected": patch_value, + "actual": actual_value, + }, + ) + ) + + return results diff --git a/scim2_tester/checkers/resource_put.py b/scim2_tester/checkers/resource_put.py index c4f83e7..de4fdb5 100644 --- a/scim2_tester/checkers/resource_put.py +++ b/scim2_tester/checkers/resource_put.py @@ -34,14 +34,17 @@ def object_replacement( """ test_obj = context.resource_manager.create_and_register(model) - mutable_fields = [ - field_name - for field_name in model.model_fields.keys() - if model.get_field_annotation(field_name, Mutability) - in (Mutability.read_write, Mutability.write_only) - ] + mutable_urns = [] + for field_name in model.model_fields.keys(): + if field_name in ("meta", "id", "schemas"): + continue + if model.get_field_annotation(field_name, Mutability) in ( + Mutability.read_write, + Mutability.write_only, + ): + mutable_urns.append(test_obj.get_attribute_urn(field_name)) - modified_obj = fill_with_random_values(context, test_obj, mutable_fields) + modified_obj = fill_with_random_values(context, test_obj, mutable_urns) if modified_obj is None: raise ValueError( diff --git a/scim2_tester/filling.py b/scim2_tester/filling.py index a01b792..3ce7f6f 100644 --- a/scim2_tester/filling.py +++ b/scim2_tester/filling.py @@ -1,3 +1,4 @@ +import base64 import random import uuid from enum import Enum @@ -15,22 +16,27 @@ from scim2_models import Resource from scim2_models import URIReference from scim2_models.utils import UNION_TYPES +from scim2_models.utils import Base64Bytes +from scim2_models.utils import _find_field_name -from scim2_tester.utils import CheckContext +from scim2_tester.urns import get_attribute_type_by_urn +from scim2_tester.urns import get_multiplicity_by_urn +from scim2_tester.urns import iter_all_urns +from scim2_tester.urns import set_value_by_urn if TYPE_CHECKING: - pass + from scim2_tester.utils import CheckContext -def model_from_ref_type( - context: CheckContext, ref_type: type, different_than: type[Resource[Any]] +def get_model_from_ref_type( + context: "CheckContext", ref_type: type, different_than: type[Resource[Any]] | None ) -> type[Resource[Any]]: """Return "User" from "Union[Literal['User'], Literal['Group']]".""" - def model_from_ref_type_(ref_type: type) -> Any: + def get_model_from_ref_type_(ref_type: type) -> Any: if get_origin(ref_type) in UNION_TYPES: return [ - model_from_ref_type_(sub_ref_type) + get_model_from_ref_type_(sub_ref_type) for sub_ref_type in get_args(ref_type) ] @@ -38,28 +44,43 @@ def model_from_ref_type_(ref_type: type) -> Any: model = context.client.get_resource_model(model_name) return model - models = model_from_ref_type_(ref_type) + models = get_model_from_ref_type_(ref_type) models = models if isinstance(models, list) else [models] acceptable_models = [model for model in models if model != different_than] return acceptable_models[0] def generate_random_value( - context: CheckContext, - obj: Resource[Any], - field_name: str, + context: "CheckContext", + urn: str, + model: type[Resource], ) -> Any: - field_type = obj.get_field_root_type(field_name) + field_name = _find_field_name(model, urn) + field_type = get_attribute_type_by_urn(model, urn) + is_multiple = get_multiplicity_by_urn(model, urn) + + is_email = urn and ( + urn.endswith("emails.value") + or ( + field_name == "value" + and (field_type and "email" in field_type.__name__.lower()) + ) + ) + is_phone = urn and ( + urn.endswith("phoneNumbers.value") + or ( + field_name == "value" + and (field_type and "phone" in field_type.__name__.lower()) + ) + ) value: Any - if obj.get_field_annotation(field_name, Mutability) == Mutability.read_only: - value = None # RFC7643 §4.1.2 provides the following indications, however # there is no way to guess the existence of such requirements # just by looking at the object schema. # The value SHOULD be specified according to [RFC5321]. - elif field_name == "value" and "email" in obj.__class__.__name__.lower(): + if is_email: value = f"{uuid.uuid4()}@{uuid.uuid4()}.com" # RFC7643 §4.1.2 provides the following indications, however @@ -68,7 +89,7 @@ def generate_random_value( # The value SHOULD be specified # according to the format defined in [RFC3966], e.g., # 'tel:+1-201-555-0123'. - elif field_name == "value" and "phone" in obj.__class__.__name__.lower(): + elif is_phone: # pragma: no cover value = "".join(str(random.choice(range(10))) for _ in range(10)) elif field_type is int: @@ -80,9 +101,9 @@ def generate_random_value( elif get_origin(field_type) is Reference and get_args(field_type)[0] != Any: ref_type = get_args(field_type)[0] if ref_type not in (ExternalReference, URIReference): - model = model_from_ref_type(context, ref_type, different_than=obj.__class__) + model = get_model_from_ref_type(context, ref_type, different_than=model) ref_obj = context.resource_manager.create_and_register(model) - value = ref_obj.meta.location + value = ref_obj.meta.location if ref_obj.meta else None else: value = f"https://{str(uuid.uuid4())}.test" @@ -91,45 +112,51 @@ def generate_random_value( value = random.choice(list(field_type)) elif isclass(field_type) and issubclass(field_type, ComplexAttribute): - value = fill_with_random_values(context, field_type()) + value = fill_with_random_values(context, field_type()) # type: ignore[arg-type] elif isclass(field_type) and issubclass(field_type, Extension): - value = fill_with_random_values(context, field_type()) + value = fill_with_random_values(context, field_type()) # type: ignore[arg-type] + + elif field_type is Base64Bytes: + value = base64.b64encode(uuid.uuid4().bytes).decode("ascii") else: - # Put emails so this will be accepted by EmailStr too value = str(uuid.uuid4()) + + if is_multiple: + value = [value] + return value def fill_with_random_values( - context: CheckContext, + context: "CheckContext", obj: Resource[Any], - field_names: list[str] | None = None, + urns: list[str] | None = None, ) -> Resource[Any] | None: """Fill an object with random values generated according the attribute types. :param context: The check context containing the SCIM client and configuration :param obj: The Resource object to fill with random values - :param field_names: Optional list of field names to fill (defaults to all) + :param urns: Optional list of URNs to fill (defaults to all fields) :returns: The filled object or None if the object ends up empty """ - for field_name in ( - field_names if field_names is not None else obj.__class__.model_fields.keys() - ): - if field_name not in obj.__class__.model_fields: - continue - - field = obj.__class__.model_fields[field_name] - if field.default: - continue - - value = generate_random_value(context, obj, field_name) - - is_multiple = obj.get_field_multiplicity(field_name) - if is_multiple: - setattr(obj, field_name, [value]) - else: - setattr(obj, field_name, value) + # If no URNs provided, generate URNs for all fields + if urns is None: + urns = [ + urn + for urn, _ in iter_all_urns( + type(obj), + mutability=[ + Mutability.read_write, + Mutability.write_only, + Mutability.immutable, + ], + ) + ] + + for urn in urns: + value = generate_random_value(context, urn=urn, model=type(obj)) + set_value_by_urn(obj, urn, value) return obj diff --git a/scim2_tester/urns.py b/scim2_tester/urns.py new file mode 100644 index 0000000..83a6696 --- /dev/null +++ b/scim2_tester/urns.py @@ -0,0 +1,264 @@ +from collections.abc import Iterator +from inspect import isclass +from typing import Any + +from scim2_models import BaseModel +from scim2_models import ComplexAttribute +from scim2_models import Extension +from scim2_models import Mutability +from scim2_models import Required +from scim2_models import Resource +from scim2_models.utils import _find_field_name +from scim2_models.utils import _to_camel + + +def iter_urns( + model: type[Resource[Any] | Extension], + required: list[Required] | None = None, + mutability: list[Mutability] | None = None, + include_subattributes: bool = True, +) -> Iterator[str]: + """Iterate over URNs for attributes matching the specified criteria.""" + for field_name in model.model_fields: + if field_name in ("meta", "id", "schemas"): + continue + + # Check required filter + if required is not None: + field_required = model.get_field_annotation(field_name, Required) + if field_required not in required: + continue + + # Check mutability filter + if mutability is not None: # pragma: no cover + field_mutability = model.get_field_annotation(field_name, Mutability) + if field_mutability not in mutability: + continue + + field_type = model.get_field_root_type(field_name) + + if issubclass(field_type, Extension): + urn = field_type.model_fields["schemas"].default[0] + elif issubclass(model, Extension): + urn = model().get_attribute_urn(field_name) + else: + urn = _to_camel(field_name) + + yield urn + + is_complex = isclass(field_type) and issubclass(field_type, ComplexAttribute) + if include_subattributes and is_complex: + for sub_field_name in field_type.model_fields: + sub_urn = f"{urn}.{_to_camel(sub_field_name)}" + yield sub_urn + + +def iter_all_urns( + model: type[Resource[Any]], + required: list[Required] | None = None, + mutability: list[Mutability] | None = None, + include_subattributes: bool = True, +) -> Iterator[tuple[str, type[Resource[Any] | Extension]]]: + """Iterate over ALL URNs from base model and all its extensions.""" + base_urns = iter_urns( + model, + required=required, + mutability=mutability, + include_subattributes=include_subattributes, + ) + + for urn in base_urns: + yield (urn, model) + + if isclass(model) and issubclass(model, Resource): + for extension_model in model.get_extension_models().values(): + ext_urns = iter_urns( + extension_model, + required=required, + mutability=mutability, + include_subattributes=include_subattributes, + ) + + for urn in ext_urns: + yield (urn, extension_model) + + +def get_target_model_by_urn( + model: type[BaseModel], urn: str +) -> tuple[type[BaseModel], str] | None: + if ":" in urn and isclass(model) and issubclass(model, Resource | Extension): + model_schema = model.model_fields["schemas"].default[0] + + if urn.startswith(model_schema): + urn = urn.removeprefix(model_schema) + urn = urn.removeprefix(":") + + elif issubclass(model, Resource): + for ( + extension_schema, + extension_model, + ) in model.get_extension_models().items(): + if urn == extension_schema: + urn = extension_model.__name__ + elif urn.startswith(extension_schema): + model = extension_model + urn = urn.removeprefix(extension_schema) + urn = urn.removeprefix(":") + + if "." in urn: + parts = urn.split(".") + current_model = model + + for part in parts[:-1]: + field_name = _find_field_name(current_model, part) + if field_name is None: + return None + field_type = current_model.get_field_root_type(field_name) + current_model = field_type + + last_part = parts[-1] + field_name = _find_field_name(current_model, last_part) + return current_model, field_name + + field_name = _find_field_name(model, urn) + if field_name is None: + return None + return model, field_name + + +def get_attribute_type_by_urn( + model: type[Resource[Any] | Extension], urn: str +) -> type | None: + """Get the field type for a given URN path.""" + if target := get_target_model_by_urn(model, urn): + model, field_name = target + return model.get_field_root_type(field_name) + + return None + + +def get_annotation_by_urn( + annotation_type: type, urn: str, model: type[Resource[Any] | Extension] +) -> Any: + """Get annotation value for a given URN path.""" + if target := get_target_model_by_urn(model, urn): + model, field_name = target + return model.get_field_annotation(field_name, annotation_type) + + return None + + +def get_multiplicity_by_urn( + model: type[Resource[Any] | Extension], + urn: str, +) -> bool | None: + if target := get_target_model_by_urn(model, urn): + model, field_name = target + return model.get_field_multiplicity(field_name) + + return None + + +def get_value_by_urn(obj: BaseModel, urn: str) -> Any: + """Get value from resource using URN path.""" + if ":" in urn: + model_schema = type(obj).model_fields["schemas"].default[0] + + if isinstance(obj, Resource | Extension) and urn.startswith(model_schema): + urn = urn.removeprefix(model_schema) + urn = urn.removeprefix(":") + + elif isinstance(obj, Resource): + for ( + extension_schema, + extension_model, + ) in obj.get_extension_models().items(): + if urn == extension_schema: + urn = extension_model.__name__ + elif urn.startswith(extension_schema): + urn = urn.removeprefix(extension_schema) + urn = urn.removeprefix(":") + extension_obj = obj[extension_model] + if extension_obj is None: + return None + return get_value_by_urn(extension_obj, urn) + + if "." in urn: + parts = urn.split(".") + current_obj = obj + + for part in parts[:-1]: + field_name = _find_field_name(type(current_obj), part) + if field_name is None: + return None + + sub_obj = getattr(current_obj, field_name) + if sub_obj is None: + return None + current_obj = sub_obj + + last_part = parts[-1] + field_name = _find_field_name(type(current_obj), last_part) + if field_name is None: + return None + + return getattr(current_obj, field_name) + + field_name = _find_field_name(type(obj), urn) + if field_name is None: + return None + + return getattr(obj, field_name) + + +def set_value_by_urn(obj: Resource[Any], urn: str, value: Any) -> None: + if ":" in urn: + model_schema = type(obj).model_fields["schemas"].default[0] + + if isinstance(obj, Resource | Extension) and urn.startswith(model_schema): + urn = urn.removeprefix(model_schema).removeprefix(":") + + elif isinstance(obj, Resource): + for extension_schema, extension_model in obj.get_extension_models().items(): + if urn == extension_schema: + obj[extension_model] = value + return + elif urn.startswith(extension_schema): + sub_urn = urn.removeprefix(extension_schema).removeprefix(":") + if obj[extension_model] is None: + obj[extension_model] = extension_model() + return set_value_by_urn(obj[extension_model], sub_urn, value) + + if "." in urn: + parts = urn.split(".") + current_obj = obj + + for part in parts[:-1]: + field_name = _find_field_name(type(current_obj), part) + if field_name is None: + return + sub_obj = getattr(current_obj, field_name) + if sub_obj is None: + field_type = type(current_obj).get_field_root_type(field_name) + sub_obj = field_type() + setattr(current_obj, field_name, sub_obj) + elif isinstance(sub_obj, list): + # Cannot navigate into multi-valued attributes + return + current_obj = sub_obj + + last_part = parts[-1] + field_name = _find_field_name(type(current_obj), last_part) + setattr(current_obj, field_name, value) + + else: + field_name = _find_field_name(type(obj), urn) + if field_name: + # Handle multivalued fields by wrapping single values in lists + if ( + obj.get_field_multiplicity(field_name) + and not isinstance(value, list) + and value is not None + ): + value = [value] + setattr(obj, field_name, value) diff --git a/scim2_tester/utils.py b/scim2_tester/utils.py index 07f9a64..0facc42 100644 --- a/scim2_tester/utils.py +++ b/scim2_tester/utils.py @@ -8,6 +8,8 @@ from scim2_client import SCIMClientError from scim2_client.engines.httpx import SyncSCIMClient +from scim2_models import BaseModel +from scim2_models import Mutability from scim2_models import Required from scim2_models import Resource @@ -101,12 +103,13 @@ class CheckConfig: class CheckContext: """Execution context with client, config and resource management.""" + resource_manager: "ResourceManager" + def __init__(self, client: SyncSCIMClient, conf: CheckConfig): self.client = client self.conf = conf - from scim2_tester.utils import ResourceManager - - self.resource_manager = ResourceManager(self) + # ResourceManager is defined later in the file, so we instantiate it here + self.resource_manager = ResourceManager(self) # type: ignore[name-defined] class SCIMTesterError(Exception): @@ -153,20 +156,42 @@ def __init__(self, context: CheckContext): self.context = context self.resources: list[Resource[Any]] = [] - def create_and_register(self, model: type[Resource[Any]]) -> Resource[Any]: + def create_and_register( + self, model: type[Resource[Any]], fill_all: bool = False + ) -> Resource[Any]: """Create an object and automatically register it for cleanup. :param model: The Resource model class to create + :param fill_all: If True, fill all writable attributes (excluding read-only). + If False, fill only required attributes (default behavior). :returns: The created Resource instance """ + # Import here to avoid circular imports from scim2_tester.filling import fill_with_random_values - field_names = [ - field_name - for field_name in model.model_fields - if model.get_field_annotation(field_name, Required) == Required.true - ] - obj = fill_with_random_values(self.context, model(), field_names) + obj = model() + + if fill_all: + # Fill all fields except read-only ones (including extensions) + urns = [] + for field_name in model.model_fields: + if field_name in ("meta", "id", "schemas"): + continue + if ( + model.get_field_annotation(field_name, Mutability) + != Mutability.read_only + ): + urns.append(obj.get_attribute_urn(field_name)) + else: + # Default behavior: fill only required fields + urns = [] + for field_name in model.model_fields: + if field_name in ("meta", "id", "schemas"): + continue + if model.get_field_annotation(field_name, Required) == Required.true: + urns.append(obj.get_attribute_urn(field_name)) + + obj = fill_with_random_values(self.context, obj, urns) created = self.context.client.create(obj) # Handle the case where create might return Error or dict @@ -287,3 +312,13 @@ def wrapped(context: CheckContext, *args: Any, **kwargs: Any) -> Any: tags = () return decorator(func) return decorator + + +def compare_field(expected: Any, actual: Any) -> bool: + if expected is None or actual is None: + return False + + if isinstance(expected, BaseModel) and isinstance(actual, BaseModel): + return expected.model_dump() == actual.model_dump() + + return expected == actual diff --git a/tests/test_filling.py b/tests/test_filling.py index 8c2dcca..c8528b3 100644 --- a/tests/test_filling.py +++ b/tests/test_filling.py @@ -1,6 +1,5 @@ """Test automatic field filling functionality.""" -import base64 from typing import Literal from unittest.mock import patch @@ -10,17 +9,14 @@ from scim2_tester.filling import fill_with_random_values from scim2_tester.filling import generate_random_value -from scim2_tester.filling import model_from_ref_type +from scim2_tester.filling import get_model_from_ref_type def test_generate_random_value_bytes_field(testing_context): """Validates random value generation for bytes fields.""" - cert = X509Certificate(value=base64.b64encode(b"placeholder")) - - value = generate_random_value(testing_context, cert, "value") + value = generate_random_value(testing_context, urn="value", model=X509Certificate) assert isinstance(value, str) - assert len(value) == 36 # UUID string length def test_model_resolution_from_reference_type(testing_context): @@ -30,9 +26,8 @@ def test_model_resolution_from_reference_type(testing_context): ref_type = Literal["User"] | Literal["Group"] different_than = Group - result = model_from_ref_type(testing_context, ref_type, different_than) + result = get_model_from_ref_type(testing_context, ref_type, different_than) - # The result should be User[EnterpriseUser] from the testing context assert result == User[EnterpriseUser] assert result != Group @@ -59,7 +54,7 @@ def test_fill_with_nonexistent_field(testing_context): with patch( "scim2_tester.filling.generate_random_value", return_value="mock_value" ) as mock_generate: - result = fill_with_random_values(testing_context, user, ["nonexistent_field"]) - mock_generate.assert_not_called() + result = fill_with_random_values(testing_context, user, ["nonexistent_urn"]) + mock_generate.assert_called_once() assert result is user diff --git a/tests/test_patch_add.py b/tests/test_patch_add.py new file mode 100644 index 0000000..e19b92c --- /dev/null +++ b/tests/test_patch_add.py @@ -0,0 +1,307 @@ +"""Unit tests for PATCH add operation checkers.""" + +import json + +from scim2_models import EnterpriseUser +from scim2_models import User +from werkzeug.wrappers import Response + +from scim2_tester.checkers.patch_add import check_add_attribute +from scim2_tester.utils import Status +from tests.utils import build_nested_response + + +def test_successful_add(httpserver, testing_context): + """Test successful PATCH add returns SUCCESS in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=201, + ) + + def patch_handler(request): + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + value = operation["value"] + + response_data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + } + + # Build nested response structure + response_data = build_nested_response(response_data, path, value) + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + testing_context.conf.raise_exceptions = True + results = check_add_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected + + +def test_server_error(httpserver, testing_context): + """Test server error returns ERROR in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=201, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"], + "detail": "Internal server error", + "status": "500", + }, + status=500, + ) + + results = check_add_attribute(testing_context, User) + # Filter out write_only fields which always return SUCCESS since they can't be verified + non_writeonly_results = [ + r for r in results if "password" not in r.data.get("urn", "").lower() + ] + assert all(r.status == Status.ERROR for r in non_writeonly_results) + assert len(non_writeonly_results) > 0, ( + "Should have non-write_only fields that failed" + ) + + +def test_attribute_not_added(httpserver, testing_context): + """Test attribute not added returns ERROR in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=201, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=200, + ) + + results = check_add_attribute(testing_context, User) + # Filter out write_only fields which always return SUCCESS since they can't be verified + non_writeonly_results = [ + r + for r in results + if "password" + not in r.data.get("urn", "").lower() # password is the main write_only field + ] + assert all(r.status == Status.ERROR for r in non_writeonly_results) + # Verify that we have both types of results + assert len(non_writeonly_results) > 0, ( + "Should have non-write_only fields that failed" + ) + + +def test_no_patchable_attributes(testing_context): + """Test no patchable attributes returns SKIPPED.""" + from unittest.mock import patch + + with ( + patch( + "scim2_tester.checkers.patch_add.iter_all_urns", + return_value=iter([]), + ), + ): + results = check_add_attribute(testing_context, User) + assert len(results) == 1 + unexpected = [r for r in results if r.status != Status.SKIPPED] + assert not unexpected + + +def test_complex_successful_add(httpserver, testing_context): + """Test successful PATCH add for complex attributes returns SUCCESS.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=201, + ) + + def patch_handler(request): + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + value = operation["value"] + + response_data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + } + + # Build nested response structure + response_data = build_nested_response(response_data, path, value) + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_add_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected + + +def test_complex_server_error(httpserver, testing_context): + """Test server error for complex attributes returns ERROR.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=201, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"], + "detail": "Complex attribute error", + "status": "500", + }, + status=500, + ) + + results = check_add_attribute(testing_context, User) + # Filter out write_only fields which always return SUCCESS since they can't be verified + non_writeonly_results = [ + r for r in results if "password" not in r.data.get("urn", "").lower() + ] + assert all(r.status == Status.ERROR for r in non_writeonly_results) + assert len(non_writeonly_results) > 0, ( + "Should have non-write_only fields that failed" + ) + + +def test_complex_attribute_not_added(httpserver, testing_context): + """Test complex attribute not added returns ERROR.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=201, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + }, + status=200, + ) + + results = check_add_attribute(testing_context, User) + # Filter out write_only fields which always return SUCCESS since they can't be verified + non_writeonly_results = [ + r for r in results if "password" not in r.data.get("urn", "").lower() + ] + assert all(r.status == Status.ERROR for r in non_writeonly_results) + assert len(non_writeonly_results) > 0, ( + "Should have non-write_only fields that failed" + ) + + +def test_no_complex_patchable_attributes(testing_context): + """Test no complex patchable attributes returns SKIPPED.""" + from unittest.mock import patch + + with ( + patch( + "scim2_tester.checkers.patch_add.iter_all_urns", + return_value=iter([]), + ), + ): + results = check_add_attribute(testing_context, User) + assert len(results) == 1 + unexpected = [r for r in results if r.status != Status.SKIPPED] + assert not unexpected + + +def test_user_with_enterprise_extension(httpserver, testing_context): + """Test PATCH add with User[EnterpriseUser] extension support.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ], + "id": "123", + "userName": "test@example.com", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": { + "employeeNumber": "EMP001" + }, + }, + status=201, + ) + + def patch_handler(request): + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + value = operation["value"] + + response_data = { + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ], + "id": "123", + "userName": "test@example.com", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": { + "employeeNumber": "EMP001" + }, + } + + # Build nested response structure + response_data = build_nested_response(response_data, path, value) + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_add_attribute(testing_context, User[EnterpriseUser]) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected diff --git a/tests/test_patch_remove.py b/tests/test_patch_remove.py new file mode 100644 index 0000000..645807a --- /dev/null +++ b/tests/test_patch_remove.py @@ -0,0 +1,299 @@ +"""Unit tests for PATCH remove operation checkers.""" + +import json + +from scim2_models import EnterpriseUser +from scim2_models import User +from werkzeug.wrappers import Response + +from scim2_tester.checkers.patch_remove import check_remove_attribute +from scim2_tester.utils import Status + + +def test_successful_remove(httpserver, testing_context): + """Test successful PATCH remove returns SUCCESS in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Test User", + "nickName": "testy", + }, + status=201, + ) + + def patch_handler(request): + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + + field_mapping = { + "display_name": "displayName", + "nick_name": "nickName", + "profile_url": "profileUrl", + "user_type": "userType", + "preferred_language": "preferredLanguage", + } + + response_data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Test User", + "nickName": "testy", + } + + json_field = field_mapping.get(path, path) + del response_data[json_field] + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_remove_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected + + +def test_remove_server_error(httpserver, testing_context): + """Test PATCH remove server error returns ERROR in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Test User", + }, + status=201, + ) + + def patch_handler(request): + return Response( + json.dumps( + { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"], + "status": "500", + "detail": "Internal Server Error", + } + ), + status=500, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_remove_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.ERROR] + assert not unexpected + + +def test_attribute_not_removed(httpserver, testing_context): + """Test attribute not removed returns ERROR in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Test User", + "nickName": "testy", + }, + status=201, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Test User", + "nickName": "testy", + }, + status=200, + ) + + results = check_remove_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.ERROR] + assert not unexpected + + +def test_no_removable_attributes(testing_context): + """Test no removable attributes returns SKIPPED.""" + from unittest.mock import patch + + with patch( + "scim2_tester.checkers.patch_remove.iter_all_urns", + return_value=iter([]), + ): + results = check_remove_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SKIPPED] + assert not unexpected + + +def test_complex_successful_remove(httpserver, testing_context): + """Test successful PATCH remove for complex attributes returns SUCCESS.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "name": { + "givenName": "Test", + "familyName": "User", + }, + }, + status=201, + ) + + def patch_handler(request): + response_data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + } + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_remove_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected + + +def test_user_with_enterprise_extension_remove(httpserver, testing_context): + """Test PATCH remove with User[EnterpriseUser] extension support.""" + + def create_handler(request): + request_data = json.loads(request.get_data(as_text=True)) + resource_id = "123e4567-e89b-12d3-a456-426614174000" + + response_data = { + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ], + "id": resource_id, + "meta": { + "resourceType": "User", + "location": f"http://localhost/Users/{resource_id}", + "created": "2024-01-01T00:00:00Z", + "lastModified": "2024-01-01T00:00:00Z", + "version": 'W/"1"', + }, + } + + for key in [ + "userName", + "displayName", + "nickName", + "profileUrl", + "userType", + "preferredLanguage", + ]: + if key in request_data: + response_data[key] = request_data[key] + + extension_key = "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + if extension_key in request_data: + response_data[extension_key] = request_data[extension_key] + + return Response( + json.dumps(response_data), + status=201, + headers={ + "Content-Type": "application/scim+json", + "Location": f"http://localhost/Users/{resource_id}", + }, + ) + + httpserver.expect_request(uri="/Users", method="POST").respond_with_handler( + create_handler + ) + + def patch_handler(request): + resource_id = "123e4567-e89b-12d3-a456-426614174000" + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + + response_data = { + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ], + "id": resource_id, + "meta": { + "resourceType": "User", + "location": f"http://localhost/Users/{resource_id}", + "created": "2024-01-01T00:00:00Z", + "lastModified": "2024-01-01T00:00:00Z", + "version": 'W/"2"', + }, + "userName": "test@example.com", + "displayName": "Test User", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": { + "employeeNumber": "EMP001", + "department": "Engineering", + }, + } + + if path.startswith( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:" + ): + field_name = path.split(":")[-1] + if ( + field_name + in response_data[ + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + ] + ): + del response_data[ + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + ][field_name] + else: + field_mapping = { + "display_name": "displayName", + "nick_name": "nickName", + "profile_url": "profileUrl", + "user_type": "userType", + "preferred_language": "preferredLanguage", + } + json_field = field_mapping.get(path, path) + if json_field in response_data: + del response_data[json_field] + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request( + uri="/Users/123e4567-e89b-12d3-a456-426614174000", method="PATCH" + ).respond_with_handler(patch_handler) + + httpserver.expect_request( + uri="/Users/123e4567-e89b-12d3-a456-426614174000", method="DELETE" + ).respond_with_data("", status=204) + + results = check_remove_attribute(testing_context, User[EnterpriseUser]) + + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected, ( + f"All results should be SUCCESS, got: {[r.reason for r in results if r.status != Status.SUCCESS]}" + ) diff --git a/tests/test_patch_replace.py b/tests/test_patch_replace.py new file mode 100644 index 0000000..9a06eb5 --- /dev/null +++ b/tests/test_patch_replace.py @@ -0,0 +1,230 @@ +"""Unit tests for PATCH replace operation checkers.""" + +import json + +from scim2_models import EnterpriseUser +from scim2_models import User +from werkzeug.wrappers import Response + +from scim2_tester.checkers.patch_replace import check_replace_attribute +from scim2_tester.utils import Status +from tests.utils import build_nested_response + + +def test_successful_replace(httpserver, testing_context): + """Test successful PATCH replace returns SUCCESS in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Initial Name", + }, + status=201, + ) + + def patch_handler(request): + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + value = operation["value"] + + response_data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Initial Name", + } + + # Use build_nested_response to handle all path types + response_data = build_nested_response(response_data, path, value) + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_replace_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected + + +def test_replace_server_error(httpserver, testing_context): + """Test PATCH replace server error returns ERROR in CheckResult.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Initial Name", + }, + status=201, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"], + "status": "500", + "detail": "Internal Server Error", + }, + status=500, + ) + + results = check_replace_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.ERROR] + assert not unexpected + + +def test_replace_attribute_not_updated(httpserver, testing_context): + """Test PATCH replace where attribute is not actually updated returns ERROR.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Initial Name", + }, + status=201, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "displayName": "Initial Name", + }, + status=200, + ) + + results = check_replace_attribute(testing_context, User) + # Filter out write_only fields which always return SUCCESS since they can't be verified + non_writeonly_results = [ + r for r in results if "password" not in r.data.get("urn", "").lower() + ] + assert all(r.status == Status.ERROR for r in non_writeonly_results) + assert len(non_writeonly_results) > 0, ( + "Should have non-write_only fields that failed" + ) + + +def test_no_replaceable_attributes(testing_context): + """Test no replaceable attributes returns SKIPPED.""" + from unittest.mock import patch + + with ( + patch( + "scim2_tester.checkers.patch_replace.iter_all_urns", + return_value=iter([]), + ), + ): + results = check_replace_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SKIPPED] + assert not unexpected + + +def test_complex_successful_replace(httpserver, testing_context): + """Test successful PATCH replace for complex attributes returns SUCCESS.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "name": { + "givenName": "Initial", + "familyName": "User", + }, + }, + status=201, + ) + + def patch_handler(request): + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + value = operation["value"] + + response_data = { + "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + "id": "123", + "userName": "test@example.com", + "name": { + "givenName": "Initial", + "familyName": "User", + }, + } + + # Use build_nested_response to handle all path types + response_data = build_nested_response(response_data, path, value) + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_replace_attribute(testing_context, User) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected + + +def test_user_with_enterprise_extension_replace(httpserver, testing_context): + """Test PATCH replace with User[EnterpriseUser] extension support.""" + httpserver.expect_request(uri="/Users", method="POST").respond_with_json( + { + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ], + "id": "123", + "userName": "test@example.com", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": { + "employeeNumber": "EMP001" + }, + }, + status=201, + ) + + def patch_handler(request): + patch_data = json.loads(request.get_data(as_text=True)) + operation = patch_data["Operations"][0] + path = operation["path"] + value = operation["value"] + + response_data = { + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ], + "id": "123", + "userName": "test@example.com", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": { + "employeeNumber": "EMP001" + }, + } + + # Use build_nested_response to handle all path types including extensions + response_data = build_nested_response(response_data, path, value) + + return Response( + json.dumps(response_data), + status=200, + headers={"Content-Type": "application/scim+json"}, + ) + + httpserver.expect_request(uri="/Users/123", method="PATCH").respond_with_handler( + patch_handler + ) + + results = check_replace_attribute(testing_context, User[EnterpriseUser]) + unexpected = [r for r in results if r.status != Status.SUCCESS] + assert not unexpected diff --git a/tests/test_urn.py b/tests/test_urn.py new file mode 100644 index 0000000..4fe3ba7 --- /dev/null +++ b/tests/test_urn.py @@ -0,0 +1,2131 @@ +from typing import Any +from typing import Literal + +from pydantic import EmailStr +from scim2_models import Address +from scim2_models import Email +from scim2_models import EnterpriseUser +from scim2_models import Entitlement +from scim2_models import ExternalReference +from scim2_models import GroupMembership +from scim2_models import Im +from scim2_models import Manager +from scim2_models import Mutability +from scim2_models import Name +from scim2_models import PhoneNumber +from scim2_models import Photo +from scim2_models import Reference +from scim2_models import Required +from scim2_models import Role +from scim2_models import User +from scim2_models import X509Certificate +from scim2_models.utils import Base64Bytes + +from scim2_tester.urns import get_annotation_by_urn +from scim2_tester.urns import get_attribute_type_by_urn +from scim2_tester.urns import get_multiplicity_by_urn +from scim2_tester.urns import get_target_model_by_urn +from scim2_tester.urns import get_value_by_urn +from scim2_tester.urns import iter_all_urns +from scim2_tester.urns import iter_urns +from scim2_tester.urns import set_value_by_urn + + +def test_iter_urns_resource(): + urns = list(iter_urns(User)) + assert "externalId" in urns + assert "userName" in urns + assert "name" in urns + assert "name.formatted" in urns + assert "name.familyName" in urns + assert "name.givenName" in urns + assert "name.middleName" in urns + assert "name.honorificPrefix" in urns + assert "name.honorificSuffix" in urns + assert "displayName" in urns + assert "nickName" in urns + assert "profileUrl" in urns + assert "title" in urns + assert "userType" in urns + assert "preferredLanguage" in urns + assert "locale" in urns + assert "timezone" in urns + assert "active" in urns + assert "password" in urns + assert "emails" in urns + assert "emails.type" in urns + assert "emails.primary" in urns + assert "emails.display" in urns + assert "emails.value" in urns + assert "emails.ref" in urns + assert "phoneNumbers" in urns + assert "phoneNumbers.type" in urns + assert "phoneNumbers.primary" in urns + assert "phoneNumbers.display" in urns + assert "phoneNumbers.value" in urns + assert "phoneNumbers.ref" in urns + assert "ims" in urns + assert "ims.type" in urns + assert "ims.primary" in urns + assert "ims.display" in urns + assert "ims.value" in urns + assert "ims.ref" in urns + assert "photos" in urns + assert "photos.type" in urns + assert "photos.primary" in urns + assert "photos.display" in urns + assert "photos.value" in urns + assert "photos.ref" in urns + assert "addresses" in urns + assert "addresses.type" in urns + assert "addresses.primary" in urns + assert "addresses.display" in urns + # TODO: this should not exist + assert "addresses.value" in urns + assert "addresses.ref" in urns + assert "addresses.formatted" in urns + assert "addresses.streetAddress" in urns + assert "addresses.locality" in urns + assert "addresses.region" in urns + assert "addresses.postalCode" in urns + assert "addresses.country" in urns + assert "groups" in urns + assert "groups.type" in urns + assert "groups.primary" in urns + assert "groups.display" in urns + assert "groups.value" in urns + assert "groups.ref" in urns + assert "entitlements" in urns + assert "entitlements.type" in urns + assert "entitlements.primary" in urns + assert "entitlements.display" in urns + # TODO: this should be str ? + assert "entitlements.value" in urns + assert "entitlements.ref" in urns + assert "roles" in urns + assert "roles.type" in urns + assert "roles.primary" in urns + assert "roles.display" in urns + # TODO: this should be str? + assert "roles.value" in urns + assert "roles.ref" in urns + assert "x509Certificates" in urns + assert "x509Certificates.type" in urns + assert "x509Certificates.primary" in urns + assert "x509Certificates.display" in urns + assert "x509Certificates.value" in urns + assert "x509Certificates.ref" in urns + + +def test_iter_urns_extension(): + urns = list(iter_urns(EnterpriseUser)) + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:department" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:division" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:employeeNumber" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.displayName" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.ref" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.value" + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:organization" + ) in urns + + +def test_iter_all_urns(): + urns = list(iter_all_urns(User[EnterpriseUser])) + assert ("externalId", User[EnterpriseUser]) in urns + assert ("userName", User[EnterpriseUser]) in urns + assert ("name", User[EnterpriseUser]) in urns + assert ("name.formatted", User[EnterpriseUser]) in urns + assert ("name.familyName", User[EnterpriseUser]) in urns + assert ("name.givenName", User[EnterpriseUser]) in urns + assert ("name.middleName", User[EnterpriseUser]) in urns + assert ("name.honorificPrefix", User[EnterpriseUser]) in urns + assert ("name.honorificSuffix", User[EnterpriseUser]) in urns + assert ("displayName", User[EnterpriseUser]) in urns + assert ("nickName", User[EnterpriseUser]) in urns + assert ("profileUrl", User[EnterpriseUser]) in urns + assert ("title", User[EnterpriseUser]) in urns + assert ("userType", User[EnterpriseUser]) in urns + assert ("preferredLanguage", User[EnterpriseUser]) in urns + assert ("locale", User[EnterpriseUser]) in urns + assert ("timezone", User[EnterpriseUser]) in urns + assert ("active", User[EnterpriseUser]) in urns + assert ("password", User[EnterpriseUser]) in urns + assert ("emails", User[EnterpriseUser]) in urns + assert ("emails.type", User[EnterpriseUser]) in urns + assert ("emails.primary", User[EnterpriseUser]) in urns + assert ("emails.display", User[EnterpriseUser]) in urns + assert ("emails.value", User[EnterpriseUser]) in urns + assert ("emails.ref", User[EnterpriseUser]) in urns + assert ("phoneNumbers", User[EnterpriseUser]) in urns + assert ("phoneNumbers.type", User[EnterpriseUser]) in urns + assert ("phoneNumbers.primary", User[EnterpriseUser]) in urns + assert ("phoneNumbers.display", User[EnterpriseUser]) in urns + assert ("phoneNumbers.value", User[EnterpriseUser]) in urns + assert ("phoneNumbers.ref", User[EnterpriseUser]) in urns + assert ("ims", User[EnterpriseUser]) in urns + assert ("ims.type", User[EnterpriseUser]) in urns + assert ("ims.primary", User[EnterpriseUser]) in urns + assert ("ims.display", User[EnterpriseUser]) in urns + assert ("ims.value", User[EnterpriseUser]) in urns + assert ("ims.ref", User[EnterpriseUser]) in urns + assert ("photos", User[EnterpriseUser]) in urns + assert ("photos.type", User[EnterpriseUser]) in urns + assert ("photos.primary", User[EnterpriseUser]) in urns + assert ("photos.display", User[EnterpriseUser]) in urns + assert ("photos.value", User[EnterpriseUser]) in urns + assert ("photos.ref", User[EnterpriseUser]) in urns + assert ("addresses", User[EnterpriseUser]) in urns + assert ("addresses.type", User[EnterpriseUser]) in urns + assert ("addresses.primary", User[EnterpriseUser]) in urns + assert ("addresses.display", User[EnterpriseUser]) in urns + # TODO: this should not exist + assert ("addresses.value", User[EnterpriseUser]) in urns + assert ("addresses.ref", User[EnterpriseUser]) in urns + assert ("addresses.formatted", User[EnterpriseUser]) in urns + assert ("addresses.streetAddress", User[EnterpriseUser]) in urns + assert ("addresses.locality", User[EnterpriseUser]) in urns + assert ("addresses.region", User[EnterpriseUser]) in urns + assert ("addresses.postalCode", User[EnterpriseUser]) in urns + assert ("addresses.country", User[EnterpriseUser]) in urns + assert ("groups", User[EnterpriseUser]) in urns + assert ("groups.type", User[EnterpriseUser]) in urns + assert ("groups.primary", User[EnterpriseUser]) in urns + assert ("groups.display", User[EnterpriseUser]) in urns + assert ("groups.value", User[EnterpriseUser]) in urns + assert ("groups.ref", User[EnterpriseUser]) in urns + assert ("entitlements", User[EnterpriseUser]) in urns + assert ("entitlements.type", User[EnterpriseUser]) in urns + assert ("entitlements.primary", User[EnterpriseUser]) in urns + assert ("entitlements.display", User[EnterpriseUser]) in urns + # TODO: this should be str ? + assert ("entitlements.value", User[EnterpriseUser]) in urns + assert ("entitlements.ref", User[EnterpriseUser]) in urns + assert ("roles", User[EnterpriseUser]) in urns + assert ("roles.type", User[EnterpriseUser]) in urns + assert ("roles.primary", User[EnterpriseUser]) in urns + assert ("roles.display", User[EnterpriseUser]) in urns + # TODO: this should be str? + assert ("roles.value", User[EnterpriseUser]) in urns + assert ("roles.ref", User[EnterpriseUser]) in urns + assert ("x509Certificates", User[EnterpriseUser]) in urns + assert ("x509Certificates.type", User[EnterpriseUser]) in urns + assert ("x509Certificates.primary", User[EnterpriseUser]) in urns + assert ("x509Certificates.display", User[EnterpriseUser]) in urns + assert ("x509Certificates.value", User[EnterpriseUser]) in urns + assert ("x509Certificates.ref", User[EnterpriseUser]) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:department", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:division", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:employeeNumber", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.displayName", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.ref", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.value", + EnterpriseUser, + ) in urns + assert ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:organization", + EnterpriseUser, + ) in urns + + +def test_get_target_model_by_urn(): + urns = iter_all_urns(User[EnterpriseUser]) + annotations = {urn: get_target_model_by_urn(model, urn) for urn, model in urns} + assert annotations == { + "active": ( + User[EnterpriseUser], + "active", + ), + "addresses": ( + User[EnterpriseUser], + "addresses", + ), + "addresses.country": ( + Address, + "country", + ), + "addresses.display": ( + Address, + "display", + ), + "addresses.formatted": ( + Address, + "formatted", + ), + "addresses.locality": ( + Address, + "locality", + ), + "addresses.postalCode": ( + Address, + "postal_code", + ), + "addresses.primary": ( + Address, + "primary", + ), + "addresses.ref": ( + Address, + "ref", + ), + "addresses.region": ( + Address, + "region", + ), + "addresses.streetAddress": ( + Address, + "street_address", + ), + "addresses.type": ( + Address, + "type", + ), + "addresses.value": ( + Address, + "value", + ), + "displayName": ( + User[EnterpriseUser], + "display_name", + ), + "emails": ( + User[EnterpriseUser], + "emails", + ), + "emails.display": ( + Email, + "display", + ), + "emails.primary": ( + Email, + "primary", + ), + "emails.ref": ( + Email, + "ref", + ), + "emails.type": ( + Email, + "type", + ), + "emails.value": ( + Email, + "value", + ), + "entitlements": ( + User[EnterpriseUser], + "entitlements", + ), + "entitlements.display": ( + Entitlement, + "display", + ), + "entitlements.primary": ( + Entitlement, + "primary", + ), + "entitlements.ref": ( + Entitlement, + "ref", + ), + "entitlements.type": ( + Entitlement, + "type", + ), + "entitlements.value": ( + Entitlement, + "value", + ), + "externalId": ( + User[EnterpriseUser], + "external_id", + ), + "groups": ( + User[EnterpriseUser], + "groups", + ), + "groups.display": ( + GroupMembership, + "display", + ), + "groups.primary": ( + GroupMembership, + "primary", + ), + "groups.ref": ( + GroupMembership, + "ref", + ), + "groups.type": ( + GroupMembership, + "type", + ), + "groups.value": ( + GroupMembership, + "value", + ), + "ims": ( + User[EnterpriseUser], + "ims", + ), + "ims.display": ( + Im, + "display", + ), + "ims.primary": ( + Im, + "primary", + ), + "ims.ref": ( + Im, + "ref", + ), + "ims.type": ( + Im, + "type", + ), + "ims.value": ( + Im, + "value", + ), + "locale": ( + User[EnterpriseUser], + "locale", + ), + "name": ( + User[EnterpriseUser], + "name", + ), + "name.familyName": ( + Name, + "family_name", + ), + "name.formatted": ( + Name, + "formatted", + ), + "name.givenName": ( + Name, + "given_name", + ), + "name.honorificPrefix": ( + Name, + "honorific_prefix", + ), + "name.honorificSuffix": ( + Name, + "honorific_suffix", + ), + "name.middleName": ( + Name, + "middle_name", + ), + "nickName": ( + User[EnterpriseUser], + "nick_name", + ), + "password": ( + User[EnterpriseUser], + "password", + ), + "phoneNumbers": ( + User[EnterpriseUser], + "phone_numbers", + ), + "phoneNumbers.display": ( + PhoneNumber, + "display", + ), + "phoneNumbers.primary": ( + PhoneNumber, + "primary", + ), + "phoneNumbers.ref": ( + PhoneNumber, + "ref", + ), + "phoneNumbers.type": ( + PhoneNumber, + "type", + ), + "phoneNumbers.value": ( + PhoneNumber, + "value", + ), + "photos": ( + User[EnterpriseUser], + "photos", + ), + "photos.display": ( + Photo, + "display", + ), + "photos.primary": ( + Photo, + "primary", + ), + "photos.ref": ( + Photo, + "ref", + ), + "photos.type": ( + Photo, + "type", + ), + "photos.value": ( + Photo, + "value", + ), + "preferredLanguage": ( + User[EnterpriseUser], + "preferred_language", + ), + "profileUrl": ( + User[EnterpriseUser], + "profile_url", + ), + "roles": ( + User[EnterpriseUser], + "roles", + ), + "roles.display": ( + Role, + "display", + ), + "roles.primary": ( + Role, + "primary", + ), + "roles.ref": ( + Role, + "ref", + ), + "roles.type": ( + Role, + "type", + ), + "roles.value": ( + Role, + "value", + ), + "timezone": ( + User[EnterpriseUser], + "timezone", + ), + "title": ( + User[EnterpriseUser], + "title", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": ( + User[EnterpriseUser], + "EnterpriseUser", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter": ( + EnterpriseUser, + "cost_center", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:department": ( + EnterpriseUser, + "department", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:division": ( + EnterpriseUser, + "division", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:employeeNumber": ( + EnterpriseUser, + "employee_number", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager": ( + EnterpriseUser, + "manager", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.displayName": ( + Manager, + "display_name", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.ref": ( + Manager, + "ref", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.value": ( + Manager, + "value", + ), + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:organization": ( + EnterpriseUser, + "organization", + ), + "userName": ( + User[EnterpriseUser], + "user_name", + ), + "userType": ( + User[EnterpriseUser], + "user_type", + ), + "x509Certificates": ( + User[EnterpriseUser], + "x509_certificates", + ), + "x509Certificates.display": ( + X509Certificate, + "display", + ), + "x509Certificates.primary": ( + X509Certificate, + "primary", + ), + "x509Certificates.ref": ( + X509Certificate, + "ref", + ), + "x509Certificates.type": ( + X509Certificate, + "type", + ), + "x509Certificates.value": ( + X509Certificate, + "value", + ), + } + + +def test_get_attribute_type_by_urn(): + urns = iter_all_urns(User[EnterpriseUser]) + types = {urn: get_attribute_type_by_urn(model, urn) for urn, model in urns} + assert types == { + "active": bool, + "addresses": Address, + "addresses.country": str, + "addresses.display": str, + "addresses.formatted": str, + "addresses.locality": str, + "addresses.postalCode": str, + "addresses.primary": bool, + "addresses.ref": Reference[Any], + "addresses.region": str, + "addresses.streetAddress": str, + "addresses.type": Address.Type, + "addresses.value": Any, + "displayName": str, + "emails": Email, + "emails.display": str, + "emails.primary": bool, + "emails.ref": Reference[Any], + "emails.type": Email.Type, + "emails.value": EmailStr, + "entitlements": Entitlement, + "entitlements.display": str, + "entitlements.primary": bool, + "entitlements.ref": Reference[Any], + "entitlements.type": str, + "entitlements.value": Any, + "externalId": str, + "groups": GroupMembership, + "groups.display": str, + "groups.primary": bool, + "groups.ref": Reference[Literal["User"] | Literal["Group"]], + "groups.type": str, + "groups.value": str, + "ims": Im, + "ims.display": str, + "ims.primary": bool, + "ims.ref": Reference[Any], + "ims.type": Im.Type, + "ims.value": str, + "locale": str, + "name": Name, + "name.familyName": str, + "name.formatted": str, + "name.givenName": str, + "name.honorificPrefix": str, + "name.honorificSuffix": str, + "name.middleName": str, + "nickName": str, + "password": str, + "phoneNumbers": PhoneNumber, + "phoneNumbers.display": str, + "phoneNumbers.primary": bool, + "phoneNumbers.ref": Reference[Any], + "phoneNumbers.type": PhoneNumber.Type, + "phoneNumbers.value": str, + "photos": Photo, + "photos.display": str, + "photos.primary": bool, + "photos.ref": Reference[Any], + "photos.type": Photo.Type, + "photos.value": Reference[ExternalReference], + "preferredLanguage": str, + "profileUrl": Reference[ExternalReference], + "roles": Role, + "roles.display": str, + "roles.primary": bool, + "roles.ref": Reference[Any], + "roles.type": str, + "roles.value": Any, + "timezone": str, + "title": str, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": EnterpriseUser, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter": str, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:department": str, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:division": str, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:employeeNumber": str, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager": Manager, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.displayName": str, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.ref": Reference[ + Literal["User"] + ], + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.value": str, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:organization": str, + "userName": str, + "userType": str, + "x509Certificates": X509Certificate, + "x509Certificates.display": str, + "x509Certificates.primary": bool, + "x509Certificates.ref": Reference[Any], + "x509Certificates.type": str, + "x509Certificates.value": Base64Bytes, + } + + +def test_get_annotation_by_urn(): + urns = iter_all_urns(User[EnterpriseUser]) + annotations = { + urn: get_annotation_by_urn(Mutability, urn, model) for urn, model in urns + } + assert annotations == { + "active": Mutability.read_write, + "addresses": Mutability.read_write, + "addresses.country": Mutability.read_write, + "addresses.display": Mutability.immutable, + "addresses.formatted": Mutability.read_write, + "addresses.locality": Mutability.read_write, + "addresses.postalCode": Mutability.read_write, + "addresses.primary": Mutability.read_write, + "addresses.ref": Mutability.read_write, + "addresses.region": Mutability.read_write, + "addresses.streetAddress": Mutability.read_write, + "addresses.type": Mutability.read_write, + "addresses.value": Mutability.read_write, + "displayName": Mutability.read_write, + "emails": Mutability.read_write, + "emails.display": Mutability.read_write, + "emails.primary": Mutability.read_write, + "emails.ref": Mutability.read_write, + "emails.type": Mutability.read_write, + "emails.value": Mutability.read_write, + "entitlements": Mutability.read_write, + "entitlements.display": Mutability.immutable, + "entitlements.primary": Mutability.read_write, + "entitlements.ref": Mutability.read_write, + "entitlements.type": Mutability.read_write, + "entitlements.value": Mutability.read_write, + "externalId": Mutability.read_write, + "groups": Mutability.read_only, + "groups.display": Mutability.read_only, + "groups.primary": Mutability.read_write, + "groups.ref": Mutability.read_only, + "groups.type": Mutability.read_only, + "groups.value": Mutability.read_only, + "ims": Mutability.read_write, + "ims.display": Mutability.read_write, + "ims.primary": Mutability.read_write, + "ims.ref": Mutability.read_write, + "ims.type": Mutability.read_write, + "ims.value": Mutability.read_write, + "locale": Mutability.read_write, + "name": Mutability.read_write, + "name.familyName": Mutability.read_write, + "name.formatted": Mutability.read_write, + "name.givenName": Mutability.read_write, + "name.honorificPrefix": Mutability.read_write, + "name.honorificSuffix": Mutability.read_write, + "name.middleName": Mutability.read_write, + "nickName": Mutability.read_write, + "password": Mutability.write_only, + "phoneNumbers": Mutability.read_write, + "phoneNumbers.display": Mutability.read_write, + "phoneNumbers.primary": Mutability.read_write, + "phoneNumbers.ref": Mutability.read_write, + "phoneNumbers.type": Mutability.read_write, + "phoneNumbers.value": Mutability.read_write, + "photos": Mutability.read_write, + "photos.display": Mutability.read_write, + "photos.primary": Mutability.read_write, + "photos.ref": Mutability.read_write, + "photos.type": Mutability.read_write, + "photos.value": Mutability.read_write, + "preferredLanguage": Mutability.read_write, + "profileUrl": Mutability.read_write, + "roles": Mutability.read_write, + "roles.display": Mutability.immutable, + "roles.primary": Mutability.read_write, + "roles.ref": Mutability.read_write, + "roles.type": Mutability.read_write, + "roles.value": Mutability.read_write, + "timezone": Mutability.read_write, + "title": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:department": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:division": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:employeeNumber": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.displayName": Mutability.read_only, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.ref": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.value": Mutability.read_write, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:organization": Mutability.read_write, + "userName": Mutability.read_write, + "userType": Mutability.read_write, + "x509Certificates": Mutability.read_write, + "x509Certificates.display": Mutability.immutable, + "x509Certificates.primary": Mutability.read_write, + "x509Certificates.ref": Mutability.read_write, + "x509Certificates.type": Mutability.read_write, + "x509Certificates.value": Mutability.read_write, + } + + +def test_get_value_by_urn(): + user = User[EnterpriseUser]( + user_name="john.doe", + external_id="ext-12345", + display_name="John Doe", + nick_name="Johnny", + profile_url="https://example.com/profiles/john.doe", + title="Senior Software Engineer", + user_type="Employee", + preferred_language="en-US", + locale="en-US", + timezone="America/New_York", + active=True, + password="SecurePassword123!", + name=Name( + formatted="Mr. John William Doe Jr.", + family_name="Doe", + given_name="John", + middle_name="William", + honorific_prefix="Mr.", + honorific_suffix="Jr.", + ), + emails=[ + Email( + value="john.doe@example.com", + type=Email.Type.work, + primary=True, + display="Work Email", + ), + Email( + value="johndoe@personal.com", + type=Email.Type.home, + primary=False, + display="Personal Email", + ), + ], + phone_numbers=[ + PhoneNumber( + value="+1-555-123-4567", + type=PhoneNumber.Type.work, + primary=True, + display="Work Phone", + ), + PhoneNumber( + value="+1-555-987-6543", + type=PhoneNumber.Type.mobile, + primary=False, + display="Mobile Phone", + ), + ], + ims=[ + Im( + value="john.doe.skype", + type=Im.Type.skype, + primary=True, + display="Skype ID", + ) + ], + photos=[ + Photo( + value="https://example.com/photos/john.jpg", + type=Photo.Type.photo, + primary=True, + display="Profile Photo", + ), + Photo( + value="https://example.com/thumbnails/john.jpg", + type=Photo.Type.thumbnail, + primary=False, + display="Thumbnail", + ), + ], + addresses=[ + Address( + formatted="123 Main St, Suite 456, San Francisco, CA 94102, USA", + street_address="123 Main St, Suite 456", + locality="San Francisco", + region="CA", + postal_code="94102", + country="USA", + type=Address.Type.work, + primary=True, + display="Work Address", + ), + Address( + formatted="789 Oak Ave, New York, NY 10001, USA", + street_address="789 Oak Ave", + locality="New York", + region="NY", + postal_code="10001", + country="USA", + type=Address.Type.home, + primary=False, + display="Home Address", + ), + ], + groups=[ + GroupMembership( + value="group-123", + ref="https://example.com/Groups/group-123", + display="Engineering Team", + type="Team", + ), + GroupMembership( + value="group-456", + ref="https://example.com/Groups/group-456", + display="Managers", + type="Role", + ), + ], + entitlements=[ + Entitlement( + value="read-write", + type="access-level", + primary=True, + display="Read-Write Access", + ), + Entitlement( + value="admin", type="role", primary=False, display="Admin Role" + ), + ], + roles=[ + Role( + value="developer", + type="technical", + primary=True, + display="Senior Developer", + ), + Role( + value="team-lead", type="management", primary=False, display="Team Lead" + ), + ], + x509_certificates=[ + X509Certificate( + value=Base64Bytes( + b"MIIDQzCCAqygAwIBAgICEAAwDQYJKoZIhvcNAQEFBQAwTjEL..." + ), + type="signing", + primary=True, + display="Signing Certificate", + ), + X509Certificate( + value=Base64Bytes( + b"MIIDRzCCAqygAwIBAgICEAAwDQYJKoZIhvcNAQEFBQAwTjEL..." + ), + type="encryption", + primary=False, + display="Encryption Certificate", + ), + ], + ) + user[EnterpriseUser] = EnterpriseUser( + cost_center="CC-1001", + department="Engineering", + division="Product Development", + employee_number="EMP-98765", + organization="Acme Corporation", + manager=Manager( + value="manager-789", + ref="https://example.com/Users/manager-789", + display_name="Jane Smith", + ), + ) + + # Test simple attributes + assert get_value_by_urn(user, "userName") == "john.doe" + assert get_value_by_urn(user, "externalId") == "ext-12345" + assert get_value_by_urn(user, "displayName") == "John Doe" + assert get_value_by_urn(user, "nickName") == "Johnny" + assert ( + get_value_by_urn(user, "profileUrl") == "https://example.com/profiles/john.doe" + ) + assert get_value_by_urn(user, "title") == "Senior Software Engineer" + assert get_value_by_urn(user, "userType") == "Employee" + assert get_value_by_urn(user, "preferredLanguage") == "en-US" + assert get_value_by_urn(user, "locale") == "en-US" + assert get_value_by_urn(user, "timezone") == "America/New_York" + assert get_value_by_urn(user, "active") is True + assert get_value_by_urn(user, "password") == "SecurePassword123!" + + # Test complex attribute (name) + assert get_value_by_urn(user, "name").formatted == "Mr. John William Doe Jr." + assert get_value_by_urn(user, "name.formatted") == "Mr. John William Doe Jr." + assert get_value_by_urn(user, "name.familyName") == "Doe" + assert get_value_by_urn(user, "name.givenName") == "John" + assert get_value_by_urn(user, "name.middleName") == "William" + assert get_value_by_urn(user, "name.honorificPrefix") == "Mr." + assert get_value_by_urn(user, "name.honorificSuffix") == "Jr." + + # Test multi-valued attributes (emails) - not possible until filters are supported + assert get_value_by_urn(user, "emails") == [ + Email( + type=Email.Type.work, + primary=True, + display="Work Email", + value="john.doe@example.com", + ), + Email( + type=Email.Type.home, + primary=False, + display="Personal Email", + value="johndoe@personal.com", + ), + ] + # assert get_value_by_urn(user, "emails.value") == [ + # "john.doe@example.com", + # "johndoe@personal.com", + # ] + # assert get_value_by_urn(user, "emails.type") == [Email.Type.work, Email.Type.home] + # assert get_value_by_urn(user, "emails.primary") == [True, False] + # assert get_value_by_urn(user, "emails.display") == ["Work Email", "Personal Email"] + + # Test phone numbers - not possible until filters are supported + assert get_value_by_urn(user, "phoneNumbers") == [ + PhoneNumber( + value="+1-555-123-4567", + type=PhoneNumber.Type.work, + primary=True, + display="Work Phone", + ), + PhoneNumber( + value="+1-555-987-6543", + type=PhoneNumber.Type.mobile, + primary=False, + display="Mobile Phone", + ), + ] + # assert get_value_by_urn(user, "phoneNumbers.value") == [ + # "+1-555-123-4567", + # "+1-555-987-6543", + # ] + # assert get_value_by_urn(user, "phoneNumbers.type") == [ + # PhoneNumber.Type.work, + # PhoneNumber.Type.mobile, + # ] + # assert get_value_by_urn(user, "phoneNumbers.primary") == [True, False] + # assert get_value_by_urn(user, "phoneNumbers.display") == [ + # "Work Phone", + # "Mobile Phone", + # ] + + # Test IMs - not possible until filters are supported + assert get_value_by_urn(user, "ims") == [ + Im( + value="john.doe.skype", + type=Im.Type.skype, + primary=True, + display="Skype ID", + ) + ] + # assert get_value_by_urn(user, "ims.value") == ["john.doe.skype"] + # assert get_value_by_urn(user, "ims.type") == [Im.Type.skype] + # assert get_value_by_urn(user, "ims.primary") == [True] + # assert get_value_by_urn(user, "ims.display") == ["Skype ID"] + + # Test photos - not possible until filters are supported + assert get_value_by_urn(user, "photos") == [ + Photo( + value="https://example.com/photos/john.jpg", + type=Photo.Type.photo, + primary=True, + display="Profile Photo", + ), + Photo( + value="https://example.com/thumbnails/john.jpg", + type=Photo.Type.thumbnail, + primary=False, + display="Thumbnail", + ), + ] + # assert get_value_by_urn(user, "photos.value") == [ + # "https://example.com/photos/john.jpg", + # "https://example.com/thumbnails/john.jpg", + # ] + # assert get_value_by_urn(user, "photos.type") == [ + # Photo.Type.photo, + # Photo.Type.thumbnail, + # ] + # assert get_value_by_urn(user, "photos.primary") == [True, False] + # assert get_value_by_urn(user, "photos.display") == ["Profile Photo", "Thumbnail"] + + # Test addresses - not possible until filters are supported + assert get_value_by_urn(user, "addresses") == [ + Address( + formatted="123 Main St, Suite 456, San Francisco, CA 94102, USA", + street_address="123 Main St, Suite 456", + locality="San Francisco", + region="CA", + postal_code="94102", + country="USA", + type=Address.Type.work, + primary=True, + display="Work Address", + ), + Address( + formatted="789 Oak Ave, New York, NY 10001, USA", + street_address="789 Oak Ave", + locality="New York", + region="NY", + postal_code="10001", + country="USA", + type=Address.Type.home, + primary=False, + display="Home Address", + ), + ] + # assert get_value_by_urn(user, "addresses.formatted") == [ + # "123 Main St, Suite 456, San Francisco, CA 94102, USA", + # "789 Oak Ave, New York, NY 10001, USA", + # ] + # assert get_value_by_urn(user, "addresses.streetAddress") == [ + # "123 Main St, Suite 456", + # "789 Oak Ave", + # ] + # assert get_value_by_urn(user, "addresses.locality") == ["San Francisco", "New York"] + # assert get_value_by_urn(user, "addresses.region") == ["CA", "NY"] + # assert get_value_by_urn(user, "addresses.postalCode") == ["94102", "10001"] + # assert get_value_by_urn(user, "addresses.country") == ["USA", "USA"] + # assert get_value_by_urn(user, "addresses.type") == [ + # Address.Type.work, + # Address.Type.home, + # ] + # assert get_value_by_urn(user, "addresses.primary") == [True, False] + # assert get_value_by_urn(user, "addresses.display") == [ + # "Work Address", + # "Home Address", + # ] + + # Test groups - not possible until filters are supported + assert get_value_by_urn(user, "groups") == [ + GroupMembership( + value="group-123", + ref="https://example.com/Groups/group-123", + display="Engineering Team", + type="Team", + ), + GroupMembership( + value="group-456", + ref="https://example.com/Groups/group-456", + display="Managers", + type="Role", + ), + ] + # assert get_value_by_urn(user, "groups.value") == ["group-123", "group-456"] + # assert get_value_by_urn(user, "groups.ref") == [ + # "https://example.com/Groups/group-123", + # "https://example.com/Groups/group-456", + # ] + # assert get_value_by_urn(user, "groups.display") == ["Engineering Team", "Managers"] + # assert get_value_by_urn(user, "groups.type") == ["Team", "Role"] + + # Test entitlements - not possible until filters are supported + assert get_value_by_urn(user, "entitlements") == [ + Entitlement( + value="read-write", + type="access-level", + primary=True, + display="Read-Write Access", + ), + Entitlement(value="admin", type="role", primary=False, display="Admin Role"), + ] + # assert get_value_by_urn(user, "entitlements.value") == ["read-write", "admin"] + # assert get_value_by_urn(user, "entitlements.type") == ["access-level", "role"] + # assert get_value_by_urn(user, "entitlements.primary") == [True, False] + # assert get_value_by_urn(user, "entitlements.display") == [ + # "Read-Write Access", + # "Admin Role", + # ] + + # Test roles - not possible until filters are supported + assert get_value_by_urn(user, "roles") == [ + Role( + value="developer", + type="technical", + primary=True, + display="Senior Developer", + ), + Role(value="team-lead", type="management", primary=False, display="Team Lead"), + ] + # assert get_value_by_urn(user, "roles.value") == ["developer", "team-lead"] + # assert get_value_by_urn(user, "roles.type") == ["technical", "management"] + # assert get_value_by_urn(user, "roles.primary") == [True, False] + # assert get_value_by_urn(user, "roles.display") == ["Senior Developer", "Team Lead"] + + # Test x509Certificates - not possible until filters are supported + assert get_value_by_urn(user, "x509Certificates") == [ + X509Certificate( + value=Base64Bytes(b"MIIDQzCCAqygAwIBAgICEAAwDQYJKoZIhvcNAQEFBQAwTjEL..."), + type="signing", + primary=True, + display="Signing Certificate", + ), + X509Certificate( + value=Base64Bytes(b"MIIDRzCCAqygAwIBAgICEAAwDQYJKoZIhvcNAQEFBQAwTjEL..."), + type="encryption", + primary=False, + display="Encryption Certificate", + ), + ] + # assert get_value_by_urn(user, "x509Certificates.type") == ["signing", "encryption"] + # assert get_value_by_urn(user, "x509Certificates.primary") == [True, False] + # assert get_value_by_urn(user, "x509Certificates.display") == [ + # "Signing Certificate", + # "Encryption Certificate", + # ] + + # Test enterprise extension with full URN + assert ( + get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + ).cost_center + == "CC-1001" + ) + assert ( + get_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + ) + == "CC-1001" + ) + assert ( + get_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:department", + ) + == "Engineering" + ) + assert ( + get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:division" + ) + == "Product Development" + ) + assert ( + get_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:employeeNumber", + ) + == "EMP-98765" + ) + assert ( + get_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:organization", + ) + == "Acme Corporation" + ) + + # Test manager sub-attributes + assert ( + get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager" + ).value + == "manager-789" + ) + assert ( + get_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.value", + ) + == "manager-789" + ) + assert ( + get_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.ref", + ) + == "https://example.com/Users/manager-789" + ) + assert ( + get_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.displayName", + ) + == "Jane Smith" + ) + + +def test_set_value_by_urn(): + # Create a minimal user object + user = User[EnterpriseUser]( + user_name="initial.user", + ) + + # Test setting simple attributes + set_value_by_urn(user, "userName", "updated.user") + assert user.user_name == "updated.user" + + set_value_by_urn(user, "externalId", "ext-99999") + assert user.external_id == "ext-99999" + + set_value_by_urn(user, "displayName", "Updated User") + assert user.display_name == "Updated User" + + set_value_by_urn(user, "nickName", "UpdatedNick") + assert user.nick_name == "UpdatedNick" + + set_value_by_urn(user, "profileUrl", "https://example.com/updated") + assert user.profile_url == "https://example.com/updated" + + set_value_by_urn(user, "title", "Updated Title") + assert user.title == "Updated Title" + + set_value_by_urn(user, "userType", "UpdatedType") + assert user.user_type == "UpdatedType" + + set_value_by_urn(user, "preferredLanguage", "fr-FR") + assert user.preferred_language == "fr-FR" + + set_value_by_urn(user, "locale", "fr-FR") + assert user.locale == "fr-FR" + + set_value_by_urn(user, "timezone", "Europe/Paris") + assert user.timezone == "Europe/Paris" + + set_value_by_urn(user, "active", False) + assert user.active is False + + set_value_by_urn(user, "password", "NewPassword456!") + assert user.password == "NewPassword456!" + + # Test setting complex attribute (name) + set_value_by_urn( + user, + "name", + Name(formatted="Ms. Jane Smith", family_name="Smith", given_name="Jane"), + ) + assert user.name.formatted == "Ms. Jane Smith" + assert user.name.family_name == "Smith" + assert user.name.given_name == "Jane" + + # Test setting sub-attributes of complex fields + set_value_by_urn(user, "name.familyName", "Johnson") + assert user.name.family_name == "Johnson" + + set_value_by_urn(user, "name.givenName", "Emily") + assert user.name.given_name == "Emily" + + set_value_by_urn(user, "name.middleName", "Marie") + assert user.name.middle_name == "Marie" + + set_value_by_urn(user, "name.honorificPrefix", "Dr.") + assert user.name.honorific_prefix == "Dr." + + set_value_by_urn(user, "name.honorificSuffix", "PhD") + assert user.name.honorific_suffix == "PhD" + + set_value_by_urn(user, "name.formatted", "Dr. Emily Marie Johnson, PhD") + assert user.name.formatted == "Dr. Emily Marie Johnson, PhD" + + # Test setting multi-valued attributes (emails) + # Setting a single email should wrap it in a list + set_value_by_urn( + user, + "emails", + Email(value="new.email@example.com", type=Email.Type.work, primary=True), + ) + assert len(user.emails) == 1 + assert user.emails[0].value == "new.email@example.com" + + # Test setting multiple emails + set_value_by_urn( + user, + "emails", + [ + Email(value="work@example.com", type=Email.Type.work, primary=True), + Email(value="home@example.com", type=Email.Type.home, primary=False), + ], + ) + assert len(user.emails) == 2 + assert user.emails[0].value == "work@example.com" + assert user.emails[1].value == "home@example.com" + + # Test setting phone numbers + set_value_by_urn( + user, + "phoneNumbers", + PhoneNumber( + value="+33-1-23-45-67-89", type=PhoneNumber.Type.work, primary=True + ), + ) + assert len(user.phone_numbers) == 1 + assert user.phone_numbers[0].value == "+33-1-23-45-67-89" + + # Test setting multiple phone numbers + set_value_by_urn( + user, + "phoneNumbers", + [ + PhoneNumber(value="+33-1-11-11-11-11", type=PhoneNumber.Type.work), + PhoneNumber(value="+33-6-22-22-22-22", type=PhoneNumber.Type.mobile), + ], + ) + assert len(user.phone_numbers) == 2 + assert user.phone_numbers[0].value == "+33-1-11-11-11-11" + assert user.phone_numbers[1].value == "+33-6-22-22-22-22" + + # Test setting IMs + set_value_by_urn( + user, "ims", Im(value="user.teams", type=Im.Type.skype, primary=True) + ) + assert len(user.ims) == 1 + assert user.ims[0].value == "user.teams" + + # Test setting photos + set_value_by_urn( + user, + "photos", + Photo( + value="https://example.com/new-photo.jpg", + type=Photo.Type.photo, + primary=True, + ), + ) + assert len(user.photos) == 1 + assert user.photos[0].value == "https://example.com/new-photo.jpg" + + # Test setting addresses + set_value_by_urn( + user, + "addresses", + Address( + formatted="456 New Street, Paris, France", + street_address="456 New Street", + locality="Paris", + country="France", + type=Address.Type.work, + primary=True, + ), + ) + assert len(user.addresses) == 1 + assert user.addresses[0].formatted == "456 New Street, Paris, France" + assert user.addresses[0].locality == "Paris" + + # Test setting multiple addresses + set_value_by_urn( + user, + "addresses", + [ + Address( + street_address="123 Work St", + locality="Lyon", + country="France", + type=Address.Type.work, + ), + Address( + street_address="456 Home Ave", + locality="Nice", + country="France", + type=Address.Type.home, + ), + ], + ) + assert len(user.addresses) == 2 + assert user.addresses[0].locality == "Lyon" + assert user.addresses[1].locality == "Nice" + + # Test setting entitlements + set_value_by_urn( + user, + "entitlements", + Entitlement(value="full-access", type="permission", primary=True), + ) + assert len(user.entitlements) == 1 + assert user.entitlements[0].value == "full-access" + + # Test setting roles + set_value_by_urn( + user, "roles", Role(value="manager", type="position", primary=True) + ) + assert len(user.roles) == 1 + assert user.roles[0].value == "manager" + + # Test setting x509 certificates + cert_data = "MIIDQzCCAqygAwIBAgICADA=" # Valid base64 string + set_value_by_urn( + user, + "x509Certificates", + X509Certificate(value=cert_data, type="auth", primary=True), + ) + assert len(user.x509_certificates) == 1 + assert user.x509_certificates[0].type == "auth" + + # Test setting enterprise extension + user[EnterpriseUser] = EnterpriseUser() + + # Test setting enterprise attributes with full URN + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + "CC-2002", + ) + assert user[EnterpriseUser].cost_center == "CC-2002" + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:department", + "Marketing", + ) + assert user[EnterpriseUser].department == "Marketing" + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:division", + "Sales Division", + ) + assert user[EnterpriseUser].division == "Sales Division" + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:employeeNumber", + "EMP-11111", + ) + assert user[EnterpriseUser].employee_number == "EMP-11111" + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:organization", + "NewCorp Inc.", + ) + assert user[EnterpriseUser].organization == "NewCorp Inc." + + # Test setting manager + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager", + Manager( + value="manager-999", + ref="https://example.com/Users/manager-999", + display_name="Bob Manager", + ), + ) + assert user[EnterpriseUser].manager.value == "manager-999" + assert user[EnterpriseUser].manager.ref == "https://example.com/Users/manager-999" + assert user[EnterpriseUser].manager.display_name == "Bob Manager" + + # Test setting manager sub-attributes + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.value", + "manager-888", + ) + assert user[EnterpriseUser].manager.value == "manager-888" + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.ref", + "https://example.com/Users/manager-888", + ) + assert user[EnterpriseUser].manager.ref == "https://example.com/Users/manager-888" + + # Test setting manager.displayName (even though it's read-only, it should work) + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager.displayName", + "Alice Director", + ) + assert user[EnterpriseUser].manager.display_name == "Alice Director" + + # Test setting entire extension at once + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + EnterpriseUser( + cost_center="CC-3003", + department="IT", + division="Tech", + employee_number="EMP-33333", + organization="TechCorp", + ), + ) + assert user[EnterpriseUser].cost_center == "CC-3003" + assert user[EnterpriseUser].department == "IT" + assert user[EnterpriseUser].division == "Tech" + assert user[EnterpriseUser].employee_number == "EMP-33333" + assert user[EnterpriseUser].organization == "TechCorp" + + # Test edge cases + # Test setting None value + set_value_by_urn(user, "title", None) + assert user.title is None + + # Test setting empty string + set_value_by_urn(user, "nickName", "") + assert user.nick_name == "" + + # Test setting empty list for multi-valued + set_value_by_urn(user, "emails", []) + assert user.emails == [] + + # Test that read-only fields can now be set (groups is read-only but should work) + set_value_by_urn( + user, "groups", [GroupMembership(value="group-999", display="Test Group")] + ) + # Should now be changed since we no longer filter by mutability + assert len(user.groups) == 1 + assert user.groups[0].value == "group-999" + assert user.groups[0].display == "Test Group" + + +def test_iter_urns_with_required_filter(): + """Test iter_urns with Required filter.""" + required_urns = list(iter_urns(User, required=[Required.true])) + all_urns = list(iter_urns(User)) + + assert len(required_urns) < len(all_urns) + assert "userName" in required_urns + + +def test_iter_urns_with_mutability_filter(): + """Test iter_urns with Mutability filter.""" + read_only_urns = list(iter_urns(User, mutability=[Mutability.read_only])) + all_urns = list(iter_urns(User)) + + assert len(read_only_urns) < len(all_urns) + + +def test_iter_all_urns_with_simple_resource(): + """Test iter_all_urns with a resource that has no extensions.""" + from scim2_models import Group + + urns = list(iter_all_urns(Group)) + assert len(urns) > 0 + assert all(isinstance(urn, tuple) and len(urn) == 2 for urn in urns) + + +def test_get_multiplicity_by_urn(): + """Test get_multiplicity_by_urn function.""" + # Test single-valued attribute + assert get_multiplicity_by_urn(User[EnterpriseUser], "userName") is False + + # Test multi-valued attribute + assert get_multiplicity_by_urn(User[EnterpriseUser], "emails") is True + + # Test sub-attribute of multi-valued + assert get_multiplicity_by_urn(User[EnterpriseUser], "emails.value") is False + + # Test extension attribute + assert ( + get_multiplicity_by_urn( + User[EnterpriseUser], + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + ) + is False + ) + + # Test non-existent attribute returns None + result = get_multiplicity_by_urn(User[EnterpriseUser], "nonExistent") + assert result is None + + +def test_get_attribute_type_by_urn_error_cases(): + """Test error cases for get_attribute_type_by_urn.""" + # Test non-existent URN + assert get_attribute_type_by_urn(User, "nonExistent") is None + + # Test malformed URN + assert get_attribute_type_by_urn(User, "invalid.nonexistent.path") is None + + +def test_get_annotation_by_urn_error_cases(): + """Test error cases for get_annotation_by_urn.""" + # Test non-existent URN + assert get_annotation_by_urn(Mutability, "nonExistent", User) is None + + # Test malformed URN + assert get_annotation_by_urn(Mutability, "invalid.nonexistent.path", User) is None + + +def test_get_target_model_by_urn_with_extensions(): + """Test get_target_model_by_urn with extension URNs.""" + # Test direct extension schema URN + result = get_target_model_by_urn( + User[EnterpriseUser], + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ) + assert result == (User[EnterpriseUser], "EnterpriseUser") + + # Test extension attribute URN + result = get_target_model_by_urn( + User[EnterpriseUser], + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + ) + assert result == (EnterpriseUser, "cost_center") + + +def test_get_value_by_urn_error_cases(): + """Test error cases for get_value_by_urn.""" + user = User(user_name="test") + + # Test non-existent field + assert get_value_by_urn(user, "nonExistent") is None + + # Test accessing field on None object (name is None) + assert get_value_by_urn(user, "name.familyName") is None + + # Test malformed path + assert get_value_by_urn(user, "nonexistent.field") is None + + # Test accessing non-existent sub-field + user.name = Name(given_name="John") + assert get_value_by_urn(user, "name.nonExistent") is None + + +def test_get_value_by_urn_with_extension(): + """Test get_value_by_urn with extension URNs.""" + user = User[EnterpriseUser](user_name="test") + user[EnterpriseUser] = EnterpriseUser(cost_center="CC-123") + + # Test direct extension access + extension = get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + ) + assert extension.cost_center == "CC-123" + + # Test extension attribute access + cost_center = get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter" + ) + assert cost_center == "CC-123" + + +def test_set_value_by_urn_extension_creation(): + """Test set_value_by_urn creates extension objects when needed.""" + user = User[EnterpriseUser](user_name="test") + # Extension not set initially + assert user[EnterpriseUser] is None + + # Setting extension attribute should create extension + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + "CC-456", + ) + + assert user[EnterpriseUser] is not None + assert user[EnterpriseUser].cost_center == "CC-456" + + +def test_set_value_by_urn_complex_object_creation(): + """Test set_value_by_urn creates complex objects when needed.""" + user = User(user_name="test") + # Name is None initially + assert user.name is None + + # Setting sub-attribute should create Name object + set_value_by_urn(user, "name.givenName", "John") + + assert user.name is not None + assert user.name.given_name == "John" + + +def test_set_value_by_urn_error_cases(): + """Test error cases for set_value_by_urn.""" + user = User(user_name="test") + + set_value_by_urn(user, "nonExistent", "value") + + user.emails = [Email(value="test@example.com")] + set_value_by_urn(user, "emails.value", "new@example.com") + assert user.emails[0].value == "test@example.com" + + +def test_set_value_by_urn_with_extension_schema(): + """Test set_value_by_urn with direct extension schema URN.""" + user = User[EnterpriseUser](user_name="test") + + # Set entire extension object + enterprise_data = EnterpriseUser(cost_center="CC-789", department="IT") + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + enterprise_data, + ) + + assert user[EnterpriseUser] == enterprise_data + assert user[EnterpriseUser].cost_center == "CC-789" + assert user[EnterpriseUser].department == "IT" + + +def test_iter_all_urns_non_resource_type(): + """Test iter_all_urns with non-Resource type (should only yield base URNs).""" + # Use Extension directly, not as part of a Resource + urns = list(iter_all_urns(EnterpriseUser)) + # Should only contain URNs from the extension itself, no extension processing + assert len(urns) > 0 + assert all(source_model == EnterpriseUser for _, source_model in urns) + + +def test_get_target_model_by_urn_with_resource_schema(): + """Test get_target_model_by_urn with resource's own schema prefix.""" + # User's own schema URN should work + user_schema = "urn:ietf:params:scim:schemas:core:2.0:User" + result = get_target_model_by_urn(User, f"{user_schema}:userName") + assert result == (User, "user_name") + + +def test_get_target_model_by_urn_complex_path_error(): + """Test get_target_model_by_urn with invalid complex path.""" + # Invalid nested path should return None + result = get_target_model_by_urn(User, "nonexistent.invalid.path") + assert result is None + + +def test_get_value_by_urn_with_resource_schema(): + """Test get_value_by_urn with resource's own schema URN.""" + user = User(user_name="test") + user_schema = "urn:ietf:params:scim:schemas:core:2.0:User" + + # Should work with resource's own schema prefix + value = get_value_by_urn(user, f"{user_schema}:userName") + assert value == "test" + + +def test_get_value_by_urn_extension_not_set(): + """Test get_value_by_urn when extension is not set.""" + user = User[EnterpriseUser](user_name="test") + # Extension not set, should return None for extension URN + cost_center = get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter" + ) + assert cost_center is None + + +def test_set_value_by_urn_with_resource_schema(): + """Test set_value_by_urn with resource's own schema URN.""" + user = User(user_name="test") + user_schema = "urn:ietf:params:scim:schemas:core:2.0:User" + + # Should work with resource's own schema prefix + set_value_by_urn(user, f"{user_schema}:userName", "updated") + assert user.user_name == "updated" + + +def test_set_value_by_urn_cannot_create_complex_object(): + """Test set_value_by_urn when it cannot create a complex object.""" + user = User(user_name="test") + + # Create a scenario where field_type is not a BaseModel subclass + # This is hard to trigger with real SCIM models, but tests the branch + user.emails = [Email(value="test@example.com")] + + # Try to navigate into list - should return early due to isinstance(sub_obj, list) + set_value_by_urn(user, "emails.value", "should_not_change") + # Should not have changed + assert user.emails[0].value == "test@example.com" + + +def test_set_value_by_urn_field_name_none_in_else(): + """Test set_value_by_urn when field_name is None in the else branch.""" + user = User(user_name="test") + + set_value_by_urn(user, "totallyNonExistent", "value") + assert not hasattr(user, "totallyNonExistent") + + +def test_get_target_model_by_urn_unknown_extension(): + """Test get_target_model_by_urn with unknown extension schema.""" + result = get_target_model_by_urn( + User[EnterpriseUser], "urn:unknown:schema:extension:unknown:2.0:User:field" + ) + assert result is None + + +def test_get_value_by_urn_direct_extension_schema(): + """Test get_value_by_urn with direct extension schema URN.""" + user = User[EnterpriseUser](user_name="test") + user[EnterpriseUser] = EnterpriseUser(cost_center="CC-123") + + extension = get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + ) + assert extension is not None + assert extension.cost_center == "CC-123" + + +def test_set_value_ignores_empty_multivalued_field(): + """Test setting nested field on empty multivalued field is ignored.""" + user = User(user_name="test") + user.emails = [] + + set_value_by_urn(user, "emails.value", "test@example.com") + + assert user.emails == [] + + +def test_get_target_model_returns_none_for_nonexistent_intermediate(): + """Test resolving path with nonexistent intermediate field returns None.""" + result = get_target_model_by_urn(User, "nonexistent.field.subfield") + assert result is None + + +def test_get_value_returns_none_for_unset_field(): + """Test getting value from unset optional field returns None.""" + user = User(user_name="test") + user.name = Name() + + value = get_value_by_urn(user, "name.formatted") + + assert value is None + + +def test_set_value_creates_missing_extension(): + """Test setting extension field creates extension object when missing.""" + user = User[EnterpriseUser](user_name="test") + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + "CC-TEST", + ) + + assert user[EnterpriseUser] is not None + assert user[EnterpriseUser].cost_center == "CC-TEST" + + +def test_set_value_creates_complex_object_for_nested_field(): + """Test creating complex object when setting nested field.""" + user = User(user_name="test") + assert user.name is None + + set_value_by_urn(user, "name.givenName", "John") + + assert user.name is not None + assert user.name.given_name == "John" + + +def test_set_value_ignores_nested_field_in_populated_multivalued(): + """Test setting nested field in populated multivalued attribute is ignored.""" + user = User(user_name="test") + user.emails = [Email(value="test@example.com")] + original_email = user.emails[0].value + + set_value_by_urn(user, "emails.value", "new@example.com") + + assert user.emails[0].value == original_email + + +def test_get_target_model_resolves_extension_schema(): + """Test resolving target model for extension URN.""" + result = get_target_model_by_urn( + User[EnterpriseUser], + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:costCenter", + ) + assert result == (EnterpriseUser, "cost_center") + + +def test_get_value_returns_entire_extension_object(): + """Test retrieving entire extension object by schema URN.""" + user = User[EnterpriseUser](user_name="test") + user[EnterpriseUser] = EnterpriseUser(cost_center="CC-123") + + extension = get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + ) + assert extension.cost_center == "CC-123" + + +def test_set_value_replaces_entire_extension_object(): + """Test setting entire extension object by schema URN.""" + user = User[EnterpriseUser](user_name="test") + new_extension = EnterpriseUser(cost_center="CC-456", department="IT") + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + new_extension, + ) + assert user[EnterpriseUser].cost_center == "CC-456" + assert user[EnterpriseUser].department == "IT" + + +def test_set_value_creates_intermediate_complex_object(): + """Test creating intermediate object when setting nested field.""" + user = User(user_name="test") + assert user.name is None + + set_value_by_urn(user, "name.familyName", "Doe") + + assert user.name is not None + assert user.name.family_name == "Doe" + + +def test_set_value_ignores_multivalued_field_navigation(): + """Test setting field on multivalued attribute is ignored.""" + user = User(user_name="test") + user.addresses = [Address(country="US")] + original_country = user.addresses[0].country + + set_value_by_urn(user, "addresses.country", "FR") + + assert user.addresses[0].country == original_country + + +def test_get_target_model_returns_none_for_invalid_nested_path(): + """Test resolving deeply nested invalid path returns None.""" + result = get_target_model_by_urn(User, "name.invalid.deeply.nested") + assert result is None + + +def test_get_target_model_with_unknown_schema_falls_through(): + """Test handling URN with unknown schema that doesn't match extensions.""" + result = get_target_model_by_urn(User[EnterpriseUser], "urn:unknown:schema:field") + assert result is None + + +def test_get_value_by_urn_with_direct_extension_schema_match(): + """Test getting value by direct extension schema match.""" + user = User[EnterpriseUser](user_name="test") + user[EnterpriseUser] = EnterpriseUser(cost_center="CC-123") + + extension = get_value_by_urn( + user, "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + ) + assert extension.cost_center == "CC-123" + + +def test_get_value_by_urn_with_unknown_schema_falls_through(): + """Test getting value with unknown schema that doesn't match extensions.""" + user = User[EnterpriseUser](user_name="test") + + result = get_value_by_urn(user, "urn:unknown:schema:field") + assert result is None + + +def test_set_value_by_urn_with_direct_extension_schema_match(): + """Test setting value by direct extension schema match.""" + user = User[EnterpriseUser](user_name="test") + new_extension = EnterpriseUser(cost_center="CC-456") + + set_value_by_urn( + user, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + new_extension, + ) + assert user[EnterpriseUser] == new_extension + + +def test_set_value_by_urn_with_unknown_schema_falls_through(): + """Test setting value with unknown schema that doesn't match extensions.""" + user = User[EnterpriseUser](user_name="test") + + set_value_by_urn(user, "urn:unknown:schema:field", "value") + + +def test_urn_with_colon_but_no_extension_match(): + """Test URN with colon that doesn't match any extension schema.""" + user = User[EnterpriseUser](user_name="test") + + result = get_target_model_by_urn(user.__class__, "urn:other:schema:userName") + assert result is None + + value = get_value_by_urn(user, "urn:other:schema:userName") + assert value is None + + original_username = user.user_name + set_value_by_urn(user, "urn:other:schema:userName", "testvalue") + assert user.user_name == original_username + + +def test_urn_fallthrough_with_dot(): + """Test URN that falls through extension processing to dot handling.""" + user = User[EnterpriseUser](user_name="test") + + result = get_target_model_by_urn(user.__class__, "urn:other:schema:name.givenName") + assert result is None + + value = get_value_by_urn(user, "urn:other:schema:name.givenName") + assert value is None + + set_value_by_urn(user, "urn:other:schema:name.givenName", "test") + + +def test_extension_loop_fallthrough(): + """Test URN that loops through extensions but finds no match.""" + user = User[EnterpriseUser](user_name="test") + + result = get_target_model_by_urn(user.__class__, "urn:nomatch:userName") + assert result is None + + value = get_value_by_urn(user, "urn:nomatch:userName") + assert value is None + + set_value_by_urn(user, "urn:nomatch:userName", "Jane") + assert user.user_name == "test" + + +def test_get_target_model_by_urn_extension_bad_schema(): + assert get_target_model_by_urn(EnterpriseUser, "bad:urn") is None + + +def test_get_value_by_urn_extension_bad_schema(): + assert get_value_by_urn(EnterpriseUser(), "bad:urn") is None + + +def test_set_value_by_urn_extension_bad_schema(): + obj = EnterpriseUser() + set_value_by_urn(obj, "bad:urn", "foobar") + assert obj == EnterpriseUser() diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..cb95c2c --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,30 @@ +"""Test utilities for SCIM compliance testing.""" + +from typing import Any + +from scim2_models.utils import _to_camel + + +def build_nested_response(base_response: dict, path: str, value: Any) -> dict: + """Build a SCIM response with value at the correct nested path.""" + write_only_attributes = {"password"} + if path in write_only_attributes: + return base_response + + response = base_response.copy() + + if path.startswith("urn:"): + known_extension_urns = { + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + } + + if path in known_extension_urns: + response[path] = value + else: + namespace, field_name = path.rsplit(":", 1) + response[namespace][field_name] = value + + else: + response[_to_camel(path)] = value + + return response