Permalink
Browse files

[split] finagle-core: introduce Transports and Dispatchers

A transport is a representation of a stream of objects that may be
read from and written to asynchronously. Transports are connected to
some endpoint, typically via a channel pipeline that performs
encoding and decoding.

Dispatchers bridge services onto transports.

This splits the finagle world into two neat(er) parts, and allows for
greater composability and modularity. In particular, after this
change, a codec may provide its own dispatcher to implement
multiplexing, pipelining and the like.

The change also simplifies a lot of the current functionality, making
it easier to reason about and test.

Next, I'm going to take this split a little bit further: I'll likely
introduce a "Listener" which can accept a transport:

trait Listener[In, Out] {
  def accept(): Future[Transport[In, Out]]
}

and a "Connecter" on the client

trait Connecter[In, Out] {
  def connect(remote: SocketAddress): Future[Transport[In, Out]]
}

after that, finagle will be neatly split into the pipeline stack and
the service stack, allowing us to build & test these separately. For
example, currently there is poor integration testing in the
particular service stack that the client & server builder sets up.
After this change, it'll be much simpler to test the two halves in
isolation.
  • Loading branch information...
1 parent ae5b65b commit 704a757029576e758a08ba80f5fac442be906f84 @mariusae mariusae committed Apr 12, 2012
Showing with 1,190 additions and 1,271 deletions.
  1. +16 −23 finagle-core/src/main/scala/com/twitter/finagle/Codec.scala
  2. +8 −1 finagle-core/src/main/scala/com/twitter/finagle/Exceptions.scala
  3. +25 −27 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
  4. +101 −108 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
  5. +0 −132 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelSemaphoreHandler.scala
  6. +52 −143 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
  7. +103 −0 finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceDispatcher.scala
  8. +0 −214 finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
  9. +48 −0 finagle-core/src/main/scala/com/twitter/finagle/dispatch/ClientDispatcher.scala
  10. +65 −0 finagle-core/src/main/scala/com/twitter/finagle/dispatch/ServerDispatcher.scala
  11. +10 −0 finagle-core/src/main/scala/com/twitter/finagle/dispatch/package.scala
  12. +19 −0 finagle-core/src/main/scala/com/twitter/finagle/filter/HandletimeFilter.scala
  13. +10 −0 finagle-core/src/main/scala/com/twitter/finagle/filter/RequestSemaphoreFilter.scala
  14. +12 −0 finagle-core/src/main/scala/com/twitter/finagle/filter/RequestSerializingFilter.scala
  15. +27 −0 finagle-core/src/main/scala/com/twitter/finagle/service/CancelOnHangupService.scala
  16. +3 −34 finagle-core/src/main/scala/com/twitter/finagle/service/FailFastFactory.scala
  17. +76 −0 finagle-core/src/main/scala/com/twitter/finagle/transport/ChannelTransport.scala
  18. +46 −0 finagle-core/src/main/scala/com/twitter/finagle/transport/Transport.scala
  19. +52 −0 finagle-core/src/main/scala/com/twitter/finagle/util/Chan.scala
  20. +1 −1 finagle-core/src/main/scala/com/twitter/finagle/util/ChannelSnooper.scala
  21. +3 −4 finagle-core/src/test/scala/com/twitter/finagle/builder/ClientBuilderSpec.scala
  22. +0 −71 finagle-core/src/test/scala/com/twitter/finagle/builder/ServerBuilderSpec.scala
  23. +0 −124 finagle-core/src/test/scala/com/twitter/finagle/channel/ChannelSemaphoreHandlerSpec.scala
  24. +37 −59 finagle-core/src/test/scala/com/twitter/finagle/channel/ChannelServiceSpec.scala
  25. +0 −184 finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala
  26. +90 −0 finagle-core/src/test/scala/com/twitter/finagle/dispatch/ClientDispatcherSpec.scala
  27. +88 −0 finagle-core/src/test/scala/com/twitter/finagle/dispatch/ServerDispatcherSpec.scala
  28. +24 −17 finagle-core/src/test/scala/com/twitter/finagle/integration/Base.scala
  29. +39 −0 finagle-core/src/test/scala/com/twitter/finagle/service/CancelOnHangupServiceSpec.scala
  30. +103 −0 finagle-core/src/test/scala/com/twitter/finagle/transport/ChannelTransportSpec.scala
  31. +38 −0 finagle-core/src/test/scala/com/twitter/finagle/util/ChanSpec.scala
  32. +16 −14 finagle-http/src/main/scala/com/twitter/finagle/http/Codec.scala
  33. +13 −18 finagle-http/src/main/scala/com/twitter/finagle/http/codec/ClientConnectionManager.scala
  34. +0 −5 finagle-http/src/main/scala/com/twitter/finagle/http/codec/ResponseDecoder.scala
  35. +26 −50 finagle-http/src/test/scala/com/twitter/finagle/http/codec/ConnectionManagerSpec.scala
  36. +0 −7 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/text/Encoder.scala
  37. +5 −3 finagle-stream/src/main/scala/com/twitter/finagle/stream/DuplexStreamCodec.scala
  38. +5 −6 finagle-stream/src/test/scala/com/twitter/finagle/stream/EndToEndSpec.scala
  39. +2 −1 finagle-stress/src/main/scala/com/twitter/finagle/stress/EmbeddedServer.scala
  40. +10 −23 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala
  41. +13 −0 link-netty.sh
  42. +4 −2 project/build/Project.scala
