Skip to content

Commit

Permalink
finagle-integration => finagle-stress
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusae committed Jan 11, 2011
1 parent f3e9a4a commit e57d5e4
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 26 deletions.
Expand Up @@ -3,4 +3,4 @@ package com.twitter.finagle.stats
trait StatsReceiver { trait StatsReceiver {
def observer(prefix: String, label: String): (Seq[String], Int, Int) => Unit def observer(prefix: String, label: String): (Seq[String], Int, Int) => Unit
def makeGauge(name: String, f: => Float) def makeGauge(name: String, f: => Float)
} }
Expand Up @@ -28,7 +28,7 @@ object EmbeddedServer {
class EmbeddedServer(val addr: SocketAddress) { class EmbeddedServer(val addr: SocketAddress) {
import EmbeddedServer._ import EmbeddedServer._


// (Publically accessible) stats covering this server. // (Publicly accessible) stats covering this server.
val stats = new StatsCollection val stats = new StatsCollection


// Server state: // Server state:
Expand Down Expand Up @@ -86,6 +86,10 @@ class EmbeddedServer(val addr: SocketAddress) {
super.messageReceived(ctx, e) super.messageReceived(ctx, e)
} }


override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
stats.incr("exc_%s".format(e.getCause.getClass.getName.split('.').last))
}

}) })


