[FEAT] Add stateful responses layer with history rehydration and DB persistence #21
maralbahari wants to merge 16 commits into
Conversation
Signed-off-by: maral <maralbahari.98@gmail.com>
Co-authored-by: Tan Jia Huei <tanjiahuei@gmail.com>
Co-authored-by: noobHappylife <aratar1991@hotmail.com>
Co-authored-by: Claude
noobHappylife
left a comment
For the test cassettes, should we also use a model with reasoning?
```python
    default="sqlite+aiosqlite:///./agentic_api.db",
    description="SQLAlchemy async database URL.",
)
db_dialect: str = Field(
```
Do we need this? We should be able to tell whether it's SQLite or Postgres from the db_url already.
This covers the case where the user has created a hosted Postgres database and passes its URL.
Yeah, so I meant the db_dialect can be derived from db_url directly so we don't need to set it manually?
Yeah, but we need to make sure they pass the postgres scheme along with it too, since a URL might only contain a host and port. So I guess the url field needs a validation check that the scheme is postgresql://.
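A scheme check along those lines could be sketched with the stdlib only (`derive_dialect` and the allow-list are hypothetical names for illustration, not code from this PR):

```python
from urllib.parse import urlsplit

# Hypothetical allow-list of dialects the service supports.
ALLOWED_DIALECTS = {"sqlite", "postgresql"}


def derive_dialect(db_url: str) -> str:
    """Extract the dialect from a SQLAlchemy-style URL such as
    'postgresql+asyncpg://host:5432/db' or 'sqlite+aiosqlite:///./app.db'."""
    scheme = urlsplit(db_url).scheme       # e.g. 'postgresql+asyncpg'
    dialect = scheme.split("+", 1)[0]      # strip the '+driver' suffix
    if dialect not in ALLOWED_DIALECTS:
        raise ValueError(f"Unsupported database URL scheme: {scheme!r}")
    return dialect
```

With a check like this, db_dialect no longer needs to be a separate config field.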
```python
elif isinstance(event, MessageDone):
    yield from self._message_done(event)
elif isinstance(event, ReasoningStarted):
    pass  # Reasoning items are not emitted as output events in this implementation
```
What does "not emitted as output events" mean?
Yes, for now. This PR doesn't focus on that; reasoning events will be added in another PR.
```python
    effective_tool_choice=hydrated_body.tool_choice,
    effective_instructions=hydrated_body.instructions,
)
await self._conversation_store.put_turn(  # type: ignore[union-attr]
```
If a response fails halfway, do the emitted events still get written to the DB? And will the full history list be left "hanging" with a failed event?
It is all handled by the ConversationStore CRUD layer. If something goes wrong, an error is surfaced and nothing is stored: these are async functions, and the CRUD transaction session rolls back, so nothing is persisted.
So, say the upstream got an error. We should treat it as an error event, right (https://www.openresponses.org/specification#errors)? From the agentic-api PoV there shouldn't be an "error"; the user just sees an error event and response.failed.
In this case, is the response stored?
If something goes wrong upstream, that surfaces as an error event. The _persist function is only called on successful terminal events, so data is stored only for successful responses.
```python
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
```
Should we define the fields that exist in all tables here? Also, should we consider using sqlmodel?
This is the base table with no fields other than the built-in defaults; the Response, Item, and Conversation tables each define their own fields/columns. SQLModel is built on top of declarative SQLAlchemy and is actually slower to query: it uses Pydantic, which adds overhead we don't want in CRUD paths. Since we use dataclasses and query the tables directly, it is much faster than SQLModel with Pydantic.
```python
proxy_client_manager: ProxyClientManager = (
    request.app.state.proxy_client_manager
)
return await proxy_responses(
```
Should we also use the engine even when response_store is disabled, so the behavior is consistent (since it goes through the same compose/normalize steps)?
yes we can use the engine too.
@noobHappylife added a TODO to handle this properly in another PR.
franciscojavierarceo
left a comment
Thanks for the thorough work here — the architecture is clean and the test coverage is great. Left inline comments on the items worth addressing, grouped roughly by severity.
```python
    conversation_id=row.id,
    history_item_ids=[],
    created_at=row.created_at,
)
```
Critical: TOCTOU race in get_or_create
get and create_conversation run in separate sessions. Under concurrent requests with the same conversation_id, two coroutines can both observe None and both attempt to create — causing an unhandled IntegrityError (500).
A proven pattern for this is to use an atomic upsert (INSERT ... ON CONFLICT DO NOTHING + RETURNING, or catch IntegrityError and retry with a get). Other implementations of the Responses API use dialect-specific insert().on_conflict_do_nothing() for exactly this reason — it makes the create-if-not-exists operation atomic at the DB level without needing application-level locking.
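The atomic pattern can be sketched in raw SQL against SQLite via the stdlib sqlite3 module (table and column names here are hypothetical; with SQLAlchemy, the dialect-specific insert().on_conflict_do_nothing() expresses the same clause):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE conversation (id TEXT PRIMARY KEY, created_at TEXT NOT NULL)"
)


def get_or_create(conn: sqlite3.Connection, conversation_id: str):
    # Atomic at the DB level: of two concurrent callers, one inserts and
    # the other no-ops on the conflict instead of raising IntegrityError.
    conn.execute(
        "INSERT INTO conversation (id, created_at) "
        "VALUES (?, datetime('now')) ON CONFLICT (id) DO NOTHING",
        (conversation_id,),
    )
    return conn.execute(
        "SELECT id, created_at FROM conversation WHERE id = ?",
        (conversation_id,),
    ).fetchone()


first = get_or_create(conn, "conv_123")
second = get_or_create(conn, "conv_123")  # repeat call: no error, same row
assert first == second
```

The check-then-insert race disappears because the uniqueness decision is made inside a single statement at the database level.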
@franciscojavierarceo implemented dialect-specific insert().on_conflict_do_nothing().
```python
    metadata=metadata_,
)
except IntegrityError as e:
    raise BadInputError(f"Response id already exists: {response_id}") from e
```
Critical: Lost-update race in put_turn
put_turn reads history_item_ids via self.get() (session 1), appends new IDs in Python, then writes the full list via _persist_conversation_turn (session 2). Two concurrent turns for the same conversation will both read the same history_item_ids and each will append their own items — the second write silently overwrites the first's items.
Two approaches that work well here:
- Single session with SELECT ... FOR UPDATE — serialize concurrent writes to the same conversation at the row level.
- Append-only item table — instead of maintaining a mutable history_item_ids list on the Conversation row, store each item with a conversation_id FK and an ordering column (e.g., created_at + sequence). Rehydration becomes a simple SELECT ... WHERE conversation_id = ? ORDER BY seq. This eliminates the read-modify-write cycle entirely and is the pattern other Responses API implementations use successfully.
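A minimal sketch of the append-only variant, using the stdlib sqlite3 module with hypothetical table/column names:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE item ("
    "  id TEXT PRIMARY KEY,"
    "  conversation_id TEXT NOT NULL,"
    "  seq INTEGER NOT NULL,"   # monotonic per-conversation ordering
    "  data TEXT NOT NULL)"
)

# Each turn only INSERTs new rows; there is no shared history_item_ids
# list to read-modify-write, so concurrent turns cannot clobber each other.
rows = [
    ("item_1", "conv_1", 1, '{"role": "user"}'),
    ("item_2", "conv_1", 2, '{"role": "assistant"}'),
]
conn.executemany("INSERT INTO item VALUES (?, ?, ?, ?)", rows)


def rehydrate(conn, conversation_id):
    """Rebuild ordered history straight from the item table."""
    return [
        r[0]
        for r in conn.execute(
            "SELECT id FROM item WHERE conversation_id = ? ORDER BY seq",
            (conversation_id,),
        )
    ]


assert rehydrate(conn, "conv_1") == ["item_1", "item_2"]
```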
@franciscojavierarceo I have applied your second suggestion, where we retrieve history from the Item table for the conversation API.
```python
    ItemPayload.model_validate(items_by_id[item_id].data).item
    for item_id in stored.history_item_ids
    if item_id in items_by_id
]
```
High: Silent data loss during rehydration
`if item_id in items_by_id`

Missing Item rows (due to data corruption, partial cleanup, or eventual consistency) are silently skipped. The conversation history will be truncated without any error or log warning, which could lead to incorrect LLM context and confusing downstream behavior.
At minimum, log a warning for each missing item. Ideally, raise if `len(items_by_id) != len(stored.history_item_ids)` — a count mismatch indicates data integrity issues that shouldn't be silently swallowed.
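The warning path could be as small as this (`select_history` is a hypothetical helper, not code from the PR):

```python
import logging

logger = logging.getLogger(__name__)


def select_history(history_item_ids, items_by_id):
    """Rehydrate items in order, warning loudly on any missing row
    instead of silently truncating the conversation history."""
    missing = [i for i in history_item_ids if i not in items_by_id]
    if missing:
        logger.warning(
            "Rehydration missing %d item(s): %s", len(missing), missing
        )
    return [items_by_id[i] for i in history_item_ids if i in items_by_id]
```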
@franciscojavierarceo this isn't necessary here anymore after switching to your second suggestion above.
| """ | ||
| global _session_factory | ||
| _session_factory = async_sessionmaker( | ||
| engine, class_=AsyncSession, expire_on_commit=False |
Critical: configure_session_factory is not idempotent
This unconditionally replaces the module-level _session_factory global every time it's called. Both ResponseStore.__init__ and ConversationStore.__init__ call it. The comment on the ConversationStore side claims idempotency, but the implementation doesn't enforce it.
Suggestion — add a guard:
```python
if _session_factory is not None:
    return
```

Or do an identity check on the engine to catch misuse early.
```python
DONE_MARKER = "data: [DONE]\n\n"
TERMINAL_EVENT_TYPES = {"response.completed", "response.failed"}
```
High: response.incomplete missing from TERMINAL_EVENT_TYPES
The composer can emit "response.incomplete" (see composer.py around line 277), and the engine treats it as terminal. But this set only contains completed and failed, so the data: [DONE] marker won't be emitted immediately after an incomplete event.
```python
TERMINAL_EVENT_TYPES = {"response.completed", "response.failed", "response.incomplete"}
```

```python
tool_choice: ToolChoice = Field(default_factory=AutoToolChoice)
stream: bool = False
response_store_enabled: bool = True
conversation_store_enabled: bool = False
```
Medium: conversation_store_enabled not gated server-side
This is a per-request field — any client can enable conversation tracking by setting it in their request body. There's no server-level config to disallow it.
Consider adding a RuntimeConfig.conversation_store_enabled flag that gates whether the per-request field is honored. When the server flag is off, ignore the client field (or return 400). This gives operators control over whether the feature is available.
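A sketch of the gating logic (RuntimeConfig here is a stand-in dataclass, not the project's actual config class):

```python
from dataclasses import dataclass


@dataclass
class RuntimeConfig:
    # Hypothetical server-level switch; when False, the per-request
    # flag is ignored no matter what the client sends.
    conversation_store_enabled: bool = False


def effective_conversation_store(config: RuntimeConfig, requested: bool) -> bool:
    # Honor the client's flag only when the operator enabled the feature.
    return config.conversation_store_enabled and requested


assert effective_conversation_store(RuntimeConfig(False), True) is False
assert effective_conversation_store(RuntimeConfig(True), True) is True
```

Returning 400 instead of silently ignoring the flag is the stricter alternative mentioned above.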
@franciscojavierarceo we're planning to split conversation handling into its own dedicated router in another PR. Once that separation is in place, the per-request flag will be removed and server-side control will live naturally in the conversation router's config. Added a TODO for this.
```python
    _cached_text_clause("SELECT pg_advisory_unlock(:k)"), {"k": key}
)
except Exception:
    return
```
Medium: Advisory lock unlock failure silently swallowed
```python
except Exception:
    return
```

If the unlock fails (e.g., connection dropped), the PostgreSQL advisory lock leaks with no log output. At minimum, log a warning here so operators can detect lock leaks.
```python
return ModelRequest(
    parts=[
        ToolReturnPart(
            tool_name="", content=item.output, tool_call_id=item.call_id
```
Medium: Empty tool_name passed to ToolReturnPart
```python
ToolReturnPart(tool_name="", content=item.output, tool_call_id=item.call_id)
```

This is an inherent limitation of the OpenAI Responses API contract (function_call_output doesn't include the tool name). Worth adding a short comment explaining why, so future readers don't try to "fix" it. If the history contains a prior FunctionToolCall item with the matching call_id, the tool name could be looked up from there.
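The lookup could be sketched like this (plain-dict history items for illustration; the PR's actual item types differ):

```python
def lookup_tool_name(history, call_id):
    """Scan prior history for a function_call item with the matching
    call_id; fall back to "" when none is found, since the
    function_call_output item itself carries no tool name."""
    for item in history:
        if item.get("type") == "function_call" and item.get("call_id") == call_id:
            return item.get("name", "")
    return ""


history = [
    {"type": "function_call", "call_id": "call_1", "name": "get_weather"},
    {"type": "function_call_output", "call_id": "call_1", "output": "sunny"},
]
assert lookup_tool_name(history, "call_1") == "get_weather"
assert lookup_tool_name(history, "call_2") == ""
```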
```python
    ItemPayload.model_validate(items_by_id[item_id].data).item
    for item_id in stored.history_item_ids
    if item_id in items_by_id
]
```
High: Silent data loss during rehydration (same issue as conversation store)
`if item_id in items_by_id`

Same concern as ConversationStore.rehydrate — missing items are silently skipped. Should log a warning or raise on count mismatch.
@franciscojavierarceo Fixed; added a warning log.
```python
    effective_tool_choice=hydrated_body.tool_choice,
    effective_instructions=hydrated_body.instructions,
)
await self._conversation_store.put_turn(  # type: ignore[union-attr]
```
Medium: Consider upsert-based streaming persistence
Currently, the response is persisted only on completion via _persist. If the process crashes mid-stream, the response is lost entirely.
A pattern that works well for streaming is checkpoint-based upsert persistence: INSERT the response with in_progress status when streaming begins, then UPDATE (upsert) at key events (output_item.done, response.completed). This lets clients poll GET /v1/responses/{id} to see partial progress, and ensures incomplete responses are at least partially recoverable after crashes.
Not necessarily required for this PR, but worth considering for a follow-up — especially if background/async response execution is planned.
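A sketch of the checkpoint-based upsert against SQLite (stdlib sqlite3; the table shape is hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE response (id TEXT PRIMARY KEY, status TEXT NOT NULL, data TEXT)"
)


def checkpoint(conn, response_id, status, data=None):
    # Upsert: INSERT the row on the first checkpoint, UPDATE it on later
    # ones, so GET /v1/responses/{id} can observe partial progress.
    conn.execute(
        "INSERT INTO response (id, status, data) VALUES (?, ?, ?) "
        "ON CONFLICT (id) DO UPDATE SET status = excluded.status, "
        "data = coalesce(excluded.data, response.data)",
        (response_id, status, data),
    )


checkpoint(conn, "resp_1", "in_progress")             # stream begins
checkpoint(conn, "resp_1", "in_progress", '{"n":1}')  # output_item.done
checkpoint(conn, "resp_1", "completed")               # response.completed
row = conn.execute(
    "SELECT status, data FROM response WHERE id = 'resp_1'"
).fetchone()
assert row == ("completed", '{"n":1}')
```

A crash between checkpoints leaves an in_progress row behind rather than losing the response entirely.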
@franciscojavierarceo added a #TODO for this.
franciscojavierarceo
left a comment
Two additional follow-up comments on streaming resilience and future-proofing.
```python
async for event in self._iter_events(run_settings, pipeline, stream=True):
    if event.type in {"response.completed", "response.incomplete"}:
        await self._persist(
            hydrated_body=hydrated_body,
```
Medium: Persistence failure kills the SSE stream
await self._persist(...) is in the hot path of the streaming response. If the DB write throws (connection drop, constraint violation, etc.), the exception propagates and kills the SSE stream — the client gets an abrupt disconnect instead of the response they were already receiving.
A more resilient pattern is best-effort persistence: wrap _persist in a try/except, log the failure as a warning, and let the stream complete. The client still gets their full response even if the DB hiccups. The response just won't be rehydratable for future turns.
```python
try:
    await self._persist(...)
except Exception:
    logger.warning(
        "Failed to persist response %s",
        pipeline.composer.response.id,
        exc_info=True,
    )
```

```python
    request_tool_choice=self._body.tool_choice,
    stored_tool_choice=stored.metadata.effective_tool_choice,
    tool_choice_explicitly_set="tool_choice" in fields_set,
),
```
Low (future-proofing): No status validation on previous_response_id
_rehydrate fetches the stored response via get_or_raise but doesn't check whether the referenced response has a terminal status (completed, incomplete, failed). Today this is fine because responses are only persisted on completion. But once streaming persistence lands (where responses are INSERT'd as in_progress), a client could reference an in-progress response and get partial/inconsistent history.
Worth adding a status check here when that work arrives:
```python
if stored.status not in {"completed", "incomplete", "failed"}:
    raise BadInputError(
        f"Cannot chain from response with status '{stored.status}'"
    )
```
@franciscojavierarceo added a #TODO for this.
leseb
left a comment
Given our recent direction to move to Rust, I believe this should be closed.
@leseb Sure, will open a PR converting this PR from Python to Rust.
Summary
Implements the stateful responses layer for agentic-api: a full request orchestration pipeline that adds conversation history, protocol translation, and a three-table persistence store on top of the existing vLLM proxy gateway.

What's in this PR
- POST /v1/responses — previous_response_id rehydration chains turns from prior responses; conversation_id rehydration loads full multi-turn sessions from the DB
- Item, Response, and Conversation tables (SQLite default, PostgreSQL for multi-worker); SchemaManager handles DDL lifecycle
- Engine drives the full request lifecycle; Pipeline runs the pydantic_ai agent against vLLM's Responses Model per request
- PydanticAINormalizer converts pydantic_ai events → internal NormalizedEvents; ResponseComposer converts them → OpenAI Responses API SSE events
- RequestInputTranslator converts InputItem/OutputItem → pydantic_ai ModelMessages; StoreInputTranslator normalizes history items before persistence
- --response-store-enabled (default on) — ResponseStore saves/loads response checkpoints; ConversationStore manages multi-turn session state
- response_store_enabled=false falls back to raw HTTP passthrough (the original proxy); response_store_enabled=true routes through the full managed pipeline
- --conversation-store-enabled flag — opt-in multi-turn conversation tracking (default off)
- test_responses_api.py and test_conversation_api.py replay responses recorded against the OpenAI API; no GPU or real vLLM needed

Implementation Detail
Layer 3 — Core Orchestration (ADR-01 §4)
- core/engine.py: Engine orchestrates the full request: rehydration → translation → agent run → normalization → composition → persistence
- core/pipeline.py: Pipeline wraps the pydantic_ai agent run for a single turn, yielding a NormalizedEvent stream
- core/normalizer.py: PydanticAINormalizer maps pydantic_ai StreamEvent subtypes → MessageStarted, MessageDelta, ReasoningDelta, FunctionCallStarted, FunctionCallDelta, FunctionCallDone, MessageDone
- core/composer.py: ResponseComposer maps NormalizedEvent → OpenAI Responses API SSE frames (response.created, response.output_item.added, response.output_text.delta, response.output_item.done, response.completed, etc.)
- core/translator.py: RequestInputTranslator converts InputMessage/OutputMessage (including tool calls and results) → pydantic_ai ModelMessages
- core/normalized_events.py: internal dataclass hierarchy for the normalizer↔composer contract
- core/sse.py: SSE frame encoder

Layer 4 — Persistence / Store (ADR-02)
- store/response.py — ResponseStore: saves completed responses, rehydrates history from previous_response_id chains
- store/conversation.py — ConversationStore: saves/loads conversation state, appends new items per turn
- store/translator.py — StoreInputTranslator: normalises raw input items from the DB before passing to the translator

Layer 5 — Database (ADR-02)
- database/db_engine.py — async SQLAlchemy engine; PostgreSQL advisory lock helpers for future multi-worker support
- database/schema.py — SchemaManager: creates/drops all tables; called during FastAPI lifespan
- database/session.py — async session factory; @session_transaction and @run_in_session decorators
- database/item.py / database/response.py / database/conversation.py — ORM models for the three-table schema

Test Plan
Tests cover stateful multi-turn rehydration (previous_response_id and conversation_id), streaming vs non-streaming, protocol normalizer/composer unit tests, store persistence, and the full pipeline — all in-process via VCR cassettes recorded against the OpenAI API; no GPU or real vLLM needed.

Test Results
100 passed, 1 warning in 1.43s