Browse files

Merge branch 'master' of github.com:twitter/finagle into stats_refact…

…oring

Conflicts:
	finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReceiver.scala
	finagle-stress/src/main/scala/com/twitter/finagle/stress/EmbeddedServer.scala
  • Loading branch information...
2 parents aa3b9fd + 2211916 commit f9f84cd7b3c141ee148dc74cf9dbd1cbfd4b7485 Nick Kallen committed Jan 11, 2011
View
32 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.Executors
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio._
-import com.twitter.util.{Duration, JavaTimer}
+import com.twitter.util.Duration
import com.twitter.util.TimeConversions._
import com.twitter.finagle.channel._
@@ -81,6 +81,34 @@ case class ClientBuilder(
None // proactivelyConnect
)
+ override def toString() = {
+ val options = Seq(
+ "name" -> _name,
+ "hosts" -> _hosts,
+ "codec" -> _codec,
+ "connectionTimeout" -> Some(_connectionTimeout),
+ "requestTimeout" -> Some(_requestTimeout),
+ "statsReceiver" -> _statsReceiver,
+ "loadStatistics" -> _loadStatistics,
+ "failureAccrualStatistics" -> Some(_failureAccrualStatistics),
+ "hostConnectionLimit" -> Some(_hostConnectionLimit),
+ "sendBufferSize" -> _sendBufferSize,
+ "recvBufferSize" -> _recvBufferSize,
+ "retries" -> _retries,
+ "initialBackoff" -> _initialBackoff,
+ "backoffMultiplier" -> _backoffMultiplier,
+ "logger" -> _logger,
+ "channelFactory" -> _channelFactory,
+ "proactivelyConnect" -> _proactivelyConnect
+ )
+
+ "ClientBuilder(%s)".format(
+ options flatMap {
+ case (k, Some(v)) => Some("%s=%s".format(k, v))
+ case _ => None
+ } mkString(", "))
+ }
+
def hosts(hostnamePortCombinations: String): ClientBuilder =
copy(_hosts = Some(parseHosts(hostnamePortCombinations)))
@@ -218,7 +246,7 @@ case class ClientBuilder(
(hosts, codec)
}
- val timer = new JavaTimer
+ val timer = Timer.default
val brokers = hosts map { host =>
val statsRepository = {
val statsRepository = new TimeWindowedStatsRepository(
View
1 finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala
@@ -23,6 +23,7 @@ class JavaLoggerStatsReceiver(logger: Logger, timer: Timer) extends StatsReceive
def mkGauge(name: Seq[(String, String)], f: => Float) {
timer.schedule(10.seconds) {
logger.info("%s %2f".format(name, f))
+ makeGauge(name, f)
}
}
View
2 finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReceiver.scala
@@ -50,4 +50,4 @@ trait StatsReceiver {
def mkGauge(description1: (String, String), description2: (String, String), f: => Float) {
mkGauge(Seq(description1, description2), f)
}
-}
+}
View
5 finagle-core/src/main/scala/com/twitter/finagle/stats/TimeWindowedStatsRepository.scala
@@ -1,6 +1,7 @@
package com.twitter.finagle.stats
-import com.twitter.util.{Timer, Duration, JavaTimer, Time}
+import com.twitter.util.{Duration, Time}
+import com.twitter.finagle.util.Timer
/**
* A StatsRepository that keeps a rolling set of windows of data. Stats are
@@ -12,7 +13,7 @@ import com.twitter.util.{Timer, Duration, JavaTimer, Time}
* @param windows the number of time windows to keep around
* @param timer a timer to schedule creating and dropping time windows
*/
-class TimeWindowedStatsRepository(numIntervals: Int, interval: Duration, timer: Timer = new JavaTimer)
+class TimeWindowedStatsRepository(numIntervals: Int, interval: Duration, timer: Timer = Timer.default)
extends StatsRepository
{
@volatile private[this] var position = 0
View
31 finagle-core/src/test/scala/com/twitter/finagle/integration/NettyAssumptionsSpec.scala
@@ -1,15 +1,18 @@
package com.twitter.finagle.integration
+import java.util.concurrent.Executors
+
import org.specs.Specification
-import org.jboss.netty.bootstrap.ClientBootstrap
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap}
import org.jboss.netty.channel._
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util.Ok
-import com.twitter.util.CountDownLatch
+import com.twitter.util.{CountDownLatch, RandomSocket}
import com.twitter.conversions.time._
/**
@@ -22,8 +25,28 @@ import com.twitter.conversions.time._
* *are* making of Netty :-)
*/
object NettyAssumptionsSpec extends Specification {
+ private[this] val executor = Executors.newCachedThreadPool()
+ def makeServer() = {
+ val bootstrap = new ServerBootstrap(
+ new NioServerSocketChannelFactory(executor, executor))
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory {
+ def getPipeline = {
+ val pipeline = Channels.pipeline()
+ pipeline.addLast("stfu", new SimpleChannelUpstreamHandler {
+ override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+ /* nothing */
+ }
+ })
+ pipeline
+ }
+ })
+ bootstrap.bind(RandomSocket())
+ }
+
"Channel.close()" should {
- val server = EmbeddedServer()
+ val ch = makeServer()
+ val addr = ch.getLocalAddress()
+ doAfter { ch.close().awaitUninterruptibly() }
// This test, like any involving timing, is of course fraught with
// races.
@@ -41,7 +64,7 @@ object NettyAssumptionsSpec extends Specification {
val latch = new CountDownLatch(1)
- bootstrap.connect(server.addr) {
+ bootstrap.connect(addr) {
case Ok(channel) =>
channel.isOpen must beTrue
Channels.close(channel)
View
10 .../finagle/integration/EmbeddedServer.scala → ...itter/finagle/stress/EmbeddedServer.scala
@@ -21,14 +21,13 @@ import com.twitter.finagle.util.Timer
object EmbeddedServer {
def apply() = new EmbeddedServer(RandomSocket())
- val executor = Executors.newCachedThreadPool()
val timer = Timer.default
}
class EmbeddedServer(val addr: SocketAddress) {
import EmbeddedServer._
- // (Publically accessible) stats covering this server.
+ // (Publicly accessible) stats covering this server.
val stats = new StatsCollection
// Server state:
@@ -39,6 +38,7 @@ class EmbeddedServer(val addr: SocketAddress) {
private[this] val channels = new DefaultChannelGroup
+ private[this] val executor = Executors.newCachedThreadPool()
private[this] val bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(executor, executor))
@@ -86,6 +86,10 @@ class EmbeddedServer(val addr: SocketAddress) {
super.messageReceived(ctx, e)
}
+ override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+ stats.incr("exc_%s".format(e.getCause.getClass.getName.split('.').last))
+ }
+
})
pipeline.addLast("latency", new SimpleChannelDownstreamHandler {
@@ -118,6 +122,8 @@ class EmbeddedServer(val addr: SocketAddress) {
channels.close().awaitUninterruptibly()
channels.clear()
+
+ bootstrap.releaseExternalResources()
}
def start() {
View
195 finagle-stress/src/main/scala/com/twitter/finagle/stress/LoadBalancerTest.scala
@@ -0,0 +1,195 @@
+package com.twitter.finagle.integration
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.jboss.netty.handler.codec.http._
+
+import com.twitter.ostrich.{StatsCollection, StatsProvider}
+import com.twitter.util.{Duration, CountDownLatch, Return, Throw, Time}
+import com.twitter.conversions.time._
+
+import com.twitter.finagle.builder.{ClientBuilder, Http}
+import com.twitter.finagle.service.Service
+import com.twitter.finagle.stats.NullStatsRepository
+import com.twitter.finagle.util.Timer
+import com.twitter.finagle.util.Conversions._
+
+object LoadBalancerTest {
+ def main(args: Array[String]) {
+ runSuite(
+ ClientBuilder()
+ .requestTimeout(40.milliseconds)
+ .retries(10)
+ )
+
+ // TODO: proper resource releasing, etc.
+ }
+
+ def runSuite(clientBuilder: ClientBuilder) {
+ println("testing " + clientBuilder)
+ println("\n== baseline ==\n")
+ new LoadBalancerTest(clientBuilder)({ case _ => }).run()
+
+ println("\n== 1 server goes offline ==\n")
+ new LoadBalancerTest(clientBuilder)({
+ case (1000, servers) =>
+ servers(1).stop()
+ }).run()
+
+ println("\n== 1 application becomes nonresponsive ==\n")
+ new LoadBalancerTest(clientBuilder)({
+ case (1000, servers) =>
+ servers(1).becomeApplicationNonresponsive()
+ }).run()
+
+ println("\n== 1 connection becomes nonresponsive ==\n")
+ new LoadBalancerTest(clientBuilder)({
+ case (1000, servers) =>
+ servers(1).becomeConnectionNonresponsive()
+ }).run()
+
+ println("\n== 1 server has a protocol error ==\n")
+ new LoadBalancerTest(clientBuilder)({
+ case (1000, servers) =>
+ servers(1).becomeBelligerent()
+ }).run()
+ }
+}
+
+class LoadBalancerTest(
+ clientBuilder: ClientBuilder,
+ serverLatency: Duration = 0.seconds,
+ numRequests: Int = 100000,
+ concurrency: Int = 20)(behavior: PartialFunction[(Int, Seq[EmbeddedServer]), Unit])
+{
+ private[this] val requestNumber = new AtomicInteger(0)
+ private[this] val requestCount = new AtomicInteger(numRequests)
+ private[this] val latch = new CountDownLatch(concurrency)
+ private[this] val stats = new StatsCollection
+ private[this] val gaugeValues = new ArrayBuffer[(Int, Map[String, Float])]
+
+ private[this] def prettyPrintStats(stats: StatsProvider) {
+ stats.getCounterStats foreach { case (name, count) =>
+ println("# %-30s %d".format(name, count))
+ }
+
+ stats.getTimingStats foreach { case (name, stat) =>
+ val statMap = stat.toMap
+ val keys = statMap.keys.toList.sorted
+
+ keys foreach { key =>
+ println("# %-30s %s".format("request_%s".format(key), statMap(key)))
+ }
+ }
+ }
+
+ private[this] def dispatch(
+ client: Service[HttpRequest, HttpResponse],
+ servers: Seq[EmbeddedServer],
+ f: PartialFunction[(Int, Seq[EmbeddedServer]), Unit]) {
+ val num = requestNumber.incrementAndGet()
+ if (f.isDefinedAt((num, servers)))
+ f((num, servers))
+
+ val beginTime = Time.now
+
+ client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) respond { result =>
+ result match {
+ case Return(_) =>
+ val duration = beginTime.untilNow
+ stats.addTiming("request", duration.inMilliseconds.toInt)
+ stats.incr("success")
+ case Throw(exc) =>
+ stats.incr("fail")
+ stats.incr("fail_%s".format(exc.getClass.getName.split('.').last))
+ }
+
+ if (requestCount.decrementAndGet() > 0)
+ dispatch(client, servers, f)
+ else
+ latch.countDown()
+ }
+ }
+
+ def run() {
+ val servers = (0 until 3).toArray map(_ => EmbeddedServer())
+
+ servers foreach { server =>
+ server.setLatency(serverLatency)
+ }
+
+ // Capture gauges to report them at the end.
+ val gauges = new HashMap[String, Function0[Float]]
+ val statsReceiver = new NullStatsRepository {
+ override def mkGauge(description: Seq[(String, String)], f: => Float) {
+ val name = description.map(_._2).mkString("_")
+ gauges += name -> (() => f)
+ }
+ }
+
+ def captureGauges() {
+ Timer.default.schedule(500.milliseconds) {
+ val now = requestNumber.get
+ val values = gauges map { case (k, v) => (k, v()) }
+ gaugeValues += ((now, Map() ++ values))
+ }
+ }
+
+ val client = clientBuilder
+ .codec(Http)
+ .hosts(servers map(_.addr))
+ .reportTo(statsReceiver)
+ .buildService[HttpRequest, HttpResponse]
+
+ val begin = Time.now
+ captureGauges()
+ 0 until concurrency foreach { _ => dispatch(client, servers, behavior) }
+ latch.await()
+ val duration = begin.untilNow
+ val rps = (numRequests.toDouble / duration.inMilliseconds.toDouble) * 1000.0
+
+ // Produce a "report" here instead, so we have some sort of
+ // semantic information here.
+
+ println("> STATS")
+ val succ = stats.getCounter("success")().toDouble
+ val fail = stats.getCounter("fail")().toDouble
+ println("> success rate: %.2f".format(100.0 * succ / (succ + fail)))
+ println("> request rate: %.2f".format(rps))
+ prettyPrintStats(stats)
+
+ val allGaugeNames = {
+ val unique = Set() ++ gaugeValues flatMap { case (_, gvs) => gvs map (_._1) }
+ unique.toList.sorted
+ }
+
+ val columnNames = allGaugeNames map { gaugeName =>
+ // Try to substitute a server.
+ val Array(host, name) = gaugeName.split("_")
+ val serverIndex = servers.findIndexOf { _.addr.toString == host }
+ val shortName = name match {
+ case "available" => "a"
+ case "load" => "l"
+ case "weight" => "w"
+ case n => n
+ }
+
+ "%d/%s".format(serverIndex, shortName)
+ }
+
+ println("> %5s %s".format("time", columnNames map("%-8s".format(_)) mkString(" ")))
+
+ gaugeValues foreach { case (requestNum, values) =>
+ val columns = allGaugeNames map { values.get(_).map("%.2e".format(_)).getOrElse("n/a") }
+ println("> %05d %s".format(requestNum, columns.map("%8s".format(_)).mkString(" ")))
+ }
+
+ servers.zipWithIndex foreach { case (server, which) =>
+ server.stop()
+ println("> SERVER[%d] (%s)".format(which, server.addr))
+ prettyPrintStats(server.stats)
+ }
+ }
+}
View
77 ...gration/LoadBalancerIntegrationSpec.scala → ...gration/LoadBalancerIntegrationSpec.scala
@@ -1,11 +1,13 @@
package com.twitter.finagle.integration
+import java.util.concurrent.atomic.AtomicInteger
+
import org.jboss.netty.handler.codec.http._
import org.specs.Specification
import com.twitter.conversions.time._
-import com.twitter.util.{Return, Throw}
+import com.twitter.util.{Return, Throw, CountDownLatch, Time}
import com.twitter.ostrich.{StatsCollection, StatsProvider}
@@ -15,19 +17,27 @@ import com.twitter.finagle.builder.{ClientBuilder, Http}
object LoadBalancerIntegrationSpec extends Specification {
def prettyPrintStats(stats: StatsProvider) {
stats.getCounterStats foreach { case (name, count) =>
- println("# %-15s %d".format(name, count))
+ println("# %-30s %d".format(name, count))
}
}
"Load Balancer" should {
+ // def runSuite(client: Service[HttpRequest, HttpResponse])
+ val numRequests = 50000
+ val concurrency = 50
+
val servers = (0 until 3).toArray map(_ => EmbeddedServer())
val stats = new StatsCollection
+ val requestNumber = new AtomicInteger(0)
+ val requestCount = new AtomicInteger(numRequests)
+ val latch = new CountDownLatch(concurrency)
servers foreach { server =>
- // server.setLatency(10.milliseconds)
+ server.setLatency(5.milliseconds)
}
- // TODO: parallelize these; measure throughput.
+ // XXX - periodically print load, etc [or any kind of debugging
+ // information from the loadbalancer]
doAfter {
servers.zipWithIndex foreach { case (server, which) =>
@@ -37,36 +47,61 @@ object LoadBalancerIntegrationSpec extends Specification {
}
}
- def runTest[A](client: Service[HttpRequest, HttpResponse])(f: PartialFunction[Int, Unit]) {
- 0 until 10000 foreach { i =>
- if (f.isDefinedAt(i))
- f(i)
-
- val future = client(
- new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
+ def dispatch(client: Service[HttpRequest, HttpResponse], f: PartialFunction[Int, Unit]) {
+ val num = requestNumber.incrementAndGet()
+ if (f.isDefinedAt(num))
+ f(num)
- future.within(10.seconds) match {
+ client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) respond { result =>
+ result match {
case Return(_) =>
stats.incr("success")
- case Throw(_) =>
+ case Throw(exc) =>
stats.incr("fail")
+ stats.incr("fail_%s".format(exc.getClass.getName.split('.').last))
}
+
+ if (requestCount.decrementAndGet() > 0)
+ dispatch(client, f)
+ else
+ latch.countDown()
}
+ }
+ def runTest[A](client: Service[HttpRequest, HttpResponse])(f: PartialFunction[Int, Unit]) {
+ val begin = Time.now
+ 0 until concurrency foreach { _ => dispatch(client, f) }
+ latch.await()
+ val duration = begin.untilNow()
+ val rps = (numRequests.toDouble / duration.inMilliseconds.toDouble) * 1000.0
+
println("> STATS")
val succ = stats.getCounter("success")().toDouble
val fail = stats.getCounter("fail")().toDouble
println("> success rate: %.2f".format(100.0 * succ / (succ + fail)))
-
+ println("> request rate: %.2f".format(rps))
prettyPrintStats(stats)
}
+ "balance: baseline" in {
+ val client = ClientBuilder()
+ .codec(Http)
+ .hosts(servers map(_.addr))
+ .retries(2)
+ .requestTimeout(50.milliseconds)
+ .buildService[HttpRequest, HttpResponse]
+
+ runTest(client) { case _ => () }
+
+ true must beTrue
+ }
+
"balance: server goes offline" in {
val client = ClientBuilder()
.codec(Http)
.hosts(servers map(_.addr))
.retries(2)
- .requestTimeout(10.milliseconds)
+ .requestTimeout(50.milliseconds)
.buildService[HttpRequest, HttpResponse]
runTest(client) {
@@ -81,8 +116,8 @@ object LoadBalancerIntegrationSpec extends Specification {
val client = ClientBuilder()
.codec(Http)
.hosts(servers map(_.addr))
- .requestTimeout(10.milliseconds)
- .retries(2)
+ .requestTimeout(50.milliseconds)
+ // .retries(2)
.buildService[HttpRequest, HttpResponse]
runTest(client) {
@@ -97,8 +132,8 @@ object LoadBalancerIntegrationSpec extends Specification {
val client = ClientBuilder()
.codec(Http)
.hosts(servers map(_.addr))
- .retries(2)
- .requestTimeout(10.milliseconds)
+ // .retries(2)
+ .requestTimeout(50.milliseconds)
.buildService[HttpRequest, HttpResponse]
runTest(client) {
@@ -113,8 +148,8 @@ object LoadBalancerIntegrationSpec extends Specification {
val client = ClientBuilder()
.codec(Http)
.hosts(servers map(_.addr))
- .retries(2)
- .requestTimeout(10.milliseconds)
+ // .retries(2)
+ .requestTimeout(50.milliseconds)
.buildService[HttpRequest, HttpResponse]
runTest(client) {
View
37 project/build/Project.scala
@@ -9,16 +9,30 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
val twitterRepo = "twitter.com" at "http://maven.twttr.com/"
- val coreProject = project("finagle-core", "finagle-core", new CoreProject(_))
- val ostrichProject = project("finagle-ostrich", "finagle-ostrich", new OstrichProject(_), coreProject)
- val thriftProject = project("finagle-thrift", "finagle-thrift", new ThriftProject(_), coreProject)
-// val httpProject = project("finagle-http", "finagle-http", new HttpProject(_))
-// val memcachedProject = project("finagle-memcached", "finagle-memcached", new MemcachedProject(_))
-// val kestrelProject = project("finagle-kestrel", "finagle-kestrel", new KestrelProject(_))
-// val hosebirdProject = project("finagle-hosebird", "finagle-hosebird", new HosebirdProject(_))
+ // finagle-core contains the finagle kernel itself, plus builders &
+ // HTTP codecs [HTTP may move to its own project soon]
+ val coreProject = project(
+ "finagle-core", "finagle-core",
+ new CoreProject(_))
+
+ // finagle-ostrich has a StatsReceiver for Ostrich
+ val ostrichProject = project(
+ "finagle-ostrich", "finagle-ostrich",
+ new OstrichProject(_), coreProject)
+
+ // finagle-thrift contains thrift codecs
+ val thriftProject = project(
+ "finagle-thrift", "finagle-thrift",
+ new ThriftProject(_), coreProject)
+
+ // finagle-integration has integration test suites & tools for
+ // development.
+ val stressProject = project(
+ "finagle-stress", "finagle-stress",
+ new StressProject(_), coreProject)
class CoreProject(info: ProjectInfo) extends StandardProject(info)
- with SubversionPublisher with IntegrationSpecs with AdhocInlines
+ with SubversionPublisher with AdhocInlines
{
override def compileOrder = CompileOrder.ScalaThenJava
@@ -29,7 +43,6 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
val mockito = "org.mockito" % "mockito-all" % "1.8.5" % "test" withSources()
val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test" withSources()
- val ostrich = "com.twitter" % "ostrich" % "2.3.4" % "test"
}
class ThriftProject(info: ProjectInfo) extends StandardProject(info)
@@ -46,4 +59,10 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
{
val ostrich = "com.twitter" % "ostrich" % "2.3.4"
}
+
+ class StressProject(info: ProjectInfo) extends StandardProject(info)
+ with SubversionPublisher with IntegrationSpecs with AdhocInlines
+ {
+ val ostrich = "com.twitter" % "ostrich" % "2.3.4"
+ }
}

0 comments on commit f9f84cd

Please sign in to comment.