Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion workers/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,15 @@ export default {
// response. The helper handles clone failures safely.
if (telemetryClone) {
const durationMs = Date.now() - startTime;
const cacheTier = tracer.indexSource;
// NOTE: Do NOT read tracer.indexSource here. The MCP handler returns
// a streaming Response — `await handler(...)` resolves with the
// Response object before the tool handler closure has finished
// running, so the tracer has not yet recorded the `index` span at
// this point. Reading here yields "none" for every tool. The tracer
// is only fully populated once the response body has been consumed
// (which forces the streaming tool handler to complete). The read
// therefore happens inside the waitUntil callback below, after
// `await responseClone.text()` resolves.
// Clone the response synchronously before returning so the body is
// still available to read inside the deferred waitUntil callback.
const responseClone = response.clone();
Expand All @@ -997,6 +1005,13 @@ export default {
} catch {
// Fall through with empty string; bytes_out / tokens_out will be 0.
}
// Read tracer.indexSource AFTER the response body has been
// consumed. By this point the streaming tool handler has
// completed and any "index" / "index-build" spans have been
// recorded. Reading earlier (e.g. immediately after `await
// handler()` returned) was the streaming-race bug that caused
// every tool call to record cache_tier="none" in production.
const cacheTier = tracer.indexSource;
const shape = await measurePayloadShape(requestText, responseText);
recordTelemetry(request, requestText, env, durationMs, cacheTier, shape);
} catch {
Expand Down
83 changes: 82 additions & 1 deletion workers/test/telemetry-integration.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const tsconfig = {
include: [
join(WORKERS_ROOT, "src", "tokenize.ts"),
join(WORKERS_ROOT, "src", "telemetry.ts"),
join(WORKERS_ROOT, "src", "tracing.ts"),
join(WORKERS_ROOT, "src", "zip-baseline-fetcher.ts"),
],
};
Expand All @@ -75,8 +76,9 @@ const compile = spawnSync("npx", ["--yes", "tsc", "-p", tsconfigPath], {
// actually need weren't emitted.
const tokenizeJs = join(tmp, "build", "tokenize.js");
const telemetryJs = join(tmp, "build", "telemetry.js");
const tracingJs = join(tmp, "build", "tracing.js");
const zipFetcherJs = join(tmp, "build", "zip-baseline-fetcher.js");
if (!existsSync(tokenizeJs) || !existsSync(telemetryJs) || !existsSync(zipFetcherJs)) {
if (!existsSync(tokenizeJs) || !existsSync(telemetryJs) || !existsSync(tracingJs) || !existsSync(zipFetcherJs)) {
console.error("TypeScript compile failed (target files not emitted):");
console.error(compile.stdout);
console.error(compile.stderr);
Expand Down Expand Up @@ -110,6 +112,7 @@ for (const f of rds(buildDir).filter(n => n.endsWith(".js"))) {

const { measurePayloadShape } = await import(tokenizeJs);
const { recordTelemetry } = await import(telemetryJs);
const { RequestTracer } = await import(tracingJs);

// ─── Mock env with writeDataPoint capture ──────────────────────────────────

Expand Down Expand Up @@ -536,5 +539,83 @@ await test("missing env.ODDKIT_TELEMETRY is a graceful no-op", async () => {
recordTelemetry(mockRequest(), requestBody, env, 5, "memory", shape);
});

// ─── Test 7: Streaming-race regression — cacheTier must be read AFTER body ──

await test("cache_tier reads must happen after the streaming response body completes", async () => {
  // Regression guard for the streaming race fixed in workers/src/index.ts.
  //
  // The MCP handler from agents/mcp returns a streaming Response, so
  // `await handler(...)` resolves with the Response object before the tool
  // handler closure has finished populating the tracer. Reading
  // `tracer.indexSource` at that moment yields "none" for every tool because
  // the "index" / "index-build" span has not been recorded yet. The fix moves
  // the read inside the waitUntil callback, after the response body has been
  // consumed (which forces the streaming tool handler to complete).
  //
  // This test models only the timing pattern; it does not drive the worker
  // entry point itself. It asserts:
  //   (a) the OLD pattern (read immediately) returns "none" — the observable
  //       bug the fix exists to prevent
  //   (b) the FIXED pattern (read after the deferred work has completed)
  //       returns the actual span source

  const tracer = new RequestTracer();

  // Defer the "index" span with setImmediate. That callback runs on a later
  // event-loop turn (a macrotask, not a microtask), so no synchronous code
  // below can observe the span before `await handlerDone`. This models a
  // streaming tool handler that has not yet recorded its index access when
  // the outer handler's `await` resolves.
  const handlerDone = new Promise((resolve) => {
    setImmediate(() => {
      tracer.addSpan("index", 12, "cache");
      resolve();
    });
  });

  // (a) OLD pattern: read tracer.indexSource synchronously, before the
  // deferred span has been added. This reproduces the production bug.
  const oldPatternRead = tracer.indexSource;
  assert.equal(
    oldPatternRead,
    "none",
    "OLD pattern (read immediately after await) returns 'none' — this is the streaming-race bug",
  );

  // Wait for the deferred span to land (modeling `await responseClone.text()`
  // forcing the streaming tool handler to finish).
  await handlerDone;

  // (b) FIXED pattern: read tracer.indexSource AFTER the deferred work has
  // completed. The tracer now reflects the actual cache tier.
  const fixedPatternRead = tracer.indexSource;
  assert.equal(
    fixedPatternRead,
    "cache",
    "FIXED pattern (read after body consumption) returns the actual span source",
  );

  // Round-trip: feed the fixed value through recordTelemetry and verify it
  // lands in blob9 (index 8 of the zero-based blobs array).
  const env = mockEnv();
  const requestBody = JSON.stringify({ jsonrpc: "2.0", id: 1, method: "tools/call", params: { name: "oddkit_search", arguments: { input: "test" } } });
  const responseBody = JSON.stringify({ jsonrpc: "2.0", id: 1, result: { content: [{ type: "text", text: "ok" }] } });
  const shape = await measurePayloadShape(requestBody, responseBody);
  recordTelemetry(mockRequest(), requestBody, env, 42, fixedPatternRead, shape);
  assert.equal(env.ODDKIT_TELEMETRY.writes.length, 1, "exactly one data point written");
  assert.equal(
    env.ODDKIT_TELEMETRY.writes[0].blobs[8],
    "cache",
    "blob9 (cache_tier) carries the post-body-consumption tracer value",
  );

  // Sanity: if we had used the broken old-pattern read, blob9 would be "none".
  // Check the write count first so a missing write fails as a clear assertion
  // instead of a TypeError on `writes[0]`.
  const env2 = mockEnv();
  recordTelemetry(mockRequest(), requestBody, env2, 42, oldPatternRead, shape);
  assert.equal(env2.ODDKIT_TELEMETRY.writes.length, 1, "exactly one data point written for the OLD-pattern read");
  assert.equal(
    env2.ODDKIT_TELEMETRY.writes[0].blobs[8],
    "none",
    "blob9 with the OLD-pattern read would be 'none' — what production has been recording",
  );
});

// Print the final pass/fail tally, then terminate the process: exit status 1
// when at least one test failed, exit status 0 otherwise.
console.log(`\n${pass} passed, ${fail} failed`);
if (fail > 0) {
  process.exit(1);
}
process.exit(0);
Loading