Permalink
Browse files

adding stats to server

  • Loading branch information...
1 parent 29625e5 commit 5e1ceb7c8464b4a33a348c93d4b275dba459f404 Nick Kallen committed Jan 12, 2011
@@ -17,7 +17,7 @@ import com.twitter.util.TimeConversions._
import com.twitter.finagle._
import channel.{Job, QueueingChannelHandler}
import com.twitter.finagle.util._
-import service.{ServiceToChannelHandler, Service}
+import service.{StatsFilter, ServiceToChannelHandler, Service}
import stats.{StatsReceiver}
object ServerBuilder {
@@ -136,12 +136,12 @@ case class ServerBuilder[Req <: AnyRef, Res <: AnyRef](
pipeline.addFirst("ssl", new SslHandler(sslEngine, _startTls))
}
-// _statsReceiver foreach { statsReceiver =>
-// pipeline.addLast("stats", new SampleHandler(statsReceiver))
-// }
-
_service.foreach { service =>
- pipeline.addLast("service", new ServiceToChannelHandler(service))
+ val serviceWithStats =
+ if (_statsReceiver.isDefined)
+ new StatsFilter(_statsReceiver.get).andThen(service)
+ else service
+ pipeline.addLast("service", new ServiceToChannelHandler(serviceWithStats))
}
pipeline
@@ -2,7 +2,6 @@ package com.twitter.finagle.channel
import collection.mutable.Queue
-import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.{Channels, Channel}
import java.util.concurrent.ConcurrentLinkedQueue
@@ -71,7 +71,7 @@ class ExponentialBackoffRetryStrategy(
def apply() = {
val future = new Promise[RetryStrategy]
- timer.schedule(delay) {
+ timer.schedule(delay.fromNow) {
future() = Return(
new ExponentialBackoffRetryStrategy(delay * multiplier, multiplier))
}
@@ -1,163 +0,0 @@
-package com.twitter.finagle.integration
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.jboss.netty.handler.codec.http._
-
-import org.specs.Specification
-
-import com.twitter.conversions.time._
-import com.twitter.util.{Return, Throw, CountDownLatch, Time}
-
-import com.twitter.ostrich.{StatsCollection, StatsProvider}
-
-import com.twitter.finagle.service.Service
-import com.twitter.finagle.builder.{ClientBuilder, Http}
-
-object LoadBalancerIntegrationSpec extends Specification {
- def prettyPrintStats(stats: StatsProvider) {
- stats.getCounterStats foreach { case (name, count) =>
- println("# %-30s %d".format(name, count))
- }
- }
-
- "Load Balancer" should {
- // def runSuite(client: Service[HttpRequest, HttpResponse])
- val numRequests = 50000
- val concurrency = 50
-
- val servers = (0 until 3).toArray map(_ => EmbeddedServer())
- val stats = new StatsCollection
- val requestNumber = new AtomicInteger(0)
- val requestCount = new AtomicInteger(numRequests)
- val latch = new CountDownLatch(concurrency)
-
- servers foreach { server =>
- server.setLatency(5.milliseconds)
- }
-
- // XXX - periodically print load, etc [or any kind of debugging
- // information from the loadbalancer]
-
- doAfter {
- servers.zipWithIndex foreach { case (server, which) =>
- server.stop()
- println("> SERVER[%d]".format(which))
- prettyPrintStats(server.stats)
- }
- }
-
- def dispatch(client: Service[HttpRequest, HttpResponse], f: PartialFunction[Int, Unit]) {
- val num = requestNumber.incrementAndGet()
- if (f.isDefinedAt(num))
- f(num)
-
- client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) respond { result =>
- result match {
- case Return(_) =>
- stats.incr("success")
- case Throw(exc) =>
- stats.incr("fail")
- stats.incr("fail_%s".format(exc.getClass.getName.split('.').last))
- }
-
- if (requestCount.decrementAndGet() > 0)
- dispatch(client, f)
- else
- latch.countDown()
- }
- }
-
- def runTest[A](client: Service[HttpRequest, HttpResponse])(f: PartialFunction[Int, Unit]) {
- val begin = Time.now
- 0 until concurrency foreach { _ => dispatch(client, f) }
- latch.await()
- val duration = begin.untilNow()
- val rps = (numRequests.toDouble / duration.inMilliseconds.toDouble) * 1000.0
-
- println("> STATS")
- val succ = stats.getCounter("success")().toDouble
- val fail = stats.getCounter("fail")().toDouble
- println("> success rate: %.2f".format(100.0 * succ / (succ + fail)))
- println("> request rate: %.2f".format(rps))
- prettyPrintStats(stats)
- }
-
- "balance: baseline" in {
- val client = ClientBuilder()
- .codec(Http)
- .hosts(servers map(_.addr))
- .retries(2)
- .requestTimeout(50.milliseconds)
- .buildService[HttpRequest, HttpResponse]
-
- runTest(client) { case _ => () }
-
- true must beTrue
- }
-
- "balance: server goes offline" in {
- val client = ClientBuilder()
- .codec(Http)
- .hosts(servers map(_.addr))
- .retries(2)
- .requestTimeout(50.milliseconds)
- .buildService[HttpRequest, HttpResponse]
-
- runTest(client) {
- case 100 =>
- servers(1).stop()
- }
-
- true must beTrue
- }
-
- "balance: application becomes nonresponsive" in {
- val client = ClientBuilder()
- .codec(Http)
- .hosts(servers map(_.addr))
- .requestTimeout(50.milliseconds)
- // .retries(2)
- .buildService[HttpRequest, HttpResponse]
-
- runTest(client) {
- case 100 =>
- servers(1).becomeApplicationNonresponsive()
- }
-
- true must beTrue
- }
-
- "balance: connection becomes nonresponsive" in {
- val client = ClientBuilder()
- .codec(Http)
- .hosts(servers map(_.addr))
- // .retries(2)
- .requestTimeout(50.milliseconds)
- .buildService[HttpRequest, HttpResponse]
-
- runTest(client) {
- case 100 =>
- servers(1).becomeConnectionNonresponsive()
- }
-
- true must beTrue
- }
-
- "balance: server has protocol error" in {
- val client = ClientBuilder()
- .codec(Http)
- .hosts(servers map(_.addr))
- // .retries(2)
- .requestTimeout(50.milliseconds)
- .buildService[HttpRequest, HttpResponse]
-
- runTest(client) {
- case 100 =>
- servers(1).becomeBelligerent()
- }
-
- true must beTrue
- }
- }
-}

0 comments on commit 5e1ceb7

Please sign in to comment.