Skip to content

Commit

Permalink
perf: Optimize get_operation_by_id
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Dygalo <dmitry@dygalo.dev>
  • Loading branch information
Stranger6667 committed May 15, 2024
1 parent 4913d6b commit dff97d4
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 22 deletions.
4 changes: 4 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Changelog
- Missing parameters shared under the same path in stateful testing if the path is behind a reference.
- ``KeyError`` instead of ``OperationNotFound`` when the operation ID is not found in Open API 3.1 without path entries.

**Performance**

- Optimize ``get_operation_by_id`` method performance and reduce memory usage.

.. _v3.28.1:

:version:`3.28.1 <v3.28.0...v3.28.1>` - 2024-05-11
Expand Down
78 changes: 78 additions & 0 deletions src/schemathesis/specs/openapi/_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from ...models import APIOperation


@dataclass
class OperationCacheEntry:
    """Metadata needed to build an API operation later on.

    Instances only hold references to the values they are given;
    nothing is copied or resolved at construction time.
    """

    path: str
    method: str
    # The resolution scope of the operation
    scope: str
    # Parameters shared among all operations in the path
    shared_parameters: list[dict[str, Any]]
    # Unresolved operation definition
    operation: dict[str, Any]

    # Explicit slots avoid a per-instance `__dict__`. This is compatible with `@dataclass`
    # only because no field has a default value: a default would be stored as a class
    # attribute with the same name as a slot, which Python rejects.
    __slots__ = ("path", "method", "scope", "shared_parameters", "operation")


OperationId = str


@dataclass
class OperationCache:
    """Cache for Open API operations.

    This cache contains multiple levels to avoid unnecessary parsing of the schema.
    The first level contains operation IDs and their metadata. The second level contains
    initialized operation instances.

    The first level is meant to be populated eagerly because it is cheap: it is mostly a dict
    traversal and a bit of reference resolving. The entries there do not own the data, they
    only reference parts of the schema.

    The second level is meant to be populated lazily because it is more expensive: it requires
    parsing the schema, its parameters, and some more elaborate reference resolution.
    """

    # First level: operation ID -> metadata about where the operation lives
    _ids_to_definitions: dict[OperationId, OperationCacheEntry] = field(default_factory=dict)
    # Second level: operation ID -> initialized operation
    _ids_to_operations: dict[OperationId, APIOperation] = field(default_factory=dict)

    @property
    def known_operation_ids(self) -> list[str]:
        """All operation IDs stored in the first level, in insertion order."""
        return list(self._ids_to_definitions)

    @property
    def has_ids_to_definitions(self) -> bool:
        """Whether the first level contains at least one entry."""
        return bool(self._ids_to_definitions)

    def insert_definition_by_id(
        self,
        operation_id: str,
        path: str,
        method: str,
        scope: str,
        shared_parameters: list[dict[str, Any]],
        operation: dict[str, Any],
    ) -> None:
        """Insert a new operation definition into cache.

        An existing entry with the same `operation_id` is overwritten.
        """
        self._ids_to_definitions[operation_id] = OperationCacheEntry(
            path=path, method=method, scope=scope, shared_parameters=shared_parameters, operation=operation
        )

    def get_definition_by_id(self, operation_id: str) -> OperationCacheEntry:
        """Get an operation definition by its ID.

        Raises:
            KeyError: If there is no definition stored under `operation_id`.
        """
        # TODO: Avoid KeyError in the future
        return self._ids_to_definitions[operation_id]

    def insert_operation_by_id(self, operation_id: str, operation: APIOperation) -> None:
        """Insert a new operation into cache."""
        self._ids_to_operations[operation_id] = operation

    def get_operation_by_id(self, operation_id: str) -> APIOperation | None:
        """Get an operation by its ID, or `None` if it has not been cached yet."""
        return self._ids_to_operations.get(operation_id)
