Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamx.pipe() does not add a listener for data event. #16

Closed
coreyfarrell opened this issue May 13, 2020 · 15 comments
Closed

streamx.pipe() does not add a listener for data event. #16

coreyfarrell opened this issue May 13, 2020 · 15 comments

Comments

@coreyfarrell
Copy link

streamx.pipe(dest) does not attach a data listener to streamx as stream v3 does. My goal is to migrate to-through from though2 to streamx.Transform, to do this I need to be able to know when streamx is piped to or from.

Stream code sometimes watches for the newListener event with data or readable for the argument.

Examples:
https://github.com/mcollina/cloneable-readable/blob/f6bbe0a9da6561af84ec14ca1bc75c37c455f11c/index.js#L23-L38
https://github.com/gulpjs/to-through/blob/552d17efd3f9469166bc87832e77e86602850828/index.js#L34-L43

I'm not sure if it would be appropriate for ReadableState#pipe to call this.stream.emit('newListener', 'data') directly.

Related to #14

@mafintosh
Copy link
Owner

I think we need a new API for that. streamx explicitly does not use data listeners for pumping as it makes it much faster to avoid that internally.

If we added more events would that help? pipe-to pipe-from ?

@mafintosh
Copy link
Owner

To make my events more clear, I suggest we add:

  • src.emit('pipe-to', dst)
  • dst.emit('pipe-from', src)

In addition to the pipe event we already have (that we need to fix) for compat

@coreyfarrell
Copy link
Author

This pipe-to / pipe-from seem reasonable to me. Obviously other code would need to be updated to detect this new event in addition to catching the newListener data/readable events.

Has there been any communication about this module with node.js streams / readable-stream maintainers? I feel like their opinion on this question would be more valuable than my own.

@mafintosh
Copy link
Owner

I'm one of the maintainers of those.

The idea with streamx isn't to be 100% compat, but to break compat if it means we can massively simplify/perf boost. That's why pipe is different along with other tweaks.

We can probably add more explicit events to Node.js streams though. I'll make an issue for that on Node.

@coreyfarrell
Copy link
Author

Sounds great, I just wanted to make sure I gave the "my opinion isn't the best" qualifier to my statement that the two new events seem reasonable to me.

@mafintosh
Copy link
Owner

@coreyfarrell reading into your examples more, is this about knowing when the stream is being piped or when it is "flowing", ie. reading internally?

@coreyfarrell
Copy link
Author

We're trying to know when the argument given to toThrough should be flushed to the result stream. So I guess we need to know when it should flow?

Example 1:

if (pipeOut) {
  input1
    .pipe(toThrough(input2))
    .pipe(writeable)
} else {
  input1
    .pipe(toThrough(input2))
    .on('data', gotChunk)
}

Example 2:

if (pipeOut) {
  toThrough(input2)
    .pipe(writeable)
} else {
  toThrough(input2)
    .on('data', gotChunk)
}

In example 1 the writeable stream or gotChunk callback will receive all data input1 before input2. In this situation we detect the inbound pipe and allow that that to complete before flushing input2 into the result stream.

In example 2 there is nothing being piped in. When we detect a listener to data is added (via pipe or direct listener) we see no inbound pipe has occurred so we immediately flush input2 to the result stream.

@phated
Copy link

phated commented Oct 21, 2020

Hey all, what's the status on this?

@mafintosh
Copy link
Owner

mafintosh commented Oct 22, 2020

We added pipe/piping so you can do it like this now:

stream.on('newListener', function (name) {
  if (name === 'data') onflowing()
})
stream.once('piping', function () {
  onflowing()
})

function onflowing () {
  console.log('stream is flowing now...')
}

@mafintosh
Copy link
Owner

If the above is fine with y'all, we could potentially add a helper for that also

streamx.isFlowing(stream) // true, false
streamx.flowing(stream, cb)

@coreyfarrell
Copy link
Author

@mafintosh thanks for adding the new events. Sorry I haven't been about to give feedback / testing yet, life has pulled me in other directions for now.

@phated TBH it's going to early next year before I'm able to put any serious effort into any open source code. In addition to current work commitments I am in the process of a long distance move. I have not forgotten about gulp and plan to work towards getting things migrated to streamx but I want to give reasonable expectation based on my schedule.

@coreyfarrell
Copy link
Author

@mafintosh I can't speak for others but I don't see myself using a streamx.isFlowing(stream) helper. I would only event want to watch for event(s) telling me when it happens.

If streamx.flowing(stream, cb) would work for node.js native or streamx as the stream argument I could see that being useful but I think it would be more than a single function. It would need to support the equivalent to emitter.on, emitter.off and emitter.once. I'm wondering if this is something that would go in a helper module rather than in the streamx core?

@mafintosh
Copy link
Owner

Ya helper sounds like a good idea I agree

@phated
Copy link

phated commented Aug 29, 2022

I believe I want a streamx.isFlowing(stream) API for our lead project, as I have to do this to ensure I don't try to pipe twice:

if (stream._readableState.pipeTo) {
      return;
}

@mafintosh
Copy link
Owner

we fixed this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants