-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
_generate_schema.py
2289 lines (1950 loc) · 98.7 KB
/
_generate_schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Convert python types to pydantic-core schema."""
from __future__ import annotations as _annotations
import collections.abc
import dataclasses
import inspect
import re
import sys
import typing
import warnings
from contextlib import ExitStack, contextmanager
from copy import copy, deepcopy
from enum import Enum
from functools import partial
from inspect import Parameter, _ParameterKind, signature
from itertools import chain
from operator import attrgetter
from types import FunctionType, LambdaType, MethodType
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Final,
ForwardRef,
Iterable,
Iterator,
Mapping,
Type,
TypeVar,
Union,
cast,
overload,
)
from warnings import warn
from pydantic_core import CoreSchema, PydanticUndefined, core_schema, to_jsonable_python
from typing_extensions import Annotated, Literal, TypeAliasType, TypedDict, get_args, get_origin, is_typeddict
from ..aliases import AliasGenerator
from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler
from ..config import ConfigDict, JsonDict, JsonEncoder
from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError
from ..json_schema import JsonSchemaValue
from ..version import version_short
from ..warnings import PydanticDeprecatedSince20
from . import _core_utils, _decorators, _discriminated_union, _known_annotated_metadata, _typing_extra
from ._config import ConfigWrapper, ConfigWrapperStack
from ._core_metadata import CoreMetadataHandler, build_metadata_dict
from ._core_utils import (
CoreSchemaOrField,
collect_invalid_schemas,
define_expected_missing_refs,
get_ref,
get_type_ref,
is_function_with_inner_schema,
is_list_like_schema_with_items_schema,
simplify_schema_references,
validate_core_schema,
)
from ._decorators import (
Decorator,
DecoratorInfos,
FieldSerializerDecoratorInfo,
FieldValidatorDecoratorInfo,
ModelSerializerDecoratorInfo,
ModelValidatorDecoratorInfo,
RootValidatorDecoratorInfo,
ValidatorDecoratorInfo,
get_attribute_from_bases,
inspect_field_serializer,
inspect_model_serializer,
inspect_validator,
)
from ._docs_extraction import extract_docstrings_from_cls
from ._fields import collect_dataclass_fields, get_type_hints_infer_globalns
from ._forward_ref import PydanticRecursiveRef
from ._generics import get_standard_typevars_map, has_instance_in_type, recursively_defined_type_refs, replace_types
from ._schema_generation_shared import CallbackGetCoreSchemaHandler
from ._typing_extra import is_finalvar, is_self_type
from ._utils import lenient_issubclass
if TYPE_CHECKING:
from ..fields import ComputedFieldInfo, FieldInfo
from ..main import BaseModel
from ..types import Discriminator
from ..validators import FieldValidatorModes
from ._dataclasses import StandardDataclass
from ._schema_generation_shared import GetJsonSchemaFunction
# stdlib `typing.TypedDict` only gained full functionality (e.g. `__required_keys__`) in 3.12
_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)
# Runtime type of `Annotated[...]` aliases, used for isinstance dispatch in `_generate_schema_inner`.
_AnnotatedType = type(Annotated[int, 123])

# Union of decorator-info types that target individual fields.
FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
AnyFieldDecorator = Union[
    Decorator[ValidatorDecoratorInfo],
    Decorator[FieldValidatorDecoratorInfo],
    Decorator[FieldSerializerDecoratorInfo],
]

ModifyCoreSchemaWrapHandler = GetCoreSchemaHandler
GetCoreSchemaFunction = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]

# Builtin / `typing` / `collections.abc` aliases that should each map to the same core schema:
TUPLE_TYPES: list[type] = [tuple, typing.Tuple]
LIST_TYPES: list[type] = [list, typing.List, collections.abc.MutableSequence]
SET_TYPES: list[type] = [set, typing.Set, collections.abc.MutableSet]
FROZEN_SET_TYPES: list[type] = [frozenset, typing.FrozenSet, collections.abc.Set]
DICT_TYPES: list[type] = [dict, typing.Dict, collections.abc.MutableMapping, collections.abc.Mapping]
def check_validator_fields_against_field_name(
    info: FieldDecoratorInfo,
    field: str,
) -> bool:
    """Determine whether a field decorator applies to the given field.

    Args:
        info: The decorator info whose target `fields` are inspected.
        field: The field name to look for.

    Returns:
        `True` if the decorator targets every field (`'*'`) or names `field`
        explicitly, `False` otherwise.
    """
    # A wildcard entry means the decorator applies to all fields.
    return '*' in info.fields or field in info.fields
def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
    """Verify that every field targeted by a decorator actually exists.

    Decorators that use the `'*'` wildcard or declare `check_fields=False`
    are exempt from the check.

    Args:
        decorators: An iterable of decorators.
        fields: An iterable of field names the decorators may target.

    Raises:
        PydanticUserError: If one of the field names does not exist in `fields` param.
    """
    known_fields = set(fields)
    for dec in decorators:
        if '*' in dec.info.fields or dec.info.check_fields is False:
            continue
        for field in dec.info.fields:
            if field not in known_fields:
                raise PydanticUserError(
                    f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
                    " (use check_fields=False if you're inheriting from the model and intended this)",
                    code='decorator-missing-field',
                )
def filter_field_decorator_info_by_field(
    validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
) -> list[Decorator[FieldDecoratorInfoType]]:
    """Return only the decorators that apply to `field` (preserving order)."""
    matching: list[Decorator[FieldDecoratorInfoType]] = []
    for dec in validator_functions:
        if check_validator_fields_against_field_name(dec.info, field):
            matching.append(dec)
    return matching
def apply_each_item_validators(
    schema: core_schema.CoreSchema,
    each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
    field_name: str | None,
) -> core_schema.CoreSchema:
    """Push V1-style `each_item=True` validators down onto a container's item schema.

    This V1 compatibility shim should eventually be removed. Note that this
    won't work for Annotated types that get wrapped by a function validator,
    but that's okay because those didn't exist in V1.
    """
    schema_type = schema['type']
    if schema_type == 'nullable':
        # Recurse through the nullable wrapper to reach the container inside.
        schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators, field_name)
    elif schema_type == 'tuple':
        variadic_item_index = schema.get('variadic_item_index')
        if variadic_item_index is not None:
            # Only the variadic portion of the tuple gets per-item validation.
            schema['items_schema'][variadic_item_index] = apply_validators(
                schema['items_schema'][variadic_item_index], each_item_validators, field_name
            )
    elif is_list_like_schema_with_items_schema(schema):
        items = schema.get('items_schema', None)
        if items is None:
            items = core_schema.any_schema()
        schema['items_schema'] = apply_validators(items, each_item_validators, field_name)
    elif schema_type == 'dict':
        # Push `each_item=True` validators onto dict _values_; arbitrary, but it's the V1 behavior.
        values = schema.get('values_schema', None)
        if values is None:
            values = core_schema.any_schema()
        schema['values_schema'] = apply_validators(values, each_item_validators, field_name)
    elif each_item_validators:
        raise TypeError(
            f"`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema['type']}"
        )
    return schema
def modify_model_json_schema(
    schema_or_field: CoreSchemaOrField, handler: GetJsonSchemaHandler, *, cls: Any
) -> JsonSchemaValue:
    """Add title and description for model-like classes' JSON schema.

    Args:
        schema_or_field: The schema data to generate a JSON schema from.
        handler: The `GetCoreSchemaHandler` instance.
        cls: The model-like class.

    Returns:
        JsonSchemaValue: The updated JSON schema.
    """
    from ..main import BaseModel

    json_schema = handler(schema_or_field)
    original_schema = handler.resolve_ref_schema(json_schema)
    if '$ref' in original_schema:
        # Preserve the fact that definitions schemas should never have sibling keys:
        # wrap the ref in `allOf` so title/description can sit beside it.
        ref = original_schema['$ref']
        original_schema.clear()
        original_schema['allOf'] = [{'$ref': ref}]
    original_schema.setdefault('title', cls.__name__)
    # Don't use BaseModel's own __doc__: it contains the verbose class signature by default.
    docstring = None if cls is BaseModel else cls.__doc__
    if docstring and 'description' not in original_schema:
        original_schema['description'] = inspect.cleandoc(docstring)
    return json_schema
# Legacy `json_encoders` config shape: maps a type to its JSON encoder callable.
JsonEncoders = Dict[Type[Any], JsonEncoder]
def _add_custom_serialization_from_json_encoders(
json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
) -> CoreSchema:
"""Iterate over the json_encoders and add the first matching encoder to the schema.
Args:
json_encoders: A dictionary of types and their encoder functions.
tp: The type to check for a matching encoder.
schema: The schema to add the encoder to.
"""
if not json_encoders:
return schema
if 'serialization' in schema:
return schema
# Check the class type and its superclasses for a matching encoder
# Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
# if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]):
encoder = json_encoders.get(base)
if encoder is None:
continue
warnings.warn(
f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
PydanticDeprecatedSince20,
)
# TODO: in theory we should check that the schema accepts a serialization key
schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json')
return schema
return schema
# A namespace for resolving forward references (`None` when no namespace is available).
TypesNamespace = Union[Dict[str, Any], None]
class TypesNamespaceStack:
    """A stack of types namespaces; the innermost (most recently pushed) wins."""

    def __init__(self, types_namespace: TypesNamespace):
        self._types_namespace_stack: list[TypesNamespace] = [types_namespace]

    @property
    def tail(self) -> TypesNamespace:
        """The namespace currently in effect (top of the stack)."""
        return self._types_namespace_stack[-1]

    @contextmanager
    def push(self, for_type: type[Any]):
        """Temporarily layer `for_type`'s own namespace underneath the current one."""
        # Entries from the current tail override the class's own namespace.
        overlay: dict[str, Any] = dict(_typing_extra.get_cls_types_namespace(for_type))
        overlay.update(self.tail or {})
        self._types_namespace_stack.append(overlay)
        try:
            yield
        finally:
            self._types_namespace_stack.pop()