@@ -6,9 +6,11 @@ package com.twitter.finagle
* from this codec.
*/
+import com.twitter.finagle.dispatch.{
+ ClientDispatcherFactory, SerialClientDispatcher, SerialServerDispatcher,
+ ServerDispatcherFactory}
import java.net.{InetSocketAddress, SocketAddress}
-import org.jboss.netty.channel.{ChannelPipelineFactory, ChannelPipeline}
-import com.twitter.util.Future
+import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory}
/**
* Superclass for all codecs.
@@ -20,39 +22,30 @@ trait Codec[Req, Rep] {
def pipelineFactory: ChannelPipelineFactory
/**
- * Prepare a factory for usage with the codec.
- * Used to allow codec modifications to the service at the top of the network stack.
+ * Prepare a factory for usage with the codec. Used to allow codec
+ * modifications to the service at the top of the network stack.
*/
def prepareServiceFactory(underlying: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep] =
underlying
/**
- * Prepare a connection factory.
- * Used to allow codec modifications to the service at the bottom of the stack
- * (connection level).
+ * Prepare a connection factory. Used to allow codec modifications
+ * to the service at the bottom of the stack (connection level).
*/
def prepareConnFactory(underlying: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep] =
underlying
/**
- * Raw version of `prepareConnFactory` for clients. This allows raw (as-yet untyped) '
- * access to the underlying codec, allowing the implementor to use different message
- * types for communications between the connection factory and the codec.
- *
- * Useful in implementing connection multiplexing.
+ * Note: the below ("raw") interfaces are low level, and require a
+ * good understanding of finagle internals to implement correctly.
+ * Proceed with care.
*/
- def rawPrepareClientConnFactory(underlying: ServiceFactory[Any, Any]): ServiceFactory[Req, Rep] =
- prepareConnFactory(underlying.asInstanceOf[ServiceFactory[Req, Rep]])
- /**
- * Raw version of `prepareConnFactory` for servers. This allows raw (as-yet untyped) '
- * access to the underlying codec, allowing the implementor to use different message
- * types for communications between the connection factory and the codec.
- *
- * Useful in implementing connection multiplexing.
- */
- def rawPrepareServerConnFactory(underlying: ServiceFactory[Req, Rep]): ServiceFactory[Any, Any] =
- prepareConnFactory(underlying).asInstanceOf[ServiceFactory[Any, Any]]
+ val mkClientDispatcher: ClientDispatcherFactory[Req, Rep] = (mkTrans) =>
+ new SerialClientDispatcher(mkTrans())
+
+ val mkServerDispatcher: ServerDispatcherFactory[Req, Rep] = (mkTrans, service) =>
+ new SerialServerDispatcher[Req, Rep](mkTrans(), service)
}
object Codec {
@@ -52,7 +52,9 @@ class NotShardableException extends NotServableException
class ShardNotAvailableException extends NotServableException
// Channel exceptions are failures on the channels themselves.
-class ChannelException(underlying: Throwable, val remoteAddress: SocketAddress) extends Exception(underlying) with SourcedException {
+class ChannelException(underlying: Throwable, val remoteAddress: SocketAddress)
+ extends Exception(underlying) with SourcedException
+{
def this(underlying: Throwable) = this(underlying, null)
def this() = this(null, null)
override def getMessage =
@@ -123,6 +125,11 @@ object ChannelException {
}
}
+// Transport layer errors
+class TransportException extends Exception with SourcedException
+class CancelledReadException extends TransportException
+class CancelledWriteException extends TransportException
+
// Service layer errors.
class ServiceException extends Exception with SourcedException
class ServiceClosedException extends ServiceException
@@ -45,33 +45,30 @@ package com.twitter.finagle.builder
* instead of a compiler error.
*/
+import com.twitter.concurrent.NamedPoolThreadFactory
+import com.twitter.finagle._
+import com.twitter.finagle.channel._
+import com.twitter.finagle.factory._
+import com.twitter.finagle.filter.{MonitorFilter, ExceptionSourceFilter}
+import com.twitter.finagle.loadbalancer.HeapBalancer
+import com.twitter.finagle.pool._
+import com.twitter.finagle.service._
+import com.twitter.finagle.ssl.{Engine, Ssl, SslConnectHandler}
+import com.twitter.finagle.stats.{
+ GlobalStatsReceiver, NullStatsReceiver, RollupStatsReceiver, StatsReceiver}
+import com.twitter.finagle.util._
+import com.twitter.util.TimeConversions._
+import com.twitter.util.{
+ Duration, Future, Monitor, NullMonitor, Promise, Return, Try}
import java.net.{InetSocketAddress, SocketAddress}
-import java.util.logging.Logger
import java.util.concurrent.{Executors, TimeUnit}
-
+import java.util.logging.Logger
+import javax.net.ssl.SSLContext
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio._
import org.jboss.netty.handler.ssl._
import org.jboss.netty.handler.timeout.IdleStateHandler
-
-import com.twitter.concurrent.NamedPoolThreadFactory
-import com.twitter.util.{Future, Duration, Try, Monitor, NullMonitor, Promise, Return }
-import com.twitter.util.TimeConversions._
-import javax.net.ssl.SSLContext
-
-import com.twitter.finagle.channel._
-import com.twitter.finagle.util._
-import com.twitter.finagle.pool._
-import com.twitter.finagle._
-import com.twitter.finagle.service._
-import com.twitter.finagle.factory._
-import com.twitter.finagle.filter.{MonitorFilter, ExceptionSourceFilter}
-import com.twitter.finagle.stats.{
- StatsReceiver, RollupStatsReceiver,
- NullStatsReceiver, GlobalStatsReceiver}
-import com.twitter.finagle.loadbalancer.HeapBalancer
-import com.twitter.finagle.ssl.{Engine, Ssl, SslConnectHandler}
import tracing.{NullTracer, TracingFilter, Tracer}
/**
@@ -503,20 +500,20 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
/**
* Encrypt the connection with SSL. The Engine to use can be passed into the client.
- * This allows the user to use client certificates
+ * This allows the user to use client certificates
* No SSL Hostname Validation is performed
*/
def tls(sslContext : SSLContext): This =
- withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, None)))
-
+ withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, None)))
+
/**
* Encrypt the connection with SSL. The Engine to use can be passed into the client.
- * This allows the user to use client certificates
+ * This allows the user to use client certificates
* SSL Hostname Validation is performed, on the passed in hostname
*/
def tls(sslContext : SSLContext, hostname : Option[String]): This =
- withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, hostname)))
-
+ withConfig(_.copy(_tls = Some({ () => Ssl.client(sslContext) }, hostname)))
+
/**
* Do not perform TLS validation. Probably dangerous.
*/
@@ -677,7 +674,8 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
var factory: ServiceFactory[Req, Rep] = null
val bs = buildBootstrap(codec, host)
- factory = codec.rawPrepareClientConnFactory(new ChannelServiceFactory[Any, Any](bs, hostStatsReceiver))
+ factory = codec.prepareConnFactory(
+ new ChannelServiceFactory[Req, Rep](bs, codec.mkClientDispatcher, hostStatsReceiver))
if (config.hostConnectionMaxIdleTime.isDefined ||
config.hostConnectionMaxLifeTime.isDefined) {
Oops, something went wrong.

0 comments on commit 704a757

Please sign in to comment.