Permalink
Browse files

[split] enforce completeness of the ClientBuilder via static typing. …

…we provide phantom types that specifies whether certain constraints are met (eg. the cluster is specified), and the various builder methods transform the builder type appropriately. using an implicit, when can then enforce that the builder is fully specified (statically) at the time build() is called.

this requires 0 code changes for "canonical" uses of builders -- but
whenever the builder types are named, this obviously needs to change.
this slight additional cost also brings an advantage: consumers of
builders may specify how complete the builder needs to be.
  • Loading branch information...
1 parent 6b24ae2 commit 837545f060c9904510e863bb82981bc15a8a39ec @mariusae mariusae committed Apr 26, 2011
Showing with 373 additions and 213 deletions.
  1. +3 −3 finagle-core/src/main/java/com/twitter/finagle/javaapi/HttpClientTest.java
  2. +275 −154 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
  3. +24 −21 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
  4. +4 −0 finagle-core/src/test/scala/com/twitter/finagle/builder/ClientBuilderSpec.scala
  5. +2 −1 finagle-core/src/test/scala/com/twitter/finagle/service/ClientSpec.scala
  6. +2 −1 finagle-example/src/main/scala/com/twitter/finagle/example/echo/EchoClient.scala
  7. +2 −1 finagle-example/src/main/scala/com/twitter/finagle/example/http/HttpClient.scala
  8. +2 −1 finagle-example/src/main/scala/com/twitter/finagle/example/memcachedproxy/MemcachedProxy.scala
  9. +2 −1 finagle-example/src/main/scala/com/twitter/finagle/example/spritzer2kestrel/Spritzer2Kestrel.scala
  10. +2 −1 finagle-example/src/main/scala/com/twitter/finagle/example/stream/StreamClient.scala
  11. +2 −1 finagle-example/src/main/scala/com/twitter/finagle/example/thrift/ThriftClient.scala
  12. +1 −0 finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/Client.scala
  13. +2 −1 finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/integration/ClientSpec.scala
  14. +2 −1 finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/integration/InterpreterServiceSpec.scala
  15. +6 −5 finagle-memcached/src/main/java/com/twitter/finagle/memcached/java/ClientTest.java
  16. +11 −9 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Client.scala
  17. +6 −5 finagle-memcached/src/test/java/com/twitter/finagle/memcached/integration/TestClient.java
  18. +1 −0 finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientSpec.scala
  19. +2 −1 ...e-memcached/src/test/scala/com/twitter/finagle/memcached/integration/InterpreterServiceSpec.scala
  20. +1 −0 finagle-memcached/src/test/scala/com/twitter/finagle/memcached/stress/InterpreterServiceSpec.scala
  21. +1 −0 finagle-native/src/test/scala/com/twitter/finagle/SslSpec.scala
  22. +1 −0 finagle-serversets/src/test/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetClusterSpec.scala
  23. +2 −1 finagle-stream/src/test/scala/com/twitter/finagle/stream/EndToEndSpec.scala
  24. +2 −1 finagle-stream/src/test/scala/com/twitter/finagle/stream/PubSubSpec.scala
  25. +3 −0 finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala
  26. +1 −1 finagle-stress/src/main/scala/com/twitter/finagle/stress/EndToEndStress.scala
  27. +3 −2 finagle-stress/src/main/scala/com/twitter/finagle/stress/LoadBalancerTest.scala
  28. +1 −0 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala
  29. +5 −0 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/FinagleClientThriftServerSpec.scala
  30. +2 −1 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ServiceEndToEndSpec.scala
