Skip to content

Commit

Permalink
Deprecate gather, gatherN, gatherUnordered in favor of parSequence, p…
Browse files Browse the repository at this point in the history
…arSequenceN, parSequenceUnordered (#1145)

* Deprecate gather, gatherN, gatherUnordered in favor of parSequence, parSequenceN, parSequenceUnordered

* Deprecate wander, wanderN, wanderUnordered in favor of parTraverse, parTraverseN, parTraverseUnordered
  • Loading branch information
jozic committed Mar 23, 2020
1 parent 5319f30 commit a8b3cc4
Show file tree
Hide file tree
Showing 17 changed files with 193 additions and 139 deletions.
3 changes: 3 additions & 0 deletions AUTHORS
Expand Up @@ -69,3 +69,6 @@ https://github.com/gclaramunt

Konrad Najder
https://github.com/najder-k

Eugene Platonov
https://github.com/jozic
Expand Up @@ -45,7 +45,7 @@ import scala.concurrent.Future
*
* Or to specify custom values use below format:
*
* jmh:run -i 20 -wi 20 -f 4 -t 2 monix.benchmarks.TaskGatherBenchmark
* jmh:run -i 20 -wi 20 -f 4 -t 2 monix.benchmarks.TaskSequenceBenchmark
*
* Which means "20 iterations", "20 warm-up iterations", "4 forks", "2 thread".
* Please note that benchmarks should be usually executed at least in
Expand Down Expand Up @@ -94,24 +94,24 @@ class TaskSequenceBenchmark {


@Benchmark
def monixGather(): Long = {
def monixParSequence(): Long = {
val tasks = (0 until count).map(_ => Task.eval(1)).toList
val result = Task.gather(tasks).map(_.sum.toLong)
val result = Task.parSequence(tasks).map(_.sum.toLong)
result.runSyncUnsafe()
}


@Benchmark
def monixGatherUnordered(): Long = {
def monixParSequenceUnordered(): Long = {
val tasks = (0 until count).map(_ => Task.eval(1)).toList
val result = Task.gatherUnordered(tasks).map(_.sum.toLong)
val result = Task.parSequenceUnordered(tasks).map(_.sum.toLong)
result.runSyncUnsafe()
}

@Benchmark
def monixGatherN(): Long = {
def monixParSequenceN(): Long = {
val tasks = (0 until count).map(_ => Task.eval(1)).toList
val result = Task.gatherN(parallelism)(tasks).map(_.sum.toLong)
val result = Task.parSequenceN(parallelism)(tasks).map(_.sum.toLong)
result.runSyncUnsafe()
}

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -14,7 +14,7 @@ val allProjects = List(
)

val benchmarkProjects = List(
"benchmarksPrev",
// "benchmarksPrev", // TODO: temporarily disabling to avoid deprecation errors, enable after 3.2.0 release
"benchmarksNext"
).map(_ + "/compile")

Expand Down
78 changes: 38 additions & 40 deletions monix-eval/shared/src/main/scala/monix/eval/Task.scala
Expand Up @@ -169,7 +169,7 @@ import scala.util.{Failure, Success, Try}
* it does for `Future.sequence`, the given `Task` values being
* evaluated one after another, in ''sequence'', not in ''parallel''.
* If you want parallelism, then you need to use
* [[monix.eval.Task.gather Task.gather]] and thus be explicit about it.
* [[monix.eval.Task.parSequence Task.parSequence]] and thus be explicit about it.
*
* This is great because it gives you the possibility of fine tuning the
* execution. For example, say you want to execute things in parallel,
Expand All @@ -185,7 +185,7 @@ import scala.util.{Failure, Success, Try}
* val chunks = list.sliding(30, 30).toSeq
*
* // Specify that each batch should process stuff in parallel
* val batchedTasks = chunks.map(chunk => Task.gather(chunk))
* val batchedTasks = chunks.map(chunk => Task.parSequence(chunk))
* // Sequence the batches
* val allBatches = Task.sequence(batchedTasks)
*
Expand Down Expand Up @@ -3499,7 +3499,7 @@ object Task extends TaskInstancesLevel1 {
* results in the same collection.
*
* This operation will execute the tasks one by one, in order, which means that
* both effects and results will be ordered. See [[gather]] and [[gatherUnordered]]
* both effects and results will be ordered. See [[parSequence]] and [[parSequenceUnordered]]
* for unordered results or effects, and thus potential of running in parallel.
*
* It's a simple version of [[traverse]].
Expand All @@ -3524,29 +3524,29 @@ object Task extends TaskInstancesLevel1 {
* This function is the nondeterministic analogue of `sequence` and should
* behave identically to `sequence` so long as there is no interaction between
* the effects being gathered. However, unlike `sequence`, which decides on
* a total order of effects, the effects in a `gather` are unordered with
* a total order of effects, the effects in a `parSequence` are unordered with
* respect to each other, the tasks being execute in parallel, not in sequence.
*
* Although the effects are unordered, we ensure the order of results
* matches the order of the input sequence. Also see [[gatherUnordered]]
* matches the order of the input sequence. Also see [[parSequenceUnordered]]
* for the more efficient alternative.
*
* Example:
* {{{
* val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3))
*
* // Yields 2, 4, 6
* Task.gather(tasks)
* Task.parSequence(tasks)
* }}}
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see [[gatherN]] for a version that limits parallelism.
* @see [[parSequenceN]] for a version that limits parallelism.
*/
def gather[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] =
TaskGather[A, M](in, () => newBuilder(bf, in))
def parSequence[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] =
TaskParSequence[A, M](in, () => newBuilder(bf, in))

/** Executes the given sequence of tasks in parallel, non-deterministically
* gathering their results, returning a task that will signal the sequence
Expand All @@ -3567,17 +3567,17 @@ object Task extends TaskInstancesLevel1 {
* )
*
* // Yields 2, 4, 6, 8 after around 6 seconds
* Task.gatherN(2)(tasks)
* Task.parSequenceN(2)(tasks)
* }}}
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see [[gather]] for a version that does not limit parallelism.
* @see [[parSequence]] for a version that does not limit parallelism.
*/
def gatherN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] =
TaskGatherN[A](parallelism, in)
def parSequenceN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] =
TaskParSequenceN[A](parallelism, in)

/** Given a `Iterable[A]` and a function `A => Task[B]`,
* nondeterministically apply the function to each element of the collection
Expand All @@ -3587,23 +3587,23 @@ object Task extends TaskInstancesLevel1 {
* This function is the nondeterministic analogue of `traverse` and should
* behave identically to `traverse` so long as there is no interaction between
* the effects being gathered. However, unlike `traverse`, which decides on
* a total order of effects, the effects in a `wander` are unordered with
* a total order of effects, the effects in a `parTraverse` are unordered with
* respect to each other.
*
* Although the effects are unordered, we ensure the order of results
* matches the order of the input sequence. Also see [[wanderUnordered]]
* matches the order of the input sequence. Also see [[parTraverseUnordered]]
* for the more efficient alternative.
*
* It's a generalized version of [[gather]].
* It's a generalized version of [[parSequence]].
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see [[wanderN]] for a version that limits parallelism.
* @see [[parTraverseN]] for a version that limits parallelism.
*/
def wander[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] =
Task.eval(in.map(f)).flatMap(col => TaskGather[B, M](col, () => newBuilder(bf, in)))
def parTraverse[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] =
Task.eval(in.map(f)).flatMap(col => TaskParSequence[B, M](col, () => newBuilder(bf, in)))

/** Given a `Iterable[A]` and a function `A => Task[B]`,
* nondeterministically apply the function to each element of the collection
Expand All @@ -3620,37 +3620,35 @@ object Task extends TaskInstancesLevel1 {
* val numbers = List(1, 2, 3, 4)
*
* // Yields 2, 4, 6, 8 after around 6 seconds
* Task.wanderN(2)(numbers)(n => Task(n + n).delayExecution(n.second))
* Task.parTraverseN(2)(numbers)(n => Task(n + n).delayExecution(n.second))
* }}}
*
* $parallelismAdvice
*
* $parallelismNote
*
* @see [[wander]] for a version that does not limit parallelism.
* @see [[parTraverse]] for a version that does not limit parallelism.
*/
def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] =
Task.suspend {
TaskGatherN(parallelism, in.map(f))
}
def parTraverseN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] =
Task.suspend(TaskParSequenceN(parallelism, in.map(f)))

/** Processes the given collection of tasks in parallel and
* nondeterministically gather the results without keeping the original
* ordering of the given tasks.
*
* This function is similar to [[gather]], but neither the effects nor the
* This function is similar to [[parSequence]], but neither the effects nor the
* results will be ordered. Useful when you don't need ordering because:
*
* - it has non-blocking behavior (but not wait-free)
* - it can be more efficient (compared with [[gather]]), but not
* - it can be more efficient (compared with [[parSequence]]), but not
* necessarily (if you care about performance, then test)
*
* Example:
* {{{
* val tasks = List(Task(1 + 1), Task(2 + 2), Task(3 + 3))
*
* // Yields 2, 4, 6 (but order is NOT guaranteed)
* Task.gatherUnordered(tasks)
* Task.parSequenceUnordered(tasks)
* }}}
*
* $parallelismAdvice
Expand All @@ -3659,28 +3657,28 @@ object Task extends TaskInstancesLevel1 {
*
* @param in is a list of tasks to execute
*/
def gatherUnordered[A](in: Iterable[Task[A]]): Task[List[A]] =
TaskGatherUnordered(in)
def parSequenceUnordered[A](in: Iterable[Task[A]]): Task[List[A]] =
TaskParSequenceUnordered(in)

/** Given a `Iterable[A]` and a function `A => Task[B]`,
* nondeterministically apply the function to each element of the collection
* without keeping the original ordering of the results.
*
* This function is similar to [[wander]], but neither the effects nor the
* This function is similar to [[parTraverse]], but neither the effects nor the
* results will be ordered. Useful when you don't need ordering because:
*
* - it has non-blocking behavior (but not wait-free)
* - it can be more efficient (compared with [[wander]]), but not
* - it can be more efficient (compared with [[parTraverse]]), but not
* necessarily (if you care about performance, then test)
*
* It's a generalized version of [[gatherUnordered]].
* It's a generalized version of [[parSequenceUnordered]].
*
* $parallelismAdvice
*
* $parallelismNote
*/
def wanderUnordered[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B]): Task[List[B]] =
Task.eval(in.map(f)).flatMap(gatherUnordered)
def parTraverseUnordered[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B]): Task[List[B]] =
Task.eval(in.map(f)).flatMap(parSequenceUnordered)

