Skip to content

Commit

Permalink
introduce an EndToEnd stress test that measures throughput between a
Browse files Browse the repository at this point in the history
finagle client and server.
  • Loading branch information
mariusae committed Feb 10, 2011
1 parent cc695f2 commit 5d8ae89
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.twitter.finagle.stress

import java.net.InetSocketAddress

import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.handler.codec.http.{
HttpRequest, HttpResponse, DefaultHttpResponse,
DefaultHttpRequest, HttpVersion, HttpResponseStatus,
HttpMethod, HttpHeaders}

import com.twitter.conversions.time._
import com.twitter.util.{Future, RandomSocket, Return, Throw, Time}
import com.twitter.ostrich

import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.finagle.builder.Http
import com.twitter.finagle.util.Timer
import com.twitter.finagle.Service

object EndToEndStress {
private[this] object HttpService
extends Service[HttpRequest, HttpResponse]
{
def apply(request: HttpRequest) = Future {
val response = new DefaultHttpResponse(
request.getProtocolVersion, HttpResponseStatus.OK)
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, 1)
response.setContent(ChannelBuffers.wrappedBuffer(".".getBytes))
response
}
}

def dispatchLoop(service: Service[HttpRequest, HttpResponse]) {
val request = new DefaultHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, "/")

val beginTime = Time.now

service(request) ensure {
dispatchLoop(service)
} respond {
case Return(_) =>
ostrich.Stats.addTiming("request", beginTime.untilNow.inMilliseconds.toInt)
case Throw(_) =>
ostrich.Stats.incr("failure")
}
}

private[this] def run(concurrency: Int, addr: InetSocketAddress) {
val service = ClientBuilder()
.hosts(Seq(addr))
.codec(Http)
.build()

0 until concurrency foreach { _ => dispatchLoop(service) }
}

def main(args: Array[String]) {
val serverAddr = RandomSocket()
val server = ServerBuilder()
.bindTo(serverAddr)
.codec(Http)
.build(HttpService)

val beginTime = Time.now
Timer.default.schedule(10.seconds) {
println("@@ %ds".format(beginTime.untilNow.inSeconds))
Stats.prettyPrint(ostrich.Stats)
}

run(10, serverAddr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import java.util.concurrent.atomic.AtomicInteger

import org.jboss.netty.handler.codec.http._

import com.twitter.ostrich.{StatsCollection, StatsProvider, Stats}
import com.twitter.ostrich
import com.twitter.ostrich.{StatsCollection, StatsProvider}
import com.twitter.util.{Duration, CountDownLatch, Return, Throw, Time}
import com.twitter.conversions.time._

Expand Down Expand Up @@ -75,21 +76,6 @@ class LoadBalancerTest(
private[this] val stats = new StatsCollection
private[this] val gaugeValues = new ArrayBuffer[(Int, Map[String, Float])]

private[this] def prettyPrintStats(stats: StatsProvider) {
stats.getCounterStats foreach { case (name, count) =>
println("# %-30s %d".format(name, count))
}

stats.getTimingStats foreach { case (name, stat) =>
val statMap = stat.toMap
val keys = statMap.keys.toList.sorted

keys foreach { key =>
println("# %-30s %s".format("request_%s".format(key), statMap(key)))
}
}
}

private[this] def dispatch(
client: Service[HttpRequest, HttpResponse],
servers: Seq[EmbeddedServer],
Expand Down Expand Up @@ -135,7 +121,7 @@ class LoadBalancerTest(
// }
// }
// Also report to the main Ostrich stats object.
Stats.clearAll()
ostrich.Stats.clearAll()
// val statsReceiver = localStatsReceiver.reportTo(new OstrichStatsReceiver)

// def captureGauges() {
Expand Down Expand Up @@ -181,7 +167,7 @@ class LoadBalancerTest(
val fail = stats.getCounter("fail")().toDouble
println("> success rate: %.2f".format(100.0 * succ / (succ + fail)))
println("> request rate: %.2f".format(rps))
prettyPrintStats(stats)
Stats.prettyPrint(stats)

val allGaugeNames = {
val unique = Set() ++ gaugeValues flatMap { case (_, gvs) => gvs map (_._1) }
Expand All @@ -202,10 +188,10 @@ class LoadBalancerTest(
servers.zipWithIndex foreach { case (server, which) =>
server.stop()
println("> SERVER[%d] (%s)".format(which, server.addr))
prettyPrintStats(server.stats)
Stats.prettyPrint(server.stats)
}

println("> OSTRICH counters")
prettyPrintStats(Stats)
Stats.prettyPrint(ostrich.Stats)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.twitter.finagle.stress

import com.twitter.ostrich.StatsProvider

object Stats {
def prettyPrint(stats: StatsProvider) {
stats.getCounterStats foreach { case (name, count) =>
println("# %-30s %d".format(name, count))
}

stats.getTimingStats foreach { case (name, stat) =>
val statMap = stat.toMap
val keys = statMap.keys.toList.sorted

keys foreach { key =>
println("# %-30s %s".format("timing_%s".format(key), statMap(key)))
}
}
}
}

0 comments on commit 5d8ae89

Please sign in to comment.