Skip to content
laforge49 edited this page Nov 25, 2011 · 11 revisions

Message request / response generally works better than event messaging, especially when a system is under load, because of the implicit flow control. There are also a critical speed enhancement that we can employ that work with request / response messaging but not with event messaging. But moving to request / response messaging is not in itself a speed enhancement. The echo timing test runs at about 1.5 microseconds per message and the time per message for the burst timing test is now about 103 nanoseconds.

##ExchangeMessengerSource

Once a request is processed, there needs to be an object to which the response can be sent. Objects which send requests need to implement the ExchangeMessengerSource trait, which is used to determine where to send responses.

/**
 * The ExchangeMessengerSource trait is implemented by objects which send
 * requests to an ExchangeMessenger.
 */
trait ExchangeMessengerSource {
  /**
   * The messageListDestination method returns a MessageListDestination object
   * to which lists of responses can be sent.
   */
  def messageListDestination: MessageListDestination[ExchangeMessengerMessage]

  /**
   * The responseFrom method is used to send the response to a request
   * that was sent to an ExchangeMessenger.
   */
  def responseFrom(respondingExchange: ExchangeMessenger, rsp: ExchangeMessengerResponse) {
    val currentRequest = respondingExchange.curReq
    rsp.setRequest(currentRequest)
    respondingExchange.putTo(messageListDestination, rsp)
  }
}

ExchangeMessengerSource

##ExchangeMessengerActor

Objects which service requests from other objects need to implement ExchangeMessengerActor.

/**
 * Objects which implement ExchangeMessengerActor can send and receive
 * both requests and responses.
 */
trait ExchangeMessengerActor
  extends ExchangeMessengerSource {

  /**
   * The exchangeMessenger method returns the actor's ExchangeMessenger object.
   */
  def exchangeMessenger: ExchangeMessenger

  /**
   * Lists of requests and responses are passed
   * to the actor's exchangeMessenger object.
   */
  def messageListDestination = exchangeMessenger

  /**
   * Enqueue a response for subsequent processing.
   */
  override def responseFrom(respondingExchange: ExchangeMessenger,
                            rsp: ExchangeMessengerResponse) {
    val currentRequest = respondingExchange.curReq
    rsp.setRequest(currentRequest)
    respondingExchange.sendResponse(exchangeMessenger, rsp)
  }
}

ExchangeMessengerActor

##ExchangeMessengerMessage

All request messages are ExchangeMessengerRequest objects and all response messages are ExchangeMessengerResponse objects.

/**
 * All messages sent to an ExchangeMessenger must ExchangeMessengerMessages.
 * And all ExchangeMessengerMessages must be either ExchangeMessengerRequests
 * or ExchangeMessengerResponses.
 */
class ExchangeMessengerMessage

/**
 * All requests sent to an ExchangeMessenger must be either
 * ExchangeMessengerRequests or subclasses of ExchangeMessengerRequest.
 */
class ExchangeMessengerRequest(_sender: ExchangeMessengerSource,
                               rf: Any => Unit)
  extends ExchangeMessengerMessage {

  /**
   * Request messages are linked together using _oldRequest.
   */
  private var _oldRequest: ExchangeMessengerRequest = null

  /**
   * Return the old request.
   */
  def oldRequest = _oldRequest

  /**
   * Link this request to the old request.
   */
  def setOldRequest(oldRequest: ExchangeMessengerRequest) {
    _oldRequest = oldRequest
  }

  /**
   * The sender method returns the object which sent the request.
   */
  def sender = _sender

  /**
   * The function to be executed when processing a response.
   */
  def responseFunction = rf

  /**
   * Send a response.
   */
  def reply(exchangeMessenger: ExchangeMessenger, content: Any) {
    val rsp = new ExchangeMessengerResponse(content)
    sender.responseFrom(exchangeMessenger, rsp)
  }
}

/**
 * All responses sent to an ExchangeMessenger must be either
 * ExchangeMessengerResponses or subclasses of ExchangeMessengerResponse.
 */
final class ExchangeMessengerResponse(data: Any)
  extends ExchangeMessengerMessage {

  /**
   * The request for which this is the response.
   */
  private var _request: ExchangeMessengerRequest = null

  /**
   * Associate this response with the request.
   */
  def setRequest(request: ExchangeMessengerRequest) {
    _request = request
  }

  /**
   * Return the previous request.
   * This method is used to set the current request in the
   * sending ExchangeMessenger just prior to processing the response.
   */
  def oldRequest = _request.oldRequest

  /**
   * The function to be used to process the response.
   */
  def responseFunction = _request.responseFunction

  /**
   * The actual response data.
   */
  def rsp = data
}

