Add the option to leave the last stream open to Stream.pipeline, similar to the end option on pipe #34805

Closed
markddrake opened this issue Aug 17, 2020 · 9 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@markddrake

markddrake commented Aug 17, 2020

I have a scenario where I want to send multiple streams of data to a single output stream using multiple pipeline() calls. I can do this with pipe() by passing end: false as an option to pipe(). However, if I convert this code to use pipeline(), the output stream is automatically closed when the first operation completes, which prevents me from using it as the target of a subsequent pipeline operation.
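
For readers following along, the pipe()-based approach described above might look roughly like the sketch below. It is only an illustration under stated assumptions: pipeAll, memoryWritable and demo are hypothetical names, and the in-memory streams stand in for whatever sources and destination the real code uses.

```javascript
const { Readable, Writable } = require('node:stream');
const { once } = require('node:events');

// Pipe each source into the same destination, one after another.
async function pipeAll(sources, destination) {
  for (const source of sources) {
    // end: false stops pipe() from calling destination.end() when this source ends.
    source.pipe(destination, { end: false });
    // once() rejects if the source emits 'error' before 'end'.
    await once(source, 'end');
  }
  // The destination is still open here, so it has to be ended by hand.
  destination.end();
  await once(destination, 'finish');
}

// Hypothetical in-memory destination standing in for fs.createWriteStream().
function memoryWritable(chunks) {
  return new Writable({
    write(chunk, _encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });
}

// Usage: three in-memory sources written to one destination, in order.
async function demo() {
  const chunks = [];
  await pipeAll(
    [Readable.from(['a', 'b']), Readable.from(['c']), Readable.from(['d'])],
    memoryWritable(chunks),
  );
  return chunks.join('');
}
```

Note that this sketch does nothing about errors on the destination or cleanup of the other streams on failure, which is the part pipeline() normally takes care of and the reason for wanting the same option there.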

Is your feature request related to a problem? Please describe.
Please describe the problem you are trying to solve.

Describe the solution you'd like
Please describe the desired behavior.

Provide an option on pipeline() similar to the end option on pipe().

Describe alternatives you've considered
Please describe alternative solutions or features you have considered.

Reverting to pipe() (please not)
Monkey patching pipe() - see comment below
Subclassing the stream to override pipe() - not feasible, since my last stream is a WriteStream created by the factory method fs.createWriteStream()

@lpinca lpinca added the stream Issues and PRs related to the stream subsystem. label Aug 17, 2020
@markddrake
Author

I tried to monkey patch my way out of this by replacing pipe(). However, it appears that pipeline does the same thing.

I redefined pipe as

(os, options) => {
  console.log('pipe()');
  options = options || {};
  // Force end: false so the destination is left open.
  options.end = false;
  return outputStream.pipeImpl(os, options);
}

However, my log message never appeared, and after the first pipeline completes, console.log.pipe.toString() generates

function() {
  errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
}

@markddrake
Author

Another related question: if I have two pipeline operations in progress that end with the same output stream, should the output stream close as soon as the first operation finishes, or should it remain open until all operations have finished? I was trying another workaround, which involved setting up a pipeline consisting of a reader -> writable that would not finish, then waiting for a second pipeline to finish sending its data to the same writable, then waiting for a third pipeline to finish sending its data to the same writable, and then allowing the first reader to finish. However, after the second pipeline operation completed, the writableEnded flag was set on the writable and subsequent operations failed to complete.

@markddrake
Author

The only workaround I have so far is to add a dummy transform into the pipeline before the writable. This transform overrides pipe() and sets end: false before calling super.pipe(). This prevents the pipeline from finishing, and consequently the writable remains available.

When the dummy transform ends, it invokes unpipe() on itself and sets up the processes required for the next operation. It then uses a chained pipe() to the downstream writable to process the next input. When there are no more inputs available, it emits 'end', allowing the pipeline to terminate.

I will freely admit that, while this works, it scares the living daylights out of me. I am sure there are a million things that will fail, since I am rebuilding the pipeline manually in the middle of the pipeline operation...

@markddrake
Author

markddrake commented Aug 22, 2020

OK, I have a workaround that appears to solve the issue. Subclassing PassThrough, overriding pipe() to pass the option end: false, and then inserting that class into the pipeline achieves the desired result. Of course, the callback will not execute (or the promise resolve) until you unpipe() the PassThrough stream that ended without propagating its 'end' event and pipe in another stream that is allowed to propagate its 'end' event.

@mcollina
Member

cc @nodejs/streams @ronag wdyt?

@sg-gs

sg-gs commented Jan 11, 2022

Anything new here? This could be a very useful feature.

@ronag
Member

ronag commented Jan 11, 2022

This would be rather easy to add to the promisified pipeline API.

@ronag
Member

ronag commented Jan 11, 2022

Ah, I already implemented and merged that.

@ronag ronag closed this as completed Jan 11, 2022
@NemoStein

This feature is undocumented.

The docs mention the signal: AbortSignal property, but not the end: boolean property:

"The pipeline API provides a promise version, which can also receive an options argument as the last parameter with a signal <AbortSignal> property."

It took me a while to discover this...
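
For anyone else landing here, a minimal sketch of how the end option on the promise version of pipeline might be used to send several sources to one destination. It assumes a Node.js release recent enough to include the option; concatInto, createCollector and demo are hypothetical names, and the in-memory sink stands in for something like fs.createWriteStream().

```javascript
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { once } = require('node:events');

// Hypothetical in-memory sink so the sketch runs without touching the file system.
function createCollector() {
  const chunks = [];
  const sink = new Writable({
    write(chunk, _encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });
  return { sink, chunks };
}

// Run one pipeline per source against the same sink, in order.
async function concatInto(sink, sources) {
  for (const source of sources) {
    // end: false asks pipeline not to end the destination when this source ends.
    await pipeline(source, sink, { end: false });
  }
  // Nothing has ended the sink, so end it explicitly after the last source.
  sink.end();
  await once(sink, 'finish');
}

// Usage: two in-memory sources concatenated into one sink.
async function demo() {
  const { sink, chunks } = createCollector();
  await concatInto(sink, [Readable.from(['a', 'b']), Readable.from(['c'])]);
  return { text: chunks.join(''), finished: sink.writableFinished };
}
```

Because the destination is never ended automatically in this mode, forgetting the final sink.end() leaves it open, so the explicit end is part of the pattern rather than an optional extra.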


6 participants