
Add gatherN + wanderN #954

Merged
merged 1 commit into monix:master from allantl:feature/task-gather-wander-n on Aug 9, 2019

Conversation

@allantl
Contributor

commented Jul 23, 2019

Closes #926

@allantl allantl force-pushed the allantl:feature/task-gather-wander-n branch from baed394 to 4c18998 Jul 24, 2019

@oleg-py
Collaborator

left a comment

This needs better tests. Particularly, I can't see from them that parallelism is really bounded by N, which is the most important thing to check.
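
A minimal sketch of the kind of check being asked for, assuming the Task.wanderN signature proposed in this PR; the `running` and `maxObserved` counters are illustrative names, not part of the PR:

  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global
  import monix.execution.atomic.AtomicInt
  import scala.concurrent.duration._

  val parallelism = 4
  val running = AtomicInt(0)     // tasks executing right now
  val maxObserved = AtomicInt(0) // highest concurrency seen so far

  val check = Task.wanderN(parallelism)(1 to 100) { _ =>
    Task {
      val now = running.incrementAndGet()
      maxObserved.transform(math.max(_, now))
    } >> Task.sleep(10.millis) >> Task(running.decrement())
  }

  check.runSyncUnsafe()
  // If parallelism is really bounded by N, this can never fail.
  assert(maxObserved.get() <= parallelism)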

@allantl allantl force-pushed the allantl:feature/task-gather-wander-n branch 2 times, most recently from d40d6a1 to 925c563 Jul 24, 2019

@allantl

Contributor Author

commented Jul 24, 2019

Thanks for the comments. I've updated the tests btw.

@oleg-py

Collaborator

commented Jul 24, 2019

Good job, now I can see that it does what it says on the cover 👍

@allantl

Contributor Author

commented Jul 24, 2019

Actually, I just did some quick testing comparing the implementation in the PR against these two alternatives:

  // Alternative 1: a hand-rolled worker pool over a ConcurrentQueue.
  import cats.effect.concurrent.Deferred
  import monix.catnap.ConcurrentQueue
  import monix.eval.Task

  def wanderN[T, U](parallelism: Int)(items: Seq[T])(f: T => Task[U]): Task[Seq[U]] =
    for {
      queue <- ConcurrentQueue.bounded[Task, (Deferred[Task, U], T)](items.length)
      pairs <- Task.traverse(items)(item => Deferred[Task, U].map(p => (p, item)))
      _ <- queue.offerMany(pairs)
      fibers <- Task.wander(0.until(parallelism).toList) { _ =>
        queue.poll.flatMap { case (p, a) =>
          f(a).flatMap(p.complete)
        }.loopForever.start
      }
      // `fiber.cancel` is itself a Task, so it has to be flattened into the
      // traversal rather than wrapped in Task.eval (which would never run it).
      res <- Task.sequence(pairs.map(_._1.get)).guarantee(
        Task.traverse(fibers)(_.cancel).void
      )
    } yield res

  // Alternative 2: delegating to Observable.mapParallelOrdered.
  import monix.reactive.Observable

  def wanderN2[T, U](parallelism: Int)(items: Seq[T])(f: T => Task[U]): Task[List[U]] =
    Observable.fromIterable(items).mapParallelOrdered(parallelism)(f).toListL

The implementation in the PR seems much slower. Probably I'm doing something inefficient; I'll need to investigate.

@Avasil

Collaborator

commented Jul 25, 2019

First of all, great job @allantl :D

The implementation in the PR seems much slower. Probably I'm doing something inefficient; I'll need to investigate.

What's the difference in the benchmarks?

Unfortunately, I'm not able to take a closer look at the implementation or experiment with it right now (benchmarking it with a profiler on sounds fun), but I have two ideas that could improve performance:

  1. We could probably start the first n tasks without asking for any permit.
  2. Instead of using AsyncSemaphore, we could run n tasks and then, each time one of them finishes, start a new one (see the sketch after this list).
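
A rough sketch of idea 2 (not the PR's code): start `parallelism` workers, each of which picks up the next item as soon as its previous task finishes; `cursor`, `worker` and `results` are illustrative names:

  import monix.eval.Task

  def wanderNSketch[A, B](parallelism: Int)(items: Seq[A])(f: A => Task[B]): Task[Seq[B]] = {
    val cursor = items.iterator.zipWithIndex
    val results = new Array[Any](items.length)

    // Hand out the next item under a lock; the critical section is tiny.
    def next: Task[Option[(A, Int)]] =
      Task(cursor.synchronized(if (cursor.hasNext) Some(cursor.next()) else None))

    // Only `parallelism` workers exist, so at most that many tasks run at
    // once; each completed task immediately triggers the next one.
    def worker: Task[Unit] =
      next.flatMap {
        case Some((a, i)) => f(a).map(b => results(i) = b) >> worker
        case None         => Task.unit
      }

    // gather joins all workers before the array is read, which is enough for
    // a sketch; a real implementation should be careful about publication.
    Task.gather(List.fill(parallelism)(worker))
      .map(_ => results.toIndexedSeq.asInstanceOf[IndexedSeq[B]])
  }
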
@allantl

Contributor Author

commented Jul 27, 2019

@Avasil 👍 I'm on vacation currently. I'll take a look when I get back.

@allantl allantl force-pushed the allantl:feature/task-gather-wander-n branch from 925c563 to 082f1fb Aug 3, 2019

@allantl

This comment has been minimized.

Copy link
Contributor Author

commented Aug 3, 2019

@Avasil I've updated the PR following your suggestion no. 2, so no more async semaphore.

Here's the benchmark (source):

Benchmark                                                   Cnt    Score    Error  Units

// May run out of memory if the list is very big
GatherNBenchmark.catsEffectParTraverseNBenchmark            10  116.799 ± 14.034  ops/s

GatherNBenchmark.manualImplementationWanderNBenchmark       10   56.101 ±  0.743  ops/s
GatherNBenchmark.observableMapParallelOrderedBenchmark      10   15.459 ±  0.450  ops/s
GatherNBenchmark.observableMapParallelUnorderedBenchmark    10   39.675 ±  0.866  ops/s

// I didn't handle the error/cancellation scenario in this one
GatherNBenchmark.wanderNConcurrentQueueBenchmark            10  357.297 ±  4.390  ops/s

// May run out of memory if the list is very big
GatherNBenchmark.wanderNMonixSemaphoreBenchmark             10    6.022 ±  0.132  ops/s
@Avasil

Collaborator

commented Aug 3, 2019

Thank you for your investigation, @allantl!

It's pretty surprising to me, but I guess in this situation it's not worth going with a low-level solution. We can always revisit it later if we want to optimize.

BTW, any reason why the wanderNSemaphore implementation is different from parTraverseN (other than parTraverse => wander)? I think that could be responsible for such a low score.

@allantl

Contributor Author

commented Aug 3, 2019

The other difference is cats.effect.concurrent.Semaphore vs monix.catnap.Semaphore. I'm not sure whether they behave the same, though.

@allantl allantl force-pushed the allantl:feature/task-gather-wander-n branch from 082f1fb to 8ff6bd8 Aug 3, 2019

@allantl

Contributor Author

commented Aug 3, 2019

I've updated the implementation to use a concurrent queue instead. The benchmark is:

Benchmark                                               Mode  Cnt    Score   Error  Units
GatherNBenchmark.manualImplementationWanderNBenchmark  thrpt   10  344.197 ± 0.796  ops/s

So, around 3x faster than cats-effect's parTraverseN.

@allantl allantl force-pushed the allantl:feature/task-gather-wander-n branch from 8ff6bd8 to 7af313b Aug 4, 2019

@Avasil
Collaborator

left a comment

Thank you once again @allantl ! I think it is very well done and quite close - we just have to make it a bit more cancelation-proof and test for it. I've left some ideas, let me know if something is not very clear. :)

  in.head.map(List(_))
} else {
  for {
    error <- Deferred[Task, Option[Throwable]]

Avasil Aug 4, 2019

Collaborator

I think we could have Deferred[Task, Throwable] directly

      Task.pure(values)
    }
    .guarantee(
      Task.traverse(fiber)(_.cancel).void

Avasil Aug 4, 2019

Collaborator

We don't have to cancel everything on success, we could use guaranteeCase to only act on cancelation / error
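
A minimal sketch of the guaranteeCase idea (not the PR's code); `fibers` stands in for the worker fibers started inside wanderN:

  import cats.effect.ExitCase
  import monix.eval.{Fiber, Task}

  // Only cancel the workers when the task errors or is canceled; on
  // successful completion they have already finished.
  def withWorkerCleanup[A](task: Task[A], fibers: List[Fiber[Unit]]): Task[A] =
    task.guaranteeCase {
      case ExitCase.Completed => Task.unit
      case _                  => Task.traverse(fibers)(_.cancel).void
    }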

        p.complete
      )
    }.loopForever.start
  })

Avasil Aug 4, 2019

Collaborator

I think if we cancel the entire Task.gatherN at this point, it can get dirty.

If we turn on AlwaysAsyncExecution and enableAutoCancelableRunLoops, then any flatMap could be canceled.

I think bracketCase could solve it: acquire would be fiber <- Task.gather(...), use would be everything that comes after, and the release block would replace guarantee (see the sketch below).

I will try to think about a good test for it later, but we could take advantage of TestScheduler.
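
A rough sketch of the bracketCase shape being suggested here; `workers` and `awaitResults` are placeholders for the PR's actual logic:

  import cats.effect.ExitCase
  import monix.eval.Task

  def gatherNShape[A](workers: List[Task[Unit]], awaitResults: Task[List[A]]): Task[List[A]] =
    Task.gather(workers.map(_.start)).bracketCase { _ =>
      // `use`: everything that comes after starting the workers.
      awaitResults
    } { (fibers, exit) =>
      // `release`: runs on completion, error or cancelation, even with
      // auto-cancelable run-loops, because `acquire` is uncancelable.
      exit match {
        case ExitCase.Completed => Task.unit
        case _                  => Task.traverse(fibers)(_.cancel).void
      }
    }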

@Avasil Avasil requested a review from oleg-py Aug 5, 2019

@Avasil Avasil referenced this pull request Aug 5, 2019

@allantl allantl force-pushed the allantl:feature/task-gather-wander-n branch from 7af313b to b54e8fd Aug 9, 2019

@allantl

Contributor Author

commented Aug 9, 2019

@Avasil I've updated the PR. One question: is cancellation needed when it exits on failure?

@Avasil

Collaborator

commented Aug 9, 2019

@Avasil I've updated the PR. One question: is cancellation needed when it exits on failure?

Yes, we should return the failure and cancel the rest; otherwise it would just leak CPU resources.

And thank you! I will play with it in a few hours. We plan to do a 3.0.0 release very soon (in ~10 hours) and hopefully we can include it!

@allantl allantl force-pushed the allantl:feature/task-gather-wander-n branch from b54e8fd to 4ad9522 Aug 9, 2019

@Avasil Avasil merged commit 9c5fcf1 into monix:master Aug 9, 2019

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
@Avasil

Collaborator

commented Aug 9, 2019

Okay, I think it is correct. I have an idea for a small improvement, but I'll do a PR myself in a moment to deliver it for 3.0.0.

Good job @allantl :)
