Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 157 additions & 1 deletion openapi_core/validation/schemas/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
from openapi_core.validation.schemas.exceptions import InvalidSchemaValue
from openapi_core.validation.schemas.exceptions import ValidateError

# OpenAPI ``format`` values whose *type: string* schemas are permitted to
# carry a raw ``bytes`` payload end-to-end -- ``binary`` for opaque file
# bodies (multipart/form-data, application/octet-stream) and ``byte`` for
# base64 strings that callers may still hand in as ``bytes``.
_BINARY_STRING_FORMATS = frozenset({"binary", "byte"})

if TYPE_CHECKING:
from openapi_core.casting.schemas.casters import SchemaCaster

Expand All @@ -41,12 +47,156 @@ def __contains__(self, schema_format: str) -> bool:
return schema_format in self.validator.format_checker.checkers

def validate(self, value: Any) -> None:
    """Validate ``value`` against this validator's schema.

    ``bytes`` found at positions whose schema is a binary/byte string
    are presented to jsonschema as decoded text (see
    ``_normalize_for_validation``); the caller's ``value`` itself is
    left untouched and is what gets reported on failure.

    Raises:
        InvalidSchemaValue: when jsonschema yields at least one error.
            It is built from the original ``value``, the schema's
            ``type`` (read with ``"any"`` as the fallback argument) and
            the collected errors.
    """
    # jsonschema only validates ``string`` against text, so it must be
    # run on the normalized view -- never on the raw value. Iterating
    # the raw value as well would be redundant work that is thrown away.
    normalized = self._normalize_for_validation(value)
    errors = tuple(self.validator.iter_errors(normalized))
    if errors:
        schema_type = (self.schema / "type").read_str_or_list("any")
        raise InvalidSchemaValue(value, schema_type, schema_errors=errors)

@staticmethod
def _decode_binary_value(value: bytes) -> str:
"""Decode raw ``bytes`` into the text view jsonschema expects.

``utf-8`` first because that's what the vast majority of byte
bodies actually are; falling back to ASCII + ``surrogateescape``
guarantees the call never raises for arbitrary binary payloads
(a real file upload may contain any byte sequence).
"""
try:
return value.decode("utf-8")
except UnicodeDecodeError:
return value.decode("ASCII", errors="surrogateescape")

def _accepts_binary_string(self, value: Any) -> bool:
"""True when ``value`` is ``bytes`` and the schema at this
position is a ``string`` whose ``format`` allows raw bytes.
"""
if not isinstance(value, bytes):
return False
schema_format = (self.schema / "format").read_str(None)
if schema_format not in _BINARY_STRING_FORMATS:
return False
schema_types = (self.schema / "type").read_str_or_list(None)
if schema_types is None:
# No declared type: OAS 3.1 lets any value flow; treat the
# binary/byte format as authoritative.
return True
if isinstance(schema_types, str):
return schema_types == "string"
return "string" in schema_types

def _normalize_for_validation(self, value: Any) -> Any:
    """Build the view of ``value`` that is handed to jsonschema.

    ``bytes`` sitting at a position whose schema is a binary/byte
    string are replaced by their decoded text; everything else is
    passed through. ``value`` is never modified in place, and when no
    position needs decoding the very same object is returned, so an
    identity comparison tells the caller whether anything changed.

    The walk follows the schema rather than the value:

    * a ``dict`` is delegated to ``_normalize_mapping_for_validation``,
      which consults ``properties`` / ``additionalProperties``;
    * a ``list`` is delegated to ``_normalize_array_for_validation``
      only if the schema has ``items``;
    * every sub-schema listed under ``oneOf``, ``anyOf`` and ``allOf``
      is then applied in order to the running result.
    """
    if self._accepts_binary_string(value):
        return self._decode_binary_value(value)

    result: Any = value
    if isinstance(value, dict):
        result = self._normalize_mapping_for_validation(value)
    elif isinstance(value, list) and "items" in self.schema:
        result = self._normalize_array_for_validation(value)

    # Each composed branch gets a turn on the running result. A branch
    # that does not touch a position hands back the same object, and a
    # position already decoded to ``str`` is no longer ``bytes``, so no
    # later branch decodes it again.
    for keyword in ("oneOf", "anyOf", "allOf"):
        if keyword in self.schema:
            for branch in self.schema / keyword:
                branch_validator = self.evolve(branch)
                result = branch_validator._normalize_for_validation(result)

    return result

