Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Merge branch 'no_rw'
Browse files Browse the repository at this point in the history
  • Loading branch information
freels committed Jun 6, 2011
2 parents c15a395 + 94057db commit 9db1a17
Show file tree
Hide file tree
Showing 43 changed files with 849 additions and 667 deletions.
6 changes: 2 additions & 4 deletions src/main/scala/com/twitter/gizzard/GizzardServer.scala
Expand Up @@ -5,13 +5,11 @@ import com.twitter.conversions.time._
import com.twitter.logging.Logger
import nameserver.{NameServer, BasicShardRepository}
import scheduler.{CopyJobFactory, JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec, RepairJobFactory}
import shards.{Shard, ReadWriteShard}
import config.{GizzardServer => ServerConfig}


abstract class GizzardServer[S <: Shard](config: ServerConfig) {
abstract class GizzardServer[S](config: ServerConfig) {

def readWriteShardAdapter: ReadWriteShard[S] => S
def copyFactory: CopyJobFactory[S]
def repairFactory: RepairJobFactory[S] = null
def diffFactory: RepairJobFactory[S] = null
Expand All @@ -30,7 +28,7 @@ abstract class GizzardServer[S <: Shard](config: ServerConfig) {
// nameserver/shard wiring

val replicationFuture: Option[Future] = None
lazy val shardRepo = new BasicShardRepository[S](readWriteShardAdapter, replicationFuture)
lazy val shardRepo = new BasicShardRepository[S](replicationFuture)
lazy val nameServer = config.nameServer(shardRepo)


Expand Down
13 changes: 6 additions & 7 deletions src/main/scala/com/twitter/gizzard/config/NameServer.scala
Expand Up @@ -50,21 +50,20 @@ trait NameServer {
def replicas: Seq[Replica]
var jobRelay: JobRelay = new JobRelay

protected def getMappingFunction: (Long => Long) = {
protected def getMappingFunction(): (Long => Long) = {
mappingFunction match {
case gizzard.config.ByteSwapper => nameserver.ByteSwapper
case gizzard.config.Identity => { n => n }
case gizzard.config.Fnv1a64 => nameserver.FnvHasher
}
}

def apply[S <: shards.Shard](shardRepository: nameserver.ShardRepository[S]) = {
val replicaShards = replicas.map(_.apply())
def apply[T](shardRepository: nameserver.ShardRepository[T]) = {
val replicaNodes = replicas map { replica => new shards.LeafRoutingNode(replica(), 1) }

val shardInfo = new shards.ShardInfo("com.twitter.gizzard.nameserver.ReplicatingShard", "", "")
val loadBalancer = new nameserver.LoadBalancer(replicaShards)
val shard = new nameserver.ReadWriteShardAdapter(
new shards.ReplicatingShard(shardInfo, 0, replicaShards, loadBalancer, None))
val replicating = new shards.ReplicatingShard(shardInfo, 0, replicaNodes)

new nameserver.NameServer(shard, shardRepository, jobRelay(), getMappingFunction)
new nameserver.NameServer(replicating, shardRepository, jobRelay(), getMappingFunction())
}
}
29 changes: 15 additions & 14 deletions src/main/scala/com/twitter/gizzard/nameserver/LoadBalancer.scala
Expand Up @@ -3,20 +3,21 @@ package nameserver

import scala.util.Random
import scala.collection.mutable.ArrayBuffer
import com.twitter.gizzard.shards.RoutingNode

class LoadBalancer[ConcreteShard <: shards.Shard](
random: Random,
replicas: Seq[ConcreteShard])
extends (() => Seq[ConcreteShard]) {
class LoadBalancer[T](
random: Random,
replicas: Seq[RoutingNode[T]])
extends (() => Seq[RoutingNode[T]]) {

def this(replicas: Seq[ConcreteShard]) = this(new Random, replicas)
def this(replicas: Seq[RoutingNode[T]]) = this(new Random, replicas)

def apply() = sort(replicas)

protected def sort(replicas: Seq[ConcreteShard]): List[ConcreteShard] =
protected def sort(replicas: Seq[RoutingNode[T]]): List[RoutingNode[T]] =
sort(replicas.foldLeft(0)(_ + _.weight), replicas)

protected def sort(totalWeight: Int, replicas: Seq[ConcreteShard]): List[ConcreteShard] = replicas match {
protected def sort(totalWeight: Int, replicas: Seq[RoutingNode[T]]): List[RoutingNode[T]] = replicas match {
case Seq() => Nil
case Seq(first, rest @ _*) =>
val remainingWeight = totalWeight - first.weight
Expand All @@ -31,12 +32,12 @@ class LoadBalancer[ConcreteShard <: shards.Shard](
}
}

class FailingOverLoadBalancer[ConcreteShard <: shards.Shard](
random: Random,
replicas: Seq[ConcreteShard])
extends LoadBalancer[ConcreteShard](random, replicas) {
class FailingOverLoadBalancer[T](
random: Random,
replicas: Seq[RoutingNode[T]])
extends LoadBalancer[T](random, replicas) {

def this(replicas: Seq[ConcreteShard]) = this(new Random, replicas)
def this(replicas: Seq[RoutingNode[T]]) = this(new Random, replicas)

val keepWarmFalloverRate = 0.01

Expand All @@ -53,8 +54,8 @@ class FailingOverLoadBalancer[ConcreteShard <: shards.Shard](
}
}

private def randomize(replicas: Seq[ConcreteShard]) = {
val buffer = new ArrayBuffer[ConcreteShard]
private def randomize(replicas: Seq[RoutingNode[T]]) = {
val buffer = new ArrayBuffer[RoutingNode[T]]
buffer ++= replicas

def swap(a: Int, b: Int) {
Expand Down
Expand Up @@ -39,7 +39,7 @@ class MemoryShard extends Shard {
dumpStructure(tableIds.toSeq)
}

def createShard[S <: shards.Shard](shardInfo: ShardInfo, repository: ShardRepository[S]) {
def createShard[T](shardInfo: ShardInfo, repository: ShardRepository[T]) {
find(shardInfo) match {
case Some(x) =>
if (x.className != shardInfo.className ||
Expand Down

0 comments on commit 9db1a17

Please sign in to comment.