[stream] support piping multiple streams into a single stream #93

Closed
missinglink opened this issue Dec 6, 2014 · 22 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@missinglink
Contributor

In the following example sink should wait for all piped streams to unpipe() before calling _flush.

As it currently stands the following code outputs:

$ node test1.js 
a
done

If you uncomment the // applyFix(); line you get:

$ node test1.js 
a
b
done
var through = require('through2');

var tap1 = through.obj();
var tap2 = through.obj();
var sink = through.obj(transform, flush);

var pipes = 0;
// applyFix();

tap1.pipe(sink);
tap2.pipe(sink);

tap1.write('a');
tap1.end();

setTimeout(function(){
  tap2.write('b');
  tap2.end();
}, 100);

function applyFix(){
  sink.on( 'pipe', function(){
    pipes++;
  });
  sink.end = function(){
    if( !--pipes ){
      sink._flush();
    }
  };
}

function flush(){
  console.log( 'done' );
}

function transform( item, e, next ){
  console.log( item );
  this.push(item);
  next();
}

ref: #89
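For reference, a minimal sketch of the usual userland workaround (not part of the report above, and assuming the same through2 module): pipe each tap with { end: false } so that neither tap can end the sink, then count the taps' 'end' events and end the sink manually once every tap has finished. Run as-is this should print a, b and then done.

var through = require('through2');

var tap1 = through.obj();
var tap2 = through.obj();

var sink = through.obj(function( item, enc, next ){
  console.log( item );
  next( null, item );
}, function( done ){
  console.log( 'done' );
  done();
});

// { end: false } stops pipe() from calling sink.end() when a tap ends,
// so the first tap alone can no longer trigger _flush.
tap1.pipe( sink, { end: false } );
tap2.pipe( sink, { end: false } );

// End the sink by hand once every tap has emitted 'end'.
var pending = 2;
function tapEnded(){
  if( --pending === 0 ) sink.end();
}
tap1.on( 'end', tapEnded );
tap2.on( 'end', tapEnded );

tap1.write('a');
tap1.end();

setTimeout(function(){
  tap2.write('b');
  tap2.end();
}, 100);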

@chrisdickinson
Contributor

Ah, interesting -- it seems like the pipe mechanism isn't designed to handle multiple sources piping into a single sink (it assumes the source is the only stream writing into the destination). This should definitely be made explicit in the docs, at the very least.

I'm curious: what is your use case for piping multiple streams into a single stream?

Also, do you mind editing the title to "[stream] support piping multiple streams into a single stream"?

@missinglink missinglink changed the title [streams] sink.end should wait for all taps to finish [stream] support piping multiple streams into a single stream Dec 6, 2014
@missinglink
Contributor Author

For me this is a pretty common case when forking the output from a parser to separate pipelines (one per 'type' of record) and then saving to a database using a db-client singleton sink.
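A sketch of that shape (everything here, the parser, the record types and the db sink, is a made-up stand-in, not a real module), using the same { end: false } plus manual-end bookkeeping as above:

var through = require('through2');

// Stand-in parser and stand-in database-client singleton sink.
var parser = through.obj();

var dbSink = through.obj(function( record, enc, next ){
  console.log( 'save', record.type, record.name );   // pretend: write via the db client
  next();
}, function( done ){
  console.log( 'all pipelines finished, db connection can close' );
  done();
});

// One lightweight pipeline per record type.
function onlyType( type ){
  return through.obj(function( record, enc, next ){
    if( record.type === type ) this.push( record );
    next();
  });
}

var venues    = parser.pipe( onlyType('venue') );
var addresses = parser.pipe( onlyType('address') );

// Both pipelines share the sink; neither may end it on its own.
venues.pipe( dbSink, { end: false } );
addresses.pipe( dbSink, { end: false } );

var open = 2;
function maybeEnd(){ if( --open === 0 ) dbSink.end(); }
venues.on( 'end', maybeEnd );
addresses.on( 'end', maybeEnd );

parser.write({ type: 'venue', name: 'cafe' });
parser.write({ type: 'address', name: 'main st' });
parser.end();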

@phpnode

phpnode commented Dec 6, 2014

Another use case - piping data from multiple clients into one stream and then piping the stream back into each client, e.g. for chat rooms.
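A sketch of that pattern with core net streams (the port and the PassThrough 'room' are arbitrary choices for the example; a real chat server would need more bookkeeping): each client pipes into the shared room and the room pipes back into each client, with { end: false } so that no single client can end the shared stream.

var net = require('net');
var stream = require('stream');

// One shared stream per 'room'; everything written into it is broadcast
// back out to every connected client.
var room = new stream.PassThrough();

net.createServer(function( socket ){
  socket.pipe( room, { end: false } );   // client -> room; a leaving client must not end the room
  room.pipe( socket, { end: false } );   // room -> client; the room must not close the socket

  socket.on( 'close', function(){
    room.unpipe( socket );               // stop broadcasting to a disconnected client
  });
}).listen( 3000 );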

@vkurchatkin
Contributor

this only works for object mode streams, and only if the order of objects is not significant

@benjamingr
Member

+1 for making this explicit in the docs. Are we sure this change won't break existing code, though? Also, what vkurchatkin said.

@indutny
Member

indutny commented Dec 8, 2014

Summoning @caineio

@sonewman
Contributor

sonewman commented Dec 9, 2014

This is interesting because it is going to be much more prevalent in situations where you have a stream hanging around and potentially never being ended.

Streams are used best when they are created, handle a specific flow of data, and are then disposed of. But I can see a use case for persisted streams with multiple sources of input for aggregation, in addition to multiple outputs (which does already work). Should this discussion be moved to https://github.com/isaacs/readable-stream?

The complexity of this issue is considerable, however, particularly if one readable-stream is pushing data and the other is expected to be pulled via '.read()'.

@chrisdickinson
Contributor

Should this discussion be moved to https://github.com/isaacs/readable-stream?

Nope! Here is fine.

This is interesting because it is going to be much more prevalent in situations where you have a stream hanging around and potentially never being ended.

It's definitely worth looking into what happens to tap2 when sink has ended due to tap1 -- does it unpipe? Does it end? Does it remain open?
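One quick way to find out, reusing the variables from the repro at the top (a diagnostic sketch, not an answer): attach listeners before the taps end and see which of 'unpipe', 'finish' and 'error' actually fire.

// Instrument the repro to see what happens to tap2 once tap1 has ended the sink.
sink.on( 'unpipe', function( src ){
  console.log( 'sink unpiped from', src === tap1 ? 'tap1' : 'tap2' );
});
sink.on( 'finish', function(){ console.log( 'sink finished (writable side ended)' ); });
sink.on( 'error', function( err ){ console.log( 'sink error:', err.message ); });
tap2.on( 'error', function( err ){ console.log( 'tap2 error:', err.message ); });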

@sonewman
Contributor

sonewman commented Dec 9, 2014

Hmm, interesting... when a readable stream ends (via .push(null)), although it calls .end() on the writable it is piping to, the writable definitely does not emit anything back up the chain (by default) to anything else that might be piping into it. So if the second stream were to start pushing data through again, you would get a write-after-end error.
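That failure is easy to reproduce with core streams alone; a minimal sketch, using object-mode PassThrough streams as stand-ins for the taps and the sink:

var stream = require('stream');

var a    = new stream.PassThrough({ objectMode: true });
var b    = new stream.PassThrough({ objectMode: true });
var sink = new stream.PassThrough({ objectMode: true });

a.pipe( sink );
b.pipe( sink );

sink.on( 'error', function( err ){
  console.log( err.message );        // typically: 'write after end'
});

a.end('first');                      // a's 'end' makes pipe() call sink.end()

setTimeout(function(){
  b.end('second');                   // this write arrives after sink has already ended
}, 100);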

@sonewman
Contributor

sonewman commented Dec 9, 2014

Another interesting edge case is that we already have the functionality for a readable stream to be ended without it calling .end() on the writables it is piping to, but we can't do it the other way round.

For instance, if we want to say 'this readable is staying open, it will continue to get data (and buffer it up), but everything consuming that readable stream right now must stop'. We can apply back pressure, but that isn't really the same; we would have to know manually when that point was/is and unpipe each consumer individually.
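Today that has to be done by hand; a rough sketch (PassThrough streams as stand-ins):

var stream = require('stream');

var source    = new stream.PassThrough({ objectMode: true });
var consumerA = new stream.PassThrough({ objectMode: true });
var consumerB = new stream.PassThrough({ objectMode: true });

source.pipe( consumerA );
source.pipe( consumerB );

// 'This readable stays open but every consumer must stop' has no single
// call today; each destination has to be detached individually.
source.unpipe( consumerA );
source.unpipe( consumerB );
source.pause();        // make sure nothing is emitted and dropped while detached

// The source keeps accepting writes and buffers them (bounded by its
// highWaterMark / back pressure) until consumption resumes.
source.write('buffered while detached');

// Later: reattach a consumer; pipe() resumes the flow and the buffered
// data is delivered.
source.pipe( consumerA );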

@greelgorke

IMHO the core streams should remain as basic as possible. For merging several streams into one there are already a few modules on npm, and merging can follow different strategies, like unordered merge, merge with a function, or merge in pipe-order, etc.

@sonewman
Contributor

The main problem is that streams have been created for Node's particular use cases, and each of them has different constraints, e.g. net.Socket, process.stdin, http.IncomingMessage, etc. The ideology has been created to fit specific needs, based on the complexity of other internal components.

I think, in general, @trevnorris's idea of building low-level APIs wrapping the C/C++ libs (libuv, V8, http-parser) means that we can start building abstractions on a simpler base, now that we know what iojs is supposed to do.

The previous development has always seemed very organic. But with the lessons learnt from where we are now, better abstractions can be built on a simpler base, which will inevitably make the high-level APIs simpler as a result.

It is always tricky to re-write the underlying foundations of some software without compromising backwards compatibility. But I believe we are in a better place than ever now, with the most talented individuals to accomplish the task!

@zensh

zensh commented Dec 13, 2014

@missinglink https://github.com/teambition/merge2 merges multiple streams into a single stream, in sequence or in parallel.

I use it in gulp
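For anyone following along, a usage sketch based on merge2's documented API (the globs here are placeholders): streams passed as separate arguments are merged in sequence, while streams grouped inside an array are merged in parallel.

var gulp = require('gulp');
var merge2 = require('merge2');

merge2(
  gulp.src('vendor/**/*.js'),                            // merged first
  [ gulp.src('app/**/*.js'), gulp.src('lib/**/*.js') ]   // then these two, in parallel
).pipe( gulp.dest('build/') );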

@ken-okabe

In my humble opinion, the Node/io.js stream has fundamental problems. I have never liked stream, and its fundamental problem is obvious to me.

I understand that all streams are instances of EventEmitter.

As you know, it is very hard to make everything work well with this implementation.
This is simply because a stream is data on a time-line, and so far the Node developers have not solved this time-line problem well.

In order to solve this time-line problem fundamentally, you must employ the Functional Reactive Programming (FRP) paradigm.

What is (functional) reactive programming?

I do resonate with Laurence G's simple description that FRP is about "datatypes that represent a value 'over time' ". Conventional imperative programming captures these dynamic values only indirectly, through state and mutations. The complete history (past, present, future) has no first class representation. Moreover, only discretely evolving values can be (indirectly) captured, since the imperative paradigm is temporally discrete. In contrast, FRP captures these evolving values directly and has no difficulty with continuously evolving values.

FRP is also unusual in that it is concurrent without running afoul of the theoretical & pragmatic rats' nest that plagues imperative concurrency. Semantically, FRP's concurrency is fine-grained, determinate, and continuous. (I'm talking about meaning, not implementation. An implementation may or may not involve concurrency or parallelism.) Semantic determinacy is very important for reasoning, both rigorous and informal. While concurrency adds enormous complexity to imperative programming (due to nondeterministic interleaving), it is effortless in FRP.

So, what is FRP? You could have invented it yourself. Start with these ideas:

  • Dynamic/evolving values (i.e., values "over time") are first class values in themselves. You can define them and combine them, pass them into & out of functions. I called these things "behaviors".
  • Behaviors are built up out of a few primitives, like constant (static) behaviors and time (like a clock), and then with sequential and parallel combination. n behaviors are combined by applying an n-ary function (on static values), "point-wise", i.e., continuously over time.
  • To account for discrete phenomena, have another type (family) of "events", each of which has a stream (finite or infinite) of occurrences. Each occurrence has an associated time and value.

and so on. Read more at the link above.

Do not push or pull data; instead, leave it alone, make the entire evented-IO data an immutable stream on a time-line, and let that stream data be used by functions.

Node/io.js is for JavaScript, and JavaScript is a functional language.
So why not make the io.js stream an FRP stream?
Currently, it is not.


Piping multiple streams into a single stream?

Easy. An FRP stream can be operated on in any way by functional programming.


Node/io.js is a back-end technology, and currently its fundamental data structure, the stream, is not FRP-based.

On the other hand, on the front-end side, Facebook is currently committing to FRP-based projects extremely actively.
GitHub decided to adopt facebook-react.

React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
http://facebook.github.io/react/
https://github.com/facebook/react

IMMUTABLE
Immutable persistent data collections for Javascript which increase efficiency and simplicity.
http://facebook.github.io/immutable-js/
https://github.com/facebook/immutable-js

As I said, the Node/io.js stream is not declarative or immutable, and that is the fundamental problem.

Also, I already have some conceptual FRP code of my own. It's very powerful, with a very simple idea.
If you are interested, let me know.

Regards.
