
Consider implementing a duplex mode #143

Closed
wtgtybhertgeghgtwtg opened this issue Jul 6, 2018 · 13 comments · Fixed by #912
Labels
enhancement · 🎁 Rewarded on Issuehunt · help wanted

Comments

wtgtybhertgeghgtwtg commented Jul 6, 2018

Basically, a way to make a predictable duplex stream from stdin and stdout (or stderr), so you can use a child process as a transform stream. It'd emit an error if the process errors, and it'd kill the process if the stream is destroyed.

const execa = require('execa');

myStream.pipe(execa.duplex('myFile', ['my', 'arguments'])).pipe(myOtherStream);

It's not terribly difficult to do something like this with execa as it is now:

const execa = require('execa');

execa.stdout('myFile', ['my', 'arguments'], {input: myStream}).pipe(myOtherStream);

but this can behave in unexpected ways.

// The file name is deliberately misspelled: the resulting ENOENT
// is swallowed, and the stream completes without an error.
execa.stdout('myFiel', ['my', 'arguments'], {input: myStream});

const processStream = execa.stdout('myFile', ['my', 'arguments'], {input: myStream});
// `stdout` is destroyed, but the process doesn't die.
processStream.destroy();

IssueHunt Summary

ehmicky has been rewarded.

Backers (total: $70.00)

@sindresorhus changed the title "Consider implementing a duplex mode." → "Consider implementing a duplex mode" Jul 6, 2018
@sindresorhus (Owner)

I like it. I could see myself using this in some places.

I would name it duplexStream() instead of duplex(). I think that's clearer.

@IssueHuntBot

@IssueHunt has funded $70.00 to this issue.


dflupu commented Mar 25, 2019

I was looking into working on this, but I ran into a small issue.

If this syntax were to work
myStream.pipe(execa.duplex('myFile', ['my', 'arguments'])).pipe(myOtherStream);

we'd have to return a duplex stream:

	new stream.Duplex({
		write(chunk, encoding, callback) {
			spawned.stdin.write(chunk, encoding, callback);
		},
		read() {
			// Push whatever stdout has buffered, but never push(null):
			// nothing here knows when the stream is finished.
			const chunk = spawned.stdout.read();
			if (chunk !== null) {
				this.push(chunk);
			}
		}
	});

but we can't close this stream once we're done writing the stdin data. How do we work around that?

@sindresorhus (Owner)

Maybe use a stream.PassThrough. I haven't really looked into this issue closely though.

@issuehunt-oss bot added the 💵 Funded on Issuehunt label May 10, 2019
sloonz commented May 15, 2020

Trying to work on it too. Some questions:

  • Should a non-zero exit code emit an error event on the duplex?
  • If the user calls duplex.destroy(), should the process be killed, or do we let it run and possibly fail due to a broken pipe?

sloonz commented May 15, 2020

A rough, mostly untested first try:

sloonz@9708d7c

@sloonz sloonz mentioned this issue May 15, 2020
ehmicky (Collaborator) commented Dec 14, 2023

I am wondering whether the new pipe methods might already fix the underlying problem of this issue.

// stdout -> stdin
await execa(...).pipeStdout(execa(...))

// stdout -> stream
await execa(...).pipeStdout(otherStream)

// stdout -> file
await execa(...).pipeStdout('path');

// stderr -> stdin/stream/file
await execa(...).pipeStderr(...);

// stdout + stderr -> stdin/stream/file
await execa(...).pipeAll(...);

// stream -> stdin
await execa(..., {stdin: stream})

// file -> stdin
await execa(..., {inputFile: 'path'})

// string -> stdin
await execa(..., {input: 'value'})

So it is actually quite easy now to pipe multiple processes:

const { stdout: topTenFilenames } = await execa('ls', ['exampleDir'])
  .pipeStdout(execa('sort'))
  .pipeStdout(execa('head', ['-n', '10']))

@sindresorhus (Owner)

If anyone wants to work on this, see the feedback in: #424

ehmicky (Collaborator) commented Mar 17, 2024

It's taken me quite some time, but I finally got this working. I have a PR at #912 which implements this, together with .readable() and .writable() methods.

Motivation

The purpose is to use a subprocess as a stream, to pass it to APIs which accept streams. This contrasts with:

  • The std* stream options, which do the opposite, i.e. passing a stream to a subprocess
  • The .pipe() method, which only focuses on subprocess piping, and does it well

Readable vs Writable vs Duplex

Users might want to only use the readable/writable side of the subprocess, but still convert it to a stream. Using subprocess.stdin or subprocess.stdout works in some cases, but not if the user wants to:

  • Wait for the subprocess to complete
  • Propagate any error from the subprocess to the stream, and the other way around

If a user wants a read-only stream, it is improper to return a Duplex with an already ended writable side. Same for write-only streams. That's because the Node.js stream API has some subtle logic meant for Duplex only. For example, finished(duplex) (unlike finished(readable) and finished(writable)) does not necessarily wait for the close event and might be missing errors thrown during stream._destroy(). That seems like a minor thing, but because finished() is used everywhere by the Node.js stream API, this leads to many subtle bugs when using a Duplex as if it were a Readable or Writable.

I actually started implementing this as a single .duplex() method, but it became quite clear the proper way to do this while respecting the Node.js streams API's inner workings was to expose 3 different methods: readable, writable, duplex.

