diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..1a26650 --- /dev/null +++ b/conftest.py @@ -0,0 +1,12 @@ +import pydantic +import pytest + +import pydantic_scim2 + + +@pytest.fixture(autouse=True) +def add_doctest_namespace(doctest_namespace): + doctest_namespace["pydantic"] = pydantic + imports = {item: getattr(pydantic_scim2, item) for item in pydantic_scim2.__all__} + doctest_namespace.update(imports) + return doctest_namespace diff --git a/doc/conf.py b/doc/conf.py index e5c1e7b..ba3b42f 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -88,3 +88,9 @@ autodoc_pydantic_model_show_config_summary = False autodoc_pydantic_model_show_field_summary = False autodoc_pydantic_model_show_json = False + +# -- Options for doctest ------------------------------------------- + +doctest_global_setup = """ +from pydantic_scim2 import * +""" diff --git a/doc/tutorial.rst b/doc/tutorial.rst index cc195cb..eeca85f 100644 --- a/doc/tutorial.rst +++ b/doc/tutorial.rst @@ -4,11 +4,12 @@ Tutorial Model parsing ============= -Pydantic :func:`~pydantic.BaseModel.model_validate` method can be used to parse and validate SCIM2 payloads. +Pydantic :func:`~pydantic_scim2.BaseModel.model_validate` method can be used to parse and validate SCIM2 payloads. Python models have generally the same name than in the SCIM specifications, they are simply snake cased. -.. doctest:: +.. code-block:: python + :emphasize-lines: 17 >>> from pydantic_scim2 import User >>> import datetime @@ -36,9 +37,10 @@ Python models have generally the same name than in the SCIM specifications, they Model serialization =================== -Pydantic :func:`~pydantic.BaseModel.model_dump` method can be used to produce valid SCIM2 payloads: +Pydantic :func:`~pydantic_scim2.BaseModel.model_dump` method have been tuned to produce valid SCIM2 payloads. -.. doctest:: +.. code-block:: python + :emphasize-lines: 16 >>> from pydantic_scim2 import User, Meta >>> import datetime @@ -55,7 +57,7 @@ Pydantic :func:`~pydantic.BaseModel.model_dump` method can be used to produce va ... ), ... ) - >>> dump = user.model_dump(exclude_none=True, by_alias=True, mode="json") + >>> dump = user.model_dump() >>> assert dump == { ... "schemas": [ ... "urn:ietf:params:scim:schemas:core:2.0:User" @@ -71,6 +73,69 @@ Pydantic :func:`~pydantic.BaseModel.model_dump` method can be used to produce va ... "userName": "bjensen@example.com" ... } +Contexts +======== + +The SCIM specifications detail some :class:`~pydantic_scim2.Mutability` and :class:`~pydantic_scim2.Returned` parameters for model attributes. +Depending on the context, they will indicate that attributes should be present, absent, be ignored. + +For instance, attributes marked as :attr:`~pydantic_scim2.Mutability.read_only` should not be sent by SCIM clients on resource creation requests. +By passing the right :class:`~pydantic_scim2.Context` to the :meth:`~pydantic_scim2.BaseModel.model_dump` method, only the expected fields will be dumped for this context: + +.. code-block:: python + :caption: Client generating a resource creation request payload + + >>> from pydantic_scim2 import User, Context + >>> user = User(user_name="bjensen@example.com") + >>> payload = user.model_dump(scim_ctx=Context.RESOURCE_CREATION_REQUEST) + +In the same fashion, by passing the right :class:`~pydantic_scim2.Context` to the :meth:`~pydantic_scim2.BaseModel.model_validate` method, +fields with unexpected values will raise :class:`~pydantic.ValidationError`: + +.. code-block:: python + :caption: Server validating a resource creation request payload + + >>> from pydantic_scim2 import User, Context + >>> from pydantic import ValidationError + >>> try: + ... obj = User.model_validate(payload, scim_ctx=Context.RESOURCE_CREATION_REQUEST) + ... except pydantic.ValidationError: + ... obj = Error(...) + +Attributes inclusions and exclusions +==================================== + +In some situations it might be needed to exclude, or only include a given set of attributes when serializing a model. +This happens for instance when servers build response payloads for clients requesting only a sub-set the model attributes. +As defined in :rfc:`RFC7644 §3.9 <7644#section-3.9>`, :code:`attributes` and :code:`excluded_attributes` parameters can +be passed to :meth:`~pydantic_scim2.BaseModel.model_dump`. +The expected attribute notation is the one detailed on :rfc:`RFC7644 §3.10 <7644#section-3.10>`, +like :code:`urn:ietf:params:scim:schemas:core:2.0:User:userName`, or :code:`userName` for short. + +.. code-block:: python + :emphasize-lines: 5 + + >>> from pydantic_scim2 import User, Context + >>> user = User(user_name="bjensen@example.com", display_name="bjensen") + >>> payload = user.model_dump( + ... scim_ctx=Context.RESOURCE_QUERY_REQUEST, + ... excluded_attributes=["displayName"] + ... ) + >>> assert payload == { + ... "schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"], + ... "userName": "bjensen@example.com", + ... "displayName": "bjensen", + ... } + +Values read from :attr:`~pydantic_scim2.SearchRequest.attributes` and :attr:`~pydantic_scim2.SearchRequest.excluded_attributes` in :class:`~pydantic_scim2.SearchRequest` objects can directly be used in :meth:`~pydantic_scim2.BaseModel.model_dump`. + +Attribute inclusions and exclusions interact with attributes :class:`~pydantic_scim2.Returned`, in the server response :class:`Contexts `: + +- attributes annotated with :attr:`~pydantic_scim2.Returned.always` will always be dumped; +- attributes annotated with :attr:`~pydantic_scim2.Returned.never` will never be dumped; +- attributes annotated with :attr:`~pydantic_scim2.Returned.default` will be dumped unless being explicitly excluded; +- attributes annotated with :attr:`~pydantic_scim2.Returned.request` will be not dumped unless being explicitly included. + Typed ListResponse ================== @@ -78,7 +143,8 @@ Typed ListResponse You must pass the type you expect in the response, e.g. :class:`~pydantic_scim2.ListResponse.of(User)` or :class:`~pydantic_scim2.ListResponse.of(User, Group)`. If a response resource type cannot be found, a ``pydantic.ValidationError`` will be raised. -.. doctest:: +.. code-block:: python + :emphasize-lines: 49 >>> from typing import Union >>> from pydantic_scim2 import User, Group, ListResponse @@ -143,7 +209,7 @@ Schema extensions Extensions must be passed as resource type parameter, e.g. ``user = User[EnterpriseUser]`` or ``user = User[EnterpriseUser, SuperHero]``. Extensions attributes are accessed with brackets, e.g. ``user[EnterpriseUser].employee_number``. -.. doctest:: +.. code-block:: python >>> import datetime >>> from pydantic_scim2 import User, EnterpriseUser, Meta @@ -161,7 +227,7 @@ Extensions attributes are accessed with brackets, e.g. ``user[EnterpriseUser].em >>> user[EnterpriseUser] = EnterpriseUser(employee_number = "701984") >>> user[EnterpriseUser].division="Theme Park" - >>> dump = user.model_dump(exclude_none=True, by_alias=True, mode="json") + >>> dump = user.model_dump() >>> assert dump == { ... "schemas": [ ... "urn:ietf:params:scim:schemas:core:2.0:User", @@ -188,11 +254,11 @@ Pre-defined Error objects :rfc:`RFC7643 §3.12 <7643#section-3.12>` pre-defined errors are usable. -.. doctest:: +.. code-block:: python >>> from pydantic_scim2 import InvalidPathError - >>> dump = InvalidPathError.model_dump(exclude_none=True, by_alias=True, mode="json") + >>> dump = InvalidPathError.model_dump() >>> assert dump == { ... 'detail': 'The "path" attribute was invalid or malformed (see Figure 7 of RFC7644).', ... 'schemas': ['urn:ietf:params:scim:api:messages:2.0:Error'], @@ -206,20 +272,34 @@ The exhaustive list is availaible in the :class:`reference >> from pydantic_scim2 import Resource + >>> from typing import Annotated, Optional + >>> from pydantic_scim2 import Resource, Returned, Mutability, ComplexAttribute >>> from enum import Enum - >>> class Pet(Resource): - ... class Type(str, Enum): - ... dog = "dog" - ... cat = "cat" + >>> class PetType(ComplexAttribute): + ... type: Optional[str] + ... """The pet type like 'cat' or 'dog'.""" ... - ... name : str + ... color: Optional[str] + ... """The pet color.""" + + >>> class Pet(Resource): + ... name : Annotated[Optional[str], Mutability.immutable, Returned.always] ... """The name of the pet.""" ... - ... type: Type + ... pet_type: Optional[PetType] ... """The pet type.""" + +You can annotate fields to indicate their :class:`~pydantic_scim2.Mutability` and :class:`~pydantic_scim2.Returned`. +If unset the default values will be :attr:`~pydantic_scim2.Mutability.read_write` and :attr:`~pydantic_scim2.Returned.default`. + +.. warning:: + + Be sure to make all the fields of your model :data:`~typing.Optional`. + There will always be a :class:`~pydantic_scim2.Context` in which this will be true. diff --git a/poetry.lock b/poetry.lock index e82a62a..e284887 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1070,4 +1070,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "b79290c9981a4485d80b7690c199d3074bbdbdb76e351f998b66484001e0fcf3" +content-hash = "dd184ba8886f920b52729d0b9d2f8b84f51fbb2fa1e4f5f99c9632018d072bbf" diff --git a/pydantic_scim2/__init__.py b/pydantic_scim2/__init__.py index 34d064b..728a970 100644 --- a/pydantic_scim2/__init__.py +++ b/pydantic_scim2/__init__.py @@ -1,3 +1,9 @@ +from .base import ComplexAttribute +from .base import Context +from .base import Mutability +from .base import Required +from .base import Returned +from .base import Uniqueness from .rfc7643.enterprise_user import EnterpriseUser from .rfc7643.enterprise_user import Manager from .rfc7643.group import Group @@ -8,10 +14,7 @@ from .rfc7643.resource_type import ResourceType from .rfc7643.resource_type import SchemaExtension from .rfc7643.schema import Attribute -from .rfc7643.schema import Mutability -from .rfc7643.schema import Returned from .rfc7643.schema import Schema -from .rfc7643.schema import Uniqueness from .rfc7643.service_provider import AuthenticationScheme from .rfc7643.service_provider import Bulk from .rfc7643.service_provider import ChangePassword @@ -61,6 +64,8 @@ "BulkRequest", "BulkResponse", "ChangePassword", + "ComplexAttribute", + "Context", "ETag", "Email", "EnterpriseUser", @@ -88,6 +93,7 @@ "PatchOperation", "PhoneNumber", "Photo", + "Required", "Resource", "ResourceType", "Returned", diff --git a/pydantic_scim2/attributes.py b/pydantic_scim2/attributes.py new file mode 100644 index 0000000..8426cc8 --- /dev/null +++ b/pydantic_scim2/attributes.py @@ -0,0 +1,89 @@ +from typing import List +from typing import Optional +from typing import Tuple +from typing import Type + + +def validate_model_attribute(model: Type, attribute_base: str) -> None: + """Validate that an attribute name or a sub-attribute path exist for a + given model.""" + + from pydantic_scim2.base import BaseModel + + attribute_name, *sub_attribute_blocks = attribute_base.split(".") + sub_attribute_base = ".".join(sub_attribute_blocks) + + aliases = {field.alias for field in model.model_fields.values()} + + if attribute_name not in aliases: + raise ValueError( + f"Model '{model.__name__}' has no attribute named '{attribute_name}'" + ) + + if sub_attribute_base: + attribute_type = model.get_field_root_type(attribute_name) + + if not issubclass(attribute_type, BaseModel): + raise ValueError( + f"Attribute '{attribute_name}' is not a complex attribute, and cannot have a '{sub_attribute_base}' sub-attribute" + ) + + validate_model_attribute(attribute_type, sub_attribute_base) + + +def extract_schema_and_attribut_base(attribute_urn: str) -> Tuple[str, str]: + """Extract the schema urn part and the attribute name part from attribute + name, as defined in :rfc:`RFC7644 §3.10 <7644#section-3.10>`.""" + + *urn_blocks, attribute_base = attribute_urn.split(":") + schema = ":".join(urn_blocks) + return schema, attribute_base + + +def validate_attribute_urn( + attribute_name: str, + default_resource: Optional[Type] = None, + resource_types: Optional[List[Type]] = None, +) -> str: + """Validate that an attribute urn is valid or not. + + :parm attribute_name: The attribute urn to check. + :default_resource: The default resource if `attribute_name` is not an absolute urn. + :resource_types: The available resources in which to look for the attribute. + :return: The normalized attribute URN. + """ + + from pydantic_scim2.rfc7643.resource import Resource + + if not resource_types: + resource_types = [] + + if default_resource and default_resource not in resource_types: + resource_types.append(default_resource) + + default_schema = ( + default_resource.model_fields["schemas"].default[0] + if default_resource + else None + ) + + schema, attribute_base = extract_schema_and_attribut_base(attribute_name) + if not schema: + schema = default_schema + + if not schema: + raise ValueError("No default schema and relative URN") + + resource = Resource.get_by_schema(resource_types, schema) + if not resource: + raise ValueError(f"No resource matching schema '{schema}'") + + validate_model_attribute(resource, attribute_base) + + return f"{schema}:{attribute_base}" + + +def contains_attribute_or_subattributes(attribute_urns: List[str], attribute_urn): + return attribute_urn in attribute_urns or any( + item.startswith(f"{attribute_urn}.") for item in attribute_urns + ) diff --git a/pydantic_scim2/base.py b/pydantic_scim2/base.py index 538a986..455dac0 100644 --- a/pydantic_scim2/base.py +++ b/pydantic_scim2/base.py @@ -1,17 +1,535 @@ +from enum import Enum +from enum import auto +from typing import Any +from typing import Dict +from typing import List from typing import Optional +from typing import Type from typing import TypeVar +from typing import Union +from typing import get_args +from typing import get_origin from pydantic import BaseModel from pydantic import ConfigDict +from pydantic import SerializationInfo +from pydantic import SerializerFunctionWrapHandler +from pydantic import ValidationInfo +from pydantic import ValidatorFunctionWrapHandler +from pydantic import field_serializer +from pydantic import field_validator +from pydantic import model_serializer +from pydantic import model_validator from pydantic.alias_generators import to_camel +from pydantic_core import PydanticCustomError +from typing_extensions import Self +from pydantic_scim2.attributes import contains_attribute_or_subattributes +from pydantic_scim2.attributes import validate_attribute_urn + + +class Context(Enum): + """Represent the different HTTP contexts detailed in :rfc:`RFC7644 §3.2 + <7644#section-3.2>` + + Contexts are intented to be used during model validation and serialization. + For instance a client preparing a resource creation POST request can use + :code:`resource.model_dump(Context.RESOURCE_CREATION_REQUEST)` and + the server can then validate it with + :code:`resource.model_validate(Context.RESOURCE_CREATION_REQUEST)`. + """ + + DEFAULT = auto() + """The default context. + + All fields are accepted during validation, and all fields are + serialized during a dump. + """ + + RESOURCE_CREATION_REQUEST = auto() + """The resource creation request context. + + Should be used for clients building a payload for a resource creation request, + and servers validating resource creation request payloads. + + - When used for serialization, it will not dump attributes annotated with :attr:`~pydantic_scim2.Mutability.read_only`. + - When used for validation, it will raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Mutability.read_only`. + """ + + RESOURCE_CREATION_RESPONSE = auto() + """The resource creation response context. + + Should be used for servers building a payload for a resource + creation response, and clients validating resource creation response + payloads. + + - When used for validation, it will raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Returned.never` or when attributes annotated with :attr:`~pydantic_scim2.Returned.always` are missing or :data:`None`; + - When used for serialization, it will: + - always dump attributes annotated with :attr:`~pydantic_scim2.Returned.always`; + - never dump attributes annotated with :attr:`~pydantic_scim2.Returned.never`; + - dump attributes annotated with :attr:`~pydantic_scim2.Returned.default` unless they are explicitly excluded; + - not dump attributes annotated with :attr:`~pydantic_scim2.Returned.request` unless they are explicitly included. + """ + + RESOURCE_QUERY_REQUEST = auto() + """The resource query request context. + + Should be used for clients building a payload for a resource query request, + and servers validating resource query request payloads. + + - When used for serialization, it will not dump attributes annotated with :attr:`~pydantic_scim2.Mutability.write_only`. + - When used for validation, it will raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Mutability.write_only`. + """ + + RESOURCE_QUERY_RESPONSE = auto() + """The resource query response context. + + Should be used for servers building a payload for a resource query + response, and clients validating resource query response payloads. + + - When used for validation, it will raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Returned.never` or when attributes annotated with :attr:`~pydantic_scim2.Returned.always` are missing or :data:`None`; + - When used for serialization, it will: + - always dump attributes annotated with :attr:`~pydantic_scim2.Returned.always`; + - never dump attributes annotated with :attr:`~pydantic_scim2.Returned.never`; + - dump attributes annotated with :attr:`~pydantic_scim2.Returned.default` unless they are explicitly excluded; + - not dump attributes annotated with :attr:`~pydantic_scim2.Returned.request` unless they are explicitly included. + """ + + RESOURCE_REPLACEMENT_REQUEST = auto() + """The resource replacement request context. + + Should be used for clients building a payload for a resource replacement request, + and servers validating resource replacement request payloads. + + - When used for serialization, it will not dump attributes annotated with :attr:`~pydantic_scim2.Mutability.read_only` and :attr:`~pydantic_scim2.Mutability.immutable`. + - When used for validation, it will ignore attributes annotated with :attr:`pydantic_scim2.Mutability.read_only` and raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Mutability.immutable`. + """ + + RESOURCE_REPLACEMENT_RESPONSE = auto() + """The resource replacement response context. + + Should be used for servers building a payload for a resource + replacement response, and clients validating resource query + replacement payloads. + + - When used for validation, it will raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Returned.never` or when attributes annotated with :attr:`~pydantic_scim2.Returned.always` are missing or :data:`None`; + - When used for serialization, it will: + - always dump attributes annotated with :attr:`~pydantic_scim2.Returned.always`; + - never dump attributes annotated with :attr:`~pydantic_scim2.Returned.never`; + - dump attributes annotated with :attr:`~pydantic_scim2.Returned.default` unless they are explicitly excluded; + - not dump attributes annotated with :attr:`~pydantic_scim2.Returned.request` unless they are explicitly included. + """ + + SEARCH_REQUEST = auto() + """The search request context. + + Should be used for clients building a payload for a search request, + and servers validating search request payloads. + + - When used for serialization, it will not dump attributes annotated with :attr:`~pydantic_scim2.Mutability.write_only`. + - When used for validation, it will raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Mutability.write_only`. + """ + + SEARCH_RESPONSE = auto() + """The resource query response context. + + Should be used for servers building a payload for a search response, + and clients validating resource search payloads. + + - When used for validation, it will raise a :class:`~pydantic.ValidationError` when finding attributes annotated with :attr:`~pydantic_scim2.Returned.never` or when attributes annotated with :attr:`~pydantic_scim2.Returned.always` are missing or :data:`None`; + - When used for serialization, it will: + - always dump attributes annotated with :attr:`~pydantic_scim2.Returned.always`; + - never dump attributes annotated with :attr:`~pydantic_scim2.Returned.never`; + - dump attributes annotated with :attr:`~pydantic_scim2.Returned.default` unless they are explicitly excluded; + - not dump attributes annotated with :attr:`~pydantic_scim2.Returned.request` unless they are explicitly included. + """ + + @classmethod + def is_request(cls, ctx: "Context") -> bool: + return ctx in ( + cls.RESOURCE_CREATION_REQUEST, + cls.RESOURCE_QUERY_REQUEST, + cls.RESOURCE_REPLACEMENT_REQUEST, + cls.SEARCH_REQUEST, + ) + + @classmethod + def is_response(cls, ctx: "Context") -> bool: + return ctx in ( + cls.RESOURCE_CREATION_RESPONSE, + cls.RESOURCE_QUERY_RESPONSE, + cls.RESOURCE_REPLACEMENT_RESPONSE, + cls.SEARCH_RESPONSE, + ) + + +class Mutability(str, Enum): + """A single keyword indicating the circumstances under which the value of + the attribute can be (re)defined:""" + + read_only = "readOnly" + """The attribute SHALL NOT be modified.""" + + read_write = "readWrite" + """The attribute MAY be updated and read at any time.""" + + immutable = "immutable" + """The attribute MAY be defined at resource creation (e.g., POST) or at + record replacement via a request (e.g., a PUT). + + The attribute SHALL NOT be updated. + """ + + write_only = "writeOnly" + """The attribute MAY be updated at any time. + + Attribute values SHALL NOT be returned (e.g., because the value is a + stored hash). Note: An attribute with a mutability of "writeOnly" + usually also has a returned setting of "never". + """ + + +class Returned(str, Enum): + """A single keyword that indicates when an attribute and associated values + are returned in response to a GET request or in response to a PUT, POST, or + PATCH request.""" + + always = "always" # cannot be excluded + """The attribute is always returned, regardless of the contents of the + "attributes" parameter. + + For example, "id" is always returned to identify a SCIM resource. + """ + + never = "never" # always excluded + """The attribute is never returned, regardless of the contents of the + "attributes" parameter.""" + + default = "default" # included by default but can be excluded + """The attribute is returned by default in all SCIM operation responses + where attribute values are returned, unless it is explicitly excluded.""" + + request = "request" # excluded by default but can be included + """The attribute is returned in response to any PUT, POST, or PATCH + operations if specified in the "attributes" parameter.""" + + +class Uniqueness(str, Enum): + """A single keyword value that specifies how the service provider enforces + uniqueness of attribute values.""" + + none = "none" + """The values are not intended to be unique in any way.""" + + server = "server" + """The value SHOULD be unique within the context of the current SCIM + endpoint (or tenancy) and MAY be globally unique (e.g., a "username", email + address, or other server-generated key or counter). + + No two resources on the same server SHOULD possess the same value. + """ + + global_ = "global" + """The value SHOULD be globally unique (e.g., an email address, a GUID, or + other value). + + No two resources on any server SHOULD possess the same value. + """ + + +class Required(Enum): + true = True + false = False + + +class BaseModel(BaseModel): + """Base Model for everything.""" -class SCIM2Model(BaseModel): model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True) + @classmethod + def get_field_mutability(cls, field_name: str) -> Mutability: + field_metadata = cls.model_fields[field_name].metadata + + default_mutability = Mutability.read_write + + def mutability_filter(item): + return isinstance(item, Mutability) + + field_mutability = next( + filter(mutability_filter, field_metadata), default_mutability + ) + return field_mutability + + @classmethod + def get_field_returnability(cls, field_name: str) -> Returned: + field_metadata = cls.model_fields[field_name].metadata + default_returned = Returned.default + + def returned_filter(item): + return isinstance(item, Returned) + + field_returned = next(filter(returned_filter, field_metadata), default_returned) + return field_returned + + def get_attribute_urn(self, field_name: str) -> Returned: + """Build the full URN of the attribute. + + See :rfc:`RFC7644 §3.12 <7644#section-3.12>`. + + .. todo:: Actually *guess* the URN instead of using the hacky `_attribute_urn` attribute. + """ + alias = self.model_fields[field_name].alias or field_name + return f"{self._attribute_urn}.{alias}" + + @classmethod + def get_field_root_type(cls, attribute_name: str) -> Type: + """Extract the root type from a model field. + + For example, return 'GroupMember' for + 'Optional[List[GroupMember]]' + """ + + attribute_type = cls.model_fields[attribute_name].annotation + + # extract 'x' from 'Optional[x]' + if get_origin(attribute_type) is Union: + attribute_type = get_args(attribute_type)[0] + + # extract 'x' from 'List[x]' + if isinstance(get_origin(attribute_type), Type) and issubclass( + get_origin(attribute_type), List + ): + attribute_type = get_args(attribute_type)[0] + + return attribute_type + + @field_validator("*") + @classmethod + def check_request_mutability(cls, value: Any, info: ValidationInfo) -> Any: + """Check that the field mutability is expected according to the + requests validation context, as defined in :rfc:`RFC7643 §7 + <7653#section-7>`.""" + if ( + not info.context + or not info.context.get("scim") + or not Context.is_request(info.context["scim"]) + ): + return value + + context = info.context.get("scim") + mutability = cls.get_field_mutability(info.field_name) + exc = PydanticCustomError( + "mutability_error", + "Field '{field_name}' has mutability '{field_mutability}' but this in not valid in {context} context", + { + "field_name": info.field_name, + "field_mutability": mutability, + "context": context.name.lower().replace("_", " "), + }, + ) + + if ( + context == Context.RESOURCE_CREATION_REQUEST + and mutability == Mutability.read_only + ): + raise exc + + if ( + context in (Context.RESOURCE_QUERY_REQUEST, Context.SEARCH_REQUEST) + and mutability == Mutability.write_only + ): + raise exc + + if ( + context == Context.RESOURCE_REPLACEMENT_REQUEST + and mutability == Mutability.immutable + ): + raise exc + + if ( + context == Context.RESOURCE_REPLACEMENT_REQUEST + and mutability == Mutability.read_only + ): + return None + + return value + + @model_validator(mode="wrap") + @classmethod + def check_response_returnability( + cls, value: Any, handler: ValidatorFunctionWrapHandler, info: ValidationInfo + ) -> Self: + """Check that the fields returnability is expected according to the + responses validation context, as defined in :rfc:`RFC7643 §7 + <7653#section-7>`.""" + if ( + not info.context + or not info.context.get("scim") + or not Context.is_response(info.context["scim"]) + ): + return handler(value) + + for field_name, field in cls.model_fields.items(): + returnability = cls.get_field_returnability(field_name) + alias = field.alias or field_name + + if returnability == Returned.always and value.get(alias) is None: + raise PydanticCustomError( + "returned_error", + "Field '{field_name}' has returnability 'always' but value is missing or null", + { + "field_name": field_name, + }, + ) + + if returnability == Returned.never and value.get(alias) is not None: + raise PydanticCustomError( + "returned_error", + "Field '{field_name}' has returnability 'never' but value is set", + { + "field_name": field_name, + }, + ) + + return handler(value) + + @field_serializer("*", mode="wrap") + def scim_serializer( + self, + value: Any, + handler: SerializerFunctionWrapHandler, + info: SerializationInfo, + ) -> Any: + """Serialize the fields according to mutability indications passed in + the serialization context.""" + + value = handler(value) + + if info.context.get("scim") and Context.is_request(info.context["scim"]): + value = self.scim_request_serializer(value, info) + + if info.context.get("scim") and Context.is_response(info.context["scim"]): + value = self.scim_response_serializer(value, info) + + return value + + def scim_request_serializer(self, value: Any, info: SerializationInfo) -> Any: + """Serialize the fields according to mutability indications passed in + the serialization context.""" + + mutability = self.get_field_mutability(info.field_name) + context = info.context.get("scim") + + if ( + context == Context.RESOURCE_CREATION_REQUEST + and mutability == Mutability.read_only + ): + return None + + if ( + context + in ( + Context.RESOURCE_QUERY_REQUEST, + Context.SEARCH_REQUEST, + ) + and mutability == Mutability.write_only + ): + return None + + if context == Context.RESOURCE_REPLACEMENT_REQUEST and mutability in ( + Mutability.immutable, + Mutability.read_only, + ): + return None + + return value + + def scim_response_serializer(self, value: Any, info: SerializationInfo) -> Any: + """Serialize the fields according to returability indications passed in + the serialization context.""" + + returnability = self.get_field_returnability(info.field_name) + attribute_urn = self.get_attribute_urn(info.field_name) + included_urns = info.context.get("scim_attributes", []) + excluded_urns = info.context.get("scim_excluded_attributes", []) + + if returnability == Returned.never: + return None + + if returnability == Returned.default and ( + ( + included_urns + and not contains_attribute_or_subattributes( + included_urns, attribute_urn + ) + ) + or attribute_urn in excluded_urns + ): + return None + + if returnability == Returned.request and attribute_urn not in included_urns: + return None + + return value + + @model_serializer(mode="wrap") + def model_serializer_exclude_none( + self, handler, info: SerializationInfo + ) -> Dict[str, Any]: + """Remove `None` values inserted by the + :meth:`~pydantic_scim2.base.BaseModel.scim_field_serializer`.""" + + result = handler(self) + return {key: value for key, value in result.items() if value is not None} + + @classmethod + def model_validate( + cls, *args, scim_ctx: Optional[Context] = Context.DEFAULT, **kwargs + ) -> "BaseModel": + """Validate SCIM payloads and generate model representation by using + Pydantic :code:`BaseModel.model_validate`.""" + + kwargs.setdefault("context", {}).setdefault("scim", scim_ctx) + return super().model_validate(*args, **kwargs) + + def model_dump( + self, + *args, + scim_ctx: Optional[Context] = Context.DEFAULT, + attributes: Optional[List[str]] = None, + excluded_attributes: Optional[List[str]] = None, + **kwargs, + ): + """Create a model representation that can be included in SCIM messages + by using Pydantic :code:`BaseModel.model_dump`. + + :param scim_ctx: If a SCIM context is passed, some default values of + Pydantic :code:`BaseModel.model_dump` are tuned to generate valid SCIM + messages. Pass :data:`None` to get the default Pydantic behavior. + """ + + kwargs.setdefault("context", {}).setdefault("scim", scim_ctx) + kwargs["context"]["scim_attributes"] = [ + validate_attribute_urn(attribute, self.__class__) + for attribute in (attributes or []) + ] + kwargs["context"]["scim_excluded_attributes"] = [ + validate_attribute_urn(attribute, self.__class__) + for attribute in (excluded_attributes or []) + ] + + if scim_ctx: + kwargs.setdefault("exclude_none", True) + kwargs.setdefault("by_alias", True) + kwargs.setdefault("mode", "json") + + return super().model_dump(*args, **kwargs) + -def int_to_str(status: Optional[int]) -> Optional[str]: - return None if status is None else str(status) +class ComplexAttribute(BaseModel): + """A complex attribute as defined in :rfc:`RFC7643 §2.3.8 + <7643#section-2.3.8>`.""" -AnyModel = TypeVar("AnyModel", bound=SCIM2Model) +AnyModel = TypeVar("AnyModel", bound=BaseModel) diff --git a/pydantic_scim2/rfc7643/enterprise_user.py b/pydantic_scim2/rfc7643/enterprise_user.py index d28905f..e6b1003 100644 --- a/pydantic_scim2/rfc7643/enterprise_user.py +++ b/pydantic_scim2/rfc7643/enterprise_user.py @@ -4,10 +4,14 @@ from pydantic import AnyUrl from pydantic import Field -from ..base import SCIM2Model +from ..base import ComplexAttribute -class Manager(SCIM2Model): +class Manager(ComplexAttribute): + _attribute_urn: str = ( + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager" + ) + value: Optional[str] = None """The id of the SCIM resource representingthe User's manager.""" @@ -21,7 +25,7 @@ class Manager(SCIM2Model): """The displayName of the User's manager.""" -class EnterpriseUser(SCIM2Model): +class EnterpriseUser(ComplexAttribute): schemas: List[str] = ["urn:ietf:params:scim:schemas:extension:enterprise:2.0:User"] employee_number: Optional[str] = None diff --git a/pydantic_scim2/rfc7643/group.py b/pydantic_scim2/rfc7643/group.py index 4f089ce..994d03a 100644 --- a/pydantic_scim2/rfc7643/group.py +++ b/pydantic_scim2/rfc7643/group.py @@ -1,22 +1,27 @@ +from typing import Annotated from typing import List from typing import Optional from pydantic import AnyUrl from pydantic import Field -from ..base import SCIM2Model +from ..base import ComplexAttribute +from ..base import Mutability from .resource import Resource -class GroupMember(SCIM2Model): - value: Optional[str] = None +class GroupMember(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:Group.members" - display: Optional[str] = None + value: Annotated[Optional[str], Mutability.immutable] = None + """Identifier of the member of this Group.""" - type: Optional[str] = None + display: Annotated[Optional[str], Mutability.immutable] = None + + type: Annotated[Optional[str], Mutability.immutable] = None """A label indicating the attribute's function, e.g., "work" or "home".""" - ref: Optional[AnyUrl] = Field(None, alias="$ref") + ref: Annotated[Optional[AnyUrl], Mutability.immutable] = Field(None, alias="$ref") """The reference URI of a target resource, if the attribute is a reference.""" diff --git a/pydantic_scim2/rfc7643/resource.py b/pydantic_scim2/rfc7643/resource.py index 70c9be7..a244b89 100644 --- a/pydantic_scim2/rfc7643/resource.py +++ b/pydantic_scim2/rfc7643/resource.py @@ -1,6 +1,7 @@ from datetime import datetime from typing import Annotated from typing import Any +from typing import Dict from typing import Generic from typing import List from typing import Optional @@ -18,10 +19,13 @@ from typing_extensions import Self from ..base import AnyModel -from ..base import SCIM2Model +from ..base import ComplexAttribute +from ..base import Mutability +from ..base import Returned +from ..base import Uniqueness -class Meta(SCIM2Model): +class Meta(ComplexAttribute): """All "meta" sub-attributes are assigned by the service provider (have a "mutability" of "readOnly"), and all of these sub-attributes have a "returned" characteristic of "default". @@ -76,7 +80,7 @@ class Meta(SCIM2Model): """ -class Resource(SCIM2Model, Generic[AnyModel]): +class Resource(ComplexAttribute, Generic[AnyModel]): model_config = ConfigDict(extra="allow") schemas: List[str] @@ -88,7 +92,9 @@ class Resource(SCIM2Model, Generic[AnyModel]): # Common attributes as defined by # https://www.rfc-editor.org/rfc/rfc7643#section-3.1 - id: Optional[str] = None + id: Annotated[ + Optional[str], Mutability.read_only, Returned.always, Uniqueness.global_ + ] = None """A unique identifier for a SCIM resource as defined by the service provider. @@ -96,44 +102,74 @@ class Resource(SCIM2Model, Generic[AnyModel]): resource creation or replacement requests. """ - external_id: Optional[str] = None + external_id: Annotated[Optional[str], Mutability.read_write, Returned.default] = ( + None + ) """A String that is an identifier for the resource as defined by the provisioning client.""" - meta: Optional[Meta] = None + meta: Annotated[Optional[Meta], Mutability.read_only, Returned.default] = None """A complex attribute containing resource metadata.""" def __getitem__(self, item: Any): - if not isinstance(item, type) or not issubclass(item, SCIM2Model): + if not isinstance(item, type) or not issubclass(item, ComplexAttribute): raise KeyError(f"{item} is not a valid extension type") schema = item.model_fields["schemas"].default[0] return getattr(self, schema) def __setitem__(self, item: Any, value: "Resource"): - if not isinstance(item, type) or not issubclass(item, SCIM2Model): + if not isinstance(item, type) or not issubclass(item, ComplexAttribute): raise KeyError(f"{item} is not a valid extension type") schema = item.model_fields["schemas"].default[0] setattr(self, schema, value) + @classmethod + def get_extension_models(cls) -> Dict[str, Type]: + """Return extension a dict associating extension models with their + schemas.""" + + extension_models = cls.__pydantic_generic_metadata__.get("args", []) + by_schema = { + ext.model_fields["schemas"].default[0]: ext for ext in extension_models + } + return by_schema + + @staticmethod + def get_by_schema( + resource_types: List[Type], schema: str, with_extensions=True + ) -> Optional[Type]: + """Given a resource type list and a schema, find the matching resource + type.""" + + by_schema = { + resource_type.model_fields["schemas"].default[0]: resource_type + for resource_type in (resource_types or []) + } + if with_extensions: + for resource_type in list(by_schema.values()): + by_schema.update(**resource_type.get_extension_models()) + + return by_schema.get(schema) + @model_validator(mode="after") def load_model_extensions(self) -> Self: """Instanciate schema objects if found in the payload.""" - extension_models = self.__pydantic_generic_metadata__.get("args") - if not extension_models: - return self - main_schema = self.model_fields["schemas"].default[0] - by_schema = { - ext.model_fields["schemas"].default[0]: ext for ext in extension_models - } + extension_models = self.get_extension_models() for schema in self.schemas: if schema == main_schema: continue - model = by_schema[schema] + try: + model = extension_models[schema] + except KeyError as exc: + raise ValueError( + f"No extension model found for schema '{schema}'" + ) from exc + if payload := getattr(self, schema, None): setattr(self, schema, model.model_validate(payload)) @@ -152,6 +188,17 @@ def set_extension_schemas(self, schemas: List[str]): ] return schemas + def get_attribute_urn(self, field_name: str) -> Returned: + """Build the full URN of the attribute. + + See :rfc:`RFC7644 §3.12 <7644#section-3.12>`. + + .. todo:: Actually *guess* the URN instead of using the hacky `_schema` attribute. + """ + main_schema = self.model_fields["schemas"].default[0] + alias = self.model_fields[field_name].alias or field_name + return f"{main_schema}:{alias}" + AnyResource = TypeVar("AnyResource", bound="Resource") diff --git a/pydantic_scim2/rfc7643/resource_type.py b/pydantic_scim2/rfc7643/resource_type.py index 0334a10..94227fc 100644 --- a/pydantic_scim2/rfc7643/resource_type.py +++ b/pydantic_scim2/rfc7643/resource_type.py @@ -1,18 +1,27 @@ +from typing import Annotated from typing import List from typing import Optional from pydantic import AnyUrl from pydantic import Field -from ..base import SCIM2Model +from ..base import ComplexAttribute +from ..base import Mutability +from ..base import Required from .resource import Resource -class SchemaExtension(SCIM2Model): - schema_: AnyUrl = Field(..., alias="schema") +class SchemaExtension(ComplexAttribute): + _attribute_urn: str = ( + "urn:ietf:params:scim:schemas:core:2.0:ResourceType.schemaExtensions" + ) + + schema_: Annotated[AnyUrl, Mutability.read_only, Required.true] = Field( + ..., alias="schema" + ) """The URI of a schema extension.""" - required: bool + required: Annotated[bool, Mutability.read_only, Required.true] """A Boolean value that specifies whether or not the schema extension is required for the resource type. @@ -26,31 +35,35 @@ class SchemaExtension(SCIM2Model): class ResourceType(Resource): schemas: List[str] = ["urn:ietf:params:scim:schemas:core:2.0:ResourceType"] - id: Optional[str] = None + id: Annotated[Optional[str], Mutability.read_only] = None """The resource type's server unique id. May be the same as the 'name' attribute. """ - name: str + name: Annotated[str, Mutability.read_only, Required.true] """The resource type name. When applicable, service providers MUST specify the name, e.g., 'User'. """ - description: Optional[str] = None + description: Annotated[Optional[str], Mutability.read_only] = None """The resource type's human-readable description. When applicable, service providers MUST specify the description. """ - endpoint: str + endpoint: Annotated[str, Mutability.read_only, Required.true] """The resource type's HTTP-addressable endpoint relative to the Base URL, e.g., '/Users'.""" - schema_: AnyUrl = Field(..., alias="schema") + schema_: Annotated[AnyUrl, Mutability.read_only, Required.true] = Field( + ..., alias="schema" + ) """The resource type's primary/base schema URI.""" - schema_extensions: Optional[List[SchemaExtension]] = None + schema_extensions: Annotated[ + Optional[List[SchemaExtension]], Mutability.read_only, Required.true + ] = None """A list of URIs of the resource type's schema extensions.""" diff --git a/pydantic_scim2/rfc7643/schema.py b/pydantic_scim2/rfc7643/schema.py index f4cd95d..2217183 100644 --- a/pydantic_scim2/rfc7643/schema.py +++ b/pydantic_scim2/rfc7643/schema.py @@ -1,32 +1,18 @@ from enum import Enum +from typing import Annotated from typing import List from typing import Optional -from ..base import SCIM2Model +from ..base import ComplexAttribute +from ..base import Mutability +from ..base import Returned +from ..base import Uniqueness from .resource import Meta -class Mutability(str, Enum): - read_only = "readOnly" - read_write = "readWrite" - immutable = "immutable" - write_only = "writeOnly" +class Attribute(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:Schema.attributes" - -class Returned(str, Enum): - always = "always" - never = "never" - default = "default" - request = "request" - - -class Uniqueness(str, Enum): - none = "none" - server = "server" - global_ = "global" - - -class Attribute(SCIM2Model): class Type(str, Enum): string = "string" boolean = "boolean" @@ -37,67 +23,67 @@ class Type(str, Enum): binary = "binary" complex = "complex" - name: str + name: Annotated[str, Mutability.read_only] """The attribute's name.""" - type: Type + type: Annotated[Type, Mutability.read_only] """The attribute's data type.""" - sub_attributes: Optional[List["Attribute"]] = None + sub_attributes: Annotated[Optional[List["Attribute"]], Mutability.read_only] = None """When an attribute is of type "complex", "subAttributes" defines a set of sub-attributes.""" - multi_valued: bool + multi_valued: Annotated[bool, Mutability.read_only] """A Boolean value indicating the attribute's plurality.""" - description: str + description: Annotated[str, Mutability.read_only] """The attribute's human-readable description.""" - required: bool + required: Annotated[bool, Mutability.read_only] """A Boolean value that specifies whether or not the attribute is required.""" - canonical_values: Optional[List[str]] = None + canonical_values: Annotated[Optional[List[str]], Mutability.read_only] = None """A collection of suggested canonical values that MAY be used (e.g., "work" and "home").""" - case_exact: bool = True + case_exact: Annotated[bool, Mutability.read_only] = True """A Boolean value that specifies whether or not a string attribute is case sensitive.""" - mutability: Mutability = Mutability.read_write + mutability: Annotated[Mutability, Mutability.read_only] = Mutability.read_write """A single keyword indicating the circumstances under which the value of the attribute can be (re)defined.""" - returned: Returned = Returned.default + returned: Annotated[Returned, Mutability.read_only] = Returned.default """A single keyword that indicates when an attribute and associated values are returned in response to a GET request or in response to a PUT, POST, or PATCH request.""" - uniqueness: Uniqueness = Uniqueness.none + uniqueness: Annotated[Uniqueness, Mutability.read_only] = Uniqueness.none """A single keyword value that specifies how the service provider enforces uniqueness of attribute values.""" - reference_types: Optional[List[str]] = None + reference_types: Annotated[Optional[List[str]], Mutability.read_only] = None """A multi-valued array of JSON strings that indicate the SCIM resource types that may be referenced.""" -class Schema(SCIM2Model): +class Schema(ComplexAttribute): schemas: List[str] = ["urn:ietf:params:scim:schemas:core:2.0:Schema"] - id: str + meta: Annotated[Optional[Meta], Mutability.read_only] = None + """A complex attribute containing resource metadata.""" + + id: Annotated[str, Mutability.read_only] """The unique URI of the schema.""" - name: Optional[str] = None + name: Annotated[Optional[str], Mutability.read_only] = None """The schema's human-readable name.""" - description: Optional[str] = None + description: Annotated[Optional[str], Mutability.read_only] = None """The schema's human-readable description.""" - attributes: List[Attribute] + attributes: Annotated[List[Attribute], Mutability.read_only] """A complex type that defines service provider attributes and their qualities via the following set of sub-attributes.""" - - meta: Optional[Meta] = None - """A complex attribute containing resource metadata.""" diff --git a/pydantic_scim2/rfc7643/service_provider.py b/pydantic_scim2/rfc7643/service_provider.py index 232b1ed..beeabda 100644 --- a/pydantic_scim2/rfc7643/service_provider.py +++ b/pydantic_scim2/rfc7643/service_provider.py @@ -1,53 +1,56 @@ from enum import Enum +from typing import Annotated from typing import List from typing import Optional from pydantic import AnyUrl -from ..base import SCIM2Model +from ..base import ComplexAttribute +from ..base import Mutability +from ..base import Required from .resource import Resource -class Patch(SCIM2Model): - supported: bool +class Patch(ComplexAttribute): + supported: Annotated[bool, Mutability.read_only, Required.true] """A Boolean value specifying whether or not the operation is supported.""" -class Bulk(SCIM2Model): - supported: bool +class Bulk(ComplexAttribute): + supported: Annotated[bool, Mutability.read_only, Required.true] """A Boolean value specifying whether or not the operation is supported.""" - max_operations: int + max_operations: Annotated[int, Mutability.read_only, Required.true] """An integer value specifying the maximum number of operations.""" - max_payload_size: int + max_payload_size: Annotated[int, Mutability.read_only, Required.true] """An integer value specifying the maximum payload size in bytes.""" -class Filter(SCIM2Model): - supported: bool +class Filter(ComplexAttribute): + supported: Annotated[bool, Mutability.read_only, Required.true] """A Boolean value specifying whether or not the operation is supported.""" - max_results: Optional[int] = None + max_results: Annotated[Optional[int], Mutability.read_only, Required.true] = None """A Boolean value specifying whether or not the operation is supported.""" -class ChangePassword(SCIM2Model): - supported: bool +class ChangePassword(ComplexAttribute): + supported: Annotated[bool, Mutability.read_only, Required.true] """A Boolean value specifying whether or not the operation is supported.""" -class Sort(SCIM2Model): - supported: bool +class Sort(ComplexAttribute): + supported: Annotated[bool, Mutability.read_only, Required.true] """A Boolean value specifying whether or not the operation is supported.""" -class ETag(SCIM2Model): - supported: bool +class ETag(ComplexAttribute): + supported: Annotated[bool, Mutability.read_only] """A Boolean value specifying whether or not the operation is supported.""" -class AuthenticationScheme(SCIM2Model): +class AuthenticationScheme(ComplexAttribute): class Type(str, Enum): oauth = "oauth" oauth2 = "oauth2" @@ -55,58 +58,57 @@ class Type(str, Enum): httpbasic = "httpbasic" httpdigest = "httpdigest" - type: Type + type: Annotated[Type, Mutability.read_only] """The authentication scheme.""" - name: str + name: Annotated[str, Mutability.read_only, Required.true] """The common authentication scheme name, e.g., HTTP Basic.""" - description: str + description: Annotated[str, Mutability.read_only, Required.true] """A description of the authentication scheme.""" - spec_uri: Optional[AnyUrl] + spec_uri: Annotated[Optional[AnyUrl], Mutability.read_only] """An HTTP-addressable URL pointing to the authentication scheme's specification.""" - documentation_uri: Optional[AnyUrl] = None + documentation_uri: Annotated[Optional[AnyUrl], Mutability.read_only] = None """An HTTP-addressable URL pointing to the authentication scheme's usage documentation.""" - primary: Optional[bool] = None + primary: Annotated[Optional[bool], Mutability.read_only] = None """A Boolean value indicating the 'primary' or preferred attribute value for this attribute, e.g., the preferred mailing address or primary email - address. - - The primary attribute value 'true' MUST appear no more than once. - """ + address.""" class ServiceProviderConfiguration(Resource): schemas: List[str] = ["urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig"] - documentation_uri: Optional[AnyUrl] = None + documentation_uri: Annotated[Optional[AnyUrl], Mutability.read_only] = None """An HTTP-addressable URL pointing to the service provider's human- consumable help documentation.""" - patch: Patch + patch: Annotated[Patch, Mutability.read_only, Required.true] """A complex type that specifies PATCH configuration options.""" - bulk: Bulk + bulk: Annotated[Bulk, Mutability.read_only, Required.true] """A complex type that specifies bulk configuration options.""" - filter: Filter + filter: Annotated[Filter, Mutability.read_only, Required.true] """A complex type that specifies FILTER options.""" - change_password: ChangePassword + change_password: Annotated[ChangePassword, Mutability.read_only, Required.true] """A complex type that specifies configuration options related to changing a password.""" - sort: Sort + sort: Annotated[Sort, Mutability.read_only, Required.true] """A complex type that specifies sort result options.""" - etag: ETag + etag: Annotated[ETag, Mutability.read_only] """A complex type that specifies ETag configuration options.""" - authentication_schemes: List[AuthenticationScheme] + authentication_schemes: Annotated[ + List[AuthenticationScheme], Mutability.read_only, Required.true + ] """A complex type that specifies supported authentication scheme properties.""" diff --git a/pydantic_scim2/rfc7643/user.py b/pydantic_scim2/rfc7643/user.py index ded2877..843bd53 100644 --- a/pydantic_scim2/rfc7643/user.py +++ b/pydantic_scim2/rfc7643/user.py @@ -1,16 +1,22 @@ from enum import Enum +from typing import Annotated from typing import List from typing import Optional from pydantic import AnyUrl from pydantic import EmailStr -from ..base import SCIM2Model +from ..base import ComplexAttribute +from ..base import Mutability +from ..base import Required +from ..base import Uniqueness from .group import GroupMember from .resource import Resource -class Name(SCIM2Model): +class Name(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.name" + formatted: Optional[str] = None """The full name, including all middle names, titles, and suffixes as appropriate, formatted for display (e.g., 'Ms. Barbara J Jensen, III').""" @@ -36,7 +42,9 @@ class Name(SCIM2Model): languages (e.g., 'III' given the full name 'Ms. Barbara J Jensen, III').""" -class Email(SCIM2Model): +class Email(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.emails" + class Type(str, Enum): work = "work" home = "home" @@ -57,7 +65,9 @@ class Type(str, Enum): address.""" -class PhoneNumber(SCIM2Model): +class PhoneNumber(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.phoneNumbers" + class Type(str, Enum): work = "work" home = "home" @@ -82,7 +92,9 @@ class Type(str, Enum): number.""" -class Im(SCIM2Model): +class Im(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.ims" + class Type(str, Enum): aim = "aim" gtalk = "gtalk" @@ -108,7 +120,9 @@ class Type(str, Enum): for this attribute, e.g., the preferred messenger or primary messenger.""" -class Photo(SCIM2Model): +class Photo(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.photos" + class Type(str, Enum): photo = "photo" thumbnail = "thumbnail" @@ -128,7 +142,9 @@ class Type(str, Enum): for this attribute, e.g., the preferred photo or thumbnail.""" -class Address(SCIM2Model): +class Address(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.addresses" + class Type(str, Enum): work = "work" home = "home" @@ -165,7 +181,9 @@ class Type(str, Enum): for this attribute, e.g., the preferred photo or thumbnail.""" -class Entitlement(SCIM2Model): +class Entitlement(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.entitlements" + value: Optional[str] = None """The value of an entitlement.""" @@ -180,7 +198,9 @@ class Entitlement(SCIM2Model): for this attribute.""" -class Role(SCIM2Model): +class Role(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.roles" + value: Optional[str] = None """The value of a role.""" @@ -195,7 +215,9 @@ class Role(SCIM2Model): for this attribute.""" -class X509Certificate(SCIM2Model): +class X509Certificate(ComplexAttribute): + _attribute_urn: str = "urn:ietf:params:scim:schemas:core:2.0:User.x509Certificates" + value: Optional[str] = None """The value of an X.509 certificate.""" @@ -211,7 +233,9 @@ class X509Certificate(SCIM2Model): class User(Resource): - user_name: Optional[str] = None + schemas: List[str] = ["urn:ietf:params:scim:schemas:core:2.0:User"] + + user_name: Annotated[Optional[str], Uniqueness.server, Required.true] = None """Unique identifier for the User, typically used by the user to directly authenticate to the service provider.""" @@ -257,7 +281,7 @@ class User(Resource): active: Optional[bool] = None """A Boolean value indicating the User's administrative status.""" - password: Optional[str] = None + password: Annotated[Optional[str], Mutability.write_only] = None """The User's cleartext password.""" emails: Optional[List[Email]] = None @@ -289,5 +313,3 @@ class User(Resource): x509_certificates: Optional[List[X509Certificate]] = None """A list of certificates issued to the User.""" - - schemas: List[str] = ["urn:ietf:params:scim:schemas:core:2.0:User"] diff --git a/pydantic_scim2/rfc7644/bulk.py b/pydantic_scim2/rfc7644/bulk.py index c63d2c5..3194514 100644 --- a/pydantic_scim2/rfc7644/bulk.py +++ b/pydantic_scim2/rfc7644/bulk.py @@ -7,11 +7,11 @@ from pydantic import Field from pydantic import PlainSerializer -from ..base import SCIM2Model -from ..base import int_to_str +from ..base import ComplexAttribute +from ..utils import int_to_str -class BulkOperation(SCIM2Model): +class BulkOperation(ComplexAttribute): class Method(str, Enum): post = "POST" put = "PUT" @@ -45,7 +45,7 @@ class Method(str, Enum): """The HTTP response status code for the requested operation.""" -class BulkRequest(SCIM2Model): +class BulkRequest(ComplexAttribute): schemas: List[str] = ["urn:ietf:params:scim:api:messages:2.0:BulkRequest"] fail_on_errors: Optional[int] = None @@ -57,7 +57,7 @@ class BulkRequest(SCIM2Model): """Defines operations within a bulk job.""" -class BulkResponse(SCIM2Model): +class BulkResponse(ComplexAttribute): schemas: List[str] = ["urn:ietf:params:scim:api:messages:2.0:BulkResponse"] operations: List[BulkOperation] = Field(..., alias="Operations") diff --git a/pydantic_scim2/rfc7644/error.py b/pydantic_scim2/rfc7644/error.py index 7030658..8610586 100644 --- a/pydantic_scim2/rfc7644/error.py +++ b/pydantic_scim2/rfc7644/error.py @@ -4,11 +4,11 @@ from pydantic import PlainSerializer -from ..base import SCIM2Model -from ..base import int_to_str +from ..base import ComplexAttribute +from ..utils import int_to_str -class Error(SCIM2Model): +class Error(ComplexAttribute): """Representation of SCIM API errors. Here is the exhaustive list of pre-defined errors: diff --git a/pydantic_scim2/rfc7644/list_response.py b/pydantic_scim2/rfc7644/list_response.py index 15e7d05..f5eee4c 100644 --- a/pydantic_scim2/rfc7644/list_response.py +++ b/pydantic_scim2/rfc7644/list_response.py @@ -5,12 +5,12 @@ from pydantic import Field -from ..base import SCIM2Model +from ..base import ComplexAttribute from ..rfc7643.resource import AnyResource from ..rfc7643.resource import tagged_resource_union -class ListResponse(SCIM2Model, Generic[AnyResource]): +class ListResponse(ComplexAttribute, Generic[AnyResource]): @classmethod def of(cls, *resource_types: AnyResource): """Build a ListResponse instance that can handle resource_types.""" diff --git a/pydantic_scim2/rfc7644/patch_op.py b/pydantic_scim2/rfc7644/patch_op.py index 3d71ad9..44b3a47 100644 --- a/pydantic_scim2/rfc7644/patch_op.py +++ b/pydantic_scim2/rfc7644/patch_op.py @@ -5,7 +5,7 @@ from pydantic import Field -from ..base import SCIM2Model +from ..base import ComplexAttribute class Op(str, Enum): @@ -14,7 +14,7 @@ class Op(str, Enum): add = "add" -class PatchOperation(SCIM2Model): +class PatchOperation(ComplexAttribute): op: Op """Each PATCH operation object MUST have exactly one "op" member, whose value indicates the operation to perform and MAY be one of "add", "remove", @@ -27,7 +27,7 @@ class PatchOperation(SCIM2Model): value: Optional[Any] = None -class PatchOp(SCIM2Model): +class PatchOp(ComplexAttribute): schemas: List[str] = ["urn:ietf:params:scim:api:messages:2.0:PatchOp"] operations: List[PatchOperation] = Field(..., alias="Operations") diff --git a/pydantic_scim2/rfc7644/search_request.py b/pydantic_scim2/rfc7644/search_request.py index b17ba1d..b5df535 100644 --- a/pydantic_scim2/rfc7644/search_request.py +++ b/pydantic_scim2/rfc7644/search_request.py @@ -2,7 +2,7 @@ from typing import List from typing import Optional -from ..base import SCIM2Model +from ..base import ComplexAttribute class SortOrder(str, Enum): @@ -10,7 +10,7 @@ class SortOrder(str, Enum): descending = "descending" -class SearchRequest(SCIM2Model): +class SearchRequest(ComplexAttribute): """SearchRequest object defined at https://datatracker.ietf.org/doc/html/rfc7644#section-3.4.3""" schemas: List[str] = ["urn:ietf:params:scim:api:messages:2.0:SearchRequest"] diff --git a/pydantic_scim2/utils.py b/pydantic_scim2/utils.py new file mode 100644 index 0000000..b239607 --- /dev/null +++ b/pydantic_scim2/utils.py @@ -0,0 +1,5 @@ +from typing import Optional + + +def int_to_str(status: Optional[int]) -> Optional[str]: + return None if status is None else str(status) diff --git a/pyproject.toml b/pyproject.toml index ed1e7cf..b3dc678 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.9" -pydantic = {version = "^2.5.0", extras = ["email"]} +pydantic = {version = "^2.7.0", extras = ["email"]} [tool.poetry.group.dev.dependencies] pytest = "^8.2.1" diff --git a/samples/rfc7643-8.3-user-enterprise_user.json b/samples/rfc7643-8.3-enterprise_user.json similarity index 100% rename from samples/rfc7643-8.3-user-enterprise_user.json rename to samples/rfc7643-8.3-enterprise_user.json diff --git a/tests/test_enterprise_user.py b/tests/test_enterprise_user.py index ea68d57..9c3bd25 100644 --- a/tests/test_enterprise_user.py +++ b/tests/test_enterprise_user.py @@ -12,7 +12,7 @@ def test_enterprise_user(load_sample): - payload = load_sample("rfc7643-8.3-user-enterprise_user.json") + payload = load_sample("rfc7643-8.3-enterprise_user.json") obj = User[EnterpriseUser].model_validate(payload) assert obj.schemas == [ @@ -135,9 +135,4 @@ def test_enterprise_user(load_sample): ) assert obj[EnterpriseUser].manager.display_name == "John Smith" - assert ( - obj.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload - ) + assert obj.model_dump(exclude_unset=True) == payload diff --git a/tests/test_group.py b/tests/test_group.py index dc34008..8ce460d 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -35,9 +35,4 @@ def test_group(load_sample): == "https://example.com/v2/Groups/e9e30dba-f08f-4109-8486-d5c6a331660a" ) - assert ( - obj.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload - ) + assert obj.model_dump() == payload diff --git a/tests/test_list_response.py b/tests/test_list_response.py index a30a15a..06855c2 100644 --- a/tests/test_list_response.py +++ b/tests/test_list_response.py @@ -28,7 +28,7 @@ def test_user(load_sample): def test_enterprise_user(load_sample): - resource_payload = load_sample("rfc7643-8.3-user-enterprise_user.json") + resource_payload = load_sample("rfc7643-8.3-enterprise_user.json") payload = { "totalResults": 1, "itemsPerPage": 10, @@ -103,12 +103,7 @@ def test_mixed_types(load_sample): user, group = response.resources assert isinstance(user, User) assert isinstance(group, Group) - assert ( - response.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload - ) + assert response.model_dump() == payload def test_mixed_types_type_missing(load_sample): @@ -132,8 +127,8 @@ class Foobar(Resource): with pytest.raises(ValidationError): ListResponse.of(User, Foobar).model_validate(payload) - # TODO: This should raise a ValidationError - ListResponse.of(User).model_validate(payload) + with pytest.raises(ValidationError): + ListResponse.of(User).model_validate(payload) def test_missing_resource_schema(load_sample): diff --git a/tests/test_model_attributes.py b/tests/test_model_attributes.py new file mode 100644 index 0000000..05c10f5 --- /dev/null +++ b/tests/test_model_attributes.py @@ -0,0 +1,157 @@ +from typing import Annotated +from typing import List +from typing import Optional + +import pytest + +from pydantic_scim2.attributes import validate_attribute_urn +from pydantic_scim2.base import BaseModel +from pydantic_scim2.base import Returned +from pydantic_scim2.rfc7643.resource import Resource +from pydantic_scim2.rfc7643.user import User + + +def test_get_attribute_urn(): + class Sub(BaseModel): + _attribute_urn = "urn:example:2.0:Sup:sub" + dummy: str + + class Sup(Resource): + schemas: List[str] = ["urn:example:2.0:Sup"] + dummy: str + sub: Sub + subs: List[Sub] + + sup = Sup(dummy="x", sub=Sub(dummy="x"), subs=[Sub(dummy="x")]) + + assert sup.get_attribute_urn("dummy") == "urn:example:2.0:Sup:dummy" + assert sup.get_attribute_urn("sub") == "urn:example:2.0:Sup:sub" + assert sup.sub.get_attribute_urn("dummy") == "urn:example:2.0:Sup:sub.dummy" + + # TODO: fix this by dynamically guess attribute urns + # assert sup.subs[0].get_attribute_urn("dummy") == "urn:example:2.0:Bar:subs.dummy" + + +def test_guess_root_type(): + class Sub(BaseModel): + _attribute_urn = "urn:example:2.0:Sup:sub" + dummy: str + + class Sup(Resource): + schemas: List[str] = ["urn:example:2.0:Sup"] + dummy: str + sub: Sub + subs: List[Sub] + + assert Sup.get_field_root_type("dummy") == str + assert Sup.get_field_root_type("sub") == Sub + assert Sup.get_field_root_type("subs") == Sub + + +class ReturnedModel(BaseModel): + always: Annotated[Optional[str], Returned.always] = None + never: Annotated[Optional[str], Returned.never] = None + default: Annotated[Optional[str], Returned.default] = None + request: Annotated[Optional[str], Returned.request] = None + + +class Baz(BaseModel): + baz_snake_case: str + + +class Foo(Resource): + schemas: List[str] = ["urn:example:2.0:Foo"] + sub: Annotated[ReturnedModel, Returned.default] + bar: str + snake_case: str + baz: Optional[Baz] = None + + +class Bar(Resource): + schemas: List[str] = ["urn:example:2.0:Bar"] + sub: Annotated[ReturnedModel, Returned.default] + bar: str + snake_case: str + baz: Optional[Baz] = None + + +class Extension(Resource): + schemas: List[str] = ["urn:example:2.0:Extension"] + baz: str + + +def test_validate_attribute_urn(): + """Test the method that validates and normalizes attribute URNs.""" + + assert validate_attribute_urn("bar", Foo) == "urn:example:2.0:Foo:bar" + assert ( + validate_attribute_urn("urn:example:2.0:Foo:bar", Foo) + == "urn:example:2.0:Foo:bar" + ) + assert ( + validate_attribute_urn("urn:example:2.0:Foo:bar", User, resource_types=[Foo]) + == "urn:example:2.0:Foo:bar" + ) + + assert validate_attribute_urn("sub", Foo) == "urn:example:2.0:Foo:sub" + assert ( + validate_attribute_urn("urn:example:2.0:Foo:sub", Foo) + == "urn:example:2.0:Foo:sub" + ) + assert ( + validate_attribute_urn("urn:example:2.0:Foo:sub", User, resource_types=[Foo]) + == "urn:example:2.0:Foo:sub" + ) + + assert validate_attribute_urn("sub.always", Foo) == "urn:example:2.0:Foo:sub.always" + assert ( + validate_attribute_urn("urn:example:2.0:Foo:sub.always", Foo) + == "urn:example:2.0:Foo:sub.always" + ) + assert ( + validate_attribute_urn( + "urn:example:2.0:Foo:sub.always", User, resource_types=[Foo] + ) + == "urn:example:2.0:Foo:sub.always" + ) + + assert validate_attribute_urn("snakeCase", Foo) == "urn:example:2.0:Foo:snakeCase" + assert ( + validate_attribute_urn("urn:example:2.0:Foo:snakeCase", Foo) + == "urn:example:2.0:Foo:snakeCase" + ) + + assert ( + validate_attribute_urn("urn:example:2.0:Extension:baz", Foo[Extension]) + == "urn:example:2.0:Extension:baz" + ) + assert ( + validate_attribute_urn( + "urn:example:2.0:Extension:baz", resource_types=[Foo[Extension]] + ) + == "urn:example:2.0:Extension:baz" + ) + + with pytest.raises(ValueError, match="No default schema and relative URN"): + validate_attribute_urn("bar", resource_types=[Foo]) + + with pytest.raises( + ValueError, match="No resource matching schema 'urn:InvalidResource'" + ): + validate_attribute_urn("urn:InvalidResource:bar", Foo) + + with pytest.raises( + ValueError, match="No resource matching schema 'urn:example:2.0:Foo'" + ): + validate_attribute_urn("urn:example:2.0:Foo:bar") + + with pytest.raises( + ValueError, match="Model 'Foo' has no attribute named 'invalid'" + ): + validate_attribute_urn("urn:example:2.0:Foo:invalid", Foo) + + with pytest.raises( + ValueError, + match="Attribute 'bar' is not a complex attribute, and cannot have a 'invalid' sub-attribute", + ): + validate_attribute_urn("bar.invalid", Foo) diff --git a/tests/test_model_serialization.py b/tests/test_model_serialization.py new file mode 100644 index 0000000..9c22124 --- /dev/null +++ b/tests/test_model_serialization.py @@ -0,0 +1,348 @@ +from typing import Annotated +from typing import List +from typing import Optional + +import pytest + +from pydantic_scim2.base import BaseModel +from pydantic_scim2.base import Context +from pydantic_scim2.base import Mutability +from pydantic_scim2.base import Returned +from pydantic_scim2.rfc7643.resource import Resource + + +class SubRetModel(BaseModel): + _attribute_urn: str = "org:example:SupRetResource:sub" + + always_returned: Annotated[Optional[str], Returned.always] = None + never_returned: Annotated[Optional[str], Returned.never] = None + default_returned: Annotated[Optional[str], Returned.default] = None + request_returned: Annotated[Optional[str], Returned.request] = None + + +class SupRetResource(Resource): + schemas: List[str] = ["org:example:SupRetResource"] + + always_returned: Annotated[Optional[str], Returned.always] = None + never_returned: Annotated[Optional[str], Returned.never] = None + default_returned: Annotated[Optional[str], Returned.default] = None + request_returned: Annotated[Optional[str], Returned.request] = None + + sub: Optional[SubRetModel] = None + + +class MutResource(Resource): + schemas: List[str] = ["org:example:MutResource"] + + read_only: Annotated[Optional[str], Mutability.read_only] = None + read_write: Annotated[Optional[str], Mutability.read_write] = None + immutable: Annotated[Optional[str], Mutability.immutable] = None + write_only: Annotated[Optional[str], Mutability.write_only] = None + + +@pytest.fixture +def ret_resource(): + return SupRetResource( + id="id", + always_returned="x", + never_returned="x", + default_returned="x", + request_returned="x", + sub=SubRetModel( + always_returned="x", + never_returned="x", + default_returned="x", + request_returned="x", + ), + ) + + +@pytest.fixture +def mut_resource(): + return MutResource( + id="id", + read_only="x", + read_write="x", + immutable="x", + write_only="x", + ) + + +def test_dump_default(mut_resource): + """By default, everything is dumped.""" + + assert mut_resource.model_dump() == { + "schemas": ["org:example:MutResource"], + "id": "id", + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + "writeOnly": "x", + } + + assert mut_resource.model_dump(scim_ctx=Context.DEFAULT) == { + "schemas": ["org:example:MutResource"], + "id": "id", + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + "writeOnly": "x", + } + + assert mut_resource.model_dump(scim_ctx=None) == { + "schemas": ["org:example:MutResource"], + "id": "id", + "read_only": "x", + "read_write": "x", + "immutable": "x", + "write_only": "x", + } + + +def test_dump_creation_request(mut_resource): + """Test query building for resource creation request: + + Attributes marked as: + - Mutability.read_write are dumped + - Mutability.immutable are dumped + - Mutability.write_only are dumped + - Mutability.read_only are not dumped + """ + assert mut_resource.model_dump(scim_ctx=Context.RESOURCE_CREATION_REQUEST) == { + "schemas": ["org:example:MutResource"], + "readWrite": "x", + "immutable": "x", + "writeOnly": "x", + } + + +def test_dump_query_request(mut_resource): + """Test query building for resource query request: + + Attributes marked as: + - Mutability.read_write are dumped + - Mutability.immutable are dumped + - Mutability.write_only are not dumped + - Mutability.read_only are dumped + """ + + assert mut_resource.model_dump(scim_ctx=Context.RESOURCE_QUERY_REQUEST) == { + "schemas": ["org:example:MutResource"], + "id": "id", + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + } + + +def test_dump_replacement_request(mut_resource): + """Test query building for resource model replacement requests: + + Attributes marked as: + - Mutability.read_write are dumped + - Mutability.immutable are not dumped + - Mutability.write_only are dumped + - Mutability.read_only are not dumped""" + + assert mut_resource.model_dump(scim_ctx=Context.RESOURCE_REPLACEMENT_REQUEST) == { + "schemas": ["org:example:MutResource"], + "readWrite": "x", + "writeOnly": "x", + } + + +def test_dump_search_request(mut_resource): + """Test query building for resource query request: + + Attributes marked as: + - Mutability.read_write are dumped + - Mutability.immutable are dumped + - Mutability.write_only are not dumped + - Mutability.read_only are dumped + """ + + assert mut_resource.model_dump(scim_ctx=Context.RESOURCE_QUERY_REQUEST) == { + "schemas": ["org:example:MutResource"], + "id": "id", + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + } + + +def test_dump_default_response(ret_resource): + """When no scim context is passed, every attributes are dumped.""" + + assert ret_resource.model_dump() == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "neverReturned": "x", + "defaultReturned": "x", + "requestReturned": "x", + "sub": { + "alwaysReturned": "x", + "neverReturned": "x", + "defaultReturned": "x", + "requestReturned": "x", + }, + } + + +@pytest.mark.parametrize( + "context", + [ + Context.RESOURCE_CREATION_RESPONSE, + Context.RESOURCE_QUERY_RESPONSE, + Context.RESOURCE_REPLACEMENT_RESPONSE, + Context.SEARCH_RESPONSE, + ], +) +def test_dump_response(context, ret_resource): + """Test context for responses. + + Attributes marked as: + - Returned.always are always dumped + - Returned.never are never dumped + - Returned.default are dumped unless excluded + - Returned.request are dumped only if included + + Including attributes with 'attributes=' replace the whole default set. + """ + + assert ret_resource.model_dump(scim_ctx=context) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "defaultReturned": "x", + "sub": { + "alwaysReturned": "x", + "defaultReturned": "x", + }, + } + + assert ret_resource.model_dump(scim_ctx=context, attributes={"alwaysReturned"}) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + } + + assert ret_resource.model_dump(scim_ctx=context, attributes={"neverReturned"}) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + } + + assert ret_resource.model_dump( + scim_ctx=context, attributes={"defaultReturned"} + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "defaultReturned": "x", + } + + assert ret_resource.model_dump(scim_ctx=context, attributes={"sub"}) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "sub": { + "alwaysReturned": "x", + }, + } + + assert ret_resource.model_dump( + scim_ctx=context, attributes={"sub.defaultReturned"} + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "sub": { + "alwaysReturned": "x", + "defaultReturned": "x", + }, + } + + assert ret_resource.model_dump( + scim_ctx=context, attributes={"requestReturned"} + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "requestReturned": "x", + } + + assert ret_resource.model_dump( + scim_ctx=context, + attributes={"defaultReturned", "requestReturned"}, + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "defaultReturned": "x", + "requestReturned": "x", + } + + assert ret_resource.model_dump( + scim_ctx=context, excluded_attributes={"alwaysReturned"} + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "defaultReturned": "x", + "sub": { + "alwaysReturned": "x", + "defaultReturned": "x", + }, + } + + assert ret_resource.model_dump( + scim_ctx=context, excluded_attributes={"neverReturned"} + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "defaultReturned": "x", + "sub": { + "alwaysReturned": "x", + "defaultReturned": "x", + }, + } + + assert ret_resource.model_dump( + scim_ctx=context, excluded_attributes={"defaultReturned"} + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "sub": { + "alwaysReturned": "x", + "defaultReturned": "x", + }, + } + + assert ret_resource.model_dump( + scim_ctx=context, excluded_attributes={"requestReturned"} + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "defaultReturned": "x", + "sub": { + "alwaysReturned": "x", + "defaultReturned": "x", + }, + } + + assert ret_resource.model_dump( + scim_ctx=context, + excluded_attributes={"defaultReturned", "requestReturned"}, + ) == { + "schemas": ["org:example:SupRetResource"], + "id": "id", + "alwaysReturned": "x", + "sub": { + "alwaysReturned": "x", + "defaultReturned": "x", + }, + } diff --git a/tests/test_model_validation.py b/tests/test_model_validation.py new file mode 100644 index 0000000..739af67 --- /dev/null +++ b/tests/test_model_validation.py @@ -0,0 +1,334 @@ +from typing import Annotated +from typing import List +from typing import Optional + +import pytest +from pydantic import ValidationError + +from pydantic_scim2.base import Context +from pydantic_scim2.base import Mutability +from pydantic_scim2.base import Returned +from pydantic_scim2.rfc7643.resource import Resource + + +class RetResource(Resource): + schemas: List[str] = ["org:example:RetResource"] + + always_returned: Annotated[Optional[str], Returned.always] = None + never_returned: Annotated[Optional[str], Returned.never] = None + default_returned: Annotated[Optional[str], Returned.default] = None + request_returned: Annotated[Optional[str], Returned.request] = None + + +class MutResource(Resource): + schemas: List[str] = ["org:example:MutResource"] + + read_only: Annotated[Optional[str], Mutability.read_only] = None + read_write: Annotated[Optional[str], Mutability.read_write] = None + immutable: Annotated[Optional[str], Mutability.immutable] = None + write_only: Annotated[Optional[str], Mutability.write_only] = None + + +def test_validate_default(): + """Test query validation for resource creation request.""" + assert MutResource.model_validate( + { + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + "writeOnly": "x", + }, + ) == MutResource( + schemas=["org:example:MutResource"], + readWrite="x", + immutable="x", + writeOnly="x", + readOnly="x", + ) + + assert MutResource.model_validate( + { + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + "writeOnly": "x", + }, + scim_ctx=None, + ) == MutResource( + schemas=["org:example:MutResource"], + readWrite="x", + immutable="x", + writeOnly="x", + readOnly="x", + ) + + assert MutResource.model_validate( + { + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + "writeOnly": "x", + }, + scim_ctx=Context.DEFAULT, + ) == MutResource( + schemas=["org:example:MutResource"], + readWrite="x", + immutable="x", + writeOnly="x", + readOnly="x", + ) + + +def test_validate_creation_request(): + """Test query validation for resource creation request: + + Attributes marked as: + - Mutability.read_only raise a ValidationError + """ + assert MutResource.model_validate( + { + "readWrite": "x", + "immutable": "x", + "writeOnly": "x", + }, + scim_ctx=Context.RESOURCE_CREATION_REQUEST, + ) == MutResource( + schemas=["org:example:MutResource"], + readWrite="x", + immutable="x", + writeOnly="x", + ) + + with pytest.raises( + ValidationError, + match="Field 'read_only' has mutability 'readOnly' but this in not valid in resource creation request context", + ): + MutResource.model_validate( + { + "readOnly": "x", + }, + scim_ctx=Context.RESOURCE_CREATION_REQUEST, + ) + + +def test_validate_query_request(): + """Test query validation for resource query request: + + Attributes marked as: + - Mutability.write_only raise a ValidationError + """ + + assert MutResource.model_validate( + { + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + }, + scim_ctx=Context.RESOURCE_QUERY_REQUEST, + ) == MutResource( + schemas=["org:example:MutResource"], + readOnly="x", + readWrite="x", + immutable="x", + ) + + with pytest.raises( + ValidationError, + match="Field 'write_only' has mutability 'writeOnly' but this in not valid in resource query request context", + ): + MutResource.model_validate( + { + "writeOnly": "x", + }, + scim_ctx=Context.RESOURCE_QUERY_REQUEST, + ) + + +def test_validate_replacement_request(): + """Test query validation for resource model replacement requests: + + Attributes marked as: + - Mutability.immutable raise a ValidationError + - Mutability.read_only are ignored""" + + assert MutResource.model_validate( + { + "readOnly": "x", + "readWrite": "x", + "writeOnly": "x", + }, + scim_ctx=Context.RESOURCE_REPLACEMENT_REQUEST, + ) == MutResource( + schemas=["org:example:MutResource"], + readWrite="x", + writeOnly="x", + ) + + with pytest.raises( + ValidationError, + match="Field 'immutable' has mutability 'immutable' but this in not valid in resource replacement request context", + ): + MutResource.model_validate( + { + "immutable": "x", + }, + scim_ctx=Context.RESOURCE_REPLACEMENT_REQUEST, + ) + + +def test_validate_search_request(): + """Test query validation for resource query request: + + Attributes marked as: + - Mutability.write_only raise a ValidationError + """ + + assert MutResource.model_validate( + { + "readOnly": "x", + "readWrite": "x", + "immutable": "x", + }, + scim_ctx=Context.SEARCH_REQUEST, + ) == MutResource( + schemas=["org:example:MutResource"], + readOnly="x", + readWrite="x", + immutable="x", + ) + + with pytest.raises( + ValidationError, + match="Field 'write_only' has mutability 'writeOnly' but this in not valid in search request context", + ): + MutResource.model_validate( + { + "writeOnly": "x", + }, + scim_ctx=Context.SEARCH_REQUEST, + ) + + +def test_validate_default_response(): + """When no scim context is passed, every attributes are dumped.""" + + assert RetResource.model_validate( + { + "schemas": ["org:example:RetResource"], + "id": "id", + "alwaysReturned": "x", + "neverReturned": "x", + "defaultReturned": "x", + "requestReturned": "x", + }, + ) == RetResource( + schemas=["org:example:RetResource"], + id="id", + alwaysReturned="x", + neverReturned="x", + defaultReturned="x", + requestReturned="x", + ) + + assert RetResource.model_validate( + { + "schemas": ["org:example:RetResource"], + "id": "id", + "alwaysReturned": "x", + "neverReturned": "x", + "defaultReturned": "x", + "requestReturned": "x", + }, + scim_ctx=None, + ) == RetResource( + schemas=["org:example:RetResource"], + id="id", + alwaysReturned="x", + neverReturned="x", + defaultReturned="x", + requestReturned="x", + ) + + assert RetResource.model_validate( + { + "schemas": ["org:example:RetResource"], + "id": "id", + "alwaysReturned": "x", + "neverReturned": "x", + "defaultReturned": "x", + "requestReturned": "x", + }, + scim_ctx=Context.DEFAULT, + ) == RetResource( + schemas=["org:example:RetResource"], + id="id", + alwaysReturned="x", + neverReturned="x", + defaultReturned="x", + requestReturned="x", + ) + + +@pytest.mark.parametrize( + "context", + [ + Context.RESOURCE_CREATION_RESPONSE, + Context.RESOURCE_QUERY_RESPONSE, + Context.RESOURCE_REPLACEMENT_RESPONSE, + Context.SEARCH_RESPONSE, + ], +) +def test_validate_response(context): + """Test context for responses. + + Attributes marked as: + - Returned.always raise a ValidationException if None + - Returned.never raise a ValidationException if not None + """ + + assert RetResource.model_validate( + { + "schemas": ["org:example:RetResource"], + "id": "id", + "alwaysReturned": "x", + "defaultReturned": "x", + "requestReturned": "x", + }, + scim_ctx=context, + ) == RetResource( + schemas=["org:example:RetResource"], + id="id", + alwaysReturned="x", + defaultReturned="x", + requestReturned="x", + ) + + # always is missing + with pytest.raises( + ValidationError, + match="Field 'always_returned' has returnability 'always' but value is missing or null", + ): + RetResource.model_validate({"id": "id"}, scim_ctx=context) + + # always is None + with pytest.raises( + ValidationError, + match="Field 'always_returned' has returnability 'always' but value is missing or null", + ): + RetResource.model_validate( + {"id": "id", "alwaysReturned": None}, scim_ctx=context + ) + + # never is not None + with pytest.raises( + ValidationError, + match="Field 'never_returned' has returnability 'never' but value is set", + ): + RetResource.model_validate( + { + "id": "id", + "alwaysReturned": "x", + "neverReturned": "x", + }, + scim_ctx=context, + ) diff --git a/tests/test_models.py b/tests/test_models.py index 612851a..098ae7e 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -2,10 +2,12 @@ from pydantic_scim2 import BulkRequest from pydantic_scim2 import BulkResponse +from pydantic_scim2 import EnterpriseUser from pydantic_scim2 import Error from pydantic_scim2 import Group from pydantic_scim2 import ListResponse from pydantic_scim2 import PatchOp +from pydantic_scim2 import Resource from pydantic_scim2 import ResourceType from pydantic_scim2 import Schema from pydantic_scim2 import SearchRequest @@ -17,11 +19,14 @@ def test_parse_and_serialize_examples(load_sample): samples = list(os.walk("samples"))[0][2] models = { "user": User, + "enterprise_user": User[EnterpriseUser], "group": Group, "schema": Schema, "resource_type": ResourceType, "service_provider_configuration": ServiceProviderConfiguration, - "list_response": ListResponse.of(User, Group, Schema, ResourceType), + "list_response": ListResponse.of( + User[EnterpriseUser], Group, Schema, ResourceType + ), "patch_op": PatchOp, "bulk_request": BulkRequest, "bulk_response": BulkResponse, @@ -44,9 +49,35 @@ def test_parse_and_serialize_examples(load_sample): payload = load_sample(sample) obj = model.model_validate(payload) - assert ( - obj.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload + assert obj.model_dump(exclude_unset=True) == payload + + +def test_get_resource_by_schema(): + resource_types = [Group, User[EnterpriseUser]] + assert ( + Resource.get_by_schema( + resource_types, "urn:ietf:params:scim:schemas:core:2.0:Group" + ) + == Group + ) + assert ( + Resource.get_by_schema( + resource_types, "urn:ietf:params:scim:schemas:core:2.0:User" + ) + == User[EnterpriseUser] + ) + assert ( + Resource.get_by_schema( + resource_types, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + with_extensions=False, + ) + is None + ) + assert ( + Resource.get_by_schema( + resource_types, + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", ) + == EnterpriseUser + ) diff --git a/tests/test_resource_extension.py b/tests/test_resource_extension.py index 5da43bf..a0811b1 100644 --- a/tests/test_resource_extension.py +++ b/tests/test_resource_extension.py @@ -69,10 +69,7 @@ def test_extension_getitem(): }, "userName": "bjensen@example.com", } - assert ( - user.model_dump(exclude_none=True, by_alias=True, mode="json") - == expected_payload - ) + assert user.model_dump() == expected_payload def test_extension_setitem(): @@ -136,10 +133,7 @@ def test_extension_setitem(): }, "userName": "bjensen@example.com", } - assert ( - user.model_dump(exclude_none=True, by_alias=True, mode="json") - == expected_payload - ) + assert user.model_dump() == expected_payload def test_extension_no_payload(): diff --git a/tests/test_resource_type.py b/tests/test_resource_type.py index 15598b0..cb86fd6 100644 --- a/tests/test_resource_type.py +++ b/tests/test_resource_type.py @@ -20,12 +20,7 @@ def test_user_resource_type(load_sample): assert obj.meta.location == "https://example.com/v2/ResourceTypes/User" assert obj.meta.resource_type == "ResourceType" - assert ( - obj.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload - ) + assert obj.model_dump(exclude_unset=True) == payload def test_group_resource_type(load_sample): @@ -40,9 +35,4 @@ def test_group_resource_type(load_sample): assert obj.meta.location == "https://example.com/v2/ResourceTypes/Group" assert obj.meta.resource_type == "ResourceType" - assert ( - obj.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload - ) + assert obj.model_dump(exclude_unset=True) == payload diff --git a/tests/test_schema.py b/tests/test_schema.py index b3f18eb..fae430d 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -71,9 +71,4 @@ def test_group_schema(load_sample): obj.meta.location == "/v2/Schemas/urn:ietf:params:scim:schemas:core:2.0:Group" ) - assert ( - obj.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload - ) + assert obj.model_dump(exclude_unset=True) == payload diff --git a/tests/test_service_provider_configuration.py b/tests/test_service_provider_configuration.py index 659ee07..a8e9b80 100644 --- a/tests/test_service_provider_configuration.py +++ b/tests/test_service_provider_configuration.py @@ -61,9 +61,4 @@ def test_service_provider_configuration(load_sample): ) assert obj.meta.version == 'W\\/"3694e05e9dff594"' - assert ( - obj.model_dump( - exclude_none=True, exclude_unset=True, by_alias=True, mode="json" - ) - == payload - ) + assert obj.model_dump() == payload