Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add discriminated union support (v2) #5051

Merged
merged 18 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 140 additions & 1 deletion pydantic/_internal/_generate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import re
import typing
import warnings
from enum import Enum
from typing import TYPE_CHECKING, Any

from annotated_types import BaseMetadata, GroupedMetadata
from pydantic_core import SchemaError, SchemaValidator, core_schema
from typing_extensions import Annotated, Literal, get_args, get_origin, is_typeddict

from ..errors import PydanticSchemaGenerationError
from ..errors import PydanticSchemaGenerationError, PydanticUserError
from ..fields import FieldInfo
from ..json_schema import JsonSchemaMetadata, JsonSchemaValue
from . import _fields, _typing_extra
Expand Down Expand Up @@ -245,6 +246,8 @@ def generate_field_schema(
"""
assert field_info.annotation is not None, 'field_info.annotation should not be None when generating a schema'
schema = self.generate_schema(field_info.annotation)
if field_info.discriminator is not None:
schema = apply_discriminator(schema, field_info.discriminator)
schema = apply_annotations(schema, field_info.metadata)

if not field_info.is_required():
Expand Down Expand Up @@ -682,6 +685,8 @@ def apply_single_annotation(schema: core_schema.CoreSchema, metadata: Any) -> co
return apply_annotations(schema, metadata)
elif isinstance(metadata, FieldInfo):
schema = apply_annotations(schema, metadata.metadata)
if metadata.discriminator is not None:
schema = apply_discriminator(schema, metadata.discriminator)
# TODO setting a default here needs to be tested
return wrap_default(metadata, schema)

Expand Down Expand Up @@ -766,3 +771,137 @@ def get_model_self_schema(cls: type[BaseModel]) -> core_schema.ModelSchema:
core_schema.definition_reference_schema(model_ref),
metadata=build_metadata_dict(js_metadata=model_js_metadata),
)


def apply_discriminator(schema: core_schema.CoreSchema, discriminator: str) -> core_schema.CoreSchema:
    """
    Convert a (possibly nullable) union schema into a tagged-union schema keyed on `discriminator`.

    Args:
        schema: The schema generated for the annotated field; it must be a `union` schema,
            optionally wrapped in a `nullable` schema.
        discriminator: Name of the field whose value selects the union variant.

    Returns:
        A tagged-union schema carrying over the union's custom error settings, `ref`, `metadata`
        and `serialization`. If the input was wrapped in a nullable schema, the result is
        re-wrapped in a nullable schema with the original wrapper's settings.

    Raises:
        TypeError: If the (unwrapped) schema is not a union, or the union has fewer than two choices.
    """
    # Eventually: should add support for other discriminator types, and explicitly specified choices

    # Remember an outer nullable wrapper so that it can be re-applied around the final result.
    outer_nullable: core_schema.NullableSchema | None = schema if schema['type'] == 'nullable' else None
    if outer_nullable is not None:
        schema = outer_nullable['schema']

    if schema['type'] != 'union':
        raise TypeError('`discriminator` can only be used with `Union` type with more than one variant')
    if len(schema['choices']) < 2:
        raise TypeError('`discriminator` can only be used with `Union` type with more than one variant')

    # `aliases` is an insertion-ordered set (a dict with `None` values); the helper below adds to it.
    aliases = {discriminator: None}
    choices_by_tag = _build_tagged_union_choices(schema['choices'], discriminator, aliases)

    # With several distinct aliases, the discriminator is expressed as a list of single-key lookup paths.
    schema_discriminator: str | list[list[str | int]] = (
        discriminator if len(aliases) <= 1 else [[alias] for alias in aliases]
    )

    tagged_schema = core_schema.tagged_union_schema(
        choices=choices_by_tag,
        discriminator=schema_discriminator,
        custom_error_type=schema.get('custom_error_type'),
        custom_error_message=schema.get('custom_error_message'),
        custom_error_context=schema.get('custom_error_context'),
        strict=False,
        ref=schema.get('ref'),
        metadata=schema.get('metadata'),
        serialization=schema.get('serialization'),
    )

    if outer_nullable is None:
        return tagged_schema

    return core_schema.nullable_schema(
        tagged_schema,
        strict=outer_nullable.get('strict'),
        ref=outer_nullable.get('ref'),
        metadata=outer_nullable.get('metadata'),
        serialization=outer_nullable.get('serialization'),
    )


def _build_tagged_union_choices(
choices: typing.Iterable[core_schema.CoreSchema], discriminator: str, aliases: dict[str, None]
) -> dict[str | int, str | int | core_schema.CoreSchema]:
choices = list(choices)[::-1]
dmontagu marked this conversation as resolved.
Show resolved Hide resolved

tagged_union_choices: dict[str | int, str | int | core_schema.CoreSchema] = {}
while choices:
choice = choices.pop()
if choice['type'] == 'union':
choices.extend(choice['choices'])
continue
adriangb marked this conversation as resolved.
Show resolved Hide resolved

discriminator_values = _get_discriminator_values_for_choice(choice, discriminator, aliases)
if discriminator_values:

def _handle_discriminator_value(value: Any, choice_override: int | str | None = None) -> None:
dmontagu marked this conversation as resolved.
Show resolved Hide resolved
# This function accepts choice_override so that we can produce a schema that doesn't copy choices
if not isinstance(value, (int, str, Enum)):
raise ValueError(f'Invalid discriminator value {value!r}; must be a string, int, or Enum')
if isinstance(value, Enum):
value = value.value
if value in tagged_union_choices:
# Need to walk the choices dict until we get to a "real" choice
existing_choice = tagged_union_choices[value]
while isinstance(existing_choice, (str, int)):
existing_choice = tagged_union_choices[existing_choice]
if existing_choice != choice:
raise ValueError(
f'Value {value!r} for discriminator {discriminator!r} mapped to multiple choices'
)
else:
tagged_union_choices[value] = choice if choice_override is None else choice_override

primary_value = discriminator_values[0]
_handle_discriminator_value(primary_value)
dmontagu marked this conversation as resolved.
Show resolved Hide resolved
for other_value in discriminator_values[1:]:
_handle_discriminator_value(other_value, primary_value)
dmontagu marked this conversation as resolved.
Show resolved Hide resolved
return tagged_union_choices


def _get_discriminator_values_for_choice(
choice: core_schema.CoreSchema, discriminator: str, aliases: dict[str, None]
) -> list[Any]:
if choice['type'] == 'tagged-union':
values: list[Any] = []
for inner_choice in choice['choices'].values():
if isinstance(inner_choice, (str, int)):
continue
values.extend(_get_discriminator_values_for_choice(inner_choice, discriminator, aliases))
return values
dmontagu marked this conversation as resolved.
Show resolved Hide resolved

elif choice['type'] == 'model':
model_name = choice['cls'].__name__
# Unpack ModelSchema into the inner TypedDictSchema
inner_schema = choice['schema']
if inner_schema['type'] == 'definitions':
inner_schema = inner_schema['schema'] # unpack a definitions schema
if inner_schema['type'] == 'typed-dict':
typed_dict_schema = inner_schema
if discriminator not in typed_dict_schema['fields']:
raise PydanticUserError(f'Model {model_name!r} needs a discriminator field for key {discriminator!r}')
discriminator_field = typed_dict_schema['fields'][discriminator]

alias = discriminator_field.get('validation_alias', discriminator)
aliases[alias] = None

discriminator_schema = discriminator_field['schema']
if discriminator_schema['type'] == 'default':
# Ignore a wrapping default schema if present
discriminator_schema = discriminator_schema['schema']
if discriminator_schema['type'] != 'literal':
raise PydanticUserError(f'Field {discriminator!r} of model {model_name!r} needs to be a `Literal`')
return discriminator_schema['expected']
else:
raise TypeError(
f"Expected a CoreSchema with type='typed-dict' for model {model_name!r}, "
f"got type={inner_schema['type']!r}"
)

else:
raise TypeError(
f"{choice['type']!r} is not a valid discriminated union variant; " "should be a `BaseModel` or `dataclass`"
)
28 changes: 26 additions & 2 deletions pydantic/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
from dataclasses import is_dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Counter, Dict, NewType, Sequence, cast
from typing import TYPE_CHECKING, Any, Callable, Counter, Dict, Iterable, List, NewType, Sequence, Tuple, Union, cast

from pydantic_core import CoreSchema, CoreSchemaType, core_schema
from pydantic_core.core_schema import TypedDictField
Expand All @@ -18,6 +18,8 @@
from .main import BaseModel

JsonSchemaValue = Dict[str, Any]
Json = Union[Dict[str, Any], List[Any], str, int, float, bool, None]
HashableJson = Union[Tuple[Tuple[str, Any], ...], Tuple[Any, ...], str, int, float, bool, None]
dmontagu marked this conversation as resolved.
Show resolved Hide resolved


# ##### JSON Schema Metadata Manipulation #####
Expand Down Expand Up @@ -497,7 +499,16 @@ def tagged_union_schema(self, schema: core_schema.TaggedUnionSchema) -> JsonSche
generated[str(k)] = self.generate_inner(v).copy()
except PydanticInvalidForJsonSchema:
pass
json_schema: JsonSchemaValue = {'oneOf': list(generated.values())}

# Populate the schema with any "indirect" references
for k, v in schema['choices'].items():
if isinstance(v, (str, int)):
while isinstance(schema['choices'][v], (str, int)):
v = schema['choices'][v]
if str(v) in generated: # PydanticInvalidForJsonSchema may have been raised above
generated[str(k)] = generated[str(v)]

json_schema: JsonSchemaValue = {'oneOf': _deduplicate_schemas(generated.values())}

# This reflects the v1 behavior, but we may want to only include the discriminator based on dialect / etc.
if 'discriminator' in schema and isinstance(schema['discriminator'], str):
Expand Down Expand Up @@ -1062,3 +1073,16 @@ def model_schema(
) -> dict[str, Any]:
model = _utils.get_model(model)
return model.model_json_schema(by_alias=by_alias, ref_template=ref_template, schema_generator=schema_generator)


def _deduplicate_schemas(schemas: Iterable[Json]) -> list[Json]:
return list({_make_json_hashable(schema): schema for schema in schemas}.values())


def _make_json_hashable(value: Json) -> HashableJson:
if isinstance(value, dict):
return tuple(sorted((k, _make_json_hashable(v)) for k, v in value.items()))
elif isinstance(value, list):
return tuple(_make_json_hashable(v) for v in value)
else:
return value
Loading