Skip to content

Commit

Permalink
Scheduler that manually advances time (#akka#24150)
Browse files Browse the repository at this point in the history
* Don't apply dilation to scheduler parameter
* Clarify ExecutionContext usage
* Clarify comment on timePasses
* Make ExplicitlyTriggeredScheduler internals private
* List currently scheduled tasks in one log message
* Execute immediately if (initialDelay <= Duration.Zero)
* Don't reschedule if scheduled task fails
* Be more efficient about logging
* Widen `timePasses` delay for now
akka#24243 (comment) for some
discussion on what to do instead
* Remove mechanism for mixing in config from a test trait
  • Loading branch information
raboof authored and manonthegithub committed Jan 31, 2018
1 parent a63cf31 commit 4ce2c27
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed;

//#manual-scheduling-simple
import java.util.concurrent.TimeUnit;
import static com.typesafe.config.ConfigFactory.parseString;

import scala.concurrent.duration.Duration;

import akka.actor.typed.javadsl.Behaviors;

import org.junit.Test;

import akka.testkit.typed.TestKit;
import akka.testkit.typed.ExplicitlyTriggeredScheduler;
import akka.testkit.typed.javadsl.TestProbe;

public class ManualTimerTest extends TestKit {
ExplicitlyTriggeredScheduler scheduler;

public ManualTimerTest() {
super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.ExplicitlyTriggeredScheduler\""));
this.scheduler = (ExplicitlyTriggeredScheduler) system().scheduler();
}

static final class Tick {}
static final class Tock {}

@Test
public void testScheduleNonRepeatedTicks() {
TestProbe<Tock> probe = new TestProbe<>(system());
Behavior<Tick> behavior = Behaviors.withTimers(timer -> {
timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS));
return Behaviors.immutable( (ctx, tick) -> {
probe.ref().tell(new Tock());
return Behaviors.same();
});
});

spawn(behavior);

scheduler.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe);

scheduler.timePasses(Duration.create(2, TimeUnit.MILLISECONDS));
probe.expectMsgType(Tock.class);

scheduler.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe);
}


}
//#manual-scheduling-simple
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package akka.actor.typed

//#manual-scheduling-simple
import scala.concurrent.duration._

import akka.actor.typed.scaladsl.Behaviors

import org.scalatest.WordSpecLike

import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.{ ManualTime, TestProbe }

class ManualTimerSpec extends TestKit(ManualTime.config) with ManualTime with WordSpecLike {

"A timer" must {
"schedule non-repeated ticks" in {
case object Tick
case object Tock

val probe = TestProbe[Tock.type]()
val behavior = Behaviors.withTimers[Tick.type] { timer
timer.startSingleTimer("T", Tick, 10.millis)
Behaviors.immutable { (ctx, Tick)
probe.ref ! Tock
Behaviors.same
}
}

spawn(behavior)

scheduler.expectNoMessageFor(9.millis, probe)

scheduler.timePasses(2.millis)
probe.expectMsg(Tock)

scheduler.expectNoMessageFor(10.seconds, probe)
}
//#manual-scheduling-simple

"schedule repeated ticks" in {
case object Tick
case object Tock

val probe = TestProbe[Tock.type]()
val behavior = Behaviors.withTimers[Tick.type] { timer
timer.startPeriodicTimer("T", Tick, 10.millis)
Behaviors.immutable { (ctx, Tick)
probe.ref ! Tock
Behaviors.same
}
}

spawn(behavior)

for (_ Range(0, 5)) {
scheduler.expectNoMessageFor(9.millis, probe)

scheduler.timePasses(1.milli)
probe.expectMsg(Tock)
}
}

"replace timer" in {
sealed trait Command
case class Tick(n: Int) extends Command
case class SlowThenBump(nextCount: Int) extends Command
sealed trait Event
case class Tock(n: Int) extends Event

val probe = TestProbe[Event]("evt")
val interval = 10.millis

val behavior = Behaviors.withTimers[Command] { timer
timer.startPeriodicTimer("T", Tick(1), interval)
Behaviors.immutable { (ctx, cmd)
cmd match {
case Tick(n)
probe.ref ! Tock(n)
Behaviors.same
case SlowThenBump(nextCount)
scheduler.timePasses(interval)
timer.startPeriodicTimer("T", Tick(nextCount), interval)
Behaviors.same
}
}
}

val ref = spawn(behavior)

scheduler.timePasses(11.millis)
probe.expectMsg(Tock(1))

// next Tock(1) enqueued in mailboxed, but should be discarded because of new timer
ref ! SlowThenBump(2)
scheduler.expectNoMessageFor(interval, probe)

scheduler.timePasses(interval)
probe.expectMsg(Tock(2))
}

//#manual-scheduling-simple
}
}
//#manual-scheduling-simple
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,15 @@ object AskPattern {
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*/
def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
// We do not currently use the implicit scheduler, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219
ref match {
case a: adapt.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f)
case a: adapt.ActorSystemAdapter[_] askUntyped(ref, a.untyped.guardian, timeout, f)
case a throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass)
}
}
}