/** Yields a task that on evaluation will process the given tasks
* in parallel, then apply the given mapping function on their results.
Expand Down Expand Up @@ -3886,7 +3884,7 @@ object Task extends TaskInstancesLevel1 {
* ordering the results, but not the side effects, the evaluation
* being done in parallel.
*
* This is a specialized [[Task.gather]] operation and as such
* This is a specialized [[Task.parSequence]] operation and as such
* the tasks are evaluated in parallel, ordering the results.
* In case one of the tasks fails, then all other tasks get
* cancelled and the final result will be a failure.
Expand Down Expand Up @@ -3919,7 +3917,7 @@ object Task extends TaskInstancesLevel1 {
* ordering the results, but not the side effects, the evaluation
* being done in parallel.
*
* This is a specialized [[Task.gather]] operation and as such
* This is a specialized [[Task.parSequence]] operation and as such
* the tasks are evaluated in parallel, ordering the results.
* In case one of the tasks fails, then all other tasks get
* cancelled and the final result will be a failure.
Expand Down Expand Up @@ -3955,7 +3953,7 @@ object Task extends TaskInstancesLevel1 {
* ordering the results, but not the side effects, the evaluation
* being done in parallel if the tasks are async.
*
* This is a specialized [[Task.gather]] operation and as such
* This is a specialized [[Task.parSequence]] operation and as such
* the tasks are evaluated in parallel, ordering the results.
* In case one of the tasks fails, then all other tasks get
* cancelled and the final result will be a failure.
Expand Down Expand Up @@ -3993,7 +3991,7 @@ object Task extends TaskInstancesLevel1 {
* ordering the results, but not the side effects, the evaluation
* being done in parallel if the tasks are async.
*
* This is a specialized [[Task.gather]] operation and as such
* This is a specialized [[Task.parSequence]] operation and as such
* the tasks are evaluated in parallel, ordering the results.
* In case one of the tasks fails, then all other tasks get
* cancelled and the final result will be a failure.
Expand Down Expand Up @@ -4032,7 +4030,7 @@ object Task extends TaskInstancesLevel1 {
* ordering the results, but not the side effects, the evaluation
* being done in parallel if the tasks are async.
*
* This is a specialized [[Task.gather]] operation and as such
* This is a specialized [[Task.parSequence]] operation and as such
* the tasks are evaluated in parallel, ordering the results.
* In case one of the tasks fails, then all other tasks get
* cancelled and the final result will be a failure.
Expand Down
Expand Up @@ -21,7 +21,9 @@ package internal
import cats.effect.{ConcurrentEffect, IO}
import monix.eval.Task.Options
import monix.execution.annotations.UnsafeBecauseImpure
import monix.execution.compat.BuildFrom
import monix.execution.{Callback, Cancelable, CancelableFuture, Scheduler}

import scala.annotation.unchecked.uncheckedVariance
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -414,5 +416,53 @@ private[eval] object TaskDeprecated {
Task.from(ioa)
// $COVERAGE-ON$
}

/** DEPRECATED — renamed to [[Task.parSequence]]. */
@deprecated("Use parSequence", "3.2.0")
def gather[A, M[X] <: Iterable[X]](in: M[Task[A]])(implicit bf: BuildFrom[M[Task[A]], A, M[A]]): Task[M[A]] = {
// $COVERAGE-OFF$
Task.parSequence(in)
// $COVERAGE-ON$
}

