diff --git a/fhir/resources/core/fhirabstractmodel.py b/fhir/resources/core/fhirabstractmodel.py
index e4f838f4..b831d913 100644
--- a/fhir/resources/core/fhirabstractmodel.py
+++ b/fhir/resources/core/fhirabstractmodel.py
@@ -9,15 +9,14 @@
 from enum import Enum
 from functools import lru_cache
 
-from pydantic.v1 import BaseModel, Extra, Field
+from pydantic import ValidationError, BaseModel, Field, ConfigDict
+from pydantic.fields import FieldInfo
 from pydantic.v1.class_validators import ROOT_VALIDATOR_CONFIG_KEY, root_validator
-from pydantic.v1.error_wrappers import ErrorWrapper, ValidationError
-from pydantic.v1.errors import ConfigError, PydanticValueError
-from pydantic.v1.fields import ModelField
-from pydantic.v1.parse import Protocol
-from pydantic.v1.utils import ROOT_KEY, sequence_like
+from pydantic.v1.utils import ROOT_KEY
+from pydantic_core import InitErrorDetails, PydanticCustomError
 
-from .utils import is_primitive_type, load_file, load_str_bytes, xml_dumps, yaml_dumps
+from .utils import is_primitive_type, load_file, load_str_bytes, xml_dumps, yaml_dumps, Format
+from .utils.common import sequence_like
 
 try:
     import orjson
@@ -47,9 +46,7 @@ def json_dumps(v, *, default, option=0, return_bytes=False):
 
 if typing.TYPE_CHECKING:
     from pydantic.v1.typing import TupleGenerator
-    from pydantic.v1.types import StrBytes
     from pydantic.v1.typing import AnyCallable
-    from pydantic.v1.main import Model
 
 
 __author__ = "Md Nazrul Islam"
@@ -57,7 +54,7 @@ def json_dumps(v, *, default, option=0, return_bytes=False):
 FHIR_COMMENTS_FIELD_NAME = "fhir_comments"
 
 
-class WrongResourceType(PydanticValueError):
+class WrongResourceType:
     code = "wrong.resource_type"
     msg_template = "Wrong ResourceType: {error}"
 
@@ -65,48 +62,59 @@
 class FHIRAbstractModel(BaseModel, abc.ABC):
     """Abstract base model class for all FHIR elements."""
 
+    # staticmethod() keeps the old ``Config.json_loads/json_dumps`` hooks
+    # callable from both the class and instances without binding self/cls.
+    json_loads = staticmethod(json_loads)
+    json_dumps = staticmethod(json_dumps)
     resource_type: str = ...  # type: ignore
+    model_config = ConfigDict(
+        extra="forbid",
+        populate_by_name=True,
+        validate_assignment=True,
+    )
 
     fhir_comments: typing.Union[str, typing.List[str]] = Field(
         None, alias="fhir_comments", element_property=False
     )
 
-    def __init__(__pydantic_self__, **data: typing.Any) -> None:
+    def __init__(self: "FHIRAbstractModel", **data: typing.Any) -> None:
         """ """
         resource_type = data.pop("resource_type", None)
-        errors = []
+        errors: typing.List[InitErrorDetails] = []
         if (
             "resourceType" in data
-            and "resourceType" not in __pydantic_self__.__fields__
+            and "resourceType" not in self.model_fields
        ):
             resource_type = data.pop("resourceType", None)
 
         if (
             resource_type is not None
-            and resource_type != __pydantic_self__.__fields__["resource_type"].default
+            and resource_type != self.model_fields["resource_type"].default
         ):
-            expected_resource_type = __pydantic_self__.__fields__[
+            expected_resource_type = self.model_fields[
                 "resource_type"
             ].default
-            error = (
-                f"``{__pydantic_self__.__class__.__module__}."
-                f"{__pydantic_self__.__class__.__name__}`` "
+            error_message = (
+                f"``{self.__class__.__module__}."
+                f"{self.__class__.__name__}`` "
                 f"expects resource type ``{expected_resource_type}``, "
                 f"but got ``{resource_type}``. "
                 "Make sure resource type name is correct and right "
                 "ModelClass has been chosen."
             )
-            errors.append(
-                ErrorWrapper(WrongResourceType(error=error), loc="resource_type")
-            )
+            init_error_details = InitErrorDetails(
+                # Custom error codes must be wrapped in PydanticCustomError;
+                # bare strings are reserved for pydantic-core's built-in types.
+                type=PydanticCustomError(
+                    WrongResourceType.code,
+                    WrongResourceType.msg_template,
+                    {"error": error_message},
+                ),
+                loc=("resource_type",),
+                input=resource_type,
+            )
+            errors.append(init_error_details)
 
         if errors:
-            raise ValidationError(errors, __pydantic_self__.__class__)
+            # pydantic v2's ValidationError cannot be instantiated directly.
+            raise ValidationError.from_exception_data(
+                self.__class__.__name__, errors
+            )
 
-        BaseModel.__init__(__pydantic_self__, **data)
+        BaseModel.__init__(self, **data)
 
     @classmethod
     def add_root_validator(
-        cls: typing.Type["Model"],
+        cls: typing.Type["BaseModel"],
         validator: typing.Union["AnyCallable", classmethod],
         *,
         pre: bool = False,
@@ -125,11 +133,11 @@ def add_root_validator(
         # first level validation
         if any([func_name in cls_.__dict__ for cls_ in cls.mro()]):
-            raise ConfigError(
+            # ConfigError is gone in pydantic v2; this is a usage error rather
+            # than a validation error, so raise TypeError instead.
+            raise TypeError(
                 f"{cls} already has same name '{func_name}' method or attribute!"
             )
-        if func_name in cls.__fields__:
-            raise ConfigError(f"{cls} already has same name '{func_name}' field!")
+        if func_name in cls.model_fields:
+            raise TypeError(f"{cls} already has same name '{func_name}' field!")
 
         # evaluate through root_validator
         validator = root_validator(
@@ -141,13 +149,13 @@ def add_root_validator(
         arg_list = list(sig.parameters.keys())
 
         if len(arg_list) != 2:
-            raise ConfigError(
+            raise TypeError(
                 f"Invalid signature for root validator {func_name}: {sig}"
                 ", should be: (cls, values)."
             )
 
         if arg_list[0] != "cls":
-            raise ConfigError(
+            raise TypeError(
                 f"Invalid signature for root validator {func_name}: {sig}, "
                 f'"{arg_list[0]}" not permitted as first argument, '
                 "should be: (cls, values)."
@@ -173,10 +181,10 @@ def add_root_validator(
 
     @classmethod
     def element_properties(
-        cls: typing.Type["Model"],
-    ) -> typing.Generator[ModelField, None, None]:
+        cls: typing.Type["BaseModel"],
+    ) -> typing.Generator[FieldInfo, None, None]:
         """ """
-        for model_field in cls.__fields__.values():
-            if model_field.field_info.extra.get("element_property", False):
+        for model_field in cls.model_fields.values():
+            # Extra kwargs passed to Field() land in json_schema_extra in v2.
+            if (model_field.json_schema_extra or {}).get("element_property", False):
                 yield model_field
 
@@ -189,7 +197,7 @@ def elements_sequence(cls):
     @classmethod
     @lru_cache(maxsize=1024, typed=True)
-    def has_resource_base(cls: typing.Type["Model"]) -> bool:
+    def has_resource_base(cls: typing.Type["BaseModel"]) -> bool:
         """ """
         # xxx: calculate metrics, other than cache it!
         for cl in inspect.getmro(cls)[:-4]:
@@ -199,9 +207,9 @@ def has_resource_base(cls: typing.Type["Model"]) -> bool:
 
     @classmethod
     @lru_cache(maxsize=None, typed=True)
-    def get_resource_type(cls: typing.Type["Model"]) -> str:
+    def get_resource_type(cls: typing.Type["BaseModel"]) -> str:
         """ """
-        return cls.__fields__["resource_type"].default
+        return cls.model_fields["resource_type"].default
 
     @classmethod
     @lru_cache(maxsize=None, typed=True)
@@ -211,7 +219,9 @@ def get_alias_mapping(
         """Mappings between field's name and alias"""
         aliases = cls.elements_sequence()
         return {
-            f.alias: fname for fname, f in cls.__fields__.items() if f.alias in aliases
+            field_info.alias: field_name
+            for field_name, field_info in cls.model_fields.items()
+            if field_info.alias in aliases
         }
 
     @classmethod
@@ -221,15 +231,15 @@ def get_json_encoder(cls) -> typing.Callable[[typing.Any], typing.Any]:
 
     @classmethod
     def parse_file(
-        cls: typing.Type["Model"],
+        cls: typing.Type["BaseModel"],
         path: typing.Union[str, pathlib.Path],
         *,
         content_type: typing.Optional[str] = None,
         encoding: str = "utf8",
-        proto: typing.Optional[Protocol] = None,
+        proto: typing.Optional[Format] = None,
         allow_pickle: bool = False,
         **extra,
-    ) -> "Model":
+    ) -> "BaseModel":
         extra.update({"cls": cls})
         obj = load_file(
             path,
@@ -237,22 +247,22 @@ def parse_file(
             content_type=content_type,  # type: ignore[arg-type]
             encoding=encoding,
             allow_pickle=allow_pickle,
-            json_loads=cls.__config__.json_loads,
+            json_loads=cls.json_loads,
             **extra,
         )
-        return cls.parse_obj(obj)
+        return cls.model_validate(obj)
 
     @classmethod
     def parse_raw(
-        cls: typing.Type["Model"],
-        b: "StrBytes",
+        cls: typing.Type["BaseModel"],
+        b: typing.Union[str, bytes],
         *,
         content_type: typing.Optional[str] = None,
         encoding: str = "utf8",
-        proto: typing.Optional[Protocol] = None,
+        proto: typing.Optional[Format] = None,
         allow_pickle: bool = False,
         **extra,
-    ) -> "Model":
+    ) -> "BaseModel":
         extra.update({"cls": cls})
         try:
             obj = load_str_bytes(
@@ -261,12 +271,18 @@ def parse_raw(
                 content_type=content_type,  # type: ignore[arg-type]
                 encoding=encoding,
                 allow_pickle=allow_pickle,
-                json_loads=cls.__config__.json_loads,
+                json_loads=cls.json_loads,
                 **extra,
             )
         except (ValueError, TypeError, UnicodeDecodeError) as e:  # noqa: B014
-            raise ValidationError([ErrorWrapper(e, loc=ROOT_KEY)], cls)
-        return cls.parse_obj(obj)
+            init_error_details = InitErrorDetails(
+                type=PydanticCustomError(
+                    "failed_parse_raw", "{message}", {"message": str(e)}
+                ),
+                loc=(ROOT_KEY,),
+                input=b,
+            )
+            raise ValidationError.from_exception_data(cls.__name__, [init_error_details])
+        return cls.model_validate(obj)
 
     def yaml(  # type: ignore
         self,
@@ -343,7 +359,7 @@ def json(  # type: ignore
             exclude_none = True
 
         if (
-            getattr(self.__config__.json_dumps, "__qualname__", "")
+            getattr(self.json_dumps, "__qualname__", "")
             == "orjson_json_dumps"
         ):
             option = dumps_kwargs.pop("option", 0)
@@ -385,7 +401,7 @@ def json(  # type: ignore
         if typing.TYPE_CHECKING:
             result: typing.Union[str, bytes]
 
-        result = self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
+        result = self.json_dumps(data, default=encoder, **dumps_kwargs)
 
         if return_bytes is True:
             if isinstance(result, str):
@@ -436,7 +452,7 @@ def _fhir_iter(
         for prop_name in self.elements_sequence():
             field_key = alias_maps[prop_name]
-            field = self.__fields__[field_key]
+            field = self.model_fields[field_key]
             is_primitive = is_primitive_type(field)
             v = self.__dict__.get(field_key, None)
             dict_key = by_alias and field.alias or field_key
@@ -518,11 +534,3 @@ def _fhir_get_value(
         ):
             return None
         return value
-
-    class Config:
-        json_loads = json_loads
-        json_dumps = json_dumps
-        allow_population_by_field_name = True
-        extra = Extra.forbid
-        validate_assignment = True
-        error_msg_templates = {"value_error.extra": "extra fields not permitted"}
diff --git a/fhir/resources/core/utils/__init__.py b/fhir/resources/core/utils/__init__.py
index 982c48ee..db420309 100644
--- a/fhir/resources/core/utils/__init__.py
+++ b/fhir/resources/core/utils/__init__.py
@@ -3,12 +3,8 @@
 import pathlib
 from typing import TYPE_CHECKING, Any, Callable, Union, cast, no_type_check, Optional
 
-from pydantic.v1.parse import Protocol
-from pydantic.v1.parse import load_file as default_load_file
-from pydantic.v1.parse import load_str_bytes as default_load_str_bytes
-from pydantic.v1.types import StrBytes
-
 from .common import is_primitive_type  # noqa: F401
+from .deprecated import Format, v1_load_str_bytes, v1_load_file
 
 try:
     from .yaml import yaml_dumps, yaml_loads
@@ -69,11 +65,11 @@ def xml_loads(cls, b, xmlparser=None):
 
 def load_str_bytes(
-    b: StrBytes,
+    b: Union[str, bytes],
     *,
     content_type: Optional[str] = None,
     encoding: str = "utf8",
-    proto: Optional[Protocol] = None,
+    proto: Optional[Format] = None,
     allow_pickle: bool = False,
     json_loads: Callable[[str], Any] = json.loads,
     **extra,
@@ -95,7 +91,7 @@ def load_str_bytes(
         b = cast(bytes, b)
         obj = xml_loads(extra["cls"], b, **params)
         return obj
-    obj = default_load_str_bytes(
+    obj = v1_load_str_bytes(
         b,
         proto=proto,  # type: ignore[arg-type]
         content_type=content_type,  # type: ignore[arg-type]
@@ -111,7 +107,7 @@ def load_file(
     *,
     content_type: Optional[str] = None,
     encoding: str = "utf8",
-    proto: Optional[Protocol] = None,
+    proto: Optional[Format] = None,
     allow_pickle: bool = False,
     json_loads: Callable[[str], Any] = json.loads,
     **extra,
@@ -136,7 +132,7 @@ def load_file(
         params["xmlparser"] = extra["xmlparser"]
         obj = xml_loads(extra["cls"], path.read_bytes(), **params)
     else:
-        obj = default_load_file(
+        obj = v1_load_file(
             path,
             proto=proto,  # type: ignore[arg-type]
             content_type=content_type,  # type: ignore[arg-type]
diff --git a/fhir/resources/core/utils/common.py b/fhir/resources/core/utils/common.py
index cdbc40a0..1d7cca51 100644
--- a/fhir/resources/core/utils/common.py
+++ b/fhir/resources/core/utils/common.py
@@ -1,5 +1,8 @@
 # _*_ coding: utf-8 _*_
+from collections import deque
 from functools import lru_cache
+from types import GeneratorType
+from typing import Any
 
 from pydantic.v1.fields import ModelField
 from pydantic.v1.typing import get_args, get_origin
@@ -66,3 +69,7 @@ def normalize_fhir_type_class(type_):
             return normalize_fhir_type_class(tp_)
     else:
         return type_
+
+
+def sequence_like(v: Any) -> bool:
+    return isinstance(v, (list, tuple, set, frozenset, GeneratorType, deque))
diff --git a/fhir/resources/core/utils/deprecated.py b/fhir/resources/core/utils/deprecated.py
new file mode 100644
index 00000000..b80d3b57
--- /dev/null
+++ b/fhir/resources/core/utils/deprecated.py
@@ -0,0 +1,64 @@
+import json
+import pickle
+from enum import Enum
+from pathlib import Path
+from typing import Any, Callable, Optional, Union
+
+
+class Format(str, Enum):
+    json = "json"
+    pickle = "pickle"
+
+
+def v1_load_str_bytes(
+    b: Union[str, bytes],
+    *,
+    content_type: Optional[str] = None,
+    encoding: str = "utf8",
+    proto: Optional[Format] = None,
+    allow_pickle: bool = False,
+    json_loads: Callable[[str], Any] = json.loads,
+) -> Any:
+    if proto is None and content_type:
+        if content_type.endswith(("json", "javascript")):
+            pass
+        elif allow_pickle and content_type.endswith("pickle"):
+            proto = Format.pickle
+        else:
+            raise TypeError(f"Unknown content-type: {content_type}")
+
+    proto = proto or Format.json
+
+    if proto == Format.json:
+        if isinstance(b, bytes):
+            b = b.decode(encoding)
+        return json_loads(b)
+    elif proto == Format.pickle:
+        if not allow_pickle:
+            raise RuntimeError("Trying to decode with pickle with allow_pickle=False")
+        bb = b if isinstance(b, bytes) else b.encode()
+        return pickle.loads(bb)
+    else:
+        raise TypeError(f"Unknown protocol: {proto}")
+
+
+def v1_load_file(
+    path: Union[str, Path],
+    *,
+    content_type: Optional[str] = None,
+    encoding: str = "utf8",
+    proto: Optional[Format] = None,
+    allow_pickle: bool = False,
+    json_loads: Callable[[str], Any] = json.loads,
+) -> Any:
+    path = Path(path)
+    b = path.read_bytes()
+    if content_type is None:
+        if path.suffix in (".js", ".json"):
+            proto = Format.json
+        elif path.suffix == ".pkl":
+            proto = Format.pickle
+
+    return v1_load_str_bytes(
+        b,
+        proto=proto,
+        content_type=content_type,
+        encoding=encoding,
+        allow_pickle=allow_pickle,
+        json_loads=json_loads,
+    )
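
Notes for reviewers. The heart of this migration is the new error-raising flow. Below is a minimal, self-contained sketch of that flow (the function name and the arguments to from_exception_data are illustrative, not part of the patch): pydantic v2's ValidationError exposes no public (errors, model) constructor, so line errors are collected as InitErrorDetails and raised via ValidationError.from_exception_data(), with non-core error codes such as "wrong.resource_type" wrapped in PydanticCustomError.

from pydantic_core import InitErrorDetails, PydanticCustomError, ValidationError


def raise_wrong_resource_type(expected: str, got: str) -> None:
    # One InitErrorDetails dict per line error; "input" carries the bad value.
    error = InitErrorDetails(
        type=PydanticCustomError(
            "wrong.resource_type",
            "Wrong ResourceType: {error}",
            {"error": f"expected {expected!r}, got {got!r}"},
        ),
        loc=("resource_type",),
        input=got,
    )
    # The first argument is the title shown in the error header
    # (v1 passed the model class here instead).
    raise ValidationError.from_exception_data(expected, [error])


try:
    raise_wrong_resource_type("Patient", "Observation")
except ValidationError as exc:
    assert exc.errors()[0]["type"] == "wrong.resource_type"
    print(exc)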
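For reference, the class-based Config removed at the bottom of fhirabstractmodel.py maps onto v2 configuration roughly as sketched below (Example is a placeholder model). error_msg_templates has no v2 counterpart, and Config.json_loads/json_dumps are no longer honoured by BaseModel, which is why the patch re-attaches them to the class as staticmethods.

from pydantic import BaseModel, ConfigDict


class Example(BaseModel):
    model_config = ConfigDict(
        extra="forbid",            # was: extra = Extra.forbid
        populate_by_name=True,     # was: allow_population_by_field_name = True
        validate_assignment=True,  # was: validate_assignment = True
    )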
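Finally, a quick smoke test of the vendored loader shim, assuming the module path introduced by this patch; it mirrors pydantic v1's load_str_bytes, so call sites only swap Protocol for Format.

from fhir.resources.core.utils.deprecated import Format, v1_load_str_bytes

# proto falls back to Format.json when neither proto nor content_type is
# given, matching pydantic v1's default.
data = v1_load_str_bytes(b'{"resourceType": "Patient"}')
assert data == {"resourceType": "Patient"}

# Pickle payloads stay opt-in, exactly as in v1.
try:
    v1_load_str_bytes(b"\x80\x04N.", proto=Format.pickle)
except RuntimeError as exc:
    print(exc)  # Trying to decode with pickle with allow_pickle=False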