Skip to content

Commit 7198895

Browse files
trivikraduh95
authored andcommitted
stream: serialize concurrent share consumer reads
Ensure overlapping next() calls on a single share() consumer resolve in the same order they were requested. Fixes: #63477 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63478 Fixes: #63477 Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent e1d65d9 commit 7198895

2 files changed

Lines changed: 78 additions & 48 deletions

File tree

lib/internal/streams/iter/share.js

Lines changed: 59 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ class ShareImpl {
113113
resolve: null,
114114
reject: null,
115115
detached: false,
116+
pendingNext: PromiseResolve(),
116117
};
117118

118119
this.#consumers.add(state);
@@ -129,62 +130,72 @@ class ShareImpl {
129130
return {
130131
__proto__: null,
131132
[SymbolAsyncIterator]() {
132-
return {
133-
__proto__: null,
134-
async next() {
135-
if (self.#sourceError) {
136-
state.detached = true;
137-
self.#consumers.delete(state);
138-
throw self.#sourceError;
133+
const getNext = async () => {
134+
if (self.#sourceError) {
135+
state.detached = true;
136+
self.#consumers.delete(state);
137+
throw self.#sourceError;
138+
}
139+
140+
// Loop until we get data, source is exhausted, or
141+
// consumer is detached. Multiple consumers may be woken
142+
// after a single pull - those that find no data at their
143+
// cursor must re-pull rather than terminating prematurely.
144+
for (;;) {
145+
if (state.detached) {
146+
if (self.#sourceError) throw self.#sourceError;
147+
return { __proto__: null, done: true, value: undefined };
139148
}
140149

141-
// Loop until we get data, source is exhausted, or
142-
// consumer is detached. Multiple consumers may be woken
143-
// after a single pull - those that find no data at their
144-
// cursor must re-pull rather than terminating prematurely.
145-
for (;;) {
146-
if (state.detached) {
147-
if (self.#sourceError) throw self.#sourceError;
148-
return { __proto__: null, done: true, value: undefined };
149-
}
150+
if (self.#cancelled) {
151+
state.detached = true;
152+
self.#deleteConsumer(state);
153+
return { __proto__: null, done: true, value: undefined };
154+
}
150155

151-
if (self.#cancelled) {
152-
state.detached = true;
153-
self.#deleteConsumer(state);
154-
return { __proto__: null, done: true, value: undefined };
156+
// Check if data is available in buffer
157+
const bufferIndex = state.cursor - self.#bufferStart;
158+
if (bufferIndex < self.#buffer.length) {
159+
const chunk = self.#buffer.get(bufferIndex);
160+
const cursor = state.cursor;
161+
state.cursor++;
162+
if (cursor === self.#cachedMinCursor &&
163+
--self.#cachedMinCursorConsumers === 0) {
164+
self.#tryTrimBuffer();
155165
}
166+
return { __proto__: null, done: false, value: chunk };
167+
}
156168

157-
// Check if data is available in buffer
158-
const bufferIndex = state.cursor - self.#bufferStart;
159-
if (bufferIndex < self.#buffer.length) {
160-
const chunk = self.#buffer.get(bufferIndex);
161-
const cursor = state.cursor;
162-
state.cursor++;
163-
if (cursor === self.#cachedMinCursor &&
164-
--self.#cachedMinCursorConsumers === 0) {
165-
self.#tryTrimBuffer();
166-
}
167-
return { __proto__: null, done: false, value: chunk };
168-
}
169+
if (self.#sourceExhausted) {
170+
state.detached = true;
171+
self.#deleteConsumer(state);
172+
if (self.#sourceError) throw self.#sourceError;
173+
return { __proto__: null, done: true, value: undefined };
174+
}
169175

170-
if (self.#sourceExhausted) {
171-
state.detached = true;
172-
self.#deleteConsumer(state);
173-
if (self.#sourceError) throw self.#sourceError;
174-
return { __proto__: null, done: true, value: undefined };
175-
}
176+
// Need to pull from source - check buffer limit
177+
const canPull = await self.#waitForBufferSpace();
178+
if (!canPull) {
179+
state.detached = true;
180+
self.#deleteConsumer(state);
181+
if (self.#sourceError) throw self.#sourceError;
182+
return { __proto__: null, done: true, value: undefined };
183+
}
176184

177-
// Need to pull from source - check buffer limit
178-
const canPull = await self.#waitForBufferSpace();
179-
if (!canPull) {
180-
state.detached = true;
181-
self.#deleteConsumer(state);
182-
if (self.#sourceError) throw self.#sourceError;
183-
return { __proto__: null, done: true, value: undefined };
184-
}
185+
await self.#pullFromSource();
186+
}
187+
};
185188

186-
await self.#pullFromSource();
187-
}
189+
return {
190+
__proto__: null,
191+
next() {
192+
const next = PromisePrototypeThen(
193+
state.pendingNext,
194+
getNext,
195+
getNext);
196+
state.pendingNext =
197+
PromisePrototypeThen(next, undefined, () => {});
198+
return next;
188199
},
189200

190201
async return() {

test/parallel/test-stream-iter-share-async.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,24 @@ async function testShareMultipleConsumersConcurrentPull() {
306306
assert.strictEqual(t3, expected);
307307
}
308308

309+
async function testShareConsumerConcurrentNextCalls() {
310+
async function* source() {
311+
const enc = new TextEncoder();
312+
yield [enc.encode('first')];
313+
yield [enc.encode('second')];
314+
}
315+
316+
const shared = share(source());
317+
const it = shared.pull()[Symbol.asyncIterator]();
318+
const first = it.next();
319+
const second = it.next();
320+
321+
const [r1, r2] = await Promise.all([first, second]);
322+
const dec = new TextDecoder();
323+
assert.strictEqual(dec.decode(r1.value[0]), 'first');
324+
assert.strictEqual(dec.decode(r2.value[0]), 'second');
325+
}
326+
309327
// share() accepts string source directly (normalized via from())
310328
async function testShareStringSource() {
311329
const shared = share('hello-share');
@@ -327,5 +345,6 @@ Promise.all([
327345
testShareLateJoiningConsumer(),
328346
testShareConsumerBreak(),
329347
testShareMultipleConsumersConcurrentPull(),
348+
testShareConsumerConcurrentNextCalls(),
330349
testShareStringSource(),
331350
]).then(common.mustCall());

0 commit comments

Comments
 (0)