def _get_first_non_null(a: Any, b: Any) -> Any:
"""Return the first argument if it is not None, otherwise return the second argument.
Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''.
This function will return serialization_alias, which is the first argument, even though it is an empty string.
"""
return a if a is not None else b
class GenerateSchema:
    """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""

    # Slots keep attribute access fast and per-instance memory low; instances are
    # created per schema build (and per child model via `__from_parent`).
    __slots__ = (
        '_config_wrapper_stack',
        '_types_namespace_stack',
        '_typevars_map',
        'field_name_stack',
        'model_type_stack',
        'defs',
    )
def __init__(
    self,
    config_wrapper: ConfigWrapper,
    types_namespace: dict[str, Any] | None,
    typevars_map: dict[Any, Any] | None = None,
) -> None:
    # we need a stack for recursing into child models
    self._config_wrapper_stack = ConfigWrapperStack(config_wrapper)
    self._types_namespace_stack = TypesNamespaceStack(types_namespace)
    # TypeVar -> concrete type substitutions for the current generic parametrization (if any)
    self._typevars_map = typevars_map
    # stack of field names currently being generated — presumably for error context; TODO confirm
    self.field_name_stack = _FieldNameStack()
    # stack of model classes being generated; used to resolve `Self` in `_generate_schema_from_property`
    self.model_type_stack = _ModelTypeStack()
    # accumulates ref -> schema definitions shared across the whole build
    self.defs = _Definitions()
@classmethod
def __from_parent(
    cls,
    config_wrapper_stack: ConfigWrapperStack,
    types_namespace_stack: TypesNamespaceStack,
    model_type_stack: _ModelTypeStack,
    typevars_map: dict[Any, Any] | None,
    defs: _Definitions,
) -> GenerateSchema:
    """Build a child generator sharing the parent's stacks and definitions.

    Bypasses `__init__` via `__new__` so the shared stacks are reused, not
    recreated; note the field name stack starts fresh for the child.
    """
    obj = cls.__new__(cls)
    obj._config_wrapper_stack = config_wrapper_stack
    obj._types_namespace_stack = types_namespace_stack
    obj.model_type_stack = model_type_stack
    obj._typevars_map = typevars_map
    obj.field_name_stack = _FieldNameStack()
    obj.defs = defs
    return obj
@property
def _config_wrapper(self) -> ConfigWrapper:
    # The config currently in effect (innermost pushed config).
    return self._config_wrapper_stack.tail
@property
def _types_namespace(self) -> dict[str, Any] | None:
    # The types namespace currently in effect (innermost pushed namespace).
    return self._types_namespace_stack.tail
@property
def _current_generate_schema(self) -> GenerateSchema:
    """A child generator (honoring any `schema_generator` subclass configured
    in the active config) that shares this instance's stacks and definitions."""
    generator_cls = self._config_wrapper.schema_generator or GenerateSchema
    return generator_cls.__from_parent(
        self._config_wrapper_stack,
        self._types_namespace_stack,
        self.model_type_stack,
        self._typevars_map,
        self.defs,
    )