ExchangeMessengerMessage

##ExchangeMessenger

ExchangeMessenger is an abstract class that wrapps BufferedMessenger. It supports requests with response functions.

/**
 * ExchangeMessenger distinguishes between request and response messages.
 */
abstract class ExchangeMessenger(threadManager: ThreadManager,
                                 _bufferedMessenger: BufferedMessenger[ExchangeMessengerMessage] = null)
  extends MessageProcessor[ExchangeMessengerMessage]
  with MessageListDestination[ExchangeMessengerMessage] {

  protected var _curReq: ExchangeMessengerRequest = null
  val bufferedMessenger = {
    if (_bufferedMessenger != null) _bufferedMessenger
    else new BufferedMessenger[ExchangeMessengerMessage](threadManager)
  }

  bufferedMessenger.setMessageProcessor(this)

  /**
   * Returns the request being processed.
   */
  def curReq = _curReq

  /**
   * Specify a different message that is being processed.
   * This method is called when a new request is to be processed and
   * on receipt of a response message.
   */
  def setCurrentRequest(req: ExchangeMessengerRequest) {
    _curReq = req
  }

  /**
   * The incomingMessageList method is called to process a list of messages
   * when the current thread is different
   * from the thread being used by the object being called.
   */
  final def incomingMessageList(bufferedMessages: ArrayList[ExchangeMessengerMessage]) {
    bufferedMessenger.incomingMessageList(bufferedMessages)
  }

  /**
   * The putTo message builds lists of messages to be sent to other Buffered objects.
   */
  final def putTo(messageListDestination: MessageListDestination[ExchangeMessengerMessage], 
                  message: ExchangeMessengerMessage) {
    bufferedMessenger.putTo(messageListDestination, message)
  }

  /**
   * The isEmpty method returns true when there are no messages to be processed,
   * though the results may not always be correct due to concurrency issues.
   */
  final def isEmpty = bufferedMessenger.isEmpty

  /**
   * The poll method processes any messages in the queue.
   * Once complete, any pending outgoing messages are sent.
   */
  final protected def poll = bufferedMessenger.poll

  /**
   * The flushPendingMsgs is called when there are no pending incoming messages to process.
   */
  final protected def flushPendingMsgs = bufferedMessenger.flushPendingMsgs

  /**
   * The haveMessage method is called when there may be an incoming message to be processed.
   */
  override def haveMessage {
    poll
  }

 /**
   * The processMessage method is called when there is an incoming message to process.
   */
  final override def processMessage(msg: ExchangeMessengerMessage) {
    msg match {
      case req: ExchangeMessengerRequest => exchangeReq(req)
      case rsp: ExchangeMessengerResponse => exchangeRsp(rsp)
    }
  }

  /**
   * Enqueue a request for subsequent processing on another thread.
   */
  def sendReq(targetActor: ExchangeMessengerActor,
              exchangeMessengerRequest: ExchangeMessengerRequest,
              srcExchange: ExchangeMessenger) {
    exchangeMessengerRequest.setOldRequest(srcExchange.curReq)
    srcExchange.putTo(targetActor.messageListDestination, exchangeMessengerRequest)
  }

  /**
   * The exchangeReq method is called when there is an incoming request to process.
   */
  def exchangeReq(msg: ExchangeMessengerRequest) {
    setCurrentRequest(msg)
    processRequest
  }

  /**
   * Process the curReq message.
   */
  protected def processRequest

  /**
   * Send a response to the current message being processed.
   */
  def reply(content: Any) {
    curReq.reply(this, content)
  }

  /**
   * Enqueue a response message for subsequent processing on a different thread.
   */
  def sendResponse(senderExchange: ExchangeMessenger, rsp: ExchangeMessengerResponse) {
    putTo(senderExchange.bufferedMessenger, rsp)
  }

  /**
   * The exchangeRsp method is called when there is an incoming response to process.
   */
  def exchangeRsp(msg: ExchangeMessengerResponse) {
    setCurrentRequest(msg.oldRequest)
    msg.responseFunction(msg.rsp)
  }
}

ExchangeMessenger


Speed

Clone this wiki locally