Skip to content

Commit f013d38

Browse files
committed
fix: re-offer coalesced wakeup requests after in-flight dispatch completes
When a second provider wakeup for the same threadId was dequeued while an earlier dispatchWakeup was still in flight, the dispatcher would log 'provider-wakeup.coalesced' and permanently discard the request. If the in-flight dispatch then failed (quiescence timeout, thread-stayed-busy, or dispatch-failed), that coalesced wakeup was lost with no retry path. Fix: store coalesced requests in a per-thread pending map. When the in-flight dispatch finishes (whether it succeeds or fails), re-offer any pending request back to the relay queue so it gets another dispatch attempt.
1 parent b1a0017 commit f013d38

1 file changed

Lines changed: 31 additions & 8 deletions

File tree

apps/server/src/orchestration-v2/ProviderWakeupService.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,11 @@ export const runWakeupDispatcher: Effect.Effect<
171171
// Wakeups dispatch concurrently, one in flight per thread: quiescence
172172
// waiting for one busy thread must not head-of-line-block every other
173173
// thread's wakeup (which could be superseded while it waits). A wakeup
174-
// arriving while its thread already has one in flight is coalesced away —
175-
// the adapter buffers all pending activity behind a single attach.
174+
// arriving while its thread already has one in flight is held pending;
175+
// when the in-flight dispatch finishes it re-offers the pending request
176+
// so a failed dispatch does not permanently lose the wakeup.
176177
const inFlightThreads = yield* Ref.make(new Set<ThreadId>());
178+
const pendingWakeups = yield* Ref.make(new Map<ThreadId, ProviderWakeupRequest>());
177179
return yield* relay.take.pipe(
178180
Effect.flatMap((input) =>
179181
Ref.modify(inFlightThreads, (current) => {
@@ -192,16 +194,37 @@ export const runWakeupDispatcher: Effect.Effect<
192194
const next = new Set(current);
193195
next.delete(input.threadId);
194196
return next;
195-
}),
197+
}).pipe(
198+
Effect.andThen(
199+
Ref.modify(pendingWakeups, (m) => {
200+
const pending = m.get(input.threadId);
201+
if (pending === undefined) return [undefined, m] as const;
202+
const next = new Map(m);
203+
next.delete(input.threadId);
204+
return [pending, next] as const;
205+
}),
206+
),
207+
Effect.flatMap((pending) =>
208+
pending !== undefined ? relay.offer(pending) : Effect.void,
209+
),
210+
),
196211
),
197212
Effect.forkScoped,
198213
Effect.asVoid,
199214
)
200-
: Effect.logInfo("orchestration-v2.provider-wakeup.coalesced", {
201-
threadId: input.threadId,
202-
providerThreadId: input.providerThreadId,
203-
origin: input.origin,
204-
}),
215+
: Ref.update(pendingWakeups, (m) => {
216+
const next = new Map(m);
217+
next.set(input.threadId, input);
218+
return next;
219+
}).pipe(
220+
Effect.andThen(
221+
Effect.logInfo("orchestration-v2.provider-wakeup.coalesced", {
222+
threadId: input.threadId,
223+
providerThreadId: input.providerThreadId,
224+
origin: input.origin,
225+
}),
226+
),
227+
),
205228
),
206229
),
207230
),

0 commit comments

Comments
 (0)