Skip to content
This repository has been archived by the owner on Dec 23, 2018. It is now read-only.

end on stream should be passed to busses #139

Closed
crcn opened this issue Apr 23, 2015 · 0 comments
Closed

end on stream should be passed to busses #139

crcn opened this issue Apr 23, 2015 · 0 comments

Comments

@crcn
Copy link
Owner

crcn commented Apr 23, 2015

var Writable    = require("obj-stream").Writable;
var _async      = require("./_async");
var _eachSeries = require("./_eachSeries");
var _group      = require("./_group");
var through     = require("obj-stream").through;

/**
 */

module.exports = function(iterator) {
  return _group(function(operation, busses) {
    return _async(function(stream) {

      var found;
      var i = 0;


      // _pipe(bus(operation), stream )

      iterator(busses, function(bus, next) {
        var index = ++i;
        var bs;

        // TODO - duplex this

        (bs = bus(operation)).pipe(through(function(data, next) {
          if (!found || found === index) {
            found = index;
            this.push(data);
          }
          next();
        })).once("end", function() {
          if (found) {
            stream.end();
          } else {
            next();
          }
        }).pipe(stream, { end: false });


        if (bs.writable) {
          stream.once("end", bs.end.bind(bs));
        }
      }, function() {
        stream.end();
      });
    });
  });
};
@crcn crcn added the bug label Apr 23, 2015
@crcn crcn added the completed label Jun 5, 2015
@crcn crcn closed this as completed Jun 5, 2015
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests

1 participant