Skip to content

sse events

Kadyapam edited this page May 29, 2026 · 2 revisions

SSE events

The gateway exposes a Server-Sent Events channel at GET /events. Authenticated clients use it as a push surface for execution lifecycle, callback results, and live data subscriptions.

Source: src/sse.rs, src/connection_hub.rs.

Opening the channel

GET /events?session_token=<gateway_session_token>&client_id=<optional_existing_client_id>
  • session_token is required. The gateway validates it against the session cache (and falls back to the database for cache misses). Invalid or expired sessions are rejected with 401.
  • client_id is optional. Pass an existing one to reattach to a previous client identity after a reconnect; omit to get a fresh one. The gateway returns the chosen client_id in the init frame so the client can store it for the reconnect path.

Response is an SSE stream with Content-Type: text/event-stream. Standard EventSource semantics apply.

Frame catalog

Frame name Direction Payload sketch Sent when
message (init) server → client { result: { clientId } } Once, immediately after connection.
ping server → client { heartbeat: true, t: <unix_ms> } Periodic. Defaults from GATEWAY_HEARTBEAT_INTERVAL_SECS.
playbook/result server → client { requestId, executionId, status, data, error? } A playbook posted a final callback to /api/internal/callback.
playbook/progress server → client { requestId, executionId, step?, message? } A playbook posted to /api/internal/progress.
playbook/state server → client { execution_id, event_type, step_name?, status?, at } The NATS bridge observed an execution lifecycle event matching FORWARDED_EVENT_TYPES. Added in v2.11.0.

Clients filter frames by SSE event name on the EventSource instance:

const es = new EventSource(`${gateway}/events?session_token=${token}`);
es.addEventListener('message',         ev => /* init / heartbeat */);
es.addEventListener('playbook/result', ev => handlePlaybookResult(JSON.parse(ev.data)));
es.addEventListener('playbook/state',  ev => handlePlaybookState(JSON.parse(ev.data)));

Forwarded event types

The playbook/state frame only fires for execution lifecycle events whose event_type appears in the FORWARDED_EVENT_TYPES allowlist (src/playbook_state.rs).

Current allowlist (as of v2.12.0):

Event type Emitted by Meaning
step.exit NoETL worker A playbook step completed (success or failure).
playbook.completed NoETL server A playbook execution finished successfully.
playbook.failed NoETL server A playbook execution ended in failure.
calendar.event.touched itinerary-planner.yaml The orchestrator wrote a calendar event. Added in v2.12.0.

calendar.event.touched is a domain event emitted by the travel itinerary-planner orchestrator inside the NoETL event log (one per calendar event written). The gateway NATS bridge picks it up and forwards it as a playbook/state frame to the originating SSE client.

The travel SPA's calendarSubscription.ts listens for this event to refresh the CalendarView widget without waiting for the coarser playbook.completed signal. playbook.completed is kept as a fallback for turns that do not write calendar events.

Frame schemas

message (init)

{
  "jsonrpc": "2.0",
  "id": "<request_id_or_null>",
  "result": { "clientId": "<assigned_client_id>" }
}

The init frame is the first event a client receives. The clientId is what GraphQL executePlaybook mutations pass as their callback-routing identifier. Store it for the lifetime of the SSE connection; pass it back in the URL on reconnect.

ping

{ "heartbeat": true, "t": 1779600000000 }

Keeps intermediaries (Cloudflare Tunnel, browsers) from dropping the idle connection. Interval is configurable via GATEWAY_HEARTBEAT_INTERVAL_SECS (default 15s, see Configuration).

playbook/result

Emitted when a playbook the client started via executePlaybook (and any callback path that routes through /api/internal/callback*) has produced a result.

{
  "requestId": "<uuid>",
  "executionId": "<noetl_execution_id>",
  "status": "ok | error",
  "data": { /* playbook-specific result shape */ },
  "error": null
}

Note: the playbook itself must post to /api/internal/callback{,async} for this frame to fire. The itinerary-planner playbook in the travel SPA does not; the auth0_login playbook does.

playbook/progress

{
  "requestId": "<uuid>",
  "executionId": "<noetl_execution_id>",
  "step": "<step_name>",
  "message": "<human readable progress note>"
}

Emitted when a playbook posts to /api/internal/progress. Not all playbooks emit these. The travel SPA currently does not consume them (worth doing in a future round to surface "what the playbook is currently doing" in the UI).

playbook/state (v2.11.0)

{
  "execution_id": "<noetl_execution_id>",
  "event_type": "step.exit | playbook.completed | playbook.failed",
  "step_name": "<step name, if applicable>",
  "status": "<status from the event log>",
  "at": "2026-05-24T18:42:00.123Z"
}

This frame is the push channel for execution lifecycle. The gateway subscribes to NATS subjects under playbooks.executions.<execution_id>.* (prefix from NATS_UPDATES_SUBJECT_PREFIX) and forwards relevant frames to the originating client.

A SPA awaiting completion of an execution it started can use playbook/state instead of polling /api/executions/{id} repeatedly. The travel SPA migrated to this in travel#50.

Reconnection

EventSource transparently reconnects on transient errors. When a client reconnects:

  1. The client reuses its stored client_id in the reconnect URL.
  2. The gateway re-registers the connection under the same client_id.
  3. RequestStore::get_by_client returns the in-flight requests for this client, and the gateway re-attaches the callback routing.

Tests

Gateway SSE behavior is covered in src/sse.rs's unit tests and exercised end-to-end against a mock SSE client in the v2.11.0 PR. SPA-side coverage lives in src/api/noetlClient.test.ts.

Related

Clone this wiki locally