@property
def _arbitrary_types(self) -> bool:
    # Shorthand for the active config's `arbitrary_types_allowed`.
    return self._config_wrapper.arbitrary_types_allowed
def str_schema(self) -> CoreSchema:
    """Generate a CoreSchema for `str`"""
    # Public override point for subclasses that want to customize string handling.
    return core_schema.str_schema()
# the following methods can be overridden but should be considered
# unstable / private APIs
def _list_schema(self, tp: Any, items_type: Any) -> CoreSchema:
    # `tp` (the concrete list-like origin) is accepted for override hooks but unused here.
    return core_schema.list_schema(self.generate_schema(items_type))
def _dict_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
    # Dict schema from separately generated key and value schemas.
    return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type))
def _set_schema(self, tp: Any, items_type: Any) -> CoreSchema:
    # Set schema with per-item validation from `items_type`.
    return core_schema.set_schema(self.generate_schema(items_type))
def _frozenset_schema(self, tp: Any, items_type: Any) -> CoreSchema:
    # Frozenset schema with per-item validation from `items_type`.
    return core_schema.frozenset_schema(self.generate_schema(items_type))
def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
    """Schema used under `arbitrary_types_allowed`: an isinstance check against `tp`,
    or — when `tp` is not actually a class — accept anything, with a warning."""
    if isinstance(tp, type):
        return core_schema.is_instance_schema(tp)
    # Not a class at all (e.g. an instance), so an isinstance check is impossible.
    warn(
        f'{tp!r} is not a Python type (it may be an instance of an object),'
        ' Pydantic will allow any object with no validation since we cannot even'
        ' enforce that the input is an instance of the given type.'
        ' To get rid of this error wrap the type with `pydantic.SkipValidation`.',
        UserWarning,
    )
    return core_schema.any_schema()
def _unknown_type_schema(self, obj: Any) -> CoreSchema:
    # Terminal fallback of `match_type`: nothing knew how to handle `obj`.
    raise PydanticSchemaGenerationError(
        f'Unable to generate pydantic-core schema for {obj!r}. '
        'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
        ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
        '\n\nIf you got this error by calling handler(<some type>) within'
        ' `__get_pydantic_core_schema__` then you likely need to call'
        ' `handler.generate_schema(<some type>)` since we do not call'
        ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
    )
def _apply_discriminator_to_union(
self, schema: CoreSchema, discriminator: str | Discriminator | None
) -> CoreSchema:
if discriminator is None:
return schema
try:
return _discriminated_union.apply_discriminator(
schema,
discriminator,
)
except _discriminated_union.MissingDefinitionForUnionRef:
# defer until defs are resolved
_discriminated_union.set_discriminator_in_metadata(
schema,
discriminator,
)
return schema
# Internal signal raised by `clean_schema` when invalid sub-schemas remain after cleaning.
class CollectedInvalid(Exception):
    pass
def clean_schema(self, schema: CoreSchema) -> CoreSchema:
    """Finalize a schema: gather definitions, simplify references, apply any
    deferred discriminators, then validate the result.

    Raises:
        CollectedInvalid: If invalid sub-schemas remain after cleaning.
    """
    for step in (
        self.collect_definitions,
        simplify_schema_references,
        _discriminated_union.apply_discriminators,
    ):
        schema = step(schema)
    if collect_invalid_schemas(schema):
        raise self.CollectedInvalid()
    return validate_core_schema(schema)
def collect_definitions(self, schema: CoreSchema) -> CoreSchema:
    """Register `schema` under its ref (if any) and wrap everything collected
    so far in a single `definitions_schema`."""
    ref = cast('str | None', schema.get('ref', None))
    if ref:
        self.defs.definitions[ref] = schema
    if 'ref' in schema:
        # NOTE(review): deliberately a separate check from the truthiness test above —
        # presumably refs are always non-empty strings, in which case the two agree; confirm.
        schema = core_schema.definition_reference_schema(schema['ref'])
    return core_schema.definitions_schema(
        schema,
        list(self.defs.definitions.values()),
    )
def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None:
    """Append `js_function` to the schema metadata's `pydantic_js_functions`, deduplicating.

    Because of how we generate core schemas for nested generic models we can end up
    adding `BaseModel.__get_pydantic_json_schema__` multiple times; the membership
    test below catches that. It may miss duplicates wrapped in e.g. `functools.partial`,
    in which case the duplicate simply gets inserted.
    """
    metadata = CoreMetadataHandler(metadata_schema).metadata
    functions = metadata.setdefault('pydantic_js_functions', [])
    if js_function not in functions:
        functions.append(js_function)
def generate_schema(
    self,
    obj: Any,
    from_dunder_get_core_schema: bool = True,
) -> core_schema.CoreSchema:
    """Generate core schema.

    Args:
        obj: The object to generate core schema for.
        from_dunder_get_core_schema: Whether to generate schema from either the
            `__get_pydantic_core_schema__` function or `__pydantic_core_schema__` property.

    Returns:
        The generated core schema.

    Raises:
        PydanticUndefinedAnnotation:
            If it is not possible to evaluate forward reference.
        PydanticSchemaGenerationError:
            If it is not possible to generate pydantic-core schema.
        TypeError:
            - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
            - If V1 style validator with `each_item=True` applied on a wrong field.
        PydanticUserError:
            - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
            - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
    """
    schema: CoreSchema | None = None
    if from_dunder_get_core_schema:
        # Custom types get first crack at producing their own schema.
        schema = self._generate_schema_from_property(obj, obj)
    if schema is None:
        schema = self._generate_schema_inner(obj)

    # Attach any `__get_pydantic_json_schema__` hook to the underlying (non-ref) schema.
    metadata_js_function = _extract_get_pydantic_json_schema(obj, schema)
    if metadata_js_function is not None:
        metadata_schema = resolve_original_schema(schema, self.defs.definitions)
        if metadata_schema:
            self._add_js_function(metadata_schema, metadata_js_function)

    # Honor any legacy `json_encoders` config entry matching this type.
    return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, schema)