pipeline.addLast("latency", new SimpleChannelDownstreamHandler { pipeline.addLast("latency", new SimpleChannelDownstreamHandler {
Expand Down
@@ -0,0 +1,107 @@
package com.twitter.finagle.integration

import java.util.concurrent.atomic.AtomicInteger

import org.jboss.netty.handler.codec.http._

import com.twitter.ostrich.{StatsCollection, StatsProvider}
import com.twitter.util.{Duration, CountDownLatch, Return, Throw, Time}
import com.twitter.conversions.time._

import com.twitter.finagle.builder.{ClientBuilder, Http}
import com.twitter.finagle.service.Service

object LoadBalancerTest {
def main(args: Array[String]) {
runSuite(
ClientBuilder()
.requestTimeout(10.milliseconds)
)

// TODO: proper resource releasing, etc.
}

def runSuite(clientBuilder: ClientBuilder) {
val baseline = new LoadBalancerTest(clientBuilder)({ case _ => })
println("** BASELINE")
baseline.run()
}

}

class LoadBalancerTest(
clientBuilder: ClientBuilder,
serverLatency: Duration = 0.seconds,
numRequests: Int = 100000,
concurrency: Int = 50)(behavior: PartialFunction[(Int, Seq[EmbeddedServer]), Unit])
{
private[this] val requestNumber = new AtomicInteger(0)
private[this] val requestCount = new AtomicInteger(numRequests)
private[this] val latch = new CountDownLatch(concurrency)
private[this] val stats = new StatsCollection

private[this] def prettyPrintStats(stats: StatsProvider) {
stats.getCounterStats foreach { case (name, count) =>
println("# %-30s %d".format(name, count))
}
}

private[this] def dispatch(
client: Service[HttpRequest, HttpResponse],
servers: Seq[EmbeddedServer],
f: PartialFunction[(Int, Seq[EmbeddedServer]), Unit]) {
val num = requestNumber.incrementAndGet()
if (f.isDefinedAt((num, servers)))
f((num, servers))

client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) respond { result =>
result match {
case Return(_) =>
stats.incr("success")
case Throw(exc) =>
stats.incr("fail")
stats.incr("fail_%s".format(exc.getClass.getName.split('.').last))
}

if (requestCount.decrementAndGet() > 0)
dispatch(client, servers, f)
else
latch.countDown()
}
}

def run() {
val servers = (0 until 3).toArray map(_ => EmbeddedServer())

servers foreach { server =>
server.setLatency(serverLatency)
}

val client = clientBuilder
.codec(Http)
.hosts(servers map(_.addr))
.buildService[HttpRequest, HttpResponse]

val begin = Time.now
0 until concurrency foreach { _ => dispatch(client, servers, behavior) }
latch.await()
val duration = begin.untilNow
val rps = (numRequests.toDouble / duration.inMilliseconds.toDouble) * 1000.0

// Produce a "report" here instead, so we have some sort of
// semantic information here.

println("> STATS")
val succ = stats.getCounter("success")().toDouble
val fail = stats.getCounter("fail")().toDouble
println("> success rate: %.2f".format(100.0 * succ / (succ + fail)))
println("> request rate: %.2f".format(rps))
prettyPrintStats(stats)

servers.zipWithIndex foreach { case (server, which) =>
server.stop()
println("> SERVER[%d]".format(which))
prettyPrintStats(server.stats)
}
}
}
Expand Up @@ -7,7 +7,7 @@ import org.jboss.netty.handler.codec.http._
import org.specs.Specification import org.specs.Specification


import com.twitter.conversions.time._ import com.twitter.conversions.time._
import com.twitter.util.{Return, Throw, CountDownLatch} import com.twitter.util.{Return, Throw, CountDownLatch, Time}


import com.twitter.ostrich.{StatsCollection, StatsProvider} import com.twitter.ostrich.{StatsCollection, StatsProvider}


Expand All @@ -17,24 +17,25 @@ import com.twitter.finagle.builder.{ClientBuilder, Http}
object LoadBalancerIntegrationSpec extends Specification { object LoadBalancerIntegrationSpec extends Specification {
def prettyPrintStats(stats: StatsProvider) { def prettyPrintStats(stats: StatsProvider) {
stats.getCounterStats foreach { case (name, count) => stats.getCounterStats foreach { case (name, count) =>
println("# %-15s %d".format(name, count)) println("# %-30s %d".format(name, count))
} }
} }


"Load Balancer" should { "Load Balancer" should {
// def runSuite(client: Service[HttpRequest, HttpResponse])
val numRequests = 50000
val concurrency = 50

val servers = (0 until 3).toArray map(_ => EmbeddedServer()) val servers = (0 until 3).toArray map(_ => EmbeddedServer())
val stats = new StatsCollection val stats = new StatsCollection
val requestNumber = new AtomicInteger(0) val requestNumber = new AtomicInteger(0)
val requestCount = new AtomicInteger(10000) val requestCount = new AtomicInteger(numRequests)
val concurrency = 50
val latch = new CountDownLatch(concurrency) val latch = new CountDownLatch(concurrency)


servers foreach { server => servers foreach { server =>
server.setLatency(5.milliseconds) server.setLatency(5.milliseconds)
} }


// TODO: parallelize these; measure throughput.

// XXX - periodically print load, etc [or any kind of debugging // XXX - periodically print load, etc [or any kind of debugging
// information from the loadbalancer] // information from the loadbalancer]


Expand All @@ -57,7 +58,7 @@ object LoadBalancerIntegrationSpec extends Specification {
stats.incr("success") stats.incr("success")
case Throw(exc) => case Throw(exc) =>
stats.incr("fail") stats.incr("fail")
stats.incr("fail_%s".format(exc.getClass.getName)) stats.incr("fail_%s".format(exc.getClass.getName.split('.').last))
} }


if (requestCount.decrementAndGet() > 0) if (requestCount.decrementAndGet() > 0)
Expand All @@ -66,21 +67,19 @@ object LoadBalancerIntegrationSpec extends Specification {
latch.countDown() latch.countDown()
} }
} }

def runTest[A](client: Service[HttpRequest, HttpResponse])(f: PartialFunction[Int, Unit]) { def runTest[A](client: Service[HttpRequest, HttpResponse])(f: PartialFunction[Int, Unit]) {
val begin = Time.now
0 until concurrency foreach { _ => dispatch(client, f) } 0 until concurrency foreach { _ => dispatch(client, f) }
latch.await() latch.await()

val duration = begin.untilNow()
val rps = (numRequests.toDouble / duration.inMilliseconds.toDouble) * 1000.0

println("> STATS") println("> STATS")
val succ = stats.getCounter("success")().toDouble val succ = stats.getCounter("success")().toDouble
val fail = stats.getCounter("fail")().toDouble val fail = stats.getCounter("fail")().toDouble
println("> success rate: %.2f".format(100.0 * succ / (succ + fail))) println("> success rate: %.2f".format(100.0 * succ / (succ + fail)))

println("> request rate: %.2f".format(rps))
// val counterKeys = (Set() ++ stats.getCounterKeys) -- Set("success", "fail")
// counterKeys foreach { key =>
// println("> %s = %d".format(key, stats.getCounter(key)()))
// }

prettyPrintStats(stats) prettyPrintStats(stats)
} }


Expand All @@ -89,7 +88,7 @@ object LoadBalancerIntegrationSpec extends Specification {
.codec(Http) .codec(Http)
.hosts(servers map(_.addr)) .hosts(servers map(_.addr))
.retries(2) .retries(2)
.requestTimeout(20.milliseconds) .requestTimeout(50.milliseconds)
.buildService[HttpRequest, HttpResponse] .buildService[HttpRequest, HttpResponse]


runTest(client) { case _ => () } runTest(client) { case _ => () }
Expand All @@ -102,7 +101,7 @@ object LoadBalancerIntegrationSpec extends Specification {
.codec(Http) .codec(Http)
.hosts(servers map(_.addr)) .hosts(servers map(_.addr))
.retries(2) .retries(2)
.requestTimeout(10.milliseconds) .requestTimeout(50.milliseconds)
.buildService[HttpRequest, HttpResponse] .buildService[HttpRequest, HttpResponse]


runTest(client) { runTest(client) {
Expand All @@ -117,7 +116,7 @@ object LoadBalancerIntegrationSpec extends Specification {
val client = ClientBuilder() val client = ClientBuilder()
.codec(Http) .codec(Http)
.hosts(servers map(_.addr)) .hosts(servers map(_.addr))
.requestTimeout(10.milliseconds) .requestTimeout(50.milliseconds)
// .retries(2) // .retries(2)
.buildService[HttpRequest, HttpResponse] .buildService[HttpRequest, HttpResponse]


Expand All @@ -134,7 +133,7 @@ object LoadBalancerIntegrationSpec extends Specification {
.codec(Http) .codec(Http)
.hosts(servers map(_.addr)) .hosts(servers map(_.addr))
// .retries(2) // .retries(2)
.requestTimeout(10.milliseconds) .requestTimeout(50.milliseconds)
.buildService[HttpRequest, HttpResponse] .buildService[HttpRequest, HttpResponse]


runTest(client) { runTest(client) {
Expand All @@ -150,7 +149,7 @@ object LoadBalancerIntegrationSpec extends Specification {
.codec(Http) .codec(Http)
.hosts(servers map(_.addr)) .hosts(servers map(_.addr))
// .retries(2) // .retries(2)
.requestTimeout(10.milliseconds) .requestTimeout(50.milliseconds)
.buildService[HttpRequest, HttpResponse] .buildService[HttpRequest, HttpResponse]


runTest(client) { runTest(client) {
Expand Down
8 changes: 4 additions & 4 deletions project/build/Project.scala
Expand Up @@ -27,9 +27,9 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)


// finagle-integration has integration test suites & tools for // finagle-integration has integration test suites & tools for
// development. // development.
val integrationProject = project( val stressProject = project(
"finagle-integration", "finagle-integration", "finagle-stress", "finagle-stress",
new IntegrationProject(_), coreProject) new StressProject(_), coreProject)


class CoreProject(info: ProjectInfo) extends StandardProject(info) class CoreProject(info: ProjectInfo) extends StandardProject(info)
with SubversionPublisher with AdhocInlines with SubversionPublisher with AdhocInlines
Expand Down Expand Up @@ -60,7 +60,7 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
val ostrich = "com.twitter" % "ostrich" % "2.3.4" val ostrich = "com.twitter" % "ostrich" % "2.3.4"
} }


class IntegrationProject(info: ProjectInfo) extends StandardProject(info) class StressProject(info: ProjectInfo) extends StandardProject(info)
with SubversionPublisher with IntegrationSpecs with AdhocInlines with SubversionPublisher with IntegrationSpecs with AdhocInlines
{ {
val ostrich = "com.twitter" % "ostrich" % "2.3.4" val ostrich = "com.twitter" % "ostrich" % "2.3.4"
Expand Down

0 comments on commit e57d5e4

Please sign in to comment.