Skip to content

fix(a2a): resubscribe on graceful SSE close with in-flight task#2406

Merged
malinskibeniamin merged 4 commits intomasterfrom
bm/fix-a2a-reconnect-clean-close
Apr 21, 2026
Merged

fix(a2a): resubscribe on graceful SSE close with in-flight task#2406
malinskibeniamin merged 4 commits intomasterfrom
bm/fix-a2a-reconnect-clean-close

Conversation

@malinskibeniamin
Copy link
Copy Markdown
Contributor

@malinskibeniamin malinskibeniamin commented Apr 21, 2026

Summary

Load balancers with idle timeouts (commonly ~5 min — AWS ALB, GCP LB, nginx defaults) close idle TCP connections gracefully with a FIN rather than an error. The A2A streaming iterator surfaces this as clean end-of-stream, so streamMessage's existing catch-block reconnect never fires and the message is finalized with a non-terminal taskState, orphaning the task server-side and showing the user a truncated reply.

Fix: after the for await over streamResult.fullStream exits cleanly, if isResubscribable(state) is true, route through the same resubscribeLoop already used in the error path. The loop already handles backoff, progress detection, and give-up — it just needed a second entry point.

Guard: if the clean-close path already ran resubscribeLoop and a subsequent finalizeMessage throws (e.g., DB write fails), the outer catch must not invoke resubscribeLoop a second time. Tracked with a resubscribeAttempted flag.

Context — not caused by recent SDK bumps

This bug predates the A2A 0.3.10 → 0.3.13, AI SDK v6, and streamdown v2 upgrades (#2389). The try { for-await } catch { resubscribeLoop } shape is unchanged from the original SSE reconnection commit (4fefd7b73). Clean-close never triggered the catch path in any version.

Which 5-min LB failure modes does this cover

LB behavior at 5 min Before After
Graceful TCP close (FIN) — AWS ALB, GCP LB, nginx default No reconnect, message truncated resubscribeTask fires, same backoff loop runs ✅
Connection reset (RST) or socket error Existing catch path → resubscribeLoop Unchanged
HTTP 504 mid-stream Existing catch path → resubscribeLoop Unchanged

If the existing error path is "still buggy" for case 2 or 3, the root cause is different from what this PR addresses.

Test plan

New regression tests in use-message-streaming.test.ts:

  • resubscribes when stream ends cleanly but task is still in-flight (LB idle-timeout) — asserts resubscribeTask is called and task completes after clean-close reconnect
  • does not resubscribe when stream ends cleanly and task is already terminal — negative guard
  • finalizes with gave-up status when clean-close triggers resubscribe but it exhausts retries — content preserved, single gave-up block shown
  • does not re-enter resubscribeLoop when finalizeMessage fails after a gave-up clean-close — asserts exactly 5 retries (one round), not 10

Verification (run locally):

  • bun run test:unit -- use-message-streaming — 31/31 pass
  • bun run type:check — clean
  • bun run lint on touched files — clean
  • Manual repro on a deployed cloud env: start a long-running agent task, let it idle past 5 min, watch for the disconnected → reconnecting → reconnected status blocks instead of silent truncation. DevTools Network panel will show 200 OK/completed on the streaming request (graceful close) vs (failed)/net::ERR_* (error close — different code path).

@claude please review.

🤖 Generated with Claude Code

Load balancers with idle timeouts (commonly ~5 minutes) close idle TCP
connections gracefully with a FIN rather than an error. The A2A streaming
iterator surfaces this as end-of-stream, so the catch-block reconnect
never runs and the message is finalized with a non-terminal taskState,
orphaning the task server-side.

Route clean closes through the same resubscribeLoop used in the error
path when the task is still resubscribable. The loop already handles
backoff, progress detection, and give-up; it just needed a second entry
point.

Covered by three regression tests: clean-close with in-flight task,
clean-close with terminal task (no resubscribe), and clean-close where
resubscribe exhausts retries (finalizes with gave-up status).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@malinskibeniamin
Copy link
Copy Markdown
Contributor Author

@claude review

@malinskibeniamin malinskibeniamin self-assigned this Apr 21, 2026
@claude
Copy link
Copy Markdown

claude Bot commented Apr 21, 2026

Claude finished @malinskibeniamin's task in 1m 18s —— View job


Code Review

  • Gather context and read changed files
  • Review implementation in use-message-streaming.ts
  • Review tests in use-message-streaming.test.ts
  • Post final review

The fix is correct, well-motivated, and minimal. The reuse of the existing resubscribeLoop entry point is a good call — it avoids duplicating backoff/retry logic. A few observations below.

Implementation (use-message-streaming.ts)

Redundant closeActiveTextBlock call (minor)

Lines 336–337 already close the active text block and null it:

closeActiveTextBlock(state.contentBlocks, state.activeTextBlock);
state.activeTextBlock = null;

The new block at 356–357 repeats this:

if (isResubscribable(state)) {
  closeActiveTextBlock(state.contentBlocks, state.activeTextBlock); // activeTextBlock is already null here
  state.activeTextBlock = null;
  await resubscribeLoop(...);
}

No code between those two points can set activeTextBlock to a new value, so the second call is a no-op. It mirrors the shape of the catch block (which needs it because the loop can exit mid-stream), but that reasoning doesn't apply here. Fix this →

success: true when resubscribe gives up on a clean close (design note)

After the for await exits, if resubscribeLoop exhausts all retries, execution falls through to finalizeMessage which returns { success: true }. This is intentional (Scenario 16d test, line 1039 comment: "success=true reflects 'no exception'"), but it creates an asymmetry: the equivalent gave-up path after an error returns { success: false }. Callers relying on success to detect orphaned tasks will miss this case. The "gave-up" connection-status block is the only signal. Worth documenting or considering whether the clean-close gave-up path should also return success: false.

Tests (use-message-streaming.test.ts)

New scenarios 16b/16c/16d are solid — they directly cover the regression: clean end-of-stream with in-flight task, clean end with terminal task, and clean end → gave-up. Good coverage of the fix.

Duplicate scenario numbering — there are three scenarios numbered "13" (13, 13b, 13c). Doesn't affect test execution but makes navigating the file confusing. Fix this →

Misleading test name for Scenario 15'rethrows TypeError from resubscribe instead of retrying' — the code doesn't actually rethrow; it hits the break and falls to the "gave-up" path, producing success: false. "Stops retrying immediately on TypeError" would be more accurate. Fix this →

vi.useRealTimers() + vi.useFakeTimers() pattern — the reset in Scenarios 5, 16d, and 17 is correct. Just note that the beforeEach uses shouldAdvanceTime: true and those tests override with shouldAdvanceTime: false; this pattern works but could confuse contributors who add timer-sensitive tests later.

Summary

Bug fix correctness ✅ Correct — routes clean close through the existing resubscribe loop
Minimal change ✅ 12 lines of production code
Test coverage ✅ Three new targeted regression scenarios
Minor cleanups See redundant close call and test numbering above

…ailure

After the clean-close path enters resubscribeLoop and exhausts its 5
attempts without reaching terminal state, the captured task state is
still 'working' and isResubscribable() remains true. If finalizeMessage
then throws (e.g., the DB write fails), the outer catch would re-enter
resubscribeLoop for another full 31s backoff round.

Track entry with a resubscribeAttempted flag and skip the catch-path
reconnect when the clean-close path already ran one. Mirrors the
defensive pattern in the existing catch block around finalizeMessage.

Add a regression test asserting exactly one round of 5 retries even when
both resubscribe and finalize fail.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@malinskibeniamin
Copy link
Copy Markdown
Contributor Author

@claude please review this PR. Specifically interested in your read on:

  1. Whether the resubscribeAttempted flag is the cleanest way to prevent double-reconnect, or whether this would be better refactored (e.g., extract a shared "resubscribe + finalize" helper used by both the clean-close and error paths).
  2. Whether the clean-close entry into resubscribeLoop should have any other guardrails — e.g., a minimum delay before the first "disconnected" block appears, to avoid flashing the reconnect UI for quick-completing tasks that happen to also clean-close at the wire level.
  3. Any scenario I missed in the tests.

@claude
Copy link
Copy Markdown

claude Bot commented Apr 21, 2026

Claude finished @malinskibeniamin's task in 2m 33s —— View job


Code Review — Focused Q&A

  • Gather context
  • Read implementation (use-message-streaming.ts)
  • Read tests (use-message-streaming.test.ts)
  • Answer Q1: resubscribeAttempted flag vs. shared helper
  • Answer Q2: guardrails for clean-close UI flash
  • Answer Q3: missing test scenarios
  • Post final review

Q1 — Is resubscribeAttempted the right shape, or should there be a shared helper?

Short answer: the flag is correct. A shared helper doesn't simplify anything.

The reason the flag is necessary is that the two call sites have genuinely different gave-up semantics:

Path resubscribeLoop returns false (gave-up)
Clean-close Falls through to finalizeMessage — success=true, gave-up block visible in UI
Error catch Skips finalizeMessage, falls to the a2a-error block — success=false

A "shared resubscribe + finalize" helper would need to express this same fork internally — either via its own parameter flag, or by returning a discriminated result. That's the same complexity in a different location, and it fragments the paths that are currently readable in one place.

The one structural alternative worth noting: move the clean-close finalizeMessage call inside the if (isResubscribable) block, then add a second finalizeMessage call for the non-resubscribable branch. That eliminates the flag but introduces a duplicate call (or a helper that calls it in both arms). The flag wins.

One thing the current implementation does paper over: after resubscribeLoop gives up, state.capturedTaskState is still non-terminal (the server never sent a terminal event). So isResubscribable(state) is still true when the outer catch fires. The flag is what prevents re-entry. A deeper fix would be to force-transition capturedTaskState to a pseudo-terminal value inside resubscribeLoop before returning false — then isResubscribable itself would return false and the flag becomes unnecessary. That said, mutating task state to something that was never sent by the server has its own risks. The flag is safer.


Q2 — Does the clean-close entry need a minimum delay or other guardrails to avoid flashing the reconnect UI?

Short answer: no extra guardrails needed. The flash concern mostly doesn't apply.

The scenario you're worried about — "quick task completes + clean TCP close → UI briefly shows disconnected" — is actually handled at the isResubscribable check:

  • If the task completed normally, the server sent a terminal status-update event before closing the connection. That event updated state.capturedTaskState to 'completed'. isResubscribable returns false. No reconnect UI, no flash. (Scenario 16c.)

  • If both the FIN and the final event are racing in the TCP buffer, the iterator still drains them before exiting. By the time the for await exits, capturedTaskState is already terminal. Same result.

The flash only occurs when the LB closed the connection before the terminal event arrived — meaning the task is genuinely still in-flight. In that case the "disconnected" status block is accurate information, not noise.

The one real edge case: the task completes server-side in the 1-second window between the clean close and the first resubscribeTask attempt. The user sees disconnected → reconnecting → reconnected (completed) for ~1s. That's an accurate summary of what happened, and it's a narrow window. A pre-check getTask call would eliminate it, but it adds a network round-trip and complexity that isn't worth it here.


Q3 — Missing test scenarios

The four new tests (16b–16e) are solid. The one gap I'd flag:

Missing: clean-close + resubscribe succeeds + finalizeMessage throws

Scenario 16e tests: clean-close → gave-up → finalizeMessage throws → no second resubscribe loop.
Scenario 16 tests: error path → resubscribe succeeds → finalizeMessage throws → falls to error path.

Nobody tests: clean-close → resubscribe succeeds (task reaches terminal) → finalizeMessage throws → catch fires with resubscribeAttempted = true → no second loop → error path.

This matters because resubscribeAttempted = true is set before resubscribeLoop is awaited (line 361), so it guards both gave-up and success outcomes. But there's no test that a successful clean-close resubscribe followed by a DB failure doesn't trigger a second loop. The production code path is:

// line 360-365 (clean-close branch)
if (isResubscribable(state)) {
  resubscribeAttempted = true;       // ← set here
  ...
  await resubscribeLoop(...);         // ← task now terminal, returns true
}
return await finalizeMessage(...);   // ← DB write fails → throws
// catch fires: !resubscribeAttempted is false → guard triggers → correct

The guard is correct but untested for this specific combination. Worth adding a test analogous to Scenario 16 but with clean-close as the entry.

Lower priority: clean-close when taskId comes from response metadata fallback

Scenario 10 confirms that capturedTaskId can be populated from streamResult.response metadata. The clean-close check at line 360 runs after the metadata fallback at lines 344–352, so the ordering is correct. A test combining clean-close + metadata-fallback taskId would confirm this ordering holds. Low priority since the logic path is sequential and obvious, but it's a coverage gap.


@malinskibeniamin malinskibeniamin requested review from a team, Mateoc, datamali, sago2k8 and weeco and removed request for a team April 21, 2026 15:37
@malinskibeniamin malinskibeniamin added bug Something isn't working ui/ux labels Apr 21, 2026
Production code (use-message-streaming.ts):
- Remove redundant closeActiveTextBlock call in the clean-close branch;
  the block at the top of that stretch already closed it and nothing
  between could have opened a new one.
- Report success=false when the clean-close path's resubscribe gives up,
  mirroring the error-path's gave-up semantics. An orphaned task is a
  failure regardless of whether the original disconnect was graceful.

Tests (use-message-streaming.test.ts):
- Renumber scenarios into a clean 1-25 sequence (was 1-13, 13b, 13c,
  14-17 with an added 16b/c/d/e block mid-file).
- Rename the TypeError test: the code breaks out of the retry loop
  rather than rethrowing, so "stops retrying immediately on TypeError"
  is accurate.
- Update gave-up clean-close test to assert success=false.
- Add scenario 23: clean-close resubscribe succeeds then finalizeMessage
  throws — guards the success-path arm of resubscribeAttempted against
  future regressions that change terminal-state tracking.
- Add scenario 24: clean-close with taskId captured only from response
  metadata fallback — confirms the metadata block runs before the
  clean-close isResubscribable check and does not trigger spurious
  reconnects.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@malinskibeniamin
Copy link
Copy Markdown
Contributor Author

@claude I've addressed the review feedback. Summary of changes in 2d9abcf:

Production code:

  1. Removed the redundant closeActiveTextBlock/state.activeTextBlock = null calls in the clean-close branch (the lines above already do this with nothing mutating activeTextBlock in between).
  2. Made the clean-close gave-up path return success: false for symmetry with the error-path gave-up. Orphaned tasks are failures regardless of whether the disconnect was graceful or thrown. The gave-up connection-status block is still visible to the user either way.

Tests:
3. Renumbered all scenarios into a clean 1–25 sequence. Previously we had 13, 13b, 13c interleaved with 14–17 plus an added 16b/c/d/e block.
4. Renamed Scenario 17 (the TypeError test) from rethrows TypeError from resubscribe instead of retrying to stops retrying immediately on TypeError from resubscribe — the code breaks out and falls through, it doesn't rethrow.
5. Updated the gave-up clean-close test to assert success: false (matches the new production behavior).
6. Added Scenario 23: clean-close resubscribe succeeds, then finalizeMessage throws — explicitly guards the success-path arm of resubscribeAttempted against future regressions that change terminal-state tracking.
7. Added Scenario 24: clean-close with taskId from response metadata fallback only — confirms the fallback runs before the clean-close isResubscribable check and doesn't spuriously reconnect.

33/33 tests pass. Type-check and lint clean.

Please take another pass.

@claude
Copy link
Copy Markdown

claude Bot commented Apr 21, 2026

Claude finished @malinskibeniamin's task in 1m 30s —— View job


Code Review — Follow-up Pass

  • Post initial comment
  • Read use-message-streaming.ts (current state)
  • Read use-message-streaming.test.ts (current state)
  • Verify each addressed item
  • Post final review

All seven items from the trigger comment are confirmed in the code. Here's a point-by-point check plus one minor observation.

Verified changes

1. Redundant closeActiveTextBlock calls removed
Lines 364–372 in the clean-close branch call straight into resubscribeLoop with no repeated close. The comment at lines 354–363 explicitly explains why it's not needed here (active block already closed above, unlike the catch branch which can arrive mid-stream). That comment is genuinely useful — keep it.

2. Clean-close gave-up returns success: false
Line 371: return recovered ? finalResult : { ...finalResult, success: false }. Correct and the symmetry argument is sound — an orphaned task is a failure regardless of whether the FIN was graceful or thrown.

3. Scenarios renumbered 1–25
Sequential, no duplicates. Much easier to navigate.

4. Scenario 17 renamed
'stops retrying immediately on TypeError from resubscribe' — accurate, no longer implies a rethrow.

5. Scenario 21 asserts success: false
Line 1040. Consistent with the production change.

6. Scenario 23 added (clean-close succeeds → finalizeMessage throws)
Lines 1118–1156. Tests exactly the production path described in the comment above the test: task goes terminal during resubscribe, then DB write fails. Both the resubscribeAttempted guard and isResubscribable returning false (terminal state) independently prevent re-entry. The comment correctly documents this belt-and-suspenders situation.

7. Scenario 24 added (metadata-fallback taskId + clean close)
Lines 1159–1191. Confirms the ordering: metadata fallback runs, then isResubscribable checks for capturedTaskState (never set in this path) → no reconnect, no connection-status blocks.


One minor observation (non-blocking)

vi.spyOn(console, 'error') in Scenarios 22 and 23 suppresses nothing

The console.error at line 392 is inside the error-path's inner try { finalizeMessage } catch, which is only reached when !resubscribeAttempted && isResubscribable(state). In both Scenario 22 and 23, resubscribeAttempted is already true, so execution skips that block entirely — the log is never emitted and the spy never fires. The suppression is harmless (and defensive against future refactors), but if a contributor later adds a console.error call to the outer catch's fallthrough they might assume it's already being suppressed here when it isn't.

Not worth a change request — just something to be aware of when reading the test output.


Summary

All review feedback addressed
success: false symmetry ✅ Clean improvement
Scenario numbering ✅ 1–25, sequential
Scenario 17 name ✅ Accurate
Scenario 23 (success-path guard) ✅ Explicit regression coverage
Scenario 24 (metadata-fallback ordering) ✅ Confirms sequential correctness
Production code ✅ No issues found

This is in good shape to merge.

@malinskibeniamin
Copy link
Copy Markdown
Contributor Author

@claude review

@claude
Copy link
Copy Markdown

claude Bot commented Apr 21, 2026

Claude finished @malinskibeniamin's task in 1m 48s —— View job


Code Review — Final Pass

  • Post initial comment
  • Read use-message-streaming.ts
  • Read use-message-streaming.test.ts
  • Review diff against origin/master
  • Post final review

This is a clean final state. All items from the previous three review rounds are addressed, and the two new scenarios (23, 24) are well-constructed. One minor observation, non-blocking.

Production code (use-message-streaming.ts)

finalizeMessage failure in the clean-close path isn't logged (non-blocking)

In the error path, if finalizeMessage throws after a successful recovery, the inner try/catch at lines 386–394 catches it, emits a console.error, then falls through. In the clean-close path (lines 364–372) there is no equivalent inner try/catch — a finalizeMessage failure propagates directly to the outer catch, which hits parseA2AError silently.

The result for the user is the same (an a2a-error block), but a DB write failure after a clean-close resubscribe leaves no log trace in production. Scenario 23 confirms the behavior is correct, so this is a monitoring gap rather than a functional bug.

If this ever becomes noise in production telemetry, the fix is straightforward: wrap lines 367–371 in the same inner try/catch pattern that lines 386–394 use. Not necessary to block on.

Tests (use-message-streaming.test.ts)

Scenarios 22/23 console.error spy — previously noted, still present. The spy never fires on these paths (the resubscribeAttempted guard prevents reaching line 392), so the comment "Silence expected finalize-failure log" is inaccurate. Harmless and defensive against future refactors. Fine to leave.

Scenario 22 timer advancement — the test advances 5 backoff windows, which is exactly enough for the guard-passes case. If the guard breaks and a second round fires, await resultPromise would hang (time out) rather than produce a clean assertion failure. The toHaveBeenCalledTimes(5) check is only reached if the promise resolves, so the test failure mode is a timeout rather than a clear message. This is a test design trade-off; not worth restructuring, just worth knowing.


Summary

Bug fix correctness
resubscribeAttempted guard ✅ Correct — prevents double-loop in all four paths (gave-up×2, success×2)
success: false symmetry ✅ Clean-close gave-up now mirrors error-path gave-up
Scenario renumbering (1–25)
Scenario 22: gave-up + finalize fail
Scenario 23: success + finalize fail
Scenario 24: metadata-fallback ordering
Logging gap on clean-close finalize fail ⚠️ Minor — monitoring gap only, not functional

LGTM. The one observation above is minor enough that I wouldn't hold the merge for it.

Mirror the catch-block recovery branch's inner try/catch so a DB write
failure after a clean-close reconnect is observable in production
telemetry instead of silently producing an a2a-error block.

Closes the last review observation from the final Claude pass: the
clean-close path previously let finalizeMessage errors propagate to the
outer catch, which runs parseA2AError on the DB error but emits no log.
Scenarios 22 and 23 now also assert the log fires exactly once from the
clean-close branch, making the previously-inaccurate console.error spy
comments accurate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@malinskibeniamin
Copy link
Copy Markdown
Contributor Author

@claude review

@claude
Copy link
Copy Markdown

claude Bot commented Apr 21, 2026

Claude finished @malinskibeniamin's task in 1m 27s —— View job


Code Review — Final Pass

  • Post initial comment
  • Read use-message-streaming.ts (current state)
  • Read use-message-streaming.test.ts (current state)
  • Review new commit (3cd57b2 — log finalize failures on clean-close path)
  • Post final review

The last remaining observation from the previous review pass — "a finalizeMessage failure after a clean-close resubscribe leaves no log trace in production" — is addressed in 3cd57b2. Here's the verification.

Production code (use-message-streaming.ts)

Inner try/catch added around finalizeMessage in the clean-close branch

// lines 367–381
try {
  const finalResult = await finalizeMessage(state, assistantMessage);
  return recovered ? finalResult : { ...finalResult, success: false };
} catch (finalizeError) {
  console.error('finalizeMessage failed after clean-close recovery:', finalizeError);
  throw finalizeError;
}

This is the exact same shape as the error-catch recovery branch at lines 396–404. A DB write failure after a clean-close reconnect now produces a log entry in production telemetry rather than silently falling through to parseA2AError. The rethrow correctly delegates error-block construction to the outer catch.

Tests (use-message-streaming.test.ts)

Scenarios 22 and 23 console.error spy now accurate

Previously the spy was set up but never fired (the console.error call didn't exist). Both scenarios now assert the log fires exactly once:

const cleanCloseLogs = errorSpy.mock.calls.filter(
  (args) => typeof args[0] === 'string' && args[0].startsWith('finalizeMessage failed after clean-close recovery')
);
expect(cleanCloseLogs).toHaveLength(1);

This makes the spy both functional (silences test output noise) and assertive (regression coverage against the log being removed or moved). The comments in both tests now accurately describe what they're capturing.


Summary

Bug fix correctness
resubscribeAttempted guard
success: false symmetry for gave-up
Logging on clean-close finalize fail ✅ Addressed in 3cd57b2
Scenarios 22/23 spy now verifies log ✅ Accurate and assertive
Scenario 24 metadata-fallback ordering
Scenario numbering (1–25)

No outstanding issues. LGTM.

@malinskibeniamin malinskibeniamin enabled auto-merge (squash) April 21, 2026 16:13
@malinskibeniamin malinskibeniamin merged commit 99bd993 into master Apr 21, 2026
17 checks passed
@malinskibeniamin malinskibeniamin deleted the bm/fix-a2a-reconnect-clean-close branch April 21, 2026 16:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working ui/ux

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants