feat: add workflow query endpoint for agent state inspection #173

Open
gabrycina wants to merge 3 commits into main from feat/workflow-query-endpoint

gabrycina (Contributor) commented Mar 24, 2026

Summary

Adds GET /tasks/{task_id}/query/{query_name} endpoint that proxies Temporal workflow queries through the Agentex REST API.

Problem

When programmatically invoking agentic (Temporal) agents, callers have no way to know when the agent has finished processing a turn. The task status stays RUNNING throughout. Agents using the state machine SDK internally track their state (waiting_for_input, researching, etc.) but this isn't exposed externally.

Solution

Expose Temporal's built-in workflow query API through the existing REST API. Agents that register @workflow.query handlers can now be queried for their current state without affecting execution.

New endpoint

GET /tasks/{task_id}/query/{query_name}
→ {"task_id": "...", "query": "get_current_state", "result": "waiting_for_input"}
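A minimal client-side sketch of how a caller might build the request URL and read the response shown above. The base URL `http://localhost:5003`, the task ID, and the helper names `build_query_url` and `parse_query_response` are assumptions for illustration only; they are not part of this PR.

```python
import json
from typing import Any
from urllib.parse import quote


def build_query_url(base_url: str, task_id: str, query_name: str) -> str:
    """Return the URL for GET /tasks/{task_id}/query/{query_name}."""
    # Percent-encode both path segments so unusual IDs cannot alter the path.
    return (
        f"{base_url.rstrip('/')}/tasks/{quote(task_id, safe='')}"
        f"/query/{quote(query_name, safe='')}"
    )


def parse_query_response(body: str) -> Any:
    """Extract the "result" field from the endpoint's JSON response body."""
    payload = json.loads(body)
    return payload["result"]


# Hypothetical base URL and task ID, used only to exercise the helpers.
url = build_query_url("http://localhost:5003", "task-123", "get_current_state")
state = parse_query_response(
    '{"task_id": "task-123", "query": "get_current_state", '
    '"result": "waiting_for_input"}'
)
```

The response body above mirrors the shape in the PR description; the actual HTTP call is left out so the sketch stays independent of any particular client library.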

Changes

  • Added query route to tasks.py using existing DTemporalAdapter dependency
  • Changed TemporalQueryError from 500 to 400 (invalid query is a client error)

Companion change needed

Agents need to register @workflow.query handlers to be queryable. The state machine SDK should add a default get_current_state query handler — tracked separately in the SDK repo.

Use case

The Agent Plane communication service (meta-registry-comms-svc) invokes agents via RPC and polls for responses. With this endpoint, it can check get_current_state to detect when the agent transitions back to waiting_for_input, providing a reliable turn-completion signal for multi-turn conversations.

Follows the same pattern as Google A2A protocol's INPUT_REQUIRED task state.
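
The turn-completion check described above could be sketched as a simple polling loop. This is an illustration under assumptions, not code from the communication service: `wait_for_turn_completion` is a hypothetical helper, and `fetch_state` stands in for whatever function issues the GET request and returns the `result` field.

```python
import time
from typing import Callable


def wait_for_turn_completion(
    fetch_state: Callable[[], str],
    done_state: str = "waiting_for_input",
    interval_s: float = 1.0,
    timeout_s: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll fetch_state until it returns done_state or the timeout elapses.

    Returns True if done_state was observed, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        # fetch_state is assumed to query get_current_state for one task.
        if fetch_state() == done_state:
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)
```

Passing `sleep` as a parameter keeps the loop easy to test without real delays; the interval and timeout defaults are placeholders rather than recommended values.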

🤖 Generated with Claude Code

Greptile Summary

This PR exposes Temporal's workflow query API through a new GET /tasks/{task_id}/query/{query_name} REST endpoint, enabling callers to inspect internal agent state (e.g. waiting_for_input) without polling task status. A small guard is also added in query_workflow to avoid passing None as a positional argument to handle.query() when no arg is needed.

Key observations:

  • The authorization check (DAuthorizedId with read permission) is correctly applied to the new endpoint.
  • The arg is not None guard in adapter_temporal.py is the right fix — passing None to handle.query() would serialize it and send it to the query handler as an actual argument.
  • TemporalQueryError remains a ServiceError with code = 500 in exceptions.py; the PR description claims this was changed to 400, but the file was not modified.
  • The new endpoint injects DTemporalAdapter directly rather than going through a domain use-case, which breaks the consistent layered pattern followed by every other route in tasks.py and skips the standard DB-level task existence check.
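
The `arg is not None` guard mentioned above can be illustrated with a stand-in handle. `FakeHandle` is hypothetical and only records what it receives; it is not the Temporal SDK handle, and `query_workflow` here is a simplified sketch of the adapter method rather than its actual body.

```python
import asyncio
from typing import Any, Optional


class FakeHandle:
    """Hypothetical stand-in for a workflow handle; records received args."""

    def __init__(self) -> None:
        self.calls = []

    async def query(self, name: str, *args: Any) -> str:
        # Record exactly which positional arguments were forwarded.
        self.calls.append((name, *args))
        return "waiting_for_input"


async def query_workflow(
    handle: FakeHandle, query: str, arg: Optional[Any] = None
) -> str:
    # Forward arg only when the caller supplied one, so parameterless
    # query handlers are not handed an extra None argument.
    if arg is not None:
        return await handle.query(query, arg)
    return await handle.query(query)


handle = FakeHandle()
asyncio.run(query_workflow(handle, "get_current_state"))
asyncio.run(query_workflow(handle, "get_item", 5))
```

After these two calls, the first recorded call carries only the query name and the second carries the name plus the argument.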

Confidence Score: 4/5

  • Safe to merge with the understanding that the direct adapter injection in the route layer is a minor architectural inconsistency to follow up on.
  • The changes are small and well-scoped. Authorization is correctly applied, error handling flows through the existing exception hierarchy, and the None-guard fix in the adapter is correct. The one concrete concern — bypassing the use-case layer and its DB existence check — is a real inconsistency but not a runtime bug for the happy path (Temporal will return 404 for unknown workflows anyway). The PR description's claim about changing TemporalQueryError to 400 is inaccurate (the file wasn't modified), but since the code keeps the existing 500 behavior, the previous thread's concern is moot in practice.
  • agentex/src/api/routes/tasks.py — the direct DTemporalAdapter injection warrants a follow-up to add a use-case wrapper for consistency.

Important Files Changed

| Filename | Overview |
| --- | --- |
| agentex/src/adapters/temporal/adapter_temporal.py | Minor fix: adds a None guard before passing arg to handle.query() so that parameterless query handlers are invoked correctly. The change is correct: passing None as a positional arg is different from passing no arg in the Temporal Python SDK. |
| agentex/src/api/routes/tasks.py | Adds GET /{task_id}/query/{query_name} endpoint. Authorization is correctly applied, error handling flows through the existing Temporal exception hierarchy, and route ordering is unambiguous. Main concern: the endpoint injects DTemporalAdapter directly instead of going through a domain use-case, which is inconsistent with every other route in the file and skips the standard DB existence check for the task. |

Sequence Diagram

sequenceDiagram
    participant C as Caller
    participant R as tasks.py route
    participant A as TemporalAdapter
    participant T as Temporal Server

    C->>R: GET /tasks/{task_id}/query/{query_name}
    R->>R: DAuthorizedId — check read permission on task_id
    R->>A: query_workflow(workflow_id=task_id, query=query_name)
    A->>A: check client connected
    A->>T: get_workflow_handle(task_id)
    alt arg is not None
        A->>T: handle.query(query_name, arg)
    else no arg
        A->>T: handle.query(query_name)
    end
    alt success
        T-->>A: query result
        A-->>R: result
        R-->>C: {"task_id": "...", "query": "...", "result": "..."}
    else workflow not found
        T-->>A: not found error
        A-->>R: TemporalWorkflowNotFoundError (404)
        R-->>C: 404 Not Found
    else query/other error
        T-->>A: exception
        A-->>R: TemporalQueryError (500)
        R-->>C: 500 Internal Server Error
    end
Prompt To Fix All With AI
This is a comment left during a code review.
Path: agentex/src/api/routes/tasks.py
Line: 221-239

Comment:
**Bypasses domain use-case layer — inconsistent with all other routes**

Every other endpoint in this file (and in the wider routes layer) delegates to a use-case class (`DTaskUseCase`, `DStreamsUseCase`, `DSchedulesUseCase`, …) and only interacts with adapters through those classes. This is the first route to inject an adapter (`DTemporalAdapter`) directly.

Side-effects of the bypass:
1. **No database existence check**: other routes call `task_use_case.get_task()` which verifies the task exists in the Agentex DB before touching Temporal. Here, a caller who has a valid auth token for a `task_id` that was deleted from the DB (but whose Temporal workflow is still running) would get a live Temporal response — or a raw 404 from Temporal — without going through the standard task-not-found path.
2. **Harder to unit-test**: tests for routes typically mock use-cases; a direct adapter dependency requires a different mock strategy.
3. **Couples the route to a Temporal-specific concept**: if the underlying scheduling engine changes, this route breaks without a use-case abstraction in between.

Consider adding a thin `query_workflow` method to `TasksUseCase` (or a `TemporalUseCase`) to keep the layer boundary consistent.

How can I resolve this? If you propose a fix, please make it concise.
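
One possible shape for the thin use-case wrapper suggested in the comment, as a sketch only. `TaskRepository`, `WorkflowQueryAdapter`, `TaskNotFoundError`, and the `exists` method are hypothetical names introduced for illustration; the real `TasksUseCase` and its dependencies in this repo may look different.

```python
import asyncio
from typing import Any, Dict, Optional, Protocol


class TaskNotFoundError(Exception):
    """Hypothetical error for a task that is missing from the database."""


class TaskRepository(Protocol):
    async def exists(self, task_id: str) -> bool: ...


class WorkflowQueryAdapter(Protocol):
    async def query_workflow(
        self, workflow_id: str, query: str, arg: Optional[Any] = None
    ) -> Any: ...


class TasksUseCase:
    """Sketch of a use-case that wraps the adapter behind a DB check."""

    def __init__(self, tasks: TaskRepository, temporal: WorkflowQueryAdapter) -> None:
        self._tasks = tasks
        self._temporal = temporal

    async def query_workflow(self, task_id: str, query_name: str) -> Dict[str, Any]:
        # Verify the task exists in the database before touching Temporal.
        if not await self._tasks.exists(task_id):
            raise TaskNotFoundError(task_id)
        result = await self._temporal.query_workflow(
            workflow_id=task_id, query=query_name
        )
        return {"task_id": task_id, "query": query_name, "result": result}
```

With this shape the route would depend only on the use-case, so route tests could mock it the same way as the other endpoints, and the task-not-found path would stay consistent.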

Reviews (3): Last reviewed commit: "fix: don't pass None arg to Temporal que..."

…al workflow queries

Expose Temporal workflow queries via the tasks REST API. The endpoint
delegates to the existing TemporalAdapter.query_workflow method and
relies on the global exception handler for error mapping:
- TemporalWorkflowNotFoundError → 404
- TemporalQueryError → 400 (changed from 500 to reflect client error)
- Other TemporalError subclasses → 500

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@gabrycina gabrycina requested a review from a team as a code owner March 24, 2026 19:35
gabrycina added a commit to scaleapi/scale-agentex-python that referenced this pull request Mar 24, 2026
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Returns a _workflow_state string that defaults
to "initialized".

Agents using StateMachine should override this to return the
state machine's current state, enabling external callers to
detect turn completion (state == "waiting_for_input").

Example override:

    @workflow.query(name="get_current_state")
    def get_current_state(self) -> str:
        return self.state_machine.get_current_state()

Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Greptile correctly flagged that TemporalQueryError is used as a
catch-all in the adapter, not just for missing query handlers.
Keeping it as 500 (ServiceError) is correct since connectivity,
serialization, and other server-side failures also raise this.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
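
The resulting error mapping can be pictured with a simplified sketch. The class names come from this PR, but the class bodies and the `status_for` helper are stand-ins written for illustration; the real definitions in exceptions.py and the global exception handler are not shown here and may differ.

```python
class TemporalError(Exception):
    """Simplified stand-in base class; treated as a server-side error."""

    code = 500


class TemporalWorkflowNotFoundError(TemporalError):
    # Unknown workflow maps to 404 Not Found.
    code = 404


class TemporalQueryError(TemporalError):
    # Catch-all for query failures (connectivity, serialization, missing
    # handlers), so it stays a 500 rather than a 400.
    code = 500


def status_for(exc: Exception) -> int:
    """Hypothetical helper: map an exception to an HTTP status code."""
    # Exceptions without a code attribute fall back to 500.
    return getattr(exc, "code", 500)
```

Returning 400 only for a missing query handler would need a narrower exception type than the catch-all, which is outside the scope of this change.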
gabrycina added a commit to scaleapi/scale-agentex-python that referenced this pull request Mar 24, 2026
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Automatically returns the state machine's current
state if self.state_machine exists (covers all StateMachine-based
agents without any override needed). Falls back to _workflow_state
string for non-state-machine agents.

Enables external callers to detect turn completion:
  "waiting_for_input" = agent is done, ready for next message
  "researching" = agent is still working

Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
gabrycina added a commit to scaleapi/scale-agentex-python that referenced this pull request Mar 24, 2026
Adds a default @workflow.query handler named "get_current_state"
to BaseWorkflow. Returns "unknown" by default — agents override
to report their actual state.

Example for StateMachine-based agents:

    @workflow.query(name="get_current_state")
    def get_current_state(self) -> str:
        return self.state_machine.get_current_state()

Companion to scaleapi/scale-agentex#173 which adds
GET /tasks/{task_id}/query/{query_name} endpoint.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
handle.query(name, None) passes None as a positional argument
to the query handler, causing "takes 1 positional argument but
2 were given" for handlers that take no args.

Only pass arg when it's not None.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>


@router.get(
    "/{task_id}/query/{query_name}",
Collaborator

Can we add tests for this new route?
