Skip to content

Commit

Permalink
Merge remote branch 'origin/master' into native
Browse files Browse the repository at this point in the history
Conflicts:
	finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
  • Loading branch information
bierbaum committed Feb 7, 2011
2 parents 5d55a5d + 9ddc846 commit 71b1e37
Show file tree
Hide file tree
Showing 61 changed files with 1,709 additions and 543 deletions.
Binary file added doc/finagle.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 12 additions & 0 deletions finagle-core/src/main/scala/com/twitter/finagle/Exceptions.scala
Expand Up @@ -20,6 +20,18 @@ class WriteException(e: Throwable) extends ChannelException {
override def toString = "%s: %s".format(super.toString, e.toString)
}

object ChannelException {
def apply(cause: Throwable) = {
cause match {
case exc: ChannelException => exc
case _: java.net.ConnectException => new ConnectionFailedException
case _: java.nio.channels.UnresolvedAddressException => new ConnectionFailedException
case _: java.nio.channels.ClosedChannelException => new ChannelClosedException
case e => new UnknownChannelException(e)
}
}
}

// Service layer errors.
class ServiceException extends Exception
class ServiceClosedException extends ServiceException
Expand Down
13 changes: 9 additions & 4 deletions finagle-core/src/main/scala/com/twitter/finagle/Service.scala
Expand Up @@ -2,6 +2,8 @@ package com.twitter.finagle

import com.twitter.util.Future

import com.twitter.finagle.service.RefcountedService

/**
* A Service is an asynchronous function from Request to Future[Response]. It is the
* basic unit of an RPC interface.
Expand All @@ -21,7 +23,8 @@ abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) {
def apply(request: Req): Future[Rep]

/**
* Relinquishes the use of this service instance.
* Relinquishes the use of this service instance. Behavior is
* undefined is apply() is called after resources are relinquished.
*/
def release() = ()

Expand Down Expand Up @@ -118,9 +121,11 @@ abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
*
*/
def andThen(service: Service[ReqOut, RepIn]) = new Service[ReqIn, RepOut] {
def apply(request: ReqIn) = Filter.this.apply(request, service)
override def release() = service.release()
override def isAvailable = service.isAvailable
private[this] val refcounted = new RefcountedService(service)

def apply(request: ReqIn) = Filter.this.apply(request, refcounted)
override def release() = refcounted.release()
override def isAvailable = refcounted.isAvailable
}

def andThen(factory: ServiceFactory[ReqOut, RepIn]): ServiceFactory[ReqIn, RepOut] =
Expand Down
@@ -0,0 +1,70 @@
package com.twitter.finagle.builder

import org.jboss.netty.channel.{
ChannelPipeline, ChannelFactory, ServerChannelFactory,
ServerChannel}

class ReferenceCountedChannelFactory(underlying: ChannelFactory)
extends ChannelFactory
{
private[this] var refcount = 0

def acquire() = synchronized {
refcount += 1
}

def newChannel(pipeline: ChannelPipeline) = underlying.newChannel(pipeline)

// TODO: after releasing external resources, we can still use the
// underlying factory? (ie. it'll create new threads, etc.?)
def releaseExternalResources() = synchronized {
refcount -= 1
if (refcount <= 0)
underlying.releaseExternalResources()
}
}

/**
* A "revivable" and lazy ChannelFactory that allows us to revive
* ChannelFactories after they have been released.
*/
class LazyRevivableChannelFactory(make: () => ChannelFactory)
extends ChannelFactory
{
@volatile private[this] var underlying: ChannelFactory = null

def newChannel(pipeline: ChannelPipeline) = {
if (underlying eq null) synchronized {
if (underlying eq null)
underlying = make()
}

underlying.newChannel(pipeline)
}

def releaseExternalResources() = synchronized {
if (underlying ne null) {
underlying.releaseExternalResources()
underlying = null
}
}
}

/**
* Yikes. This monstrosity is needed to wrap server channel factories
* generically. Netty only checks the actual channel factory types for
* servers dynamically, so using a ChannelFactory in the server
* results in a runtime error. However, the above wrappers are generic
* and should be shared among the different types of factories. We are
* in effect exchanging the possibility of one runtime error for
* another. Another approach would be to use dynamic proxies, but this
* seems a little simpler for such a simple interface.
*/
class ChannelFactoryToServerChannelFactory(underlying: ChannelFactory)
extends ServerChannelFactory
{
def newChannel(pipeline: ChannelPipeline) =
underlying.newChannel(pipeline).asInstanceOf[ServerChannel]
def releaseExternalResources() =
underlying.releaseExternalResources()
}
Expand Up @@ -10,7 +10,7 @@ import org.jboss.netty.channel.socket.nio._
import org.jboss.netty.handler.ssl._
import org.jboss.netty.bootstrap.ClientBootstrap

import com.twitter.util.Duration
import com.twitter.util.{Future, Duration}
import com.twitter.util.TimeConversions._

import com.twitter.finagle.channel._
Expand All @@ -21,35 +21,16 @@ import com.twitter.finagle.service._
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.loadbalancer.{LoadBalancedFactory, LeastQueuedStrategy}

class ReferenceCountedChannelFactory(underlying: ChannelFactory)
extends ChannelFactory
{
private[this] var refcount = 0

def acquire() = synchronized {
refcount += 1
}

def newChannel(pipeline: ChannelPipeline) = underlying.newChannel(pipeline)

// TODO: after releasing external resources, we can still use the
// underlying factory? (ie. it'll create new threads, etc.?)
def releaseExternalResources() = synchronized {
refcount -= 1
if (refcount == 0)
underlying.releaseExternalResources()
}
}