def _normalize_mapping_for_validation(
    self, value: dict[str, Any]
) -> dict[str, Any]:
    """Normalize the members of a mapping according to the schema.

    Members named in ``properties`` are normalized with their own
    sub-schema. If ``additionalProperties`` is a schema (not ``True``
    or ``False``), the remaining members are normalized with it.

    The result is copy-on-write: ``value`` itself is returned when no
    member changed; otherwise a shallow copy carrying the replacements
    is returned and ``value`` stays untouched.
    """
    result: dict[str, Any] = value

    if "properties" in self.schema:
        for name, subschema in (self.schema / "properties").items():
            if isinstance(name, str) and name in value:
                current = value[name]
                replacement = self.evolve(
                    subschema
                )._normalize_for_validation(current)
                if replacement is not current:
                    if result is value:
                        # First change: detach from the caller's dict.
                        result = dict(value)
                    result[name] = replacement

    additional = self.schema.get("additionalProperties", True)
    if additional in (True, False):
        # Boolean form carries no sub-schema to normalize against.
        return result

    declared_names: set[str] = (
        {
            name
            for name in (self.schema / "properties").keys()
            if isinstance(name, str)
        }
        if "properties" in self.schema
        else set()
    )
    extra_validator = self.evolve(self.schema / "additionalProperties")
    for name, current in value.items():
        if name not in declared_names:
            replacement = extra_validator._normalize_for_validation(current)
            if replacement is not current:
                if result is value:
                    result = dict(value)
                result[name] = replacement

    return result

def _normalize_array_for_validation(self, value: list[Any]) -> list[Any]:
    """Normalize every element of a list with the ``items`` sub-schema.

    The result is copy-on-write: ``value`` itself is returned when no
    element changed; otherwise a shallow copy holding the replaced
    elements is returned and ``value`` stays untouched.
    """
    element_validator = self.evolve(self.schema / "items")
    result = value
    for position, element in enumerate(value):
        replacement = element_validator._normalize_for_validation(element)
        if replacement is not element:
            if result is value:
                # First change: detach from the caller's list.
                result = list(value)
            result[position] = replacement
    return result

# Cache the recursive "does this schema benefit from a ValidationState?"
# check, keyed on the SchemaPath. SchemaPath is hashed by content, so
# two SchemaPaths pointing at the same spec location share a cache
Expand Down Expand Up @@ -267,6 +417,12 @@ def get_primitive_type(self, value: Any) -> Optional[str]:
schema_types = sorted(self.validator.TYPE_CHECKER._type_checkers)
assert isinstance(schema_types, list)
for schema_type in schema_types:
if schema_type == "string" and self._accepts_binary_string(value):
# Bytes value, binary/byte format, ``string`` is in the
# declared type list: treat it as string without asking
# jsonschema's type checker (which doesn't know about
# OpenAPI's binary convention).
return "string"
result = self.type_validator(value, type_override=schema_type)
if not result:
continue
Expand Down
159 changes: 151 additions & 8 deletions tests/integration/unmarshalling/test_request_unmarshaller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
from base64 import b64encode
from email.generator import _make_boundary

import pytest

Expand Down Expand Up @@ -469,16 +468,14 @@ def test_request_body_with_object_default(self):
assert result.errors == []
assert result.body == {"tags": []}

@pytest.mark.xfail(
reason=(
"multipart composed-schema branch selection is not binary-aware"
),
strict=True,
)
def test_request_body_multipart_oneof_binary_field(self):
from openapi_core import OpenAPI