def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
    """Generate schema for a Pydantic model."""
    with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
        if maybe_schema is not None:
            # Already built, or mid-build recursively: reuse the existing schema/ref.
            return maybe_schema

        fields = cls.model_fields
        decorators = cls.__pydantic_decorators__
        computed_fields = decorators.computed_fields
        # Fail early if a validator/serializer targets a field that doesn't exist.
        check_decorator_fields_exist(
            chain(
                decorators.field_validators.values(),
                decorators.field_serializers.values(),
                decorators.validators.values(),
            ),
            {*fields.keys(), *computed_fields.keys()},
        )
        config_wrapper = ConfigWrapper(cls.model_config, check=False)
        core_config = config_wrapper.core_config(cls)
        metadata = build_metadata_dict(js_functions=[partial(modify_model_json_schema, cls=cls)])

        model_validators = decorators.model_validators.values()

        # With `extra='allow'`, a `__pydantic_extra__: Dict[str, X]` annotation (searched
        # up the MRO) constrains the type of extra values.
        extras_schema = None
        if core_config.get('extra_fields_behavior') == 'allow':
            assert cls.__mro__[0] is cls
            assert cls.__mro__[-1] is object
            for candidate_cls in cls.__mro__[:-1]:
                extras_annotation = candidate_cls.__annotations__.get('__pydantic_extra__', None)
                if extras_annotation is not None:
                    if isinstance(extras_annotation, str):
                        # Stringized annotation (e.g. `from __future__ import annotations`): evaluate it.
                        extras_annotation = _typing_extra.eval_type_backport(
                            _typing_extra._make_forward_ref(extras_annotation, is_argument=False, is_class=True),
                            self._types_namespace,
                        )
                    tp = get_origin(extras_annotation)
                    if tp not in (Dict, dict):
                        raise PydanticSchemaGenerationError(
                            'The type annotation for `__pydantic_extra__` must be `Dict[str, ...]`'
                        )
                    # The dict's *value* type drives validation of extra fields.
                    extra_items_type = self._get_args_resolving_forward_refs(
                        extras_annotation,
                        required=True,
                    )[1]
                    if extra_items_type is not Any:
                        extras_schema = self.generate_schema(extra_items_type)
                        break

        with self._config_wrapper_stack.push(config_wrapper), self._types_namespace_stack.push(cls):
            # Swap to a child generator so a configured `schema_generator` subclass takes effect.
            self = self._current_generate_schema
            if cls.__pydantic_root_model__:
                # RootModel: the whole model validates as its single `root` field.
                root_field = self._common_field_schema('root', fields['root'], decorators)
                inner_schema = root_field['schema']
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
                model_schema = core_schema.model_schema(
                    cls,
                    inner_schema,
                    custom_init=getattr(cls, '__pydantic_custom_init__', None),
                    root_model=True,
                    post_init=getattr(cls, '__pydantic_post_init__', None),
                    config=core_config,
                    ref=model_ref,
                    metadata=metadata,
                )
            else:
                fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
                    {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in computed_fields.values()
                    ],
                    extras_schema=extras_schema,
                    model_name=cls.__name__,
                )
                inner_schema = apply_validators(fields_schema, decorators.root_validators.values(), None)
                # Insert placeholder defs for recursive refs so validation of the refs succeeds.
                new_inner_schema = define_expected_missing_refs(inner_schema, recursively_defined_type_refs())
                if new_inner_schema is not None:
                    inner_schema = new_inner_schema
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                model_schema = core_schema.model_schema(
                    cls,
                    inner_schema,
                    custom_init=getattr(cls, '__pydantic_custom_init__', None),
                    root_model=False,
                    post_init=getattr(cls, '__pydantic_post_init__', None),
                    config=core_config,
                    ref=model_ref,
                    metadata=metadata,
                )

            schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
            schema = apply_model_validators(schema, model_validators, 'outer')
            self.defs.definitions[model_ref] = schema
            return core_schema.definition_reference_schema(model_ref)
def _unpack_refs_defs(self, schema: CoreSchema) -> CoreSchema:
    """Unpack all 'definitions' schemas into `GenerateSchema.defs.definitions`
    and return the inner schema.
    """
    if schema['type'] == 'definitions':
        # Hoist each definition into the shared registry, keyed by its ref.
        for definition in schema['definitions']:
            self.defs.definitions[definition['ref']] = definition  # type: ignore
        return schema['schema']
    return schema