View
6 finagle-core/src/main/java/com/twitter/finagle/javaapi/HttpClientTest.java
@@ -11,10 +11,10 @@
public class HttpClientTest {
public static void main(String args[]) {
Service<HttpRequest, HttpResponse> client =
- ClientBuilder.get()
- .hosts("localhost:10000")
+ ClientBuilder.safeBuild(ClientBuilder.get()
.codec(Codec4J.Http)
- .build();
+ .hosts("localhost:10000")
+ .hostConnectionLimit(1));
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
View
429 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -1,17 +1,48 @@
package com.twitter.finagle.builder
-/*
- * Provides a class for building clients.
- * The main class to use is [[com.twitter.finagle.builder.ClientBuilder]], as so
+
+/**
+ * Provides a class for building clients. The main class to use is
+ * [[com.twitter.finagle.builder.ClientBuilder]], as so
+ *
* {{{
* val client = ClientBuilder()
* .codec(Http)
* .hosts("localhost:10000,localhost:10001,localhost:10003")
+ * .hostConnectionLimit(1)
* .connectionTimeout(1.second) // max time to spend establishing a TCP connection.
* .retries(2) // (1) per-request retries
* .reportTo(new OstrichStatsReceiver) // export host-level load data to ostrich
* .logger(Logger.getLogger("http"))
* .build()
* }}}
+ *
+ * The `ClientBuilder` requires the definition of `cluster`, `codec`,
+ * and `hostConnectionLimit`. In Scala, these are statically type
+ * checked, and in Java the lack of any of the above causes a runtime
+ * error.
+ *
+ * The `build` method uses an implicit argument to statically
+ * typecheck the builder (to ensure completeness, see above). The Java
+ * compiler cannot provide such implicit, so we provide a separate
+ * function in Java to accomplish this. Thus, the Java code for the
+ * above is
+ *
+ * {{{
+ * Service<HttpRequest, HttpResponse> service =
+ * ClientBuilder.safeBuild(
+ * ClientBuilder.get()
+ * .codec(new Http())
+ * .hosts("localhost:10000,localhost:10001,localhost:10003")
+ * .hostConnectionLimit(1)
+ * .connectionTimeout(1.second)
+ * .retries(2)
+ * .reportTo(new OstrichStatsReceiver())
+ * .logger(Logger.getLogger("http")))
+ * }}}
+ *
+ * Alternatively, using the `unsafeBuild` method on `ClientBuilder`
+ * verifies the builder dynamically, resulting in a runtime error
+ * instead of a compiler error.
*/
import java.net.{InetSocketAddress, SocketAddress}
@@ -39,210 +70,281 @@ import com.twitter.finagle.loadbalancer.{LoadBalancedFactory, LeastQueuedStrateg
* Factory for [[com.twitter.finagle.builder.ClientBuilder]] instances
*/
object ClientBuilder {
- def apply() = new ClientBuilder[Any, Any]
+ type Complete[Req, Rep] =
+ ClientBuilder[Req, Rep, ClientConfig.Yes, ClientConfig.Yes, ClientConfig.Yes]
+ type NoCluster[Req, Rep] =
+ ClientBuilder[Req, Rep, Nothing, ClientConfig.Yes, ClientConfig.Yes]
+ type NoCodec =
+ ClientBuilder[_, _, ClientConfig.Yes, Nothing, ClientConfig.Yes]
+
+ def apply() = new ClientBuilder()
+
+ /**
+ * Used for Java access.
+ */
def get() = apply()
- val defaultChannelFactory =
+ /**
+ * Provides a typesafe `build` for Java.
+ */
+ def safeBuild[Req, Rep](builder: Complete[Req, Rep]): Service[Req, Rep] =
+ builder.build()
+
+ /**
+ * Provides a typesafe `buildFactory` for Java.
+ */
+ def safeBuildFactory[Req, Rep](builder: Complete[Req, Rep]): ServiceFactory[Req, Rep] =
+ builder.buildFactory()
+
+ private[finagle] val defaultChannelFactory =
new ReferenceCountedChannelFactory(
new LazyRevivableChannelFactory(() =>
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())))
}
+object ClientConfig {
+ sealed abstract trait Yes
+}
+
/**
- * A handy way to construct an RPC Client.
- *
- * ''Note'': A word about the default values:
- *
- * o connectionTimeout: optimized for within a datanceter
- * o by default, no request timeout
+ * TODO: do we really need to specify HasCodec? -- it's implied in a
+ * way by the proper Req, Rep
*/
-case class ClientBuilder[Req, Rep](
- private val _cluster: Option[Cluster],
- private val _codec: Option[ClientCodec[Req, Rep]],
- private val _connectionTimeout: Duration,
- private val _requestTimeout: Duration,
- private val _statsReceiver: Option[StatsReceiver],
- private val _loadStatistics: (Int, Duration),
- private val _name: Option[String],
- private val _hostConnectionCoresize: Option[Int],
- private val _hostConnectionLimit: Option[Int],
- private val _hostConnectionIdleTime: Option[Duration],
- private val _hostConnectionMaxIdleTime: Option[Duration],
- private val _hostConnectionMaxLifeTime: Option[Duration],
- private val _sendBufferSize: Option[Int],
- private val _recvBufferSize: Option[Int],
- private val _retries: Option[Int],
- private val _logger: Option[Logger],
- private val _channelFactory: Option[ReferenceCountedChannelFactory],
- private val _tls: Option[SSLContext],
- private val _startTls: Boolean)
+
+final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit](
+ private val _cluster : Option[Cluster] = None,
+ private val _codec : Option[ClientCodec[Req, Rep]] = None,
+ private val _connectionTimeout : Duration = 10.milliseconds,
+ private val _requestTimeout : Duration = Duration.MaxValue,
+ private val _statsReceiver : Option[StatsReceiver] = None,
+ private val _name : Option[String] = Some("client"),
+ private val _hostConnectionCoresize : Option[Int] = None,
+ private val _hostConnectionLimit : Option[Int] = None,
+ private val _hostConnectionIdleTime : Option[Duration] = None,
+ private val _hostConnectionMaxIdleTime : Option[Duration] = None,
+ private val _hostConnectionMaxLifeTime : Option[Duration] = None,
+ private val _sendBufferSize : Option[Int] = None,
+ private val _recvBufferSize : Option[Int] = None,
+ private val _retries : Option[Int] = None,
+ private val _logger : Option[Logger] = None,
+ private val _channelFactory : Option[ReferenceCountedChannelFactory] = None,
+ private val _tls : Option[SSLContext] = None,
+ private val _startTls : Boolean = false)
{
- def this() = this(
- None, // cluster
- None, // codec
- 10.milliseconds, // connectionTimeout
- Duration.MaxValue, // requestTimeout
- None, // statsReceiver
- (60, 10.seconds), // loadStatistics
- Some("client"), // name
- None, // hostConnectionCoresize
- None, // hostConnectionLimit
- None, // hostConnectionIdleTime
- None, // hostConnectionMaxIdleTime
- None, // hostConnectionMaxLifeTime
- None, // sendBufferSize
- None, // recvBufferSize
- None, // retries
- None, // logger
- None, // channelFactory
- None, // tls
- false // startTls
- )
+ import ClientConfig._
- private[this] def options = Seq(
- "name" -> _name,
+ /**
+ * The Scala compiler errors if the case class members don't have underscores.
+ * Nevertheless, we want a friendly public API so we create delegators without
+ * underscores.
+ */
+ val cluster = _cluster
+ val codec = _codec
+ val connectionTimeout = _connectionTimeout
+ val requestTimeout = _requestTimeout
+ val statsReceiver = _statsReceiver
+ val name = _name
+ val hostConnectionCoresize = _hostConnectionCoresize
+ val hostConnectionLimit = _hostConnectionLimit
+ val hostConnectionIdleTime = _hostConnectionIdleTime
+ val hostConnectionMaxIdleTime = _hostConnectionMaxIdleTime
+ val hostConnectionMaxLifeTime = _hostConnectionMaxLifeTime
+ val sendBufferSize = _sendBufferSize
+ val recvBufferSize = _recvBufferSize
+ val retries = _retries
+ val logger = _logger
+ val channelFactory = _channelFactory
+ val tls = _tls
+ val startTls = _startTls
+
+ def toMap = Map(
"cluster" -> _cluster,
"codec" -> _codec,
"connectionTimeout" -> Some(_connectionTimeout),
"requestTimeout" -> Some(_requestTimeout),
"statsReceiver" -> _statsReceiver,
- "loadStatistics" -> _loadStatistics,
- "hostConnectionLimit" -> Some(_hostConnectionLimit),
- "hostConnectionCoresize" -> Some(_hostConnectionCoresize),
- "hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
- "hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime),
- "hostConnectionMaxLifeTime" -> Some(_hostConnectionMaxLifeTime),
+ "name" -> _name,
+ "hostConnectionCoresize" -> _hostConnectionCoresize,
+ "hostConnectionLimit" -> _hostConnectionLimit,
+ "hostConnectionIdleTime" -> _hostConnectionIdleTime,
+ "hostConnectionMaxIdleTime" -> _hostConnectionMaxIdleTime,
+ "hostConnectionMaxLifeTime" -> _hostConnectionMaxLifeTime,
"sendBufferSize" -> _sendBufferSize,
"recvBufferSize" -> _recvBufferSize,
"retries" -> _retries,
"logger" -> _logger,
"channelFactory" -> _channelFactory,
"tls" -> _tls,
- "startTls" -> _startTls
+ "startTls" -> Some(_startTls)
)
- override def toString() = {
- "ClientBuilder(%s)".format(
- options flatMap {
- case (k, Some(v)) => Some("%s=%s".format(k, v))
- case _ => None
+ override def toString = {
+ "ClientConfig(%s)".format(
+ toMap flatMap {
+ case (k, Some(v)) =>
+ Some("%s=%s".format(k, v))
+ case _ =>
+ None
} mkString(", "))
}
- def hosts(hostnamePortCombinations: String): ClientBuilder[Req, Rep] = {
- val addresses = InetSocketAddressUtil.parseHosts(
- hostnamePortCombinations)
- hosts(addresses)
- }
+ def validated: ClientConfig[Req, Rep, Yes, Yes, Yes] = {
+ cluster getOrElse { throw new IncompleteSpecification("No hosts were specified") }
+ codec getOrElse { throw new IncompleteSpecification("No codec was specified") }
+ hostConnectionLimit getOrElse {
+ throw new IncompleteSpecification("No host connection limit was specified")
+ }
- def hosts(addresses: Seq[SocketAddress]): ClientBuilder[Req, Rep] = {
- val _cluster = new SocketAddressCluster(addresses)
- cluster(_cluster)
+ copy()
}
+}
+
+class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] private[builder](
+ config: ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit]
+) {
+ import ClientConfig._
- def hosts(address: SocketAddress): ClientBuilder[Req, Rep] = hosts(Seq(address))
+ // Convenient aliases.
+ type FullySpecifiedConfig = ClientConfig[Req, Rep, Yes, Yes, Yes]
+ type ThisConfig = ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit]
+ type This = ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit]
- def cluster(cluster: Cluster): ClientBuilder[Req, Rep] = {
- copy(_cluster = Some(cluster))
+ private[builder] def this() = this(new ClientConfig)
+
+ override def toString() = "ClientBuilder(%s)".format(config.toString)
+
+ protected def copy[Req1, Rep1, HasCluster1, HasCodec1, HasHostConnectionLimit1](
+ config: ClientConfig[Req1, Rep1, HasCluster1, HasCodec1, HasHostConnectionLimit1]
+ ): ClientBuilder[Req1, Rep1, HasCluster1, HasCodec1, HasHostConnectionLimit1] = {
+ new ClientBuilder(config)
}
- def codec[Req1, Rep1](codec: ClientCodec[Req1, Rep1]) =
- copy(_codec = Some(codec))
+ protected def withConfig[Req1, Rep1, HasCluster1, HasCodec1, HasHostConnectionLimit1](
+ f: ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] =>
+ ClientConfig[Req1, Rep1, HasCluster1, HasCodec1, HasHostConnectionLimit1]
+ ): ClientBuilder[Req1, Rep1, HasCluster1, HasCodec1, HasHostConnectionLimit1] = copy(f(config))
+
+ def hosts(
+ hostnamePortCombinations: String
+ ): ClientBuilder[Req, Rep, Yes, HasCodec, HasHostConnectionLimit] = {
+ val addresses = InetSocketAddressUtil.parseHosts(hostnamePortCombinations)
+ hosts(addresses)
+ }
- def protocol[Req1, Rep1](protocol: Protocol[Req1, Rep1]) =
- copy(_codec = Some(new ClientCodec[Req1, Rep1] {
+ def hosts(
+ addresses: Seq[SocketAddress]
+ ): ClientBuilder[Req, Rep, Yes, HasCodec, HasHostConnectionLimit] =
+ cluster(new SocketAddressCluster(addresses))
+
+ def hosts(
+ address: SocketAddress
+ ): ClientBuilder[Req, Rep, Yes, HasCodec, HasHostConnectionLimit] =
+ hosts(Seq(address))
+
+ def cluster(
+ cluster: Cluster
+ ): ClientBuilder[Req, Rep, Yes, HasCodec, HasHostConnectionLimit] =
+ withConfig(_.copy(_cluster = Some(cluster)))
+
+ def codec[Req1, Rep1](
+ codec: ClientCodec[Req1, Rep1]
+ ): ClientBuilder[Req1, Rep1, HasCluster, Yes, HasHostConnectionLimit] =
+ withConfig(_.copy(_codec = Some(codec)))
+
+ def protocol[Req1, Rep1](
+ protocol: Protocol[Req1, Rep1]
+ ): ClientBuilder[Req1, Rep1, HasCluster, HasCodec, HasHostConnectionLimit] = {
+ val codec = new ClientCodec[Req1, Rep1] {
def pipelineFactory = protocol.codec.clientCodec.pipelineFactory
override def prepareService(underlying: Service[Req1, Rep1]) = {
val future = protocol.codec.clientCodec.prepareService(underlying)
future flatMap { protocol.prepareChannel(_) }
}
- }))
-
- def codec[Req1, Rep1](codec: Codec[Req1, Rep1]) =
- copy(_codec = Some(codec.clientCodec))
+ }
- def connectionTimeout(duration: Duration) =
- copy(_connectionTimeout = duration)
+ withConfig(_.copy(_codec = Some(codec)))
+ }
- def requestTimeout(duration: Duration) =
- copy(_requestTimeout = duration)
+ def codec[Req1, Rep1](
+ codec: Codec[Req1, Rep1]
+ ): ClientBuilder[Req1, Rep1, HasCluster, Yes, HasHostConnectionLimit] =
+ withConfig(_.copy(_codec = Some(codec.clientCodec)))
- def reportTo(receiver: StatsReceiver) =
- copy(_statsReceiver = Some(receiver))
+ def connectionTimeout(duration: Duration): This =
+ withConfig(_.copy(_connectionTimeout = duration))
- /**
- * The interval over which to aggregate load statistics.
- */
- def loadStatistics(numIntervals: Int, interval: Duration) = {
- require(numIntervals >= 1, "Must have at least 1 window to sample statistics over")
+ def requestTimeout(duration: Duration): This =
+ withConfig(_.copy(_requestTimeout = duration))
- copy(_loadStatistics = (numIntervals, interval))
- }
+ def reportTo(receiver: StatsReceiver): This =
+ withConfig(_.copy(_statsReceiver = Some(receiver)))
- def name(value: String) = copy(_name = Some(value))
+ def name(value: String): This = withConfig(_.copy(_name = Some(value)))
- def hostConnectionLimit(value: Int) =
- copy(_hostConnectionLimit = Some(value))
+ def hostConnectionLimit(value: Int): ClientBuilder[Req, Rep, HasCluster, HasCodec, Yes] =
+ withConfig(_.copy(_hostConnectionLimit = Some(value)))
- def hostConnectionCoresize(value: Int) =
- copy(_hostConnectionCoresize = Some(value))
+ def hostConnectionCoresize(value: Int): This =
+ withConfig(_.copy(_hostConnectionCoresize = Some(value)))
- def hostConnectionIdleTime(timeout: Duration) =
- copy(_hostConnectionIdleTime = Some(timeout))
+ def hostConnectionIdleTime(timeout: Duration): This =
+ withConfig(_.copy(_hostConnectionIdleTime = Some(timeout)))
- def hostConnectionMaxIdleTime(timeout: Duration) =
- copy(_hostConnectionMaxIdleTime = Some(timeout))
+ def hostConnectionMaxIdleTime(timeout: Duration): This =
+ withConfig(_.copy(_hostConnectionMaxIdleTime = Some(timeout)))
- def hostConnectionMaxLifeTime(timeout: Duration) =
- copy(_hostConnectionMaxLifeTime = Some(timeout))
+ def hostConnectionMaxLifeTime(timeout: Duration): This =
+ withConfig(_.copy(_hostConnectionMaxLifeTime = Some(timeout)))
- def retries(value: Int) =
- copy(_retries = Some(value))
+ def retries(value: Int): This =
+ withConfig(_.copy(_retries = Some(value)))
- def sendBufferSize(value: Int) = copy(_sendBufferSize = Some(value))
- def recvBufferSize(value: Int) = copy(_recvBufferSize = Some(value))
+ def sendBufferSize(value: Int): This = withConfig(_.copy(_sendBufferSize = Some(value)))
+ def recvBufferSize(value: Int): This = withConfig(_.copy(_recvBufferSize = Some(value)))
/**
* Use the given channel factory instead of the default. Note that
* when using a non-default ChannelFactory, finagle can't
* meaningfully reference count factory usage, and so the caller is
* responsible for calling ``releaseExternalResources()''.
*/
- def channelFactory(cf: ReferenceCountedChannelFactory) =
- copy(_channelFactory = Some(cf))
+ def channelFactory(cf: ReferenceCountedChannelFactory): This =
+ withConfig(_.copy(_channelFactory = Some(cf)))
+
+ def tls(): This =
+ withConfig(_.copy(_tls = Some(Ssl.client())))
- def tls() =
- copy(_tls = Some(Ssl.client()))
+ def tlsWithoutValidation(): This =
+ withConfig(_.copy(_tls = Some(Ssl.clientWithoutCertificateValidation())))
- def tlsWithoutValidation() =
- copy(_tls = Some(Ssl.clientWithoutCertificateValidation()))
+ def startTls(value: Boolean): This =
+ withConfig(_.copy(_startTls = true))
- def startTls(value: Boolean) =
- copy(_startTls = true)
+ def logger(logger: Logger): This = withConfig(_.copy(_logger = Some(logger)))
- def logger(logger: Logger) = copy(_logger = Some(logger))
+ /* BUILDING */
+ /* ======== */
- // ** BUILDING
private[this] def buildBootstrap(codec: ClientCodec[Req, Rep], host: SocketAddress) = {
- val cf = _channelFactory getOrElse ClientBuilder.defaultChannelFactory
+ val cf = config.channelFactory getOrElse ClientBuilder.defaultChannelFactory
cf.acquire()
val bs = new ClientBootstrap(cf)
val pf = new ChannelPipelineFactory {
override def getPipeline = {
val pipeline = codec.pipelineFactory.getPipeline
- for (ctx <- _tls) {
+ for (ctx <- config.tls) {
val sslEngine = ctx.createSSLEngine()
sslEngine.setUseClientMode(true)
// sslEngine.setEnableSessionCreation(true) // XXX - need this?
- pipeline.addFirst("ssl", new SslHandler(sslEngine, _startTls))
+ pipeline.addFirst("ssl", new SslHandler(sslEngine, config.startTls))
}
- for (logger <- _logger) {
+ for (logger <- config.logger) {
pipeline.addFirst("channelSnooper",
- ChannelSnooper(_name.get)(logger.info))
+ ChannelSnooper(config.name.get)(logger.info))
}
pipeline
@@ -251,55 +353,57 @@ case class ClientBuilder[Req, Rep](
bs.setPipelineFactory(pf)
bs.setOption("remoteAddress", host)
- bs.setOption("connectTimeoutMillis", _connectionTimeout.inMilliseconds)
+ bs.setOption("connectTimeoutMillis", config.connectionTimeout.inMilliseconds)
bs.setOption("tcpNoDelay", true) // fin NAGLE. get it?
// bs.setOption("soLinger", 0) (TODO)
bs.setOption("reuseAddress", true)
- _sendBufferSize foreach { s => bs.setOption("sendBufferSize", s) }
- _recvBufferSize foreach { s => bs.setOption("receiveBufferSize", s) }
+ config.sendBufferSize foreach { s => bs.setOption("sendBufferSize", s) }
+ config.recvBufferSize foreach { s => bs.setOption("receiveBufferSize", s) }
bs
}
private[this] def buildPool(factory: ServiceFactory[Req, Rep], statsReceiver: StatsReceiver) = {
// These are conservative defaults, but probably the only safe
// thing to do.
- val lowWatermark = _hostConnectionCoresize getOrElse(1)
- val highWatermark = _hostConnectionLimit getOrElse(Int.MaxValue)
- val idleTime = _hostConnectionIdleTime getOrElse(5.seconds)
+ val lowWatermark = config.hostConnectionCoresize getOrElse(1)
+ val highWatermark = config.hostConnectionLimit getOrElse(Int.MaxValue)
+ val idleTime = config.hostConnectionIdleTime getOrElse(5.seconds)
val cachingPool = new CachingPool(factory, idleTime)
new WatermarkPool[Req, Rep](cachingPool, lowWatermark, highWatermark, statsReceiver)
}
private[this] def prepareService(service: Service[Req, Rep]) = {
- val codec = _codec.get
+ val codec = config.codec.get
var future: Future[Service[Req, Rep]] = null
future = codec.prepareService(service)
- if (_hostConnectionMaxIdleTime.isDefined || _hostConnectionMaxLifeTime.isDefined) {
- future = future map {
- new ExpiringService(_, _hostConnectionMaxIdleTime, _hostConnectionMaxLifeTime)
+ if (config.hostConnectionMaxIdleTime.isDefined ||
+ config.hostConnectionMaxLifeTime.isDefined) {
+ future = future map { underlying =>
+ new ExpiringService(
+ underlying,
+ config.hostConnectionMaxIdleTime,
+ config.hostConnectionMaxLifeTime)
}
}
future
}
/**
- * Construct a ServiceFactory. This is useful for stateful protocols (e.g.,
- * those that support transactions or authentication).
+ * Construct a ServiceFactory. This is useful for stateful protocols
+ * (e.g., those that support transactions or authentication).
*/
- def buildFactory(): ServiceFactory[Req, Rep] = {
- if (!_cluster.isDefined)
- throw new IncompleteSpecification("No hosts were specified")
- if (!_codec.isDefined)
- throw new IncompleteSpecification("No codec was specified")
-
+ def buildFactory()(
+ implicit THE_BUILDER_IS_NOT_FULLY_SPECIFIED_SEE_ClientBuilder_DOCUMENTATION:
+ ThisConfig => FullySpecifiedConfig
+ ): ServiceFactory[Req, Rep] = {
Timer.default.acquire()
- val cluster = _cluster.get
- val codec = _codec.get
+ val cluster = config.cluster.get
+ val codec = config.codec.get
val hostFactories = cluster mkFactories { host =>
// The per-host stack is as follows:
@@ -311,13 +415,13 @@ case class ClientBuilder[Req, Rep](
//
// the pool & below are host-specific,
- val hostStatsReceiver = _statsReceiver map { statsReceiver =>
+ val hostStatsReceiver = config.statsReceiver map { statsReceiver =>
val hostname = host match {
case iaddr: InetSocketAddress => "%s:%d".format(iaddr.getHostName, iaddr.getPort)
case other => other.toString
}
- val scoped = _name map (statsReceiver.scope(_)) getOrElse statsReceiver
+ val scoped = config.name map (statsReceiver.scope(_)) getOrElse statsReceiver
new RollupStatsReceiver(scoped).withSuffix(hostname)
} getOrElse NullStatsReceiver
@@ -328,8 +432,8 @@ case class ClientBuilder[Req, Rep](
bs, prepareService _, hostStatsReceiver)
factory = buildPool(factory, hostStatsReceiver)
- if (_requestTimeout < Duration.MaxValue) {
- val filter = new TimeoutFilter[Req, Rep](_requestTimeout)
+ if (config.requestTimeout < Duration.MaxValue) {
+ val filter = new TimeoutFilter[Req, Rep](config.requestTimeout)
factory = filter andThen factory
}
@@ -350,16 +454,33 @@ case class ClientBuilder[Req, Rep](
/**
* Construct a Service.
*/
- def build(): Service[Req, Rep] = {
+ def build()(
+ implicit THE_BUILDER_IS_NOT_FULLY_SPECIFIED_SEE_ClientBuilder_DOCUMENTATION:
+ ThisConfig => FullySpecifiedConfig
+ ): Service[Req, Rep] = {
var service: Service[Req, Rep] = new FactoryToService[Req, Rep](buildFactory())
// We keep the retrying filter at the very bottom: this allows us
// to retry across multiple hosts, etc.
- _retries map { numRetries =>
+ config.retries map { numRetries =>
val filter = new RetryingFilter[Req, Rep](new NumTriesRetryStrategy(numRetries))
service = filter andThen service
}
service
}
+
+ /**
+ * Construct a Service, with runtime checks for builder
+ * completeness.
+ */
+ def unsafeBuild(): Service[Req, Rep] =
+ withConfig(_.validated).build()
+
+ /**
+ * Construct a ServiceFactory, with runtime checks for builder
+ * completeness.
+ */
+ def unsafeBuildFactory(): ServiceFactory[Req, Rep] =
+ withConfig(_.validated).buildFactory()
}
View
45 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
@@ -153,9 +153,9 @@ final case class ServerConfig[Req, Rep](
}
/**
- * A handy Builder for constructing Servers (i.e., binding Services to a port).
- * This class is subclassable. Override copy() and build() to do your own
- * dirty work.
+ * A handy Builder for constructing Servers (i.e., binding Services to
+ * a port). This class is subclassable. Override copy() and build()
+ * to do your own dirty work.
*/
class ServerBuilder[Req, Rep](val config: ServerConfig[Req, Rep]) {
import ServerBuilder._
@@ -167,59 +167,62 @@ class ServerBuilder[Req, Rep](val config: ServerConfig[Req, Rep]) {
protected def copy[Req1, Rep1](config: ServerConfig[Req1, Rep1]) =
new ServerBuilder(config)
+ protected def withConfig[Req1, Rep1](f: ServerConfig[Req, Rep] => ServerConfig[Req1, Rep1]) =
+ copy(f(config))
+
def codec[Req1, Rep1](codec: Codec[Req1, Rep1]) =
- copy(config.copy(_codec = Some(codec.serverCodec)))
+ withConfig(_.copy(_codec = Some(codec.serverCodec)))
def codec[Req1, Rep1](codec: ServerCodec[Req1, Rep1]) =
- copy(config.copy(_codec = Some(codec)))
+ withConfig(_.copy(_codec = Some(codec)))
def reportTo(receiver: StatsReceiver) =
- copy(config.copy(_statsReceiver = Some(receiver)))
+ withConfig(_.copy(_statsReceiver = Some(receiver)))
def name(value: String) =
- copy(config.copy(_name = Some(value)))
+ withConfig(_.copy(_name = Some(value)))
def sendBufferSize(value: Int) =
- copy(config.copy(_sendBufferSize = Some(value)))
+ withConfig(_.copy(_sendBufferSize = Some(value)))
def recvBufferSize(value: Int) =
- copy(config.copy(_recvBufferSize = Some(value)))
+ withConfig(_.copy(_recvBufferSize = Some(value)))
def bindTo(address: SocketAddress) =
- copy(config.copy(_bindTo = Some(address)))
+ withConfig(_.copy(_bindTo = Some(address)))
def channelFactory(cf: ReferenceCountedChannelFactory) =
- copy(config.copy(_channelFactory = cf))
+ withConfig(_.copy(_channelFactory = cf))
def logger(logger: Logger) =
- copy(config.copy(_logger = Some(logger)))
+ withConfig(_.copy(_logger = Some(logger)))
def tls(certificatePath: String, keyPath: String) =
- copy(config.copy(_tls = Some(certificatePath, keyPath)))
+ withConfig(_.copy(_tls = Some(certificatePath, keyPath)))
def startTls(value: Boolean) =
- copy(config.copy(_startTls = true))
+ withConfig(_.copy(_startTls = true))
def maxConcurrentRequests(max: Int) =
- copy(config.copy(_maxConcurrentRequests = Some(max)))
+ withConfig(_.copy(_maxConcurrentRequests = Some(max)))
def hostConnectionMaxIdleTime(howlong: Duration) =
- copy(config.copy(_hostConnectionMaxIdleTime = Some(howlong)))
+ withConfig(_.copy(_hostConnectionMaxIdleTime = Some(howlong)))
def hostConnectionMaxLifeTime(howlong: Duration) =
- copy(config.copy(_hostConnectionMaxLifeTime = Some(howlong)))
+ withConfig(_.copy(_hostConnectionMaxLifeTime = Some(howlong)))
def requestTimeout(howlong: Duration) =
- copy(config.copy(_requestTimeout = Some(howlong)))
+ withConfig(_.copy(_requestTimeout = Some(howlong)))
def readTimeout(howlong: Duration) =
- copy(config.copy(_readTimeout = Some(howlong)))
+ withConfig(_.copy(_readTimeout = Some(howlong)))
def writeCompletionTimeout(howlong: Duration) =
- copy(config.copy(_writeCompletionTimeout = Some(howlong)))
+ withConfig(_.copy(_writeCompletionTimeout = Some(howlong)))
def traceReceiver(receiver: TraceReceiver) =
- copy(config.copy(_traceReceiver = receiver))
+ withConfig(_.copy(_traceReceiver = receiver))
/**
* Construct the Server, given the provided Service.
View
4 finagle-core/src/test/scala/com/twitter/finagle/builder/ClientBuilderSpec.scala
@@ -60,9 +60,11 @@ object ClientBuilderSpec extends Specification with Mockito {
// Client
val client = ClientBuilder()
+ .codec(_codec)
.channelFactory(refcountedChannelFactory)
.protocol(protocol)
.hosts(Seq(clientAddress))
+ .hostConnectionLimit(1)
.build()
val requestFuture = client(123)
@@ -86,12 +88,14 @@ object ClientBuilderSpec extends Specification with Mockito {
.channelFactory(refcountedChannelFactory)
.codec(_codec)
.hosts(Seq(clientAddress))
+ .hostConnectionLimit(1)
.build()
val client2 = ClientBuilder()
.channelFactory(refcountedChannelFactory)
.codec(_codec)
.hosts(Seq(clientAddress))
+ .hostConnectionLimit(1)
.build()
client1.release()
View
3 finagle-core/src/test/scala/com/twitter/finagle/service/ClientSpec.scala
@@ -14,7 +14,7 @@ import com.twitter.finagle.builder.{
import com.twitter.finagle.ChannelClosedException
object ClientSpec extends Specification {
- def withServer(handler: ChannelHandler)(spec: ClientBuilder[HttpRequest, HttpResponse] => Unit) {
+ def withServer(handler: ChannelHandler)(spec: ClientBuilder.Complete[HttpRequest, HttpResponse] => Unit) {
val cf = new DefaultLocalServerChannelFactory()
val bs = new ServerBootstrap(cf)
@@ -33,6 +33,7 @@ object ClientSpec extends Specification {
ClientBuilder()
.channelFactory(new ReferenceCountedChannelFactory(new DefaultLocalClientChannelFactory))
.hosts(Seq(serverAddress))
+ .hostConnectionLimit(1)
.codec(Http)
try {
View
3 finagle-example/src/main/scala/com/twitter/finagle/example/echo/EchoClient.scala
@@ -11,6 +11,7 @@ object EchoClient {
val client: Service[String, String] = ClientBuilder()
.codec(StringCodec)
.hosts(new InetSocketAddress(8080))
+ .hostConnectionLimit(1)
.build()
// Issue a newline-delimited request, respond to the result
@@ -24,4 +25,4 @@ object EchoClient {
client.release()
}
}
-}
+}
View
3 finagle-example/src/main/scala/com/twitter/finagle/example/http/HttpClient.scala
@@ -38,6 +38,7 @@ object HttpClient {
val clientWithoutErrorHandling: Service[HttpRequest, HttpResponse] = ClientBuilder()
.codec(Http)
.hosts(new InetSocketAddress(8080))
+ .hostConnectionLimit(1)
.build()
val handleErrors = new HandleErrors
@@ -76,4 +77,4 @@ object HttpClient {
println("))) Unauthorized request errored (as desired): " + error.getClass.getName)
}
}
-}
+}
View
3 ...le-example/src/main/scala/com/twitter/finagle/example/memcachedproxy/MemcachedProxy.scala
@@ -20,6 +20,7 @@ object MemcachedProxy {
val client: Service[Command, Response] = ClientBuilder()
.codec(Memcached)
.hosts(new InetSocketAddress(11211))
+ .hostConnectionLimit(1)
.build()
val proxyService = new Service[Command, Response] {
@@ -42,4 +43,4 @@ object MemcachedProxy {
}
}
-}
+}
View
3 ...xample/src/main/scala/com/twitter/finagle/example/spritzer2kestrel/Spritzer2Kestrel.scala
@@ -33,6 +33,7 @@ object Spritzer2Kestrel {
val spritzerClient = ClientBuilder()
.codec(Stream)
.hosts("stream.twitter.com:80")
+ .hostConnectionLimit(1)
.build()
val spritzer: Channel[ChannelBuffer] = {
val request = {
@@ -77,4 +78,4 @@ object Spritzer2Kestrel {
System.exit(1)
}
}
-}
+}
View
3 finagle-example/src/main/scala/com/twitter/finagle/example/stream/StreamClient.scala
@@ -22,6 +22,7 @@ object StreamClient {
val clientFactory: ServiceFactory[HttpRequest, Channel[ChannelBuffer]] = ClientBuilder()
.codec(Stream)
.hosts(new InetSocketAddress(8080))
+ .hostConnectionLimit(1)
.buildFactory()
for {
@@ -43,4 +44,4 @@ object StreamClient {
}
}
}
-}
+}
View
3 finagle-example/src/main/scala/com/twitter/finagle/example/thrift/ThriftClient.scala
@@ -13,6 +13,7 @@ object ThriftClient {
val service: Service[ThriftClientRequest, Array[Byte]] = ClientBuilder()
.hosts(new InetSocketAddress(8080))
.codec(ThriftClientFramedCodec())
+ .hostConnectionLimit(1)
.build()
// Wrap the raw Thrift service in a Client decorator. The client
@@ -26,4 +27,4 @@ object ThriftClient {
service.release()
}
}
-}
+}
View
1 finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/Client.scala
@@ -19,6 +19,7 @@ object Client {
val service = ClientBuilder()
.codec(new Kestrel)
.hosts(hosts)
+ .hostConnectionLimit(1)
.buildFactory()
apply(service)
}
View
3 finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/integration/ClientSpec.scala
@@ -20,6 +20,7 @@ object ClientSpec extends Specification {
val serviceFactory = ClientBuilder()
.hosts("localhost:22133")
.codec(new Kestrel)
+ .hostConnectionLimit(1)
.buildFactory()
val client = Client(serviceFactory)
@@ -87,4 +88,4 @@ object ClientSpec extends Specification {
}
}
}
-}
+}
View
3 ...strel/src/test/scala/com/twitter/finagle/kestrel/integration/InterpreterServiceSpec.scala
@@ -21,6 +21,7 @@ object InterpreterServiceSpec extends Specification {
client = ClientBuilder()
.hosts("localhost:" + address.getPort)
.codec(new Kestrel)
+ .hostConnectionLimit(1)
.build()
}
@@ -52,4 +53,4 @@ object InterpreterServiceSpec extends Specification {
}
}
}
-}
+}
View
11 finagle-memcached/src/main/java/com/twitter/finagle/memcached/java/ClientTest.java
@@ -11,11 +11,12 @@
public class ClientTest {
public static void main(String[] args) {
Service<Command, Response> service =
- ClientBuilder
- .get()
- .hosts("localhost:11211")
- .codec(new Memcached())
- .build();
+ ClientBuilder.safeBuild(
+ ClientBuilder
+ .get()
+ .hosts("localhost:11211")
+ .hostConnectionLimit(1)
+ .codec(new Memcached()));
Client client = Client.newInstance(service);
client.delete("foo").get();
View
20 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Client.scala
@@ -9,7 +9,7 @@ import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import org.jboss.netty.util.CharsetUtil
import org.jboss.netty.buffer.ChannelBuffer
import scala.collection.JavaConversions._
-import com.twitter.finagle.builder.ClientBuilder
+import com.twitter.finagle.builder.{ClientBuilder, ClientConfig}
import text.Memcached
import com.twitter.finagle.Service
import com.twitter.util.{Time, Future}
@@ -23,6 +23,7 @@ object Client {
def apply(host: String): Client = Client(
ClientBuilder()
.hosts(host)
+ .hostConnectionLimit(1)
.codec(new Memcached)
.build())
@@ -357,11 +358,10 @@ class KetamaClient(clients: Map[(String, Int, Int), Client], keyHasher: KeyHashe
}
}
-
case class KetamaClientBuilder(
_nodes: Seq[(String, Int, Int)],
_hashName: Option[String],
- _clientBuilder: Option[ClientBuilder[Command, Response]]) {
+ _clientBuilder: Option[ClientBuilder[_, _, Nothing, Nothing, ClientConfig.Yes]]) {
def this() = this(
Nil, // nodes
@@ -378,13 +378,15 @@ case class KetamaClientBuilder(
def hashName(hashName: String): KetamaClientBuilder =
copy(_hashName = Some(hashName))
- def clientBuilder(clientBuilder: ClientBuilder[Command, Response]): KetamaClientBuilder =
+ def clientBuilder(clientBuilder: ClientBuilder[_, _, Nothing, Nothing, ClientConfig.Yes]): KetamaClientBuilder =
copy(_clientBuilder = Some(clientBuilder))
def build(): PartitionedClient = {
- val builder = _clientBuilder getOrElse ClientBuilder()
+ val builder = _clientBuilder getOrElse ClientBuilder().hostConnectionLimit(1)
+
val clients = Map() ++ _nodes.map { case (hostname, port, weight) =>
- val client = Client(builder.hosts(hostname + ":" + port).codec(new Memcached).build())
+ val b = builder.hosts(hostname + ":" + port).codec(new Memcached)
+ val client = Client(b.build())
((hostname, port, weight) -> client)
}
val keyHasher = KeyHasher.byName(_hashName.getOrElse("ketama"))
@@ -409,7 +411,7 @@ class RubyMemCacheClient(clients: Seq[Client]) extends PartitionedClient {
*/
case class RubyMemCacheClientBuilder(
_nodes: Seq[(String, Int, Int)],
- _clientBuilder: Option[ClientBuilder[Command, Response]]) {
+ _clientBuilder: Option[ClientBuilder[_, _, Nothing, Nothing, ClientConfig.Yes]]) {
def this() = this(
Nil, // nodes
@@ -422,11 +424,11 @@ case class RubyMemCacheClientBuilder(
def nodes(hostPortWeights: String): RubyMemCacheClientBuilder =
copy(_nodes = PartitionedClient.parseHostPortWeights(hostPortWeights))
- def clientBuilder(clientBuilder: ClientBuilder[Command, Response]): RubyMemCacheClientBuilder =
+ def clientBuilder(clientBuilder: ClientBuilder[_, _, Nothing, Nothing, ClientConfig.Yes]): RubyMemCacheClientBuilder =
copy(_clientBuilder = Some(clientBuilder))
def build(): PartitionedClient = {
- val builder = _clientBuilder getOrElse ClientBuilder()
+ val builder = _clientBuilder getOrElse ClientBuilder().hostConnectionLimit(1)
val clients = _nodes.map { case (hostname, port, weight) =>
require(weight == 1, "Ruby memcache node weight must be 1")
Client(builder.hosts(hostname + ":" + port).codec(new Memcached).build())
View
11 finagle-memcached/src/test/java/com/twitter/finagle/memcached/integration/TestClient.java
@@ -18,11 +18,12 @@ public static void main(String[] args) {
public void testGetAndSet() {
Service<Command, Response> service =
- ClientBuilder
- .get()
- .hosts("localhost:11211")
- .codec(new Memcached())
- .build();
+ ClientBuilder.safeBuild(
+ ClientBuilder
+ .get()
+ .hosts("localhost:11211")
+ .codec(new Memcached())
+ .hostConnectionLimit(1));
Client client = ClientBase.newInstance(service);
client.delete("foo").get();
View
1 finagle-memcached/src/test/scala/com/twitter/finagle/memcached/integration/ClientSpec.scala
@@ -23,6 +23,7 @@ object ClientSpec extends Specification {
val service = ClientBuilder()
.hosts(Seq(ExternalMemcached.address.get))
.codec(new Memcached)
+ .hostConnectionLimit(1)
.build()
client = Client(service)
}
View
3 ...hed/src/test/scala/com/twitter/finagle/memcached/integration/InterpreterServiceSpec.scala
@@ -22,6 +22,7 @@ object InterpreterServiceSpec extends Specification {
client = ClientBuilder()
.hosts("localhost:" + address.getPort)
.codec(new Memcached)
+ .hostConnectionLimit(1)
.build()
}
@@ -40,4 +41,4 @@ object InterpreterServiceSpec extends Specification {
result(1.second) mustEqual Values(Seq(Value(key, value)))
}
}
-}
+}
View
1 ...emcached/src/test/scala/com/twitter/finagle/memcached/stress/InterpreterServiceSpec.scala
@@ -21,6 +21,7 @@ object InterpreterServiceSpec extends Specification {
client = ClientBuilder()
.hosts("localhost:" + address.getPort)
.codec(new Memcached)
+ .hostConnectionLimit(1)
.build()
}
View
1 finagle-native/src/test/scala/com/twitter/finagle/SslSpec.scala
@@ -105,6 +105,7 @@ object SslSpec extends Specification {
.name("http-client")
.hosts(Seq(address))
.codec(codec.clientCodec)
+ .hostConnectionLimit(1)
.tlsWithoutValidation()
.build()
View
1 ...rversets/src/test/scala/com/twitter/finagle/zookeeper/ZookeeperServerSetClusterSpec.scala
@@ -82,6 +82,7 @@ object ZookeeperServerSetClusterSpec extends Specification {
val client = ClientBuilder()
.cluster(cluster)
.codec(new StringCodec)
+ .hostConnectionLimit(1)
.build()
client("hello\n")(1.seconds) mustEqual "olleh"
View
3 finagle-stream/src/test/scala/com/twitter/finagle/stream/EndToEndSpec.scala
@@ -28,6 +28,7 @@ object EndToEndSpec extends Specification {
val client = ClientBuilder()
.codec(new Stream)
.hosts(Seq(address))
+ .hostConnectionLimit(1)
.build()
val channel = client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))(1.second)
@@ -69,4 +70,4 @@ object EndToEndSpec extends Specification {
}
}
}
-}
+}
View
3 finagle-stream/src/test/scala/com/twitter/finagle/stream/PubSubSpec.scala
@@ -30,6 +30,7 @@ object PubSubSpec extends Specification {
val client = ClientBuilder()
.codec(new Stream)
.hosts(Seq(address))
+ .hostConnectionLimit(1)
.buildFactory()
val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")
@@ -70,4 +71,4 @@ object PubSubSpec extends Specification {
}
}
-}
+}
View
3 finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala
@@ -15,6 +15,7 @@ object Tracing1Service extends Tracing1.ServiceIface {
private[this] val transport = ClientBuilder()
.hosts("localhost:6002")
.codec(ThriftClientFramedCodec())
+ .hostConnectionLimit(1)
.build()
private[this] val t2Client =
@@ -41,6 +42,7 @@ object Tracing2Service extends Tracing2.ServiceIface {
private[this] val transport = ClientBuilder()
.hosts("localhost:6003")
.codec(ThriftClientFramedCodec())
+ .hostConnectionLimit(1)
.build()
private[this] val t3Client =
@@ -91,6 +93,7 @@ object Client {
val transport = ClientBuilder()
.hosts("localhost:6001")
.codec(ThriftClientFramedCodec())
+ .hostConnectionLimit(1)
.build()
val client = new Tracing1.ServiceToClient(
View
2 finagle-stress/src/main/scala/com/twitter/finagle/stress/EndToEndStress.scala
@@ -54,7 +54,7 @@ object EndToEndStress {
.reportTo(new OstrichStatsReceiver)
.hosts(Seq(addr))
.codec(Http)
- //.hostConnectionLimit(10)
+ .hostConnectionLimit(10)
.build()
0 until concurrency foreach { _ => dispatchLoop(service) }
View
5 finagle-stress/src/main/scala/com/twitter/finagle/stress/LoadBalancerTest.scala
@@ -30,7 +30,7 @@ object LoadBalancerTest {
// TODO: proper resource releasing, etc.
}
- def runSuite(clientBuilder: ClientBuilder[_, _]) {
+ def runSuite(clientBuilder: ClientBuilder[_, _, _, _, _]) {
println("testing " + clientBuilder)
println("\n== baseline (warmup) ==\n")
new LoadBalancerTest(clientBuilder)({ case _ => }).run()
@@ -65,7 +65,7 @@ object LoadBalancerTest {
}
class LoadBalancerTest(
- clientBuilder: ClientBuilder[_, _],
+ clientBuilder: ClientBuilder[_, _, _, _, _],
serverLatency: Duration = 0.seconds,
numRequests: Int = 100000,
concurrency: Int = 20)(behavior: PartialFunction[(Int, Seq[EmbeddedServer]), Unit])
@@ -149,6 +149,7 @@ class LoadBalancerTest(
val client = clientBuilder
.codec(Http)
.hosts(servers map(_.addr))
+ .hostConnectionLimit(Int.MaxValue)
.reportTo(new OstrichStatsReceiver)
.build()
View
1 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala
@@ -46,6 +46,7 @@ object EndToEndSpec extends Specification {
val service = ClientBuilder()
.hosts(Seq(serverAddr))
.codec(ThriftClientFramedCodec())
+ .hostConnectionLimit(1)
.build()
val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
View
5 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/FinagleClientThriftServerSpec.scala
@@ -75,6 +75,7 @@ object FinagleClientThriftServerSpec extends Specification {
val service = ClientBuilder()
.hosts(Seq(thriftServerAddr))
.codec(codec)
+ .hostConnectionLimit(1)
.build()
val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
@@ -90,6 +91,7 @@ object FinagleClientThriftServerSpec extends Specification {
val service = ClientBuilder()
.hosts(Seq(thriftServerAddr))
.codec(codec)
+ .hostConnectionLimit(1)
.build()
val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
@@ -104,6 +106,7 @@ object FinagleClientThriftServerSpec extends Specification {
val service = ClientBuilder()
.hosts(Seq(thriftServerAddr))
.codec(codec)
+ .hostConnectionLimit(1)
.build()
val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
@@ -120,6 +123,7 @@ object FinagleClientThriftServerSpec extends Specification {
val service = ClientBuilder()
.hosts(Seq(thriftServerAddr))
.codec(codec)
+ .hostConnectionLimit(1)
.build()
val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
@@ -141,6 +145,7 @@ object FinagleClientThriftServerSpec extends Specification {
val service = ClientBuilder()
.hosts(addrs)
.codec(codec)
+ .hostConnectionLimit(1)
.build()
val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
View
3 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ServiceEndToEndSpec.scala
@@ -57,7 +57,8 @@ object ServiceEndToEndSpec extends Specification {
val client = ClientBuilder()
.codec(new Thrift)
.hosts(Seq(addr))
- .build
+ .hostConnectionLimit(1)
+ .build()
val promise = new Promise[ThriftReply[_]]

0 comments on commit 837545f

Please sign in to comment.