diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 0ca5d0d..5b8ff35 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -25,7 +25,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.11" - name: Run pre-commit uses: pre-commit/action@v3.0.0 @@ -66,7 +66,7 @@ jobs: - name: Run tox uses: lsst-sqre/run-tox@v1 with: - python-version: "3.10" + python-version: "3.11" tox-envs: "docs,docs-linkcheck" # Only attempt documentation uploads for tagged releases and pull @@ -98,5 +98,5 @@ jobs: uses: lsst-sqre/build-and-publish-to-pypi@v1 with: pypi-token: ${{ secrets.PYPI_SQRE_ADMIN }} - python-version: "3.10" + python-version: "3.11" upload: ${{ github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') }} diff --git a/.github/workflows/periodic-ci.yaml b/.github/workflows/periodic-ci.yaml index d68468b..e768c6d 100644 --- a/.github/workflows/periodic-ci.yaml +++ b/.github/workflows/periodic-ci.yaml @@ -39,7 +39,7 @@ jobs: - name: Build docs in tox uses: lsst-sqre/run-tox@v1 with: - python-version: "3.10" + python-version: "3.11" tox-envs: "docs,docs-linkcheck" use-cache: false @@ -55,5 +55,5 @@ jobs: uses: lsst-sqre/build-and-publish-to-pypi@v1 with: pypi-token: "" - python-version: "3.10" + python-version: "3.11" upload: false diff --git a/CHANGELOG.md b/CHANGELOG.md index 689e9c6..ddd32ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Change log +## Unreleased + +### New features + +- New `PydanticSchemaManager` enables you to use [Pydantic](https://pydantic-docs.helpmanual.io/) models as Avro schemas. Like the `RecordNameSchemaManager`, this new manager handles schema registration for you. To serialize a message, you simply supply a Pydantic object. The manager will also deserialize messages into Pydantic objects if the message's schema corresponds to a managed Pydantic model. 
Overall this feature provides end-to-end type checking of messages from development to production. + ## 0.3.0 (2023-02-23) New features: diff --git a/Makefile b/Makefile index 484b05e..3411e65 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ help: .PHONY: init init: - pip install -e ".[aiohttp,httpx,dev]" - pip install tox pre-commit + pip install -e ".[aiohttp,httpx,pydantic,dev]" + pip install -U tox pre-commit pre-commit install rm -rf .tox diff --git a/docs/dev/development.rst b/docs/dev/development.rst index 9765fd1..6276aad 100644 --- a/docs/dev/development.rst +++ b/docs/dev/development.rst @@ -43,9 +43,6 @@ Pre-commit hooks The pre-commit hooks, which are automatically installed by running the :command:`make init` command on :ref:`set up `, ensure that files are valid and properly formatted. Some pre-commit hooks automatically reformat code: -``seed-isort-config`` - Adds configuration for isort to the :file:`pyproject.toml` file. - ``isort`` Automatically sorts imports in Python modules. @@ -73,13 +70,13 @@ You can also run tox_, which tests the library the same way that the CI workflow .. code-block:: sh - tox + tox run To see a listing of test environments, run: .. code-block:: sh - tox -av + tox list .. _dev-build-docs: @@ -92,7 +89,7 @@ Documentation is built with Sphinx_: .. code-block:: sh - tox -e docs + tox run -e docs The build documentation is located in the :file:`docs/_build/html` directory. @@ -101,22 +98,20 @@ The build documentation is located in the :file:`docs/_build/html` directory. Updating the change log ======================= -Each pull request should update the change log (:file:`CHANGELOG.rst`). +Each pull request should update the change log (:file:`CHANGELOG.md`). Add a description of new features and fixes as list items under a section at the top of the change log called "Unreleased:" -.. code-block:: rst +.. code-block:: md - Unreleased - ---------- + ## Unreleased - Description of the feature or fix. 
-If the next version is known (because Kafkit's master branch is being prepared for a new major or minor version), the section may contain that version information: +If the next version is known (because Kafkit's main branch is being prepared for a new major or minor version), the section may contain that version information: -.. code-block:: rst +.. code-block:: md - X.Y.0 (unreleased) - ------------------ + ## X.Y.0 (unreleased) - Description of the feature or fix. @@ -124,8 +119,7 @@ If the exact version and release date is known (:doc:`because a release is being -.. code-block:: rst +.. code-block:: md - X.Y.0 (YYYY-MM-DD) - ------------------ + ## X.Y.0 (YYYY-MM-DD) - Description of the feature or fix. diff --git a/docs/dev/release.rst b/docs/dev/release.rst index 326acfb..abf4b73 100644 --- a/docs/dev/release.rst +++ b/docs/dev/release.rst @@ -10,15 +10,15 @@ When a semantic version tag is pushed to GitHub, `Kafkit is released to PyPI`_ w Similarly, documentation is built and pushed for each version (see https://kafkit.lsst.io/v). .. _`Kafkit is released to PyPI`: https://pypi.org/project/kafkit/ -.. _`ci.yaml`: https://github.com/lsst-sqre/kafkit/blob/master/.github/workflows/ci.yaml +.. _`ci.yaml`: https://github.com/lsst-sqre/kafkit/blob/main/.github/workflows/ci.yaml .. _regular-release: Regular releases ================ -Regular releases happen from the ``master`` branch after changes have been merged. -From the ``master`` branch you can release a new major version (``X.0.0``), a new minor version of the current major version (``X.Y.0``), or a new patch of the current major-minor version (``X.Y.Z``). +Regular releases happen from the ``main`` branch after changes have been merged. +From the ``main`` branch you can release a new major version (``X.0.0``), a new minor version of the current major version (``X.Y.0``), or a new patch of the current major-minor version (``X.Y.Z``). See :ref:`backport-release` to patch an earlier major-minor version.
Release tags are semantic version identifiers following the :pep:`440` specification. @@ -27,7 +27,7 @@ Release tags are semantic version identifiers following the :pep:`440` specifica ------------------------------- Each PR should include updates to the change log. -If the change log or documentation needs additional updates, now is the time to make those changes through the regular branch-and-PR development method against the ``master`` branch. +If the change log or documentation needs additional updates, now is the time to make those changes through the regular branch-and-PR development method against the ``main`` branch. In particular, replace the "Unreleased" section headline with the semantic version and date. See :ref:`dev-change-log` in the *Developer guide* for details. @@ -35,7 +35,7 @@ See :ref:`dev-change-log` in the *Developer guide* for details. 2. Tag the release ------------------ -At the HEAD of the ``master`` branch, create and push a tag with the semantic version: +At the HEAD of the ``main`` branch, create and push a tag with the semantic version: .. code-block:: sh @@ -54,7 +54,7 @@ The `ci.yaml`_ GitHub Actions workflow uploads the new release to PyPI and docum Backport releases ================= -The regular release procedure works from the main line of development on the ``master`` Git branch. +The regular release procedure works from the main line of development on the ``main`` Git branch. To create a release that patches an earlier major or minor version, you need to release from a **release branch.** Creating a release branch @@ -72,12 +72,12 @@ If the release branch doesn't already exist, check out the latest patch for that Developing on a release branch ------------------------------ -Once a release branch exists, it becomes the "master" branch for patches of that major-minor version. +Once a release branch exists, it becomes the "main" branch for patches of that major-minor version. 
Pull requests should be based on, and merged into, the release branch. -If the development on the release branch is a backport of commits on the ``master`` branch, use ``git cherry-pick`` to copy those commits into a new pull request against the release branch. +If the development on the release branch is a backport of commits on the ``main`` branch, use ``git cherry-pick`` to copy those commits into a new pull request against the release branch. Releasing from a release branch ------------------------------- -Releases from a release branch are equivalent to :ref:`regular releases `, except that the release branch takes the role of the ``master`` branch. +Releases from a release branch are equivalent to :ref:`regular releases `, except that the release branch takes the role of the ``main`` branch. diff --git a/docs/documenteer.toml b/docs/documenteer.toml index 2ff4792..afdf9b1 100644 --- a/docs/documenteer.toml +++ b/docs/documenteer.toml @@ -16,6 +16,14 @@ nitpick_ignore = [ "py:class", "httpx.AsyncClient", ], + [ + "py:class", + "AvroBaseModel", + ], + [ + "py:class", + "dataclasses_avroschema.avrodantic.AvroBaseModel", + ], ] [sphinx.intersphinx.projects] diff --git a/pyproject.toml b/pyproject.toml index c382f6e..366ccbe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,18 +17,20 @@ classifiers = [ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", "Intended Audience :: Developers", "Natural Language :: English", "Operating System :: POSIX", "Typing :: Typed", ] requires-python = ">=3.8" -dependencies = ["uritemplate", "fastavro"] +dependencies = ["fastavro", "uritemplate"] dynamic = ["version"] [project.optional-dependencies] aiohttp = ["aiohttp"] httpx = ["httpx"] +pydantic = ["dataclasses-avroschema[pydantic]"] dev = [ # Testing "coverage[toml]", diff --git a/setup.py b/setup.py deleted file mode 100644 index 7e4ff29..0000000 --- 
a/setup.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Legacy setuptools build front-end (pre PEP 517).""" - -from setuptools import setup - -setup(use_scm_version=True) diff --git a/src/kafkit/registry/__init__.py b/src/kafkit/registry/__init__.py index a624c26..ada6072 100644 --- a/src/kafkit/registry/__init__.py +++ b/src/kafkit/registry/__init__.py @@ -8,11 +8,18 @@ RegistryError, RegistryHttpError, RegistryRedirectionError, + UnmanagedSchemaError, +) +from kafkit.registry.serializer import ( + Deserializer, + MessageInfo, + PolySerializer, + Serializer, ) -from kafkit.registry.serializer import Deserializer, PolySerializer, Serializer __all__ = [ "Deserializer", + "MessageInfo", "Serializer", "PolySerializer", "RegistryBadRequestError", @@ -20,4 +27,5 @@ "RegistryError", "RegistryHttpError", "RegistryRedirectionError", + "UnmanagedSchemaError", ] diff --git a/src/kafkit/registry/errors.py b/src/kafkit/registry/errors.py index 66de762..a538477 100644 --- a/src/kafkit/registry/errors.py +++ b/src/kafkit/registry/errors.py @@ -6,6 +6,7 @@ "RegistryRedirectionError", "RegistryBadRequestError", "RegistryBrokenError", + "UnmanagedSchemaError", ] from typing import Any, Optional @@ -61,3 +62,9 @@ class RegistryBadRequestError(RegistryHttpError): class RegistryBrokenError(RegistryHttpError): -    """An excpetion if the server is down (5XX errors).""" +    """An exception if the server is down (5XX errors).""" + + +class UnmanagedSchemaError(Exception): + """An exception for when a schema is not managed by the Registry, and + therefore cannot be deserialized into a native Python object. + """ diff --git a/src/kafkit/registry/manager/__init__.py b/src/kafkit/registry/manager/__init__.py new file mode 100644 index 0000000..3bfb876 --- /dev/null +++ b/src/kafkit/registry/manager/__init__.py @@ -0,0 +1,8 @@ +"""Schema managers register schemas with the registry and enable convenient +serialization and deserialization of messages.
+""" + +from ._pydantic import PydanticSchemaManager +from ._recordname import RecordNameSchemaManager + +__all__ = ["RecordNameSchemaManager", "PydanticSchemaManager"] diff --git a/src/kafkit/registry/manager/_pydantic.py b/src/kafkit/registry/manager/_pydantic.py new file mode 100644 index 0000000..0afd188 --- /dev/null +++ b/src/kafkit/registry/manager/_pydantic.py @@ -0,0 +1,195 @@ +"""Schema management that uses Pydantic models as the Python representation +of schemas. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Iterable, Optional, Type + +from dataclasses_avroschema.avrodantic import AvroBaseModel + +from kafkit.registry.errors import UnmanagedSchemaError + +from ..serializer import Deserializer, PolySerializer +from ..utils import get_avro_fqn + +if TYPE_CHECKING: + from kafkit.registry.sansio import RegistryApi + +__all__ = ["PydanticSchemaManager"] + + +@dataclass +class CachedSchema: + """A cached schema and model.""" + + schema: dict[str, Any] + """The Avro schema derived from the model.""" + + model: Type[AvroBaseModel] + """The Pydantic model.""" + + +class PydanticSchemaManager: + """A manager for schemas that are represented as Pydantic models in Python, + and translated into Avro for the Confluent Schema Registry. + + Parameters + ---------- + registry + The Registry API client instance. For an application built with + ``aiohttp``, use the `kafkit.registry.aiohttp.RegistryApi` type. + suffix + A suffix that is added to the schema name (and thus subject name), for + example ``_dev1``. + + The suffix creates alternate subjects in the Schema Registry so + schemas registered during testing and staging don't affect the + compatibility continuity of a production subject. + + For production, it's best to not set a suffix.
+ """ + + def __init__(self, *, registry: RegistryApi, suffix: str = "") -> None: + self._registry = registry + self._suffix = suffix + + self._logger = logging.getLogger(__name__) + + self._serializer = PolySerializer(registry=self._registry) + self._deserializer = Deserializer(registry=self._registry) + + # A mapping of fully-qualified schema names to models. + self._models: dict[str, CachedSchema] = {} + + async def register_models( + self, + models: Iterable[Type[AvroBaseModel]], + compatibility: Optional[str] = None, + ) -> None: + """Register the models with the registry. + + Parameters + ---------- + models + The models to register. + """ + for model in models: + await self.register_model(model, compatibility=compatibility) + + async def register_model( + self, model: Type[AvroBaseModel], compatibility: Optional[str] = None + ) -> None: + """Register the model with the registry. + + Parameters + ---------- + model + The model to register. + """ + cached_schema = self._cache_model(model) + schema_fqn = get_avro_fqn(cached_schema.schema) + + await self._registry.register_schema( + schema=cached_schema.schema, + subject=schema_fqn, + compatibility=compatibility, + ) + + async def serialize(self, data: AvroBaseModel) -> bytes: + """Serialize the data. + + Parameters + ---------- + data + The data to serialize. + + Returns + ------- + bytes + The serialized data in the Confluent Wire Format. + """ + schema_fqn = self._get_model_fqn(data) + + if schema_fqn in self._models: + avro_schema = self._models[schema_fqn].schema + else: + cached_model = self._cache_model(data) + avro_schema = cached_model.schema + + return await self._serializer.serialize( + data.to_dict(), schema=avro_schema, subject=schema_fqn + ) + + async def deserialize(self, data: bytes) -> AvroBaseModel: + """Deserialize the data. + + Parameters + ---------- + data + The data in the Confluent Wire Format to deserialize into a + Pydantic object. 
+ + Returns + ------- + AvroBaseModel + The deserialized data. + + Raises + ------ + UnmanagedSchemaError + Raised if the Pydantic model corresponding to the message's + schema is not registered through the manager. + """ + message_info = await self._deserializer.deserialize(data) + schema_fqn = get_avro_fqn(message_info.schema) + if self._suffix: + schema_fqn += self._suffix + if schema_fqn not in self._models: + raise UnmanagedSchemaError( + f"Schema named {schema_fqn} is not registered with the manager" + ) + + cached_model = self._models[schema_fqn] + return cached_model.model.parse_obj(message_info.message) + + def _cache_model( + self, model: AvroBaseModel | Type[AvroBaseModel] + ) -> CachedSchema: + schema_fqn = self._get_model_fqn(model) + avro_schema = model.avro_schema_to_python() + + if isinstance(model, AvroBaseModel): + model_type = model.__class__ + else: + model_type = model + + self._models[schema_fqn] = CachedSchema(avro_schema, model_type) + + return self._models[schema_fqn] + + def _get_model_fqn( + self, model: AvroBaseModel | Type[AvroBaseModel] + ) -> str: + # Mypy can't detect the Meta class on the model, so we have to ignore + # those lines. + + try: + name = model.Meta.schema_name # type: ignore + except AttributeError: + name = model.__class__.__name__ + + try: + namespace = model.Meta.namespace # type: ignore + except AttributeError: + namespace = None + + if namespace: + name = f"{namespace}.{name}" + + if self._suffix: + name += self._suffix + + return name diff --git a/src/kafkit/registry/manager.py b/src/kafkit/registry/manager/_recordname.py similarity index 56% rename from src/kafkit/registry/manager.py rename to src/kafkit/registry/manager/_recordname.py index e358a9f..3013bb1 100644 --- a/src/kafkit/registry/manager.py +++ b/src/kafkit/registry/manager/_recordname.py @@ -1,16 +1,18 @@ -"""Combined local and registry-based schema management.""" +"""Manage schemas as avro files based on the record name subject name +strategy. 
+""" from __future__ import annotations import json import logging from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, Mapping, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional -from kafkit.registry.errors import RegistryBadRequestError -from kafkit.registry.sansio import CompatibilityType from kafkit.registry.serializer import PolySerializer +from ..utils import get_avro_fqn + if TYPE_CHECKING: from kafkit.registry.sansio import RegistryApi @@ -74,11 +76,12 @@ class RecordNameSchemaManager: def __init__( self, *, root: Path, registry: RegistryApi, suffix: str = "" ) -> None: - self._logger = logging.getLogger(__name__) - self._root = root self._registry = registry + self._root = root self._suffix = suffix + self._logger = logging.getLogger(__name__) + self._serializer = PolySerializer(registry=self._registry) self.schemas: Dict[str, Any] = {} @@ -95,40 +98,9 @@ def _load_schemas(self) -> None: if self._suffix: schema["name"] = f'{schema["name"]}{self._suffix}' - fqn = self._get_fqn(schema) + fqn = get_avro_fqn(schema) self.schemas[fqn] = schema - @staticmethod - def _get_fqn(schema: Mapping[str, Any]) -> str: - """Get the fully-qualified name of an Avro schema. - - Parameters - ---------- - schema : `dict` - The Avro schema. - - Returns - ------- - str - The fully-qualified name. - - Notes - ----- - The decision sequence is: - - - If the ``name`` field includes a period (``.``), the ``name`` field - is treated as a fully-qualified name. - - Otherwise, if the schema includes a ``namespace`` field, the - fully-qualified name is ``{{namespace}}.{{name}}``. - - Otherwise, the ``name`` is treated as the fully-qualified name. - """ - if "." 
not in schema["name"] and "namespace" in schema: - fqn = ".".join((schema["namespace"], schema["name"])) - else: - fqn = schema["name"] - assert isinstance(fqn, str) - return fqn - async def register_schemas( self, *, compatibility: Optional[str] = None ) -> None: @@ -160,105 +132,11 @@ async def register_schemas( documentation `__. """ - if isinstance(compatibility, str): - try: - CompatibilityType[compatibility] - except KeyError: - raise ValueError( - f"Compatibility setting {compatibility!r} is not in the " - f"allowed set: {[v.value for v in CompatibilityType]}" - ) for subject_name, schema in self.schemas.items(): - await self._register_schema( - subject_name=subject_name, + await self._registry.register_schema( schema=schema, - desired_compatibility=compatibility, - ) - - async def _register_schema( - self, - *, - subject_name: str, - schema: Dict[str, Any], - desired_compatibility: Optional[str], - ) -> int: - """Register a schema with the Schema Registry - - Parameters - ---------- - subject_name : `str` - The name of a subject in the Confluent Schema Registry, which - may already exist or not. - desired_compatibility : `str` - A subject compatibility setting. See docs for `register_schemas` - for possible values. - - Returns - ------- - int - Unique ID of the schema in the Schema in the Schema Registry. - - Notes - ----- - This method can be safely run multiple times with the same schema; in - each instance the same schema ID will be returned. - """ - schema_id = await self._registry.register_schema( - schema, subject=subject_name - ) - - if desired_compatibility is not None: - await self._set_subject_compatibility( - subject_name=subject_name, compatibility=desired_compatibility - ) - return schema_id - - async def _set_subject_compatibility( - self, *, subject_name: str, compatibility: str - ) -> None: - """Set the compatibility for a Schema Registry subject. 
- - Parameters - ---------- - subject_name : `str` - The name of a subject that exists in the Confluent Schema Registry. - compatibility : `str` - A subject compatibility setting. See docs for `register_schemas` - for possible values. - """ - try: - subject_config = await self._registry.get( - "/config{/subject}", url_vars={"subject": subject_name} - ) - except RegistryBadRequestError: - self._logger.info( - "No existing configuration for this subject: %s", subject_name - ) - # Create a mock config that forces a reset - subject_config = {"compatibilityLevel": None} - - self._logger.debug( - "Current config subject=%s config=%s", subject_name, subject_config - ) - - if subject_config["compatibilityLevel"] != compatibility: - await self._registry.put( - "/config{/subject}", - url_vars={"subject": subject_name}, - data={"compatibility": compatibility}, - ) - self._logger.info( - "Reset subject compatibility level. " - "subject=%s compatibility=%s", - subject_name, - compatibility, - ) - else: - self._logger.debug( - "Existing subject compatibility level is good. 
" - "subject=%s compatibility=%s", - subject_name, - compatibility, + subject=subject_name, + compatibility=compatibility, ) async def serialize(self, *, data: Any, name: str) -> bytes: diff --git a/src/kafkit/registry/sansio.py b/src/kafkit/registry/sansio.py index 1957a38..979465c 100644 --- a/src/kafkit/registry/sansio.py +++ b/src/kafkit/registry/sansio.py @@ -17,8 +17,8 @@ import fastavro -from kafkit.httputils import format_url, parse_content_type -from kafkit.registry.errors import ( +from ..httputils import format_url, parse_content_type +from .errors import ( RegistryBadRequestError, RegistryBrokenError, RegistryHttpError, @@ -128,6 +128,7 @@ def __init__(self, *, url: str) -> None: self.url = url self._schema_cache = SchemaCache() self._subject_cache = SubjectCache(self._schema_cache) + self._logger = logging.getLogger(__name__) @property def schema_cache(self) -> SchemaCache: @@ -402,7 +403,10 @@ def _prep_schema(schema: Mapping[str, Any]) -> str: return json.dumps(schema, sort_keys=True) async def register_schema( - self, schema: Mapping[str, Any], subject: Optional[str] = None + self, + schema: Mapping[str, Any], + subject: Optional[str] = None, + compatibility: Optional[str] = None, ) -> int: """Register a schema or get the ID of an existing schema. @@ -416,6 +420,11 @@ async def register_schema( subject : `str`, optional The subject to register the schema under. If not provided, the fully-qualified name of the schema is adopted as the subject name. + compatibility : `str`, optional + The compatibility level to use for the subject. If not provided, + the existing compatibility level is used (or the server's default + compatibility level if subject does not have a specific + compatibility level). 
Returns ------- @@ -457,8 +466,58 @@ async def register_schema( # add to cache self.schema_cache.insert(schema, result["id"]) + if compatibility is not None: + await self.set_subject_compatibility(subject, compatibility) + return result["id"] + async def set_subject_compatibility( + self, subject: str, compatibility: str + ) -> None: + # Validate compatibility setting + try: + CompatibilityType[compatibility] + except KeyError: + raise ValueError( + f"Compatibility setting {compatibility!r} is not in the " + f"allowed set: {[v.value for v in CompatibilityType]}" + ) + + try: + subject_config = await self.get( + "/config{/subject}", url_vars={"subject": subject} + ) + except RegistryBadRequestError: + self._logger.info( + "No existing configuration for this subject: %s", subject + ) + # Create a mock config that forces a reset + subject_config = {"compatibilityLevel": None} + + self._logger.debug( + "Current config subject=%s config=%s", subject, subject_config + ) + + if subject_config["compatibilityLevel"] != compatibility: + await self.put( + "/config{/subject}", + url_vars={"subject": subject}, + data={"compatibility": compatibility}, + ) + self._logger.info( + "Reset subject compatibility level. " + "subject=%s compatibility=%s", + subject, + compatibility, + ) + else: + self._logger.debug( + "Existing subject compatibility level is good. " + "subject=%s compatibility=%s", + subject, + compatibility, + ) + async def get_schema_by_id(self, schema_id: int) -> Dict[str, Any]: """Get a schema from the registry given its ID. 
diff --git a/src/kafkit/registry/serializer.py b/src/kafkit/registry/serializer.py index b63cb62..0547e03 100644 --- a/src/kafkit/registry/serializer.py +++ b/src/kafkit/registry/serializer.py @@ -5,6 +5,7 @@ from __future__ import annotations import struct +from dataclasses import dataclass from io import BytesIO from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple @@ -17,6 +18,7 @@ "Serializer", "PolySerializer", "Deserializer", + "MessageInfo", ] @@ -204,6 +206,59 @@ def _make_message( return message_fh.read() +@dataclass +class MessageInfo: + """A message, along with schema information. + + Parameters + ---------- + id + The ID of the schema (an `int`) in the Schema Registry. This uniquely + identifies the message's schema. + schema + The schema, as a Python object. + message + The message itself, as a decoded Python object. + + Attributes + ---------- + id + The ID of the schema (an `int`) in the Schema Registry. This uniquely + identifies the message's schema. + schema + The schema, as a Python object. + message + The message itself, as a decoded Python object. + """ + + id: int + """The ID of the schema (an `int`) in the Schema Registry. This uniquely + identifies the message's schema. + """ + + schema: dict[str, Any] + """The schema, as a Python object.""" + + message: Any + """The message itself, as a decoded Python object.""" + + def __getitem__(self, key: str) -> Any: + """Get info by key (for backwards compatibility). + + This method is for backwards-compatibility when + `Deserializer.deserialize` returned a dict. Attribute-based access is + recommended, e.g. ``message_info.id``, to enable type checking. + """ + if key == "id": + return self.id + elif key == "message": + return self.message + elif key == "schema": + return self.schema + + raise KeyError(f"Unknown key: {key}") + + class Deserializer: """An Avro message deserializer that understands the Confluent Wire Format and obtains schemas on-demand from a Confluent Schema Registry. 
@@ -246,9 +301,7 @@ class Deserializer: def __init__(self, *, registry: RegistryApi) -> None: self._registry = registry - async def deserialize( - self, data: bytes, include_schema: bool = False - ) -> Dict[str, Any]: + async def deserialize(self, data: bytes) -> MessageInfo: """Deserialize a message. Parameters @@ -256,26 +309,11 @@ async def deserialize( data : `bytes` The encoded message, usually obtained directly from a Kafka consumer. The message must be in the Confluent Wire Format. - include_schema : `bool`, optional - If `True`, the schema itself is included in the returned value. - This is useful if your application operates on many different - types of messages, and needs a convenient way to introspect - a message's type. Returns ------- - message_info : `dict` - The deserialized message is wrapped in a dictionary to include - metadata. The keys are: - - ``'id'`` - The ID of the schema (an `int`) in the Schema Registry. This - uniquely identifies the message's schema. - ``'message'`` - The message itself, as a decoded Python object. - ``'schema'`` - The schema, as a Python object. This key is only included - when ``include_schema`` is `True`. + MessageInfo + The deserialized message and schema information. 
""" schema_id, message_data = unpack_wire_format_data(data) schema = await self._registry.get_schema_by_id(schema_id) @@ -283,10 +321,7 @@ async def deserialize( message_fh = BytesIO(message_data) message_fh.seek(0) message = fastavro.schemaless_reader(message_fh, schema) - result = {"id": schema_id, "message": message} - if include_schema: - result["schema"] = schema - return result + return MessageInfo(schema_id, schema, message) def pack_wire_format_prefix(schema_id: int) -> bytes: diff --git a/src/kafkit/registry/utils.py b/src/kafkit/registry/utils.py new file mode 100644 index 0000000..b1f20e2 --- /dev/null +++ b/src/kafkit/registry/utils.py @@ -0,0 +1,38 @@ +"""Utilities related to Avro schemas and the Confluent Schema Registry.""" + +from __future__ import annotations + +from typing import Any, Mapping + +__all__ = ["get_avro_fqn"] + + +def get_avro_fqn(schema: Mapping[str, Any]) -> str: + """Get the fully-qualified name of an Avro schema. + + Parameters + ---------- + schema + The Avro schema. + + Returns + ------- + str + The fully-qualified name. + + Notes + ----- + The decision sequence is: + + - If the ``name`` field includes a period (``.``), the ``name`` field + is treated as a fully-qualified name. + - Otherwise, if the schema includes a ``namespace`` field, the + fully-qualified name is ``{{namespace}}.{{name}}``. + - Otherwise, the ``name`` is treated as the fully-qualified name. + """ + if "." 
not in schema["name"] and "namespace" in schema:
+        fqn = ".".join((schema["namespace"], schema["name"]))
+    else:
+        fqn = schema["name"]
+    assert isinstance(fqn, str)
+    return fqn
diff --git a/tests/pydantic_schema_manager_test.py b/tests/pydantic_schema_manager_test.py
new file mode 100644
index 0000000..2f919f9
--- /dev/null
+++ b/tests/pydantic_schema_manager_test.py
@@ -0,0 +1,203 @@
+"""Tests for the PydanticSchemaManager class."""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime, timezone
+from enum import Enum
+from typing import Optional
+
+import pytest
+from dataclasses_avroschema.avrodantic import AvroBaseModel
+from httpx import AsyncClient
+from pydantic import Field
+
+from kafkit.registry import UnmanagedSchemaError
+from kafkit.registry.httpx import RegistryApi
+from kafkit.registry.manager import PydanticSchemaManager
+
+
+def current_datetime() -> datetime:
+    """Return the current datetime."""
+    return datetime.now(tz=timezone.utc)
+
+
+class SlackMessageType(str, Enum):
+    """The type of Slack message."""
+
+    app_mention = "app_mention"
+    message = "message"
+
+
+class SlackChannelType(str, Enum):
+    """The type of Slack channel."""
+
+    channel = "channel"  # public channel
+    group = "group"  # private channel
+    im = "im"  # direct message
+    mpim = "mpim"  # multi-person direct message
+
+
+class SquarebotMessage(AvroBaseModel):
+    """Model for a Slack message produced by Squarebot."""
+
+    type: SlackMessageType = Field(description="The Slack message type.")
+
+    channel: str = Field(
+        description=(
+            "ID of the channel where the message was sent "
+            "(e.g., C0LAN2Q65)."
+        )
+    )
+
+    channel_type: SlackChannelType = Field(
+        description="The type of channel (public, direct im, etc..)"
+    )
+
+    user: Optional[str] = Field(
+        description="The ID of the user that sent the message (eg U061F7AUR)."
+ ) + + text: str = Field(description="Content of the message.") + + ts: str = Field(description="Timestamp of the message.") + + event_ts: str = Field(description="When the event was dispatched.") + + class Meta: + """Metadata for the model.""" + + namespace = "squarebot" + schema_name = "message" + + +class SquarebotHeartbeat(AvroBaseModel): + """Model for a Squarebot heartbeat message.""" + + timestamp: datetime = Field( + description="The timestamp of the heartbeat.", + default_factory=current_datetime, + ) + + class Meta: + """Metadata for the model.""" + + namespace = "squarebot" + schema_name = "heartbeat" + + +AVRO_SCHEMA = { + "type": "record", + "name": "schema1", + "namespace": "test-schemas", + "fields": [ + {"name": "a", "type": "int"}, + {"name": "b", "type": "string"}, + ], +} + + +@pytest.mark.docker +@pytest.mark.skipif( + os.getenv("SCHEMA_REGISTRY_URL") is None, + reason="SCHEMA_REGISTRY_URL env var must be configured", +) +@pytest.mark.asyncio +async def test_pydantic_schema_manager() -> None: + """Test that the Pydantic Schema Manager can register a schema based on + a Pydantic model, and serialize/deserialize Pydantic models. 
+ """ + registry_url = os.getenv("SCHEMA_REGISTRY_URL") + assert registry_url + + async with AsyncClient() as http_client: + registry = RegistryApi(http_client=http_client, url=registry_url) + manager = PydanticSchemaManager(registry=registry) + + # Register the schema + await manager.register_model(SquarebotMessage, compatibility="FORWARD") + + # Check the cache + assert manager._models.get("squarebot.message") is not None + assert manager._models["squarebot.message"].model == SquarebotMessage + + input_message = SquarebotMessage.fake() + + # Serialize the message + serialized_data = await manager.serialize(input_message) + + # Deserialize the message + output_message = await manager.deserialize(serialized_data) + + # Check that the deserialized message is the same as the input + assert isinstance(output_message, SquarebotMessage) + assert output_message == input_message + + # Automatically register the heartbeat schema and send a heartbeat + _ = await manager.serialize(SquarebotHeartbeat()) + + # Register a non-Pydantic schema to demonstrate handling + # unmanaged schemas + schema_id = await manager._registry.register_schema( + schema=AVRO_SCHEMA, + subject="test-schemas.schema1", + ) + unmanaged_message = await manager._serializer.serialize( + {"a": 1, "b": "test"}, + schema_id=schema_id, + ) + with pytest.raises(UnmanagedSchemaError): + await manager.deserialize(unmanaged_message) + + +@pytest.mark.docker +@pytest.mark.skipif( + os.getenv("SCHEMA_REGISTRY_URL") is None, + reason="SCHEMA_REGISTRY_URL env var must be configured", +) +@pytest.mark.asyncio +async def test_pydantic_schema_manager_suffixed() -> None: + """Test that the Pydantic Schema Manager can set a subject name suffix.""" + registry_url = os.getenv("SCHEMA_REGISTRY_URL") + assert registry_url + + async with AsyncClient() as http_client: + registry = RegistryApi(http_client=http_client, url=registry_url) + manager = PydanticSchemaManager(registry=registry, suffix="_v1") + + # Register the schema + 
await manager.register_model(SquarebotMessage, compatibility="FORWARD") + + # Check the cache + assert manager._models.get("squarebot.message_v1") is not None + assert ( + manager._models["squarebot.message_v1"].model == SquarebotMessage + ) + + input_message = SquarebotMessage.fake() + + # Serialize the message + serialized_data = await manager.serialize(input_message) + + # Deserialize the message + output_message = await manager.deserialize(serialized_data) + + # Check that the deserialized message is the same as the input + assert isinstance(output_message, SquarebotMessage) + assert output_message == input_message + + # Automatically register the heartbeat schema and send a heartbeat + _ = await manager.serialize(SquarebotHeartbeat()) + + # Register a non-Pydantic schema to demonstrate handling + # unmanaged schemas + schema_id = await manager._registry.register_schema( + schema=AVRO_SCHEMA, + subject="test-schemas.schema1", + ) + unmanaged_message = await manager._serializer.serialize( + {"a": 1, "b": "test"}, + schema_id=schema_id, + ) + with pytest.raises(UnmanagedSchemaError): + await manager.deserialize(unmanaged_message) diff --git a/tests/registry_serializer_test.py b/tests/registry_serializer_test.py index 0ddc733..c0fd866 100644 --- a/tests/registry_serializer_test.py +++ b/tests/registry_serializer_test.py @@ -113,14 +113,15 @@ async def test_deserializer() -> None: deserializer = Deserializer(registry=client) response_1 = await deserializer.deserialize(data_1) + # Test attribute and key-based access to message info assert response_1["id"] == schema1_id assert response_1["message"] == message_1 - assert "schema" not in response_1 + assert response_1.id == schema1_id + assert response_1.message == message_1 - response_2 = await deserializer.deserialize(data_2, include_schema=True) + response_2 = await deserializer.deserialize(data_2) assert response_2["id"] == schema2_id assert response_2["message"] == message_2 - assert "schema" in response_2 assert 
response_2["schema"]["name"] == "test-schemas.schema2" diff --git a/tox.ini b/tox.ini index 9d55105..09ab47e 100644 --- a/tox.ini +++ b/tox.ini @@ -10,6 +10,7 @@ extras = dev aiohttp httpx + pydantic allowlist_externals = docker-compose setenv = @@ -34,7 +35,7 @@ commands = [testenv:typing] description = Run mypy. commands = - mypy src/kafkit tests setup.py + mypy src/kafkit tests [testenv:lint] description = Lint codebase by running pre-commit (Black, isort, Flake8).