def _generate_schema_from_property(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
    """Try to generate schema from either the `__get_pydantic_core_schema__` function or
    `__pydantic_core_schema__` property.

    Note: `__get_pydantic_core_schema__` takes priority so it can
    decide whether to use a `__pydantic_core_schema__` attribute, or generate a fresh schema.
    """
    # avoid calling `__get_pydantic_core_schema__` if we've already visited this object
    if is_self_type(obj):
        # `Self` resolves to the model currently on the stack.
        obj = self.model_type_stack.get()
    with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
        if maybe_schema is not None:
            return maybe_schema
    if obj is source:
        ref_mode = 'unpack'
    else:
        ref_mode = 'to-def'

    schema: CoreSchema

    get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
    if get_schema is None:
        # V1 fallback: `__get_validators__` yields plain validator callables.
        validators = getattr(obj, '__get_validators__', None)
        if validators is None:
            return None
        warn(
            '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
            PydanticDeprecatedSince20,
        )
        schema = core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
    else:
        if len(inspect.signature(get_schema).parameters) == 1:
            # (source) -> CoreSchema
            schema = get_schema(source)
        else:
            schema = get_schema(
                source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
            )

    schema = self._unpack_refs_defs(schema)

    if is_function_with_inner_schema(schema):
        # Hoist the inner schema's ref onto the wrapping function schema so the
        # definition registered below refers to the whole wrapped schema.
        ref = schema['schema'].pop('ref', None)  # pyright: ignore[reportGeneralTypeIssues]
        if ref:
            schema['ref'] = ref
    else:
        ref = get_ref(schema)

    if ref:
        self.defs.definitions[ref] = schema
        return core_schema.definition_reference_schema(ref)

    return schema
def _resolve_forward_ref(self, obj: Any) -> Any:
    """Evaluate a forward reference against the current types namespace.

    Raises:
        PydanticUndefinedAnnotation: If the reference cannot be evaluated.
    """
    # we assume that types_namespace has the target of forward references in its scope,
    # but this could fail, for example, if calling Validator on an imported type which contains
    # forward references to other types only defined in the module from which it was imported
    # `Validator(SomeImportedTypeAliasWithAForwardReference)`
    # or the equivalent for BaseModel
    # class Model(BaseModel):
    #   x: SomeImportedTypeAliasWithAForwardReference
    try:
        obj = _typing_extra.eval_type_backport(obj, globalns=self._types_namespace)
    except NameError as e:
        raise PydanticUndefinedAnnotation.from_name_error(e) from e

    # if obj is still a ForwardRef, it means we can't evaluate it, raise PydanticUndefinedAnnotation
    if isinstance(obj, ForwardRef):
        raise PydanticUndefinedAnnotation(obj.__forward_arg__, f'Unable to evaluate forward reference {obj}')

    if self._typevars_map:
        # Substitute any TypeVars with their concrete types for the current parametrization.
        obj = replace_types(obj, self._typevars_map)

    return obj
@overload
def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]:
    ...

@overload
def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None:
    ...

def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None:
    """Like `get_args`, but with any `ForwardRef` arguments resolved.

    Raises:
        TypeError: If `required` is true and `obj` has no generic parameters.
    """
    args = get_args(obj)
    if not args:
        if required:  # pragma: no cover
            raise TypeError(f'Expected {obj} to have generic parameters but it had none')
        return args
    return tuple(self._resolve_forward_ref(arg) if isinstance(arg, ForwardRef) else arg for arg in args)
def _get_first_arg_or_any(self, obj: Any) -> Any:
    """Return the first generic argument of `obj`, or `Any` if it has none."""
    args = self._get_args_resolving_forward_refs(obj)
    return args[0] if args else Any
def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]:
    """Return the first two generic arguments of `obj`, defaulting to `(Any, Any)`.

    Raises:
        TypeError: If `obj` is parametrized with exactly one argument.
    """
    args = self._get_args_resolving_forward_refs(obj)
    if not args:
        return (Any, Any)
    if len(args) < 2:
        origin = get_origin(obj)
        raise TypeError(f'Expected two type arguments for {origin}, got 1')
    return args[0], args[1]
def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
    # Dispatch that runs when `__get_pydantic_core_schema__` didn't produce a schema.
    # Order matters: Annotated is unwrapped first, raw dicts pass through,
    # and strings become ForwardRefs before resolution.
    if isinstance(obj, _AnnotatedType):
        return self._annotated_schema(obj)

    if isinstance(obj, dict):
        # we assume this is already a valid schema
        return obj  # type: ignore[return-value]

    if isinstance(obj, str):
        obj = ForwardRef(obj)

    if isinstance(obj, ForwardRef):
        return self.generate_schema(self._resolve_forward_ref(obj))

    from ..main import BaseModel

    if lenient_issubclass(obj, BaseModel):
        # Track the model on the stack so nested `Self` annotations resolve to it.
        with self.model_type_stack.push(obj):
            return self._model_schema(obj)

    if isinstance(obj, PydanticRecursiveRef):
        return core_schema.definition_reference_schema(schema_ref=obj.type_ref)

    return self.match_type(obj)
