From 42d6e8cd22b0ed8f662572e957e553e4cf98a81e Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 13:50:29 +0100 Subject: [PATCH 1/6] feat(supervisor): wide-event module + kill switch --- apps/supervisor/src/env.ts | 5 + apps/supervisor/src/wideEvents/context.ts | 14 ++ apps/supervisor/src/wideEvents/emit.test.ts | 115 ++++++++++ apps/supervisor/src/wideEvents/emit.ts | 72 ++++++ apps/supervisor/src/wideEvents/index.ts | 28 +++ .../src/wideEvents/middleware.test.ts | 207 ++++++++++++++++++ apps/supervisor/src/wideEvents/middleware.ts | 122 +++++++++++ apps/supervisor/src/wideEvents/new.test.ts | 81 +++++++ apps/supervisor/src/wideEvents/new.ts | 76 +++++++ apps/supervisor/src/wideEvents/record.test.ts | 112 ++++++++++ apps/supervisor/src/wideEvents/record.ts | 82 +++++++ apps/supervisor/src/wideEvents/state.ts | 62 ++++++ .../src/wideEvents/traceparent.test.ts | 43 ++++ apps/supervisor/src/wideEvents/traceparent.ts | 39 ++++ 14 files changed, 1058 insertions(+) create mode 100644 apps/supervisor/src/wideEvents/context.ts create mode 100644 apps/supervisor/src/wideEvents/emit.test.ts create mode 100644 apps/supervisor/src/wideEvents/emit.ts create mode 100644 apps/supervisor/src/wideEvents/index.ts create mode 100644 apps/supervisor/src/wideEvents/middleware.test.ts create mode 100644 apps/supervisor/src/wideEvents/middleware.ts create mode 100644 apps/supervisor/src/wideEvents/new.test.ts create mode 100644 apps/supervisor/src/wideEvents/new.ts create mode 100644 apps/supervisor/src/wideEvents/record.test.ts create mode 100644 apps/supervisor/src/wideEvents/record.ts create mode 100644 apps/supervisor/src/wideEvents/state.ts create mode 100644 apps/supervisor/src/wideEvents/traceparent.test.ts create mode 100644 apps/supervisor/src/wideEvents/traceparent.ts diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index f2d54741ee..0e52578daf 100644 --- a/apps/supervisor/src/env.ts +++ 
/**
 * AsyncLocalStorage threading per-operation `State` through the call stack.
 * Wrappers enter a state via `wideEventStorage.run(state, () => fn())` and
 * any code in the async call tree retrieves it via `fromContext()`.
 *
 * The explicit `<State>` type argument is required: without it the store is
 * typed `unknown` and `fromContext()` cannot return `State | null`.
 */
export const wideEventStorage = new AsyncLocalStorage<State>();

/**
 * Returns the State attached to the current async context, or `null` when
 * the caller is not running inside `wideEventStorage.run(...)`.
 */
