
pipeline deadlock with readable tail? #40685

Open
ronag opened this issue Nov 1, 2021 · 1 comment
Labels
stream Issues and PRs related to the stream subsystem.

Comments

ronag (Member) commented Nov 1, 2021

#40653 (comment)

@lpinca feel free to edit the issue and fill it out with your concern. Otherwise, I will dig into this at some later point.


The pipeline() callback might never be called if the destination is a Transform stream that is not read. Here is an example:

```javascript
const stream = require('stream');

const chunk = Buffer.alloc(1024);
let bytesRead = 0;

const readable = new stream.Readable({
  read() {
    if (bytesRead === 100 * 1024) {
      readable.push(null);
    } else {
      bytesRead += chunk.length;
      readable.push(chunk);
    }
  }
});

const passThrough = new stream.PassThrough();

stream.pipeline(readable, passThrough, function (err) {
  if (err) {
    throw err;
  }

  console.log('done');
});
```

In the callback version this is not a big issue, because pipeline() returns the last stream, so the user can resume the returned stream:

```javascript
pipeline(readable, passThrough, fn).resume();
```

In the promisified variant, pipeline() is just a wrapper that returns a promise, fulfilled when the callback of the underlying callback variant is invoked.

However, the user might not be able to resume the returned stream, because they do not necessarily have direct control over it. For example, the last entry might be a generator:

```javascript
const stream = require('stream');
const streamPromises = require('stream/promises');

const chunk = Buffer.alloc(1024);
let bytesRead = 0;

const readable = new stream.Readable({
  read() {
    if (bytesRead === 100 * 1024) {
      readable.push(null);
    } else {
      bytesRead += chunk.length;
      readable.push(chunk);
    }
  }
});

async function* passThrough(source) {
  for await (const chunk of source) {
    yield chunk;
  }
}

async function run() {
  await streamPromises.pipeline(readable, passThrough);
}

run()
  .then(function () {
    console.log('done');
  })
  .catch(console.error);
```

In this case, stream.pipeline() returns an internal PassThrough proxy that the user cannot resume, so the promise returned by streamPromises.pipeline() might never be fulfilled.

A possible workaround is to resume the returned stream in the promisified variant.

```diff
diff --git a/lib/stream/promises.js b/lib/stream/promises.js
index 0db01a8b20..5bbadd43c5 100644
--- a/lib/stream/promises.js
+++ b/lib/stream/promises.js
@@ -23,13 +23,17 @@ function pipeline(...streams) {
       signal = options.signal;
     }
 
-    pl(streams, (err, value) => {
+    const stream = pl(streams, (err, value) => {
       if (err) {
         reject(err);
       } else {
         resolve(value);
       }
     }, { signal });
+
+    if (stream.readable) {
+      stream.resume();
+    }
   });
 }
```
@ronag ronag added the stream Issues and PRs related to the stream subsystem. label Nov 1, 2021
@romayalon

I ran into the same issue… Any news regarding a fix?