const streamOne = subprocess.readable()
const streamTwo = subprocess.writable()
const streamThree = subprocess.duplex()

This also means implementing this as a PassThrough does not work, since those are duplexes. That was my first implementation attempt, and I had to backtrack from that approach.

This also means Duplex.from() cannot be used since it returns duplexes. I also made an implementation attempt with this, and it just did not work well.

File descriptors

Users should be able to choose the file descriptor. My PR does it by re-using the from and to options from the .pipe() method. It works for any file descriptor, not only stdin/stdout/stderr. all can be specified to read from both stdout and stderr. For example:

const stream = subprocess.readable({from: 'stderr'})

Method vs property

Exposing this as a property (as I previously suggested in #591) does not work because:

  • This does not allow choosing the file descriptor (as described above)
  • If the user does not use that property, the stream will still pull data and propagate errors, which might break things for existing users, and result in a waste of resources

So it needs to be called on-demand via a method.

Error propagation

The stream awaits the subprocess completion. If the subprocess fails, an error event is emitted on the stream. This means users do not need to await subprocess anymore as long as they await the stream (using methods like text(), finished(), pipeline(), etc.).

In the other direction, the subprocess does not wait for the stream: there is no need to. However, if the subprocess takes some input, its stdin needs to be ended either through the stream or through subprocess.stdin.

If the stream errors and/or is destroyed, it destroys subprocess.stdin or subprocess.stdout|stderr. It does not terminate the subprocess via a signal. Doing so would be improper as it prevents graceful exit. Just like we do with the .pipe() method, we follow the way shells behave in that matter, which is to just close the subprocess' stdin or stdout or let the subprocess end on its own. If the subprocess uses stdin, ending stdin will make the subprocess end gracefully. If the subprocess writes to stdout, ending stdout will create EPIPE errors, making the subprocess end gracefully.

For duplexes, we purposely do not propagate errors between the readable and the writable side. There is no reason to, and it creates race conditions and subtle bugs.

Multiple consumers

Using subprocess.readable() should not cause the eventual result.stdout or result.stderr to become empty. This was initially the case due to get-stream not supporting multiple consumers, so this required some major changes to that library.

Also, the PR allows for calling subprocess.readable(), subprocess.writable() and subprocess.duplex() more than once, on either the same file descriptor or different ones.

Hard parts

Solving this issue was very difficult. I tried many approaches and was close to giving up a few times, especially with tests that failed in CI but not locally due to hard-to-debug race conditions. I am pretty confident with the current PR though as it has lots of automated tests covering edge cases.

Some of the things that made this hard to implement:

  • Cannot re-use Duplex.from(), PassThrough, piping. In other words, custom Readable/Writable/Duplex classes must be defined with their own implementation of _write/_read/_final/_destroy.
  • Needs to work in object mode, since subprocess.std* can now be in object mode when the user applies transforms
  • Needs to respect stream highWaterMark and buffering behavior
  • .readable() must not buffer too much, and must automatically pause/resume subprocess.stdout when its buffer gets full
  • Needs to work with the autoDestroy option, which tends to propagate state when not wanted
  • subprocess.stdin, subprocess.stdout and subprocess.stderr are weird. By default, those 3 are actually duplex TCP sockets. subprocess.stdin is writable and not readable, but subprocess.stdout/subprocess.stderr are both writable and readable.

Web streams

My PR is implementing this with Node.js streams. All of this work should be done for web streams too. I have opened #913 to track this.

I believe we should let users choose whether they want a Node.js stream or a web stream. While Node.js streams are legacy, most Node modules still only accept them. Also, from having worked with both a lot, I expect an implementation based on web streams to be more complicated, slower and less stable. Finally, there are a few things that only Node.js streams can do, such as having multiple readers at once, which might be the only option in specific cases.

That being said, we should definitely implement this for web streams too.

Method names

Because of the above, my PR uses the following names, which are the same names as the stream class itself and the TypeScript type:

  • .readable(), .writable(), .duplex(): Node.js streams
  • .readableStream(), .writableStream(), .transformStream(): web streams

@sindresorhus (Owner)

This is impressive research and work. I'm pretty sure I would have given up long before you. 👏

@sindresorhus (Owner)

I wonder if there is anything Node.js could improve that would have made this work easier. If so, definitely open some Node.js issues. For example, a version of Duplex.from() that can be reused.

ehmicky (Collaborator) commented Mar 17, 2024

One of the issues with Duplex.from() is that it is not designed to create read-only or write-only streams. It is possible to do it, but the result is still a Duplex under the hood, which creates a few issues. But I think that's by design.

Also, it propagates state changes between the write and read side. In most cases this is good, but it was a real problem for this specific issue.

The problem with streams is that they combine (by their own nature) multiple things that lead to complexity: performance-sensitive, memory-sensitive, statefulness, async, extendable by users. Then this issue combines it with OS processes, which are complex too. :)

issuehunt-oss bot commented Mar 18, 2024

@sindresorhus has rewarded $63.00 to @ehmicky. See it on IssueHunt

  • 💰 Total deposit: $70.00
  • 🎉 Repository reward (0%): $0.00
  • 🔧 Service fee (10%): $7.00

@issuehunt-oss bot added the 🎁 Rewarded on Issuehunt label and removed the 💵 Funded on Issuehunt label Mar 18, 2024