Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/twitter/finagle
Browse files Browse the repository at this point in the history
Conflicts:
	finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
	finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
	finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala
  • Loading branch information
Wanli Yang authored and Wanli Yang committed Mar 23, 2011
2 parents c53c33a + 71a2239 commit eb1b626
Show file tree
Hide file tree
Showing 51 changed files with 324 additions and 390 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,33 @@ object ClientBuilder {
}

/**
* A word about the default values:
* A handy way to construct an RPC Client.
*
* ''Note'': A word about the default values:
*
* o connectionTimeout: optimized for within a datanceter
* o by default, no request timeout
*/
case class ClientBuilder[Req, Rep](
_cluster: Option[Cluster],
_protocol: Option[Protocol[Req, Rep]],
_connectionTimeout: Duration,
_requestTimeout: Duration,
_statsReceiver: Option[StatsReceiver],
_loadStatistics: (Int, Duration),
_name: Option[String],
_hostConnectionCoresize: Option[Int],
_hostConnectionLimit: Option[Int],
_hostConnectionIdleTime: Option[Duration],
_hostConnectionMaxIdleTime: Option[Duration],
_hostConnectionMaxLifeTime: Option[Duration],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
_retries: Option[Int],
_logger: Option[Logger],
_channelFactory: Option[ReferenceCountedChannelFactory],
_tls: Option[SSLContext],
_startTls: Boolean)
private val _cluster: Option[Cluster],
private val _protocol: Option[Protocol[Req, Rep]],
private val _connectionTimeout: Duration,
private val _requestTimeout: Duration,
private val _statsReceiver: Option[StatsReceiver],
private val _loadStatistics: (Int, Duration),
private val _name: Option[String],
private val _hostConnectionCoresize: Option[Int],
private val _hostConnectionLimit: Option[Int],
private val _hostConnectionIdleTime: Option[Duration],
private val _hostConnectionMaxIdleTime: Option[Duration],
private val _hostConnectionMaxLifeTime: Option[Duration],
private val _sendBufferSize: Option[Int],
private val _recvBufferSize: Option[Int],
private val _retries: Option[Int],
private val _logger: Option[Logger],
private val _channelFactory: Option[ReferenceCountedChannelFactory],
private val _tls: Option[SSLContext],
private val _startTls: Boolean)
{
def this() = this(
None, // cluster
Expand Down Expand Up @@ -258,6 +260,10 @@ case class ClientBuilder[Req, Rep](
future
}

/**
* Construct a ServiceFactory. This is useful for stateful protocols (e.g.,
* those that support transactions or authentication).
*/
def buildFactory(): ServiceFactory[Req, Rep] = {
if (!_cluster.isDefined)
throw new IncompleteSpecification("No hosts were specified")
Expand Down Expand Up @@ -315,6 +321,9 @@ case class ClientBuilder[Req, Rep](
}
}

/**
* Construct a Service.
*/
def build(): Service[Req, Rep] = {
var service: Service[Req, Rep] = new FactoryToService[Req, Rep](buildFactory())

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package com.twitter.finagle.builder

import org.jboss.netty.channel.ChannelPipelineFactory

class IncompleteSpecification(message: String) extends Exception(message)
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,27 @@ object ServerBuilder {
Executors.newCachedThreadPool())))
}

// TODO: common superclass between client & server builders for common
// concerns.

/**
* A handy Builder for constructing Servers (i.e., binding Services to a port).
*/
case class ServerBuilder[Req, Rep](
_codec: Option[Codec[Req, Rep]],
_statsReceiver: Option[StatsReceiver],
_name: Option[String],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
_bindTo: Option[SocketAddress],
_logger: Option[Logger],
_tls: Option[(String, String)],
_startTls: Boolean,
_channelFactory: Option[ReferenceCountedChannelFactory],
_maxConcurrentRequests: Option[Int],
_hostConnectionMaxIdleTime: Option[Duration],
_hostConnectionMaxLifeTime: Option[Duration],
_requestTimeout: Option[Duration],
_readTimeout: Option[Duration],
_writeCompletionTimeout: Option[Duration],
_traceReceiver: TraceReceiver)
private val _codec: Option[Codec[Req, Rep]],
private val _statsReceiver: Option[StatsReceiver],
private val _name: Option[String],
private val _sendBufferSize: Option[Int],
private val _recvBufferSize: Option[Int],
private val _bindTo: Option[SocketAddress],
private val _logger: Option[Logger],
private val _tls: Option[(String, String)],
private val _startTls: Boolean,
private val _channelFactory: Option[ReferenceCountedChannelFactory],
private val _maxConcurrentRequests: Option[Int],
private val _hostConnectionMaxIdleTime: Option[Duration],
private val _hostConnectionMaxLifeTime: Option[Duration],
private val _requestTimeout: Option[Duration],
private val _readTimeout: Option[Duration],
private val _writeCompletionTimeout: Option[Duration],
private val _traceReceiver: TraceReceiver)
{
import ServerBuilder._

Expand Down Expand Up @@ -174,8 +174,16 @@ case class ServerBuilder[Req, Rep](
private[this] def scopedStatsReceiver =
_statsReceiver map { sr => _name map (sr.scope(_)) getOrElse sr }

/**
* Construct the Server, given the provided Service.
*/
def build(service: Service[Req, Rep]): Server = build(() => service)

/**
* Construct the Server, given the provided ServiceFactory. This
* is useful if the protocol is stateful (e.g., requires authentication
* or supports transactions).
*/
def build(serviceFactory: () => Service[Req, Rep]): Server = {
val codec = _codec.getOrElse {
throw new IncompleteSpecification("No codec was specified")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.jboss.netty.channel.{
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util.LatentChannelFuture

class ChannelClosingHandler
private[finagle] class ChannelClosingHandler
extends SimpleChannelUpstreamHandler
with LifeCycleAwareChannelHandler
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.jboss.netty.channel.{
import com.twitter.util.Future
import com.twitter.finagle.stats.StatsReceiver

class ChannelRequestStatsHandler(statsReceiver: StatsReceiver)
private[finagle] class ChannelRequestStatsHandler(statsReceiver: StatsReceiver)
extends SimpleChannelHandler
with ConnectionLifecycleHandler
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.twitter.finagle.util.{Ok, Error, Cancelled, AsyncLatch}
* channel. It is responsible for requests dispatched to a given
* (connected) channel during its lifetime.
*/
class ChannelService[Req, Rep](channel: Channel, factory: ChannelServiceFactory[Req, Rep])
private[finagle] class ChannelService[Req, Rep](channel: Channel, factory: ChannelServiceFactory[Req, Rep])
extends Service[Req, Rep]
{
private[this] val currentReplyFuture = new AtomicReference[Promise[Rep]]
Expand Down Expand Up @@ -88,7 +88,7 @@ class ChannelService[Req, Rep](channel: Channel, factory: ChannelServiceFactory[
/**
* A factory for ChannelService instances, given a bootstrap.
*/
class ChannelServiceFactory[Req, Rep](
private[finagle] class ChannelServiceFactory[Req, Rep](
bootstrap: ClientBootstrap,
prepareChannel: Service[Req, Rep] => Future[Service[Req, Rep]],
statsReceiver: StatsReceiver = NullStatsReceiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ package com.twitter.finagle.channel
*/

import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.logging.Logger

import org.jboss.netty.channel.{
SimpleChannelHandler, ChannelHandlerContext,
WriteCompletionEvent, MessageEvent}
import org.jboss.netty.channel.{SimpleChannelHandler, ChannelHandlerContext, MessageEvent}
import org.jboss.netty.buffer.ChannelBuffer

import com.twitter.util.{Time, Future}
Expand All @@ -19,6 +18,8 @@ class ChannelStatsHandler(statsReceiver: StatsReceiver)
extends SimpleChannelHandler
with ConnectionLifecycleHandler
{
private[this] val log = Logger.getLogger(getClass.getName)

private[this] val connects = statsReceiver.counter("connects")
private[this] val connectionDuration = statsReceiver.stat("connection_duration")
private[this] val connectionReceivedBytes = statsReceiver.stat("connection_received_bytes")
Expand Down Expand Up @@ -48,12 +49,17 @@ class ChannelStatsHandler(statsReceiver: StatsReceiver)
}
}

override def writeComplete(ctx: ChannelHandlerContext, e: WriteCompletionEvent) {
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
val (_, channelWriteCount) = ctx.getAttachment().asInstanceOf[(AtomicLong, AtomicLong)]

channelWriteCount.getAndAdd(e.getWrittenAmount())
writeCount.getAndAdd(e.getWrittenAmount())
super.writeComplete(ctx, e)
e.getMessage match {
case buffer: ChannelBuffer =>
channelWriteCount.getAndAdd(buffer.readableBytes)
case _ =>
log.warning("ChannelStatsHandler received non-channelbuffer write")
}

super.writeRequested(ctx, e)
}

override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
Expand All @@ -63,7 +69,7 @@ class ChannelStatsHandler(statsReceiver: StatsReceiver)
channelReadCount.getAndAdd(buffer.readableBytes())
readCount.getAndAdd(buffer.readableBytes())
case _ =>
()
log.warning("ChannelStatsHandler received non-channelbuffer read")
}

super.messageReceived(ctx, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package com.twitter.finagle.channel

import org.jboss.netty.channel.{
SimpleChannelHandler, LifeCycleAwareChannelHandler,
Channel, ChannelHandlerContext, ChannelStateEvent}
ChannelHandlerContext, ChannelStateEvent}

import com.twitter.util.{Future, Promise, Return}

import com.twitter.finagle.util.Conversions._

trait ConnectionLifecycleHandler
private[finagle] trait ConnectionLifecycleHandler
extends SimpleChannelHandler
with LifeCycleAwareChannelHandler
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import org.jboss.netty.handler.timeout.ReadTimeoutException
import com.twitter.util.{Future, Promise, Return, Throw}

import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util.AsyncLatch
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.{CodecException, Service, WriteTimedOutException}

class ServiceToChannelHandler[Req, Rep](
private[finagle] class ServiceToChannelHandler[Req, Rep](
service: Service[Req, Rep],
statsReceiver: StatsReceiver,
log: Logger)
Expand All @@ -25,7 +24,7 @@ class ServiceToChannelHandler[Req, Rep](
def this(service: Service[Req, Rep]) =
this(service, NullStatsReceiver)

private[this] sealed trait State
private[this] sealed abstract class State

// valid transitions are:
//
Expand All @@ -46,15 +45,15 @@ class ServiceToChannelHandler[Req, Rep](
service.release()
}

/**
/**
* onShutdown: this Future is satisfied when the channel has been
* closed.
*/
*/
val onShutdown: Future[Unit] = onShutdownPromise

/**
/**
* drain(): admit no new requests.
*/
*/
def drain() = {
var continue = false
do {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package com.twitter.finagle.channel

/**
* A simple handler that times out a write if it fails to complete
* within the given time. This can be used to ensure that clients
* complete reception within a certain time, preventing a resource DoS
* on a server.
*/

import org.jboss.netty.channel.{
SimpleChannelDownstreamHandler, Channels,
ChannelHandlerContext, MessageEvent}

import com.twitter.util.{Time, Duration, Timer}
import com.twitter.conversions.time._

import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.WriteTimedOutException

class WriteCompletionTimeoutHandler(timer: Timer, timeout: Duration)
/**
* A simple handler that times out a write if it fails to complete
* within the given time. This can be used to ensure that clients
* complete reception within a certain time, preventing a resource DoS
* on a server.
*/
private[finagle] class WriteCompletionTimeoutHandler(timer: Timer, timeout: Duration)
extends SimpleChannelDownstreamHandler
{
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,19 @@ package com.twitter.finagle.http

import scala.collection.JavaConversions._

import java.nio.ByteOrder

import org.jboss.netty.channel.{
MessageEvent, Channels,
SimpleChannelUpstreamHandler,
ChannelHandlerContext}
MessageEvent, Channels, ChannelHandlerContext}
import org.jboss.netty.handler.codec.http.{
HttpHeaders, HttpRequest,
HttpResponse, HttpChunk,
DefaultHttpResponse,
HttpChunk, DefaultHttpResponse,
HttpVersion, HttpResponseStatus}
import org.jboss.netty.buffer.{
ChannelBuffer, ChannelBuffers,
CompositeChannelBuffer}
ChannelBuffer, ChannelBuffers}

import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.channel.LeftFoldUpstreamHandler

object OneHundredContinueResponse
private[finagle] object OneHundredContinueResponse
extends DefaultHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.CONTINUE)
Expand All @@ -35,7 +29,7 @@ class HttpFailure(ctx: ChannelHandlerContext, status: HttpResponseStatus)
{
val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
val future = Channels.future(ctx.getChannel)
Channels.write(ctx, future, response, ctx.getChannel.getRemoteAddress)
Channels.write(ctx, future, response, ctx.getChannel.getRemoteAddress)
future onSuccessOrFailure { ctx.getChannel.close() }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package com.twitter.finagle.http

import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._

import com.twitter.finagle.util.Conversions._

class ClientConnectionManager extends SimpleChannelHandler {
private[finagle] class ClientConnectionManager extends SimpleChannelHandler {
private[this] val manager = new ConnectionManager

// Note that for HTTP requests without a content length, the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.twitter.finagle.http

import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._

import com.twitter.finagle.util.Conversions._

/**
Expand Down
Loading

0 comments on commit eb1b626

Please sign in to comment.