Skip to content

Commit

Permalink
compiling tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kallen committed Jan 11, 2011
1 parent 8c0718b commit d3834dd
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 377 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ import org.jboss.netty.handler.ssl._
import org.jboss.netty.channel.socket.nio._

import com.twitter.util.TimeConversions._
import com.twitter.util.{Time, JavaTimer}

import com.twitter.finagle._
import channel.{Job, QueueingChannelHandler}
import com.twitter.finagle.util._
import com.twitter.finagle.service.{Service, ServicePipelineFactory}
import stats.{StatsRepository, StatsReceiver}
import service.{ServiceToChannelHandler, Service}
import stats.{StatsReceiver}

object ServerBuilder {
def apply() = new ServerBuilder()
Expand All @@ -31,58 +30,17 @@ object ServerBuilder {
Executors.newCachedThreadPool())
}

class SampleHandler(statsReceiver: StatsReceiver)
extends SimpleChannelHandler {
private[this] val dispatchSample = statsReceiver.counter("dispatches" -> "service")
private[this] val latencySample = statsReceiver.gauge("latency" -> "service")

case class Timing(requestedAt: Time = Time.now)

override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
ctx.getAttachment match {
case Timing(requestedAt: Time) =>
statsReceiver.counter("exception" -> e.getCause.getClass.getName).incr()
case _ => ()
}
super.exceptionCaught(ctx, e)
}

override def handleUpstream(ctx: ChannelHandlerContext, c: ChannelEvent) {
if (c.isInstanceOf[MessageEvent]) {
dispatchSample.incr()
ctx.setAttachment(Timing())
}

super.handleUpstream(ctx, c)
}

override def handleDownstream(ctx: ChannelHandlerContext, c: ChannelEvent) {
if (c.isInstanceOf[MessageEvent]) {
val e = c.asInstanceOf[MessageEvent]
ctx.getAttachment match {
case Timing(requestedAt) =>
latencySample.measure(requestedAt.untilNow.inMilliseconds.toInt)
ctx.setAttachment(null)
case _ =>
// Can this happen?
()
}
}

super.handleDownstream(ctx, c)
}
}

// TODO: common superclass between client & server builders for common
// concerns.