boundary = _make_boundary()
# email.generator._make_boundary() returns strings like
# ``===============1234==`` whose ``=`` chars trip the mimetype
# parameter parser. That's a separate bug; here we just want a
# legal boundary that round-trips the binary oneOf branch.
boundary = "openapicoreboundary1234567890"
spec = OpenAPI.from_dict(
{
"openapi": "3.1.0",
Expand Down Expand Up @@ -545,6 +542,152 @@ def test_request_body_multipart_oneof_binary_field(self):
assert result.errors == []
assert result.body == {"file": b"\xff\xfe"}

def test_request_body_multipart_anyof_binary_field(self):
    """A posted file must unmarshal through the binary ``anyOf`` branch.

    The schema offers a text-only branch (``note``) and a binary branch
    (``blob``); only ``blob`` is sent, as raw non-UTF-8 bytes.
    """
    from openapi_core import OpenAPI

    boundary = "openapicoreboundary1234567890"
    payload = b"\x00\x01\x02binary\xff"

    note_branch = {
        "type": "object",
        "properties": {"note": {"type": "string"}},
        "required": ["note"],
    }
    blob_branch = {
        "type": "object",
        "properties": {
            "blob": {
                "type": "string",
                "format": "binary",
            }
        },
        "required": ["blob"],
    }
    operation = {
        "requestBody": {
            "required": True,
            "content": {
                "multipart/form-data": {
                    "schema": {"anyOf": [note_branch, blob_branch]}
                }
            },
        },
        "responses": {"200": {"description": ""}},
    }
    spec = OpenAPI.from_dict(
        {
            "openapi": "3.1.0",
            "info": {"version": "0", "title": "test"},
            "paths": {"/test": {"post": operation}},
        }
    )

    part_header = (
        f"--{boundary}\n"
        "Content-Type: application/octet-stream\n"
        "MIME-Version: 1.0\n"
        'Content-Disposition: form-data; name="blob"\n\n'
    ).encode("ascii")
    closing = f"\n--{boundary}--\n".encode("ascii")
    request = MockRequest(
        "http://localhost",
        "post",
        "/test",
        content_type=f"multipart/form-data; boundary={boundary}",
        data=part_header + payload + closing,
    )

    result = spec.unmarshal_request(request)

    assert result.errors == []
    assert result.body == {"blob": payload}

def test_request_body_multipart_allof_binary_field(self):
    """Both ``allOf`` branches must accept a body carrying a file.

    One branch requires the text field ``label`` and the other requires
    the binary field ``file``; the request sends both, with ``file`` as
    raw non-UTF-8 bytes.
    """
    from openapi_core import OpenAPI

    boundary = "openapicoreboundary1234567890"
    payload = b"\xff\xfe"

    label_branch = {
        "type": "object",
        "properties": {"label": {"type": "string"}},
        "required": ["label"],
    }
    file_branch = {
        "type": "object",
        "properties": {
            "file": {
                "type": "string",
                "format": "binary",
            }
        },
        "required": ["file"],
    }
    operation = {
        "requestBody": {
            "required": True,
            "content": {
                "multipart/form-data": {
                    "schema": {"allOf": [label_branch, file_branch]}
                }
            },
        },
        "responses": {"200": {"description": ""}},
    }
    spec = OpenAPI.from_dict(
        {
            "openapi": "3.1.0",
            "info": {"version": "0", "title": "test"},
            "paths": {"/test": {"post": operation}},
        }
    )

    leading_parts = (
        f"--{boundary}\n"
        'Content-Disposition: form-data; name="label"\n\n'
        "report"
        f"\n--{boundary}\n"
        "Content-Type: application/octet-stream\n"
        'Content-Disposition: form-data; name="file"\n\n'
    ).encode("ascii")
    closing = f"\n--{boundary}--\n".encode("ascii")
    request = MockRequest(
        "http://localhost",
        "post",
        "/test",
        content_type=f"multipart/form-data; boundary={boundary}",
        data=leading_parts + payload + closing,
    )

    result = spec.unmarshal_request(request)

    assert result.errors == []
    assert result.body == {"label": "report", "file": payload}

def test_post_pets_validates_request_schema_once(
self, request_unmarshaller
):
Expand Down
12 changes: 0 additions & 12 deletions tests/unit/deserializing/test_media_types_deserializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,12 +657,6 @@ def test_urlencoded_form_with_array_default(self, deserializer_factory):

assert result == {"tags": []}

@pytest.mark.xfail(
reason=(
"multipart composed-schema branch selection is not binary-aware"
),
strict=True,
)
def test_multipart_oneof_binary_field(self, spec, deserializer_factory):
mimetype = "multipart/form-data"
schema_dict = {
Expand Down Expand Up @@ -757,12 +751,6 @@ def test_multipart_oneof_string_field(self, spec, deserializer_factory):
"fieldA": "value",
}

@pytest.mark.xfail(
reason=(
"multipart composed-schema branch selection is not binary-aware"
),
strict=True,
)
def test_multipart_anyof_binary_field(self, spec, deserializer_factory):
mimetype = "multipart/form-data"
schema_dict = {
Expand Down
Loading
Loading