Skip to content

Commit bfff8cb

Browse files
jasnelladuh95
authored and committed
benchmark: add benchmarks for experimental stream/iter
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-By: Claude/Opus 4.6 PR-URL: #62066 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent fd41ef3 commit bfff8cb

9 files changed

+1158
-0
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Benchmark: pipeToSync with sync compression transforms.
2+
// Measures fully synchronous file-to-file pipeline (no threadpool, no promises).
3+
'use strict';
4+
5+
const common = require('../common.js');
6+
const fs = require('fs');
7+
const { openSync, closeSync, writeSync, unlinkSync } = fs;
8+
9+
const tmpdir = require('../../test/common/tmpdir');
10+
tmpdir.refresh();
11+
const srcFile = tmpdir.resolve(`.removeme-sync-bench-src-${process.pid}`);
12+
const dstFile = tmpdir.resolve(`.removeme-sync-bench-dst-${process.pid}`);
13+
14+
const bench = common.createBenchmark(main, {
15+
compression: ['gzip', 'deflate', 'brotli', 'zstd'],
16+
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
17+
n: [5],
18+
}, {
19+
flags: ['--experimental-stream-iter'],
20+
});
21+
22+
function main({ compression, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = openSync(srcFile, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    writeSync(fd, chunk, 0, toWrite);
    remaining -= toWrite;
  }
  closeSync(fd);

  const { pipeToSync } = require('stream/iter');
  const {
    compressGzipSync,
    compressDeflateSync,
    compressBrotliSync,
    compressZstdSync,
  } = require('zlib/iter');
  const { open } = fs.promises;

  // Map the benchmark parameter onto the matching sync compressor factory.
  const compressFactory = {
    gzip: compressGzipSync,
    deflate: compressDeflateSync,
    brotli: compressBrotliSync,
    zstd: compressZstdSync,
  }[compression];

  // Stateless uppercase transform (sync). `null` is the end-of-stream
  // sentinel for stream/iter transforms and must be forwarded unchanged.
  const upper = (chunks) => {
    if (chunks === null) return null;
    const out = new Array(chunks.length);
    for (let j = 0; j < chunks.length; j++) {
      const src = chunks[j];
      const buf = Buffer.allocUnsafe(src.length);
      for (let i = 0; i < src.length; i++) {
        const b = src[i];
        buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
      }
      out[j] = buf;
    }
    return out;
  };

  // pipeToSync is fully synchronous, but FileHandle (needed for
  // pullSync/writer) can only be obtained asynchronously, hence the
  // async wrapper. Clean up the temp files whether the benchmark
  // succeeds or throws, and surface failures instead of leaving an
  // unhandled rejection behind.
  (async () => {
    try {
      // Warm up once, then reopen the files so the timed runs start from
      // fresh file positions.
      const srcFh = await open(srcFile, 'r');
      const dstFh = await open(dstFile, 'w');
      runSync(srcFh, dstFh, upper, compressFactory, pipeToSync);
      await srcFh.close();
      await dstFh.close();

      bench.start();
      let totalBytes = 0;
      for (let i = 0; i < n; i++) {
        const src = await open(srcFile, 'r');
        const dst = await open(dstFile, 'w');
        totalBytes += runSync(src, dst, upper, compressFactory, pipeToSync);
        await src.close();
        await dst.close();
      }
      bench.end(totalBytes / (1024 * 1024));
    } finally {
      cleanup();
    }
  })().catch((err) => {
    process.exitCode = 1;
    console.error(err);
  });
}
93+
94+
// Drive one fully synchronous pipeline run:
// file reader -> uppercase transform -> compression -> file writer.
// Returns the byte count reported by the writer's endSync().
function runSync(srcFh, dstFh, upper, compressFactory, pipeToSync) {
  const writer = dstFh.writer();
  const source = srcFh.pullSync(upper, compressFactory());
  pipeToSync(source, writer);
  return writer.endSync();
}
99+
100+
// Best-effort removal of the temp source/destination files.
function cleanup() {
  for (const file of [srcFile, dstFile]) {
    try {
      unlinkSync(file);
    } catch {
      // Ignore: the file may never have been created.
    }
  }
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Compare FileHandle.createReadStream() vs readableWebStream() vs pull()
2+
// reading a large file through two transforms: uppercase then compress.
3+
'use strict';
4+
5+
const common = require('../common.js');
6+
const fs = require('fs');
7+
const zlib = require('zlib');
8+
const { Transform, Writable, pipeline } = require('stream');
9+
10+
const tmpdir = require('../../test/common/tmpdir');
11+
tmpdir.refresh();
12+
const filename = tmpdir.resolve(`.removeme-benchmark-garbage-${process.pid}`);
13+
14+
const bench = common.createBenchmark(main, {
15+
api: ['classic', 'webstream', 'pull'],
16+
compression: ['gzip', 'deflate', 'brotli', 'zstd'],
17+
filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
18+
n: [5],
19+
}, {
20+
flags: ['--experimental-stream-iter'],
21+
// Classic and webstream only support gzip (native zlib / CompressionStream).
22+
// Brotli, deflate, zstd are pull-only via stream/iter transforms.
23+
combinationFilter({ api, compression }) {
24+
if (api === 'classic' && compression !== 'gzip') return false;
25+
if (api === 'webstream' && compression !== 'gzip') return false;
26+
return true;
27+
},
28+
});
29+
30+
function main({ api, compression, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = fs.openSync(filename, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    fs.writeSync(fd, chunk, 0, toWrite);
    remaining -= toWrite;
  }
  fs.closeSync(fd);

  // Dispatch to the selected API. Remove the fixture whether the run
  // succeeds or fails, and surface failures instead of leaving an
  // unhandled rejection (the original only cleaned up on success).
  let run;
  if (api === 'classic') {
    run = benchClassic(n, filesize);
  } else if (api === 'webstream') {
    run = benchWebStream(n, filesize);
  } else {
    run = benchPull(n, filesize, compression);
  }
  run
    .finally(() => cleanup())
    .catch((err) => {
      process.exitCode = 1;
      console.error(err);
    });
}
50+
51+
// Best-effort removal of the fixture file.
function cleanup() {
  try {
    fs.unlinkSync(filename);
  } catch {
    // Ignore: nothing to remove.
  }
}
54+
55+
// Stateless uppercase transform (shared by all paths): maps every lowercase
// ASCII byte (0x61-0x7a) to its uppercase form and copies all other bytes
// unchanged. Returns a new Buffer; the input is not mutated.
function uppercaseChunk(chunk) {
  const len = chunk.length;
  const out = Buffer.allocUnsafe(len);
  for (let i = 0; i < len; i++) {
    const byte = chunk[i];
    out[i] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
  }
  return out;
}
64+
65+
// ---------------------------------------------------------------------------
66+
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
67+
// ---------------------------------------------------------------------------
68+
// Warm up once (untimed), then time n classic-stream runs and report the
// total compressed output in MiB.
async function benchClassic(n, filesize) {
  await runClassic(); // Warm-up.

  bench.start();
  let totalBytes = 0;
  let remainingRuns = n;
  while (remainingRuns-- > 0) {
    totalBytes += await runClassic();
  }
  bench.end(totalBytes / (1024 * 1024));
}
78+
79+
// One classic-streams run: createReadStream -> Transform (uppercase) ->
// zlib gzip -> counting sink. Resolves with the number of compressed bytes.
function runClassic() {
  return new Promise((resolve, reject) => {
    let compressedBytes = 0;

    const source = fs.createReadStream(filename);

    const toUpper = new Transform({
      transform(chunk, encoding, callback) {
        callback(null, uppercaseChunk(chunk));
      },
    });

    const gzip = zlib.createGzip();

    // Sink that only counts what the compressor emits.
    const counter = new Writable({
      write(chunk, encoding, callback) {
        compressedBytes += chunk.length;
        callback();
      },
    });

    pipeline(source, toUpper, gzip, counter, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve(compressedBytes);
      }
    });
  });
}
105+
106+
// ---------------------------------------------------------------------------
107+
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
108+
// ---------------------------------------------------------------------------
109+
// Warm up once (untimed), then time n web-stream runs and report the total
// compressed output in MiB.
async function benchWebStream(n, filesize) {
  await runWebStream(); // Warm-up.

  bench.start();
  let totalBytes = 0;
  for (let run = 0; run < n; run++) {
    totalBytes += await runWebStream();
  }
  bench.end(totalBytes / (1024 * 1024));
}
119+
120+
// One web-streams run: readableWebStream -> TransformStream (uppercase) ->
// CompressionStream('gzip'), counting the compressed bytes produced.
// Resolves with that byte count; the FileHandle is always closed.
async function runWebStream() {
  const fh = await fs.promises.open(filename, 'r');
  try {
    const rs = fh.readableWebStream();

    // Reuse the shared uppercaseChunk helper instead of duplicating its byte
    // loop here (the chunks are Uint8Arrays, and Buffer — a Uint8Array
    // subclass — is accepted by CompressionStream).
    const upper = new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(uppercaseChunk(chunk));
      },
    });

    const compress = new CompressionStream('gzip');
    const reader = rs.pipeThrough(upper).pipeThrough(compress).getReader();

    let totalBytes = 0;
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      totalBytes += value.byteLength;
    }
    return totalBytes;
  } finally {
    await fh.close();
  }
}
151+
152+
// ---------------------------------------------------------------------------
153+
// Pull/iter path: pull() with uppercase transform + selected compression
154+
// ---------------------------------------------------------------------------
155+
// Warm up once (untimed), then time n pull()/iter runs with the selected
// compression and report the total compressed output in MiB.
async function benchPull(n, filesize, compression) {
  const iter = require('zlib/iter');

  // Select the async compressor factory matching the benchmark parameter.
  const factories = {
    gzip: iter.compressGzip,
    deflate: iter.compressDeflate,
    brotli: iter.compressBrotli,
    zstd: iter.compressZstd,
  };
  const compressFactory = factories[compression];

  await runPull(compressFactory); // Warm-up.

  bench.start();
  let totalBytes = 0;
  for (let run = 0; run < n; run++) {
    totalBytes += await runPull(compressFactory);
  }
  bench.end(totalBytes / (1024 * 1024));
}
175+
176+
// One pull()/iter run: read the fixture through the uppercase batch
// transform plus the selected compression, summing the compressed bytes.
// Resolves with that byte count; the FileHandle is always closed.
async function runPull(compressFactory) {
  const fh = await fs.promises.open(filename, 'r');
  try {
    // Stateless batch transform: uppercase each chunk in the batch.
    // `null` is the end-of-stream sentinel and must be forwarded unchanged.
    const upper = (chunks) => {
      if (chunks === null) return null;
      const transformed = new Array(chunks.length);
      for (let idx = 0; idx < chunks.length; idx++) {
        transformed[idx] = uppercaseChunk(chunks[idx]);
      }
      return transformed;
    };

    let totalBytes = 0;
    for await (const batch of fh.pull(upper, compressFactory())) {
      for (let i = 0; i < batch.length; i++) {
        totalBytes += batch[i].byteLength;
      }
    }
    return totalBytes;
  } finally {
    await fh.close();
  }
}

0 commit comments

Comments
 (0)