case class ServerBuilder(
case class ServerBuilder[Req <: AnyRef, Res <: AnyRef](
_codec: Option[Codec],
_statsReceiver: Option[StatsReceiver],
_name: Option[String],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
_pipelineFactory: Option[ChannelPipelineFactory],
_service: Option[Service[Req, Res]],
_bindTo: Option[SocketAddress],
_logger: Option[Logger],
_tls: Option[SSLContext],
Expand All @@ -100,6 +58,7 @@ case class ServerBuilder(
None, // sendBufferSize
None, // recvBufferSize
None, // pipelineFactory
None, // service
None, // bindTo
None, // logger
None, // tls
Expand All @@ -109,41 +68,41 @@ case class ServerBuilder(
None // maxQueueDepth
)

def codec(codec: Codec): ServerBuilder =
def codec(codec: Codec) =
copy(_codec = Some(codec))

def reportTo(receiver: StatsReceiver): ServerBuilder =
def reportTo(receiver: StatsReceiver) =
copy(_statsReceiver = Some(receiver))

def name(value: String): ServerBuilder = copy(_name = Some(value))
def name(value: String) = copy(_name = Some(value))

def sendBufferSize(value: Int): ServerBuilder = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int): ServerBuilder = copy(_recvBufferSize = Some(value))
def sendBufferSize(value: Int) = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int) = copy(_recvBufferSize = Some(value))

def pipelineFactory(value: ChannelPipelineFactory): ServerBuilder =
def pipelineFactory(value: ChannelPipelineFactory) =
copy(_pipelineFactory = Some(value))

def service[Req <: AnyRef, Rep <: AnyRef](service: Service[Req, Rep]): ServerBuilder =
copy(_pipelineFactory = Some(ServicePipelineFactory(service)))
def service[Req <: AnyRef, Rep <: AnyRef](service: Service[Req, Rep]) =
copy(_service = Some(service))

def bindTo(address: SocketAddress): ServerBuilder =
def bindTo(address: SocketAddress) =
copy(_bindTo = Some(address))

def channelFactory(cf: ChannelFactory): ServerBuilder =
def channelFactory(cf: ChannelFactory) =
copy(_channelFactory = Some(cf))

def logger(logger: Logger): ServerBuilder = copy(_logger = Some(logger))
def logger(logger: Logger) = copy(_logger = Some(logger))

def tls(path: String, password: String): ServerBuilder =
def tls(path: String, password: String) =
copy(_tls = Some(Ssl(path, password)))

def startTls(value: Boolean): ServerBuilder =
def startTls(value: Boolean) =
copy(_startTls = true)

def maxConcurrentRequests(max: Int): ServerBuilder =
def maxConcurrentRequests(max: Int) =
copy(_maxConcurrentRequests = Some(max))

def maxQueueDepth(max: Int): ServerBuilder =
def maxQueueDepth(max: Int) =
copy(_maxQueueDepth = Some(max))

def build(): Channel = {
Expand Down Expand Up @@ -187,13 +146,17 @@ case class ServerBuilder(
pipeline.addFirst("ssl", new SslHandler(sslEngine, _startTls))
}

_statsReceiver foreach { statsReceiver =>
pipeline.addLast("stats", new SampleHandler(statsReceiver))
}
// _statsReceiver foreach { statsReceiver =>
// pipeline.addLast("stats", new SampleHandler(statsReceiver))
// }

for ((name, handler) <- pipelineFactory.getPipeline.toMap)
pipeline.addLast(name, handler)

_service.foreach { service =>
pipeline.addLast("service", new ServiceToChannelHandler(service))
}

pipeline
}
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.twitter.finagle.channel

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.{Channels, Channel, MessageEvent}
import org.jboss.netty.channel.{Channels, Channel}

import com.twitter.finagle.util.{Ok, Error}
import com.twitter.finagle.util.Conversions._

class BootstrapBroker(bootstrap: BrokerClientBootstrap)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package com.twitter.finagle.channel

import java.util.concurrent.atomic.AtomicReference

import org.jboss.netty.channel._

import com.twitter.finagle.util.Error
import com.twitter.finagle.util.Conversions._

import com.twitter.util.{Future, Promise, Return, Throw, Try}
import com.twitter.util.{Promise, Return, Throw, Try}

class BrokerAdapter extends SimpleChannelUpstreamHandler {
@volatile private[this] var replyFuture: Promise[AnyRef] = null

def writeAndRegisterReply(channel: Channel, message: AnyRef,
incomingReplyFuture: Promise[AnyRef]) {
// If there is an outstanding request, something up the stack has
// fucked up. We currently just fail this request immediately, and
// messed up. We currently just fail this request immediately, and
// let the current request complete.
if (replyFuture ne null) {
incomingReplyFuture.updateIfEmpty(Throw(new TooManyConcurrentRequestsException))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.twitter.finagle.channel

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._

import com.twitter.util.{Promise, Future, Throw}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.twitter.finagle.channel

import com.twitter.finagle.util.{Error, Ok, Cancelled}
import com.twitter.finagle.util.Conversions._
import org.jboss.netty.channel._

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package com.twitter.finagle.service

import org.jboss.netty.channel.{Channels, MessageEvent}

import com.twitter.finagle.channel._
import com.twitter.finagle.util.{Ok, Error, Cancelled}
import com.twitter.util.{Future, Try}
import com.twitter.finagle.util.Conversions._

import com.twitter.util.{Future, Promise, Try}

class ReplyIsStreamingException extends Exception
class CancelledRequestException extends Exception
class InvalidMessageTypeException extends Exception
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.twitter.util.Future
/**
* A Service is an asynchronous function from Request to Future[Response]. It is the
* basic unit of an RPC interface.
*
* Currently this interface doesn't support streaming responses. This
* can be tackled in a number of ways:
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.twitter.finagle.service

import java.util.logging.Logger
import java.util.logging.Level

import org.jboss.netty.channel._

import com.twitter.util.{Return, Throw}

import com.twitter.finagle.util.Conversions._

class ServiceToChannelHandler[Req <: AnyRef, Rep <: AnyRef](service: Service[Req, Rep]) extends SimpleChannelUpstreamHandler {
private[this] val log = Logger.getLogger(getClass.getName)

override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
val channel = ctx.getChannel
val message = e.getMessage

try {
// for an invalid type, the exception would be caught by the
// SimpleChannelUpstreamHandler.
val req = message.asInstanceOf[Req]
service(req) respond {
case Return(value) =>
Channels.write(ctx.getChannel, value)

case Throw(e: Throwable) =>
log.log(Level.WARNING, e.getMessage, e)
Channels.close(channel)
}
} catch {
case e: ClassCastException =>
Channels.close(channel)
}
}

/**
* Catch and silence certain closed channel exceptions to avoid spamming
* the logger.
*/
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
val cause = e.getCause
val level = cause match {
case e: java.nio.channels.ClosedChannelException =>
Level.FINEST
case e: java.io.IOException
if (e.getMessage == "Connection reset by peer" ||
e.getMessage == "Broken pipe") =>
// XXX: we can probably just disregard all IOException throwables
Level.FINEST
case e: Throwable =>
Level.WARNING
}

log.log(level,
Option(cause.getMessage).getOrElse("Exception caught"),
cause)

ctx.getChannel match {
case c: Channel
if c.isOpen =>
Channels.close(c)
case _ =>
()
}
}
}
Loading

0 comments on commit d3834dd

Please sign in to comment.