Skip to content

Commit 5f727fd

Browse files
authored
stream: fast-path stateless transform flush results
Avoid generator-based normalization for common flush result types in fused stateless stream/iter transforms. This keeps the corrected flush ordering while reducing overhead for already-normalized batches and primitive byte outputs. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63605 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 2d0c926 commit 5f727fd

1 file changed

Lines changed: 64 additions & 4 deletions

File tree

lib/internal/streams/iter/pull.js

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,31 @@ function* processTransformResultSync(result) {
280280
* @param {*} result
281281
*/
282282
function appendTransformResultSync(target, result) {
283+
if (result === null) {
284+
return;
285+
}
286+
if (isUint8ArrayBatch(result)) {
287+
if (result.length > 0) {
288+
ArrayPrototypePush(target, result);
289+
}
290+
return;
291+
}
292+
if (isUint8Array(result)) {
293+
ArrayPrototypePush(target, [result]);
294+
return;
295+
}
296+
if (typeof result === 'string') {
297+
ArrayPrototypePush(target, [toUint8Array(result)]);
298+
return;
299+
}
300+
if (isAnyArrayBuffer(result)) {
301+
ArrayPrototypePush(target, [new Uint8Array(result)]);
302+
return;
303+
}
304+
if (ArrayBufferIsView(result)) {
305+
ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]);
306+
return;
307+
}
283308
for (const batch of processTransformResultSync(result)) {
284309
ArrayPrototypePush(target, batch);
285310
}
@@ -372,9 +397,38 @@ async function* processTransformResultAsync(result) {
372397
* Append normalized transform result batches to an array (async).
373398
* @param {Array<Uint8Array[]>} target
374399
* @param {*} result
375-
* @returns {Promise<void>}
400+
* @returns {Promise<void>|undefined}
376401
*/
377-
async function appendTransformResultAsync(target, result) {
402+
function appendTransformResultAsync(target, result) {
403+
if (result === null) {
404+
return;
405+
}
406+
if (isUint8ArrayBatch(result)) {
407+
if (result.length > 0) {
408+
ArrayPrototypePush(target, result);
409+
}
410+
return;
411+
}
412+
if (isUint8Array(result)) {
413+
ArrayPrototypePush(target, [result]);
414+
return;
415+
}
416+
if (typeof result === 'string') {
417+
ArrayPrototypePush(target, [toUint8Array(result)]);
418+
return;
419+
}
420+
if (isAnyArrayBuffer(result)) {
421+
ArrayPrototypePush(target, [new Uint8Array(result)]);
422+
return;
423+
}
424+
if (ArrayBufferIsView(result)) {
425+
ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]);
426+
return;
427+
}
428+
return appendTransformResultAsyncSlow(target, result);
429+
}
430+
431+
async function appendTransformResultAsyncSlow(target, result) {
378432
for await (const batch of processTransformResultAsync(result)) {
379433
ArrayPrototypePush(target, batch);
380434
}
@@ -553,13 +607,19 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
553607
for (let i = 0; i < run.length; i++) {
554608
const next = [];
555609
for (let j = 0; j < pending.length; j++) {
556-
await appendTransformResultAsync(
610+
const pendingResult = appendTransformResultAsync(
557611
next,
558612
run[i](pending[j], { __proto__: null, signal }));
613+
if (pendingResult !== undefined) {
614+
await pendingResult;
615+
}
559616
}
560-
await appendTransformResultAsync(
617+
const flushResult = appendTransformResultAsync(
561618
next,
562619
run[i](null, { __proto__: null, signal }));
620+
if (flushResult !== undefined) {
621+
await flushResult;
622+
}
563623
pending = next;
564624
}
565625
for (let i = 0; i < pending.length; i++) {

0 commit comments

Comments
 (0)