
How could we use transducers with Node.js streams? #22

Open · jeffbski opened this issue Oct 24, 2014 · 14 comments
@jeffbski

I love the concept of transducers and your implementation is excellent and performant.

I was trying to figure out how we might use transducers with Node.js streams, but I'm getting hung up on the fact that the iterator protocol doesn't really have the concept of data not being ready yet.

From my understanding, if we call iter.next() on an iterator that is reading from a stream and no data is available, it will either end or return undefined. We could add a readiness check and only call next() once data is available, but if the transducer chain filters a chunk out, I assume the transducer will immediately go back for another chunk, which might not be available yet.

So it feels like something is missing to throttle consumption and allow this to work with async streams.

I know that other languages simply block when data is not available, but it would be nice to come up with a strategy that could work with ES5 as well until generators are commonly available.

Do you have any thoughts? I would really love to consolidate things using transducers for everything.

Thanks.

@kevinbeaty
Contributor

Here is one option: transduce-stream. It should work with either transducers.js or transducers-js (I plan on testing and verifying this weekend, only tried it with transducers-js so far.)

The secret is to use transducers with push streams: instead of using reduce/transduce to pull values from an array/iterator through a transformer, use the transformer directly and call step as you have values.
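
Roughly, the push approach looks like this (an illustrative sketch using the @@transducer/* protocol keys directly, with made-up names; it is not the actual transduce-stream source):

const { Transform } = require('stream');

// a hand-rolled map transducer using the standard protocol keys
const map = (f) => (xf) => ({
  '@@transducer/init': () => xf['@@transducer/init'](),
  '@@transducer/step': (acc, input) => xf['@@transducer/step'](acc, f(input)),
  '@@transducer/result': (acc) => xf['@@transducer/result'](acc)
});

// wrap a transducer as a Transform stream: chunks are pushed in as they arrive,
// and whatever survives the transformer is pushed downstream
function transduceStream(transducer) {
  let xf; // built once the stream exists
  const stream = new Transform({
    objectMode: true,
    transform(chunk, enc, done) {
      xf['@@transducer/step'](stream, chunk); // call step as values arrive
      done();
    },
    flush(done) {
      xf['@@transducer/result'](stream); // let stateful transducers finish up
      done();
    }
  });
  // the innermost transformer just pushes results onto the stream
  xf = transducer({
    '@@transducer/init': () => stream,
    '@@transducer/step': (s, value) => { s.push(value); return s; },
    '@@transducer/result': (s) => s
  });
  return stream;
}

// e.g. process.stdin.pipe(transduceStream(map(buf => buf.toString().toUpperCase()))).pipe(process.stdout);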

@jeffbski
Author

@kevinbeaty excellent. Thanks for mentioning this package, I will check it out.

@jlongster
Owner

Right, as @kevinbeaty said, how to iterate data and how to build up new data structures is not relevant to transducers. I include functionality in this library for working with native types for convenience, but for other data structures (like streams) you have to do the integration yourself or use a different library.

It's not too hard to use transducers manually, so it's pretty easy to integrate. @kevinbeaty's library looks nice, and it should work with my library.

@pjeby

pjeby commented May 25, 2015

Unfortunately, the "secret" of using push streams doesn't work if you want to use, say, cat, on a stream of streams. cat uses reduce(), which expects to iterate and step by itself, instead of delegating the reduction to the container. If there were a @@transducer/reduce protocol that reduce() looked for before doing iteration, then asynchronous iteration of a source stream would work.

That would allow one to do the kind of .flatten() and .flatMap() operations on async streams that Bacon, Kefir, RxJS, Highland, etc. can do. It'd probably even let you do flatMapLatest() or flatMapWithConcurrencyLimit(), if you were clever enough with your intervening transforms.

(Basically, with that one addition to the protocol, async streams become first-class players in the transduction game. There's no backpressure capability per se, but you can stop receiving data by returning a reduced result from a downstream /step operation.)

Is there a place to propose an official addition to the spec? Is the spec even a thing? I saw something that was supposedly a link to the spec that turned out to be a lengthy GH issue discussion thread, but nothing that looked like an actual spec spec, if you know what I mean.

@dypsilon

dypsilon commented Jul 2, 2015

@pjeby have you made any progress with the "reduce" addition and stream integration? I would like to port my code from awkward object streams to transducers and have no idea yet if it's even possible.

@NodeGuy

NodeGuy commented Aug 26, 2016

Alas, what an oversight.

@shaunc

shaunc commented Apr 25, 2017

@pjeby -- Could you explain your idea a bit more? Who is supposed to "know" about the subobjects in your collection/stream? Where would "reduce()" look for the "@@transducer/reduce" protocol? If it looks to the cat transformer, then that doesn't help us much. Should it look to the upstream accumulator? (Or should cat itself, which has access to it?)

@pjeby

pjeby commented Apr 26, 2017

Gosh, it's been a while, so I'm actually fuzzy now in my recollection of how transducers work, not to mention this discussion. But IIRC, shouldn't the concatenation of streams implement reduction by reduction on its concatenated streams? i.e., shouldn't it be solved by recursion on reduce()?

@shaunc

shaunc commented Apr 27, 2017

That sounds good... but insofar as I understand it myself, the idea of transducers (at least in the concrete JS implementations) abstracts "reduce" into three parts: initialization of the "accumulator", stepping through input and accumulating, and collecting results.

(See, incidentally, #46 implemented by #48. Upstream and downstream might be opaque, but every transducer should have the opportunity to control "its own" accumulator).

A transducer needn't concern itself with the source of its objects, but it needs to "know" about the objects themselves, or be told somehow. This could be a property of the upstream "accumulator" as I was thinking earlier. More straightforward, though it makes the pipeline "typed", would be to pass another transformer -- a transformer for the subobjects -- as an optional argument into cat. This would allow cat to run through these objects (possibly transforming them). Then we wouldn't need another special property.

Alternately, if we wanted each "cat" pipeline to be general, having a special property containing a sub-object-transformer might be the way to go?

Either way the sub-object transformer could be cat itself, perhaps, so we could flatten trees of streams if we really wanted to. :)
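
For instance, a speculative sketch (catWith is a made-up name, and this still hardcodes synchronous iteration over each sub-collection):

// cat with an optional transducer applied to the sub-objects of each input collection
const catWith = (innerTransducer) => (xf) => {
  // wrap the downstream transformer so sub-objects can be transformed on the way through
  const inner = innerTransducer ? innerTransducer(xf) : xf;
  return {
    '@@transducer/init': () => xf['@@transducer/init'](),
    '@@transducer/result': (acc) => xf['@@transducer/result'](acc),
    '@@transducer/step': (acc, subColl) => {
      for (const value of subColl) {
        acc = inner['@@transducer/step'](acc, value);
        if (acc && acc['@@transducer/reduced']) break; // stop early if downstream is done
      }
      return acc;
    }
  };
};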

@pjeby

pjeby commented Apr 27, 2017

Right. What I'm saying is that the problem is the part where reduction currently hardcodes reliance on the standard, synchronous iteration protocol. If instead there were a higher-level protocol for "reducing a sequence", then the default implementation would still fall back to synchronous iteration, but an asynchronous "collection" (e.g. a stream) would be able to provide an asynchronous reduction algorithm.

(In principle, one could hardcode an additional asynchronous iteration protocol, but making reduction itself a protocol gives freedom to experiment with different kinds of asynchronous streams and results.)
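
A rough sketch of what that could look like (purely illustrative; the '@@transducer/reduce' key is hypothetical and not part of any spec):

function reduce(xf, init, coll) {
  // if the "collection" knows how to reduce itself (e.g. an async stream),
  // delegate to it; it may drive the steps asynchronously and return a promise
  if (coll && typeof coll['@@transducer/reduce'] === 'function') {
    return coll['@@transducer/reduce'](xf, init);
  }
  // otherwise fall back to synchronous iteration, exactly as today
  let acc = init;
  for (const value of coll) {
    acc = xf['@@transducer/step'](acc, value);
    if (acc && acc['@@transducer/reduced']) {
      acc = acc['@@transducer/value'];
      break;
    }
  }
  return acc;
}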

@mzpkjs

mzpkjs commented Apr 21, 2020

@pjeby:
I am curious, did you manage to implement such a protocol?
I have lately been working on one with no success whatsoever, so any insight would be helpful.

The way of transducing seems to be really coupled to both the source you "loop" through and the target you reduce to. E.g. in my case, transducers for Iterator -> Iterator, Array -> Iterator, and Iterator -> Array each need to be implemented differently.

For now, I am trying to somehow mitigate this issue with some kind of double dispatch, picking the "right" transduce implementation based on both the target and the source.

Anyone could share their experience on this topic?

@jaawerth

@mzpkjs Async iteration was actually added to the ES Spec in 2018 along with async generators, and is already supported on a number of platforms, including modern browsers and Node >= 12.x! The tldr is that the .next() method on an async iterator returns a promise that resolves to { value, done }.

In those newer node releases, there's also Readable.from, which can accept an iterable OR an async iterable and returns a readable stream, and Readable.prototype[Symbol.asyncIterator] allows iterating a stream using a for await...of loop in an async function, or anything else that works with the protocol.

It should be possible to transduce in terms of these when appropriate... definitely worth looking into!

@pjeby

pjeby commented Apr 21, 2020

Nice to know about; it sounds like the new Readable partially obsoletes yieldable-streams, which was originally written for node 0.10 (not to be confused with node 10 😉).

@jaawerth

Yep! It's the native JS interop glue that's been missing from streams. You can do things like:

const { Readable, PassThrough } = require('stream');

const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
async function* delayedRange(delay, start = 1, end = Infinity) {
  for (let i = start; i <= end; i++) {
    // just to show you can both await and yield from an async generator
    await sleep(delay);
    yield i;
  }
}

// create stream from iterable or async iterable
const src1 = Readable.from(delayedRange(1000, 1, 5));
// pipe to another stream
const src2 = new PassThrough({ objectMode: true });
src1.pipe(src2);

// iterate a readable stream from an async function with for..await
(async () => {
  for await (const chunk of src2) {
    console.log(chunk);
  }
})();

The beauty is that a transduce function that operates on async iterables could easily be plugged into a streams interface via either an adapter or native support, without worrying about platform-specific implementation details (e.g. node streams vs the new DOM Streams API).
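
For example, a rough sketch (not from any library; names are made up) of transducing an async iterable with the same transformer protocol, driven by for await...of instead of a synchronous loop:

// a tiny map transducer for the example
const map = (f) => (xf) => ({
  ...xf,
  '@@transducer/step': (acc, value) => xf['@@transducer/step'](acc, f(value))
});

async function asyncTransduce(transducer, innerXf, init, source) {
  const xf = transducer(innerXf);
  let acc = init;
  for await (const value of source) {
    acc = xf['@@transducer/step'](acc, value);
    if (acc && acc['@@transducer/reduced']) { // downstream signalled early termination
      acc = acc['@@transducer/value'];
      break;
    }
  }
  return xf['@@transducer/result'](acc);
}

// e.g. collect a transformed stream into an array, reusing Readable and delayedRange from above:
// const doubled = await asyncTransduce(
//   map(n => n * 2),
//   { '@@transducer/step': (arr, v) => (arr.push(v), arr), '@@transducer/result': (arr) => arr },
//   [],
//   Readable.from(delayedRange(1000, 1, 5))
// );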