66 changes: 44 additions & 22 deletions src/schemathesis/specs/openapi/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
OpenAPI30Parameter,
OpenAPIParameter,
)
from ._cache import OperationCache
from .references import RECURSION_DEPTH_LIMIT, ConvertingResolver, InliningResolver, resolve_pointer, UNRESOLVABLE
from .security import BaseSecurityProcessor, OpenAPISecurityProcessor, SwaggerSecurityProcessor
from .stateful import create_state_machine
Expand All @@ -91,7 +92,7 @@ class BaseOpenAPISchema(BaseSchema):
links_field: ClassVar[str] = ""
header_required_field: ClassVar[str] = ""
security: ClassVar[BaseSecurityProcessor] = None # type: ignore
_operations_by_id: dict[str, APIOperation] = field(init=False)
_operation_cache: OperationCache = field(default_factory=OperationCache)
_inline_reference_cache: dict[str, Any] = field(default_factory=dict)
# Inline references cache can be populated from multiple threads, therefore we need some synchronisation to avoid
# excessive resolving
Expand Down Expand Up @@ -374,31 +375,52 @@ def get_response_schema(self, definition: dict[str, Any], scope: str) -> tuple[l

def get_operation_by_id(self, operation_id: str) -> APIOperation:
    """Get an `APIOperation` instance by its `operationId`.

    Initialized operations are served from the operation cache. On a miss, the operation is
    built from its cached definition, stored in the cache, and returned.

    An unknown `operation_id` is reported via `_on_missing_operation`, together with the
    close matches among the known operation IDs.
    """
    cache = self._operation_cache
    operation = cache.get_operation_by_id(operation_id)
    if operation is not None:
        return operation
    # Operation has not been accessed yet, need to populate the cache
    # NOTE: if the schema defines no operation IDs at all, the definitions stay empty
    # and this traversal is repeated on every call.
    if not cache.has_ids_to_definitions:
        self._populate_operation_id_cache(cache)
    try:
        entry = cache.get_definition_by_id(operation_id)
    except KeyError as exc:
        matches = get_close_matches(operation_id, cache.known_operation_ids)
        # NOTE(review): presumably this always raises; otherwise `entry` is unbound below - confirm
        self._on_missing_operation(operation_id, exc, matches)
    # Shared parameters are resolved before the operation scope is pushed
    shared_parameters = self.resolver.resolve_all(entry.shared_parameters, RECURSION_DEPTH_LIMIT - 8)
    self.resolver.push_scope(entry.scope)
    try:
        resolved = self.resolver.resolve_all(entry.operation, RECURSION_DEPTH_LIMIT - 8)
    finally:
        # Always restore the resolver scope, even if resolving fails
        self.resolver.pop_scope()
    # Operation-level parameters come first, followed by the ones shared within the path
    raw_parameters = itertools.chain(resolved.get("parameters", ()), shared_parameters)
    parameters = self.collect_parameters(raw_parameters, resolved)
    definition = OperationDefinition(entry.operation, resolved, entry.scope)
    initialized = self.make_operation(entry.path, entry.method, parameters, definition)
    cache.insert_operation_by_id(operation_id, initialized)
    return initialized

def _populate_operation_id_cache(self, cache: OperationCache) -> None:
    """Collect all operation IDs from the schema.

    Every entry under "paths" whose key is in `HTTP_METHODS` and that has an "operationId"
    is stored in `cache` together with its path, method, resolution scope, and the
    unresolved path-level parameters.
    """
    for path, path_item in self.raw_schema.get("paths", {}).items():
        # If the path is behind a reference we have to keep the scope
        # The scope is used to resolve nested components later on
        if "$ref" in path_item:
            scope, path_item = self.resolver.resolve(path_item["$ref"])
        else:
            scope = self.resolver.resolution_scope
        for key, entry in path_item.items():
            if key not in HTTP_METHODS:
                continue
            if "operationId" in entry:
                cache.insert_definition_by_id(
                    entry["operationId"],
                    path=path,
                    method=key,
                    scope=scope,
                    shared_parameters=path_item.get("parameters", []),
                    operation=entry,
                )

def get_operation_by_reference(self, reference: str) -> APIOperation:
"""Get local or external `APIOperation` instance by reference.
Expand Down

0 comments on commit dff97d4

Please sign in to comment.