Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge remote branch 'origin/master' into ssl_track_cipher

Conflicts:
	finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
  • Loading branch information...
commit 7bd0097b831b5a543d893e31d80a0833a72d11df 2 parents c2a38cb + 62d4961
@bierbaum bierbaum authored
Showing with 319 additions and 385 deletions.
  1. +28 −19 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
  2. +0 −2  finagle-core/src/main/scala/com/twitter/finagle/builder/Common.scala
  3. +27 −19 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
  4. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelClosingHandler.scala
  5. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelRequestStatsHandler.scala
  6. +2 −2 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
  7. +14 −8 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelStatsHandler.scala
  8. +2 −2 finagle-core/src/main/scala/com/twitter/finagle/channel/ConnectionLifecycleHandler.scala
  9. +6 −7 finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
  10. +7 −9 finagle-core/src/main/scala/com/twitter/finagle/channel/WriteCompletionTimeoutHandler.scala
  11. +5 −11 finagle-core/src/main/scala/com/twitter/finagle/http/AggregateHttpRequest.scala
  12. +1 −4 finagle-core/src/main/scala/com/twitter/finagle/http/ClientConnectionManager.scala
  13. +0 −2  finagle-core/src/main/scala/com/twitter/finagle/http/ServerConnectionManager.scala
  14. +10 −10 finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala
  15. +0 −4 finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala
  16. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/service/RefcountedService.scala
  17. +0 −1  finagle-core/src/main/scala/com/twitter/finagle/service/RetryingFilter.scala
  18. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/service/SingletonFactory.scala
  19. +7 −7 finagle-core/src/main/scala/com/twitter/finagle/stats/CumulativeGauge.scala
  20. +0 −33 finagle-core/src/main/scala/com/twitter/finagle/test/HttpClient.scala
  21. +0 −35 finagle-core/src/main/scala/com/twitter/finagle/test/HttpServer.scala
  22. +0 −15 finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala
  23. +7 −7 finagle-core/src/main/scala/com/twitter/finagle/util/ChannelFuture.scala
  24. +0 −1  finagle-core/src/main/scala/com/twitter/finagle/util/ChannelSnooper.scala
  25. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/util/Conversions.scala
  26. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/util/InetSocketAddressUtil.scala
  27. +0 −4 finagle-core/src/main/scala/com/twitter/finagle/util/ReferenceCountedTimer.scala
  28. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/util/RichFuture.scala
  29. +1 −1  finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala
  30. +0 −25 finagle-core/src/main/scala/com/twitter/finagle/util/TracingHeader.scala
  31. +1 −1  finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/DecodingToCommand.scala
  32. +1 −1  finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/DecodingToResponse.scala
  33. +3 −3 finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Show.scala
  34. +0 −18 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Hash.scala
  35. +0 −2  finagle-memcached/src/main/scala/com/twitter/finagle/memcached/KeyHasher.scala
  36. +12 −13 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/ChannelBufferUtils.scala
  37. +0 −1  finagle-native/src/test/scala/com/twitter/finagle/SslSpec.scala
  38. +3 −3 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/InputBuffer.scala
  39. +2 −2 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/OutputBuffer.scala
  40. +31 −0 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftBufferCodec.scala
  41. +1 −1  finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftChannelBufferDecoder.scala
  42. +29 −0 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientBufferedCodec.scala
  43. +3 −3 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala
  44. +5 −11 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala
  45. +1 −1  finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftTracing.scala
  46. +2 −2 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ChannelBufferToTransport.scala
  47. +2 −2 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ThriftClientCodec.scala
  48. +3 −2 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ThriftProcessorHandler.scala
  49. +2 −2 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ThriftServerCodec.scala
  50. +94 −82 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/FinagleClientThriftServerSpec.scala