object ClientBuilder {
def apply() = new ClientBuilder[Any, Any]
def get() = apply()

lazy val defaultChannelFactory =
val defaultChannelFactory =
new ReferenceCountedChannelFactory(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()))
new LazyRevivableChannelFactory(() =>
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())))
}

/**
Expand All @@ -69,6 +50,7 @@ case class ClientBuilder[Req, Rep](
_hostConnectionCoresize: Option[Int],
_hostConnectionLimit: Option[Int],
_hostConnectionIdleTime: Option[Duration],
_hostConnectionMaxIdleTime: Option[Duration],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
_retries: Option[Int],
Expand All @@ -88,6 +70,7 @@ case class ClientBuilder[Req, Rep](
None, // hostConnectionCoresize
None, // hostConnectionLimit
None, // hostConnectionIdleTime
None, // hostConnectionMaxIdleTime
None, // sendBufferSize
None, // recvBufferSize
None, // retries
Expand All @@ -99,23 +82,24 @@ case class ClientBuilder[Req, Rep](

override def toString() = {
val options = Seq(
"name" -> _name,
"cluster" -> _cluster,
"protocol" -> _protocol,
"connectionTimeout" -> Some(_connectionTimeout),
"requestTimeout" -> Some(_requestTimeout),
"statsReceiver" -> _statsReceiver,
"loadStatistics" -> _loadStatistics,
"hostConnectionLimit" -> Some(_hostConnectionLimit),
"hostConnectionCoresize" -> Some(_hostConnectionCoresize),
"hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
"sendBufferSize" -> _sendBufferSize,
"recvBufferSize" -> _recvBufferSize,
"retries" -> _retries,
"logger" -> _logger,
"channelFactory" -> _channelFactory,
"tls" -> _tls,
"startTls" -> _startTls
"name" -> _name,
"cluster" -> _cluster,
"protocol" -> _protocol,
"connectionTimeout" -> Some(_connectionTimeout),
"requestTimeout" -> Some(_requestTimeout),
"statsReceiver" -> _statsReceiver,
"loadStatistics" -> _loadStatistics,
"hostConnectionLimit" -> Some(_hostConnectionLimit),
"hostConnectionCoresize" -> Some(_hostConnectionCoresize),
"hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
"hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime),
"sendBufferSize" -> _sendBufferSize,
"recvBufferSize" -> _recvBufferSize,
"retries" -> _retries,
"logger" -> _logger,
"channelFactory" -> _channelFactory,
"tls" -> _tls,
"startTls" -> _startTls
)

"ClientBuilder(%s)".format(
Expand Down Expand Up @@ -177,6 +161,9 @@ case class ClientBuilder[Req, Rep](
def hostConnectionIdleTime(timeout: Duration) =
copy(_hostConnectionIdleTime = Some(timeout))

def hostConnectionMaxIdleTime(timeout: Duration) =
copy(_hostConnectionMaxIdleTime = Some(timeout))

def retries(value: Int) =
copy(_retries = Some(value))

Expand Down Expand Up @@ -251,9 +238,15 @@ case class ClientBuilder[Req, Rep](

private[this] def prepareChannel(service: Service[Req, Rep]) = {
val protocol = _protocol.get
// First the codec, then the protocol.
protocol.codec.prepareClientChannel(service) flatMap
protocol.prepareChannel _
var future: Future[Service[Req, Rep]] = null

future = protocol.codec.prepareClientChannel(service)
future = future.flatMap { protocol.prepareChannel(_) }
_hostConnectionMaxIdleTime.foreach { idleTime =>
future = future.map { new ExpiringService(_, idleTime) }
}

future
}

def buildFactory(): ServiceFactory[Req, Rep] = {
Expand All @@ -262,6 +255,8 @@ case class ClientBuilder[Req, Rep](
if (!_protocol.isDefined)
throw new IncompleteSpecification("No protocol was specified")

Timer.default.acquire()

val cluster = _cluster.get
val protocol = _protocol.get

Expand All @@ -282,7 +277,7 @@ case class ClientBuilder[Req, Rep](
factory = buildPool(factory)

if (_requestTimeout < Duration.MaxValue) {
val filter = new TimeoutFilter[Req, Rep](Timer.default, _requestTimeout)
val filter = new TimeoutFilter[Req, Rep](_requestTimeout)
factory = filter andThen factory
}

Expand All @@ -295,8 +290,12 @@ case class ClientBuilder[Req, Rep](
factory
}

new LoadBalancedFactory(
hostFactories, new LeastQueuedStrategy[Req, Rep])
new LoadBalancedFactory(hostFactories, new LeastQueuedStrategy[Req, Rep]) {
override def close() = {
super.close()
Timer.default.stop()
}
}
}

def build(): Service[Req, Rep] = {
Expand Down
Expand Up @@ -14,6 +14,11 @@ class Http(compressionLevel: Int = 0) extends Codec[HttpRequest, HttpResponse] {
pipeline.addLast("httpCodec", new HttpClientCodec())
pipeline.addLast("httpDechunker", new HttpChunkAggregator(10<<20))
pipeline.addLast("httpDecompressor", new HttpContentDecompressor)

pipeline.addLast(
"connectionLifecycleManager",
new ClientConnectionManager)

pipeline
}
}
Expand All @@ -29,9 +34,13 @@ class Http(compressionLevel: Int = 0) extends Codec[HttpRequest, HttpResponse] {
new HttpContentCompressor(compressionLevel))
}

pipeline.addLast(
"httpRequestAggregator",
(new AggregateHttpRequest(10 << 20)).channelHandler)

pipeline.addLast(
"connectionLifecycleManager",
new HttpServerConnectionLifecycleManager)
new ServerConnectionManager)

pipeline
}
Expand Down

0 comments on commit 71b1e37

Please sign in to comment.