private val onTimeout: String Throwable = msg new TimeoutException(msg)
Expand Down
13 changes: 13 additions & 0 deletions akka-docs/src/main/paradox/typed/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,16 @@ Scala
Java
: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn-anonymous }

### Controlling the scheduler

It can be hard to reliably unit test specific scenario's when your actor relies on timing:
especially when running many tests in parallel it can be hard to get the timing just right.
Making such tests more reliable by using generous timeouts make the tests take a long time to run.

For such situations, we provide a scheduler where you can manually, explicitly advance the clock.

Scala
: @@snip [ManualTimerSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala) { #manual-scheduling-simple }

Java
: @@snip [ManualTimerTest.scala]($akka$/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java) { #manual-scheduling-simple }
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package akka.testkit.typed

import java.util.concurrent.ThreadFactory

import com.typesafe.config.Config

import scala.annotation.varargs
import scala.concurrent.duration.{ Duration, FiniteDuration }

import akka.event.LoggingAdapter

class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) {

@varargs
def expectNoMessageFor(duration: FiniteDuration, on: scaladsl.TestProbe[_]*): Unit = {
timePasses(duration)
on.foreach(_.expectNoMessage(Duration.Zero))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase {
def this(name: String) = this(name, None)
def this(config: Config) = this(TestKit.getCallerName(classOf[TestKit]), Some(config))
def this(name: String, config: Config) = this(name, Some(config))

import TestKit._
implicit val system = ActorSystem(testKitGuardian, name, config = config)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package akka.testkit.typed.scaladsl

import com.typesafe.config.{ Config, ConfigFactory }

import akka.testkit.typed._

object ManualTime {
val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""")
}
trait ManualTime { self: TestKit
override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package akka.testkit

import java.util.concurrent.ThreadFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import com.typesafe.config.Config

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.Try

import akka.actor.Cancellable
import akka.actor.Scheduler
import akka.event.LoggingAdapter

/**
* For testing: scheduler that does not look at the clock, but must be
* progressed manually by calling `timePasses`.
*
* This allows for faster and less timing-sensitive specs, as jobs will be
* executed on the test thread instead of using the original
* {ExecutionContext}. This means recreating specific scenario's becomes
* easier, but these tests might fail to catch race conditions that only
* happen when tasks are scheduled in parallel in 'real time'.
*/
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler {

private case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable)

private val currentTime = new AtomicLong()
private val scheduled = new ConcurrentHashMap[Item, Unit]()

override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
schedule(initialDelay, Some(interval), runnable)

override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
schedule(delay, None, runnable)

/**
* Advance the clock by the specified duration, executing all outstanding jobs on the calling thread before returning.
*
* We will not add a dilation factor to this amount, since the scheduler API also does not apply dilation.
* If you want the amount of time passed to be dilated, apply the dilation before passing the delay to
* this method.
*/
def timePasses(amount: FiniteDuration) = {
// Give dispatchers time to clear :(. See
// https://github.com/akka/akka/pull/24243#discussion_r160985493
// for some discussion on how to deal with this properly.
Thread.sleep(100)

val newTime = currentTime.get + amount.toMillis
if (log.isDebugEnabled)
log.debug(s"Time proceeds from ${currentTime.get} to $newTime, currently scheduled for this period:" + scheduledTasks(newTime).map(item s"\n- $item"))

executeTasks(newTime)
currentTime.set(newTime)
}

private def scheduledTasks(runTo: Long): Seq[Item] =
scheduled
.keySet()
.asScala
.filter(_.time <= runTo)
.toList
.sortBy(_.time)

@tailrec
private[testkit] final def executeTasks(runTo: Long): Unit = {
scheduledTasks(runTo).headOption match {
case Some(task)
currentTime.set(task.time)
val runResult = Try(task.runnable.run())
scheduled.remove(task)

if (runResult.isSuccess)
task.interval.foreach(i scheduled.put(task.copy(time = task.time + i.toMillis), ()))

// running the runnable might have scheduled new events
executeTasks(runTo)
case _ // Done
}
}

private def schedule(initialDelay: FiniteDuration, interval: Option[FiniteDuration], runnable: Runnable): Cancellable = {
val firstTime = currentTime.get + initialDelay.toMillis
val item = Item(firstTime, interval, runnable)
log.debug("Scheduled item for {}: {}", firstTime, item)
scheduled.put(item, ())

if (initialDelay <= Duration.Zero)
executeTasks(currentTime.get)

new Cancellable {
var cancelled = false

override def cancel(): Boolean = {
val before = scheduled.size
scheduled.remove(item)
cancelled = true
before > scheduled.size
}

override def isCancelled: Boolean = cancelled
}
}

override def maxFrequency: Double = 42
}

0 comments on commit 4ce2c27

Please sign in to comment.