laforge49 edited this page Oct 28, 2011 · 9 revisions

Transactions come in two basic flavors, queries and updates, and provide additional thread safety while allowing multiple queries to run at the same time. Consider the following code:

case class SomeMessage()
case class SomeQueryMessage()
case class SomeUpdateMessage()

class SomeActor extends Actor {
  bind(classOf[SomeMessage], someMessageMethod)
  bindSafe(classOf[SomeQueryMessage], new Query(someMessageQueryMethod))
  bindSafe(classOf[SomeUpdateMessage], new Update(someMessageUpdateMethod))

  def someMessageMethod(msg: AnyRef, rf: Any => Unit) {
    ...
  }

  def someMessageQueryMethod(msg: AnyRef, rf: Any => Unit) {
    ...
  }

  def someMessageUpdateMethod(msg: AnyRef, rf: Any => Unit) {
    ...
  }
}

The someMessageMethod is only moderately thread safe. Actors are single threaded, but a message method that is in the middle of processing a message may have sent a message to another actor. The actor does not wait for the reply and is free to begin executing someMessageMethod for the next message. Indeed, the processing of several SomeMessage messages can be interleaved, along with the processing of other messages. The only guarantee is that the processing of a message is not interrupted by other messages so long as that processing does not send any messages to other actors.
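
The interleaving described above can be modelled in a few lines of plain Scala. This is only a sketch and not the library's implementation: InterleavingModel, its mailbox queue and the handle function are hypothetical names, and the "reply" is simulated by placing a second entry on the same queue.

```scala
import scala.collection.mutable

// Hypothetical single-threaded model of a mailbox; not part of the library.
object InterleavingModel {
  /** Processes two messages and returns the order in which events occurred. */
  def run(): List[String] = {
    val log = mutable.ListBuffer[String]()
    val mailbox = mutable.Queue[() => Unit]()

    // Handling a message: log the start, "send" a request whose reply
    // arrives later as another mailbox entry, then return immediately.
    def handle(name: String): Unit = {
      log += s"start $name"
      mailbox.enqueue(() => log += s"end $name")
    }

    mailbox.enqueue(() => handle("A"))
    mailbox.enqueue(() => handle("B"))

    // One entry is processed at a time, yet B starts before A has ended.
    while (mailbox.nonEmpty) mailbox.dequeue()()
    log.toList
  }
}
```

Calling InterleavingModel.run() yields start A, start B, end A, end B: each step runs alone, but the two messages overlap as a whole.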

Updates are at the other end of the spectrum. Update messages are processed in succession, with the processing of the next update message waiting until the current update message sends a reply. Of course, the processing of other messages (not updates) can be interleaved with the processing of an update message.

Queries fall somewhere in the middle. The processing of a query message is not started if an update message is being processed, and the processing of an update message is not started if a query message is being processed. But the processing of query messages can be interleaved with each other and with the processing of other kinds of messages.
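
These rules can be summarized as a single predicate. The following is a minimal sketch of the rules as stated here, not the library's code; TransactionRules, Kind, PlainMsg, QueryTx, UpdateTx and canStart are all names invented for illustration.

```scala
// Hypothetical model of the start rules; not the library's implementation.
object TransactionRules {
  sealed trait Kind
  case object PlainMsg extends Kind // a message bound with bind
  case object QueryTx extends Kind  // a message bound as a Query
  case object UpdateTx extends Kind // a message bound as an Update

  /** May processing of a message of kind `next` start while `runningQueries`
    * queries, and possibly one update, are still waiting to send their replies? */
  def canStart(next: Kind, runningQueries: Int, updateRunning: Boolean): Boolean =
    next match {
      case PlainMsg => true                                  // never held back
      case QueryTx  => !updateRunning                        // may join other queries
      case UpdateTx => !updateRunning && runningQueries == 0 // must run alone
    }
}
```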

Of course an actor is required to accept every message that it gets from its mailbox, as otherwise the mailbox could not be shared by other actors. So when an actor receives a query or update message, it adds the message to a queue of pending transactions and processes them in order when it can. Should we look at an example?

First, we need to simulate a workload.

case class Pause()

class Worker
  extends Actor {
  bind(classOf[Pause], pause)

  def pause(msg: AnyRef, rf: Any => Unit) {
    Thread.sleep(30)
    rf(null)
  }
}

object Pause {
  def apply(rf: Any => Unit)(implicit srcActor: ActiveActor) {
    val worker = new Worker
    worker.setMailbox(new ReactorMailbox)
    worker(Pause())(rf)
  }
}

Now we need an actor which binds some messages as queries or updates.

case class SimpleQuery(name: String)

case class SimpleUpdate(name: String)

class SimpleTransactionProcessor
  extends Actor {
  bindSafe(classOf[SimpleQuery], new Query(query))
  bindSafe(classOf[SimpleUpdate], new Update(update))

  def query(msg: AnyRef, rf: Any => Unit) {
    val name = msg.asInstanceOf[SimpleQuery].name
    println("start query " + name)
    Pause{
      rsp => {
        println("  end query " + name)
        rf(null)
      }
    }
  }

  def update(msg: AnyRef, rf: Any => Unit) {
    val name = msg.asInstanceOf[SimpleUpdate].name
    println("start update " + name)
    Pause{
      rsp => {
        println("  end update " + name)
        rf(null)
      }
    }
  } 
}

Now we need a driver actor. This actor will send 6 messages to the SimpleTransactionProcessor actor in one burst, and then wait for all 6 responses.

case class Doit()

class Driver
  extends Actor {
  bind(classOf[Doit], doit)

  lazy val simpleTransactionProcessor = {
    val stp = new SimpleTransactionProcessor
    stp.setMailbox(new ReactorMailbox)
    stp
  }

  def doit(msg: AnyRef, rf: Any => Unit) {
    var rem = 6
    simpleTransactionProcessor(SimpleQuery("1")) {
      rsp1 => {
        rem -= 1
        if (rem == 0) rf(null)
      }
    }
    simpleTransactionProcessor(SimpleQuery("2")) {
      rsp1 => {
        rem -= 1
        if (rem == 0) rf(null)
      }
    }
    simpleTransactionProcessor(SimpleUpdate("3")) {
      rsp1 => {
        rem -= 1
        if (rem == 0) rf(null)
      }
    }
    simpleTransactionProcessor(SimpleUpdate("4")) {
      rsp1 => {
        rem -= 1
        if (rem == 0) rf(null)
      }
    }
    simpleTransactionProcessor(SimpleQuery("5")) {
      rsp1 => {
        rem -= 1
        if (rem == 0) rf(null)
      }
    }
    simpleTransactionProcessor(SimpleQuery("6")) {
      rsp1 => {
        rem -= 1
        if (rem == 0) rf(null)
      }
    }
  }
}
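
The six response handlers in the driver are identical, so the countdown could be factored into one shared callback. The sketch below is one possible way to do that; Countdown is a hypothetical helper, not something the library provides. It relies on a plain var, which is only safe if every response is delivered on the same thread, as is assumed here for an actor's callbacks.

```scala
// Hypothetical helper; not part of the library.
object Countdown {
  /** Returns a callback that invokes `done` once it has been called `n` times. */
  def apply(n: Int)(done: () => Unit): Any => Unit = {
    var rem = n // not synchronized: assumes all calls arrive on one thread
    _ => {
      rem -= 1
      if (rem == 0) done()
    }
  }
}
```

Inside doit, one would presumably create the callback once with Countdown(6)(() => rf(null)) and pass the same value as the response function of each of the six sends.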

The test code to run this is trivial.

val driver = new Driver
driver.setMailbox(new ReactorMailbox)
Future(driver, Doit())

Output.

start query 1
start query 2
  end query 2
  end query 1
start update 3
  end update 3
start update 4
  end update 4
start query 5
start query 6
  end query 6
  end query 5

Note that the 6 transactions are started in the order they are received, but updates will not start while another transaction is running and queries will not start while an update is running. (The completion order of queries running at the same time is indeterminate.)
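
The start order in the output can be reproduced with a small model of the pending queue. This is a simplified sketch rather than the library's algorithm: PendingQueueModel, Q, U and batches are hypothetical names, and the model assumes that all transactions arrive before any of them completes and that each group finishes before the next one starts.

```scala
// Hypothetical model of the pending transaction queue; not the library's code.
object PendingQueueModel {
  sealed trait Tx { def name: String }
  case class Q(name: String) extends Tx // a query
  case class U(name: String) extends Tx // an update

  /** Groups pending transactions into batches that may run at the same time.
    * Transactions are always taken from the head, so arrival order is kept. */
  def batches(pending: List[Tx]): List[List[String]] = pending match {
    case Nil => Nil
    case U(name) :: rest =>
      List(name) :: batches(rest) // an update always runs alone
    case _ =>
      // The head is a query: take the whole run of consecutive queries.
      val (queries, rest) = pending.span(_.isInstanceOf[Q])
      queries.map(_.name) :: batches(rest)
  }
}
```

For the driver's burst of query 1, query 2, update 3, update 4, query 5, query 6, batches returns the groups (1, 2), (3), (4) and (5, 6), which matches the order of the start lines above.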

TransactionTest

Tutorial