def match_type(self, obj: Any) -> core_schema.CoreSchema:  # noqa: C901
    """Main mapping of types to schemas.

    The general structure is a series of if statements starting with the simple cases
    (non-generic primitive types) and then handling generics and other more complex cases.

    Each case either generates a schema directly, calls into a public user-overridable method
    (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
    boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).

    The idea is that we'll evolve this into adding more and more user facing methods over time
    as they get requested and we figure out what the right API for them is.
    """
    # Primitive types first (identity checks — fastest and most common).
    if obj is str:
        return self.str_schema()
    elif obj is bytes:
        return core_schema.bytes_schema()
    elif obj is int:
        return core_schema.int_schema()
    elif obj is float:
        return core_schema.float_schema()
    elif obj is bool:
        return core_schema.bool_schema()
    elif obj is Any or obj is object:
        return core_schema.any_schema()
    elif obj is None or obj is _typing_extra.NoneType:
        return core_schema.none_schema()
    # Bare (unparametrized) container types; parametrized ones go through `_match_generic_type`.
    elif obj in TUPLE_TYPES:
        return self._tuple_schema(obj)
    elif obj in LIST_TYPES:
        return self._list_schema(obj, self._get_first_arg_or_any(obj))
    elif obj in SET_TYPES:
        return self._set_schema(obj, self._get_first_arg_or_any(obj))
    elif obj in FROZEN_SET_TYPES:
        return self._frozenset_schema(obj, self._get_first_arg_or_any(obj))
    elif obj in DICT_TYPES:
        return self._dict_schema(obj, *self._get_first_two_args_or_any(obj))
    elif isinstance(obj, TypeAliasType):
        return self._type_alias_type_schema(obj)
    elif obj == type:
        return self._type_schema()
    elif _typing_extra.is_callable_type(obj):
        return core_schema.callable_schema()
    elif _typing_extra.is_literal_type(obj):
        return self._literal_schema(obj)
    elif is_typeddict(obj):
        return self._typed_dict_schema(obj, None)
    elif _typing_extra.is_namedtuple(obj):
        return self._namedtuple_schema(obj, None)
    elif _typing_extra.is_new_type(obj):
        # NewType, can't use isinstance because it fails <3.10
        return self.generate_schema(obj.__supertype__)
    elif obj == re.Pattern:
        return self._pattern_schema(obj)
    elif obj is collections.abc.Hashable or obj is typing.Hashable:
        return self._hashable_schema()
    elif isinstance(obj, typing.TypeVar):
        return self._unsubstituted_typevar_schema(obj)
    elif is_finalvar(obj):
        if obj is Final:
            return core_schema.any_schema()
        # `Final[X]` validates as `X`.
        return self.generate_schema(
            self._get_first_arg_or_any(obj),
        )
    elif isinstance(obj, (FunctionType, LambdaType, MethodType, partial)):
        return self._callable_schema(obj)
    elif inspect.isclass(obj) and issubclass(obj, Enum):
        # Imported lazily to avoid a circular import at module load time.
        from ._std_types_schema import get_enum_core_schema

        return get_enum_core_schema(obj, self._config_wrapper.config_dict)

    if _typing_extra.is_dataclass(obj):
        return self._dataclass_schema(obj, None)

    # Known stdlib types (datetime, Decimal, ...) supply prepared annotations.
    res = self._get_prepare_pydantic_annotations_for_known_type(obj, ())
    if res is not None:
        source_type, annotations = res
        return self._apply_annotations(source_type, annotations)

    origin = get_origin(obj)
    if origin is not None:
        return self._match_generic_type(obj, origin)

    # Last resorts: arbitrary-type passthrough (if configured) or a hard error.
    if self._arbitrary_types:
        return self._arbitrary_type_schema(obj)
    return self._unknown_type_schema(obj)
def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema:  # noqa: C901
    """Generate a schema for a parametrized generic type, dispatching on its ``origin``.

    ``obj`` is the full (parametrized) annotation, e.g. ``list[int]``; ``origin`` is its
    unsubscripted origin as returned by ``get_origin``, e.g. ``list``.

    NOTE(review): branch order is significant — earlier checks intentionally shadow the
    more general ones below (e.g. dataclasses before the property-hook lookup).
    """
    # A parametrized type alias (PEP 695 / typing_extensions.TypeAliasType) is resolved
    # through its own dedicated path.
    if isinstance(origin, TypeAliasType):
        return self._type_alias_type_schema(obj)

    # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
    # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
    # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
    # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
    if _typing_extra.is_dataclass(origin):
        return self._dataclass_schema(obj, origin)
    if _typing_extra.is_namedtuple(origin):
        return self._namedtuple_schema(obj, origin)

    # A custom schema hook on the origin type (e.g. `__get_pydantic_core_schema__`)
    # takes precedence over the built-in container handling below.
    from_property = self._generate_schema_from_property(origin, obj)
    if from_property is not None:
        return from_property

    if _typing_extra.origin_is_union(origin):
        return self._union_schema(obj)
    elif origin in TUPLE_TYPES:
        return self._tuple_schema(obj)
    elif origin in LIST_TYPES:
        return self._list_schema(obj, self._get_first_arg_or_any(obj))
    elif origin in SET_TYPES:
        return self._set_schema(obj, self._get_first_arg_or_any(obj))
    elif origin in FROZEN_SET_TYPES:
        return self._frozenset_schema(obj, self._get_first_arg_or_any(obj))
    elif origin in DICT_TYPES:
        # dict-like types need both the key and value argument (either may default to Any)
        return self._dict_schema(obj, *self._get_first_two_args_or_any(obj))
    elif is_typeddict(origin):
        return self._typed_dict_schema(obj, origin)
    elif origin in (typing.Type, type):
        return self._subclass_schema(obj)
    elif origin in {typing.Sequence, collections.abc.Sequence}:
        return self._sequence_schema(obj)
    elif origin in {typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator}:
        return self._iterable_schema(obj)
    elif origin in (re.Pattern, typing.Pattern):
        return self._pattern_schema(obj)

    # Nothing matched: either allow any arbitrary type (per config) or emit the
    # unknown-type error path. Note the arbitrary-type fallback uses `origin`, not `obj`.
    if self._arbitrary_types:
        return self._arbitrary_type_schema(origin)
    return self._unknown_type_schema(obj)
