stream.Transform changing order of items #46765

Closed
zuozp8 opened this issue Feb 22, 2023 · 14 comments · Fixed by #46818
Labels: confirmed-bug, stream

Comments

@zuozp8

zuozp8 commented Feb 22, 2023

Version

v18.14.1

Platform

Linux 703748a51615 5.18.8-051808-generic #202206290850 SMP PREEMPT_DYNAMIC Wed Jun 29 08:59:08 UTC 2022 x86_64 Linux

Subsystem

stream

What steps will reproduce the bug?

const stream = require("node:stream");
const consumers = require("node:stream/consumers");
const createInnerTransfrom = () => new stream.Transform({
    objectMode: true,
    construct(callback) {
        this.push('header from constructor\n');
        callback();
    },
    transform: (row, encoding, callback) => {
        callback(null, JSON.stringify(row) + '\n');
    },
});
const createOuterTransfrom = () => {
    let innerTranfrorm;
    return new stream.Transform({
        objectMode: true,
        transform(row, encoding, callback) {
            if (!innerTranfrorm) {
                innerTranfrorm = createInnerTransfrom();
                innerTranfrorm.on('data', (data) => this.push(data));
                callback();
            }
            else if (innerTranfrorm.write(row)) {
                process.nextTick(callback);
            }
            else {
                innerTranfrorm.once('drain', callback);
            }
        },
    });
};
consumers.text(stream.Readable.from([
    'create InnerTransform',
    'firstLine',
    'secondLine',
]).pipe(createOuterTransfrom())).then((text) => console.log('output:\n', text));

How often does it reproduce? Is there a required condition?

always

What is the expected behavior?

output:
 header from constructor
"firstLine"
"secondLine"

What do you see instead?

output:
 header from constructor
"secondLine"
"firstLine"

Additional information

I expect Transform to always process incoming items in order, even with reckless code like in the example.
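For context on that expectation: a Transform normally hands the next row to transform() only after the previous row's callback has been called, so even rows that finish at very different speeds come out in input order. The sketch below illustrates this for the plain case with no construct() involved; createDelayedTransform and its delay list are hypothetical names made up for this illustration.

```javascript
const { Transform } = require('node:stream');

// Hypothetical helper: a Transform whose callback fires after a different
// delay for each row, to show that slow rows are not overtaken by fast ones.
function createDelayedTransform(delaysMs) {
  let index = 0;
  return new Transform({
    objectMode: true,
    transform(row, encoding, callback) {
      const delay = delaysMs[index++ % delaysMs.length];
      // The next row is not handed to transform() until callback runs.
      setTimeout(() => callback(null, row), delay);
    },
  });
}

const delayed = createDelayedTransform([30, 0, 10]);
delayed.on('data', (row) => console.log(row)); // prints a, b, c in that order
delayed.write('a');
delayed.write('b');
delayed.write('c');
delayed.end();
```

The first row takes the longest here, yet it is still emitted first, which is the ordering guarantee the report relies on.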

@zuozp8 zuozp8 changed the title Transform stream changing order of items stream.Transform changing order of items Feb 22, 2023
@VoltrexKeyva VoltrexKeyva added the stream Issues and PRs related to the stream subsystem. label Feb 23, 2023
@rluvaton
Member

You accidentally wrote the expected output under the current behavior.

@zuozp8
Author

zuozp8 commented Feb 24, 2023

You accidentally wrote the expected output under the current behavior.

I corrected it, thank you.

@ronag
Member

ronag commented Feb 24, 2023

Do you think you can make an even simpler example?

@rluvaton
Member

I noticed that when you remove construct, it works correctly.

@ronag
Member

ronag commented Feb 24, 2023

That's a good hint.

@zuozp8
Author

zuozp8 commented Feb 24, 2023

Sometimes it starts or stops occurring when you change a process.nextTick(callback) into callback() or the other way around, so it looks like some kind of race condition.

I wasn't able to replicate the problem without nested Transforms

@rluvaton
Member

When you change this branch:

else if (innerTranfrorm.write(row)) {
    process.nextTick(callback);
} else

so that the callback is deferred by two ticks, like this:

} else if (innerTransform.write('outer | ' + row)) {
    process.nextTick(() => {
        process.nextTick(callback);
    });
} else {

it solves the problem.
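As background on why nesting the nextTick changes the timing: callbacks queued with process.nextTick run in FIFO order, and a callback queued from inside a running tick goes to the end of that queue, behind anything already waiting. The sketch below shows only this queue ordering and does not reproduce the stream bug itself; traceNextTickOrder is a hypothetical helper name.

```javascript
// Hypothetical helper: records the order in which callbacks run and passes
// the trace to `done` once the nextTick queue has drained.
function traceNextTickOrder(done) {
  const order = [];
  process.nextTick(() => {
    order.push('first tick');
    // Queued from inside a tick: appended to the END of the tick queue.
    process.nextTick(() => order.push('nested tick'));
  });
  process.nextTick(() => order.push('second tick'));
  order.push('sync');
  // setImmediate fires only after the whole nextTick queue is empty.
  setImmediate(() => done(order));
}

traceNextTickOrder((order) => console.log(order.join(' -> ')));
// sync -> first tick -> second tick -> nested tick
```

So the doubled nextTick lets any work that was already queued for the next tick run before the outer callback does, which is consistent with the workaround masking a timing problem rather than removing it.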

@rluvaton
Member

rluvaton commented Feb 24, 2023

Found another solution and updated my PR

@mcollina
Member

This is indeed a bug. Here is a simplified version:

const stream = require("node:stream");
const s = new stream.Transform({
  objectMode: true,
  construct(callback) {
    this.push('header from constructor\n');
    callback();
  },
  transform: (row, encoding, callback) => {
    callback(null, JSON.stringify(row) + '\n');
  },
});

s.pipe(process.stdout);
s.write('firstLine');
process.nextTick(() => s.write('secondLine'));

This fixes the example:

const stream = require("node:stream");
const s = new stream.Transform({
  objectMode: true,
  construct(callback) {
    this.push('header from constructor\n');
    process.nextTick(callback);
  },
  transform: (row, encoding, callback) => {
    callback(null, JSON.stringify(row) + '\n');
  },
});

s.pipe(process.stdout);
s.write('firstLine');
process.nextTick(() => s.write('secondLine'));

So I think we are missing a nextTick in the constructor logic.
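For user code that cannot wait for a change in Node.js itself, one possible way to sidestep the problem is to not push from construct() at all and instead emit the header lazily, right before the first row (or in flush() if no row ever arrives). This is only a sketch under the assumption that the header merely has to precede the first row; createHeaderTransform is a hypothetical name, not something from this thread.

```javascript
const { Transform } = require('node:stream');

// Hypothetical workaround: emit the header from transform()/flush() instead
// of construct(), so header and rows go through the same push path.
function createHeaderTransform(header) {
  let headerSent = false;
  return new Transform({
    objectMode: true,
    transform(row, encoding, callback) {
      if (!headerSent) {
        headerSent = true;
        this.push(header);
      }
      callback(null, JSON.stringify(row) + '\n');
    },
    flush(callback) {
      // Still emit the header if the stream ends without any rows.
      if (!headerSent) this.push(header);
      callback();
    },
  });
}

const s = createHeaderTransform('header from transform\n');
s.pipe(process.stdout);
s.write('firstLine');
process.nextTick(() => {
  s.write('secondLine');
  s.end();
});
```

The write pattern mirrors the simplified reproduction above (one synchronous write, one on the next tick), with the header no longer coming from construct().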

@mcollina
Member

@mcollina mcollina added confirmed-bug Issues with confirmed bugs. good first issue Issues that are suitable for first-time contributors. labels Feb 24, 2023
@rluvaton
Member

I think 1f75a95/lib/internal/streams/readable.js#LL222C7-L222C7 should be delayed with a nextTick.

Delaying it with nextTick did not work...

@mcollina
Member

Given the following script (the original reproduction, with the construct callback deferred via process.nextTick):

const stream = require("node:stream");
const consumers = require("node:stream/consumers");
const createInnerTransfrom = () => new stream.Transform({
    objectMode: true,
    construct(callback) {
        this.push('header from constructor\n');
        process.nextTick(callback);
    },
    transform: (row, encoding, callback) => {
        callback(null, JSON.stringify(row) + '\n');
    },
});
const createOuterTransfrom = () => {
    let innerTranfrorm;
    return new stream.Transform({
        objectMode: true,
        transform(row, encoding, callback) {
            if (!innerTranfrorm) {
                innerTranfrorm = createInnerTransfrom();
                innerTranfrorm.on('data', (data) => this.push(data));
                callback();
            }
            else if (innerTranfrorm.write(row)) {
                process.nextTick(callback);
            }
            else {
                innerTranfrorm.once('drain', callback);
            }
        },
    });
};
consumers.text(stream.Readable.from([
    'create InnerTransform',
    'firstLine',
    'secondLine',
]).pipe(createOuterTransfrom())).then((text) => console.log('output:\n', text));

It will error:

node:internal/process/promises:288
            triggerUncaughtException(err, true /* fromPromise */);
            ^

Error [ERR_STREAM_PUSH_AFTER_EOF]: stream.push() after EOF
    at new NodeError (node:internal/errors:399:5)
    at readableAddChunk (node:internal/streams/readable:285:30)
    at Readable.push (node:internal/streams/readable:234:10)
    at Transform.<anonymous> (/Users/matteo/tmp/aaa.js:20:58)
    at Transform.emit (node:events:513:28)
    at addChunk (node:internal/streams/readable:324:12)
    at readableAddChunk (node:internal/streams/readable:297:9)
    at Readable.push (node:internal/streams/readable:234:10)
    at node:internal/streams/transform:182:12
    at Transform.transform [as _transform] (/Users/matteo/tmp/aaa.js:10:9) {
  code: 'ERR_STREAM_PUSH_AFTER_EOF'
}

This is correct, as you are pushing things after the stream ended.
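One way to avoid pushing after EOF in this kind of nested setup is to have the outer stream's flush() end the inner stream and wait for the inner 'end' event before signalling that it is done. The following is a minimal sketch under some assumptions: the inner stream is created eagerly, the header is left out for brevity, backpressure from outer.push() is ignored, and createInnerTransform / createOuterTransform are illustrative names rather than the reporter's exact code.

```javascript
const { Readable, Transform } = require('node:stream');
const consumers = require('node:stream/consumers');

const createInnerTransform = () => new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    callback(null, JSON.stringify(row) + '\n');
  },
});

function createOuterTransform() {
  const inner = createInnerTransform();
  const outer = new Transform({
    objectMode: true,
    transform(row, encoding, callback) {
      // Respect the inner stream's backpressure before accepting more rows.
      if (inner.write(row)) callback();
      else inner.once('drain', callback);
    },
    flush(callback) {
      // Do not finish the outer stream until the inner one has emitted
      // everything; 'end' fires only after all inner data was delivered.
      inner.once('end', callback);
      inner.end();
    },
  });
  inner.on('data', (data) => outer.push(data));
  inner.on('error', (err) => outer.destroy(err));
  return outer;
}

consumers
  .text(Readable.from(['firstLine', 'secondLine']).pipe(createOuterTransform()))
  .then((text) => console.log('output:\n' + text));
```

Because the outer stream only pushes its own EOF after the inner stream has ended, late 'data' events from the inner stream can no longer arrive on an already finished outer stream.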

@rluvaton
Member

I tested with your simplified version

mcollina added a commit to mcollina/node that referenced this issue Feb 24, 2023
Signed-off-by: Matteo Collina <hello@matteocollina.com>
Fixes: nodejs#46765
@mcollina
Member

Here is the fix: #46818

@mcollina mcollina removed the good first issue Issues that are suitable for first-time contributors. label Feb 24, 2023
nodejs-github-bot pushed a commit that referenced this issue Feb 26, 2023
Signed-off-by: Matteo Collina <hello@matteocollina.com>
Fixes: #46765
PR-URL: #46818
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
targos pushed a commit that referenced this issue Mar 13, 2023
targos pushed a commit that referenced this issue Mar 14, 2023
danielleadams pushed a commit that referenced this issue Apr 11, 2023