Skip to content

Commit fd41ef3

Browse files
jasnelladuh95
authored andcommitted
test: add tests for experimental stream/iter implementation
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-By: Claude/Opus 4.6 PR-URL: #62066 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent e1f0d2a commit fd41ef3

39 files changed

+8924
-0
lines changed

test/parallel/test-fs-promises-file-handle-pull.js

Lines changed: 440 additions & 0 deletions
Large diffs are not rendered by default.

test/parallel/test-fs-promises-file-handle-pullsync.js

Lines changed: 498 additions & 0 deletions
Large diffs are not rendered by default.

test/parallel/test-fs-promises-file-handle-writer.js

Lines changed: 1126 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { broadcast, text } = require('stream/iter');
7+
8+
// =============================================================================
9+
// Backpressure policies
10+
// =============================================================================
11+
12+
async function testDropOldest() {
13+
const { writer, broadcast: bc } = broadcast({
14+
highWaterMark: 2,
15+
backpressure: 'drop-oldest',
16+
});
17+
const consumer = bc.push();
18+
19+
writer.writeSync('first');
20+
writer.writeSync('second');
21+
// This should drop 'first'
22+
writer.writeSync('third');
23+
writer.endSync();
24+
25+
const data = await text(consumer);
26+
assert.strictEqual(data, 'secondthird');
27+
}
28+
29+
async function testDropNewest() {
30+
const { writer, broadcast: bc } = broadcast({
31+
highWaterMark: 1,
32+
backpressure: 'drop-newest',
33+
});
34+
const consumer = bc.push();
35+
36+
writer.writeSync('kept');
37+
// This should be silently dropped
38+
writer.writeSync('dropped');
39+
writer.endSync();
40+
41+
const data = await text(consumer);
42+
assert.strictEqual(data, 'kept');
43+
}
44+
45+
// =============================================================================
46+
// Block backpressure
47+
// =============================================================================
48+
49+
async function testBlockBackpressure() {
50+
const { writer, broadcast: bc } = broadcast({
51+
highWaterMark: 1,
52+
backpressure: 'block',
53+
});
54+
const consumer = bc.push();
55+
writer.writeSync('a');
56+
57+
// Next write should block
58+
let writeResolved = false;
59+
const writePromise = writer.write('b').then(() => { writeResolved = true; });
60+
await new Promise(setImmediate);
61+
assert.strictEqual(writeResolved, false);
62+
63+
// Drain consumer to unblock the pending write
64+
const iter = consumer[Symbol.asyncIterator]();
65+
const first = await iter.next();
66+
assert.strictEqual(first.done, false);
67+
await new Promise(setImmediate);
68+
assert.strictEqual(writeResolved, true);
69+
70+
writer.endSync();
71+
// Drain remaining data and verify completion
72+
const second = await iter.next();
73+
assert.strictEqual(second.done, false);
74+
await writePromise;
75+
}
76+
77+
// Verify block backpressure data flows correctly end-to-end
78+
async function testBlockBackpressureContent() {
79+
const { writer, broadcast: bc } = broadcast({
80+
highWaterMark: 1,
81+
backpressure: 'block',
82+
});
83+
const consumer = bc.push();
84+
85+
writer.writeSync('a');
86+
const writePromise = writer.write('b');
87+
await new Promise(setImmediate);
88+
89+
// Read all and verify content
90+
const iter = consumer[Symbol.asyncIterator]();
91+
const first = await iter.next();
92+
assert.strictEqual(first.done, false);
93+
const firstStr = new TextDecoder().decode(first.value[0]);
94+
assert.strictEqual(firstStr, 'a');
95+
96+
await writePromise;
97+
writer.endSync();
98+
99+
const second = await iter.next();
100+
assert.strictEqual(second.done, false);
101+
const secondStr = new TextDecoder().decode(second.value[0]);
102+
assert.strictEqual(secondStr, 'b');
103+
104+
const done = await iter.next();
105+
assert.strictEqual(done.done, true);
106+
}
107+
108+
// Writev async path
109+
async function testWritevAsync() {
110+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
111+
const consumer = bc.push();
112+
113+
await writer.writev(['hello', ' ', 'world']);
114+
await writer.end();
115+
116+
const data = await text(consumer);
117+
assert.strictEqual(data, 'hello world');
118+
}
119+
120+
// endSync returns the total byte count
121+
async function testEndSyncReturnValue() {
122+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
123+
bc.push(); // Need a consumer to write to
124+
125+
writer.writeSync('hello'); // 5 bytes
126+
writer.writeSync(' world'); // 6 bytes
127+
const total = writer.endSync();
128+
assert.strictEqual(total, 11);
129+
}
130+
131+
Promise.all([
132+
testDropOldest(),
133+
testDropNewest(),
134+
testBlockBackpressure(),
135+
testBlockBackpressureContent(),
136+
testWritevAsync(),
137+
testEndSyncReturnValue(),
138+
]).then(common.mustCall());
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { broadcast, text } = require('stream/iter');
7+
8+
// =============================================================================
9+
// Basic broadcast
10+
// =============================================================================
11+
12+
async function testBasicBroadcast() {
13+
const { writer, broadcast: bc } = broadcast();
14+
15+
// Create two consumers
16+
const consumer1 = bc.push();
17+
const consumer2 = bc.push();
18+
19+
assert.strictEqual(bc.consumerCount, 2);
20+
21+
await writer.write('hello');
22+
await writer.end();
23+
24+
const [data1, data2] = await Promise.all([
25+
text(consumer1),
26+
text(consumer2),
27+
]);
28+
29+
assert.strictEqual(data1, 'hello');
30+
assert.strictEqual(data2, 'hello');
31+
}
32+
33+
async function testMultipleWrites() {
34+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
35+
36+
const consumer = bc.push();
37+
38+
await writer.write('a');
39+
await writer.write('b');
40+
await writer.write('c');
41+
await writer.end();
42+
43+
const data = await text(consumer);
44+
assert.strictEqual(data, 'abc');
45+
}
46+
47+
async function testConsumerCount() {
48+
const { broadcast: bc } = broadcast();
49+
50+
assert.strictEqual(bc.consumerCount, 0);
51+
52+
const c1 = bc.push();
53+
assert.strictEqual(bc.consumerCount, 1);
54+
55+
bc.push();
56+
assert.strictEqual(bc.consumerCount, 2);
57+
58+
bc.cancel();
59+
60+
// After cancel, consumer count drops to 0
61+
assert.strictEqual(bc.consumerCount, 0);
62+
63+
// Consumers are detached and yield nothing
64+
const batches = [];
65+
for await (const batch of c1) {
66+
batches.push(batch);
67+
}
68+
assert.strictEqual(batches.length, 0);
69+
}
70+
71+
// =============================================================================
72+
// Writer methods
73+
// =============================================================================
74+
75+
async function testWriteSync() {
76+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 2 });
77+
const consumer = bc.push();
78+
79+
assert.strictEqual(writer.writeSync('a'), true);
80+
assert.strictEqual(writer.writeSync('b'), true);
81+
// Buffer full (highWaterMark=2, strict policy)
82+
assert.strictEqual(writer.writeSync('c'), false);
83+
84+
writer.endSync();
85+
86+
const data = await text(consumer);
87+
assert.strictEqual(data, 'ab');
88+
}
89+
90+
async function testWritevSync() {
91+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 10 });
92+
const consumer = bc.push();
93+
94+
assert.strictEqual(writer.writevSync(['hello', ' ', 'world']), true);
95+
writer.endSync();
96+
97+
const data = await text(consumer);
98+
assert.strictEqual(data, 'hello world');
99+
}
100+
101+
async function testWriterEnd() {
102+
const { writer, broadcast: bc } = broadcast();
103+
const consumer = bc.push();
104+
105+
await writer.write('data');
106+
const totalBytes = await writer.end();
107+
assert.strictEqual(totalBytes, 4); // 'data' = 4 UTF-8 bytes
108+
109+
const data = await text(consumer);
110+
assert.strictEqual(data, 'data');
111+
}
112+
113+
async function testWriterFail() {
114+
const { writer, broadcast: bc } = broadcast();
115+
const consumer = bc.push();
116+
117+
writer.fail(new Error('test error'));
118+
119+
await assert.rejects(
120+
async () => {
121+
// eslint-disable-next-line no-unused-vars
122+
for await (const _ of consumer) {
123+
assert.fail('Should not reach here');
124+
}
125+
},
126+
{ message: 'test error' },
127+
);
128+
}
129+
130+
// =============================================================================
131+
// Cancel
132+
// =============================================================================
133+
134+
async function testCancelWithoutReason() {
135+
const { broadcast: bc } = broadcast();
136+
const consumer = bc.push();
137+
138+
bc.cancel();
139+
140+
const batches = [];
141+
for await (const batch of consumer) {
142+
batches.push(batch);
143+
}
144+
assert.strictEqual(batches.length, 0);
145+
}
146+
147+
async function testCancelWithReason() {
148+
const { broadcast: bc } = broadcast();
149+
150+
// Start a consumer that is waiting for data (promise pending)
151+
const consumer = bc.push();
152+
const resultPromise = text(consumer).catch((err) => err);
153+
154+
// Give the consumer time to enter the waiting state
155+
await new Promise((resolve) => setImmediate(resolve));
156+
157+
bc.cancel(new Error('cancelled'));
158+
159+
const result = await resultPromise;
160+
assert.ok(result instanceof Error);
161+
assert.strictEqual(result.message, 'cancelled');
162+
}
163+
164+
// =============================================================================
165+
// Writer fail detaches consumers
166+
// =============================================================================
167+
168+
async function testFailDetachesConsumers() {
169+
const { writer, broadcast: bc } = broadcast();
170+
const consumer1 = bc.push();
171+
const consumer2 = bc.push();
172+
173+
assert.strictEqual(bc.consumerCount, 2);
174+
175+
// Write some data, then fail the writer
176+
await writer.write('data');
177+
await writer.fail(new Error('writer failed'));
178+
179+
// After fail, consumers are detached
180+
assert.strictEqual(bc.consumerCount, 0);
181+
182+
// Both consumers should see the error
183+
await assert.rejects(
184+
async () => {
185+
// eslint-disable-next-line no-unused-vars
186+
for await (const _ of consumer1) {
187+
assert.fail('Should not reach here');
188+
}
189+
},
190+
{ message: 'writer failed' },
191+
);
192+
193+
await assert.rejects(
194+
async () => {
195+
// eslint-disable-next-line no-unused-vars
196+
for await (const _ of consumer2) {
197+
assert.fail('Should not reach here');
198+
}
199+
},
200+
{ message: 'writer failed' },
201+
);
202+
}
203+
204+
// =============================================================================
205+
// Writer fail idempotent
206+
// =============================================================================
207+
208+
async function testWriterFailIdempotent() {
209+
const { writer, broadcast: bc } = broadcast();
210+
const consumer = bc.push();
211+
writer.writeSync('hello');
212+
writer.fail(new Error('fail!'));
213+
// Second call is a no-op (already errored)
214+
writer.fail(new Error('fail2'));
215+
await assert.rejects(async () => {
216+
// eslint-disable-next-line no-unused-vars
217+
for await (const _ of consumer) { /* consume */ }
218+
}, { message: 'fail!' });
219+
}
220+
221+
// cancel() with falsy reason (0, "", false) should still treat as error
222+
async function testCancelWithFalsyReason() {
223+
const { broadcast: bc } = broadcast();
224+
const consumer = bc.push();
225+
const resultPromise = text(consumer).catch((err) => err);
226+
await new Promise((resolve) => setImmediate(resolve));
227+
bc.cancel(0);
228+
const result = await resultPromise;
229+
assert.strictEqual(result, 0);
230+
}
231+
232+
// Late-joining consumer should read from oldest buffered entry
233+
async function testLateJoinerSeesBufferedData() {
234+
const { writer, broadcast: bc } = broadcast({ highWaterMark: 16 });
235+
236+
// Write data before any consumer joins
237+
writer.writeSync('before-join');
238+
writer.endSync();
239+
240+
// Consumer joins after data is written
241+
const consumer = bc.push();
242+
const result = await text(consumer);
243+
assert.strictEqual(result, 'before-join');
244+
}
245+
246+
Promise.all([
247+
testBasicBroadcast(),
248+
testMultipleWrites(),
249+
testConsumerCount(),
250+
testWriteSync(),
251+
testWritevSync(),
252+
testWriterEnd(),
253+
testWriterFail(),
254+
testCancelWithoutReason(),
255+
testCancelWithReason(),
256+
testCancelWithFalsyReason(),
257+
testFailDetachesConsumers(),
258+
testWriterFailIdempotent(),
259+
testLateJoinerSeesBufferedData(),
260+
]).then(common.mustCall());

0 commit comments

Comments
 (0)