def _generate_td_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
    *,
    required: bool = True,
) -> core_schema.TypedDictField:
    """Build the ``TypedDictField`` core schema for one model/typeddict field."""
    common = self._common_field_schema(name, field_info, decorators)
    # A field that is not required per its FieldInfo (e.g. it has a default) is never
    # reported as required, regardless of the caller-supplied flag.
    effective_required = required if field_info.is_required() else False
    return core_schema.typed_dict_field(
        common['schema'],
        required=effective_required,
        serialization_exclude=common['serialization_exclude'],
        validation_alias=common['validation_alias'],
        serialization_alias=common['serialization_alias'],
        metadata=common['metadata'],
    )
def _generate_md_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
) -> core_schema.ModelField:
    """Build the ``ModelField`` core schema for one model field."""
    common = self._common_field_schema(name, field_info, decorators)
    # Everything except the schema itself is forwarded verbatim from the common field data.
    passthrough = ('serialization_exclude', 'validation_alias', 'serialization_alias', 'frozen', 'metadata')
    return core_schema.model_field(
        common['schema'],
        **{key: common[key] for key in passthrough},
    )
def _generate_dc_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
) -> core_schema.DataclassField:
    """Build the ``DataclassField`` core schema for one dataclass parameter/field."""
    common = self._common_field_schema(name, field_info, decorators)
    # `init_only` is only meaningful when truthy; falsy init_var collapses to None.
    init_only = field_info.init_var or None
    # Inverted on purpose: an explicit `kw_only=False` marker is emitted, while a truthy
    # kw_only flag is conveyed as None (the core-schema default).
    kw_only = None if field_info.kw_only else False
    return core_schema.dataclass_field(
        name,
        common['schema'],
        init=field_info.init,
        init_only=init_only,
        kw_only=kw_only,
        serialization_exclude=common['serialization_exclude'],
        validation_alias=common['validation_alias'],
        serialization_alias=common['serialization_alias'],
        frozen=common['frozen'],
        metadata=common['metadata'],
    )
@staticmethod
def _apply_alias_generator_to_field_info(
    alias_generator: Callable[[str], str] | AliasGenerator, field_info: FieldInfo, field_name: str
) -> None:
    """Apply an alias_generator to aliases on a FieldInfo instance if appropriate.

    Args:
        alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
        field_info: The FieldInfo instance to which the alias_generator is (maybe) applied.
        field_name: The name of the field from which to generate the alias.
    """
    # The generator runs when no alias was set, when one was set with an overridable
    # priority (<= 1), or when any of the three alias slots is still unset.
    should_generate = (
        field_info.alias_priority is None
        or field_info.alias_priority <= 1
        or field_info.alias is None
        or field_info.validation_alias is None
        or field_info.serialization_alias is None
    )
    if not should_generate:
        return

    alias = validation_alias = serialization_alias = None
    if isinstance(alias_generator, AliasGenerator):
        alias, validation_alias, serialization_alias = alias_generator.generate_aliases(field_name)
    elif isinstance(alias_generator, Callable):
        alias = alias_generator(field_name)
        if not isinstance(alias, str):
            raise TypeError(f'alias_generator {alias_generator} must return str, not {alias.__class__}')

    # An unset (or overridable) priority is pinned to 1, which supports the case where
    # the alias_generator from a child class generates an alias for a parent-class field.
    if field_info.alias_priority is None or field_info.alias_priority <= 1:
        field_info.alias_priority = 1

    if field_info.alias_priority == 1:
        # Priority 1: the generated aliases win outright.
        field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
        field_info.validation_alias = _get_first_non_null(validation_alias, alias)
        field_info.alias = alias

    # Otherwise only fill in whichever alias slots are still missing.
    if field_info.alias is None:
        field_info.alias = alias
    if field_info.serialization_alias is None:
        field_info.serialization_alias = _get_first_non_null(serialization_alias, alias)
    if field_info.validation_alias is None:
        field_info.validation_alias = _get_first_non_null(validation_alias, alias)
@staticmethod
def _apply_alias_generator_to_computed_field_info(
alias_generator: Callable[[str], str] | AliasGenerator,
computed_field_info: ComputedFieldInfo,
computed_field_name: str,
):
"""Apply an alias_generator to alias on a ComputedFieldInfo instance if appropriate.
Args:
alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
computed_field_info: The ComputedFieldInfo instance to which the alias_generator is (maybe) applied.
computed_field_name: The name of the computed field from which to generate the alias.
"""