/** DEPRECATED — renamed to [[Task.parSequenceN]] */
@deprecated("Use parSequenceN", "3.2.0")
def gatherN[A](parallelism: Int)(in: Iterable[Task[A]]): Task[List[A]] = {
// $COVERAGE-OFF$
Task.parSequenceN(parallelism)(in)
// $COVERAGE-ON$
}

/** DEPRECATED — renamed to [[Task.parSequenceUnordered]] */
@deprecated("Use parSequenceUnordered", "3.2.0")
def gatherUnordered[A](in: Iterable[Task[A]]): Task[List[A]] = {
// $COVERAGE-OFF$
Task.parSequenceUnordered(in)
// $COVERAGE-ON$
}

/** DEPRECATED — renamed to [[Task.parTraverse]] */
@deprecated("Use parTraverse", "3.2.0")
def wander[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B])(implicit bf: BuildFrom[M[A], B, M[B]]): Task[M[B]] = {
// $COVERAGE-OFF$
Task.parTraverse(in)(f)
// $COVERAGE-ON$
}

/** DEPRECATED — renamed to [[Task.parTraverseN]] */
@deprecated("Use parTraverseN", "3.2.0")
def wanderN[A, B](parallelism: Int)(in: Iterable[A])(f: A => Task[B]): Task[List[B]] = {
// $COVERAGE-OFF$
Task.parTraverseN(parallelism)(in)(f)
// $COVERAGE-ON$
}

