Skip to content
laforge49 edited this page Nov 11, 2011 · 3 revisions

The Messenger class provides a basic facility for sending messages that will be processed on another thread.

When a Messenger object calls the haveMessage method on a MessageProcessor object, the MessageProcessor object needs to call the poll method on its Messenger object. The Messenger object then calls the processMessage method on the MessageProcessor object for each of the messages found in the queue.

/**
 * A Messenger receives messages, queues them, and then processes them on another thread.
 */
class Messenger[T](threadManager: ThreadManager)
  extends Runnable {

  private var messageProcessor: MessageProcessor[T] = null
  private val queue = new ConcurrentLinkedBlockingQueue[T]
  private val running = new AtomicBoolean
  private var incomingMessage: T = null.asInstanceOf[T]

  /**
   * Specifies the object which will process the messages.
   */
  def setMessageProcessor(_messageProcessor: MessageProcessor[T]) {
    messageProcessor = _messageProcessor
  }

  /**
   * The isEmpty method returns true when there are no messages to be processed,
   * though the results may not always be correct due to concurrency issues.
   */
  def isEmpty = queue.size() == 0

  /**
   * The put method adds a message to the queue of messages to be processed.
   */
  def put(message: T) {
    queue.put(message)
    if (running.compareAndSet(false, true)) {
      threadManager.process(this)
    }
  }

  /**
   * The poll method processes any messages in the queue.
   * True is returned if any messages were processed.
   */
  def poll: Boolean = {
    if (incomingMessage == null) incomingMessage = queue.poll
    if (incomingMessage == null) return false
    while (incomingMessage != null) {
      val msg = incomingMessage
      incomingMessage = null.asInstanceOf[T]
      messageProcessor.processMessage(msg)
      incomingMessage = queue.poll
    }
    true
  }

  /**
   * The run method is used to process the messages in the message queue.
   * Each message is in turn processed using the MessageDispatcher.
   */
  @tailrec final override def run {
    incomingMessage = queue.poll
    if (incomingMessage == null) {
      running.set(false)
      if (queue.peek == null || !running.compareAndSet(false, true)) return
    }
    messageProcessor.haveMessage
    run
  }
}

Messenger

Asynchronous Messaging

Clone this wiki locally