Skip to content

Commit

Permalink
[split] Don`t apply write weights, and set up reads to use the Deny b…
Browse files Browse the repository at this point in the history
…ehavior, rather than skipping visiting of the replicas.
  • Loading branch information
Stu Hood committed Apr 13, 2012
1 parent df4ed10 commit 5a635fd
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 22 deletions.
23 changes: 10 additions & 13 deletions src/main/scala/com/twitter/gizzard/nameserver/LoadBalancer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,38 @@ import scala.collection.mutable
import com.twitter.gizzard.shards.RoutingNode

trait LoadBalancer {
def balanced[T](replicas: Seq[RoutingNode[T]], readOnly: Boolean): Seq[RoutingNode[T]]
/** @return (shuffled non-0-weight entries, unordered 0-weight entries) */
def balanced[T](replicas: Seq[RoutingNode[T]]): (Seq[RoutingNode[T]], Seq[RoutingNode[T]])
}

object LoadBalancer {
object Fixed extends LoadBalancer {
override def balanced[T](replicas: Seq[RoutingNode[T]], readOnly: Boolean) =
replicas
override def balanced[T](replicas: Seq[RoutingNode[T]]) = (replicas, Nil)
}

object WeightedRandom extends LoadBalancer {
// "threadsafe enough"
private val random = new Random

override def balanced[T](replicas: Seq[RoutingNode[T]], readOnly: Boolean) = {
shuffle(if (readOnly) ReadSelector else WriteSelector, replicas)
}
override def balanced[T](replicas: Seq[RoutingNode[T]]) = shuffle(ReadSelector, replicas)

/**
* 1) sum all weights
* 2) recurse on remaining portion of the array
* a) at each step, choose random number between 0 and the remaining weight
* b) select bucket/item as if weights defined a sequential range
* c) move selected item to head, and increment offset
* @return (shuffled non-0-weight entries, unordered 0-weight entries)
*/
private[nameserver] final def shuffle[T](
selector: WeightSelector[T],
input: Seq[T]
): Seq[T] = {
val output = input.filter(selector(_) > 0).toBuffer
): (Seq[T],Seq[T]) = {
val (toShuffle, zeros) = input.partition(selector(_) > 0)
val buffer = toShuffle.toBuffer
// shuffle in place
shuffle(selector, output, 0, output.map(selector).sum)
output
shuffle(selector, buffer, 0, buffer.map(selector).sum)
(buffer, zeros)
}

private final def shuffle[T](
Expand Down Expand Up @@ -74,7 +74,4 @@ object LoadBalancer {
object ReadSelector extends WeightSelector[RoutingNode[_]] {
def apply(rn: RoutingNode[_]): Int = rn.weight.read
}
object WriteSelector extends WeightSelector[RoutingNode[_]] {
def apply(rn: RoutingNode[_]): Int = rn.weight.write
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@ case class ReplicatingShard[T](shardInfo: ShardInfo, weight: Weight, children: S
extends RoutingNode[T] {
protected[shards] val loadBalancer: LoadBalancer = LoadBalancer.WeightedRandom
protected[shards] def collectedShards(readOnly: Boolean) = {
loadBalancer.balanced(children, readOnly).flatMap(_.collectedShards(readOnly))
val tovisit =
if (readOnly) {
val (ordered, denied) = loadBalancer.balanced(children)
// TODO: nodes 'denied' due to weights should eventually be 'Deny'd via Behavior
ordered
} else {
// TODO: write weights should eventually allow for fractional blocking of shards by
// 'Deny'ing a random fraction of writes matching the write weight
children
}
tovisit.flatMap(_.collectedShards(readOnly))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ object LoadBalancerSpec extends ConfiguredSpecification with JMocker with ClassM
"drop 0-weight items" in {
// exclude item
val histogram = histo(i => if (i == 2) 0 else 1)
histogram.keys must haveTheSameElementsAs(Seq(List(1, 3), List(3, 1)))
histogram.keys must haveTheSameElementsAs {
Seq(
(List(1, 3), List(2)),
(List(3, 1), List(2))
)
}
}

"be weighted" in {
Expand All @@ -37,21 +42,28 @@ object LoadBalancerSpec extends ConfiguredSpecification with JMocker with ClassM
val descending =
histogram.toSeq.sorted {
// TODO: use maxBy in Scala 2.9.x
new math.Ordering[(List[Int], AtomicLong)] {
def compare(x: (List[Int], AtomicLong), y: (List[Int], AtomicLong)) =
new math.Ordering[(WeightedAndDenied, AtomicLong)] {
def compare(x: (WeightedAndDenied, AtomicLong), y: (WeightedAndDenied, AtomicLong)) =
y._2.get.compareTo(x._2.get)
}
}.map(_._1)
descending.head must beEqualTo(List(3, 2, 1))
descending.last must beEqualTo(List(1, 2, 3))
descending.head must beEqualTo {
(List(3, 2, 1), Nil)
}
descending.last must beEqualTo {
(List(1, 2, 3), Nil)
}
}

type WeightedAndDenied = (List[Int], List[Int])

def histo(selector: LoadBalancer.WeightSelector[Int]) = {
val histogram = mutable.HashMap[List[Int], AtomicLong]()
val histogram = mutable.HashMap[WeightedAndDenied, AtomicLong]()
val start = System.currentTimeMillis
1.to(iterations).foreach { i =>
val result = LoadBalancer.WeightedRandom.shuffle(selector, in)
histogram.getOrElseUpdate(result.toList, new AtomicLong()).incrementAndGet()
val (allow, deny) = LoadBalancer.WeightedRandom.shuffle(selector, in)
val result = (allow.toList, deny.toList)
histogram.getOrElseUpdate(result, new AtomicLong()).incrementAndGet()
}
histogram
}
Expand Down

0 comments on commit 5a635fd

Please sign in to comment.