Function-Call Subgraphs, Per-Task Circuit Breaker, Explicit Reducers

@bw19 bw19 released this 13 Jun 17:50

v1.37.0 finishes the workflow control-signal redesign and hardens the foreman against unreachable tasks. Subgraphs become plain function calls: the static graph.AddSubgraph node type is gone, a child workflow is invoked at runtime by a regular task calling flow.Subgraph(url, input), and only the explicit input crosses into the child while only the returned out crosses back. flow.Interrupt and flow.Subgraph now return (data, yield, err) instead of merging results into state by field name, and a step parks at most once. Reducer selection is now explicit: the sum* / list* / set* name-prefix inference is removed in favor of graph.SetReducer, and five reducers (And, Or, Concat, Min, Max) join the set. The foreman gains a per-task circuit breaker that trips on unreachable-task signals (404 ack-timeouts, 503, 529) and probes back on an exponential schedule, plus Restart / RestartFrom operator endpoints that replace the removed Retry and Fork. Every LLM provider's Turn now returns a normalized stopReason. A new agentstudio dev microservice and a reworked Mermaid renderer round out the release.

Highlights

  • graph.AddSubgraph removed; subgraphs are dynamic function calls. A child workflow is no longer a node in the parent graph. A regular task calls flow.Subgraph(url, input); the child's initial state is exactly input (nothing auto-inherited), and the child's final_state returns to the calling task as out (not merged into parent state). graph.IsSubgraph and Node.Subgraph are gone. A subgraph may now serve as a workflow's entry point.
  • flow.Interrupt and flow.Subgraph return (data, yield, err). Resume data and child output are delivered through the return value, not merged into state by field name. The idiom is data, yield, err := ...; if yield { return nil }. A bare call still compiles, so this is a silent change at call sites that read the result back from state.
  • One park per step. flow.Interrupt, flow.Subgraph, and flow.Retry share a single park slot. A step parks at most once (interrupt XOR subgraph), and a retry clears the park slot so retrying after a resolved subgraph re-runs the child. The conditional retry variants (RetryNow, RetryNowOnTimeout, RetryOnTimeout) are removed; flow.Retry is the single primitive and the retryable condition is written explicitly in the surrounding if.
  • Reducer selection is explicit. The sum* / list* / set* name-prefix inference and workflow.ReducerForFieldName are removed. Every non-default fan-in field is wired with graph.SetReducer(name, reducer); unregistered fields use Replace. Five new reducers ship: ReducerAnd, ReducerOr, ReducerConcat, ReducerMin, ReducerMax.
  • Per-task circuit breaker. The foreman trips a per-task breaker on the first unreachable-task signal (a 404 carrying the ack timeout: prefix, a 503, or a 529), parks the task's pending backlog out of the selection index, and admits one probe per shard on an exponential schedule (100ms, doubling, capped at 1m). On a successful probe it releases that shard's backlog as a rolling wave. Trips gossip across replicas and are reconstituted on restart.
  • Restart and RestartFrom replace Retry and Fork. foremanapi.Retry and foremanapi.Fork are removed. Restart(flowKey, stateOverrides) re-runs a terminated flow from its entry step; RestartFrom(stepKey, stateOverrides) rewinds a flow from a chosen step, sweeping the downstream subtree. Both accept optional state overrides.
  • LLM providers return StopReason. llmapi.TurnOut gains a StopReason field. Provider microservices (those exposing a Turn endpoint that matches llmapi.Turn) must add a stopReason string return value before usage and return one of the llmapi.StopReason* constants. Pure consumers of Chat need no change.
  • TESTING deployment no longer reads config files. A connector in TESTING deployment skips config.yaml / config.local.yaml and sees only declared defaults plus values set in test code. Production deployments are unchanged.
  • New foreman.Step(stepKey) endpoint. Returns a single step's full payload (state, changes, status, error, timings) for operator tooling and the agentstudio UI.
  • Switch transitions. graph.AddTransitionSwitch(from, to, when) adds first-match-wins routing (at most one branch runs), distinct from the fan-out When. Switch and Goto may now coexist from the same source.
  • Breakpoints and ResumeBreak. BreakBefore pauses a flow before a named task; ResumeBreak continues it with optional state overrides. Interrupt pauses and breakpoint pauses are strictly separated and never auto-routed.
  • agentstudio dev microservice. A development UI for discovering workflows and inspecting flows: a flow list, a step modal with Overview / State tabs, expandable values, and rendered execution DAGs.
  • Mermaid renderer reworked. Graph and flow renderers are extracted (graphrenderer.go, flowrenderer.go); subgraphs render as pink boxes, switch / when as diamonds and reducers as circles, with a unified teal palette and clickable links. graph.MermaidWith(resolver) expands subgraphs inline.
  • Flow and step started_at. Both rows record when the current attempt began dispatching, driving accurate execution-duration columns that exclude pre-run queue and breaker-park time. Restart resets started_at; RestartFrom preserves it.
  • Schema migrations 11.sql - 15.sql. Applied automatically at foreman startup. See Breaking Changes.
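The difference between the fan-out When and the new first-match-wins Switch (see the Switch transitions highlight above) can be illustrated with a small, framework-free sketch. The branch type and both helper functions are hypothetical stand-ins, not the workflow package's API, and testing branches in registration order is an assumption.

```go
package main

import "fmt"

// branch is a hypothetical stand-in for a transition: a target task name
// plus a predicate over the flow state. It is not the framework's type.
type branch struct {
	to   string
	when func(state map[string]any) bool
}

// fanOut models When-style routing: every branch whose predicate holds runs.
func fanOut(branches []branch, state map[string]any) []string {
	var targets []string
	for _, b := range branches {
		if b.when(state) {
			targets = append(targets, b.to)
		}
	}
	return targets
}

// switchFirst models Switch-style routing: branches are tested in order
// (assumed here to be registration order) and only the first match runs.
func switchFirst(branches []branch, state map[string]any) (string, bool) {
	for _, b := range branches {
		if b.when(state) {
			return b.to, true
		}
	}
	return "", false
}

func main() {
	state := map[string]any{"score": 720}
	branches := []branch{
		{to: "approve", when: func(s map[string]any) bool { return s["score"].(int) >= 700 }},
		{to: "review", when: func(s map[string]any) bool { return s["score"].(int) >= 600 }},
	}
	fmt.Println(fanOut(branches, state))      // both predicates hold
	fmt.Println(switchFirst(branches, state)) // only the first match is taken
}
```

With a score of 720 both predicates hold, so the fan-out model selects both targets while the switch model selects only the first.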

New Features

Subgraphs as Function Calls

A subgraph is invoked one way: a regular task calls flow.Subgraph(url, input). The graph definition never models a child workflow directly.

// RunChild invokes the child verification workflow as a function call.
func (svc *Service) RunChild(ctx context.Context, flow *workflow.Flow, applicantName string, ssn string) (verified bool, err error) {
    // Only this map crosses into the child as its initial state.
    out, yield, err := flow.Subgraph(childapi.Verify.URL(), map[string]any{
        "applicantName": applicantName,
        "ssn":           ssn,
    })
    if err != nil {
        return false, errors.Trace(err)
    }
    if yield {
        return false, nil // parked, child workflow running
    }
    // Adopt only the field this task needs from the child's final state.
    v, _ := out["verified"].(bool)
    return v, nil
}

Only the explicit input map crosses into the child as its initial state; the parent's state and accumulated changes are not inherited automatically. A nil input means "no arguments" (the child starts empty). To preserve the old inheriting behavior, pass flow.Snapshot() as input. The child's final_state returns to the calling task as out, and the task adopts the fields it wants. Subgraph failure surfaces as the err return, so the task can call flow.Retry (which re-runs the child) or route to another step, rather than only cascading a flow failure.
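The isolation contract described above can be modeled without the framework. The following is a minimal sketch, not the foreman's implementation: childFunc and callChild are hypothetical names, and the child is an ordinary Go function standing in for a child workflow.

```go
package main

import "fmt"

// childFunc is a hypothetical stand-in for a child workflow: it receives its
// initial state and returns its final state.
type childFunc func(state map[string]any) map[string]any

// callChild models the function-call boundary: the child starts from a copy
// of input only (nil means an empty state), and only its final state is
// handed back. Parent state is never passed in or merged into.
func callChild(child childFunc, input map[string]any) map[string]any {
	initial := make(map[string]any, len(input))
	for k, v := range input {
		initial[k] = v
	}
	return child(initial)
}

func main() {
	parent := map[string]any{
		"applicantName": "Ada",
		"ssn":           "000-00-0000",
		"internalNote":  "do not share",
	}

	verify := func(state map[string]any) map[string]any {
		_, leaked := state["internalNote"]
		return map[string]any{"verified": state["ssn"] != nil, "sawInternalNote": leaked}
	}

	// Only the named field crosses into the child.
	out := callChild(verify, map[string]any{"ssn": parent["ssn"]})

	// The calling task adopts only the fields it wants from out.
	parent["verified"] = out["verified"]
	fmt.Println(out["verified"], out["sawInternalNote"])
}
```

In this model the child never observes internalNote because the caller did not pass it, which mirrors the named-fields contract.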

Explicit Reducers

Every fan-in field that needs anything other than last-write-wins is wired at graph-build time:

graph.SetReducer("messages", workflow.ReducerAppend)  // accumulate per-branch deltas
graph.SetReducer("total",    workflow.ReducerAdd)     // sum numeric contributions
graph.SetReducer("seen",     workflow.ReducerUnion)   // dedupe across branches
graph.SetReducer("approved", workflow.ReducerAnd)     // all branches must approve
graph.SetReducer("flagged",  workflow.ReducerOr)      // any branch flags
graph.SetReducer("notes",    workflow.ReducerConcat)  // join string deltas
graph.SetReducer("lowScore", workflow.ReducerMin)     // smallest contribution wins
graph.SetReducer("topScore", workflow.ReducerMax)     // largest contribution wins

Fields without a registered reducer use Replace. The old sum* / list* / set* prefix convention is gone, which removes a class of foot-guns where a field name silently carried execution semantics that then leaked into argument lists, struct types, JSON tags, and OpenAPI surfaces. A cleared contribution (flow.Clear) folds to the reducer's identity and is ignored at fan-in.
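To make the fan-in semantics concrete, here is a framework-free sketch of folding per-branch contributions with a registry of reducers and a Replace fallback. The reducer type, the helper names, and folding branches in slice order are assumptions made for illustration; they are not the workflow package's signatures.

```go
package main

import "fmt"

// reducer folds one branch's contribution into the accumulated value.
// Hypothetical stand-in type, not the framework's reducer signature.
type reducer func(acc, delta any) any

func reduceReplace(_, delta any) any { return delta }
func reduceAdd(acc, delta any) any   { return acc.(int) + delta.(int) }
func reduceAnd(acc, delta any) any   { return acc.(bool) && delta.(bool) }

func reduceMax(acc, delta any) any {
	if delta.(int) > acc.(int) {
		return delta
	}
	return acc
}

// fanIn folds contributions field by field. Fields with no registered
// reducer fall back to replace, so the last branch in the slice wins.
func fanIn(reducers map[string]reducer, branches []map[string]any) map[string]any {
	merged := map[string]any{}
	for _, contribution := range branches {
		for field, delta := range contribution {
			r, ok := reducers[field]
			if !ok {
				r = reduceReplace
			}
			if prev, seen := merged[field]; seen {
				merged[field] = r(prev, delta)
			} else {
				merged[field] = delta
			}
		}
	}
	return merged
}

func main() {
	reducers := map[string]reducer{
		"total":    reduceAdd,
		"approved": reduceAnd,
		"topScore": reduceMax,
	}
	branches := []map[string]any{
		{"total": 10, "approved": true, "topScore": 640, "status": "first"},
		{"total": 5, "approved": false, "topScore": 710, "status": "second"},
	}
	merged := fanIn(reducers, branches)
	fmt.Println(merged["total"], merged["approved"], merged["topScore"], merged["status"])
}
```

The status field has no registered reducer, so it ends up holding the last branch's value, which is the behavior a migrated workflow would see for any field that previously relied on a name prefix and was not given a graph.SetReducer.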

Per-Task Circuit Breaker

When a task endpoint is unreachable or signals overload, repeatedly dispatching its backlog wastes work and re-floods a struggling downstream. The foreman now trips a per-task breaker on the first such signal:

  • Trip signals: a 404 carrying the connector's ack timeout: prefix (no subscriber answered), a 503 Service Unavailable, or a 529 Site Overloaded. Other 5xx remain ordinary task errors.
  • Park: the task's pending steps are parked out of the selection index via a new parked column, leaving exactly one probe per shard.
  • Probe schedule: exponential, local to each replica, starting at 100ms and doubling to a 1m cap. The schedule advances only when a genuinely-due probe dispatches.
  • Recovery: a successful probe releases that shard's backlog as a rolling wave. Trips gossip to peers via TripBreaker, and a restarting replica reconstitutes its breakers from the parked rows in the database.
  • Observability: microbus_task_breaker_state, microbus_task_breaker_trips_total, and microbus_task_breaker_probes_total, each labeled by cause (ack_timeout, unavailable, overloaded).

There is no auto-give-up on a forever-tripped breaker; flow lifetime remains the workflow author's responsibility.

Restart and RestartFrom

Two operator endpoints replace the removed Retry and Fork:

client := foremanapi.NewClient(svc)
// Re-run a terminated flow from its entry step, with optional overrides.
err := client.Restart(ctx, flowKey, map[string]any{"threshold": 0.9})
if err != nil {
    return errors.Trace(err)
}
// Rewind a flow from a chosen step, sweeping its downstream subtree.
err = client.RestartFrom(ctx, stepKey, nil)
if err != nil {
    return errors.Trace(err)
}

Restart deletes every step past the entry, resets the entry to pending with merge(originalEntryState, stateOverrides), and re-runs; it is a fresh attempt, so started_at is reset. RestartFrom is a surgical in-place rewind: it sweeps the target step's downstream DAG subtree, cascades up the subgraph chain when the target lives inside a child flow, and preserves started_at. Pass nil for no overrides.
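The merge(originalEntryState, stateOverrides) step can be pictured as follows. This is a sketch only: mergeState is a hypothetical helper, and a shallow, key-by-key merge in which overrides win is an assumption about the semantics rather than something the release notes spell out.

```go
package main

import "fmt"

// mergeState returns a new map holding base with overrides applied on top.
// Assumption: the merge is shallow and overrides win on key collisions.
// A nil overrides map leaves the base values unchanged.
func mergeState(base, overrides map[string]any) map[string]any {
	merged := make(map[string]any, len(base)+len(overrides))
	for k, v := range base {
		merged[k] = v
	}
	for k, v := range overrides {
		merged[k] = v
	}
	return merged
}

func main() {
	entry := map[string]any{"applicant": "Ada", "threshold": 0.8}
	restarted := mergeState(entry, map[string]any{"threshold": 0.9})
	fmt.Println(restarted["applicant"], restarted["threshold"], entry["threshold"])
}
```

The original entry state is left untouched in this model, so the same flow could be restarted again with different overrides.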

LLM StopReason

Each provider maps its native finish reason into a normalized vocabulary (llmapi/stopreason.go): StopReasonEndTurn, StopReasonToolUse, StopReasonMaxTokens, StopReasonStopSequence, StopReasonRefusal, StopReasonPauseTurn, and StopReasonUnknown (the empty string). The llm.core chat loop branches on it: tool_use continues the loop, completions return, and max_tokens / pause_turn / unknown fail loud rather than shipping a partial response.
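The branching described above might look roughly like this. The constants below are local stand-ins for the llmapi.StopReason* values: only the empty string for unknown is stated in these notes, so the other string values are assumptions, as is grouping refusal with the completions that return.

```go
package main

import "fmt"

// Local stand-ins for the llmapi.StopReason* constants. The string values
// other than the empty "unknown" are assumptions for this sketch.
const (
	stopEndTurn      = "end_turn"
	stopToolUse      = "tool_use"
	stopMaxTokens    = "max_tokens"
	stopStopSequence = "stop_sequence"
	stopRefusal      = "refusal"
	stopPauseTurn    = "pause_turn"
	stopUnknown      = ""
)

// action is what the chat loop does next. The zero value is abort so that
// an error path never accidentally reads as "continue".
type action int

const (
	abort action = iota
	continueLoop
	returnResult
)

// decide maps a normalized stop reason to the loop's next action.
func decide(stopReason string) (action, error) {
	switch stopReason {
	case stopToolUse:
		return continueLoop, nil // run the tools, then take another turn
	case stopEndTurn, stopStopSequence, stopRefusal:
		return returnResult, nil // treated as completions in this sketch
	case stopMaxTokens, stopPauseTurn, stopUnknown:
		return abort, fmt.Errorf("turn stopped with %q: not returning a partial response", stopReason)
	default:
		return abort, fmt.Errorf("unrecognized stop reason %q", stopReason)
	}
}

func main() {
	for _, r := range []string{stopToolUse, stopEndTurn, stopMaxTokens, stopUnknown} {
		a, err := decide(r)
		fmt.Println(a, err)
	}
}
```

Failing on max_tokens, pause_turn, and unknown keeps a truncated answer from being mistaken for a finished one, which is the "fail loud" behavior the notes describe.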

// Provider Turn signature gains stopReason before usage.
func (svc *Service) Turn(ctx context.Context, model string, messages []llmapi.Message, tools []llmapi.Tool, options *llmapi.TurnOptions) (content string, toolCalls []llmapi.ToolCall, stopReason string, usage llmapi.Usage, err error)

Breaking Changes

The upgrade skill handles each of these. Manual migration is not recommended.

  • graph.AddSubgraph, graph.IsSubgraph, Node.Subgraph removed (loud). Each static subgraph node becomes a regular task that calls flow.Subgraph(url, input) and is registered with graph.AddTask.
  • flow.Interrupt / flow.Subgraph no longer merge into state (silent). Both return (data, yield, err); resume data and child output are delivered through the return value. A task that read the result back from state by field name will see stale or zero values until rewritten.
  • flow.Subgraph input is now the complete child state (silent). The parent's state no longer auto-inherits. Pass flow.Snapshot() for the old behavior, or a named-fields map for the explicit contract.
  • flow.RetryNow, flow.RetryNowOnTimeout, flow.RetryOnTimeout removed (loud). Use bare flow.Retry(maxAttempts, initialDelay, multiplier, maxDelay) with the retryable condition in the surrounding if.
  • Reducer prefix inference and workflow.ReducerForFieldName removed (silent + loud). Add a graph.SetReducer per previously-inferred fan-in field. coreservices/llm.ChatLoop renamed its history field from listMessages to messages.
  • foremanapi.Retry and foremanapi.Fork removed (loud). Migrate callers to Restart / RestartFrom. Fork's independent-sibling semantic has no direct replacement; RestartFrom mutates the existing flow in place.
  • llmapi.TurnOut gained StopReason (loud for providers). Provider Turn endpoints add stopReason string before usage; pure consumers need no change.
  • TESTING deployment no longer reads config.yaml / config.local.yaml (silent). Move test-relevant config values into the test setup (e.g. svc.SetConfig(name, value) before app.RunInTest(t)).
  • Schema migrations 11.sql - 15.sql. parked discriminator column plus reworked selection / saturation indexes (11), Fork columns dropped (12), cohort_failures (13), step started_at (14), flow started_at (15). Applied automatically at foreman startup; no manual schema change required.

Migration

From inside a Microbus project, ask Claude Code to upgrade Microbus:

{{< prompt >}}
Get the latest version of Microbus.
{{< /prompt >}}

The upgrade skill handles the version bump end-to-end:

  1. Bump go.mod to v1.37.0 and run go mod tidy.
  2. Refresh .claude/rules/, .claude/skills/, and project-wide framework-managed files.
  3. Rewrite the removed retry variants to bare flow.Retry.
  4. Convert graph.AddSubgraph nodes to flow.Subgraph caller tasks, and rewrite flow.Interrupt / flow.Subgraph call sites to capture (data, yield, err).
  5. Add a graph.SetReducer per prefix-named fan-in field, and fix ChatLoop callers (listMessages -> messages).
  6. Migrate foremanapi.Retry / foremanapi.Fork callers to Restart / RestartFrom.
  7. Add stopReason to any LLM provider's Turn signature.
  8. Move TESTING-mode config values out of config.yaml into the test setup.
  9. Regenerate mocks (genmock) and manifests (genmanifest), then run go vet ./... && go test ./....

The tests that matter most are the ones covering the silent changes: a workflow test that previously saw a subgraph's output in parent state, a resume that fed data back to an interrupted task, or a fan-in field summed across branches. If a test now sees a zero value, an empty out, or a last-write-wins value, a rewrite is missing at that call site.

Documentation

  • Updated: Agentic Workflows and State for the function-call subgraph model, Switch transitions, and the (data, yield, err) park signals.
  • Updated: Reducers for explicit graph.SetReducer, the removed prefix inference, and the new And / Or / Concat / Min / Max reducers.
  • Updated: Building Agentic Workflows and Building an LLM Workflow for flow.Subgraph, Switch, the single flow.Retry, and Restart / RestartFrom.
  • Updated: Package workflow for the new flow.Subgraph / flow.Interrupt shapes, Switch transitions, and the removed AddSubgraph / retry variants.
  • Updated: Foreman for the per-task circuit breaker, Restart / RestartFrom, the Step endpoint, started_at, and the parked column.
  • Updated: LLM and LLM integration for the normalized StopReason and the messages (formerly listMessages) ChatLoop field.
  • Updated: Credit Flow example and Python integration to the function-call subgraph, explicit reducers, and single flow.Retry.