Skip to content

Commit

Permalink
Add JSONTypeConverter (#269)
Browse files Browse the repository at this point in the history
Fixes #264
  • Loading branch information
cretz committed Feb 8, 2023
1 parent 3c8ee0a commit b52e04e
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 5 deletions.
104 changes: 101 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ event loop. This means task management, sleep, cancellation, etc have all been d
- [Usage](#usage)
- [Client](#client)
- [Data Conversion](#data-conversion)
- [Custom Type Data Conversion](#custom-type-data-conversion)
- [Workers](#workers)
- [Workflows](#workflows)
- [Definition](#definition)
Expand Down Expand Up @@ -268,21 +269,118 @@ The default data converter supports converting multiple types including:
* Iterables including ones JSON dump may not support by default, e.g. `set`
* Any class with a `dict()` method and a static `parse_obj()` method, e.g.
[Pydantic models](https://pydantic-docs.helpmanual.io/usage/models)
* Note, this doesn't mean every Pydantic field can be converted, only fields which the data converter supports
* The default data converter is deprecated for Pydantic models and will warn if used since not all fields work.
See [this sample](https://github.com/temporalio/samples-python/tree/main/pydantic_converter) for the recommended
approach.
* [IntEnum, StrEnum](https://docs.python.org/3/library/enum.html) based enumerates
* [UUID](https://docs.python.org/3/library/uuid.html)

This notably doesn't include any `date`, `time`, or `datetime` objects as they may not work across SDKs.

Users are strongly encouraged to use a single `dataclass` for parameter and return types so fields with defaults can be
easily added without breaking compatibility.

Classes with generics may not have the generics properly resolved. The current implementation, similar to Pydantic, does
not have generic type resolution. Users should use concrete types.

##### Custom Type Data Conversion

For converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. Care has
been taken to support all common typings including `Optional`, `Union`, all forms of iterables and mappings, `NewType`,
etc in addition to the regular JSON values mentioned before.

Users are strongly encouraged to use a single `dataclass` for parameter and return types so fields with defaults can be
easily added without breaking compatibility.
Data converters contain a reference to a payload converter class that is used to convert to/from payloads/values. This
is a class and not an instance because it is instantiated on every workflow run inside the sandbox. The payload
converter is usually a `CompositePayloadConverter` which contains a multiple `EncodingPayloadConverter`s it uses to try
to serialize/deserialize payloads. Upon serialization, each `EncodingPayloadConverter` is tried until one succeeds. The
`EncodingPayloadConverter` provides an "encoding" string serialized onto the payload so that, upon deserialization, the
specific `EncodingPayloadConverter` for the given "encoding" is used.

The default data converter uses the `DefaultPayloadConverter` which is simply a `CompositePayloadConverter` with a known
set of default `EncodingPayloadConverter`s. To implement a custom encoding for a custom type, a new
`EncodingPayloadConverter` can be created for the new type. For example, to support `IPv4Address` types:

```python
class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter):
@property
def encoding(self) -> str:
return "text/ipv4-address"

def to_payload(self, value: Any) -> Optional[Payload]:
if isinstance(value, ipaddress.IPv4Address):
return Payload(
metadata={"encoding": self.encoding.encode()},
data=str(value).encode(),
)
else:
return None

def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
assert not type_hint or type_hint is ipaddress.IPv4Address
return ipaddress.IPv4Address(payload.data.decode())

class IPv4AddressPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Just add ours as first before the defaults
super().__init__(
IPv4AddressEncodingPayloadConverter(),
*DefaultPayloadConverter.default_encoding_payload_converters,
)

my_data_converter = dataclasses.replace(
DataConverter.default,
payload_converter_class=IPv4AddressPayloadConverter,
)
```

Imports are left off for brevity.

This is good for many custom types. However, sometimes you want to override the behavior of the just the existing JSON
encoding payload converter to support a new type. It is already the last encoding data converter in the list, so it's
the fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of
making the type work in lists, unions, etc.

The `JSONPlainPayloadConverter` uses the Python [json](https://docs.python.org/3/library/json.html) library with an
advanced JSON encoder by default and a custom value conversion method to turn `json.load`ed values to their type hints.
The conversion can be customized for serialization with a custom `json.JSONEncoder` and deserialization with a custom
`JSONTypeConverter`. For example, to support `IPv4Address` types in existing JSON conversion:

```python
class IPv4AddressJSONEncoder(AdvancedJSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, ipaddress.IPv4Address):
return str(o)
return super().default(o)
class IPv4AddressJSONTypeConverter(JSONTypeConverter):
def to_typed_value(
self, hint: Type, value: Any
) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
if issubclass(hint, ipaddress.IPv4Address):
return ipaddress.IPv4Address(value)
return JSONTypeConverter.Unhandled

class IPv4AddressPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Replace default JSON plain with our own that has our encoder and type
# converter
json_converter = JSONPlainPayloadConverter(
encoder=IPv4AddressJSONEncoder,
custom_type_converters=[IPv4AddressJSONTypeConverter()],
)
super().__init__(
*[
c if not isinstance(c, JSONPlainPayloadConverter) else json_converter
for c in DefaultPayloadConverter.default_encoding_payload_converters
]
)

my_data_converter = dataclasses.replace(
DataConverter.default,
payload_converter_class=IPv4AddressPayloadConverter,
)
```

Now `IPv4Address` can be used in type hints including collections, optionals, etc.

### Workers

Expand Down
54 changes: 52 additions & 2 deletions temporalio/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Dict,
List,
Mapping,
NewType,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -458,18 +459,22 @@ def __init__(
encoder: Optional[Type[json.JSONEncoder]] = AdvancedJSONEncoder,
decoder: Optional[Type[json.JSONDecoder]] = None,
encoding: str = "json/plain",
custom_type_converters: Sequence[JSONTypeConverter] = [],
) -> None:
"""Initialize a JSON data converter.
Args:
encoder: Custom encoder class object to use.
decoder: Custom decoder class object to use.
encoding: Encoding name to use.
custom_type_converters: Set of custom type converters that are used
when converting from a payload to type-hinted values.
"""
super().__init__()
self._encoder = encoder
self._decoder = decoder
self._encoding = encoding
self._custom_type_converters = custom_type_converters

@property
def encoding(self) -> str:
Expand Down Expand Up @@ -500,12 +505,43 @@ def from_payload(
try:
obj = json.loads(payload.data, cls=self._decoder)
if type_hint:
obj = value_to_type(type_hint, obj)
obj = value_to_type(type_hint, obj, self._custom_type_converters)
return obj
except json.JSONDecodeError as err:
raise RuntimeError("Failed parsing") from err


_JSONTypeConverterUnhandled = NewType("_JSONTypeConverterUnhandled", object)


class JSONTypeConverter(ABC):
"""Converter for converting an object from Python :py:func:`json.loads`
result (e.g. scalar, list, or dict) to a known type.
"""

Unhandled = _JSONTypeConverterUnhandled(object())
"""Sentinel value that must be used as the result of
:py:meth:`to_typed_value` to say the given type is not handled by this
converter."""

@abstractmethod
def to_typed_value(
self, hint: Type, value: Any
) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
"""Convert the given value to a type based on the given hint.
Args:
hint: Type hint to use to help in converting the value.
value: Value as returned by :py:func:`json.loads`. Usually a scalar,
list, or dict.
Returns:
The converted value or :py:attr:`Unhandled` if this converter does
not handle this situation.
"""
raise NotImplementedError


class PayloadCodec(ABC):
"""Codec for encoding/decoding to/from bytes.
Expand Down Expand Up @@ -1112,7 +1148,11 @@ def decode_search_attributes(
return ret


def value_to_type(hint: Type, value: Any) -> Any:
def value_to_type(
hint: Type,
value: Any,
custom_converters: Sequence[JSONTypeConverter] = [],
) -> Any:
"""Convert a given value to the given type hint.
This is used internally to convert a raw JSON loaded value to a specific
Expand All @@ -1121,13 +1161,23 @@ def value_to_type(hint: Type, value: Any) -> Any:
Args:
hint: Type hint to convert the value to.
value: Raw value (e.g. primitive, dict, or list) to convert from.
custom_converters: Set of custom converters to try before doing default
conversion. Converters are tried in order and the first value that
is not :py:attr:`JSONTypeConverter.Unhandled` will be returned from
this function instead of doing default behavior.
Returns:
Converted value.
Raises:
TypeError: Unable to convert to the given hint.
"""
# Try custom converters
for conv in custom_converters:
ret = conv.to_typed_value(hint, value)
if ret is not JSONTypeConverter.Unhandled:
return ret

# Any or primitives
if hint is Any:
return value
Expand Down
60 changes: 60 additions & 0 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import dataclasses
import ipaddress
import logging
import sys
import traceback
Expand All @@ -22,6 +23,7 @@
Set,
Text,
Tuple,
Type,
Union,
)
from uuid import UUID, uuid4
Expand All @@ -37,11 +39,16 @@
from temporalio.api.common.v1 import Payloads
from temporalio.api.failure.v1 import Failure
from temporalio.converter import (
AdvancedJSONEncoder,
BinaryProtoPayloadConverter,
CompositePayloadConverter,
DataConverter,
DefaultFailureConverterWithEncodedAttributes,
DefaultPayloadConverter,
JSONPlainPayloadConverter,
JSONTypeConverter,
PayloadCodec,
_JSONTypeConverterUnhandled,
decode_search_attributes,
encode_search_attribute_values,
)
Expand Down Expand Up @@ -516,3 +523,56 @@ async def test_failure_encoded_attributes():
not in failure.application_failure_info.details.payloads[0].metadata
)
assert failure == orig_failure


class IPv4AddressPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Replace default JSON plain with our own that has our type converter
json_converter = JSONPlainPayloadConverter(
encoder=IPv4AddressJSONEncoder,
custom_type_converters=[IPv4AddressJSONTypeConverter()],
)
super().__init__(
*[
c if not isinstance(c, JSONPlainPayloadConverter) else json_converter
for c in DefaultPayloadConverter.default_encoding_payload_converters
]
)


class IPv4AddressJSONEncoder(AdvancedJSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, ipaddress.IPv4Address):
return str(o)
return super().default(o)


class IPv4AddressJSONTypeConverter(JSONTypeConverter):
def to_typed_value(
self, hint: Type, value: Any
) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
if issubclass(hint, ipaddress.IPv4Address):
return ipaddress.IPv4Address(value)
return JSONTypeConverter.Unhandled


async def test_json_type_converter():
addr = ipaddress.IPv4Address("1.2.3.4")
custom_conv = dataclasses.replace(
DataConverter.default, payload_converter_class=IPv4AddressPayloadConverter
)

# Fails to encode with default
with pytest.raises(TypeError):
await DataConverter.default.encode([addr])

# But encodes with custom
payload = (await custom_conv.encode([addr]))[0]
assert '"1.2.3.4"' == payload.data.decode()

# Fails to decode with default
with pytest.raises(TypeError):
await DataConverter.default.decode([payload], [ipaddress.IPv4Address])

# But decodes with custom
assert addr == (await custom_conv.decode([payload], [ipaddress.IPv4Address]))[0]

0 comments on commit b52e04e

Please sign in to comment.