New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull-based streaming based on Task / Coeval #280

merged 100 commits into from Aug 4, 2017


None yet
5 participants

alexandru commented Dec 21, 2016

We are introducing a new Iterant[F[_], A] type in a new sub-project called monix-tail, that's going to be a pull-based alternative to Observable, with a customizable F[_] evaluation context and thus powered by Task, Coeval or other monadic types.

For abstracting over F[_] we rely on type classes defined in Typelevel's cats-core along with cats-effect.


Rx.NET has IAsyncEnumerable<T> and IAsyncEnumerator<T>. The Enumerable (Iterable) type is considered the dual of the Observable implementation and IAsyncEnumerator is basically an Enumerator that returns a Future (or Task) when calling next().

We need a similar pull-based dual because:

  1. a pull-based communication model is many times easier to reason about for certain use-cases
  2. it's easier to provide support for certain FP-ish operations, like foldRight, which isn't possible to implement for Observable (without converting it into a pull-based type first, hence this new type)
  3. Observable will still be much better for performance and for time-based / reactive operations; actually this new type will not implement operations that are non-deterministic in nature, e.g. it will have zip, but not combineLatest ;-)
  4. for people wanting a replacement for Scala's own scala.collection.immutable.Stream type, the Observable type might seem to be a little too much, since it is oriented towards reactive stuff; but conversions back and forth will be seamless

However the implementation will be different compared to an Iterable / Enumerator: our implementation will be an example of functional programming in Scala, thus following principles of functional programming, building on the knowledge we acquired and versatility of Task, so our communication protocol will not be based on Java's Iterator.


Basically our Iterant[F[_], A] is modeled as a bunch of states, similar to Task and Free, inspired by the List implementation (in Scala, Haskell and others) and by the Streaming type in Dogs.

The minimal/elegant encoding, with which we can achieve every operation that we want is this:

sealed abstract class Iterant[F[_], A]

case class Next[F[_], A](
    item: A,
    rest: F[Iterant[F, A]],
    stop: F[Unit])
    extends Iterant[F ,A]

case class Suspend[F[_], A](
    rest: F[Iterant[F,A]],
    stop: F[Unit])
    extends Iterant[F,A]

case class Halt[F[_], A](ex: Option[Throwable])
    extends Iterant[F, A]

So this is very similar with Scala's List, except that:

  • the rest in Next (the equivalent of the tail in Scala's List) is an F[A], which can be a Task or a Coeval, meaning that its evaluation can also be lazy and/or asynchronous
  • the stream can end in an error (Halt(Some(exception)))
  • we need to handle file handlers and network sockets, hence we need a stop: F[Unit] routine that is supposed to be called whenever we want to prematurely stop the stream processing

So basically with a Scala List at any point the only choice is to keep processing the rest tail reference. But our stream gives us a choice: either process the tail or stop.


The above encoding is minimal, but not efficient. We also need:

case class NextCursor[F[_], A](
    cursor: BatchCursor[A],
    rest: F[Iterant[F, A]],
    stop: F[Unit])
    extends Iterant[F, A] 

case class NextBatch[F[_], A](
    batch: Batch[A],
    rest: F[Iterant[F, A]],
    stop: F[Unit])
    extends Iterant[F, A]

The Cursor type is a light alternative to Scala's and Java's Iterator, which
can wrap either a normal Iterator, an Array or whatever can be iterated
synchronously and efficiently. This is because:

  1. performance of heap-allocated linked-lists, especially linked lists evaluated by means of a trampolined call-stack is absolutely terrible
  2. IMO any streaming abstraction sucks badly if it cannot iterate over arrays efficiently (so without trashing cache locality or the garbage collector)
  3. the Iterator pattern is very, very efficient and best of all it supports our needs for efficient head / tail decomposition, even if it is destructive when compared with Scala's LinearSeq (e.g. List, Stack), because linked-lists suck for performance

The Cursor type is basically an Iterator, however I decided to introduce
a custom type because I want to control its implementation, because we need to specialize it for primitives and because Iteratordoes not have the right semantics for the provided transformations.

For instance when working with arrays, I want and Cursor.filter and other
such operations to have strict behavior, not lazy. I also want Cursor to have specialized implementations for arrays of primitives.

So YES, I'm optimising this FP abstraction by shoving an Iterator in one of these states. It's dirty, but it is well encapsulated and it works.

But that's not all, we also need:

case class Last[F[_], A](item: A)
  extends Iterant[F, A]

This is an optimisation over signalling Next(item, F.pure(Halt(None)), F.unit),
which in benchmarking shows that it greatly improves the performance of flatMap.
Basically without this state, the flatMap operation is slower than the one
implemented on Observable and that would be quite bad, given that
Observable.flatMap is forced to do concurrency handling due to the
back-pressure protocol handling of onComplete (currently by means of 2
getAndSet operation per concatenated child).

Work in progress

The target is version 3.0.0.

Not all operations I want are implemented, plus I'd rather ship a minimal usable version, than something that breaks compatibility later.

alexandru added some commits Sep 2, 2016

@alexandru alexandru closed this May 4, 2017

@alexandru alexandru deleted the wip-streams branch May 4, 2017

@alexandru alexandru restored the wip-streams branch Jun 27, 2017

@alexandru alexandru reopened this Jun 27, 2017


This comment has been minimized.

Show comment
Hide comment

alexandru Aug 2, 2017


Resuming work on this feature 😃 I've merged with master, which contains the upgrade to Cats 1.0.0-MF.


alexandru commented Aug 2, 2017

Resuming work on this feature 😃 I've merged with master, which contains the upgrade to Cats 1.0.0-MF.

@alexandru alexandru modified the milestones: 3.0.0, 2.3.0 Aug 2, 2017

@alexandru alexandru changed the title from WIP: pull-based streaming based on Task / Coeval to Pull-based streaming based on Task / Coeval Aug 4, 2017

alexandru added some commits Aug 4, 2017

@alexandru alexandru merged commit 4ceea29 into master Aug 4, 2017

0 of 2 checks passed

continuous-integration/travis-ci/pr The Travis CI build is in progress
continuous-integration/travis-ci/push The Travis CI build is in progress

This comment has been minimized.

Show comment
Hide comment

alexandru Aug 4, 2017


I've merged this PR in master, even though the Iterant type isn't finished, but it provides a good base to build on and this PR was getting too large already.


alexandru commented Aug 4, 2017

I've merged this PR in master, even though the Iterant type isn't finished, but it provides a good base to build on and this PR was getting too large already.

@alexandru alexandru deleted the wip-streams branch Jan 21, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment