Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task.sleep + Task.delay* deprecations #550

Merged
merged 1 commit into from Jan 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
143 changes: 102 additions & 41 deletions monix-eval/shared/src/main/scala/monix/eval/Task.scala
Expand Up @@ -601,30 +601,29 @@ sealed abstract class Task[+A] extends Serializable {
/** Returns a task that waits for the specified `timespan` before
* executing and mirroring the result of the source.
*
* @see [[delayExecutionWith]] for delaying the execution of the
* source with a customizable trigger.
*/
final def delayExecution(timespan: FiniteDuration): Task[A] =
TaskDelayExecution(this, timespan)

/** Returns a task that waits for the specified `trigger` to succeed
* before mirroring the result of the source.
* In this example we're printing to standard output, but before
* doing that we're introducing a 3 seconds delay:
*
* {{{
* Task(println("Hello!"))
* .delayExecution(3.seconds)
* }}}
*
* If the `trigger` ends in error, then the resulting task will
* also end in error.
* This operation is also equivalent with:
*
* As an example, these are equivalent (in the observed effects and
* result, not necessarily in implementation):
* {{{
* val ta = source.delayExecution(10.seconds)
* val tb = source.delayExecutionWith(Task.unit.delayExecution(10.seconds))
* Task.sleep(timespan).flatMap(_ => task)
* }}}
*
* @see [[delayExecution]] for delaying the execution of the
* source with a simple timespan
* See [[Task.sleep]] for the operation that describes the effect
* and [[Task.delayResult]] for the version that evaluates the
* task on time, but delays the signaling of the result.
*
* @param timespan is the time span to wait before triggering
* the evaluation of the task
*/
final def delayExecutionWith(trigger: Task[Any]): Task[A] =
TaskDelayExecutionWith(this, trigger)
final def delayExecution(timespan: FiniteDuration): Task[A] =
Task.sleep(timespan).flatMap(_ => this)

/** Returns a task that executes the source immediately on `runAsync`,
* but before emitting the `onSuccess` result for the specified
Expand All @@ -633,36 +632,36 @@ sealed abstract class Task[+A] extends Serializable {
* Note that if an error happens, then it is streamed immediately
* with no delay.
*
* @see [[delayResultBySelector]] for applying different
* delay strategies depending on the signaled result.
*/
final def delayResult(timespan: FiniteDuration): Task[A] =
TaskDelayResult(this, timespan)

/** Returns a task that executes the source immediately on `runAsync`,
* but with the result delayed by the specified `selector`.
* See [[delayExecution]] for delaying the evaluation of the
* task with the specified duration. The [[delayResult]] operation
* is effectively equivalent with:
*
* The `selector` generates another `Task` whose execution will
* delay the signaling of the result generated by the source.
* Compared with [[delayResult]] this gives you an opportunity
* to apply different delay strategies depending on the
* signaled result.
* {{{
* task.flatMap(a => Task.now(a).delayExecution(timespan))
* }}}
*
* Or if we are to use the [[Task.sleep]] describing just the
* effect, this operation is equivalent with:
*
* As an example, these are equivalent (in the observed effects
* and result, not necessarily in implementation):
* {{{
* val t1 = source.delayResult(10.seconds)
* val t2 = source.delayResultBySelector(_ =>
* Task.unit.delayExecution(10.seconds))
* task.flatMap(a => Task.sleep(timespan).map(_ => a))
* }}}
*
* Note that if an error happens, then it is streamed immediately
* with no delay.
* Thus in this example 3 seconds will pass before the result
* is being generated by the source, plus another 5 seconds
* before it is finally emitted:
*
* @see [[delayResult]] for delaying with a simple timeout
* {{{
* Task.eval(1 + 1)
* .delayExecution(3.seconds)
* .delayResult(5.seconds)
* }}}
*
* @param timespan is the time span to sleep before signaling
* the result, but after the evaluation of the source
*/
final def delayResultBySelector[B](selector: A => Task[B]): Task[A] =
TaskDelayResultBySelector(this, selector)
final def delayResult(timespan: FiniteDuration): Task[A] =
flatMap(a => Task.sleep(timespan).map(_ => a))

/** Overrides the default [[monix.execution.Scheduler Scheduler]],
* possibly forcing an asynchronous boundary before execution
Expand Down Expand Up @@ -1731,6 +1730,28 @@ object Task extends TaskInstancesLevel1 {
})
}

/** Creates a new `Task` that will sleep for the given duration,
* emitting a tick when that time span is over.
*
* As an example on evaluation this will print "Hello!" after
* 3 seconds:
*
* {{{
* import scala.concurrent.duration._
*
* Task.sleep(3.seconds).flatMap { _ =>
* Task.eval(println("Hello!"))
* }
* }}}
*
* See [[Task.delayExecution]] for this operation described as
* a method on `Task` references or [[Task.delayResult]] for the
* helper that triggers the evaluation of the source on time, but
* then delays the result.
*/
def sleep(timespan: FiniteDuration): Task[Unit] =
TaskSleep.apply(timespan)

/** Given a `TraversableOnce` of tasks, transforms it to a task signaling
* the collection, executing the tasks one by one and gathering their
* results in the same collection.
Expand Down Expand Up @@ -2442,6 +2463,46 @@ object Task extends TaskInstancesLevel1 {
self.executeAsync
// $COVERAGE-ON$
}

/** DEPRECATED - please use [[Task.flatMap flatMap]].
*
* The reason for the deprecation is that this operation is
* redundant, as it can be expressed with `flatMap`, with the
* same effect:
* {{{
* trigger.flatMap(_ => task)
* }}}
*
* The syntax provided by Cats can also help:
* {{{
* import cats.syntax.all._
*
* trigger *> task
* }}}
*/
@deprecated("Please use flatMap", "3.0.0")
def delayExecutionWith(trigger: Task[Any]): Task[A] = {
// $COVERAGE-OFF$
trigger.flatMap(_ => self)
// $COVERAGE-ON$
}

/** DEPRECATED - please use [[Task.flatMap flatMap]].
*
* The reason for the deprecation is that this operation is
* redundant, as it can be expressed with `flatMap` and `map`,
* with the same effect:
*
* {{{
* task.flatMap(a => selector(a).map(_ => a))
* }}}
*/
@deprecated("Please rewrite in terms of flatMap", "3.0.0")
def delayResultBySelector[B](selector: A => Task[B]): Task[A] = {
// $COVERAGE-OFF$
self.flatMap(a => selector(a).map(_ => a))
// $COVERAGE-OFF$
}
}

// -- INTERNALS
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Expand Up @@ -18,27 +18,31 @@
package monix.eval.internal

import monix.eval.{Callback, Task}
import monix.eval.Task.Context
import monix.execution.cancelables.SingleAssignCancelable
import scala.concurrent.duration.FiniteDuration

private[eval] object TaskDelayExecution {
/**
* Implementation for `Task.delayExecution`
*/
def apply[A](self: Task[A], timespan: FiniteDuration): Task[A] =
Task.unsafeCreate { (context, cb) =>
implicit val s = context.scheduler
val conn = context.connection
import scala.concurrent.duration.Duration

private[eval] object TaskSleep {
def apply(timespan: Duration): Task[Unit] =
Task.Async { (ctx, cb) =>
val c = SingleAssignCancelable()
conn push c
ctx.connection.push(c)

c := ctx.scheduler.scheduleOnce(
timespan.length,
timespan.unit,
new SleepRunnable(ctx, cb))
}

private final class SleepRunnable(ctx: Context, cb: Callback[Unit])
extends Runnable {

c := s.scheduleOnce(timespan.length, timespan.unit, new Runnable {
def run(): Unit = {
conn.pop()
// We had an async boundary, as we must reset the frame
context.frameRef.reset()
Task.unsafeStartNow(self, context, Callback.async(cb))
}
})
def run(): Unit = {
ctx.connection.pop()
// We had an async boundary, as we must reset the frame
ctx.frameRef.reset()
cb.onSuccess(())
}
}
}