diff --git a/finagle-core/src/main/java/com/twitter/finagle/builder/Stats4J.java b/finagle-core/src/main/java/com/twitter/finagle/builder/Stats4J.java index 7d156e17a0..d96c3f15e4 100644 --- a/finagle-core/src/main/java/com/twitter/finagle/builder/Stats4J.java +++ b/finagle-core/src/main/java/com/twitter/finagle/builder/Stats4J.java @@ -1,9 +1,9 @@ package com.twitter.finagle.builder; -import com.twitter.ostrich.Stats$; +import com.twitter.finagle.stats.JavaLoggerStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; public class Stats4J { - public static StatsReceiver Ostrich = new Ostrich(Stats$.MODULE$); public static StatsReceiver Logger = - new JavaLogger(java.util.logging.Logger.getLogger("Finagle")); + new JavaLoggerStatsReceiver(java.util.logging.Logger.getLogger("Finagle")); } \ No newline at end of file diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala index 06eb4c9ffa..7dd4a1f116 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala @@ -10,13 +10,13 @@ import java.util.concurrent.Executors import org.jboss.netty.channel._ import org.jboss.netty.channel.socket.nio._ -import com.twitter.ostrich import com.twitter.util.Duration import com.twitter.util.TimeConversions._ import com.twitter.finagle.channel._ import com.twitter.finagle.util._ import com.twitter.finagle.service +import com.twitter.finagle.stats.StatsReceiver object ClientBuilder { def apply() = new ClientBuilder @@ -53,7 +53,6 @@ case class ClientBuilder( _hostConnectionLimit: Option[Int], _sendBufferSize: Option[Int], _recvBufferSize: Option[Int], - _exportLoadsToOstrich: Boolean, _failureAccrualWindow: Duration, _retries: Option[Int], _initialBackoff: Option[Duration], @@ -75,7 +74,6 @@ case class ClientBuilder( None, // hostConnectionLimit None, // sendBufferSize None, // recvBufferSize - false, // exportLoadsToOstrich 10.seconds, // failureAccrualWindow None, // retries None, // initialBackoff @@ -129,8 +127,6 @@ case class ClientBuilder( def sendBufferSize(value: Int): ClientBuilder = copy(_sendBufferSize = Some(value)) def recvBufferSize(value: Int): ClientBuilder = copy(_recvBufferSize = Some(value)) - def exportLoadsToOstrich(): ClientBuilder = copy(_exportLoadsToOstrich = true) - def failureAccrualWindow(window: Duration): ClientBuilder = copy(_failureAccrualWindow = window) @@ -252,11 +248,11 @@ case class ClientBuilder( val broker = makeBroker(codec, statsRepo)(host) - if (_exportLoadsToOstrich) { + _statsReceiver.foreach { statsReceiver => val hostString = host.toString - ostrich.Stats.makeGauge(hostString + "_load") { broker.load } - ostrich.Stats.makeGauge(hostString + "_weight") { broker.weight } - ostrich.Stats.makeGauge(hostString + "_available") { if (broker.isAvailable) 1 else 0 } + statsReceiver.makeGauge(hostString + "_load", broker.load) + statsReceiver.makeGauge(hostString + "_weight", broker.weight) + statsReceiver.makeGauge(hostString + "_available", if (broker.isAvailable) 1 else 0) } broker diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/Common.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/Common.scala index db432cdbde..066c2169c7 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/Common.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/builder/Common.scala @@ -13,11 +13,6 @@ trait Codec { val serverPipelineFactory: ChannelPipelineFactory } -object StatsReporter4J { - val ostrich = Ostrich() - val logger = JavaLogger() -} -trait StatsReceiver { - def observer(prefix: String, label: String): (Seq[String], Int, Int) => Unit -} + + diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/JavaLogger.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/JavaLogger.scala deleted file mode 100644 index fc2e16a2ea..0000000000 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/JavaLogger.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.twitter.finagle.builder - -import java.net.InetSocketAddress -import java.util.logging.Logger - -case class JavaLogger(underlying: Logger) extends StatsReceiver { - def observer(prefix: String, label: String) = { - val suffix = "_%s".format(label) - - (path: Seq[String], value: Int, count: Int) => { - val pathString = path mkString "__" - underlying.info(List(prefix, pathString, suffix, count) mkString " ") - } - } -} - -object JavaLogger { - def apply(): JavaLogger = JavaLogger(Logger.getLogger(getClass.getName)) -} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/Ostrich.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/Ostrich.scala deleted file mode 100644 index 61d610b2ce..0000000000 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/Ostrich.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.twitter.finagle.builder - -import java.net.InetSocketAddress - -import com.twitter.ostrich - -case class Ostrich(provider: ostrich.StatsProvider) extends StatsReceiver { - def observer(prefix: String, label: String) = { - val suffix = "_%s".format(label) - - (path: Seq[String], value: Int, count: Int) => { - // Enforce count == 1? - val pathString = path mkString "__" - provider.addTiming(prefix + pathString, value) - provider.addTiming(prefix + pathString + suffix, value) - } - } -} - -object Ostrich { - def apply(): Ostrich = Ostrich(ostrich.Stats) -} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala index 73b48819eb..9c32fa9eed 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala @@ -5,16 +5,13 @@ import scala.collection.JavaConversions._ import java.net.SocketAddress import java.util.concurrent.{Executors, LinkedBlockingQueue} import java.util.logging.Logger -import javax.net.ssl.{KeyManager, SSLContext} +import javax.net.ssl.SSLContext import org.jboss.netty.bootstrap.ServerBootstrap -import org.jboss.netty.buffer._ import org.jboss.netty.channel._ -import org.jboss.netty.handler.codec.http._ import org.jboss.netty.handler.ssl._ import org.jboss.netty.channel.socket.nio._ -import com.twitter.ostrich import com.twitter.util.TimeConversions._ import com.twitter.util.{Duration, Time} @@ -22,7 +19,7 @@ import com.twitter.finagle._ import channel.{Job, QueueingChannelHandler} import com.twitter.finagle.util._ import com.twitter.finagle.service.{Service, ServicePipelineFactory} -import org.jboss.netty.util.HashedWheelTimer +import stats.StatsReceiver object ServerBuilder { def apply() = new ServerBuilder() diff --git a/finagle-core/src/main/scala/com/twitter/finagle/channel/LoadedBroker.scala b/finagle-core/src/main/scala/com/twitter/finagle/channel/LoadedBroker.scala index 41b6034ec4..78bb38fbf5 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/channel/LoadedBroker.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/channel/LoadedBroker.scala @@ -1,15 +1,9 @@ package com.twitter.finagle.channel import scala.util.Random -import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.ConcurrentHashMap - -import org.jboss.netty.channel.MessageEvent - -import com.twitter.util.{Time, Duration, Return, Throw, Future} +import com.twitter.util.{Time, Return, Throw, Future} import com.twitter.util.TimeConversions._ import com.twitter.finagle.util._ import com.twitter.finagle.util.Conversions._ diff --git a/finagle-core/src/main/scala/com/twitter/finagle/channel/RetryingBroker.scala b/finagle-core/src/main/scala/com/twitter/finagle/channel/RetryingBroker.scala index 7d0e3f3fa0..ebbc962e21 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/channel/RetryingBroker.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/channel/RetryingBroker.scala @@ -1,13 +1,8 @@ package com.twitter.finagle.channel import java.net.SocketAddress - -import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout} - -import com.twitter.finagle.util.{Cancelled, Error, Ok, TimerFuture} +import org.jboss.netty.util.HashedWheelTimer import com.twitter.finagle.util.Conversions._ - -import com.twitter.util.TimeConversions._ import com.twitter.util.{Duration, Future, Promise, Throw, Return} object RetryingBroker { @@ -62,7 +57,7 @@ class NumTriesRetryStrategy(numTries: Int) extends RetryStrategy { // number of tries need to be bumped by one. if (numTries > 1) Future.value(new NumTriesRetryStrategy(numTries - 1)) - else + else Future.exception(new Exception) } } @@ -79,7 +74,7 @@ class ExponentialBackoffRetryStrategy(delay: Duration, multiplier: Int) def apply() = { val future = new Promise[RetryStrategy] - + timer(delay) { future() = Return( new ExponentialBackoffRetryStrategy(delay * multiplier, multiplier)) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/channel/TimeoutBroker.scala b/finagle-core/src/main/scala/com/twitter/finagle/channel/TimeoutBroker.scala index a2f5beb036..88a96c6357 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/channel/TimeoutBroker.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/channel/TimeoutBroker.scala @@ -1,11 +1,8 @@ package com.twitter.finagle.channel -import org.jboss.netty.util.{TimerTask, Timeout, Timer} -import org.jboss.netty.channel.MessageEvent +import org.jboss.netty.util.Timer +import com.twitter.util.{Duration, Throw} -import com.twitter.util.{Future, Duration, Promise, Return, Throw} - -import com.twitter.finagle.util.TimerFuture import com.twitter.finagle.util.Conversions._ class TimeoutBroker(timer: Timer, val underlying: Broker, timeout: Duration) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala b/finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala new file mode 100644 index 0000000000..1bf604cd3c --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala @@ -0,0 +1,29 @@ +package com.twitter.finagle.stats + +import java.util.logging.Logger +import org.jboss.netty.util.HashedWheelTimer +import com.twitter.conversions.time._ +import com.twitter.finagle.util.Conversions._ + +case class JavaLoggerStatsReceiver(logger: Logger) extends StatsReceiver { + val timer = new HashedWheelTimer() + + def observer(prefix: String, label: String) = { + val suffix = "_%s".format(label) + + (path: Seq[String], value: Int, count: Int) => { + val pathString = path mkString "__" + logger.info(List(prefix, pathString, suffix, count) mkString " ") + } + } + + def makeGauge(name: String, f: => Float) { + timer(10.seconds) { + logger.info("%s %2f".format(name, f)) + } + } +} + +object JavaLoggerStatsReceiver { + def apply(): JavaLoggerStatsReceiver = JavaLoggerStatsReceiver(Logger.getLogger(getClass.getName)) +} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReceiver.scala b/finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReceiver.scala new file mode 100644 index 0000000000..56c855443b --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReceiver.scala @@ -0,0 +1,6 @@ +package com.twitter.finagle.stats + +trait StatsReceiver { + def observer(prefix: String, label: String): (Seq[String], Int, Int) => Unit + def makeGauge(name: String, f: => Float) +} \ No newline at end of file diff --git a/finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReporter4J.scala b/finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReporter4J.scala new file mode 100644 index 0000000000..731d2bfbc1 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReporter4J.scala @@ -0,0 +1,6 @@ +package com.twitter.finagle.stats + + + + + diff --git a/finagle-core/src/main/scala/com/twitter/finagle/test/HttpClient.scala b/finagle-core/src/main/scala/com/twitter/finagle/test/HttpClient.scala index 4bcafe775e..eb7a6e01b9 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/test/HttpClient.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/test/HttpClient.scala @@ -3,30 +3,16 @@ package com.twitter.finagle.test import java.util.logging.Logger import org.jboss.netty.handler.codec.http._ -import com.twitter.ostrich -import com.twitter.finagle.builder.{ClientBuilder, Http, Ostrich} +import com.twitter.finagle.builder.{ClientBuilder, Http} import com.twitter.finagle.service.Service -import com.twitter.ostrich.RuntimeEnvironment -object HttpClient extends ostrich.Service { +object HttpClient { def main(args: Array[String]) { - val runtime = new RuntimeEnvironment(getClass) - ostrich.ServiceTracker.register(this) - val config = new ostrich.Config { - def telnetPort = 0 - def httpBacklog = 0 - def httpPort = 8890 - def jmxPackage = None - } - ostrich.ServiceTracker.startAdmin(config, runtime) - val client = ClientBuilder() .name("http") .hosts("localhost:10000,localhost:10001,localhost:10003") .codec(Http) - .exportLoadsToOstrich() - .reportTo(Ostrich()) .retries(2) .logger(Logger.getLogger("http")) .buildService[HttpRequest, HttpResponse]() diff --git a/finagle-core/src/main/scala/com/twitter/finagle/test/HttpServer.scala b/finagle-core/src/main/scala/com/twitter/finagle/test/HttpServer.scala index d138fc764a..1adb3fd447 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/test/HttpServer.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/test/HttpServer.scala @@ -3,30 +3,15 @@ package com.twitter.finagle.test import java.net.InetSocketAddress import org.jboss.netty.buffer._ -import org.jboss.netty.channel._ import org.jboss.netty.handler.codec.http._ -import com.twitter.ostrich import com.twitter.finagle.builder._ import com.twitter.finagle.service._ import com.twitter.util.Future -import com.twitter.ostrich.RuntimeEnvironment -object HttpServer extends ostrich.Service { +object HttpServer { def main(args: Array[String]) { - val runtime = new RuntimeEnvironment(getClass) - - val config = new ostrich.Config { - def telnetPort = 0 - def httpBacklog = 0 - def httpPort = 8889 - def jmxPackage = None - } - - ostrich.ServiceTracker.register(this) - ostrich.ServiceTracker.startAdmin(config, runtime) - val server = new Service[HttpRequest, HttpResponse] { def apply(request: HttpRequest) = Future { val response = new DefaultHttpResponse( @@ -38,7 +23,6 @@ object HttpServer extends ostrich.Service { ServerBuilder() .codec(Http) - .reportTo(Ostrich()) .service(server) .bindTo(new InetSocketAddress(10000)) .build diff --git a/finagle-core/src/main/scala/com/twitter/finagle/util/Sample.scala b/finagle-core/src/main/scala/com/twitter/finagle/util/Sample.scala index 7f0a40bed7..11d04c75c0 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/util/Sample.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/util/Sample.scala @@ -1,13 +1,11 @@ package com.twitter.finagle.util -import scala.annotation.tailrec import scala.collection.mutable.Queue import scala.collection.JavaConversions._ -import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.ConcurrentHashMap -import com.twitter.util.{Duration, Time} +import com.twitter.util.Duration import com.twitter.util.TimeConversions._ // TODO: do we want a decaying stat? diff --git a/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala b/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala index 50154b70ee..70a6d3a5d8 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala @@ -1,22 +1,8 @@ package com.twitter.finagle.util import java.util.concurrent.TimeUnit - import org.jboss.netty.util.{Timer, TimerTask, Timeout} - -import com.twitter.util.{Duration, Try, Promise} - -object TimerFuture { - import Conversions._ - - def apply[A](timer: Timer, after: Duration, tryValue: => Try[A]) = { - val future = new Promise[A] - timer(after) { - future() = tryValue - } - future - } -} +import com.twitter.util.Duration class RichTimer(val self: Timer) { def apply(after: Duration)(f: => Unit): Timeout = { diff --git a/finagle-core/src/test/scala/com/twitter/finagle/integration/EmbeddedServer.scala b/finagle-core/src/test/scala/com/twitter/finagle/integration/EmbeddedServer.scala index d5cc8a623f..bb732831b4 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/integration/EmbeddedServer.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/integration/EmbeddedServer.scala @@ -14,10 +14,10 @@ import org.jboss.netty.handler.codec.http._ import org.jboss.netty.util.HashedWheelTimer import com.twitter.conversions.time._ -import com.twitter.ostrich.StatsCollection import com.twitter.finagle.util.Conversions._ import com.twitter.util.{RandomSocket, Duration} +import com.twitter.ostrich.StatsCollection object EmbeddedServer { def apply() = new EmbeddedServer(RandomSocket()) @@ -47,8 +47,10 @@ class EmbeddedServer(val addr: SocketAddress) { val pipeline = Channels.pipeline() pipeline.addLast("transposer", new SimpleChannelDownstreamHandler { override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) { - if (!isBelligerent) - return super.writeRequested(ctx, e) + if (!isBelligerent) { + super.writeRequested(ctx, e) + return + } // Garble the message a bit. val buffer = e.getMessage.asInstanceOf[ChannelBuffer] diff --git a/finagle-core/src/test/scala/com/twitter/finagle/integration/LoadBalancer.scala b/finagle-core/src/test/scala/com/twitter/finagle/integration/LoadBalancerIntegrationSpec.scala similarity index 100% rename from finagle-core/src/test/scala/com/twitter/finagle/integration/LoadBalancer.scala rename to finagle-core/src/test/scala/com/twitter/finagle/integration/LoadBalancerIntegrationSpec.scala diff --git a/finagle-core/src/test/scala/com/twitter/finagle/integration/NettyAssumptions.scala b/finagle-core/src/test/scala/com/twitter/finagle/integration/NettyAssumptionsSpec.scala similarity index 89% rename from finagle-core/src/test/scala/com/twitter/finagle/integration/NettyAssumptions.scala rename to finagle-core/src/test/scala/com/twitter/finagle/integration/NettyAssumptionsSpec.scala index 192be203d1..8b8c2b1c67 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/integration/NettyAssumptions.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/integration/NettyAssumptionsSpec.scala @@ -5,12 +5,13 @@ import org.specs.Specification import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel._ -import com.twitter.util.{Promise, Return} - import com.twitter.finagle.builder.ClientBuilder import com.twitter.finagle.util.Conversions._ import com.twitter.finagle.util.Ok +import com.twitter.util.CountDownLatch +import com.twitter.conversions.time._ + /** * Here we test a number of assumptions we are making of Netty. This * is all stuff that's verified by examination of the Netty codebase, @@ -38,19 +39,19 @@ object NettyAssumptionsSpec extends Specification { }) bootstrap.setPipeline(pipeline) - val latch = new Promise[Unit] + val latch = new CountDownLatch(1) bootstrap.connect(server.addr) { case Ok(channel) => channel.isOpen must beTrue Channels.close(channel) channel.isOpen must beFalse - latch() = Return(()) + latch.countDown() case _ => throw new Exception("Failed to connect to the expected socket.") } - latch() + latch.await(1.second) must beTrue } } } diff --git a/finagle-core/src/test/scala/com/twitter/finagle/util/TimerSpec.scala b/finagle-core/src/test/scala/com/twitter/finagle/util/TimerSpec.scala index 8a6103a311..6d06de4137 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/util/TimerSpec.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/util/TimerSpec.scala @@ -43,39 +43,4 @@ object TimerSpec extends Specification with Mockito { wasInvoked must beFalse } } - - "TimerFuture" should { - val timer = mock[Timer] - val richTimer = new RichTimer(timer) - val timeout = mock[Timeout] - val taskCaptor = ArgumentCaptor.forClass(classOf[TimerTask]) - timer.newTimeout( - taskCaptor.capture, - Matchers.eq(10000L), - Matchers.eq(TimeUnit.MILLISECONDS)) returns timeout - val timerFuture = TimerFuture[Unit](timer, 10.seconds, Throw(new Exception)) - - there was one(timer).newTimeout( - any[TimerTask], - Matchers.eq(10000L), - Matchers.eq(TimeUnit.MILLISECONDS)) - - val timeoutTask = taskCaptor.getValue - - timerFuture.isDefined must beFalse - - "be satisfied when the timer is" in { - timeoutTask.run(timeout) - timerFuture() must throwA(new Exception) - } - - "compose with Future.select" in { - val future = new Promise[Unit] - val selected = Future.select(future, timerFuture) - selected.isDefined must beFalse - future() = Return(()) - selected() must be_==(()) - } - - } } diff --git a/finagle-ostrich/src/main/scala/com/twitter/finagle/OstrichStatsReceiver.scala b/finagle-ostrich/src/main/scala/com/twitter/finagle/OstrichStatsReceiver.scala new file mode 100644 index 0000000000..ba3a96902d --- /dev/null +++ b/finagle-ostrich/src/main/scala/com/twitter/finagle/OstrichStatsReceiver.scala @@ -0,0 +1,21 @@ +package com.twitter.finagle + +import com.twitter.ostrich.Stats +import com.twitter.finagle.stats.StatsReceiver + +class OstrichStatsReceiver extends StatsReceiver { + def observer(prefix: String, label: String) = { + val suffix = "_%s".format(label) + + (path: Seq[String], value: Int, count: Int) => { + // Enforce count == 1? + val pathString = path mkString "__" + Stats.addTiming(prefix + pathString, value) + Stats.addTiming(prefix + pathString + suffix, value) + } + } + + def makeGauge(name: String, f: => Float) { + Stats.makeGauge(name)(f) + } +} \ No newline at end of file diff --git a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Thrift.scala b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Thrift.scala index 34782792f0..e30c4bc9c1 100644 --- a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Thrift.scala +++ b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Thrift.scala @@ -4,7 +4,15 @@ import org.jboss.netty.channel.{Channels, ChannelPipelineFactory} import com.twitter.finagle.builder.Codec -class Thrift extends Codec { +class ThriftWithWrappedReplies extends Thrift with WrappedReplies + +trait WrappedReplies extends Thrift { + override val wrapReplies = true +} + +class Thrift extends Codec +{ + val wrapReplies = false val instance = this val clientPipelineFactory = @@ -13,7 +21,7 @@ class Thrift extends Codec { val pipeline = Channels.pipeline() pipeline.addLast("thriftFrameCodec", new ThriftFrameCodec) pipeline.addLast("thriftClientEncoder", new ThriftClientEncoder) - pipeline.addLast("thriftClientDecoder", new ThriftClientDecoder) + pipeline.addLast("thriftClientDecoder", new ThriftClientDecoder(wrapReplies)) pipeline } } diff --git a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientCodec.scala b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientCodec.scala index 7ae65da19b..f416aea405 100644 --- a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientCodec.scala +++ b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientCodec.scala @@ -34,7 +34,9 @@ class ThriftClientEncoder extends SimpleChannelDownstreamHandler { /** * Translate wire representation to ThriftReply */ -class ThriftClientDecoder extends ReplayingDecoder[VoidEnum] { +class ThriftClientDecoder(val wrapReplies: Boolean) + extends ReplayingDecoder[VoidEnum] +{ protected val protocolFactory = new TBinaryProtocol.Factory(true, true) def decodeThriftReply(ctx: ChannelHandlerContext, @@ -54,8 +56,11 @@ class ThriftClientDecoder extends ReplayingDecoder[VoidEnum] { null case TMessageType.REPLY => val call = ThriftTypes(message.name).newInstance() - val reply = call.readResponse(protocol) - reply.asInstanceOf[AnyRef] // Note reply may not be a success + val result = call.readResponse(protocol).asInstanceOf[AnyRef] + if (wrapReplies) + call.reply(result) + else + result case _ => Channels.fireExceptionCaught(ctx, new TApplicationException( TApplicationException.INVALID_MESSAGE_TYPE)) diff --git a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Types.scala b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Types.scala index 57ab8d850f..01dca1ee5e 100644 --- a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Types.scala +++ b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/Types.scala @@ -54,8 +54,8 @@ class ThriftCall[A <: TBase[_, _], R <: TBase[_, _]]( /** * Wrap a ReplyClass in a ThriftReply. */ - def reply(reply: R) = - new ThriftReply[R](reply, this) + def reply(reply: AnyRef) = + new ThriftReply[R](reply.asInstanceOf[R], this) /** * Read the argument list diff --git a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/AsyncServerEndToEnd.scala b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/AsyncServerEndToEnd.scala index c099cf5ce4..b275b155be 100644 --- a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/AsyncServerEndToEnd.scala +++ b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/AsyncServerEndToEnd.scala @@ -44,7 +44,7 @@ object AsyncServerEndToEndSpec extends Specification { } }) - val callResults = new Promise[Silly.bleep_result] + val callResults = new Promise[ThriftReply[Silly.bleep_result]] // ** Set up the client. val clientBootstrap = new ClientBootstrap(new DefaultLocalClientChannelFactory) @@ -52,11 +52,11 @@ object AsyncServerEndToEndSpec extends Specification { def getPipeline() = { val pipeline = Channels.pipeline() pipeline.addLast("framer", new ThriftFrameCodec) - pipeline.addLast("decode", new ThriftClientDecoder) + pipeline.addLast("decode", new ThriftClientDecoder(true)) pipeline.addLast("encode", new ThriftClientEncoder) pipeline.addLast("handler", new SimpleChannelUpstreamHandler { override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { - callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result]) + callResults() = Return(e.getMessage.asInstanceOf[ThriftReply[Silly.bleep_result]]) } }) @@ -78,7 +78,7 @@ object AsyncServerEndToEndSpec extends Specification { val result = callResults.within(1.second) result.isReturn must beTrue - result().success must be_==("yehyeh") + result().response.success must be_==("yehyeh") serverChannel.close().awaitUninterruptibly() serverBootstrap.getFactory.releaseExternalResources() diff --git a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ChannelBufferTransport.scala b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ChannelBufferTransport.scala index 00c324014b..90fcc9536d 100644 --- a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ChannelBufferTransport.scala +++ b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ChannelBufferTransport.scala @@ -40,7 +40,7 @@ object DuplexChannelBufferTransportSpec extends Specification with Mockito { val t = new DuplexChannelBufferTransport(in, out) val bb = "hello".getBytes - "writes to the output ChannelBuffe" in { + "writes to the output ChannelBuffer" in { t.write(bb, 0, 1) there was one(out).writeBytes(bb, 0, 1) diff --git a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEnd.scala b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEnd.scala index 57294ec1a3..ba86bf6873 100644 --- a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEnd.scala +++ b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEnd.scala @@ -55,7 +55,7 @@ object EndToEndSpec extends Specification { } }) - val callResults = new Promise[Silly.bleep_result] + val callResults = new Promise[ThriftReply[Silly.bleep_result]] // ** Set up the client. val clientBootstrap = new ClientBootstrap(new DefaultLocalClientChannelFactory) @@ -64,10 +64,10 @@ object EndToEndSpec extends Specification { val pipeline = Channels.pipeline() pipeline.addLast("framer", new ThriftFrameCodec) pipeline.addLast("encoder", new ThriftClientEncoder) - pipeline.addLast("decoder", new ThriftClientDecoder) + pipeline.addLast("decoder", new ThriftClientDecoder(true)) pipeline.addLast("handler", new SimpleChannelUpstreamHandler { override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { - callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result]) + callResults() = Return(e.getMessage.asInstanceOf[ThriftReply[Silly.bleep_result]]) Channels.close(ctx.getChannel) } }) @@ -89,7 +89,7 @@ object EndToEndSpec extends Specification { val result = callResults.within(1.second) result.isReturn must beTrue - result().success must be_==("yehyeh") + result().response.success must be_==("yehyeh") // ** Shutdown serverChannel.close().awaitUninterruptibly() @@ -98,7 +98,6 @@ object EndToEndSpec extends Specification { } "client" should { - "talk silly to an existing server" in { ThriftTypes.add(new ThriftCallFactory[Silly.bleep_args, Silly.bleep_result]( "bleep", classOf[Silly.bleep_args], classOf[Silly.bleep_result])) @@ -117,7 +116,7 @@ object EndToEndSpec extends Specification { processor, serverSocket, transportFactory, protocolFactory) - val callResults = new Promise[Silly.bleep_result] + val callResults = new Promise[ThriftReply[Silly.bleep_result]] val cf = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), @@ -129,10 +128,10 @@ object EndToEndSpec extends Specification { val pipeline = Channels.pipeline() pipeline.addLast("framer", new ThriftFrameCodec) pipeline.addLast("encoder", new ThriftClientEncoder) - pipeline.addLast("decoder", new ThriftClientDecoder) + pipeline.addLast("decoder", new ThriftClientDecoder(true)) pipeline.addLast("handler", new SimpleChannelUpstreamHandler { override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { - callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result]) + callResults() = Return(e.getMessage.asInstanceOf[ThriftReply[Silly.bleep_result]]) Channels.close(ctx.getChannel) } }) @@ -158,7 +157,7 @@ object EndToEndSpec extends Specification { val result = callResults.within(1.second) result.isReturn must beTrue - result().success must be_==("raboof") + result().response.success must be_==("raboof") thriftServer.stop() serverThread.join() diff --git a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ServiceEndToEnd.scala b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ServiceEndToEnd.scala index ddfbd2c3e5..ae3b1ca494 100644 --- a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ServiceEndToEnd.scala +++ b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ServiceEndToEnd.scala @@ -35,39 +35,65 @@ object ServiceEndToEndSpec extends Specification { } "Service based Thrift server" should { - "respond to calls" in { - ThriftTypes.add( - new ThriftCallFactory[Silly.bleep_args, Silly.bleep_result] - ("bleep", classOf[Silly.bleep_args], classOf[Silly.bleep_result])) + ThriftTypes.add( + new ThriftCallFactory[Silly.bleep_args, Silly.bleep_result] + ("bleep", classOf[Silly.bleep_args], classOf[Silly.bleep_result])) - val addr = RandomSocket.nextAddress() + val addr = RandomSocket.nextAddress() - val sillyService = new SillyService() - val server = ServerBuilder() - .codec(new Thrift) - .service(sillyService) - .bindTo(addr) - .build() + val sillyService = new SillyService() + val server = ServerBuilder() + .codec(new Thrift) + .service(sillyService) + .bindTo(addr) + .build() - val client = ClientBuilder() - .codec(new Thrift) - .hosts(Seq(addr)) - .buildService[ThriftCall[_ <:TBase[_, _],_ <: TBase[_, _]], ThriftReply[_]] + "with wrapped replies" in { + "respond to calls with ThriftReply[Call.response_type]" in { + val client = ClientBuilder() + .codec(new ThriftWithWrappedReplies) + .hosts(Seq(addr)) + .buildService[ThriftCall[_ <:TBase[_, _],_ <: TBase[_, _]], ThriftReply[_]] - val promise = new Promise[ThriftReply[_]] + val promise = new Promise[ThriftReply[_]] - val call = new ThriftCall("bleep", - new Silly.bleep_args("hello"), - classOf[Silly.bleep_result]) - client(call) respond { r => promise() = r } + val call = new ThriftCall("bleep", + new Silly.bleep_args("hello"), + classOf[Silly.bleep_result]) + client(call) respond { r => promise() = r } - val result = promise.within(1.second) + val result = promise.within(1.second) - result.isReturn must beTrue - val reply = result().asInstanceOf[Silly.bleep_result] - reply.success mustEqual "olleh" + result.isReturn must beTrue + val reply = result().asInstanceOf[ThriftReply[Silly.bleep_result]] + reply().response.success mustEqual "olleh" - server.close().awaitUninterruptibly() + server.close().awaitUninterruptibly() + } + } + + "without wrapped replies" in { + "respond to calls with ThriftReply[Call.response_type]" in { + val client = ClientBuilder() + .codec(new Thrift) + .hosts(Seq(addr)) + .buildService[ThriftCall[_ <:TBase[_, _],_ <: TBase[_, _]], Silly.bleep_result] + + val promise = new Promise[Silly.bleep_result] + + val call = new ThriftCall("bleep", + new Silly.bleep_args("hello"), + classOf[Silly.bleep_result]) + client(call) respond { r => promise() = r } + + val result = promise.within(1.second) + + result.isReturn must beTrue + val reply = result().asInstanceOf[Silly.bleep_result] + reply().success mustEqual "olleh" + + server.close().awaitUninterruptibly() + } } } } diff --git a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ThriftCodec.scala b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ThriftCodec.scala index 020a38d55d..89a74bdaec 100644 --- a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ThriftCodec.scala +++ b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/ThriftCodec.scala @@ -142,17 +142,16 @@ object ThriftCodecSpec extends Specification { "decode replys" in { // receive reply and decode val buffer = thriftToBuffer("bleep", TMessageType.REPLY, 23, new Silly.bleep_result("result")) - val channel = makeChannel(new ThriftClientDecoder) + val channel = makeChannel(new ThriftClientDecoder(true)) Channels.fireMessageReceived(channel, buffer) channel.upstreamEvents must haveSize(1) channel.downstreamEvents must haveSize(0) // verify decode val message = channel.upstreamEvents(0).asInstanceOf[MessageEvent].getMessage() - val result = message.asInstanceOf[Silly.bleep_result] + val result = message.asInstanceOf[ThriftReply[Silly.bleep_result]] result mustNot beNull - result.isSetSuccess must beTrue - result.success must be_==("result") + result.response.success must be_==("result") } "decode replys broken in two" in { @@ -160,7 +159,7 @@ object ThriftCodecSpec extends Specification { Range(0, buffer.readableBytes - 1).foreach { numBytes => // receive partial call - val channel = makeChannel(new ThriftClientDecoder) + val channel = makeChannel(new ThriftClientDecoder(true)) val truncatedBuffer = buffer.copy(buffer.readerIndex, numBytes) Channels.fireMessageReceived(channel, truncatedBuffer) @@ -177,10 +176,9 @@ object ThriftCodecSpec extends Specification { channel.upstreamEvents must haveSize(1) channel.downstreamEvents must haveSize(0) val message = channel.upstreamEvents(0).asInstanceOf[MessageEvent].getMessage() - val result = message.asInstanceOf[Silly.bleep_result] + val result = message.asInstanceOf[ThriftReply[Silly.bleep_result]] result mustNot beNull - result.isSetSuccess must beTrue - result.success must be_==("result") + result.response.success must be_==("result") } } @@ -188,7 +186,7 @@ object ThriftCodecSpec extends Specification { // receive exception and decode val buffer = thriftToBuffer("bleep", TMessageType.EXCEPTION, 23, new TApplicationException(TApplicationException.UNKNOWN_METHOD, "message")) - val channel = makeChannel(new ThriftClientDecoder) + val channel = makeChannel(new ThriftClientDecoder(true)) Channels.fireMessageReceived(channel, buffer) channel.upstreamEvents must haveSize(1) channel.downstreamEvents must haveSize(0) diff --git a/project/build/Project.scala b/project/build/Project.scala index 58f812c254..024cd7d974 100644 --- a/project/build/Project.scala +++ b/project/build/Project.scala @@ -4,7 +4,7 @@ import com.twitter.sbt._ class Project(info: ProjectInfo) extends StandardParentProject(info) with SubversionPublisher { - // override def parallelExecution = true +// override def parallelExecution = true override def subversionRepository = Some("http://svn.local.twitter.com/maven-public") val twitterRepo = "twitter.com" at "http://maven.twttr.com/" @@ -17,8 +17,7 @@ class Project(info: ProjectInfo) extends StandardParentProject(info) // val kestrelProject = project("finagle-kestrel", "finagle-kestrel", new KestrelProject(_)) // val hosebirdProject = project("finagle-hosebird", "finagle-hosebird", new HosebirdProject(_)) - class CoreProject(info: ProjectInfo) - extends StandardProject(info) + class CoreProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher with IntegrationSpecs with AdhocInlines { override def compileOrder = CompileOrder.ScalaThenJava @@ -28,14 +27,12 @@ class Project(info: ProjectInfo) extends StandardParentProject(info) val netty = "org.jboss.netty" % "netty" % "3.2.3.Final" val util = "com.twitter" % "util" % "1.4.8" - val mockito = "org.mockito" % "mockito-all" % "1.8.5" % "test" withSources() - val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test" withSources() - - val ostrich = "com.twitter" % "ostrich" % "2.3.4" + val mockito = "org.mockito" % "mockito-all" % "1.8.5" % "test" withSources() + val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test" withSources() + val ostrich = "com.twitter" % "ostrich" % "2.3.4" % "test" } - class ThriftProject(info: ProjectInfo) - extends StandardProject(info) + class ThriftProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher with LibDirClasspath with AdhocInlines { override def compileOrder = CompileOrder.ScalaThenJava @@ -44,8 +41,7 @@ class Project(info: ProjectInfo) extends StandardParentProject(info) val slf4jNop = "org.slf4j" % "slf4j-nop" % "1.5.2" % "provided" } - class OstrichProject(info: ProjectInfo) - extends StandardProject(info) + class OstrichProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher with AdhocInlines { val ostrich = "com.twitter" % "ostrich" % "2.3.4"