6 changes: 6 additions & 0 deletions pydantic_ai_slim/pydantic_ai/ui/_adapter.py
@@ -143,6 +143,12 @@ def load_messages(cls, messages: Sequence[MessageT]) -> list[ModelMessage]:
"""Transform protocol-specific messages into Pydantic AI messages."""
raise NotImplementedError

@classmethod
@abstractmethod
def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[MessageT]:
"""Transform Pydantic AI messages into protocol-specific messages."""
raise NotImplementedError

@abstractmethod
def build_event_stream(self) -> UIEventStream[RunInputT, EventT, AgentDepsT, OutputDataT]:
"""Build a protocol-specific event stream transformer."""
5 changes: 5 additions & 0 deletions pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py
@@ -109,6 +109,11 @@ def state(self) -> dict[str, Any] | None:
"""Frontend state from the AG-UI run input."""
return self.run_input.state

@classmethod
def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[Message]:
"""Transform Pydantic AI messages into AG-UI messages."""
raise NotImplementedError('TODO: implement dump_messages method')

@classmethod
def load_messages(cls, messages: Sequence[Message]) -> list[ModelMessage]:
"""Transform AG-UI messages into Pydantic AI messages."""
243 changes: 242 additions & 1 deletion pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py
@@ -2,7 +2,9 @@

from __future__ import annotations

-from collections.abc import Sequence
+import json
+import uuid
+from collections.abc import Callable, Sequence
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING
@@ -12,13 +14,16 @@

from ...messages import (
AudioUrl,
BaseToolCallPart,
BinaryContent,
BuiltinToolCallPart,
BuiltinToolReturnPart,
DocumentUrl,
FilePart,
ImageUrl,
ModelMessage,
ModelRequest,
ModelResponse,
RetryPromptPart,
SystemPromptPart,
TextPart,
@@ -35,6 +40,9 @@
from ._event_stream import VercelAIEventStream
from .request_types import (
DataUIPart,
DynamicToolInputAvailablePart,
DynamicToolOutputAvailablePart,
DynamicToolOutputErrorPart,
DynamicToolUIPart,
FileUIPart,
ReasoningUIPart,
@@ -43,10 +51,12 @@
SourceUrlUIPart,
StepStartUIPart,
TextUIPart,
ToolInputAvailablePart,
ToolOutputAvailablePart,
ToolOutputErrorPart,
ToolUIPart,
UIMessage,
UIMessagePart,
)
from .response_types import BaseChunk

@@ -57,6 +67,7 @@
__all__ = ['VercelAIAdapter']

request_data_ta: TypeAdapter[RequestData] = TypeAdapter(RequestData)
BUILTIN_TOOL_CALL_ID_PREFIX = 'pyd_ai_builtin'


@dataclass
@@ -141,8 +152,15 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: #
builtin_tool = part.provider_executed

tool_call_id = part.tool_call_id

args = part.input

if isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
pass

Comment on lines 157 to +163

Contributor Author: Tests showed load_messages converted args from a dict to a string, so I added this to parse it back, but args supports both types.

Collaborator: Our args is a str | dict[str, Any] while input is Any. Can you make this code a bit more robust in making sure we end up with a valid string or dict on our part? For example, if args is JSON, parsing it could also give us a list or something, in which case we'd rather want the string. And if it's not a string or already a dict, perhaps we should raise an error.

if builtin_tool:
call_part = BuiltinToolCallPart(tool_name=tool_name, tool_call_id=tool_call_id, args=args)
builder.add(call_part)
@@ -197,3 +215,226 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: #
assert_never(msg.role)

return builder.messages

@classmethod
def dump_messages( # noqa: C901
Collaborator: Let's add this to the superclass, and maybe you can look at integrating #3068 into this new framework as well, to make sure our decisions and assumptions hold up against two standards, not just one.

Contributor Author (@dsfaccini, Nov 16, 2025): On the args front:

  if isinstance(args, str):
      # if args is a list, we'll keep it as a string, otherwise try to parse as dict
      try:
          args = json.loads(args) if args.strip()[:1] != '[' else args
      except json.JSONDecodeError:
          pass
  elif not isinstance(args, dict | list | None):  # pragma: no branch
      raise UserError(f'Unsupported tool call args type: {type(args)}')

On the superclass front:

  @classmethod
  @abstractmethod
  def dump_messages(cls, message: ModelMessage) -> MessageT:
      """Transform Pydantic AI messages into protocol-specific messages."""
      raise NotImplementedError

On the #3068 front: you mean integrating the changes from that PR into this one?

Collaborator: On the args front, I'd rather test the type of the parsed result.

On the superclass front: yeah, let's include that PR.
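A minimal sketch of what "test the type of the parsed result" could look like, as a hypothetical _coerce_args helper (assumes json is imported and UserError comes from pydantic_ai.exceptions):

  import json

  from pydantic_ai.exceptions import UserError

  def _coerce_args(args: object) -> str | dict[str, object] | None:
      """Coerce a Vercel AI tool input into the str | dict | None form args expects."""
      if args is None or isinstance(args, dict):
          return args
      if isinstance(args, str):
          try:
              parsed = json.loads(args)
          except json.JSONDecodeError:
              return args
          # Only adopt the parsed value if it's a dict; a JSON list or scalar
          # round-trips better as the original string.
          return parsed if isinstance(parsed, dict) else args
      raise UserError(f'Unsupported tool call args type: {type(args)}')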

cls,
messages: Sequence[ModelMessage],
*,
_id_generator: Callable[[], str] | None = None,
Collaborator: As we don't have this elsewhere I'd rather not have it here; use IsStr() and IsSameStr() in the test instead.
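A sketch of the test-side alternative, assuming dirty_equals' IsStr and a project-local IsSameStr helper (names taken from the suggestion above), which would let dump_messages drop the injectable ID generator:

  from dirty_equals import IsStr

  # Match any generated string ID instead of injecting a deterministic generator.
  assert VercelAIAdapter.dump_messages(messages) == [
      UIMessage(id=IsStr(), role='user', parts=[TextUIPart(text='Hello', state='done')]),
  ]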

) -> list[UIMessage]:
"""Transform Pydantic AI messages into Vercel AI messages.

Args:
messages: A sequence of ModelMessage objects to convert
_id_generator: Optional ID generator function for testing. If not provided, uses uuid.uuid4().

Returns:
A list of UIMessage objects in Vercel AI format
"""

def _message_id_generator() -> str:
"""Generate a message ID."""
return _id_generator() if _id_generator is not None else str(uuid.uuid4())

tool_returns: dict[str, ToolReturnPart | BuiltinToolReturnPart] = {}
tool_errors: dict[str, RetryPromptPart] = {}

for msg in messages:
if isinstance(msg, ModelRequest):
for part in msg.parts:
if isinstance(part, ToolReturnPart | BuiltinToolReturnPart):
Collaborator: BuiltinToolReturnPart only exists on ModelResponse, as it all happens on the server side.

tool_returns[part.tool_call_id] = part
elif isinstance(part, RetryPromptPart) and part.tool_name is not None:
tool_errors[part.tool_call_id] = part

result: list[UIMessage] = []

for msg in messages:
if isinstance(msg, ModelRequest):
system_parts: list[SystemPromptPart] = []
user_parts: list[UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart] = []

for part in msg.parts:
if isinstance(part, SystemPromptPart):
system_parts.append(part)
elif isinstance( # pragma: no branch - All ModelRequest parts are covered
Collaborator: No comments after pragma comments please, we don't have those anywhere else in the codebase.

Collaborator: Also, instead of this, could we add else: assert_never(part) to ensure that we cover each possible type? If we intentionally exclude a type, we should have an elif with pass and a comment explaining why.

Contributor Author: Noted. Since pragmas effectively skip coverage checks I thought it important to justify the why, but I'll leave them out in the future.
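A minimal sketch of the exhaustive shape being asked for, assuming assert_never from typing_extensions and the ModelRequest part types used in this diff (with BuiltinToolReturnPart dropped per the comment above):

  from typing_extensions import assert_never

  for part in msg.parts:
      if isinstance(part, SystemPromptPart):
          system_parts.append(part)
      elif isinstance(part, UserPromptPart | ToolReturnPart | RetryPromptPart):
          user_parts.append(part)
      else:
          assert_never(part)  # type checker flags any part type not handled above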

part, UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart
):
user_parts.append(part)

if system_parts:
system_ui_parts: list[UIMessagePart] = [
TextUIPart(text=part.content, state='done') for part in system_parts
]
result.append(UIMessage(id=_message_id_generator(), role='system', parts=system_ui_parts))

# Note: Tool returns and retry prompts don't create user message parts
# They are only used to set the state of tool calls in assistant messages
if user_parts: # pragma: no branch - A ModelRequest with no user-visible parts is not tested
user_ui_parts: list[UIMessagePart] = []
for part in user_parts:
Collaborator: We're looping over the parts twice now and repeating the isinstance checks; I think we can loop just once, build the system_ui_parts and user_ui_parts as we go, and then make UIMessages for them at the end.
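A sketch of that single-pass structure, reusing names from this diff and assert_never from the sketch above (illustrative only; the assistant branch and ID handling are unchanged):

  system_ui_parts: list[UIMessagePart] = []
  user_ui_parts: list[UIMessagePart] = []
  for part in msg.parts:
      if isinstance(part, SystemPromptPart):
          system_ui_parts.append(TextUIPart(text=part.content, state='done'))
      elif isinstance(part, UserPromptPart):
          user_ui_parts.extend(_convert_user_prompt_part(part))
      elif isinstance(part, ToolReturnPart | RetryPromptPart):
          pass  # merged into the matching tool call on the assistant message
      else:
          assert_never(part)
  if system_ui_parts:
      result.append(UIMessage(id=_message_id_generator(), role='system', parts=system_ui_parts))
  if user_ui_parts:
      result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts))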

if isinstance(part, UserPromptPart):
user_ui_parts.extend(_convert_user_prompt_part(part))
elif isinstance(part, ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart):
# Tool returns/errors don't create separate UI parts
# They're merged into the tool call in the assistant message
pass
Collaborator: Same as above; let's make sure this is exhaustive.


if user_ui_parts:
result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts))

elif isinstance( # pragma: no branch - All message types are covered (no tests for empty ModelResponse)
msg, ModelResponse
):
ui_parts: list[UIMessagePart] = []
text_parts: list[str] = []
had_interruption = False

# For builtin tools, returns can be in the same ModelResponse as calls
# Build a local mapping for this message
local_builtin_returns: dict[str, BuiltinToolReturnPart] = {}
for part in msg.parts:
if isinstance(part, BuiltinToolReturnPart):
local_builtin_returns[part.tool_call_id] = part

for part in msg.parts:
if isinstance(part, BuiltinToolReturnPart):
# Skip builtin tool returns - they're handled by the tool call logic
continue
elif isinstance(part, TextPart):
# If this is the first text after an interruption, prepend separator
if had_interruption:
text_parts.append('\n\n' + part.content)
else:
text_parts.append(part.content)
elif isinstance(part, ThinkingPart):
if text_parts:
ui_parts.append(TextUIPart(text=''.join(text_parts), state='done'))
text_parts = []
had_interruption = False
Collaborator: I think we can handle this more cleanly by checking the type of the last part and, if it's text, appending our text to theirs. Repeating this in multiple branches is a big smell.
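One possible shape for that, as a hypothetical helper (assumes TextUIPart instances are mutable):

  def _append_text(ui_parts: list[UIMessagePart], text: str) -> None:
      """Merge text into a trailing TextUIPart instead of flushing a buffer in every branch."""
      last = ui_parts[-1] if ui_parts else None
      if isinstance(last, TextUIPart):
          last.text += text
      else:
          ui_parts.append(TextUIPart(text=text, state='done'))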

ui_parts.append(ReasoningUIPart(text=part.content, state='done'))
Collaborator: Since the goal of this feature is to be able to pass a Pydantic AI message history to the frontend, which will then send it back as Vercel AI types, we want the load_messages(dump_messages(pydantic_ai_messages)) round trip to be as little "lossy" as possible, attaching as much data as we can so the exact original Pydantic AI messages can be reconstructed at the end.

ThinkingPart has a lot of fields that we're currently losing (provider_name, id, signature), all of which are required by APIs like OpenAI Responses and Anthropic to faithfully continue a conversation in which the model did thinking in the past. If they're excluded, the model can't continue from the real thinking it did before; it'll only have the summary we received over the API, meaning worse performance. So let's try to include this information on the provider_metadata field under a pydantic_ai key, similar to what I already did in the EventStream for builtin tools.

Note that this goes for all parts and fields like id: OpenAI Responses requires all of them to match in order to use the real server-side thinking history instead of just the summaries.

(If this is too much work right now, we could do it in a followup PR.)

Contributor Author: Noted! I'll review those aspects carefully after I'm done with the smaller changes.
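A minimal sketch of preserving the ThinkingPart fields, assuming ReasoningUIPart accepts a provider_metadata mapping the way the tool parts do:

  ui_parts.append(
      ReasoningUIPart(
          text=part.content,
          state='done',
          provider_metadata={
              'pydantic_ai': {
                  # ThinkingPart fields needed to reconstruct the original part
                  'id': part.id,
                  'signature': part.signature,
                  'provider_name': part.provider_name,
              }
          },
      )
  )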

elif isinstance(part, FilePart):
if text_parts:
ui_parts.append(TextUIPart(text=''.join(text_parts), state='done'))
text_parts = []
had_interruption = False
ui_parts.append(
FileUIPart(
url=part.content.data_uri,
media_type=part.content.media_type,
)
)
elif isinstance(part, BaseToolCallPart): # pragma: no branch - All assistant part types are covered
if text_parts:
ui_parts.append(TextUIPart(text=''.join(text_parts), state='done'))
text_parts = []

# Mark that we had an interruption for next text part
had_interruption = True

if isinstance(part, BuiltinToolCallPart):
prefixed_id = _make_builtin_tool_call_id(part.provider_name, part.tool_call_id)
Collaborator: What do we need this for?

Contributor Author: I honestly just read it in your review of #3068 and thought I'd need a helper as well, but I'm only calling it once.

# Check local returns first (same message), then global returns (from ModelRequest)
builtin_return = local_builtin_returns.get(part.tool_call_id) or (
tool_returns.get(part.tool_call_id)
if isinstance(tool_returns.get(part.tool_call_id), BuiltinToolReturnPart)
else None
)

if builtin_return:
content = builtin_return.model_response_str()
call_provider_metadata = (
{'pydantic_ai': {'provider_name': part.provider_name}}
Collaborator: Right, we were already doing this for builtin tools in the event stream :)

I suppose what I said above also applies to the other events we output, like for reasoning. But since we get the data chunk by chunk and have to stream it using particular Vercel AI events, we may not always have the information a Vercel AI event needs at the time we get the Pydantic AI event. Still, for best behavior we really should use provider_metadata to store this kind of info as well. Feel free to look into that here or in a new PR.

if part.provider_name
else None
)
ui_parts.append(
ToolOutputAvailablePart(
type=f'tool-{part.tool_name}',
tool_call_id=prefixed_id,
input=part.args_as_json_str(),
output=content,
state='output-available',
provider_executed=True,
call_provider_metadata=call_provider_metadata,
)
)
else: # pragma: no cover - Builtin tool call without a return is not tested
ui_parts.append(
ToolInputAvailablePart(
type=f'tool-{part.tool_name}',
tool_call_id=prefixed_id,
input=part.args_as_json_str(),
state='input-available',
provider_executed=True,
)
)
else:
tool_return = tool_returns.get(part.tool_call_id)
tool_error = tool_errors.get(part.tool_call_id)

if tool_return and isinstance(tool_return, ToolReturnPart):
content = tool_return.model_response_str()
ui_parts.append(
DynamicToolOutputAvailablePart(
tool_name=part.tool_name,
tool_call_id=part.tool_call_id,
input=part.args_as_json_str(),
output=content,
state='output-available',
)
)
elif tool_error:
error_text = tool_error.model_response()
ui_parts.append(
DynamicToolOutputErrorPart(
tool_name=part.tool_name,
tool_call_id=part.tool_call_id,
input=part.args_as_json_str(),
error_text=error_text,
state='output-error',
)
)
else:
ui_parts.append(
DynamicToolInputAvailablePart(
tool_name=part.tool_name,
tool_call_id=part.tool_call_id,
input=part.args_as_json_str(),
state='input-available',
)
)

if text_parts:
ui_parts.append(TextUIPart(text=''.join(text_parts), state='done'))
Collaborator: Why do we force text parts to always follow the other UI parts? They can be mixed!


if ui_parts: # pragma: no branch - An empty ModelResponse is not tested
result.append(UIMessage(id=_message_id_generator(), role='assistant', parts=ui_parts))

return result


def _make_builtin_tool_call_id(provider_name: str | None, tool_call_id: str) -> str:
"""Create a prefixed tool call ID for builtin tools."""
return f'{BUILTIN_TOOL_CALL_ID_PREFIX}|{provider_name or ""}|{tool_call_id}'


def _convert_user_prompt_part(part: UserPromptPart) -> list[UIMessagePart]:
"""Convert a UserPromptPart to a list of UI message parts."""
ui_parts: list[UIMessagePart] = []

if isinstance(part.content, str):
ui_parts.append(TextUIPart(text=part.content, state='done'))
else:
for item in part.content:
if isinstance(item, str):
ui_parts.append(TextUIPart(text=item, state='done'))
elif isinstance(item, BinaryContent):
ui_parts.append(FileUIPart(url=item.data_uri, media_type=item.media_type))
elif isinstance(
item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl
): # pragma: no branch - All content types are covered
ui_parts.append(FileUIPart(url=item.url, media_type=item.media_type))

return ui_parts
4 changes: 4 additions & 0 deletions tests/test_ui.py
@@ -87,6 +87,10 @@ class DummyUIAdapter(UIAdapter[DummyUIRunInput, ModelMessage, str, AgentDepsT, O
def build_run_input(cls, body: bytes) -> DummyUIRunInput:
return DummyUIRunInput.model_validate_json(body)

@classmethod
def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[ModelMessage]:
return list(messages)

@classmethod
def load_messages(cls, messages: Sequence[ModelMessage]) -> list[ModelMessage]:
return list(messages)