Skip to content

Commit

Permalink
Merge pull request #14 from stromnet/master
Browse files Browse the repository at this point in the history
JMX fix allow multiple Norbert servers in same JVM
  • Loading branch information
jhartman committed Jun 5, 2013
2 parents d477e8f + 176826a commit 2092a22
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class NettyNetworkServer(serverConfig: NetworkServerConfig) extends NetworkServe
serverConfig.zooKeeperSessionTimeoutMillis)

val messageHandlerRegistry = new MessageHandlerRegistry
val messageExecutor = new ThreadPoolMessageExecutor(serviceName = clusterClient.serviceName,
val messageExecutor = new ThreadPoolMessageExecutor(clientName = clusterClient.clientName,
serviceName = clusterClient.serviceName,
messageHandlerRegistry = messageHandlerRegistry,
requestTimeout = serverConfig.requestTimeoutMillis,
corePoolSize = serverConfig.requestThreadCorePoolSize,
Expand All @@ -75,6 +76,7 @@ class NettyNetworkServer(serverConfig: NetworkServerConfig) extends NetworkServe

val serverFilterChannelHandler = new ServerFilterChannelHandler(messageExecutor)
val serverChannelHandler = new ServerChannelHandler(
clientName = clusterClient.clientName,
serviceName = clusterClient.serviceName,
channelGroup = channelGroup,
messageHandlerRegistry = messageHandlerRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,16 @@ class ServerFilterChannelHandler(messageExecutor: MessageExecutor) extends Simpl
}

@ChannelPipelineCoverage("all")
class ServerChannelHandler(serviceName: String,
class ServerChannelHandler(clientName: Option[String],
serviceName: String,
channelGroup: ChannelGroup,
messageHandlerRegistry: MessageHandlerRegistry,
messageExecutor: MessageExecutor,
requestStatisticsWindow: Long,
avoidByteStringCopy: Boolean) extends SimpleChannelHandler with Logging {
private val statsActor = CachedNetworkStatistics[Int, UUID](SystemClock, requestStatisticsWindow, 200L)

val statsJmx = JMX.register(new NetworkServerStatisticsMBeanImpl(serviceName, statsActor))
val statsJmx = JMX.register(new NetworkServerStatisticsMBeanImpl(clientName, serviceName, statsActor))

def shutdown: Unit = {
statsJmx.foreach { JMX.unregister(_) }
Expand Down Expand Up @@ -186,8 +187,8 @@ trait NetworkServerStatisticsMBean {
def getMedianTime: Double
}

class NetworkServerStatisticsMBeanImpl(serviceName: String, val stats: CachedNetworkStatistics[Int, UUID])
extends MBean(classOf[NetworkServerStatisticsMBean], JMX.name(None, serviceName)) with NetworkServerStatisticsMBean {
class NetworkServerStatisticsMBeanImpl(clientName: Option[String], serviceName: String, val stats: CachedNetworkStatistics[Int, UUID])
extends MBean(classOf[NetworkServerStatisticsMBean], JMX.name(clientName, serviceName)) with NetworkServerStatisticsMBean {

def getMedianTime = stats.getStatistics(0.5).map(_.finished.values.map(_.percentile)).flatten.sum

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ trait MessageExecutor {
def shutdown: Unit
}

class ThreadPoolMessageExecutor(serviceName: String,
class ThreadPoolMessageExecutor(clientName: Option[String],
serviceName: String,
messageHandlerRegistry: MessageHandlerRegistry,
val filters: MutableList[Filter],
requestTimeout: Long,
Expand All @@ -56,21 +57,22 @@ class ThreadPoolMessageExecutor(serviceName: String,
keepAliveTime: Int,
maxWaitingQueueSize: Int,
requestStatisticsWindow: Long) extends MessageExecutor with Logging {
def this(serviceName: String,
def this(clientName: Option[String],
serviceName: String,
messageHandlerRegistry: MessageHandlerRegistry,
requestTimeout: Long,
corePoolSize: Int,
maxPoolSize: Int,
keepAliveTime: Int,
maxWaitingQueueSize: Int,
requestStatisticsWindow: Long) =
this(serviceName, messageHandlerRegistry, new MutableList[Filter], requestTimeout, corePoolSize, maxPoolSize, keepAliveTime, maxWaitingQueueSize, requestStatisticsWindow)
this(clientName, serviceName, messageHandlerRegistry, new MutableList[Filter], requestTimeout, corePoolSize, maxPoolSize, keepAliveTime, maxWaitingQueueSize, requestStatisticsWindow)

private val statsActor = CachedNetworkStatistics[Int, Int](SystemClock, requestStatisticsWindow, 200L)
private val totalNumRejected = new AtomicInteger

val requestQueue = new ArrayBlockingQueue[Runnable](maxWaitingQueueSize)
val statsJmx = JMX.register(new RequestProcessorMBeanImpl(serviceName, statsActor, requestQueue))
val statsJmx = JMX.register(new RequestProcessorMBeanImpl(clientName, serviceName, statsActor, requestQueue))

private val threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, requestQueue,
new NamedPoolThreadFactory("norbert-message-executor")) {
Expand Down Expand Up @@ -167,8 +169,8 @@ class ThreadPoolMessageExecutor(serviceName: String,
def getMedianTime: Double
}

class RequestProcessorMBeanImpl(serviceName: String, val stats: CachedNetworkStatistics[Int, Int], queue: ArrayBlockingQueue[Runnable])
extends MBean(classOf[RequestProcessorMBean], JMX.name(None, serviceName)) with RequestProcessorMBean {
class RequestProcessorMBeanImpl(clientName: Option[String], serviceName: String, val stats: CachedNetworkStatistics[Int, Int], queue: ArrayBlockingQueue[Runnable])
extends MBean(classOf[RequestProcessorMBean], JMX.name(clientName, serviceName)) with RequestProcessorMBean {
def getQueueSize = queue.size

def getTotalNumRejected = totalNumRejected.get.abs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class MessageExecutorSpec extends Specification with Mockito with WaitFor with S
val filters = new MutableList[Filter]
filters ++= (List(filter1, filter2))

val messageExecutor = new ThreadPoolMessageExecutor("service",
val messageExecutor = new ThreadPoolMessageExecutor(None, "service",
messageHandlerRegistry,
filters,
1000L,
Expand Down

0 comments on commit 2092a22

Please sign in to comment.