
AsyncFP Transactions

laforge49 edited this page Nov 29, 2011 · 5 revisions

Once you start using multiple threads, thread safety becomes an issue. Fortunately, actors handle thread safety implicitly, but one issue still occasionally needs to be dealt with: interleaving.

An actor processes only one message at a time. It is single threaded, though it does not use a dedicated thread. But when an actor sends a request to another actor, it does not block while waiting for the response: in the meantime it may start processing another request, or process the response to a request that it sent earlier. An actor is thus free to interleave the processing of multiple requests, even though it actively processes only one at any given moment. When two interleaved requests touch the same state, this needs to be addressed.
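To see why interleaving can matter, here is a minimal sketch of a lost update. It does not use the AsyncFP API: `mailbox`, `askOther` and `increment` are hypothetical stand-ins, with a plain single-threaded queue playing the role of the actor's mailbox. Each `increment` reads the counter, sends a simulated request, and writes the counter only when the response arrives.

```scala
import scala.collection.mutable

// Hypothetical model of one actor: a single-threaded queue of pending steps.
// This is an illustration only, not the AsyncFP API.
object InterleavingDemo {
  private val mailbox = mutable.Queue.empty[() => Unit]

  // Simulates a request to another actor: the response callback is queued
  // and runs only after the steps already waiting in the mailbox.
  private def askOther(onResponse: () => Unit): Unit = {
    mailbox.enqueue(onResponse)
  }

  var counter = 0

  // A read-modify-write that spans an asynchronous request.
  def increment(): Unit = {
    val seen = counter                       // read now
    askOther(() => { counter = seen + 1 })   // write when the response arrives
  }

  // Processes two increment requests, one step at a time.
  def run(): Int = {
    counter = 0
    mailbox.clear()
    mailbox.enqueue(() => increment())
    mailbox.enqueue(() => increment())
    while (mailbox.nonEmpty) mailbox.dequeue()()
    counter
  }
}
```

Only one step runs at a time, yet `InterleavingDemo.run()` returns 1 rather than 2: the second request reads the counter before the first request's response has written it.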

The complication here is that an AsyncFP actor must accept every request it receives for processing. This is in contrast to Scala actors, which can select which messages they are ready to process. The AsyncFP answer is to use a different form of binding for requests that may conflict with other requests, and to enqueue these requests until they can be processed without conflict.

Transactions, which is to say requests that may conflict with other requests sent to the same actor, come in two flavors: Query and Update. The processing of a query can be interleaved with the processing of other queries, while the processing of an update cannot be interleaved with the processing of any other transaction.
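The rule above can be sketched as a small single-threaded gate. This is not how AsyncFP implements `Query` and `Update`; the names `Gate`, `Tx`, `QueryKind` and `UpdateKind` are hypothetical, and the strict arrival-order policy (a query waits behind an update that was queued before it) is an assumption made for the sketch. A queue of callbacks stands in for `Pause`.

```scala
import scala.collection.mutable

// Hypothetical names throughout: a model of the query/update rule, not AsyncFP.
object TransactionGateSketch {
  sealed trait Kind
  case object QueryKind extends Kind
  case object UpdateKind extends Kind

  // `body` is handed a `done` callback to call when the transaction finishes.
  final case class Tx(kind: Kind, body: (() => Unit) => Unit)

  class Gate {
    private val pending = mutable.Queue.empty[Tx]
    private var activeQueries = 0
    private var updateActive = false

    def submit(tx: Tx): Unit = {
      pending.enqueue(tx)
      drain()
    }

    // Start queued transactions in arrival order until the one at the head
    // conflicts with what is already running.
    private def drain(): Unit = {
      var blocked = false
      while (!blocked && pending.nonEmpty) {
        pending.head.kind match {
          case QueryKind if !updateActive =>
            activeQueries += 1
            pending.dequeue().body(() => { activeQueries -= 1; drain() })
          case UpdateKind if !updateActive && activeQueries == 0 =>
            updateActive = true
            pending.dequeue().body(() => { updateActive = false; drain() })
          case _ =>
            blocked = true
        }
      }
    }
  }

  // Submits two queries, two updates, then two queries, and returns the log.
  def demo(): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    val timers = mutable.Queue.empty[() => Unit] // stands in for Pause
    val gate = new Gate
    def tx(kind: Kind, name: String): Tx = Tx(kind, done => {
      log += s"start $name"
      timers.enqueue(() => { log += s"end $name"; done() })
    })
    gate.submit(tx(QueryKind, "q1"))
    gate.submit(tx(QueryKind, "q2"))
    gate.submit(tx(UpdateKind, "u3"))
    gate.submit(tx(UpdateKind, "u4"))
    gate.submit(tx(QueryKind, "q5"))
    gate.submit(tx(QueryKind, "q6"))
    while (timers.nonEmpty) timers.dequeue()() // fire the simulated pauses
    log.toList
  }
}
```

In `demo()`, the two leading queries start together, each update starts only after everything before it has ended, and the two trailing queries start together once the last update is done.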

Here is a very simple example of an actor with both a query and an update. Again, we are using Pause to simulate a workload.

case class SimpleQuery(name: String)

case class SimpleUpdate(name: String)

class SimpleTransactionProcessor extends Actor {
  bindMessageLogic(classOf[SimpleQuery], new Query(query))
  bindMessageLogic(classOf[SimpleUpdate], new Update(update))

  def query(msg: AnyRef, rf: Any => Unit) {
    val name = msg.asInstanceOf[SimpleQuery].name
    println("start query " + name)
    Pause {
      rsp => {
        println("  end query " + name)
        rf(null)
      }
    }
  }

  def update(msg: AnyRef, rf: Any => Unit) {
    val name = msg.asInstanceOf[SimpleUpdate].name
    println("start update " + name)
    Pause {
      rsp => {
        println("  end update " + name)
        rf(null)
      }
    }
  }
}

The driver to test this code is also quite simple: it just launches 6 transactions in rapid succession:

case class Doit()

class Driver extends Actor {
  bind(classOf[Doit], doit)

  lazy val simpleTransactionProcessor = {
    val stp = new SimpleTransactionProcessor
    stp.setExchangeMessenger(newAsyncMailbox)
    stp
  }

  var rem = 0
  var rf: Any => Unit = null

  def r(rsp: Any) {
    rem -= 1
    if (rem == 0) rf(null)
  }

  def doit(msg: AnyRef, _rf: Any => Unit) {
    rf = _rf
    rem = 6
    simpleTransactionProcessor(SimpleQuery("1"))(r)
    simpleTransactionProcessor(SimpleQuery("2"))(r)
    simpleTransactionProcessor(SimpleUpdate("3"))(r)
    simpleTransactionProcessor(SimpleUpdate("4"))(r)
    simpleTransactionProcessor(SimpleQuery("5"))(r)
    simpleTransactionProcessor(SimpleQuery("6"))(r)
  }
}

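Output (queries 1 and 2 overlap, each update runs alone, and queries 5 and 6 start only after update 4 has ended):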

start query 1
start query 2
  end query 2
  end query 1
start update 3
  end update 3
start update 4
  end update 4
start query 5
start query 6
  end query 5
  end query 6

TransactionTest
