Permalink
Browse files

Merge branch 'master' into memcached_performance_improvements

  • Loading branch information...
2 parents 8498be4 + ca3f46b commit df87cfefde6f9439f4d6e283961421125d7cf398 @mariusae mariusae committed Mar 18, 2011
@@ -1,6 +1,7 @@
package com.twitter.finagle.loadbalancer
import util.Random
+import collection.mutable.ArrayBuffer
import java.util.concurrent.atomic.AtomicInteger
import com.twitter.finagle.{Service, ServiceFactory}
@@ -15,18 +16,30 @@ class LeastQueuedStrategy[Req, Rep]
{
private[this] val rng = new Random
private[this] val queueStat = WeakMetadata[AtomicInteger] { new AtomicInteger(0) }
- private[this] val leastQueuedOrdering =
- Ordering.by { case (_, queueSize) => queueSize }: Ordering[(ServiceFactory[Req, Rep], Int)]
-
- private[this] def leastQueued(pools: Seq[ServiceFactory[Req, Rep]]) = {
- val snapshot = pools map { pool => (pool, queueStat(pool).get) }
- val shuffled = rng.shuffle(snapshot)
- val (selected, _) = shuffled.min(leastQueuedOrdering)
- selected
+
+ private[this] def leastQueued(factories: Seq[ServiceFactory[Req, Rep]]) = {
+ var minLoad = Int.MaxValue
+ val mins = new ArrayBuffer[ServiceFactory[Req, Rep]]
+
+ factories foreach { factory =>
+ val load = queueStat(factory).get
+ if (load < minLoad) {
+ mins.clear()
+ mins += factory
+ minLoad = load
+ } else if (load == minLoad) {
+ mins += factory
+ }
+ }
+
+ if (mins.size == 1)
+ mins(0)
+ else
+ mins(rng.nextInt(mins.size))
}
def apply(pools: Seq[ServiceFactory[Req, Rep]]) = {
- val pool = leastQueued(pools)
+ val pool = if (pools.size == 1) pools.head else leastQueued(pools)
val qs = queueStat(pool)
qs.incrementAndGet()
@@ -24,8 +24,7 @@ class LoadBalancedFactory[Req, Rep](
def make(): Future[Service[Req, Rep]] = {
// We first create a snapshot since the underlying seq could
// change.
- val snapshot = factories.toArray
-
+ val snapshot = factories.toSeq
if (snapshot.isEmpty)
return Future.exception(new NoBrokersAvailableException)
@@ -31,7 +31,7 @@ class CachingPool[Req, Rep](
deathRow += ((Time.now, underlying))
if (!isScheduled) {
isScheduled = true
- timer.schedule(timeout.fromNow)(collect)
+ timer.schedule(timeout.fromNow) { collect() }
}
} else {
underlying.release()
@@ -47,7 +47,10 @@ class CachingPool[Req, Rep](
timestamp.until(now) >= timeout
}
- dequeued foreach { case (_, service) => service.release() }
+ dequeued foreach { case (_, service) =>
+ service.release()
+ }
+
if (!deathRow.isEmpty) {
// TODO: what happens if an event is scheduled in the past?
timer.schedule(deathRow.head._1 + timeout)(collect)
@@ -64,6 +67,8 @@ class CachingPool[Req, Rep](
val (_, service) = deathRow.dequeue()
if (service.isAvailable)
return Future.value(new WrappedService(service))
+ else
+ service.release()
}
factory.make() map { new WrappedService(_) }
@@ -27,7 +27,8 @@ class WatermarkPool[Req, Rep](
private[this] var numServices = 0
private[this] var isOpen = true
- private[this] val waitersStat = statsReceiver.addGauge("pool_waiters")(waiters.size)
+ private[this] val waitersStat = statsReceiver.addGauge("pool_waiters") { waiters.size }
+ private[this] val sizeStat = statsReceiver.addGauge("pool_size") { numServices }
private[this] class ServiceWrapper(underlying: Service[Req, Rep])
extends Service[Req, Rep]
@@ -113,8 +113,8 @@ abstract class NameTranslatingStatsReceiver(val self: StatsReceiver)
{
protected[this] def translate(name: Seq[String]): Seq[String]
- def counter(name: String*) = self.counter(translate(name): _*)
- def stat(name: String*) = self.stat(translate(name): _*)
+ def counter(name: String*) = self.counter(translate(name): _*)
+ def stat(name: String*) = self.stat(translate(name): _*)
def addGauge(name: String*)(f: => Float) = self.addGauge(translate(name): _*)(f)
}
View
2 run
@@ -33,7 +33,7 @@ fi
path_append CP $root/project/boot/scala-2.8.1/lib/scala-library.jar
path_append CP $root/project/boot/scala-2.8.1/lib/scala-compiler.jar
-for project in $root ../ostrich ../util; do
+for project in $root/finagle-* ; do
for jar in $project/lib_managed/compile/*.jar; do
path_append CP $jar
done

0 comments on commit df87cfe

Please sign in to comment.