/** DEPRECATED — renamed to [[Task.parTraverseUnordered]] */
@deprecated("Use parTraverseUnordered", "3.2.0")
def wanderUnordered[A, B, M[X] <: Iterable[X]](in: M[A])(f: A => Task[B]): Task[List[B]] = {
// $COVERAGE-OFF$
Task.parTraverseUnordered(in)(f)
// $COVERAGE-ON$
}
}
}
Expand Up @@ -26,9 +26,9 @@ import scala.util.control.NonFatal
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

private[eval] object TaskGather {
private[eval] object TaskParSequence {
/**
* Implementation for `Task.gather`
* Implementation for [[Task.parSequence]]
*/
def apply[A, M[X] <: Iterable[X]](in: Iterable[Task[A]], makeBuilder: () => mutable.Builder[A, M[A]]): Task[M[A]] = {
Async(
Expand Down
Expand Up @@ -23,8 +23,10 @@ import monix.catnap.ConcurrentQueue
import monix.eval.Task
import monix.execution.{BufferCapacity, ChannelType}

private[eval] object TaskGatherN {

private[eval] object TaskParSequenceN {
/**
* Implementation for [[Task.parSequenceN]]
*/
def apply[A](
parallelism: Int,
in: Iterable[Task[A]]
Expand All @@ -42,7 +44,7 @@ private[eval] object TaskGatherN {
.withConfig[Task, (Deferred[Task, A], Task[A])](BufferCapacity.Bounded(itemSize), ChannelType.SPMC)
pairs <- Task.traverse(in.toList)(task => Deferred[Task, A].map(p => (p, task)))
_ <- queue.offerMany(pairs)
workers = Task.gather(List.fill(parallelism.min(itemSize)) {
workers = Task.parSequence(List.fill(parallelism.min(itemSize)) {
queue.poll.flatMap {
case (p, task) =>
task.redeemWith(
Expand Down

0 comments on commit a8b3cc4

Please sign in to comment.