Skip to content

Commit

Permalink
speed up the least queued loadbalancer strategy a bit by doing less c…
Browse files Browse the repository at this point in the history
…opies.
  • Loading branch information
mariusae committed Mar 18, 2011
1 parent 60ce7ad commit 22a03ef
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.finagle.loadbalancer

import util.Random
import collection.mutable.ArrayBuffer
import java.util.concurrent.atomic.AtomicInteger

import com.twitter.finagle.{Service, ServiceFactory}
Expand All @@ -15,18 +16,30 @@ class LeastQueuedStrategy[Req, Rep]
{
private[this] val rng = new Random
private[this] val queueStat = WeakMetadata[AtomicInteger] { new AtomicInteger(0) }
private[this] val leastQueuedOrdering =
Ordering.by { case (_, queueSize) => queueSize }: Ordering[(ServiceFactory[Req, Rep], Int)]

private[this] def leastQueued(pools: Seq[ServiceFactory[Req, Rep]]) = {
val snapshot = pools map { pool => (pool, queueStat(pool).get) }
val shuffled = rng.shuffle(snapshot)
val (selected, _) = shuffled.min(leastQueuedOrdering)
selected

private[this] def leastQueued(factories: Seq[ServiceFactory[Req, Rep]]) = {
var minLoad = Int.MaxValue
val mins = new ArrayBuffer[ServiceFactory[Req, Rep]]

factories foreach { factory =>
val load = queueStat(factory).get
if (load < minLoad) {
mins.clear()
mins += factory
minLoad = load
} else if (load == minLoad) {
mins += factory
}
}

if (mins.size == 1)
mins(0)
else
mins(rng.nextInt(mins.size))
}

def apply(pools: Seq[ServiceFactory[Req, Rep]]) = {
val pool = leastQueued(pools)
val pool = if (pools.size == 1) pools.head else leastQueued(pools)
val qs = queueStat(pool)
qs.incrementAndGet()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ class LoadBalancedFactory[Req, Rep](
def make(): Future[Service[Req, Rep]] = {
// We first create a snapshot since the underlying seq could
// change.
val snapshot = factories.toArray

val snapshot = factories.toSeq
if (snapshot.isEmpty)
return Future.exception(new NoBrokersAvailableException)

Expand Down

0 comments on commit 22a03ef

Please sign in to comment.