laforge49 edited this page Nov 29, 2011 · 6 revisions

AsyncFP normally runs actors on a single thread, because that is generally faster. But sometimes it is better to do work in parallel. That is when you need an asynchronous mailbox.

As an exercise, we will use Pause to represent a workload that you want to be able to run in parallel. Here's the code, including a convenience companion object:

case class Pause()

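class Worker extends Actor {
  bind(classOf[Pause], pause)

  // Simulates a blocking workload: sleep for 200 ms, then respond with null.
  def pause(msg: AnyRef, rf: Any => Unit) {
    Thread.sleep(200)
    rf(null)
  }
}

object Pause {
  // Convenience: creates a worker and sends it a Pause message.
  def apply(rf: Any => Unit)
           (implicit srcActor: ActiveActor) {
    val worker = new Worker
    // The asynchronous mailbox makes the worker process its messages on a different thread.
    worker.setExchangeMessenger(srcActor.bindActor.newAsyncMailbox)
    worker(Pause())(rf)
  }
}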

Note the call to newAsyncMailbox in the Pause companion object. This is what forces messages sent to the worker actor to be processed on a different thread, which is the default behavior for Scala actors.

Next we need a driver, an actor which will launch any number of worker actors. Once they have all completed, we print out the elapsed time and return a null result:

case class Doit(c: Int)

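class Driver extends Actor {
  bind(classOf[Doit], doit)
  var rem = 0                  // workers that have not yet responded
  var c = 0                    // number of workers requested
  var rf: Any => Unit = null   // response function of the Doit request
  var t0 = 0L                  // start time in milliseconds

  def doit(msg: AnyRef, _rf: Any => Unit) {
    c = msg.asInstanceOf[Doit].c
    rem = c
    rf = _rf
    t0 = System.currentTimeMillis
    // Launch c workers; each one reports back through r.
    var i = 0
    while(i < c) {
      i += 1
      Pause(r)
    }
  }

  // Called once per completed worker; responds to Doit after the last one.
  def r(rsp: Any) {
    rem -= 1
    if (rem == 0) {
      val t1 = System.currentTimeMillis
      println("total time for "+c+" messages = "+(t1 - t0)+" milliseconds")
      rf(null)
    }
  }
}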
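As an aside, the same fan-out and countdown idea can be sketched with nothing but java.util.concurrent. This is not AsyncFP code: FanOutSketch and its parameters are hypothetical names chosen for illustration, and a fixed thread pool stands in for whatever SystemServices does internally. Because the completion callbacks in this sketch run on pool threads, the remaining count is kept in an AtomicInteger.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the Driver/Worker pair, using only the JDK.
object FanOutSketch {
  // Runs `count` simulated workloads on a pool of `threads` threads and
  // returns the elapsed wall-clock time in milliseconds.
  def run(count: Int, threads: Int, pauseMillis: Long): Long = {
    if (count <= 0) return 0L            // nothing to wait for
    val pool = Executors.newFixedThreadPool(threads)
    val remaining = new AtomicInteger(count)
    val done = new CountDownLatch(1)     // released by the last workload
    val t0 = System.currentTimeMillis
    try {
      var i = 0
      while (i < count) {
        i += 1
        pool.execute(new Runnable {
          def run(): Unit = {
            Thread.sleep(pauseMillis)    // the simulated workload
            if (remaining.decrementAndGet() == 0) done.countDown()
          }
        })
      }
      done.await()                       // block until every workload has finished
      System.currentTimeMillis - t0
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling FanOutSketch.run(10, 10, 200L) plays the role of Doit(10) with ten available threads. With fewer threads than workloads, the elapsed time grows because some workloads have to wait for a free thread.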

Now we need a bit of test code:

val systemServices = SystemServices()
try {
  val driver = new Driver
  driver.setExchangeMessenger(systemServices.newSyncMailbox)
  Future(driver, Doit(10))
  Future(driver, Doit(20))
} finally {
  systemServices.close
}

A SystemServices object has a number of uses. It manages the threads, which is why it is important to call close when you are running multiple tests. It also creates the mailboxes. Later we will see how SystemServices is used in IOC.

The test code runs two tests, the first with 10 simulated workloads and the second with 20. As each workload takes 200 milliseconds and they run in parallel, we would expect the total time to be around 200 milliseconds. However, we may run out of threads, in which case the total time will be some multiple of 200 milliseconds, depending on the number of simulated workloads and the number of available threads. Here's the output:

total time for 10 messages = 208 milliseconds
total time for 20 messages = 401 milliseconds
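The relationship between workloads, threads, and total time can be written down as a small calculation. The sketch below is illustrative only: BatchEstimate is a hypothetical helper, and the thread count of 10 used in the examples is an assumption, not something the output states. Any count from 10 to 19 would give the same two estimates.

```scala
// Hypothetical helper: estimates total time when equal-length workloads share a fixed number of threads.
object BatchEstimate {
  // Number of sequential rounds needed when `workloads` tasks share `threads` threads.
  def rounds(workloads: Int, threads: Int): Int =
    (workloads + threads - 1) / threads   // integer ceiling division

  // Estimated total time, ignoring scheduling and messaging overhead.
  def estimateMillis(workloads: Int, threads: Int, pauseMillis: Long = 200L): Long =
    rounds(workloads, threads) * pauseMillis
}

// Assuming 10 threads (an assumption, see above):
// BatchEstimate.estimateMillis(10, 10) == 200
// BatchEstimate.estimateMillis(20, 10) == 400
```

These estimates are close to the measured 208 and 401 milliseconds, which suggests that the second test needed two rounds of workers.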

AsyncTest

SystemServices