View
47 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -34,30 +34,32 @@ object ClientBuilder {
}
/**
- * A word about the default values:
+ * A handy way to construct an RPC Client.
+ *
+ * ''Note'': A word about the default values:
*
* o connectionTimeout: optimized for within a datanceter
* o by default, no request timeout
*/
case class ClientBuilder[Req, Rep](
- _cluster: Option[Cluster],
- _protocol: Option[Protocol[Req, Rep]],
- _connectionTimeout: Duration,
- _requestTimeout: Duration,
- _statsReceiver: Option[StatsReceiver],
- _loadStatistics: (Int, Duration),
- _name: Option[String],
- _hostConnectionCoresize: Option[Int],
- _hostConnectionLimit: Option[Int],
- _hostConnectionIdleTime: Option[Duration],
- _hostConnectionMaxIdleTime: Option[Duration],
- _sendBufferSize: Option[Int],
- _recvBufferSize: Option[Int],
- _retries: Option[Int],
- _logger: Option[Logger],
- _channelFactory: Option[ReferenceCountedChannelFactory],
- _tls: Option[SSLContext],
- _startTls: Boolean)
+ private val _cluster: Option[Cluster],
+ private val _protocol: Option[Protocol[Req, Rep]],
+ private val _connectionTimeout: Duration,
+ private val _requestTimeout: Duration,
+ private val _statsReceiver: Option[StatsReceiver],
+ private val _loadStatistics: (Int, Duration),
+ private val _name: Option[String],
+ private val _hostConnectionCoresize: Option[Int],
+ private val _hostConnectionLimit: Option[Int],
+ private val _hostConnectionIdleTime: Option[Duration],
+ private val _hostConnectionMaxIdleTime: Option[Duration],
+ private val _sendBufferSize: Option[Int],
+ private val _recvBufferSize: Option[Int],
+ private val _retries: Option[Int],
+ private val _logger: Option[Logger],
+ private val _channelFactory: Option[ReferenceCountedChannelFactory],
+ private val _tls: Option[SSLContext],
+ private val _startTls: Boolean)
{
def this() = this(
None, // cluster
@@ -251,6 +253,10 @@ case class ClientBuilder[Req, Rep](
future
}
+ /**
+ * Construct a ServiceFactory. This is useful for stateful protocols (e.g.,
+ * those that support transactions or authentication).
+ */
def buildFactory(): ServiceFactory[Req, Rep] = {
if (!_cluster.isDefined)
throw new IncompleteSpecification("No hosts were specified")
@@ -308,6 +314,9 @@ case class ClientBuilder[Req, Rep](
}
}
+ /**
+ * Construct a Service.
+ */
def build(): Service[Req, Rep] = {
var service: Service[Req, Rep] = new FactoryToService[Req, Rep](buildFactory())
View
2  finagle-core/src/main/scala/com/twitter/finagle/builder/Common.scala
@@ -1,5 +1,3 @@
package com.twitter.finagle.builder
-import org.jboss.netty.channel.ChannelPipelineFactory
-
class IncompleteSpecification(message: String) extends Exception(message)
View
46 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
@@ -53,26 +53,26 @@ object ServerBuilder {
Executors.newCachedThreadPool())))
}
-// TODO: common superclass between client & server builders for common
-// concerns.
-
+/**
+ * A handy Builder for constructing Servers (i.e., binding Services to a port).
+ */
case class ServerBuilder[Req, Rep](
- _codec: Option[Codec[Req, Rep]],
- _statsReceiver: Option[StatsReceiver],
- _name: Option[String],
- _sendBufferSize: Option[Int],
- _recvBufferSize: Option[Int],
- _bindTo: Option[SocketAddress],
- _logger: Option[Logger],
- _tls: Option[SSLContext],
- _startTls: Boolean,
- _channelFactory: Option[ReferenceCountedChannelFactory],
- _maxConcurrentRequests: Option[Int],
- _hostConnectionMaxIdleTime: Option[Duration],
- _requestTimeout: Option[Duration],
- _readTimeout: Option[Duration],
- _writeCompletionTimeout: Option[Duration],
- _traceReceiver: TraceReceiver)
+ private val _codec: Option[Codec[Req, Rep]],
+ private val _statsReceiver: Option[StatsReceiver],
+ private val _name: Option[String],
+ private val _sendBufferSize: Option[Int],
+ private val _recvBufferSize: Option[Int],
+ private val _bindTo: Option[SocketAddress],
+ private val _logger: Option[Logger],
+ private val _tls: Option[SslContext,
+ private val _startTls: Boolean,
+ private val _channelFactory: Option[ReferenceCountedChannelFactory],
+ private val _maxConcurrentRequests: Option[Int],
+ private val _hostConnectionMaxIdleTime: Option[Duration],
+ private val _requestTimeout: Option[Duration],
+ private val _readTimeout: Option[Duration],
+ private val _writeCompletionTimeout: Option[Duration],
+ private val _traceReceiver: TraceReceiver)
{
import ServerBuilder._
@@ -168,8 +168,16 @@ case class ServerBuilder[Req, Rep](
private[this] def scopedStatsReceiver =
_statsReceiver map { sr => _name map (sr.scope(_)) getOrElse sr }
+ /**
+ * Construct the Server, given the provided Service.
+ */
def build(service: Service[Req, Rep]): Server = build(() => service)
+ /**
+ * Construct the Server, given the provided ServiceFactory. This
+ * is useful if the protocol is stateful (e.g., requires authentication
+ * or supports transactions).
+ */
def build(serviceFactory: () => Service[Req, Rep]): Server = {
val codec = _codec.getOrElse {
throw new IncompleteSpecification("No codec was specified")
View
2  finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelClosingHandler.scala
@@ -12,7 +12,7 @@ import org.jboss.netty.channel.{
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util.LatentChannelFuture
-class ChannelClosingHandler
+private[finagle] class ChannelClosingHandler
extends SimpleChannelUpstreamHandler
with LifeCycleAwareChannelHandler
{
View
2  finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelRequestStatsHandler.scala
@@ -14,7 +14,7 @@ import org.jboss.netty.channel.{
import com.twitter.util.Future
import com.twitter.finagle.stats.StatsReceiver
-class ChannelRequestStatsHandler(statsReceiver: StatsReceiver)
+private[finagle] class ChannelRequestStatsHandler(statsReceiver: StatsReceiver)
extends SimpleChannelHandler
with ConnectionLifecycleHandler
{
View
4 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
@@ -19,7 +19,7 @@ import com.twitter.finagle.util.{Ok, Error, Cancelled, AsyncLatch}
* channel. It is responsible for requests dispatched to a given
* (connected) channel during its lifetime.
*/
-class ChannelService[Req, Rep](channel: Channel, factory: ChannelServiceFactory[Req, Rep])
+private[finagle] class ChannelService[Req, Rep](channel: Channel, factory: ChannelServiceFactory[Req, Rep])
extends Service[Req, Rep]
{
private[this] val currentReplyFuture = new AtomicReference[Promise[Rep]]
@@ -88,7 +88,7 @@ class ChannelService[Req, Rep](channel: Channel, factory: ChannelServiceFactory[
/**
* A factory for ChannelService instances, given a bootstrap.
*/
-class ChannelServiceFactory[Req, Rep](
+private[finagle] class ChannelServiceFactory[Req, Rep](
bootstrap: ClientBootstrap,
prepareChannel: Service[Req, Rep] => Future[Service[Req, Rep]],
statsReceiver: StatsReceiver = NullStatsReceiver)
View
22 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelStatsHandler.scala
@@ -6,10 +6,9 @@ package com.twitter.finagle.channel
*/
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import java.util.logging.Logger
-import org.jboss.netty.channel.{
- SimpleChannelHandler, ChannelHandlerContext,
- WriteCompletionEvent, MessageEvent}
+import org.jboss.netty.channel.{SimpleChannelHandler, ChannelHandlerContext, MessageEvent}
import org.jboss.netty.buffer.ChannelBuffer
import com.twitter.util.{Time, Future}
@@ -19,6 +18,8 @@ class ChannelStatsHandler(statsReceiver: StatsReceiver)
extends SimpleChannelHandler
with ConnectionLifecycleHandler
{
+ private[this] val log = Logger.getLogger(getClass.getName)
+
private[this] val connects = statsReceiver.counter("connects")
private[this] val connectionDuration = statsReceiver.stat("connection_duration")
private[this] val connectionReceivedBytes = statsReceiver.stat("connection_received_bytes")
@@ -48,12 +49,17 @@ class ChannelStatsHandler(statsReceiver: StatsReceiver)
}
}
- override def writeComplete(ctx: ChannelHandlerContext, e: WriteCompletionEvent) {
+ override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
val (_, channelWriteCount) = ctx.getAttachment().asInstanceOf[(AtomicLong, AtomicLong)]
- channelWriteCount.getAndAdd(e.getWrittenAmount())
- writeCount.getAndAdd(e.getWrittenAmount())
- super.writeComplete(ctx, e)
+ e.getMessage match {
+ case buffer: ChannelBuffer =>
+ channelWriteCount.getAndAdd(buffer.readableBytes)
+ case _ =>
+ log.warning("ChannelStatsHandler received non-channelbuffer write")
+ }
+
+ super.writeRequested(ctx, e)
}
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
@@ -63,7 +69,7 @@ class ChannelStatsHandler(statsReceiver: StatsReceiver)
channelReadCount.getAndAdd(buffer.readableBytes())
readCount.getAndAdd(buffer.readableBytes())
case _ =>
- ()
+ log.warning("ChannelStatsHandler received non-channelbuffer read")
}
super.messageReceived(ctx, e)
View
4 finagle-core/src/main/scala/com/twitter/finagle/channel/ConnectionLifecycleHandler.scala
@@ -2,13 +2,13 @@ package com.twitter.finagle.channel
import org.jboss.netty.channel.{
SimpleChannelHandler, LifeCycleAwareChannelHandler,
- Channel, ChannelHandlerContext, ChannelStateEvent}
+ ChannelHandlerContext, ChannelStateEvent}
import com.twitter.util.{Future, Promise, Return}
import com.twitter.finagle.util.Conversions._
-trait ConnectionLifecycleHandler
+private[finagle] trait ConnectionLifecycleHandler
extends SimpleChannelHandler
with LifeCycleAwareChannelHandler
{
View
13 finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
@@ -10,11 +10,10 @@ import org.jboss.netty.handler.timeout.ReadTimeoutException
import com.twitter.util.{Future, Promise, Return, Throw}
import com.twitter.finagle.util.Conversions._
-import com.twitter.finagle.util.AsyncLatch
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.{CodecException, Service, WriteTimedOutException}
-class ServiceToChannelHandler[Req, Rep](
+private[finagle] class ServiceToChannelHandler[Req, Rep](
service: Service[Req, Rep],
statsReceiver: StatsReceiver,
log: Logger)
@@ -25,7 +24,7 @@ class ServiceToChannelHandler[Req, Rep](
def this(service: Service[Req, Rep]) =
this(service, NullStatsReceiver)
- private[this] sealed trait State
+ private[this] sealed abstract class State
// valid transitions are:
//
@@ -46,15 +45,15 @@ class ServiceToChannelHandler[Req, Rep](
service.release()
}
- /**
+ /**
* onShutdown: this Future is satisfied when the channel has been
* closed.
- */
+ */
val onShutdown: Future[Unit] = onShutdownPromise
- /**
+ /**
* drain(): admit no new requests.
- */
+ */
def drain() = {
var continue = false
do {
View
16 finagle-core/src/main/scala/com/twitter/finagle/channel/WriteCompletionTimeoutHandler.scala
@@ -1,23 +1,21 @@
package com.twitter.finagle.channel
-/**
- * A simple handler that times out a write if it fails to complete
- * within the given time. This can be used to ensure that clients
- * complete reception within a certain time, preventing a resource DoS
- * on a server.
- */
-
import org.jboss.netty.channel.{
SimpleChannelDownstreamHandler, Channels,
ChannelHandlerContext, MessageEvent}
import com.twitter.util.{Time, Duration, Timer}
-import com.twitter.conversions.time._
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.WriteTimedOutException
-class WriteCompletionTimeoutHandler(timer: Timer, timeout: Duration)
+/**
+ * A simple handler that times out a write if it fails to complete
+ * within the given time. This can be used to ensure that clients
+ * complete reception within a certain time, preventing a resource DoS
+ * on a server.
+ */
+private[finagle] class WriteCompletionTimeoutHandler(timer: Timer, timeout: Duration)
extends SimpleChannelDownstreamHandler
{
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
View
16 finagle-core/src/main/scala/com/twitter/finagle/http/AggregateHttpRequest.scala
@@ -6,25 +6,19 @@ package com.twitter.finagle.http
import scala.collection.JavaConversions._
-import java.nio.ByteOrder
-
import org.jboss.netty.channel.{
- MessageEvent, Channels,
- SimpleChannelUpstreamHandler,
- ChannelHandlerContext}
+ MessageEvent, Channels, ChannelHandlerContext}
import org.jboss.netty.handler.codec.http.{
HttpHeaders, HttpRequest,
- HttpResponse, HttpChunk,
- DefaultHttpResponse,
+ HttpChunk, DefaultHttpResponse,
HttpVersion, HttpResponseStatus}
import org.jboss.netty.buffer.{
- ChannelBuffer, ChannelBuffers,
- CompositeChannelBuffer}
+ ChannelBuffer, ChannelBuffers}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.channel.LeftFoldUpstreamHandler
-object OneHundredContinueResponse
+private[finagle] object OneHundredContinueResponse
extends DefaultHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.CONTINUE)
@@ -35,7 +29,7 @@ class HttpFailure(ctx: ChannelHandlerContext, status: HttpResponseStatus)
{
val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status)
val future = Channels.future(ctx.getChannel)
- Channels.write(ctx, future, response, ctx.getChannel.getRemoteAddress)
+ Channels.write(ctx, future, response, ctx.getChannel.getRemoteAddress)
future onSuccessOrFailure { ctx.getChannel.close() }
}
View
5 finagle-core/src/main/scala/com/twitter/finagle/http/ClientConnectionManager.scala
@@ -1,11 +1,8 @@
package com.twitter.finagle.http
import org.jboss.netty.channel._
-import org.jboss.netty.handler.codec.http._
-import com.twitter.finagle.util.Conversions._
-
-class ClientConnectionManager extends SimpleChannelHandler {
+private[finagle] class ClientConnectionManager extends SimpleChannelHandler {
private[this] val manager = new ConnectionManager
// Note that for HTTP requests without a content length, the
View
2  finagle-core/src/main/scala/com/twitter/finagle/http/ServerConnectionManager.scala
@@ -1,8 +1,6 @@
package com.twitter.finagle.http
import org.jboss.netty.channel._
-import org.jboss.netty.handler.codec.http._
-
import com.twitter.finagle.util.Conversions._
/**
View
20 finagle-core/src/main/scala/com/twitter/finagle/service/ExpiringService.scala
@@ -1,22 +1,22 @@
package com.twitter.finagle.service
-/**
- * A service wrapper that expires the underlying service after a
- * certain amount of idle time. By default, expiring calls
- * ``.release()'' on the underlying channel, but this action is
- * customizable.
- */
-
import com.twitter.util
import com.twitter.util.{Duration, Future}
import com.twitter.finagle.util.Timer
import com.twitter.finagle.{Service, WriteException, ChannelClosedException}
+
+/**
+ * A service wrapper that expires the underlying service after a
+ * certain amount of idle time. By default, expiring calls
+ * ``.release()'' on the underlying channel, but this action is
+ * customizable.
+ */
class ExpiringService[Req, Rep](
- underlying: Service[Req, Rep],
- maxIdleTime: Duration,
- timer: util.Timer = Timer.default)
+ underlying: Service[Req, Rep],
+ maxIdleTime: Duration,
+ timer: util.Timer = Timer.default)
extends Service[Req, Rep]
{
private[this] var requestCount = 0
View
4 finagle-core/src/main/scala/com/twitter/finagle/service/FailureAccrualFactory.scala
@@ -1,10 +1,6 @@
package com.twitter.finagle.service
-import java.util.concurrent.atomic.AtomicInteger
-
import com.twitter.util.{Time, Duration, Throw, Return}
-import com.twitter.conversions.time._
-
import com.twitter.finagle.{Service, ServiceFactory}
/**
View
2  finagle-core/src/main/scala/com/twitter/finagle/service/RefcountedService.scala
@@ -3,7 +3,7 @@ package com.twitter.finagle.service
import com.twitter.finagle.Service
import com.twitter.finagle.util.AsyncLatch
-class RefcountedService[Req, Rep](underlying: Service[Req, Rep])
+private[finagle] class RefcountedService[Req, Rep](underlying: Service[Req, Rep])
extends Service[Req, Rep]
{
protected[this] val replyLatch = new AsyncLatch
View
1  finagle-core/src/main/scala/com/twitter/finagle/service/RetryingFilter.scala
@@ -1,6 +1,5 @@
package com.twitter.finagle.service
-import com.twitter.finagle.util.Conversions._
import com.twitter.util._
import com.twitter.finagle.WriteException
import com.twitter.finagle.{SimpleFilter, Service}
View
2  finagle-core/src/main/scala/com/twitter/finagle/service/SingletonFactory.scala
@@ -5,7 +5,7 @@ import com.twitter.util.Future
import com.twitter.finagle.{Service, ServiceFactory}
import com.twitter.finagle.util.AsyncLatch
-class SingletonFactory[Req, Rep](service: Service[Req, Rep])
+private[finagle] class SingletonFactory[Req, Rep](service: Service[Req, Rep])
extends ServiceFactory[Req, Rep]
{
private[this] var latch = new AsyncLatch
View
14 finagle-core/src/main/scala/com/twitter/finagle/stats/CumulativeGauge.scala
@@ -1,15 +1,15 @@
package com.twitter.finagle.stats
+import ref.WeakReference
+import collection.mutable.WeakHashMap
+
+
/**
* CumulativeGauge provides a gauge that is composed of the (addition)
* of several underlying gauges. It follows the weak reference
* semantics of Gauges as outlined in StatsReceiver.
*/
-
-import ref.WeakReference
-import collection.mutable.WeakHashMap
-
-trait CumulativeGauge {
+private[finagle] trait CumulativeGauge {
private[this] case class UnderlyingGauge(f: () => Float) extends Gauge {
def remove() { removeGauge(this) }
}
@@ -51,7 +51,7 @@ trait CumulativeGauge {
def deregister(): Unit
}
-trait StatsReceiverWithCumulativeGauges extends StatsReceiver {
+private[finagle] trait StatsReceiverWithCumulativeGauges extends StatsReceiver {
private[this] val gaugeMap = new WeakHashMap[Seq[String], CumulativeGauge]
/**
@@ -60,7 +60,7 @@ trait StatsReceiverWithCumulativeGauges extends StatsReceiver {
*/
protected[this] def registerGauge(name: Seq[String], f: => Float)
protected[this] def deregisterGauge(name: Seq[String])
-
+
def addGauge(name: String*)(f: => Float) = synchronized {
val cumulativeGauge = gaugeMap getOrElseUpdate(name, {
new CumulativeGauge {
View
33 finagle-core/src/main/scala/com/twitter/finagle/test/HttpClient.scala
@@ -1,33 +0,0 @@
-package com.twitter.finagle.test
-
-import java.util.logging.Logger
-import org.jboss.netty.handler.codec.http._
-
-import com.twitter.finagle.builder.{ClientBuilder, Http}
-import com.twitter.finagle.Service
-
-object HttpClient {
- def main(args: Array[String]) {
- val client =
- ClientBuilder()
- .name("http")
- .hosts("localhost:10000,localhost:10001,localhost:10003")
- .codec(Http)
- .retries(2)
- .logger(Logger.getLogger("http"))
- .build()
-
- for (_ <- 0 until 100)
- makeRequest(client)
- }
-
- def makeRequest(client: Service[HttpRequest, HttpResponse]) {
- client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) respond {
- case _ =>
- makeRequest(client)
- }
- }
-
- def quiesce() = ()
- def shutdown() = ()
-}
View
35 finagle-core/src/main/scala/com/twitter/finagle/test/HttpServer.scala
@@ -1,35 +0,0 @@
-package com.twitter.finagle.test
-
-import java.net.InetSocketAddress
-import java.util.Date
-
-import org.jboss.netty.buffer._
-import org.jboss.netty.handler.codec.http._
-
-import com.twitter.finagle.builder._
-import com.twitter.finagle._
-
-import com.twitter.util.Future
-
-object HttpServer {
- def main(args: Array[String]) {
- val server = new Service[HttpRequest, HttpResponse] {
- def apply(request: HttpRequest) = Future {
- val response = new DefaultHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
- response.setContent(ChannelBuffers.wrappedBuffer(("Hello from Finagle HTTP server at " + new Date().toString()).getBytes))
- response
- }
- }
-
- val logger = java.util.logging.Logger.getLogger(getClass.getName)
- ServerBuilder()
- .codec(Http)
- .bindTo(new InetSocketAddress(10000))
- .logger(logger)
- .build(server)
- }
-
- def quiesce() = ()
- def shutdown() = ()
-}
View
15 finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala
@@ -1,20 +1,5 @@
package com.twitter.finagle.tracing
-/**
- * Support for tracing in finagle. The main abstraction herein is the
- * "Trace", which is a local that contains various metadata required
- * for distributed tracing as well as references to the local traced
- * events. We mimic Dapper in many ways, including borrowing its
- * nomenclature.
- *
- * “Dapper, a Large-Scale Distributed Systems Tracing Infrastructure”,
- * Benjamin H. Sigelman, Luiz André Barroso, Mike Burrows, Pat
- * Stephenson, Manoj Plakal, Donald Beaver, Saul Jaspan, Chandan
- * Shanbhag, 2010.
- *
- * http://research.google.com/pubs/archive/36356.pdf
- */
-
import scala.util.Random
import com.twitter.util.{Local, Time, TimeFormat, RichU64Long}
View
14 finagle-core/src/main/scala/com/twitter/finagle/util/ChannelFuture.scala
@@ -7,22 +7,22 @@ import org.jboss.netty.channel._
/**
* A ChannelFuture that doesn't need to have a channel on creation.
*/
-class LatentChannelFuture extends DefaultChannelFuture(null, false) {
+private[finagle] class LatentChannelFuture extends DefaultChannelFuture(null, false) {
@volatile private var channel: Channel = _
def setChannel(c: Channel) { channel = c }
override def getChannel() = channel
}
-sealed abstract class State
-case object Cancelled extends State
-case class Ok(channel: Channel) extends State
-case class Error(cause: Throwable) extends State
+private[finagle] sealed abstract class State
+private[finagle] case object Cancelled extends State
+private[finagle] case class Ok(channel: Channel) extends State
+private[finagle] case class Error(cause: Throwable) extends State
-class CancelledException extends Exception
+private[finagle] class CancelledException extends Exception
// TODO: decide what to do about cancellation here.
-class RichChannelFuture(val self: ChannelFuture) {
+private[finagle] class RichChannelFuture(val self: ChannelFuture) {
def apply(f: State => Unit) {
self.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
View
1  finagle-core/src/main/scala/com/twitter/finagle/util/ChannelSnooper.scala
@@ -2,7 +2,6 @@ package com.twitter.finagle.util
import java.io.PrintStream
import java.nio.charset.Charset
-import java.util.logging.Logger
import org.jboss.netty.channel._
import org.jboss.netty.buffer.ChannelBuffer
View
2  finagle-core/src/main/scala/com/twitter/finagle/util/Conversions.scala
@@ -4,7 +4,7 @@ import org.jboss.netty.channel.ChannelFuture
import com.twitter.util.Future
-object Conversions {
+private[finagle] object Conversions {
implicit def channelFutureToRichChannelFuture(f: ChannelFuture) = new RichChannelFuture(f)
implicit def futureToRichFuture[A](f: Future[A]) = new RichFuture(f)
}
View
2  finagle-core/src/main/scala/com/twitter/finagle/util/InetSocketAddressUtil.scala
@@ -2,7 +2,7 @@ package com.twitter.finagle.util
import java.net.InetSocketAddress
-object InetSocketAddressUtil {
+private[finagle] object InetSocketAddressUtil {
/**
* Parses a comma or space-delimited string of hostname and port pairs. For example,
*
View
4 finagle-core/src/main/scala/com/twitter/finagle/util/ReferenceCountedTimer.scala
@@ -1,4 +0,0 @@
-package com.twitter.finagle.util
-
-import org.jboss.netty.util.{Timer => NettyTimer}
-
View
2  finagle-core/src/main/scala/com/twitter/finagle/util/RichFuture.scala
@@ -5,7 +5,7 @@ import com.twitter.util.{Duration, Future, Promise, Try}
import Conversions._
-class RichFuture[A](self: Future[A]) {
+private[finagle] class RichFuture[A](self: Future[A]) {
def timeout(timer: util.Timer, howlong: Duration)(orElse: => Try[A]) = {
val promise = new Promise[A]
val timeout = timer.schedule(howlong.fromNow) { promise.updateIfEmpty(orElse) }
View
2  finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala
@@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit
import com.twitter.util.{Time, Duration, TimerTask, ReferenceCountedTimer}
import org.jboss.netty.util.{HashedWheelTimer, Timeout}
-object Timer {
+private[finagle] object Timer {
// This timer should only be used inside the context of finagle,
// since it requires explicit reference count management. (Via the
// builder routines.)
View
25 finagle-core/src/main/scala/com/twitter/finagle/util/TracingHeader.scala
@@ -1,25 +0,0 @@
-package com.twitter.finagle.util
-
-import java.io.{
- ByteArrayOutputStream, ByteArrayInputStream,
- DataOutputStream, DataInputStream}
-
-object TracingHeader {
- def encode(txid: Long, body: Array[Byte]) = {
- val bos = new ByteArrayOutputStream(8)
- val dos = new DataOutputStream(bos)
-
- dos.writeLong(txid)
- dos.flush()
-
- bos.toByteArray ++ body
- }
-
- def decode(bytes: Array[Byte]) = {
- val bis = new ByteArrayInputStream(bytes)
- val dis = new DataInputStream(bis)
- val txid = dis.readLong()
-
- (bytes drop 8, txid)
- }
-}
View
2  finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/DecodingToCommand.scala
@@ -7,7 +7,7 @@ import scala.Function.tupled
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import com.twitter.finagle.memcached.protocol.text.server.AbstractDecodingToCommand
-class DecodingToCommand extends AbstractDecodingToCommand[Command] {
+private[kestrel] class DecodingToCommand extends AbstractDecodingToCommand[Command] {
private[this] val GET = copiedBuffer("get" .getBytes)
private[this] val SET = copiedBuffer("set" .getBytes)
private[this] val DELETE = copiedBuffer("delete" .getBytes)
View
2  finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/DecodingToResponse.scala
@@ -4,7 +4,7 @@ import org.jboss.netty.buffer.ChannelBuffer
import com.twitter.finagle.memcached.protocol.text.TokensWithData
import com.twitter.finagle.memcached.protocol.text.client.AbstractDecodingToResponse
-class DecodingToResponse extends AbstractDecodingToResponse[Response] {
+private[kestrel] class DecodingToResponse extends AbstractDecodingToResponse[Response] {
import AbstractDecodingToResponse._
def parseResponse(tokens: Seq[ChannelBuffer]) = {
View
6 finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Show.scala
@@ -1,7 +1,7 @@
package com.twitter.finagle.kestrel.protocol
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
-import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
+import org.jboss.netty.buffer.{ChannelBuffers}
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import org.jboss.netty.channel._
import com.twitter.finagle.memcached.protocol.text.{Decoding, Tokens, TokensWithData, ValueLines}
@@ -9,7 +9,7 @@ import com.twitter.finagle.kestrel.protocol._
import org.jboss.netty.util.CharsetUtil
-class ResponseToEncoding extends OneToOneEncoder {
+private[kestrel] class ResponseToEncoding extends OneToOneEncoder {
private[this] val ZERO = "0"
private[this] val VALUE = "VALUE"
@@ -32,7 +32,7 @@ class ResponseToEncoding extends OneToOneEncoder {
}
}
-class CommandToEncoding extends OneToOneEncoder {
+private[kestrel] class CommandToEncoding extends OneToOneEncoder {
private[this] val ZERO = "0"
private[this] val OPEN = "open"
View
18 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Hash.scala
@@ -1,18 +0,0 @@
-package com.twitter.finagle.memcached
-
-object Hash {
-
- val FNV1_32_PRIME = 16777619
- def fnv1_32(key: String) : Long = {
- var i = 0
- val len = key.length
- var rv: Long = 0x811c9dc5L
- val keyBytes = key.getBytes("UTF-8")
- while (i < len) {
- rv = (rv * FNV1_32_PRIME) ^ (keyBytes(i) & 0xff)
- i += 1
- }
-
- rv & 0xffffffffL
- }
-}
View
2  finagle-memcached/src/main/scala/com/twitter/finagle/memcached/KeyHasher.scala
@@ -4,7 +4,6 @@ import scala.collection.mutable
import _root_.java.nio.{ByteBuffer, ByteOrder}
import _root_.java.security.MessageDigest
-
/**
* Hashes a memcache key into a 32-bit or 64-bit number (depending on the algorithm).
* This is a purely optional trait, meant to allow NodeLocator implementations to share
@@ -14,7 +13,6 @@ trait KeyHasher {
def hashKey(key: Array[Byte]): Long
}
-
/**
* Commonly used key hashing algorithms.
*/
View
25 finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/ChannelBufferUtils.scala
@@ -4,8 +4,7 @@ import org.jboss.netty.util.CharsetUtil
import collection.mutable.ArrayBuffer
import org.jboss.netty.buffer.{ChannelBufferIndexFinder, ChannelBuffers, ChannelBuffer}
-
-object ChannelBufferUtils {
+private[finagle] object ChannelBufferUtils {
private val FIND_SPACE = new ChannelBufferIndexFinder() {
def find(buffer: ChannelBuffer, guessedIndex: Int): Boolean = {
val enoughBytesForDelimeter = guessedIndex + 1
@@ -61,17 +60,17 @@ object ChannelBufferUtils {
implicit def stringToByteArray(string: String) =
string.getBytes
- implicit def stringToChannelBufferIndexFinder(string: String): ChannelBufferIndexFinder =
- new ChannelBufferIndexFinder {
- def find(buffer: ChannelBuffer, guessedIndex: Int): Boolean = {
- val array = string.toArray
- var i: Int = 0
- while (i < string.size) {
- if (buffer.getByte(guessedIndex + i) != array(i).toByte)
- return false
- i += 1
- }
- return true
+ implicit def stringToChannelBufferIndexFinder(string: String): ChannelBufferIndexFinder =
+ new ChannelBufferIndexFinder {
+ def find(buffer: ChannelBuffer, guessedIndex: Int): Boolean = {
+ val array = string.toArray
+ var i: Int = 0
+ while (i < string.size) {
+ if (buffer.getByte(guessedIndex + i) != array(i).toByte)
+ return false
+ i += 1
+ }
+ return true
}
}
}
View
1  finagle-native/src/test/scala/com/twitter/finagle/SslSpec.scala
@@ -2,7 +2,6 @@ import org.specs.Specification
import java.io.File
import java.security.Provider
-import java.util.logging.Logger
import org.jboss.netty.buffer._
import org.jboss.netty.channel._
View
6 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/InputBuffer.scala
@@ -4,7 +4,7 @@ import org.apache.thrift.TBase
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TMemoryInputTransport
-object InputBuffer {
+private[thrift] object InputBuffer {
private[thrift] val protocolFactory = new TBinaryProtocol.Factory()
def peelMessage(bytes: Array[Byte], message: TBase[_, _]) = {
@@ -14,12 +14,12 @@ object InputBuffer {
}
}
-class InputBuffer(bytes: Array[Byte]) {
+private[thrift] class InputBuffer(bytes: Array[Byte]) {
import InputBuffer._
private[this] val memoryTransport = new TMemoryInputTransport(bytes)
private[this] val iprot = protocolFactory.getProtocol(memoryTransport)
-
+
def apply() = iprot
def remainder = bytes drop memoryTransport.getBufferPosition
View
4 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/OutputBuffer.scala
@@ -9,7 +9,7 @@ import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TMemoryBuffer
import org.apache.thrift.TBase
-object OutputBuffer {
+private[thrift] object OutputBuffer {
private[thrift] val protocolFactory = new TBinaryProtocol.Factory()
def messageToArray(message: TBase[_, _]) = {
@@ -19,7 +19,7 @@ object OutputBuffer {
}
}
-class OutputBuffer {
+private[thrift] class OutputBuffer {
import OutputBuffer._
private[this] val memoryBuffer = new TMemoryBuffer(512)
View
31 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftBufferCodec.scala
@@ -0,0 +1,31 @@
+package com.twitter.finagle.thrift
+
+import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
+import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.handler.codec.replay.{ReplayingDecoder, VoidEnum}
+
+import org.apache.thrift.protocol.{TProtocolFactory, TProtocolUtil, TType}
+
+class ThriftBufferCodec(protocolFactory: TProtocolFactory)
+ extends ReplayingDecoder[VoidEnum]
+{
+ override def decode(
+ ctx: ChannelHandlerContext, channel: Channel,
+ buffer: ChannelBuffer, state: VoidEnum
+ ) = {
+ val transport = new ChannelBufferToTransport(buffer)
+ val iprot = protocolFactory.getProtocol(transport)
+
+ val beginIndex = buffer.readerIndex
+ buffer.markReaderIndex()
+
+ iprot.readMessageBegin()
+ TProtocolUtil.skip(iprot, TType.STRUCT)
+ iprot.readMessageEnd()
+
+ val endIndex = buffer.readerIndex
+ buffer.resetReaderIndex()
+
+ buffer.readSlice(endIndex - beginIndex)
+ }
+}
View
2  finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftChannelBufferDecoder.scala
@@ -9,7 +9,7 @@ import org.jboss.netty.channel.{ChannelHandlerContext, Channel}
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
-class ThriftChannelBufferDecoder extends OneToOneDecoder {
+private[thrift] class ThriftChannelBufferDecoder extends OneToOneDecoder {
def decode(ctx: ChannelHandlerContext, ch: Channel, message: Object) =
message match {
case buffer: ChannelBuffer => buffer.array() // is this kosher?
View
29 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientBufferedCodec.scala
@@ -0,0 +1,29 @@
+package com.twitter.finagle.thrift
+
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.apache.thrift.protocol.TProtocolFactory
+
+import com.twitter.finagle.Codec
+
+class ThriftClientBufferedCodec(protocolFactory: TProtocolFactory)
+ extends Codec[ThriftClientRequest, Array[Byte]]
+{
+ private[this] val framedCodec = new ThriftClientFramedCodec
+
+ val clientPipelineFactory = {
+ val framedPipelineFactory = framedCodec.clientPipelineFactory
+
+ new ChannelPipelineFactory {
+ def getPipeline() = {
+ val pipeline = framedPipelineFactory.getPipeline
+ pipeline.replace(
+ "thriftFrameCodec", "thriftBufferCodec",
+ new ThriftBufferCodec(protocolFactory))
+ pipeline
+ }
+ }
+ }
+
+ val serverPipelineFactory = clientPipelineFactory
+}
+
View
6 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala
@@ -1,6 +1,6 @@
package com.twitter.finagle.thrift
-import collection.JavaConversions._
+import scala.collection.JavaConversions._
import org.jboss.netty.channel.{
ChannelHandlerContext,
SimpleChannelDownstreamHandler, MessageEvent, Channels,
@@ -77,7 +77,7 @@ class ThriftClientFramedCodec extends Codec[ThriftClientRequest, Array[Byte]]
* bytes on the wire. It satisfies the request immediately if it is a
* "oneway" request.
*/
-class ThriftClientChannelBufferEncoder
+private[thrift] class ThriftClientChannelBufferEncoder
extends SimpleChannelDownstreamHandler
{
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) =
@@ -108,7 +108,7 @@ class ThriftClientChannelBufferEncoder
* on the wire. It is applied after all framing.
*/
-class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[Byte]]
+private[thrift] class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[Byte]]
{
def apply(request: ThriftClientRequest,
service: Service[ThriftClientRequest, Array[Byte]]) =
View
16 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala
@@ -1,22 +1,16 @@
package com.twitter.finagle.thrift
import org.apache.thrift.protocol.{TBinaryProtocol, TMessage, TMessageType}
-import org.apache.thrift.transport.{TMemoryBuffer, TMemoryInputTransport}
-
import org.jboss.netty.channel.{
- SimpleChannelHandler, Channel, ChannelEvent, ChannelHandlerContext,
+ ChannelHandlerContext,
SimpleChannelDownstreamHandler, MessageEvent, Channels,
ChannelPipelineFactory}
-import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler
-
+import org.jboss.netty.buffer.ChannelBuffers
import com.twitter.util.Future
import com.twitter.finagle._
-import com.twitter.finagle.util.TracingHeader
-import com.twitter.finagle.tracing.{BufferingTranscript, Trace}
+import com.twitter.finagle.tracing.Trace
-class ThriftServerChannelBufferEncoder extends SimpleChannelDownstreamHandler {
+private[thrift] class ThriftServerChannelBufferEncoder extends SimpleChannelDownstreamHandler {
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) = {
e.getMessage match {
// An empty array indicates a oneway reply.
@@ -33,7 +27,7 @@ object ThriftServerFramedCodec {
def apply() = new ThriftServerFramedCodec
}
-class ThriftServerTracingFilter
+private[thrift] class ThriftServerTracingFilter
extends SimpleFilter[Array[Byte], Array[Byte]]
{
// Concurrency is not an issue here since we have an instance per
View
2  finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftTracing.scala
@@ -1,6 +1,6 @@
package com.twitter.finagle.thrift
-object ThriftTracing {
+private[thrift] object ThriftTracing {
/**
* v1: transaction id frame
* v2: full tracing header
View
4 ...le-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ChannelBufferToTransport.scala
@@ -9,7 +9,7 @@ import org.jboss.netty.buffer.ChannelBuffer
* @param underlying a netty channelBuffer
*
*/
-class ChannelBufferToTransport(underlying: ChannelBuffer) extends TTransport {
+private[thrift] class ChannelBufferToTransport(underlying: ChannelBuffer) extends TTransport {
override def isOpen = true
override def open() {}
override def close() {}
@@ -32,7 +32,7 @@ class ChannelBufferToTransport(underlying: ChannelBuffer) extends TTransport {
* @param output a netty channelBuffer to write to
*
*/
-class DuplexChannelBufferTransport(input: ChannelBuffer, output: ChannelBuffer) extends TTransport {
+private[thrift] class DuplexChannelBufferTransport(input: ChannelBuffer, output: ChannelBuffer) extends TTransport {
override def isOpen = true
override def open() {}
override def close() {}
View
4 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ThriftClientCodec.scala
@@ -10,7 +10,7 @@ import org.jboss.netty.handler.codec.replay.{ReplayingDecoder, VoidEnum}
/**
* Translate ThriftCalls to their wire representation
*/
-class ThriftClientEncoder extends SimpleChannelDownstreamHandler {
+private[thrift] class ThriftClientEncoder extends SimpleChannelDownstreamHandler {
protected val protocolFactory = new TBinaryProtocol.Factory(true, true)
protected var seqid = 0
@@ -34,7 +34,7 @@ class ThriftClientEncoder extends SimpleChannelDownstreamHandler {
/**
* Translate wire representation to ThriftReply
*/
-class ThriftClientDecoder extends ReplayingDecoder[VoidEnum]
+private[thrift] class ThriftClientDecoder extends ReplayingDecoder[VoidEnum]
{
protected val protocolFactory = new TBinaryProtocol.Factory(true, true)
View
5 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ThriftProcessorHandler.scala
@@ -6,8 +6,9 @@ import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.apache.thrift.TProcessorFactory
import org.apache.thrift.protocol.TBinaryProtocol
-class ThriftProcessorHandler(processorFactory: TProcessorFactory)
-extends SimpleChannelUpstreamHandler {
+private[thrift] class ThriftProcessorHandler(processorFactory: TProcessorFactory)
+ extends SimpleChannelUpstreamHandler
+{
val protocolFactory = new TBinaryProtocol.Factory()
private def process(input: ChannelBuffer, output: ChannelBuffer) {
View
4 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/legacy/ThriftServerCodec.scala
@@ -12,7 +12,7 @@ import java.util.logging.{Logger, Level}
/*
* Translate ThriftReplys to wire representation
*/
-class ThriftServerEncoder extends SimpleChannelDownstreamHandler {
+private[thrift] class ThriftServerEncoder extends SimpleChannelDownstreamHandler {
protected val protocolFactory = new TBinaryProtocol.Factory(true, true)
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) =
@@ -31,7 +31,7 @@ class ThriftServerEncoder extends SimpleChannelDownstreamHandler {
/**
* Translate wire representation to ThriftCalls
*/
-class ThriftServerDecoder extends ReplayingDecoder[VoidEnum] {
+private[thrift] class ThriftServerDecoder extends ReplayingDecoder[VoidEnum] {
private[this] val logger = Logger.getLogger(getClass.getName)
private[this] val protocolFactory = new TBinaryProtocol.Factory(true, true)
View
176 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/FinagleClientThriftServerSpec.scala
@@ -6,7 +6,7 @@ import java.util.concurrent.CyclicBarrier
import org.specs.Specification
-import org.apache.thrift.transport.{TServerSocket, TFramedTransport}
+import org.apache.thrift.transport.{TServerSocket, TFramedTransport, TTransportFactory}
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.server.TSimpleServer
import org.apache.thrift.async.AsyncMethodCallback
@@ -14,12 +14,13 @@ import org.apache.thrift.async.AsyncMethodCallback
import com.twitter.test.{B, AnException, SomeStruct}
import com.twitter.util.{RandomSocket, Promise, Return, Throw, Future}
+import com.twitter.finagle.Codec
import com.twitter.finagle.builder.ClientBuilder
object FinagleClientThriftServerSpec extends Specification {
"finagle client vs. synchronous thrift server" should {
var somewayPromise = new Promise[Unit]
- def makeServer(f: (Int, Int) => Int) = {
+ def makeServer(transportFactory: TTransportFactory)(f: (Int, Int) => Int) = {
val processor = new B.Iface {
def multiply(a: Int, b: Int): Int = f(a, b)
def add(a: Int, b: Int): Int = { throw new AnException }
@@ -40,7 +41,7 @@ object FinagleClientThriftServerSpec extends Specification {
val server = new TSimpleServer(
new B.Processor(processor),
serverSocketTransport,
- new TFramedTransport.Factory(),
+ transportFactory,
new TBinaryProtocol.Factory()
)
@@ -60,90 +61,101 @@ object FinagleClientThriftServerSpec extends Specification {
thriftServerAddr
}
- "talk to each other" in {
- // TODO: interleave requests (to test seqids, etc.)
- val thriftServerAddr = makeServer { (a, b) => a + b }
-
- // ** Set up the client & query the server.
- val service = ClientBuilder()
- .hosts(Seq(thriftServerAddr))
- .codec(ThriftClientFramedCodec())
- .build()
-
- val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
-
- val future = client.multiply(1, 2)
- future() must be_==(3)
- }
-
- "handle exceptions" in {
- val thriftServerAddr = makeServer { (a, b) => a + b }
-
- // ** Set up the client & query the server.
- val service = ClientBuilder()
- .hosts(Seq(thriftServerAddr))
- .codec(ThriftClientFramedCodec())
- .build()
-
- val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
-
- client.add(1, 2)() must throwA[AnException]
- }
-
- "handle void returns" in {
- val thriftServerAddr = makeServer { (a, b) => a + b }
-
- // ** Set up the client & query the server.
- val service = ClientBuilder()
- .hosts(Seq(thriftServerAddr))
- .codec(ThriftClientFramedCodec())
- .build()
-
- val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
-
- client.add_one(1, 2)()
- true must beTrue
+ def doit(transportFactory: TTransportFactory, codec: Codec[ThriftClientRequest, Array[Byte]]) {
+ "talk to each other" in {
+ // TODO: interleave requests (to test seqids, etc.)
+
+ val thriftServerAddr = makeServer(transportFactory) { (a, b) => a + b }
+
+ // ** Set up the client & query the server.
+ val service = ClientBuilder()
+ .hosts(Seq(thriftServerAddr))
+ .codec(codec)
+ .build()
+
+ val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
+
+ val future = client.multiply(1, 2)
+ future() must be_==(3)
+ }
+
+ "handle exceptions" in {
+ val thriftServerAddr = makeServer(transportFactory) { (a, b) => a + b }
+
+ // ** Set up the client & query the server.
+ val service = ClientBuilder()
+ .hosts(Seq(thriftServerAddr))
+ .codec(codec)
+ .build()
+
+ val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
+
+ client.add(1, 2)() must throwA[AnException]
+ }
+
+ "handle void returns" in {
+ val thriftServerAddr = makeServer(transportFactory) { (a, b) => a + b }
+
+ // ** Set up the client & query the server.
+ val service = ClientBuilder()
+ .hosts(Seq(thriftServerAddr))
+ .codec(codec)
+ .build()
+
+ val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
+
+ client.add_one(1, 2)()
+ true must beTrue
+ }
+
+ // race condition..
+ "handle one-way calls" in {
+ val thriftServerAddr = makeServer(transportFactory) { (a, b) => a + b }
+
+ // ** Set up the client & query the server.
+ val service = ClientBuilder()
+ .hosts(Seq(thriftServerAddr))
+ .codec(codec)
+ .build()
+
+ val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
+
+ somewayPromise.isDefined must beFalse
+ client.someway()() must beNull // returns
+ somewayPromise() must be_==(())
+ }
+
+ "talk to multiple servers" in {
+ val NumParties = 10
+ val barrier = new CyclicBarrier(NumParties)
+
+ val addrs = 0 until NumParties map { _ =>
+ makeServer(transportFactory) { (a, b) => barrier.await(); a + b }
+ }
+
+ // ** Set up the client & query the server.
+ val service = ClientBuilder()
+ .hosts(addrs)
+ .codec(codec)
+ .build()
+
+ val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
+
+ {
+ val futures = 0 until NumParties map { _ => client.multiply(1, 2) }
+ val resolved = futures map(_())
+ resolved foreach { r => r must be_==(3) }
+ }
+ }
}
- // race condition..
- "handle one-way calls" in {
- val thriftServerAddr = makeServer { (a, b) => a + b }
-
- // ** Set up the client & query the server.
- val service = ClientBuilder()
- .hosts(Seq(thriftServerAddr))
- .codec(ThriftClientFramedCodec())
- .build()
-
- val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
-
- somewayPromise.isDefined must beFalse
- client.someway()() must beNull // returns
- somewayPromise() must be_==(())
+ "framed transport" in {
+ doit(new TFramedTransport.Factory(), ThriftClientFramedCodec())
}
- "talk to multiple servers" in {
- val NumParties = 10
- val barrier = new CyclicBarrier(NumParties)
-
- val addrs = 0 until NumParties map { _ =>
- makeServer { (a, b) => barrier.await(); a + b }
- }
-
- // ** Set up the client & query the server.
- val service = ClientBuilder()
- .hosts(addrs)
- .codec(ThriftClientFramedCodec())
- .build()
-
- val client = new B.ServiceToClient(service, new TBinaryProtocol.Factory())
-
- {
- val futures = 0 until NumParties map { _ => client.multiply(1, 2) }
- val resolved = futures map(_())
- resolved foreach { r => r must be_==(3) }
- }
+ "buffered transport" in {
+ doit(new TTransportFactory, new ThriftClientBufferedCodec(new TBinaryProtocol.Factory))
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.