New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scanEval for Iterant / Observable and Task.fromEffect #412

Merged
merged 7 commits into from Aug 11, 2017

Conversation

Projects
None yet
1 participant
@alexandru
Member

alexandru commented Aug 11, 2017

Adding a new operator to Iterant[F, A]:

import cats.effect.Sync

sealed abstract class Iterant[F[_], A] {
  // ...

  def scanEval[S](seed: F[S])(op: (S, A) => F[S])(implicit F: Sync[F]): Iterant[F, S]
}

Adding these operators to Observable[A]:

import cats.effect.Effect

trait Observable[+A] {
  //...

  def scanEval[F[_], S](seed: F[S])(op: (S, A) => F[S])(implicit F: Effect[F]): Self[S]

  def scanTask[S](seed: Task[S])(op: (S, A) => Task[S]): Self[S]
}

Adding this builder to Task[+A]:

import cats.effect.Effect

object Task {
  // ...
  def fromEffect[F[_], A](fa: F[A])(implicit F: Effect[F]): Task[A]
}

Interestingly the scanEval operator for Iterant is much easier to implement than for Observable.

The Observable implementation is copying what I did previously for mapTask, an implementation that uses Atomic.getAndSet operations to ensure visibility between multiple threads, to eliminate asynchronous boundaries when not needed and given that we can end up with concurrency between the final onComplete of the source and the processing of the last Task.

Benchmark results (see ScanEvalBenchmark as part of this PR):

iterScanEvalPure    10000  thrpt   20   693.887 ±  7.158  ops/s
iterScanEvalDelay   10000  thrpt   20   670.719 ±  7.631  ops/s
iterScanEvalFork    10000  thrpt   20    66.156 ±  7.373  ops/s

obsFlatScanPure     10000  thrpt   20  1986.595 ± 11.284  ops/s
obsFlatScanDelay    10000  thrpt   20  1844.675 ± 18.319  ops/s
obsFlatScanFork     10000  thrpt   20    36.009 ±  0.445  ops/s

obsScanEvalPure     10000  thrpt   20  1363.414 ± 32.942  ops/s
obsScanEvalDelay    10000  thrpt   20  1343.907 ± 19.109  ops/s
obsScanEvalFork     10000  thrpt   20    34.727 ±  0.895  ops/s

This compares Iterant.scanEval vs Observable.flatScan vs Observable.scanEval.

Interestingly for Task references that fork, Iterant can be faster, although it can be argued that for such asynchronous boundaries you end up doing I/O, so CPU speed might matter less.

On the other hand for Task or Observable references that get evaluated immediately (the tests suffixed with Pure and Delay), the Observable implementation ends up being significantly faster, which is quite the feat, given that on each item processed we need to handle some concurrency concerns / synchronization.

@codecov

This comment has been minimized.

codecov bot commented Aug 11, 2017

Codecov Report

Merging #412 into master will increase coverage by 0.08%.
The diff coverage is 93.51%.

@@            Coverage Diff             @@
##           master     #412      +/-   ##
==========================================
+ Coverage   88.21%   88.29%   +0.08%     
==========================================
  Files         336      338       +2     
  Lines        9135     9235     +100     
  Branches     1233     1244      +11     
==========================================
+ Hits         8058     8154      +96     
- Misses       1077     1081       +4

@alexandru alexandru changed the title from Add Iterant.scanEval, Observable.scanEval+scanTask, Task.fromEffect to Add scanEval for Iterant / Observable and Task.fromEffect Aug 11, 2017

alexandru added some commits Aug 11, 2017

@alexandru alexandru merged commit fadf043 into monix:master Aug 11, 2017

1 check was pending

continuous-integration/travis-ci/pr The Travis CI build is in progress
Details

@etorreborre etorreborre referenced this pull request Aug 12, 2017

Merged

removes mutable variable #3

@alexandru alexandru added this to the 3.0.0 milestone Jan 21, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment