PydanticSerializationError when streaming OpenAI agent events via WorkflowStreamClient #1585

@wangzhihao0629

Summary

When using temporalio.contrib.openai_agents with Runner.run_streamed(), the SDK's OpenAIPayloadConverter fails to serialize certain OpenAI stream event types through WorkflowStreamClient, raising PydanticSerializationError.

Root Cause

PydanticJSONPlainPayloadConverter uses SchemaSerializer(any_schema()).to_json(value, exclude_unset=True), which cannot handle OpenAI's complex discriminated-union stream event types (50+ variants in ResponseStreamEvent). Both SchemaSerializer and pydantic_core.to_json() fail on these types.

The concrete model's own model_dump_json() works fine because it uses the model's type-aware serializer rather than the generic any_schema() approach.

Stack Trace

temporalio.workflow:task_failed:Failure handling streaming events
pydantic_core._pydantic_core.PydanticSerializationError: 
  Unable to serialize unknown type: <class 'openai.types.responses.response_output_text_annotation_added_event.ResponseOutputTextAnnotationAddedEvent'>

  During handling: TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'

Call stack:
  temporalio/converter.py         → PydanticJSONPlainPayloadConverter.to_payload
  temporalio/contrib/openai_agents/_workflow_stream.py → WorkflowStreamClient._send

Workaround

We currently subclass OpenAIPayloadConverter and wrap the json/plain converter with a fallback that catches PydanticSerializationError and uses model_dump_json() instead:

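import json
from typing import Any

from pydantic_core import PydanticSerializationError
from temporalio.api.common.v1 import Payload
from temporalio.converter import EncodingPayloadConverter

# OpenAIPayloadConverter comes from temporalio.contrib.openai_agents; the exact
# import path depends on the SDK version.


class _FallbackJsonConverter(EncodingPayloadConverter):
    """Wraps the json/plain converter and retries with the model's own serializer."""

    def __init__(self, delegate: EncodingPayloadConverter) -> None:
        self._delegate = delegate

    @property
    def encoding(self) -> str:
        # Report the delegate's encoding so payload metadata stays unchanged.
        return self._delegate.encoding

    def to_payload(self, value: Any) -> Payload | None:
        try:
            return self._delegate.to_payload(value)
        except PydanticSerializationError:
            # Prefer the model's type-aware serializer, then a dict dump,
            # then a lossy str() fallback for anything else.
            if hasattr(value, "model_dump_json"):
                data = value.model_dump_json().encode("utf-8")
            elif hasattr(value, "model_dump"):
                data = json.dumps(value.model_dump()).encode("utf-8")
            else:
                data = json.dumps(value, default=str).encode("utf-8")
            return Payload(
                metadata={"encoding": self.encoding.encode()}, data=data
            )

    def from_payload(self, payload: Payload, type_hint: type | None = None) -> Any:
        # Decoding is unaffected, so it goes straight to the delegate.
        return self._delegate.from_payload(payload, type_hint)


class RobustOpenAIPayloadConverter(OpenAIPayloadConverter):
    def __init__(self) -> None:
        super().__init__()
        key = b"json/plain"
        # Only wrap the converter if the parent registered one for json/plain.
        if key in self.converters:
            self.converters[key] = _FallbackJsonConverter(self.converters[key])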
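
To make the fallback order easier to reason about, here is a minimal standalone sketch of the same three branches, pulled out of the converter. It uses only the standard library. FakeEvent, DictOnly and Opaque are hypothetical stand-ins (not OpenAI or Pydantic types), and fallback_json_bytes is an illustrative helper name, not part of the SDK.

```python
import json
from typing import Any


def fallback_json_bytes(value: Any) -> bytes:
    """Mirror the fallback order used in the workaround's except branch."""
    if hasattr(value, "model_dump_json"):
        # Pydantic v2 style: let the object serialize itself to JSON.
        return value.model_dump_json().encode("utf-8")
    if hasattr(value, "model_dump"):
        # Objects that only expose a dict dump.
        return json.dumps(value.model_dump()).encode("utf-8")
    # Last resort: unknown objects are stringified, which is lossy.
    return json.dumps(value, default=str).encode("utf-8")


class FakeEvent:
    """Hypothetical stand-in for a Pydantic stream event."""

    def model_dump_json(self) -> str:
        return '{"type":"fake.event"}'


class DictOnly:
    """Hypothetical object with model_dump() but no model_dump_json()."""

    def model_dump(self) -> dict:
        return {"type": "dict.only"}


class Opaque:
    """Hypothetical object with neither method."""

    def __str__(self) -> str:
        return "opaque-object"


event_bytes = fallback_json_bytes(FakeEvent())
dict_bytes = fallback_json_bytes(DictOnly())
opaque_bytes = fallback_json_bytes({"payload": Opaque()})
```

Note that the last branch turns unknown objects into plain strings, so such a payload would not decode back to the original type on the receiving side.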

Environment

  • temporalio 1.28.0
  • openai-agents 0.17.4
  • pydantic 2.x
  • Python 3.12

Suggestion

OpenAIPayloadConverter (or the underlying PydanticJSONPlainPayloadConverter) could fall back to model_dump_json() when SchemaSerializer(any_schema()).to_json() raises PydanticSerializationError, since the concrete Pydantic model's own serializer handles these complex union types correctly.
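
A rough sketch of what such a fallback could look like, under stated assumptions: SerializationFailure is a hypothetical stand-in for PydanticSerializationError, primary stands in for the SchemaSerializer call, and FakeEvent is a hypothetical model. Unlike the workaround, this version re-raises when the value has no model_dump_json(), and it passes exclude_unset=True so the fallback matches the option used on the primary path. It is not the SDK's implementation.

```python
from typing import Any, Callable


class SerializationFailure(ValueError):
    """Hypothetical stand-in for pydantic_core.PydanticSerializationError."""


def to_json_with_fallback(value: Any, primary: Callable[[Any], bytes]) -> bytes:
    """Try the generic serializer first, then the model's own serializer."""
    try:
        return primary(value)
    except SerializationFailure:
        dump = getattr(value, "model_dump_json", None)
        if dump is None:
            # Not a model: surface the original error instead of guessing.
            raise
        # Match the primary path, which serializes with exclude_unset=True.
        return dump(exclude_unset=True).encode("utf-8")


class FakeEvent:
    """Hypothetical stand-in for an OpenAI stream event model."""

    def model_dump_json(self, *, exclude_unset: bool = False) -> str:
        return '{"type":"fake.event"}'


def failing_primary(value: Any) -> bytes:
    """Simulates the generic serializer failing on an unknown type."""
    raise SerializationFailure(f"Unable to serialize unknown type: {type(value)}")


result = to_json_with_fallback(FakeEvent(), failing_primary)
```

Re-raising for non-model values keeps genuine serialization bugs visible rather than silently stringifying them.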
