Skip to content

Dwarf Engine Extraction, Out-Pointer Subgraphs, sequel Telemetry

Latest

Choose a tag to compare

@bw19 bw19 released this 16 Jun 15:42

v1.39.0 extracts the workflow engine into its own module — Dwarf — and rebuilds the Foreman as a thin Microbus adapter over it. The workflow package moves with the engine: github.com/microbus-io/fabric/workflow becomes github.com/microbus-io/dwarf/workflow, adding a dwarf dependency, while its exported types keep their names. The move brings the engine's current API. workflow.NewGraph gains a leading graph-name argument; flow.Goto now takes a graph node name instead of an endpoint URL; and flow.Subgraph / flow.Interrupt adopt an out-pointer — yield, err := flow.Subgraph(url, input, &out) — that unmarshals the child or resume result into a struct you pass, instead of returning a map. The asynchronous stop notification moves onto the flow itself: the StartNotify endpoint is gone, replaced by FlowOptions.NotifyOnStop set at Create. PascalCase becomes the convention for graph and task node names. Separately and non-breaking, sequel v1.10.2 adds an opt-in OpenTelemetry layer — client spans, sequel_* metrics, and migration logs — that lights up once a SQL microservice attaches the connector's providers.

Highlights

  • Workflow engine extracted to the Dwarf module. The engine that defines Graph, Flow, FlowOptions, FlowOutcome, the reducers, and END now lives in github.com/microbus-io/dwarf. The Foreman is a thin Microbus host over the embedded engine; the engine owns durable flow execution, scheduling, and recovery and stays transport-agnostic. Every file that imported fabric/workflow now imports dwarf/workflow, and the project gains a direct dwarf dependency.
  • workflow.NewGraph gained a name argument (loud). NewGraph(url) becomes NewGraph(name, url); the new first argument is the graph's display name (its PascalCase feature name). A call left at one argument is a compile error.
  • flow.Goto takes a graph node name, not an endpoint URL (silent). The signature is unchanged (Goto(string)), but the value's meaning changed: pass the node name the task was registered under in AddTask, not someapi.Task.URL(). A call left as flow.Goto(api.X.URL()) compiles and then fails to route.
  • flow.Subgraph / flow.Interrupt adopt an out-pointer (loud). Both now take a trailing out any pointer and return (yield, err); the child's final state and the resume payload are unmarshaled into out rather than returned as a leading map. The idiom is var out T; yield, err := flow.Subgraph(url, input, &out). The input may be a typed struct or a map[string]any; out may be a *struct, *map[string]any, or nil.
  • StartNotify removed; FlowOptions.NotifyOnStop replaces it (loud). To be notified when a flow stops, set NotifyOnStop: true at Create. The Foreman records the caller's host at create time and fires OnFlowStopped to it when the flow terminates — no delivery address is passed.
  • PascalCase graph and task node names. Newly scaffolded graphs name their graph and nodes in PascalCase (AddTask("VerifySSN", ...)). Node names are arbitrary strings that only need to be internally consistent, so existing graphs are not required to change.
  • sequel telemetry (opt-in, non-breaking). sequel v1.10.2 emits OpenTelemetry client spans, sequel_* metrics, and slog migration logs once a SQL microservice attaches the connector's TracerProvider / MeterProvider / Logger to its *sequel.DB. Without the wiring, behavior is identical.

New Features

The Dwarf Engine

The workflow engine is now the standalone Dwarf module. The Foreman implements the engine's Host interface — loading graphs over the bus, dispatching tasks, classifying transport errors, and delivering stop notifications — while Dwarf owns the durable state machine, the lease-based step execution, the adaptive rate-limit valve and per-task circuit breaker, and the recovery sweeps. Because the engine never inspects HTTP status codes itself, the Foreman maps a task's reply into the engine's two disposition wrappers — workflow.ErrRateLimited (a 429, engaging the valve) and workflow.ErrUnavailable (a 404 ack-timeout, 503, or 529, tripping the breaker). In a Microbus deployment you return the status code as before; the wrappers are the engine-facing translation.

Microbus docs stay at the "how to use" level — the Dwarf repository owns the engine internals.

Out-Pointer Subgraphs and Interrupts

flow.Subgraph and flow.Interrupt deliver their result through a pointer argument and return only (yield, err). Passing a typed In / Out struct keeps the boundary type-safe:

func (svc *Service) RunIdentityVerification(ctx context.Context, flow *workflow.Flow, applicantName string, ssn string) (identityVerified bool, err error) {
    var out creditflowapi.IdentityVerificationOut
    yield, err := flow.Subgraph(creditflowapi.IdentityVerification.URL(), creditflowapi.IdentityVerificationIn{
        ApplicantName: applicantName,
        SSN:           ssn,
    }, &out)
    if yield {
        return false, nil // first pass: parked, child workflow running
    }
    if err != nil {
        return false, errors.Trace(err) // child failed; retry, route, or propagate
    }
    return out.IdentityVerified, nil
}