export function fromContext(): State | null {
  // getStore() yields undefined outside of run(); normalise that to null.
  return wideEventStorage.getStore() ?? null;
}
expect(out["meta.run_id"]).toBe("run_abc"); + expect(out["meta.deployment_id"]).toBe("dep_xyz"); + expect(out).not.toHaveProperty("meta"); + }); + + it("flattens phases as phase..", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 200; + s.ok = true; + s.phases.push({ name: "warm_start", durationMs: 12, ok: true, attempts: 1 }); + s.phases.push({ + name: "workload_create", + durationMs: 3, + ok: false, + attempts: 2, + errorCode: "Error", + errorMsg: "boom", + sub: { create_ms: 1 }, + }); + const out = captureEmit(s); + expect(out["phase.warm_start.duration_ms"]).toBe(12); + expect(out["phase.warm_start.ok"]).toBe(true); + expect(out["phase.warm_start.attempts"]).toBe(1); + expect(out["phase.workload_create.duration_ms"]).toBe(3); + expect(out["phase.workload_create.ok"]).toBe(false); + expect(out["phase.workload_create.attempts"]).toBe(2); + expect(out["phase.workload_create.error_code"]).toBe("Error"); + expect(out["phase.workload_create.error_message"]).toBe("boom"); + expect(out["phase.workload_create.create_ms"]).toBe(1); + }); + + it("includes error.code/message/kind when state.error is set", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 500; + s.error = { code: "InternalError", message: "kaboom", kind: "internal" }; + const out = captureEmit(s); + expect(out["error.code"]).toBe("InternalError"); + expect(out["error.message"]).toBe("kaboom"); + expect(out["error.kind"]).toBe("internal"); + }); + + it("truncates very long error messages", () => { + const s = newState({ service: "supervisor", env: {} }); + s.error = { code: "Big", message: "x".repeat(2000), kind: "internal" }; + const out = captureEmit(s); + expect((out["error.message"] as string).length).toBe(512); + }); + + it("flattens extras at the top level", () => { + const s = newState({ service: "supervisor", env: {} }); + s.statusCode = 200; + s.ok = true; + s.extras.route = "/health"; + s.extras["dispatch.result"] = "hit"; + const 
/**
 * Stable slog message string for every wide event. Downstream filters (jq,
 * Axiom queries, Vector pipelines) pin to this constant. The `service` field
 * disambiguates which service emitted it.
 */
export const EmitMessage = "wide_event";

/**
 * Upper bound for `error.message` on emit. Despite the name, the limit is
 * applied with `String.length` / `slice`, i.e. in UTF-16 code units.
 */
const MAX_ERROR_MSG_BYTES = 512;

/**
 * Serializes a State as a single flat-keyed JSON line on stdout. Keys are
 * flat (no nested objects) to keep jq filtering and Axiom indexing cheap.
 *
 * - `msg` and `request_id` are always written, as are `ok` and `duration_ms`.
 * - `status` is omitted while `statusCode` is still 0; empty optional string
 *   fields are omitted.
 * - `meta` entries become `meta.<key>`; each phase becomes
 *   `phase.<name>.<field>`, with `sub` entries folded under the same prefix.
 * - `extras` are written at the top level but never replace a key that was
 *   already written, so a stray extra cannot overwrite `msg`, `request_id`,
 *   `ok`, ... that downstream filters rely on.
 * - A value JSON cannot encode (BigInt, circular reference) is replaced by
 *   its string form instead of throwing and losing the whole event.
 */
export function emit(state: State): void {
  const out: Record<string, unknown> = {
    msg: EmitMessage,
    request_id: state.requestId,
  };

  if (state.traceId) out.trace_id = state.traceId;
  appendIfSet(out, "service", state.service);
  appendIfSet(out, "version", state.version);
  appendIfSet(out, "commit_sha", state.commitSha);
  appendIfSet(out, "region", state.region);
  appendIfSet(out, "node_id", state.nodeId);
  appendIfSet(out, "instance_id", state.instanceId);

  out.ok = state.ok;
  if (state.statusCode !== 0) out.status = state.statusCode;
  out.duration_ms = state.durationMs;

  if (state.error) {
    appendIfSet(out, "error.code", state.error.code);
    const msg =
      state.error.message.length > MAX_ERROR_MSG_BYTES
        ? state.error.message.slice(0, MAX_ERROR_MSG_BYTES)
        : state.error.message;
    appendIfSet(out, "error.message", msg);
    appendIfSet(out, "error.kind", state.error.kind);
  }

  for (const [k, v] of Object.entries(state.meta)) {
    out["meta." + k] = v;
  }

  for (const p of state.phases) {
    const prefix = "phase." + p.name + ".";
    out[prefix + "duration_ms"] = p.durationMs;
    out[prefix + "ok"] = p.ok;
    out[prefix + "attempts"] = p.attempts;
    if (p.errorCode) out[prefix + "error_code"] = p.errorCode;
    if (p.errorMsg) out[prefix + "error_message"] = p.errorMsg;
    if (p.sub) {
      for (const [sk, sv] of Object.entries(p.sub)) {
        out[prefix + sk] = sv;
      }
    }
  }

  for (const [k, v] of Object.entries(state.extras)) {
    // Core / meta / phase keys win over free-form extras.
    if (Object.prototype.hasOwnProperty.call(out, k)) continue;
    out[k] = v;
  }

  process.stdout.write(serializeLine(out) + "\n");
}

/** Copies `value` under `key` only when it is a non-empty string. */
function appendIfSet(out: Record<string, unknown>, key: string, value: string | undefined): void {
  if (value) out[key] = value;
}

/**
 * JSON-encodes the flat record. If encoding the whole record throws, each
 * value is probed on its own and only the offending ones are downgraded.
 */
function serializeLine(out: Record<string, unknown>): string {
  try {
    return JSON.stringify(out);
  } catch {
    const safe: Record<string, unknown> = {};
    for (const [k, v] of Object.entries(out)) {
      safe[k] = toEncodable(v);
    }
    return JSON.stringify(safe);
  }
}

/** Returns `v` unchanged when JSON can encode it, otherwise a string stand-in. */
function toEncodable(v: unknown): unknown {
  try {
    JSON.stringify(v);
    return v;
  } catch {
    try {
      return String(v);
    } catch {
      // e.g. null-prototype objects have no usable string conversion.
      return "[unserializable]";
    }
  }
}
/**
 * Test helper: replaces `process.stdout.write` while `fn` runs and resolves
 * with every chunk written in the meantime (stringified, in write order).
 *
 * The original `process.stdout.write` is restored in a `finally`, so it is
 * put back whether `fn` returns, rejects, or throws synchronously. A failure
 * of `fn` surfaces as a rejection of the returned promise.
 */
async function captureStdout(fn: () => Promise<unknown> | unknown): Promise<string[]> {
  const captured: string[] = [];
  const orig = process.stdout.write;
  process.stdout.write = ((chunk: unknown) => {
    captured.push(String(chunk));
    return true;
  }) as typeof process.stdout.write;
  try {
    // Awaiting inside try/finally also covers a synchronous throw from fn().
    await fn();
  } finally {
    process.stdout.write = orig;
  }
  return captured;
}
expect(typeof ev.request_id).toBe("string"); + }); + + it("derives ok from statusCode set via finalize", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => undefined, + (state) => { + state.statusCode = 200; + } + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(true); + expect(ev.status).toBe(200); + }); + + it("treats 4xx as ok=false", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => undefined, + (state) => { + state.statusCode = 400; + } + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(false); + expect(ev.status).toBe(400); + }); + + it("emits ok=false with error.kind=internal on throw", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => { + throw new Error("boom"); + } + ).catch(() => undefined); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.ok).toBe(false); + expect(ev.status).toBe(500); + expect(ev["error.kind"]).toBe("internal"); + expect(ev["error.message"]).toBe("boom"); + }); + + it("threads state through AsyncLocalStorage", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => { + setMeta(fromContext(), "run_id", "run_abc"); + } + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev["meta.run_id"]).toBe("run_abc"); + expect(ev.ok).toBe(true); + }); + + it("picks up inbound traceparent for trace_id", async () => { + const tp = 
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: true, traceparent: tp }, + async () => undefined + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev.trace_id).toBe("4bf92f3577b34da6a3ce929d0e0e4736"); + }); + + it("honours setup() to attach meta and extras before fn runs", async () => { + const lines = await captureStdout(async () => { + await runWideEvent( + { + service: "supervisor", + env: {}, + enabled: true, + setup: (state) => { + state.meta.run_id = "run_abc"; + state.extras.iteration = "dequeue"; + }, + }, + async () => undefined + ); + }); + const line = lines[0]; + if (!line) throw new Error("no line"); + const ev = JSON.parse(line) as Record; + expect(ev["meta.run_id"]).toBe("run_abc"); + expect(ev.iteration).toBe("dequeue"); + }); + + it("short-circuits to pass-through when enabled=false", async () => { + let seenState: ReturnType = null; + const lines = await captureStdout(async () => { + await runWideEvent( + { service: "supervisor", env: {}, enabled: false }, + async () => { + seenState = fromContext(); + } + ); + }); + expect(lines).toHaveLength(0); + expect(seenState).toBe(null); + }); + + it("isolates state across concurrent invocations", async () => { + const lines = await captureStdout(async () => { + await Promise.all( + ["a", "b", "c"].map((tag) => + runWideEvent( + { service: "supervisor", env: {}, enabled: true }, + async () => { + const s = fromContext(); + if (!s) throw new Error("no state"); + s.meta.tag = tag; + await new Promise((r) => setTimeout(r, 5)); + expect(s.meta.tag).toBe(tag); + } + ) + ) + ); + }); + const tags = lines.map((l) => (JSON.parse(l) as Record)["meta.tag"]); + expect(tags.sort()).toEqual(["a", "b", "c"]); + }); +}); + +describe("emitOneShot", () => { + it("emits a single event with populated meta when enabled", 
/** Options common to every wide-event lifecycle. */
export type WideEventOptions = {
  service: string;
  env: Env;
  /**
   * Kill switch. When false, lifecycles degenerate into transparent
   * pass-through - no State allocation, no AsyncLocalStorage run, no emit.
   * Important for the dispatch hotpath where logging pressure must be
   * cleanly removable.
   */
  enabled: boolean;
};

/** Per-invocation options layered on top of `WideEventOptions`. */
export type WideEventLifecycleOptions = WideEventOptions & {
  /** Route template (HTTP only) captured into `extras.route`. */
  route?: string;
  /** HTTP method captured into `extras.method`. */
  method?: string;
  /** Inbound W3C traceparent (HTTP header, queue message field). */
  traceparent?: string;
  /** Inbound request id (e.g. `x-request-id` header). */
  inboundRequestId?: string;
  /** Runs after the state is built, before the wrapped fn. Use to attach meta. */
  setup?: (state: State) => void;
};

/**
 * Runs `fn` inside an AsyncLocalStorage state and emits one wide event on
 * completion or error.
 *
 * - `finalize` runs after `fn` returns but before emit - use it to read
 *   out-of-band outcome info (e.g. `res.statusCode` for an HTTP route) and
 *   assign to `state.statusCode`. An exception thrown by `finalize` is
 *   handled like a failure of `fn`.
 * - On success `ok` is derived from `statusCode` when it is non-zero (2xx is
 *   ok); otherwise it defaults to true.
 * - On failure the event carries `ok=false`, `error.kind="internal"`, and a
 *   status of 500 unless a status was already set; the original error is
 *   rethrown unchanged.
 * - Emitting is best-effort: a failure while writing the event never changes
 *   the outcome of the wrapped operation and never causes a second event.
 *
 * @returns the result of `fn`. When `enabled=false`, `fn` runs unchanged with
 *   no state and no event.
 */
export async function runWideEvent<T>(
  opts: WideEventLifecycleOptions,
  fn: () => Promise<T> | T,
  finalize?: (state: State) => void
): Promise<T> {
  if (!opts.enabled) {
    return fn();
  }

  const state = newState({
    service: opts.service,
    env: opts.env,
    inboundRequestId: opts.inboundRequestId,
    traceparent: opts.traceparent,
  });
  if (opts.route) state.extras.route = opts.route;
  if (opts.method) state.extras.method = opts.method;
  if (opts.setup) opts.setup(state);

  const start = performance.now();
  try {
    // Promise.resolve lets a synchronous fn share the same async path.
    const result = await wideEventStorage.run(state, () => Promise.resolve(fn()));
    state.durationMs = Math.round(performance.now() - start);
    if (finalize) finalize(state);
    if (state.statusCode !== 0) {
      state.ok = state.statusCode >= 200 && state.statusCode < 300;
    } else {
      state.ok = true;
    }
    // Must not throw here: a throw would land in the catch below and turn a
    // successful operation into a reported (and rethrown) failure.
    emitSafely(state);
    return result;
  } catch (err) {
    state.durationMs = Math.round(performance.now() - start);
    const e = err instanceof Error ? err : new Error(String(err));
    if (state.statusCode === 0) state.statusCode = 500;
    state.ok = false;
    state.error = {
      code: e.name || "Error",
      message: e.message,
      kind: "internal",
    };
    // Must not throw here either, or it would mask the original `err`.
    emitSafely(state);
    throw err;
  }
}

/** Emits the event, reporting (not propagating) any failure of the emitter. */
function emitSafely(state: State): void {
  try {
    emit(state);
  } catch (emitErr) {
    try {
      console.error("wide_event emit failed", emitErr);
    } catch {
      // Nothing further can be done if stderr is unusable as well.
    }
  }
}
/**
 * Emits a stand-alone wide event that does not wrap any operation, e.g. for
 * socket lifecycle events (`run:start`, `run:stop`). A fresh state is built,
 * `populate` (if given) is invoked synchronously to attach meta/extras, `ok`
 * is then set to true, and the event is emitted. Does nothing when
 * `opts.enabled` is false.
 */
export function emitOneShot(
  opts: WideEventOptions & {
    traceparent?: string;
    populate?: (state: State) => void;
  }
): void {
  if (opts.enabled) {
    const oneShot = newState({
      service: opts.service,
      env: opts.env,
      traceparent: opts.traceparent,
    });
    opts.populate?.(oneShot);
    // Assigned after populate, so a one-shot event is always reported ok.
    oneShot.ok = true;
    emit(oneShot);
  }
}

/** Sets `state.meta[key]`; silently ignored when there is no state. */
export function setMeta(state: State | null, key: string, value: string): void {
  if (state) {
    state.meta[key] = value;
  }
}

/** Sets a free-form `state.extras[key]`; silently ignored when there is no state. */
export function setExtra(state: State | null, key: string, value: unknown): void {
  if (state) {
    state.extras[key] = value;
  }
}
expect(isValidRequestId("\x00null")).toBe(false); + }); + + it("rejects high-bit / non-ASCII", () => { + expect(isValidRequestId("café")).toBe(false); + expect(isValidRequestId("a\x7f")).toBe(false); + }); +}); + +describe("newState", () => { + const env = { version: "1.0.0", commitSha: "abc123", region: "us-east-1", nodeId: "node-1" }; + + it("populates service identity from env", () => { + const s = newState({ service: "supervisor", env }); + expect(s.service).toBe("supervisor"); + expect(s.version).toBe("1.0.0"); + expect(s.commitSha).toBe("abc123"); + expect(s.region).toBe("us-east-1"); + expect(s.nodeId).toBe("node-1"); + }); + + it("mints a fresh request id when none provided", () => { + const s = newState({ service: "test", env: {} }); + expect(s.requestId).toMatch(/^req-[0-9a-f]{32}$/); + }); + + it("honours a valid inbound request id", () => { + const s = newState({ service: "test", env: {}, inboundRequestId: "trace-abc-123" }); + expect(s.requestId).toBe("trace-abc-123"); + }); + + it("rejects unsafe inbound request id and mints a fresh one", () => { + const s = newState({ service: "test", env: {}, inboundRequestId: "has space" }); + expect(s.requestId).toMatch(/^req-[0-9a-f]{32}$/); + }); + + it("parses traceparent into traceId and preserves the raw header", () => { + const tp = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; + const s = newState({ service: "test", env: {}, traceparent: tp }); + expect(s.traceId).toBe("4bf92f3577b34da6a3ce929d0e0e4736"); + expect(s.traceparent).toBe(tp); + }); + + it("leaves traceId empty when no traceparent provided", () => { + const s = newState({ service: "test", env: {} }); + expect(s.traceId).toBe(""); + expect(s.traceparent).toBe(""); + }); + + it("initialises empty meta/extras/phases", () => { + const s = newState({ service: "test", env: {} }); + expect(s.meta).toEqual({}); + expect(s.extras).toEqual({}); + expect(s.phases).toEqual([]); + expect(s.ok).toBe(false); + expect(s.statusCode).toBe(0); + 
const MAX_REQUEST_ID_LEN = 128;

/** One or more characters, all within visible ASCII (0x21..0x7E). */
const VISIBLE_ASCII_ONLY = /^[\x21-\x7e]+$/;

/**
 * Decides whether an inbound request id is safe to echo into the event.
 * Accepts only non-empty strings of at most 128 characters made up entirely
 * of visible ASCII (0x21..0x7E); whitespace, newlines, control characters,
 * DEL and anything non-ASCII are rejected.
 */
export function isValidRequestId(s: string): boolean {
  const withinBounds = s.length >= 1 && s.length <= MAX_REQUEST_ID_LEN;
  return withinBounds && VISIBLE_ASCII_ONLY.test(s);
}

/**
 * Process-wide service identity. Every field is optional and is copied as-is
 * into each State built by `newState`.
 */
export type Env = {
  version?: string;
  commitSha?: string;
  region?: string;
  nodeId?: string;
};

export type NewStateOptions = {
  service: string;
  env: Env;
  /** Inbound request id (e.g. `x-request-id`). Replaced by a minted `req-<hex>` id when absent or unsafe. */
  inboundRequestId?: string;
  /** Inbound W3C traceparent (HTTP header, queue message field), if any. */
  traceparent?: string;
};

/**
 * Creates the State for one wide-event lifecycle.
 *
 * - `requestId` is the inbound id when `isValidRequestId` accepts it,
 *   otherwise a newly minted `req-<hex>` id.
 * - `traceId` is whatever `parseTraceId` returns for the traceparent.
 * - `traceparent` keeps the raw inbound value, or "" when none was given.
 * - Outcome fields start as `ok=false`, `statusCode=0`, `durationMs=0`, with
 *   empty `meta`, `phases` and `extras`.
 */
export function newState(opts: NewStateOptions): State {
  const rawTraceparent = opts.traceparent ?? "";
  const candidateId = opts.inboundRequestId ?? "";
  const { version, commitSha, region, nodeId } = opts.env;

  return {
    requestId: isValidRequestId(candidateId) ? candidateId : newRequestId(),
    traceId: parseTraceId(rawTraceparent),
    traceparent: rawTraceparent,
    service: opts.service,
    version,
    commitSha,
    region,
    nodeId,
    meta: {},
    phases: [],
    ok: false,
    statusCode: 0,
    durationMs: 0,
    extras: {},
  };
}

/** Mints `req-` followed by 16 random bytes rendered as hex. */
function newRequestId(): string {
  return `req-${randomBytes(16).toString("hex")}`;
}
recordPhase(s, "x", performance.now(), new Error("y".repeat(2000))); + const phase = s.phases[0]; + if (!phase) throw new Error("missing phase"); + expect(phase.errorMsg?.length).toBe(512); + }); + + it("honours opts.attempts", () => { + const s = makeState(); + recordPhase(s, "retry", performance.now(), undefined, { attempts: 3 }); + expect(s.phases[0]?.attempts).toBe(3); + }); + + it("attaches sub-timings", () => { + const s = makeState(); + recordPhase(s, "complex", performance.now(), undefined, { sub: { setup_ms: 10, work_ms: 5 } }); + expect(s.phases[0]?.sub).toEqual({ setup_ms: 10, work_ms: 5 }); + }); + + it("is a no-op when state is null", () => { + expect(() => recordPhase(null, "x", performance.now(), undefined)).not.toThrow(); + }); +}); + +describe("timePhase + AsyncLocalStorage threading", () => { + it("records via fromContext on success", async () => { + const s = makeState(); + const value = await wideEventStorage.run(s, () => timePhase("work", async () => 42)); + expect(value).toBe(42); + expect(s.phases).toHaveLength(1); + expect(s.phases[0]?.ok).toBe(true); + }); + + it("records via fromContext on error and rethrows", async () => { + const s = makeState(); + await expect( + wideEventStorage.run(s, () => + timePhase("work", async () => { + throw new Error("boom"); + }) + ) + ).rejects.toThrow("boom"); + expect(s.phases).toHaveLength(1); + expect(s.phases[0]?.ok).toBe(false); + expect(s.phases[0]?.errorMsg).toBe("boom"); + }); + + it("runs fn unchanged when no state on context", async () => { + const value = await timePhase("work", async () => "ok"); + expect(value).toBe("ok"); + }); +}); + +describe("recordPhaseSince", () => { + it("records using a caller-captured start time", async () => { + const s = makeState(); + await wideEventStorage.run(s, async () => { + const start = performance.now(); + await new Promise((r) => setTimeout(r, 10)); + recordPhaseSince("spanning", start, undefined); + }); + expect(s.phases).toHaveLength(1); + 
/**
 * Upper bound for a phase's `errorMsg`. Despite the name, the limit is
 * applied with `String.length` / `slice`, i.e. in UTF-16 code units.
 */
const MAX_ERROR_MSG_BYTES = 512;

/** Optional knobs for a phase record. */
export type PhaseOpt = {
  /** Attempt count for the phase (default 1). */
  attempts?: number;
  /** Sub-timings stored on the phase record as `sub`. */
  sub?: Record<string, number>;
};

/**
 * Appends a phase outcome to `state.phases`.
 *
 * @param state   Target state; the call is a no-op when it is null.
 * @param name    Phase name.
 * @param startMs `performance.now()` reading taken when the phase began; the
 *                recorded duration is the rounded difference to now.
 * @param err     `undefined` for a successful phase. Otherwise the phase is
 *                recorded with `ok=false`, `errorCode` from `err.name`
 *                (falling back to "Error") and `errorMsg` from `err.message`,
 *                cut to at most 512 UTF-16 code units.
 * @param opts    Optional attempt count and sub-timings.
 */
export function recordPhase(
  state: State | null,
  name: string,
  startMs: number,
  err: Error | undefined,
  opts: PhaseOpt = {}
): void {
  if (!state) return;
  const p: PhaseRecord = {
    name,
    durationMs: Math.round(performance.now() - startMs),
    ok: err === undefined,
    attempts: opts.attempts ?? 1,
  };
  if (err) {
    p.errorCode = err.name || "Error";
    const msg = err.message;
    p.errorMsg = msg.length > MAX_ERROR_MSG_BYTES ? msg.slice(0, MAX_ERROR_MSG_BYTES) : msg;
  }
  if (opts.sub) p.sub = opts.sub;
  state.phases.push(p);
}
The phase is recorded on both success and error paths + * so failed phases still appear in the wide event with duration_ms + + * error_code. + */ +export async function timePhase( + name: string, + fn: () => Promise | T, + opts: PhaseOpt = {} +): Promise { + const start = performance.now(); + try { + const result = await fn(); + recordPhase(fromContext(), name, start, undefined, opts); + return result; + } catch (err) { + recordPhase(fromContext(), name, start, asError(err), opts); + throw err; + } +} + +/** + * Appends a phase outcome to the State attached to the current async context + * using a `startMs` captured by the caller. Use when the phase boundary spans + * multiple calls with intermediate error handling that can't fit inside a + * single `timePhase` closure. Nil-state safe. + */ +export function recordPhaseSince( + name: string, + startMs: number, + err: Error | undefined, + opts: PhaseOpt = {} +): void { + recordPhase(fromContext(), name, startMs, err, opts); +} + +function asError(e: unknown): Error { + return e instanceof Error ? e : new Error(String(e)); +} diff --git a/apps/supervisor/src/wideEvents/state.ts b/apps/supervisor/src/wideEvents/state.ts new file mode 100644 index 0000000000..5fb53ecb93 --- /dev/null +++ b/apps/supervisor/src/wideEvents/state.ts @@ -0,0 +1,62 @@ +/** + * Per-event accumulator backing a single wide event. The supervisor emits one + * flat-keyed JSON line per natural unit of work (dequeue iteration, HTTP + * request, socket lifecycle event). Optional fields are omitted on emit so + * events stay compact. + */ +export type State = { + // Cross-stack correlation. + requestId: string; + traceId: string; + /** + * Raw inbound W3C `traceparent`, preserved verbatim so outbound calls can + * propagate the same trace context without losing the parent span-id. + * Empty when no inbound traceparent was set. + */ + traceparent: string; + + // Service identity (set by `newState` from Env). 
+ service: string; + version?: string; + commitSha?: string; + region?: string; + nodeId?: string; + instanceId?: string; + + // Caller-attached opaque metadata, flattened to `meta.<key>` on emit. + meta: Record; + + // Per-phase outcomes, in completion order. + phases: PhaseRecord[]; + + // Top-level outcome (set after the wrapped operation returns). + ok: boolean; + statusCode: number; + durationMs: number; + error?: ErrorInfo; + + // Free-form ad-hoc additions (route, method, did_warm_start, ...). + extras: Record; +}; + +/** + * Single named phase outcome. Retries collapse into `attempts > 1` with the + * last error reflected in errorCode/errorMsg. + */ +export type PhaseRecord = { + name: string; + durationMs: number; + ok: boolean; + attempts: number; + errorCode?: string; + errorMsg?: string; + sub?: Record; +}; + +/** Top-level error summary for a failed operation. */ +export type ErrorInfo = { + code: string; + message: string; + /** Coarse classification - "client" | "upstream" | "internal" | "timeout".
*/ + kind: string; +}; diff --git a/apps/supervisor/src/wideEvents/traceparent.test.ts b/apps/supervisor/src/wideEvents/traceparent.test.ts new file mode 100644 index 0000000000..85ed31c3b6 --- /dev/null +++ b/apps/supervisor/src/wideEvents/traceparent.test.ts @@ -0,0 +1,43 @@ +import { describe, it, expect } from "vitest"; +import { parseTraceId } from "./traceparent.js"; + +describe("parseTraceId", () => { + const validTraceId = "4bf92f3577b34da6a3ce929d0e0e4736"; + const validHeader = `00-${validTraceId}-00f067aa0ba902b7-01`; + + it("extracts the trace-id from a valid W3C traceparent", () => { + expect(parseTraceId(validHeader)).toBe(validTraceId); + }); + + it("returns empty string for empty/null/undefined input", () => { + expect(parseTraceId("")).toBe(""); + expect(parseTraceId(null)).toBe(""); + expect(parseTraceId(undefined)).toBe(""); + }); + + it("returns empty for wrong segment count", () => { + expect(parseTraceId("00-abc-def")).toBe(""); + expect(parseTraceId("00-abc-def-01-extra")).toBe(""); + }); + + it("returns empty for non-zero version byte", () => { + expect(parseTraceId(`01-${validTraceId}-00f067aa0ba902b7-01`)).toBe(""); + }); + + it("returns empty for wrong-length trace-id", () => { + expect(parseTraceId("00-abc-00f067aa0ba902b7-01")).toBe(""); + }); + + it("returns empty for non-hex trace-id", () => { + expect(parseTraceId("00-zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz-00f067aa0ba902b7-01")).toBe(""); + }); + + it("returns empty for all-zero trace-id", () => { + expect(parseTraceId("00-00000000000000000000000000000000-00f067aa0ba902b7-01")).toBe(""); + }); + + it("accepts uppercase hex", () => { + const tid = "4BF92F3577B34DA6A3CE929D0E0E4736"; + expect(parseTraceId(`00-${tid}-00f067aa0ba902b7-01`)).toBe(tid); + }); +}); diff --git a/apps/supervisor/src/wideEvents/traceparent.ts b/apps/supervisor/src/wideEvents/traceparent.ts new file mode 100644 index 0000000000..9e84294f06 --- /dev/null +++ b/apps/supervisor/src/wideEvents/traceparent.ts @@ -0,0 
+1,39 @@ +/** + * Extracts the trace-id from a W3C `traceparent` header. Returns "" when the + * header is absent, malformed, or carries an all-zero trace-id. + * + * Format: `<version>-<trace-id>-<span-id>-<flags>` + * version : 2 hex chars, must be "00" + * trace-id: 32 hex chars, non-zero + * span-id : 16 hex chars (not validated - we only need trace-id) + * flags : 2 hex chars (not validated) + */ +export function parseTraceId(header: string | null | undefined): string { + if (!header) return ""; + const parts = header.split("-"); + if (parts.length !== 4) return ""; + if (parts[0] !== "00") return ""; + const tid = parts[1]; + if (!tid || tid.length !== 32) return ""; + if (!isHex(tid)) return ""; + if (isAllZero(tid)) return ""; + return tid; +} + +function isHex(s: string): boolean { + for (let i = 0; i < s.length; i++) { + const c = s.charCodeAt(i); + const isDigit = c >= 0x30 && c <= 0x39; + const isLower = c >= 0x61 && c <= 0x66; + const isUpper = c >= 0x41 && c <= 0x46; + if (!isDigit && !isLower && !isUpper) return false; + } + return true; +} + +function isAllZero(s: string): boolean { + for (let i = 0; i < s.length; i++) { + if (s.charCodeAt(i) !== 0x30) return false; + } + return true; +} From da7e6226b95be622982b8c263fb4413d61d02e8a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 13:50:39 +0100 Subject: [PATCH 2/6] feat(supervisor): wide events on dequeue + warm-start trace --- apps/supervisor/src/index.ts | 334 ++++++++++++++++++++++------------- 1 file changed, 207 insertions(+), 127 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 6f5913c47c..c4768658b1 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -28,6 +28,14 @@ import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; import { OtlpTraceService } from "./services/otlpTraceService.js"; import { extractTraceparent, getRestoreRunnerId }
from "./util.js"; +import { + fromContext, + recordPhaseSince, + runWideEvent, + setExtra, + setMeta, + type WideEventOptions, +} from "./wideEvents/index.js"; if (env.METRICS_COLLECT_DEFAULTS) { collectDefaultMetrics({ register }); @@ -50,6 +58,12 @@ class ManagedSupervisor { private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; + private readonly wideEventOpts: WideEventOptions = { + service: "supervisor", + env: { nodeId: env.TRIGGER_WORKER_INSTANCE_NAME }, + enabled: env.TRIGGER_WIDE_EVENTS_ENABLED, + }; + constructor() { const { TRIGGER_WORKER_TOKEN, @@ -239,149 +253,202 @@ class ManagedSupervisor { async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => { this.logger.verbose(`Received message with timestamp ${time.toLocaleString()}`, message); - if (message.completedWaitpoints.length > 0) { - this.logger.debug("Run has completed waitpoints", { - runId: message.run.id, - completedWaitpoints: message.completedWaitpoints.length, - }); - } - - if (!message.image) { - this.logger.error("Run has no image", { runId: message.run.id }); - return; - } + const traceparent = extractTraceparent(message.run.traceContext); + + await runWideEvent( + { + ...this.wideEventOpts, + traceparent, + setup: (state) => { + setMeta(state, "run_id", message.run.id); + setMeta(state, "env_id", message.environment.id); + setMeta(state, "org_id", message.organization.id); + setMeta(state, "project_id", message.project.id); + if (message.deployment.friendlyId) { + setMeta(state, "deployment_id", message.deployment.friendlyId); + } + setMeta(state, "machine_preset", message.run.machine.name); + state.extras.iteration = "dequeue"; + state.extras.dequeue_response_ms = dequeueResponseMs; + state.extras.polling_interval_ms = pollingIntervalMs; + state.extras.completed_waitpoints = message.completedWaitpoints.length; + }, + }, + async () => { + if (message.completedWaitpoints.length > 0) { + 
this.logger.debug("Run has completed waitpoints", { + runId: message.run.id, + completedWaitpoints: message.completedWaitpoints.length, + }); + } - const { checkpoint, ...rest } = message; - - // Register trace context early so snapshot spans work for all paths - // (cold create, restore, warm start). Re-registration on restore is safe - // since dequeue always provides fresh context. - if (this.computeManager?.traceSpansEnabled) { - const traceparent = extractTraceparent(message.run.traceContext); - - if (traceparent) { - this.workloadServer.registerRunTraceContext(message.run.friendlyId, { - traceparent, - envId: message.environment.id, - orgId: message.organization.id, - projectId: message.project.id, - }); - } - } + if (!message.image) { + setExtra(fromContext(), "path_taken", "skipped_no_image"); + this.logger.error("Run has no image", { runId: message.run.id }); + return; + } - if (checkpoint) { - this.logger.debug("Restoring run", { runId: message.run.id }); + const { checkpoint, ...rest } = message; - if (this.computeManager) { - try { - const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id); - - const didRestore = await this.computeManager.restore({ - snapshotId: checkpoint.location, - runnerId, - runFriendlyId: message.run.friendlyId, - snapshotFriendlyId: message.snapshot.friendlyId, - machine: message.run.machine, - traceContext: message.run.traceContext, + // Register trace context early so snapshot spans work for all paths + // (cold create, restore, warm start). Re-registration on restore is safe + // since dequeue always provides fresh context. 
+ if (this.computeManager?.traceSpansEnabled && traceparent) { + this.workloadServer.registerRunTraceContext(message.run.friendlyId, { + traceparent, envId: message.environment.id, orgId: message.organization.id, projectId: message.project.id, - dequeuedAt: message.dequeuedAt, }); + } + + if (checkpoint) { + setExtra(fromContext(), "path_taken", "restore"); + this.logger.debug("Restoring run", { runId: message.run.id }); + + if (this.computeManager) { + const restoreStart = performance.now(); + try { + const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id); + + const didRestore = await this.computeManager.restore({ + snapshotId: checkpoint.location, + runnerId, + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + machine: message.run.machine, + traceContext: message.run.traceContext, + envId: message.environment.id, + orgId: message.organization.id, + projectId: message.project.id, + dequeuedAt: message.dequeuedAt, + }); + recordPhaseSince("restore", restoreStart, undefined); + setExtra(fromContext(), "did_restore", didRestore); + + if (didRestore) { + this.logger.debug("Compute restore successful", { + runId: message.run.id, + runnerId, + }); + } else { + this.logger.error("Compute restore failed", { + runId: message.run.id, + runnerId, + }); + } + } catch (error) { + recordPhaseSince( + "restore", + restoreStart, + error instanceof Error ? 
error : new Error(String(error)) + ); + this.logger.error("Failed to restore run (compute)", { error }); + } + + return; + } - if (didRestore) { - this.logger.debug("Compute restore successful", { - runId: message.run.id, - runnerId, + if (!this.checkpointClient) { + this.logger.error("No checkpoint client", { runId: message.run.id }); + return; + } + + const restoreStart = performance.now(); + try { + const didRestore = await this.checkpointClient.restoreRun({ + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + body: { + ...rest, + checkpoint, + }, }); - } else { - this.logger.error("Compute restore failed", { runId: message.run.id, runnerId }); + recordPhaseSince("restore", restoreStart, undefined); + setExtra(fromContext(), "did_restore", didRestore); + + if (didRestore) { + this.logger.debug("Restore successful", { runId: message.run.id }); + } else { + this.logger.error("Restore failed", { runId: message.run.id }); + } + } catch (error) { + recordPhaseSince( + "restore", + restoreStart, + error instanceof Error ? 
error : new Error(String(error)) + ); + this.logger.error("Failed to restore run", { error }); } - } catch (error) { - this.logger.error("Failed to restore run (compute)", { error }); + + return; } - return; - } + this.logger.debug("Scheduling run", { runId: message.run.id }); - if (!this.checkpointClient) { - this.logger.error("No checkpoint client", { runId: message.run.id }); - return; - } + const warmStartStart = performance.now(); + const didWarmStart = await this.tryWarmStart(message, traceparent); + const warmStartCheckMs = Math.round(performance.now() - warmStartStart); + recordPhaseSince("warm_start", warmStartStart, undefined); + setExtra(fromContext(), "did_warm_start", didWarmStart); - try { - const didRestore = await this.checkpointClient.restoreRun({ - runFriendlyId: message.run.friendlyId, - snapshotFriendlyId: message.snapshot.friendlyId, - body: { - ...rest, - checkpoint, - }, - }); - - if (didRestore) { - this.logger.debug("Restore successful", { runId: message.run.id }); - } else { - this.logger.error("Restore failed", { runId: message.run.id }); + if (didWarmStart) { + setExtra(fromContext(), "path_taken", "warm_start"); + this.logger.debug("Warm start successful", { runId: message.run.id }); + return; } - } catch (error) { - this.logger.error("Failed to restore run", { error }); - } - return; - } - - this.logger.debug("Scheduling run", { runId: message.run.id }); + setExtra(fromContext(), "path_taken", "cold_create"); - const warmStartStart = performance.now(); - const didWarmStart = await this.tryWarmStart(message); - const warmStartCheckMs = Math.round(performance.now() - warmStartStart); + const createStart = performance.now(); + try { + if (!message.deployment.friendlyId) { + // mostly a type guard, deployments always exists for deployed environments + // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. 
+ throw new Error("Deployment is missing"); + } - if (didWarmStart) { - this.logger.debug("Warm start successful", { runId: message.run.id }); - return; - } + await this.workloadManager.create({ + dequeuedAt: message.dequeuedAt, + dequeueResponseMs, + pollingIntervalMs, + warmStartCheckMs, + envId: message.environment.id, + envType: message.environment.type, + image: message.image, + machine: message.run.machine, + orgId: message.organization.id, + projectId: message.project.id, + deploymentFriendlyId: message.deployment.friendlyId, + deploymentVersion: message.backgroundWorker.version, + runId: message.run.id, + runFriendlyId: message.run.friendlyId, + version: message.version, + nextAttemptNumber: message.run.attemptNumber, + snapshotId: message.snapshot.id, + snapshotFriendlyId: message.snapshot.friendlyId, + placementTags: message.placementTags, + traceContext: message.run.traceContext, + annotations: message.run.annotations, + hasPrivateLink: message.organization.hasPrivateLink, + }); + recordPhaseSince("workload_create", createStart, undefined); - try { - if (!message.deployment.friendlyId) { - // mostly a type guard, deployments always exists for deployed environments - // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. - throw new Error("Deployment is missing"); + // Disabled for now + // this.resourceMonitor.blockResources({ + // cpu: message.run.machine.cpu, + // memory: message.run.machine.memory, + // }); + } catch (error) { + recordPhaseSince( + "workload_create", + createStart, + error instanceof Error ? 
error : new Error(String(error)) + ); + this.logger.error("Failed to create workload", { error }); + } } - - await this.workloadManager.create({ - dequeuedAt: message.dequeuedAt, - dequeueResponseMs, - pollingIntervalMs, - warmStartCheckMs, - envId: message.environment.id, - envType: message.environment.type, - image: message.image, - machine: message.run.machine, - orgId: message.organization.id, - projectId: message.project.id, - deploymentFriendlyId: message.deployment.friendlyId, - deploymentVersion: message.backgroundWorker.version, - runId: message.run.id, - runFriendlyId: message.run.friendlyId, - version: message.version, - nextAttemptNumber: message.run.attemptNumber, - snapshotId: message.snapshot.id, - snapshotFriendlyId: message.snapshot.friendlyId, - placementTags: message.placementTags, - traceContext: message.run.traceContext, - annotations: message.run.annotations, - hasPrivateLink: message.organization.hasPrivateLink, - }); - - // Disabled for now - // this.resourceMonitor.blockResources({ - // cpu: message.run.machine.cpu, - // memory: message.run.machine.memory, - // }); - } catch (error) { - this.logger.error("Failed to create workload", { error }); - } + ); } ); @@ -404,6 +471,7 @@ class ManagedSupervisor { checkpointClient: this.checkpointClient, computeManager: this.computeManager, tracing: this.tracing, + wideEventOpts: this.wideEventOpts, }); this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); @@ -420,19 +488,31 @@ class ManagedSupervisor { this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]); } - private async tryWarmStart(dequeuedMessage: DequeuedMessage): Promise { + private async tryWarmStart( + dequeuedMessage: DequeuedMessage, + traceparent: string | undefined + ): Promise { if (!this.warmStartUrl) { return false; } const warmStartUrlWithPath = new URL("/warm-start", this.warmStartUrl); + const headers: Record = { + "Content-Type": "application/json", + }; + // Propagate the inbound W3C traceparent 
so the upstream warm-start + // receiver continues the same trace instead of minting a new one. Gated + // by the same kill switch as the wide-event emission so the whole PR is + // a no-op on the wire when disabled. + if (this.wideEventOpts.enabled && traceparent) { + headers.traceparent = traceparent; + } + try { const res = await fetch(warmStartUrlWithPath.href, { method: "POST", - headers: { - "Content-Type": "application/json", - }, + headers, body: JSON.stringify({ dequeuedMessage }), }); From df8d2f3be1f4ebbc9673da47ec642599701b859f Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 13:50:46 +0100 Subject: [PATCH 3/6] feat(supervisor): wide events on workload server + sockets --- .server-changes/supervisor-wide-events.md | 6 + apps/supervisor/src/workloadServer/index.ts | 594 ++++++++++++-------- 2 files changed, 375 insertions(+), 225 deletions(-) create mode 100644 .server-changes/supervisor-wide-events.md diff --git a/.server-changes/supervisor-wide-events.md b/.server-changes/supervisor-wide-events.md new file mode 100644 index 0000000000..49b73668dd --- /dev/null +++ b/.server-changes/supervisor-wide-events.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +Wide-event observability for the dequeue loop, workload-server routes, and run socket lifecycle. Off by default behind `TRIGGER_WIDE_EVENTS_ENABLED`. 
diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index bd38cc8700..0ba0e00a42 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -31,6 +31,14 @@ import { } from "../services/computeSnapshotService.js"; import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; import type { OtlpTraceService } from "../services/otlpTraceService.js"; +import type { ServerResponse } from "node:http"; +import { + emitOneShot, + runWideEvent, + setMeta, + type State, + type WideEventOptions, +} from "../wideEvents/index.js"; // Use the official export when upgrading to socket.io@4.8.0 interface DefaultEventsMap { @@ -67,6 +75,7 @@ type WorkloadServerOptions = { checkpointClient?: CheckpointClient; computeManager?: ComputeWorkloadManager; tracing?: OtlpTraceService; + wideEventOpts: WideEventOptions; }; export class WorkloadServer extends EventEmitter { @@ -74,6 +83,7 @@ export class WorkloadServer extends EventEmitter { private readonly snapshotService?: ComputeSnapshotService; private readonly logger = new SimpleStructuredLogger("workload-server"); + private readonly wideEventOpts: WideEventOptions; private readonly httpServer: HttpServer; private readonly websocketServer: Namespace< @@ -103,6 +113,7 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; + this.wideEventOpts = opts.wideEventOpts; if (opts.computeManager?.snapshotsEnabled) { this.snapshotService = new ComputeSnapshotService({ @@ -142,6 +153,47 @@ export class WorkloadServer extends EventEmitter { return this.headerValueFromRequest(req, WORKLOAD_HEADERS.PROJECT_REF); } + /** + * Sets common route meta on the wide-event state from URL params. 
+ */ + private attachRouteMeta(state: State, params: unknown): void { + if (!params || typeof params !== "object") return; + const p = params as Record; + if (typeof p.runFriendlyId === "string") setMeta(state, "run_id", p.runFriendlyId); + if (typeof p.snapshotFriendlyId === "string") { + setMeta(state, "snapshot_id", p.snapshotFriendlyId); + } + if (typeof p.deploymentId === "string") setMeta(state, "deployment_id", p.deploymentId); + } + + /** + * Wraps an HTTP route handler body with the wide-event lifecycle. Reads + * `traceparent` and `x-request-id` from `req.headers`, attaches `run_id` / + * `snapshot_id` / `deployment_id` meta from `params` when present, and + * captures the response status from `res.statusCode` after `fn` returns. + */ + private wideRoute( + ctx: { req: IncomingMessage; res: ServerResponse; params?: unknown }, + route: string, + method: string, + fn: () => Promise | T + ): Promise { + return runWideEvent( + { + ...this.wideEventOpts, + route, + method, + traceparent: this.headerValueFromRequest(ctx.req, "traceparent"), + inboundRequestId: this.headerValueFromRequest(ctx.req, "x-request-id"), + setup: (state) => this.attachRouteMeta(state, ctx.params), + }, + fn, + (state) => { + state.statusCode = ctx.res.statusCode; + } + ); + } + private createHttpServer({ host, port }: { host: string; port: number }) { const httpServer = new HttpServer({ port, @@ -162,26 +214,33 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, bodySchema: WorkloadRunAttemptStartRequestBody, - handler: async ({ req, reply, params, body }) => { - const startResponse = await this.workerClient.startRunAttempt( - params.runFriendlyId, - params.snapshotFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - - if (!startResponse.success) { - this.logger.error("Failed to start run", { - params, - error: startResponse.error, - }); - reply.empty(500); - return; - } - - reply.json(startResponse.data satisfies 
WorkloadRunAttemptStartResponseBody); - return; - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/start", + "POST", + async () => { + const { req, reply, params, body } = ctx; + const startResponse = await this.workerClient.startRunAttempt( + params.runFriendlyId, + params.snapshotFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + + if (!startResponse.success) { + this.logger.error("Failed to start run", { + params, + error: startResponse.error, + }); + reply.empty(500); + return; + } + + reply.json(startResponse.data satisfies WorkloadRunAttemptStartResponseBody); + return; + } + ), } ) .route( @@ -190,26 +249,35 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, bodySchema: WorkloadRunAttemptCompleteRequestBody, - handler: async ({ req, reply, params, body }) => { - const completeResponse = await this.workerClient.completeRunAttempt( - params.runFriendlyId, - params.snapshotFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - - if (!completeResponse.success) { - this.logger.error("Failed to complete run", { - params, - error: completeResponse.error, - }); - reply.empty(500); - return; - } - - reply.json(completeResponse.data satisfies WorkloadRunAttemptCompleteResponseBody); - return; - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/complete", + "POST", + async () => { + const { req, reply, params, body } = ctx; + const completeResponse = await this.workerClient.completeRunAttempt( + params.runFriendlyId, + params.snapshotFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + + if (!completeResponse.success) { + this.logger.error("Failed to complete run", { + params, + error: completeResponse.error, + }); + reply.empty(500); + return; + } + + reply.json( + completeResponse.data satisfies 
WorkloadRunAttemptCompleteResponseBody + ); + return; + } + ), } ) .route( @@ -218,27 +286,34 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, bodySchema: WorkloadHeartbeatRequestBody, - handler: async ({ req, reply, params, body }) => { - const heartbeatResponse = await this.workerClient.heartbeatRun( - params.runFriendlyId, - params.snapshotFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - - if (!heartbeatResponse.success) { - this.logger.error("Failed to heartbeat run", { - params, - error: heartbeatResponse.error, - }); - reply.empty(500); - return; - } - - reply.json({ - ok: true, - } satisfies WorkloadHeartbeatResponseBody); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/heartbeat", + "POST", + async () => { + const { req, reply, params, body } = ctx; + const heartbeatResponse = await this.workerClient.heartbeatRun( + params.runFriendlyId, + params.snapshotFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + + if (!heartbeatResponse.success) { + this.logger.error("Failed to heartbeat run", { + params, + error: heartbeatResponse.error, + }); + reply.empty(500); + return; + } + + reply.json({ + ok: true, + } satisfies WorkloadHeartbeatResponseBody); + } + ), } ) .route( @@ -246,87 +321,94 @@ export class WorkloadServer extends EventEmitter { "GET", { paramsSchema: WorkloadActionParams, - handler: async ({ reply, params, req }) => { - const runnerId = this.runnerIdFromRequest(req); - const deploymentVersion = this.deploymentVersionFromRequest(req); - const projectRef = this.projectRefFromRequest(req); - - this.logger.debug("Suspend request", { - params, - runnerId, - deploymentVersion, - projectRef, - }); - - if (!runnerId || !deploymentVersion || !projectRef) { - this.logger.error("Invalid headers for suspend request", { - ...params, - runnerId, - deploymentVersion, - projectRef, - }); - reply.json( - { - ok: false, - 
error: "Invalid headers", - } satisfies WorkloadSuspendRunResponseBody, - false, - 400 - ); - return; - } - - if (this.snapshotService) { - // Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints. - // If the run continues before the delay expires, the snapshot is cancelled. - reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202); - - this.snapshotService.schedule(params.runFriendlyId, { - runnerId, - runFriendlyId: params.runFriendlyId, - snapshotFriendlyId: params.snapshotFriendlyId, - }); - - return; - } - - if (!this.checkpointClient) { - reply.json( - { - ok: false, - error: "Checkpoints disabled", - } satisfies WorkloadSuspendRunResponseBody, - false, - 400 - ); - return; - } - - reply.json( - { - ok: true, - } satisfies WorkloadSuspendRunResponseBody, - false, - 202 - ); - - const suspendResult = await this.checkpointClient.suspendRun({ - runFriendlyId: params.runFriendlyId, - snapshotFriendlyId: params.snapshotFriendlyId, - body: { - runnerId, - runId: params.runFriendlyId, - snapshotId: params.snapshotFriendlyId, - projectRef, - deploymentVersion, - }, - }); - - if (!suspendResult) { - this.logger.error("Failed to suspend run", { params }); - return; - } - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/suspend", + "GET", + async () => { + const { reply, params, req } = ctx; + const runnerId = this.runnerIdFromRequest(req); + const deploymentVersion = this.deploymentVersionFromRequest(req); + const projectRef = this.projectRefFromRequest(req); + + this.logger.debug("Suspend request", { + params, + runnerId, + deploymentVersion, + projectRef, + }); + + if (!runnerId || !deploymentVersion || !projectRef) { + this.logger.error("Invalid headers for suspend request", { + ...params, + runnerId, + deploymentVersion, + projectRef, + }); + reply.json( + { + ok: false, + error: "Invalid headers", + } satisfies 
WorkloadSuspendRunResponseBody, + false, + 400 + ); + return; + } + + if (this.snapshotService) { + // Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints. + // If the run continues before the delay expires, the snapshot is cancelled. + reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202); + + this.snapshotService.schedule(params.runFriendlyId, { + runnerId, + runFriendlyId: params.runFriendlyId, + snapshotFriendlyId: params.snapshotFriendlyId, + }); + + return; + } + + if (!this.checkpointClient) { + reply.json( + { + ok: false, + error: "Checkpoints disabled", + } satisfies WorkloadSuspendRunResponseBody, + false, + 400 + ); + return; + } + + reply.json( + { + ok: true, + } satisfies WorkloadSuspendRunResponseBody, + false, + 202 + ); + + const suspendResult = await this.checkpointClient.suspendRun({ + runFriendlyId: params.runFriendlyId, + snapshotFriendlyId: params.snapshotFriendlyId, + body: { + runnerId, + runId: params.runFriendlyId, + snapshotId: params.snapshotFriendlyId, + projectRef, + deploymentVersion, + }, + }); + + if (!suspendResult) { + this.logger.error("Failed to suspend run", { params }); + return; + } + } + ), } ) .route( @@ -334,33 +416,40 @@ export class WorkloadServer extends EventEmitter { "GET", { paramsSchema: WorkloadActionParams, - handler: async ({ req, reply, params }) => { - this.logger.debug("Run continuation request", { params }); - - // Cancel any pending delayed snapshot for this run - this.snapshotService?.cancel(params.runFriendlyId); - - const continuationResult = await this.workerClient.continueRunExecution( - params.runFriendlyId, - params.snapshotFriendlyId, - this.runnerIdFromRequest(req) - ); - - if (!continuationResult.success) { - this.logger.error("Failed to continue run execution", { params }); - reply.json( - { - ok: false, - error: "Failed to continue run execution", - }, - false, - 400 - ); - return; - } - - reply.json(continuationResult.data as 
WorkloadContinueRunExecutionResponseBody); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/continue", + "GET", + async () => { + const { req, reply, params } = ctx; + this.logger.debug("Run continuation request", { params }); + + // Cancel any pending delayed snapshot for this run + this.snapshotService?.cancel(params.runFriendlyId); + + const continuationResult = await this.workerClient.continueRunExecution( + params.runFriendlyId, + params.snapshotFriendlyId, + this.runnerIdFromRequest(req) + ); + + if (!continuationResult.success) { + this.logger.error("Failed to continue run execution", { params }); + reply.json( + { + ok: false, + error: "Failed to continue run execution", + }, + false, + 400 + ); + return; + } + + reply.json(continuationResult.data as WorkloadContinueRunExecutionResponseBody); + } + ), } ) .route( @@ -368,24 +457,33 @@ export class WorkloadServer extends EventEmitter { "GET", { paramsSchema: WorkloadActionParams, - handler: async ({ req, reply, params }) => { - const sinceSnapshotResponse = await this.workerClient.getSnapshotsSince( - params.runFriendlyId, - params.snapshotFriendlyId, - this.runnerIdFromRequest(req) - ); - - if (!sinceSnapshotResponse.success) { - this.logger.error("Failed to get snapshots since", { - runId: params.runFriendlyId, - error: sinceSnapshotResponse.error, - }); - reply.empty(500); - return; - } - - reply.json(sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/since/:snapshotFriendlyId", + "GET", + async () => { + const { req, reply, params } = ctx; + const sinceSnapshotResponse = await this.workerClient.getSnapshotsSince( + params.runFriendlyId, + params.snapshotFriendlyId, + this.runnerIdFromRequest(req) + ); + + if (!sinceSnapshotResponse.success) { + this.logger.error("Failed to get 
snapshots since", { + runId: params.runFriendlyId, + error: sinceSnapshotResponse.error, + }); + reply.empty(500); + return; + } + + reply.json( + sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody + ); + } + ), } ) .route("/api/v1/workload-actions/deployments/:deploymentId/dequeue", "GET", { @@ -393,61 +491,83 @@ export class WorkloadServer extends EventEmitter { deploymentId: z.string(), }), - handler: async ({ req, reply, params }) => { - const dequeueResponse = await this.workerClient.dequeueFromVersion( - params.deploymentId, - 1, - this.runnerIdFromRequest(req) - ); - - if (!dequeueResponse.success) { - this.logger.error("Failed to get latest snapshot", { - deploymentId: params.deploymentId, - error: dequeueResponse.error, - }); - reply.empty(500); - return; - } + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/deployments/:deploymentId/dequeue", + "GET", + async () => { + const { req, reply, params } = ctx; + const dequeueResponse = await this.workerClient.dequeueFromVersion( + params.deploymentId, + 1, + this.runnerIdFromRequest(req) + ); - reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody); - }, + if (!dequeueResponse.success) { + this.logger.error("Failed to get latest snapshot", { + deploymentId: params.deploymentId, + error: dequeueResponse.error, + }); + reply.empty(500); + return; + } + + reply.json(dequeueResponse.data satisfies WorkloadDequeueFromVersionResponseBody); + } + ), }); if (env.SEND_RUN_DEBUG_LOGS) { httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }), bodySchema: WorkloadDebugLogRequestBody, - handler: async ({ req, reply, params, body }) => { - reply.empty(204); - - await this.workerClient.sendDebugLog( - params.runFriendlyId, - body, - this.runnerIdFromRequest(req) - ); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + 
"/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", + "POST", + async () => { + const { req, reply, params, body } = ctx; + reply.empty(204); + + await this.workerClient.sendDebugLog( + params.runFriendlyId, + body, + this.runnerIdFromRequest(req) + ); + } + ), }); } else { // Lightweight mock route without schemas httpServer.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { - handler: async ({ reply }) => { - reply.empty(204); - }, + handler: async (ctx) => + this.wideRoute( + ctx, + "/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", + "POST", + async () => { + ctx.reply.empty(204); + } + ), }); } - // Compute snapshot callback endpoint + // Snapshot callback endpoint (inbound from compute path) httpServer.route("/api/v1/compute/snapshot-complete", "POST", { bodySchema: SnapshotCallbackPayloadSchema, - handler: async ({ reply, body }) => { - if (!this.snapshotService) { - reply.empty(404); - return; - } + handler: async (ctx) => + this.wideRoute(ctx, "/api/v1/compute/snapshot-complete", "POST", async () => { + const { reply, body } = ctx; + if (!this.snapshotService) { + reply.empty(404); + return; + } - const result = await this.snapshotService.handleCallback(body); - reply.empty(result.status); - }, + const result = await this.snapshotService.handleCallback(body); + reply.empty(result.status); + }), }); return httpServer; @@ -591,6 +711,18 @@ export class WorkloadServer extends EventEmitter { try { runConnected(message.run.friendlyId); + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.event = "run:start"; + setMeta(state, "run_id", message.run.friendlyId); + if (socket.data.deploymentId) { + setMeta(state, "deployment_id", socket.data.deploymentId); + } + if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); + state.extras.socket_id = socket.id; + }, + }); } catch (error) { log.error("run:start error", { error }); } @@ -610,6 +742,18 @@ export class WorkloadServer 
extends EventEmitter { // Don't delete trace context here - run:stop fires after each snapshot/shutdown // but the run may be restored on a new VM and snapshot again. Trace context is // re-populated on dequeue, and entries are small (4 strings per run). + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.event = "run:stop"; + setMeta(state, "run_id", message.run.friendlyId); + if (socket.data.deploymentId) { + setMeta(state, "deployment_id", socket.data.deploymentId); + } + if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); + state.extras.socket_id = socket.id; + }, + }); } catch (error) { log.error("run:stop error", { error }); } From a0baf2cf044bd74f9b7d3e0a8737f8901eb34aa6 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 19 May 2026 14:33:04 +0100 Subject: [PATCH 4/6] fix(supervisor): wide-event review fixes + noisy-routes flag + socket lifecycle --- apps/supervisor/src/env.ts | 4 + apps/supervisor/src/index.ts | 4 +- apps/supervisor/src/wideEvents/emit.test.ts | 1 - apps/supervisor/src/wideEvents/emit.ts | 1 - apps/supervisor/src/wideEvents/middleware.ts | 2 +- apps/supervisor/src/wideEvents/state.ts | 1 - apps/supervisor/src/workloadServer/index.ts | 81 ++++++++++++-------- 7 files changed, 56 insertions(+), 38 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 0e52578daf..071de4cc81 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -261,6 +261,10 @@ const Env = z // line per natural unit of work (dequeue iteration, HTTP request, socket // lifecycle). High-QPS hotpath, so the kill switch must be honoured. TRIGGER_WIDE_EVENTS_ENABLED: BoolEnv.default(false), + // When true, also emit wide events for high-frequency HTTP routes + // (heartbeat, snapshots-since, logs/debug). Off in prod to keep event + // volume manageable; on in test environments for full-fidelity debugging. 
+ TRIGGER_WIDE_EVENTS_NOISY_ROUTES: BoolEnv.default(false), }) .superRefine((data, ctx) => { if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index c4768658b1..e2c16c4b1d 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -63,6 +63,7 @@ class ManagedSupervisor { env: { nodeId: env.TRIGGER_WORKER_INSTANCE_NAME }, enabled: env.TRIGGER_WIDE_EVENTS_ENABLED, }; + private readonly wideEventsNoisyRoutes = env.TRIGGER_WIDE_EVENTS_NOISY_ROUTES; constructor() { const { @@ -260,7 +261,7 @@ class ManagedSupervisor { ...this.wideEventOpts, traceparent, setup: (state) => { - setMeta(state, "run_id", message.run.id); + setMeta(state, "run_id", message.run.friendlyId); setMeta(state, "env_id", message.environment.id); setMeta(state, "org_id", message.organization.id); setMeta(state, "project_id", message.project.id); @@ -472,6 +473,7 @@ class ManagedSupervisor { computeManager: this.computeManager, tracing: this.tracing, wideEventOpts: this.wideEventOpts, + wideEventsNoisyRoutes: this.wideEventsNoisyRoutes, }); this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); diff --git a/apps/supervisor/src/wideEvents/emit.test.ts b/apps/supervisor/src/wideEvents/emit.test.ts index e9df12a3d4..4e778d3e99 100644 --- a/apps/supervisor/src/wideEvents/emit.test.ts +++ b/apps/supervisor/src/wideEvents/emit.test.ts @@ -43,7 +43,6 @@ describe("emit", () => { expect(out).not.toHaveProperty("trace_id"); expect(out).not.toHaveProperty("version"); expect(out).not.toHaveProperty("commit_sha"); - expect(out).not.toHaveProperty("instance_id"); expect(out).not.toHaveProperty("error.code"); }); diff --git a/apps/supervisor/src/wideEvents/emit.ts b/apps/supervisor/src/wideEvents/emit.ts index 85a7a082ca..a1644237d5 100644 --- a/apps/supervisor/src/wideEvents/emit.ts +++ b/apps/supervisor/src/wideEvents/emit.ts @@ -26,7 +26,6 @@ export function emit(state: State): void { 
appendIfSet(out, "commit_sha", state.commitSha); appendIfSet(out, "region", state.region); appendIfSet(out, "node_id", state.nodeId); - appendIfSet(out, "instance_id", state.instanceId); out.ok = state.ok; if (state.statusCode !== 0) out.status = state.statusCode; diff --git a/apps/supervisor/src/wideEvents/middleware.ts b/apps/supervisor/src/wideEvents/middleware.ts index 5dafdfa7d1..a87a1885d5 100644 --- a/apps/supervisor/src/wideEvents/middleware.ts +++ b/apps/supervisor/src/wideEvents/middleware.ts @@ -57,10 +57,10 @@ export async function runWideEvent( }); if (opts.route) state.extras.route = opts.route; if (opts.method) state.extras.method = opts.method; - if (opts.setup) opts.setup(state); const start = performance.now(); try { + if (opts.setup) opts.setup(state); const result = await wideEventStorage.run(state, () => Promise.resolve(fn())); state.durationMs = Math.round(performance.now() - start); if (finalize) finalize(state); diff --git a/apps/supervisor/src/wideEvents/state.ts b/apps/supervisor/src/wideEvents/state.ts index 5fb53ecb93..1929e5e980 100644 --- a/apps/supervisor/src/wideEvents/state.ts +++ b/apps/supervisor/src/wideEvents/state.ts @@ -21,7 +21,6 @@ export type State = { commitSha?: string; region?: string; nodeId?: string; - instanceId?: string; // Caller-attached opaque metadata, flattened to `meta.` on emit. meta: Record; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 0ba0e00a42..ea7728e637 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -76,6 +76,8 @@ type WorkloadServerOptions = { computeManager?: ComputeWorkloadManager; tracing?: OtlpTraceService; wideEventOpts: WideEventOptions; + /** When true, high-frequency HTTP routes also emit wide events. 
*/ + wideEventsNoisyRoutes: boolean; }; export class WorkloadServer extends EventEmitter { @@ -84,6 +86,7 @@ export class WorkloadServer extends EventEmitter { private readonly logger = new SimpleStructuredLogger("workload-server"); private readonly wideEventOpts: WideEventOptions; + private readonly wideEventsNoisyRoutes: boolean; private readonly httpServer: HttpServer; private readonly websocketServer: Namespace< @@ -114,6 +117,7 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; this.wideEventOpts = opts.wideEventOpts; + this.wideEventsNoisyRoutes = opts.wideEventsNoisyRoutes; if (opts.computeManager?.snapshotsEnabled) { this.snapshotService = new ComputeSnapshotService({ @@ -171,16 +175,25 @@ export class WorkloadServer extends EventEmitter { * `traceparent` and `x-request-id` from `req.headers`, attaches `run_id` / * `snapshot_id` / `deployment_id` meta from `params` when present, and * captures the response status from `res.statusCode` after `fn` returns. + * + * Pass `highFrequency: true` for noisy routes (heartbeat, polling). Those + * still go through the wrapper but only emit when + * `TRIGGER_WIDE_EVENTS_NOISY_ROUTES` is on, so prod can keep them dark + * while test envs capture full-fidelity traffic for debugging. 
*/ private wideRoute( ctx: { req: IncomingMessage; res: ServerResponse; params?: unknown }, route: string, method: string, - fn: () => Promise | T + fn: () => Promise | T, + routeOpts: { highFrequency?: boolean } = {} ): Promise { + const enabled = + this.wideEventOpts.enabled && (!routeOpts.highFrequency || this.wideEventsNoisyRoutes); return runWideEvent( { ...this.wideEventOpts, + enabled, route, method, traceparent: this.headerValueFromRequest(ctx.req, "traceparent"), @@ -312,7 +325,8 @@ export class WorkloadServer extends EventEmitter { reply.json({ ok: true, } satisfies WorkloadHeartbeatResponseBody); - } + }, + { highFrequency: true } ), } ) @@ -482,7 +496,8 @@ export class WorkloadServer extends EventEmitter { reply.json( sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody ); - } + }, + { highFrequency: true } ), } ) @@ -536,7 +551,8 @@ export class WorkloadServer extends EventEmitter { body, this.runnerIdFromRequest(req) ); - } + }, + { highFrequency: true } ), }); } else { @@ -549,7 +565,8 @@ export class WorkloadServer extends EventEmitter { "POST", async () => { ctx.reply.empty(204); - } + }, + { highFrequency: true } ), }); } @@ -640,6 +657,26 @@ export class WorkloadServer extends EventEmitter { }; }; + const emitSocketLifecycle = ( + event: "run_connected" | "run_disconnected", + friendlyId: string, + disconnectReason?: string + ) => { + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.event = event; + setMeta(state, "run_id", friendlyId); + if (socket.data.deploymentId) { + setMeta(state, "deployment_id", socket.data.deploymentId); + } + if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); + state.extras.socket_id = socket.id; + if (disconnectReason) state.extras.disconnect_reason = disconnectReason; + }, + }); + }; + const runConnected = (friendlyId: string) => { socketLogger.debug("runConnected", { ...getSocketMetadata() }); @@ -650,20 +687,22 @@ export class 
WorkloadServer extends EventEmitter { newRunId: friendlyId, oldRunId: socket.data.runFriendlyId, }); - runDisconnected(socket.data.runFriendlyId); + runDisconnected(socket.data.runFriendlyId, "socket_run_replaced"); } this.runSockets.set(friendlyId, socket); this.emit("runConnected", { run: { friendlyId } }); socket.data.runFriendlyId = friendlyId; + emitSocketLifecycle("run_connected", friendlyId); }; - const runDisconnected = (friendlyId: string) => { + const runDisconnected = (friendlyId: string, reason: string) => { socketLogger.debug("runDisconnected", { ...getSocketMetadata() }); this.runSockets.delete(friendlyId); this.emit("runDisconnected", { run: { friendlyId } }); socket.data.runFriendlyId = undefined; + emitSocketLifecycle("run_disconnected", friendlyId, reason); }; socketLogger.debug("wsServer socket connected", { ...getSocketMetadata() }); @@ -681,7 +720,7 @@ export class WorkloadServer extends EventEmitter { }); if (socket.data.runFriendlyId) { - runDisconnected(socket.data.runFriendlyId); + runDisconnected(socket.data.runFriendlyId, `socket_disconnecting:${reason}`); } }); @@ -711,18 +750,6 @@ export class WorkloadServer extends EventEmitter { try { runConnected(message.run.friendlyId); - emitOneShot({ - ...this.wideEventOpts, - populate: (state) => { - state.extras.event = "run:start"; - setMeta(state, "run_id", message.run.friendlyId); - if (socket.data.deploymentId) { - setMeta(state, "deployment_id", socket.data.deploymentId); - } - if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); - state.extras.socket_id = socket.id; - }, - }); } catch (error) { log.error("run:start error", { error }); } @@ -738,22 +765,10 @@ export class WorkloadServer extends EventEmitter { log.debug("Handling run:stop"); try { - runDisconnected(message.run.friendlyId); + runDisconnected(message.run.friendlyId, "run_stop_message"); // Don't delete trace context here - run:stop fires after each snapshot/shutdown // but the run may be restored on a 
new VM and snapshot again. Trace context is // re-populated on dequeue, and entries are small (4 strings per run). - emitOneShot({ - ...this.wideEventOpts, - populate: (state) => { - state.extras.event = "run:stop"; - setMeta(state, "run_id", message.run.friendlyId); - if (socket.data.deploymentId) { - setMeta(state, "deployment_id", socket.data.deploymentId); - } - if (socket.data.runnerId) setMeta(state, "runner_id", socket.data.runnerId); - state.extras.socket_id = socket.id; - }, - }); } catch (error) { log.error("run:stop error", { error }); } From e02990c039821599d6ec5d5ac8a69648b5999780 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 20 May 2026 19:00:33 +0100 Subject: [PATCH 5/6] feat(supervisor): forward traceparent + request_id to compute --- .server-changes/README.md | 8 ++++++++ .../supervisor-compute-traceparent-forwarding.md | 6 ++++++ apps/supervisor/src/workloadManager/compute.ts | 15 +++++++++++++++ internal-packages/compute/src/client.ts | 15 +++++++++++++++ 4 files changed, 44 insertions(+) create mode 100644 .server-changes/supervisor-compute-traceparent-forwarding.md diff --git a/.server-changes/README.md b/.server-changes/README.md index 82716de981..2b0eeade36 100644 --- a/.server-changes/README.md +++ b/.server-changes/README.md @@ -38,6 +38,14 @@ Speed up batch queue processing by removing stalls and fixing retry race The body text (below the frontmatter) is a one-line description of the change. Keep it concise — it will appear in release notes. +### Writing guidance + +These entries are public-facing - they ship verbatim in user-visible release notes. A few rules to keep them clean: + +- **One sentence is usually enough.** The body is the bullet in the changelog. If you need a paragraph, you're probably describing the implementation rather than the change. +- **Describe behavior, not implementation.** Skip internal scopes, middleware names, library specifics, framework internals. 
Users care about what's different for them, not how it's wired. +- **Never name internal tools or infra.** Observability stacks, internal services, infra components, monitoring backends, CI surfaces, AWS specifics - none of these belong in user-facing notes. + ## Lifecycle 1. Engineer adds a `.server-changes/` file in their PR diff --git a/.server-changes/supervisor-compute-traceparent-forwarding.md b/.server-changes/supervisor-compute-traceparent-forwarding.md new file mode 100644 index 0000000000..5d20c08693 --- /dev/null +++ b/.server-changes/supervisor-compute-traceparent-forwarding.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: improvement +--- + +Forward `traceparent` headers on outbound calls to the compute provider so distributed traces stay continuous across services. diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts index 1c00f33aad..2c51b15f2d 100644 --- a/apps/supervisor/src/workloadManager/compute.ts +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -10,6 +10,7 @@ import { ComputeClient, stripImageDigest } from "@internal/compute"; import { extractTraceparent, getRunnerId } from "../util.js"; import type { OtlpTraceService } from "../services/otlpTraceService.js"; import { tryCatch } from "@trigger.dev/core"; +import { fromContext } from "../wideEvents/index.js"; type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { gateway: { @@ -46,6 +47,20 @@ export class ComputeWorkloadManager implements WorkloadManager { gatewayUrl: opts.gateway.url, authToken: opts.gateway.authToken, timeoutMs: opts.gateway.timeoutMs, + // Forward the current wide-event scope's traceparent + request_id so the + // downstream service continues the same trace and joins its own wide + // events to ours. When called outside a wide-event scope (or when wide + // events are disabled), `fromContext` returns null and propagation + // is skipped.
+ getPropagationHeaders: () => { + const state = fromContext(); + if (!state) return {}; + const headers: Record = { "x-request-id": state.requestId }; + if (state.traceparent) { + headers.traceparent = state.traceparent; + } + return headers; + }, }); } diff --git a/internal-packages/compute/src/client.ts b/internal-packages/compute/src/client.ts index 97585345ea..d6215678b5 100644 --- a/internal-packages/compute/src/client.ts +++ b/internal-packages/compute/src/client.ts @@ -11,6 +11,13 @@ export type ComputeClientOptions = { gatewayUrl: string; authToken?: string; timeoutMs: number; + /** + * Called once per outbound request to collect cross-service correlation + * headers (e.g. `traceparent`, `x-request-id`) from the caller's current + * scope. The returned record is merged onto the outbound headers. Return + * `{}` (or omit the option) to skip propagation. + */ + getPropagationHeaders?: () => Record; }; export class ComputeClient { @@ -40,6 +47,14 @@ class HttpTransport { if (this.opts.authToken) { h["Authorization"] = `Bearer ${this.opts.authToken}`; } + const propagation = this.opts.getPropagationHeaders?.(); + if (propagation) { + for (const [key, value] of Object.entries(propagation)) { + if (value) { + h[key] = value; + } + } + } return h; } From 3330bdced26f6c716b71136d9b1d3cce850157cd Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Wed, 20 May 2026 19:56:57 +0100 Subject: [PATCH 6/6] feat(supervisor): wide events on snapshot lifecycle --- .../supervisor-snapshot-lifecycle-events.md | 6 ++ .../src/services/computeSnapshotService.ts | 98 ++++++++++++++++--- apps/supervisor/src/workloadServer/index.ts | 1 + 3 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 .server-changes/supervisor-snapshot-lifecycle-events.md diff --git a/.server-changes/supervisor-snapshot-lifecycle-events.md b/.server-changes/supervisor-snapshot-lifecycle-events.md new file mode 100644 index 0000000000..1970a54e61 --- 
/dev/null +++ b/.server-changes/supervisor-snapshot-lifecycle-events.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: improvement +--- + +Add observability events at the schedule, dispatch, and callback phases of the snapshot lifecycle. diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts index 041e2902c7..3b88aa9852 100644 --- a/apps/supervisor/src/services/computeSnapshotService.ts +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -6,6 +6,15 @@ import { type SnapshotCallbackPayload } from "@internal/compute"; import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; import { TimerWheel } from "./timerWheel.js"; import type { OtlpTraceService } from "./otlpTraceService.js"; +import { + emitOneShot, + fromContext, + recordPhaseSince, + runWideEvent, + setExtra, + setMeta, + type WideEventOptions, +} from "../wideEvents/index.js"; type DelayedSnapshot = { runnerId: string; @@ -24,6 +33,7 @@ export type ComputeSnapshotServiceOptions = { computeManager: ComputeWorkloadManager; workerClient: SupervisorHttpClient; tracing?: OtlpTraceService; + wideEventOpts: WideEventOptions; }; export class ComputeSnapshotService { @@ -37,11 +47,13 @@ export class ComputeSnapshotService { private readonly computeManager: ComputeWorkloadManager; private readonly workerClient: SupervisorHttpClient; private readonly tracing?: OtlpTraceService; + private readonly wideEventOpts: WideEventOptions; constructor(opts: ComputeSnapshotServiceOptions) { this.computeManager = opts.computeManager; this.workerClient = opts.workerClient; this.tracing = opts.tracing; + this.wideEventOpts = opts.wideEventOpts; this.dispatchLimit = pLimit(this.computeManager.snapshotDispatchLimit); this.timerWheel = new TimerWheel({ @@ -62,6 +74,16 @@ export class ComputeSnapshotService { /** Schedule a delayed snapshot for a run. Replaces any pending snapshot for the same run. 
*/ schedule(runFriendlyId: string, data: DelayedSnapshot) { this.timerWheel.submit(runFriendlyId, data); + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.op = "snapshot.schedule"; + state.meta.run_id = runFriendlyId; + state.meta.snapshot_id = data.snapshotFriendlyId; + state.extras.runner_id = data.runnerId; + state.extras.delay_ms = this.computeManager.snapshotDelayMs; + }, + }); this.logger.debug("Snapshot scheduled", { runFriendlyId, snapshotFriendlyId: data.snapshotFriendlyId, @@ -73,6 +95,13 @@ export class ComputeSnapshotService { cancel(runFriendlyId: string): boolean { const cancelled = this.timerWheel.cancel(runFriendlyId); if (cancelled) { + emitOneShot({ + ...this.wideEventOpts, + populate: (state) => { + state.extras.op = "snapshot.canceled"; + state.meta.run_id = runFriendlyId; + }, + }); this.logger.debug("Snapshot cancelled", { runFriendlyId }); } return cancelled; @@ -81,6 +110,24 @@ export class ComputeSnapshotService { /** Handle the callback from the gateway after a snapshot completes or fails. */ async handleCallback(body: SnapshotCallbackPayload) { const snapshotId = body.status === "completed" ? body.snapshot_id : undefined; + const runId = body.metadata?.runId; + const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; + + // Enrich the wrapping route's wide event with snapshot metadata. The + // `/api/v1/compute/snapshot-complete` route is registered with `wideRoute`, + // so `fromContext()` returns the State of that route and these calls + // become extras/meta on the same wide event - no nested emission. 
+ const state = fromContext(); + if (state) { + state.extras.op = "snapshot.callback"; + state.extras["snapshot.status"] = body.status; + if (body.instance_id) state.extras["snapshot.instance_id"] = body.instance_id; + if (body.duration_ms !== undefined) state.extras["snapshot.duration_ms"] = body.duration_ms; + if (snapshotId) state.extras["snapshot.id"] = snapshotId; + if (body.status === "failed" && body.error) state.extras["snapshot.error"] = body.error; + } + if (runId) setMeta(state, "run_id", runId); + if (snapshotFriendlyId) setMeta(state, "snapshot_id", snapshotFriendlyId); this.logger.debug("Snapshot callback", { snapshotId, @@ -91,9 +138,6 @@ export class ComputeSnapshotService { durationMs: body.duration_ms, }); - const runId = body.metadata?.runId; - const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; - if (!runId || !snapshotFriendlyId) { this.logger.error("Snapshot callback missing metadata", { body }); return { ok: false as const, status: 400 }; @@ -102,6 +146,7 @@ export class ComputeSnapshotService { this.#emitSnapshotSpan(runId, body.duration_ms, snapshotId); if (body.status === "completed") { + const submitStart = performance.now(); const result = await this.workerClient.submitSuspendCompletion({ runId, snapshotId: snapshotFriendlyId, @@ -113,6 +158,11 @@ export class ComputeSnapshotService { }, }, }); + recordPhaseSince( + "submit_completion", + submitStart, + result.success ? 
undefined : new Error(String(result.error)) + ); if (result.success) { this.logger.debug("Suspend completion submitted", { @@ -121,6 +171,7 @@ export class ComputeSnapshotService { snapshotId: body.snapshot_id, }); } else { + setExtra(state, "submit_completion.error", String(result.error)); this.logger.error("Failed to submit suspend completion", { runId, snapshotFriendlyId, @@ -128,6 +179,7 @@ export class ComputeSnapshotService { }); } } else { + const submitStart = performance.now(); const result = await this.workerClient.submitSuspendCompletion({ runId, snapshotId: snapshotFriendlyId, @@ -136,8 +188,14 @@ export class ComputeSnapshotService { error: body.error ?? "Snapshot failed", }, }); + recordPhaseSince( + "submit_completion", + submitStart, + result.success ? undefined : new Error(String(result.error)) + ); if (!result.success) { + setExtra(state, "submit_completion.error", String(result.error)); this.logger.error("Failed to submit suspend failure", { runId, snapshotFriendlyId, @@ -184,20 +242,30 @@ export class ComputeSnapshotService { /** Dispatch a snapshot request to the gateway. 
*/ private async dispatch(snapshot: DelayedSnapshot): Promise { - const result = await this.computeManager.snapshot({ - runnerId: snapshot.runnerId, - metadata: { - runId: snapshot.runFriendlyId, - snapshotFriendlyId: snapshot.snapshotFriendlyId, + await runWideEvent( + { + ...this.wideEventOpts, + setup: (state) => { + state.extras.op = "snapshot.dispatch"; + state.meta.run_id = snapshot.runFriendlyId; + state.meta.snapshot_id = snapshot.snapshotFriendlyId; + state.extras.runner_id = snapshot.runnerId; + }, }, - }); + async () => { + const result = await this.computeManager.snapshot({ + runnerId: snapshot.runnerId, + metadata: { + runId: snapshot.runFriendlyId, + snapshotFriendlyId: snapshot.snapshotFriendlyId, + }, + }); - if (!result) { - this.logger.error("Failed to request snapshot", { - runId: snapshot.runFriendlyId, - runnerId: snapshot.runnerId, - }); - } + if (!result) { + throw new Error("Snapshot dispatch returned no result"); + } + } + ); } #emitSnapshotSpan(runFriendlyId: string, durationMs?: number, snapshotId?: string) { diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index ea7728e637..da6099b4a3 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -124,6 +124,7 @@ export class WorkloadServer extends EventEmitter { computeManager: opts.computeManager, workerClient: opts.workerClient, tracing: opts.tracing, + wideEventOpts: this.wideEventOpts, }); }