Skip to content
Browse files

Split up ServerChannelHandler to support generating network statistics

  • Loading branch information...
1 parent 2e375c4 commit 4ca9a1fd8366b893340accd4ec14b417491b9180 @rhavyn committed
View
21 network/src/main/scala/com/linkedin/norbert/network/common/RequestContext.scala
@@ -1,21 +0,0 @@
-/*
- * Copyright 2009-2010 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.linkedin.norbert.network.common
-
-import java.util.UUID
-import com.google.protobuf.Message
-
-case class RequestContext(requestId: UUID, receivedAt: Long, message: Message)
View
2 network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkServer.scala
@@ -60,6 +60,7 @@ class NettyNetworkServer(serverConfig: NetworkServerConfig) extends NetworkServe
bootstrap.setPipelineFactory(new ChannelPipelineFactory {
private val loggingHandler = new LoggingHandler
private val protobufDecoder = new ProtobufDecoder(NorbertProtos.NorbertMessage.getDefaultInstance)
+ private val requestContextDecoder = new RequestContextDecoder
private val frameEncoder = new LengthFieldPrepender(4)
private val protobufEncoder = new ProtobufEncoder
private val handler = new ServerChannelHandler(channelGroup, messageHandlerRegistry, messageExecutor)
@@ -75,6 +76,7 @@ class NettyNetworkServer(serverConfig: NetworkServerConfig) extends NetworkServe
p.addLast("frameEncoder", frameEncoder)
p.addLast("protobufEncoder", protobufEncoder)
+ p.addLast("requestContextDecoder", requestContextDecoder)
p.addLast("requestHandler", handler)
p
View
54 network/src/main/scala/com/linkedin/norbert/network/netty/ServerChannelHandler.scala
@@ -23,7 +23,25 @@ import org.jboss.netty.channel._
import com.google.protobuf.{InvalidProtocolBufferException, Message}
import com.linkedin.norbert.network.server.{MessageHandlerRegistry, MessageExecutor, MessageExecutor, MessageHandlerRegistry}
import java.util.UUID
-import com.linkedin.norbert.network.common.RequestContext
+import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
+
+case class RequestContext(requestId: UUID, receivedAt: Long = System.currentTimeMillis)
+
+@ChannelPipelineCoverage("all")
+class RequestContextDecoder extends OneToOneDecoder {
+ def decode(ctx: ChannelHandlerContext, channel: Channel, msg: Any) = {
+ val norbertMessage = msg.asInstanceOf[NorbertProtos.NorbertMessage]
+ val requestId = new UUID(norbertMessage.getRequestIdMsb, norbertMessage.getRequestIdLsb)
+
+ if (norbertMessage.getStatus != NorbertProtos.NorbertMessage.Status.OK) {
+ val ex = new InvalidMessageException("Invalid request, message has status set to ERROR")
+ Channels.write(ctx, Channels.future(channel), ResponseHelper.errorResponse(requestId, ex))
+ throw ex
+ }
+
+ (RequestContext(requestId), norbertMessage)
+ }
+}
@ChannelPipelineCoverage("all")
class ServerChannelHandler(channelGroup: ChannelGroup, messageHandlerRegistry: MessageHandlerRegistry, messageExecutor: MessageExecutor) extends SimpleChannelHandler with Logging {
@@ -34,30 +52,26 @@ class ServerChannelHandler(channelGroup: ChannelGroup, messageHandlerRegistry: M
}
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+ val (context, norbertMessage) = e.getMessage.asInstanceOf[(RequestContext, NorbertProtos.NorbertMessage)]
val channel = e.getChannel
- val norbertMessage = e.getMessage.asInstanceOf[NorbertProtos.NorbertMessage]
- log.ifTrace("messageRecieved [%s]: %s", channel, norbertMessage)
-
- val requestId = new UUID(norbertMessage.getRequestIdMsb, norbertMessage.getRequestIdLsb)
-
- if (norbertMessage.getStatus != NorbertProtos.NorbertMessage.Status.OK) {
- log.warn("Received invalid message: %s", norbertMessage)
- channel.write(ResponseHelper.errorResponse(requestId, new InvalidMessageException("Recieved a request in the error state")))
- } else {
+ val message = messageHandlerRegistry.requestMessageDefaultInstanceFor(norbertMessage.getMessageName) map { di =>
try {
- val di = messageHandlerRegistry.requestMessageDefaultInstanceFor(norbertMessage.getMessageName)
- val message = di.newBuilderForType.mergeFrom(norbertMessage.getMessage).build
- log.ifDebug("Queuing to MessageExecutor: %s", message)
- val context = RequestContext(requestId, System.currentTimeMillis, message)
- messageExecutor.executeMessage(message, either => responseHandler(context, channel, either))
+ di.newBuilderForType.mergeFrom(norbertMessage.getMessage).build
} catch {
- case ex: InvalidMessageException => log.error(ex, "Recieved invalid message")
- case ex: InvalidProtocolBufferException => log.error(ex, "Error deserializing message")
+ case ex: InvalidProtocolBufferException =>
+ Channels.write(ctx, Channels.future(channel), ResponseHelper.errorResponse(context.requestId, ex))
+ throw ex
}
+ } getOrElse {
+ val ex = new InvalidMessageException("No such message of type %s registered".format(norbertMessage.getMessageName))
+ Channels.write(ctx, Channels.future(channel), ResponseHelper.errorResponse(context.requestId, ex))
+ throw ex
}
+
+ messageExecutor.executeMessage(message, either => responseHandler(context, e.getChannel, either))
}
- override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) = log.info(e.getCause, "Caught exception in network layer")
+ override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) = log.info(e.getCause, "Caught exception in channel: %s".format(e.getChannel))
def responseHandler(context: RequestContext, channel: Channel, either: Either[Exception, Message]) {
val message = either match {
@@ -87,3 +101,7 @@ private[netty] object ResponseHelper {
.build
}
}
+
+trait NetworkStatisticsMBean {
+ def getRequestsPerSecond: Int
+}
View
4 .../src/main/scala/com/linkedin/norbert/network/server/MessageHandlerRegistryComponent.scala
@@ -39,8 +39,8 @@ class MessageHandlerRegistry {
getHandlerTuple(requestMessage)._3
}
- def requestMessageDefaultInstanceFor(name: String): Message = {
- handlerMap.get(name).getOrElse(throw new InvalidMessageException("No such message of type %s registered".format(name)))._1
+ def requestMessageDefaultInstanceFor(name: String): Option[Message] = {
+ handlerMap.get(name).map(_._1)
}
def validResponseFor(requestMessage: Message, responseMessage: Message): Boolean = {

0 comments on commit 4ca9a1f

Please sign in to comment.
Something went wrong with that request. Please try again.