Permalink
Browse files

Introduce RequestRunner class in MessageExecutor

  • Loading branch information...
1 parent f4785a9 commit 9ef2c9669aeebe2b0e7015a083213665debb42ac Chris Conrad committed May 26, 2010
@@ -20,6 +20,8 @@ import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, ThreadPoolExecutor}
import com.linkedin.norbert.logging.Logging
import com.linkedin.norbert.network.InvalidMessageException
import com.linkedin.norbert.util.NamedPoolThreadFactory
+import com.linkedin.norbert.jmx.JMX.MBean
+import com.linkedin.norbert.jmx.JMX
/**
* A component which submits incoming messages to their associated message handler.
@@ -38,39 +40,60 @@ class ThreadPoolMessageExecutor(messageHandlerRegistry: MessageHandlerRegistry,
private val threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable],
new NamedPoolThreadFactory("norbert-message-executor"))
+ JMX.register(new MBean(classOf[RequestProcessorMBean]) with RequestProcessorMBean {
+ def getQueueSize = threadPool.getQueue.size
+ def getAverageWaitTime = 0
+ def getAverageProcessingTime = 0
+ })
+
def executeMessage(message: Message, responseHandler: (Either[Exception, Message]) => Unit): Unit = {
- threadPool.execute(new Runnable {
- def run {
- try {
- log.ifDebug("Executing message: %s", message)
- val handler = messageHandlerRegistry.handlerFor(message)
+ threadPool.execute(new RequestRunner(message, responseHandler))
+ }
+
+ def shutdown {
+ threadPool.shutdown
+ log.debug("MessageExecutor shut down")
+ }
+
+ private class RequestRunner(message: Message, responseHandler: (Either[Exception, Message]) => Unit, queuedAt: Long = System.currentTimeMillis) extends Runnable {
+ def run = {
+ log.debug("Executing message: %s".format(message))
- try {
- val response = handler(message)
+ val response: Option[Either[Exception, Message]] = try {
+ val handler = messageHandlerRegistry.handlerFor(message)
- if (messageHandlerRegistry.validResponseFor(message, response)) {
- if (response != null) responseHandler(Right(response))
- } else {
- val name = if (response == null) "<null>" else response.getDescriptorForType.getFullName
- val errorMsg = "Message handler returned an invalid response message of type %s".format(name)
- log.error(errorMsg)
- responseHandler(Left(new InvalidMessageException(errorMsg)))
- }
- } catch {
- case ex: Exception =>
- log.error(ex, "Message handler threw an exception while processing message")
- responseHandler(Left(ex))
+ try {
+ val response = handler(message)
+ if (messageHandlerRegistry.validResponseFor(message, response)) {
+ if (response == null) None else Some(Right(response))
+ } else {
+ val name = if (response == null) "<null>" else response.getDescriptorForType.getFullName
+ val errorMsg = "Message handler returned an invalid response message of type %s".format(name)
+ log.error(errorMsg)
+ Some(Left(new InvalidMessageException(errorMsg)))
}
} catch {
- case ex: InvalidMessageException => log.error(ex, "Received an invalid message: %s", message)
- case ex: Exception => log.error(ex, "Unexpected error while handling message: %s", message)
+ case ex: Exception =>
+ log.error(ex, "Message handler threw an exception while processing message")
+ Some(Left(ex))
}
+ } catch {
+ case ex: InvalidMessageException =>
+ log.error(ex, "Received an invalid message: %s".format(message))
+ Some(Left(ex))
+
+ case ex: Exception =>
+ log.error(ex, "Unexpected error while handling message: %s".format(message))
+ Some(Left(ex))
}
- })
- }
- def shutdown {
- threadPool.shutdown
- log.ifDebug("MessageExecutor shut down")
+ response.foreach(responseHandler)
+ }
}
}
+
+trait RequestProcessorMBean {
+ def getQueueSize: Int
+ def getAverageWaitTime: Int
+ def getAverageProcessingTime: Int
+}
@@ -93,7 +93,9 @@ class MessageExecutorSpec extends SpecificationWithJUnit with Mockito with WaitF
waitFor(5.ms)
- handlerCalled must beFalse
+ handlerCalled must eventually(beTrue)
+ either.isLeft must beTrue
+ either.left.get must haveClass[InvalidMessageException]
}
"execute the responseHandler with Left(InvalidMessageException) if the response message is of the wrong type" in {

0 comments on commit 9ef2c96

Please sign in to comment.