flow.Interrupt(payload, &resume) is symmetric: it unmarshals the data passed to foremanapi.Resume into resume. Pass nil for out when you do not need the result inline.

Asynchronous Notification via NotifyOnStop

Opting a flow into a stop notification is now a flow option, not a separate endpoint call:

flowID, err := client.Create(ctx, myserviceapi.CreditApproval.URL(), map[string]any{
    "applicant": applicant,
}, &workflow.FlowOptions{NotifyOnStop: true})
// ... handle err ...
err = client.Start(ctx, flowID)

The Foreman fires the OnFlowStopped outbound event back to the calling microservice when the flow reaches a terminal status or interrupts. It records the caller's host at Create, so you no longer pass a hostname.

sequel Telemetry

A SQL CRUD microservice lights up sequel's observability by attaching the connector's providers in openDatabase, before the Migrate call so migrations are instrumented too:

svc.db, err = sequel.OpenSingleton(driverName, dataSourceName)
if err != nil {
    return errors.Trace(err)
}
// Route sequel's spans, sequel_* metrics, and migration logs through the connector's telemetry pipeline.
svc.db.SetTracerProvider(svc.TracerProvider())
svc.db.SetMeterProvider(svc.MeterProvider())
svc.db.SetLogger(svc.Logger())

The accessors return no-op providers when a signal is disabled, so the block is safe in every deployment including TESTING.

Breaking Changes

The upgrade skill handles each of these. Manual migration is not recommended.

  • github.com/microbus-io/fabric/workflowgithub.com/microbus-io/dwarf/workflow (loud). Every file that imports the workflow package — hand-written service.go, the generated intermediate.go / mock.go / mock_test.go, and the *api/client.go proxy — is repointed, and the project gains a direct dwarf dependency. Exported identifiers are otherwise unchanged.
  • workflow.NewGraph(url)NewGraph(name, url) (loud). Add the graph's PascalCase name as the leading argument.
  • flow.Goto(url)flow.Goto(nodeName) (silent). Pass the node name registered in AddTask, not an endpoint URL. A mismatch is caught by graph.Validate() at startup, not by the compiler.
  • flow.Subgraph / flow.Interrupt out-pointer (loud). data, yield, err := flow.Subgraph(url, input) becomes yield, err := flow.Subgraph(url, input, &out); likewise for flow.Interrupt(payload, &out). Both the argument count and the return count change, so old call sites fail to compile.
  • foremanapi.StartNotify removed (loud). Set FlowOptions.NotifyOnStop at Create and Start the flow as usual; the notify host is recorded automatically.
  • sequel bumped to v1.10.2. Additive — the telemetry layer is opt-in. No call sites break.

Migration

From inside a Microbus project, ask Claude Code to upgrade Microbus:

{{< prompt >}}
Get the latest version of Microbus.
{{< /prompt >}}

The upgrade skill handles the version bump end-to-end:

  1. Bump go.mod to v1.39.0 and go mod tidy (which adds github.com/microbus-io/dwarf and the sequel bump).
  2. Refresh .claude/rules/, .claude/skills/, and project-wide framework-managed files.
  3. Relocate the workflow import project-wide (fabric/workflowdwarf/workflow).
  4. Add the leading name argument to every workflow.NewGraph call.
  5. Change flow.Goto arguments from endpoint URLs to the registered node names.
  6. Rewrite flow.Subgraph / flow.Interrupt call sites to the out-pointer shape, and replace StartNotify with FlowOptions.NotifyOnStop.
  7. Wire sequel telemetry into each SQL CRUD microservice's openDatabase.
  8. Regenerate mocks (genmock) and manifests (genmanifest), then run go vet ./... && go test ./....

The load-bearing assertion is the silent one: a workflow test that drives a Goto transition fails to route if a call still passes a .URL() instead of the node name.

Documentation

  • Updated: Agentic Workflows and State for the out-pointer subgraph boundary and the relocated workflow package.
  • New: Yield and Re-Enter — the park-and-re-execute pattern shared by Interrupt, Subgraph, and Retry, with the out-pointer idiom.
  • Updated: Building Agentic Workflows and Building an LLM Workflow for NewGraph(name, url), flow.Goto node names, the out-pointer signatures, NotifyOnStop, and PascalCase node names.
  • Updated: Package workflow for the out-pointer park signals and the typed In / Out subgraph contracts.
  • Updated: Foreman for the Dwarf engine adapter and the NotifyOnStop notification model.
  • Updated: Credit Flow example for the typed-struct subgraph call and PascalCase node names.