Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Merge branch 'master' into ds-133
Browse files Browse the repository at this point in the history
Conflicts:
	project/build.properties
	src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
	src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
  • Loading branch information
Stu Hood committed Mar 20, 2012
2 parents 6ebe841 + a1ea2ed commit ece0434
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 124 deletions.
4 changes: 2 additions & 2 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#Project properties
#Sun Mar 11 15:08:51 PDT 2012
#Thu Mar 15 14:21:12 PDT 2012
project.organization=com.twitter
project.name=gizzard
sbt.version=0.7.4
project.version=3.0.5-SNAPSHOT-stuhood
project.version=3.0.8-SNAPSHOT-stuhood
build.scala.versions=2.8.1
project.initialize=false
2 changes: 1 addition & 1 deletion project/build/GizzardProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ with SubversionPublisher {
override def filterScalaJars = false
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1"

val querulous = "com.twitter" % "querulous" % "2.7.1"
val querulous = "com.twitter" % "querulous" % "2.7.2"

//val kestrel = "net.lag" % "kestrel" % "1.2.7"
// remove when moved to libkestrel
Expand Down
6 changes: 3 additions & 3 deletions project/release.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#Automatically generated by ReleaseManagement
#Sun Mar 11 15:08:51 PDT 2012
version=3.0.3
sha1=10893b8f05ab58d74e15a7b108d1ddbe32bfccc7
#Thu Mar 15 14:21:12 PDT 2012
version=3.0.7
sha1=747c37464a03cda7e8a3e566a76265a349d3bc67
10 changes: 7 additions & 3 deletions src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ class MemoryShardManagerSource extends ShardManagerSource {

def currentState() = {
val tableIds = forwardingTable.map(_.tableId).toSet
dumpStructure(tableIds.toSeq)
(dumpStructure(tableIds.toSeq), 0L)
}

/**
 * Incremental diffs are not offered by the in-memory source: whatever
 * `lastUpdatedSeq` is passed, this throws UnsupportedOperationException.
 */
def diffState(lastUpdatedSeq: Long) =
  throw new UnsupportedOperationException("diffState() not supported by MemoryShardManagerSource")

def createShard(shardInfo: ShardInfo) {
Expand Down Expand Up @@ -209,7 +213,7 @@ class MemoryShardManagerSource extends ShardManagerSource {
entry.deleted = true
}

def reload() { }
def prepareReload() { }
}

class MemoryRemoteClusterManagerSource extends RemoteClusterManagerSource {
Expand Down Expand Up @@ -250,5 +254,5 @@ class MemoryRemoteClusterManagerSource extends RemoteClusterManagerSource {
// Returns the current contents of hostTable as a List.
def listRemoteHosts() = hostTable.toList
// Returns, as a List, the hostTable entries whose `cluster` field equals `c`.
def listRemoteHostsInCluster(c: String) = hostTable.filter(_.cluster == c).toList

def reload() { }
def prepareReload() { }
}
99 changes: 66 additions & 33 deletions src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.gizzard.nameserver

import java.util.TreeMap
import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
import scala.collection.mutable
import com.twitter.logging.Logger
import com.twitter.gizzard.shards._
Expand Down Expand Up @@ -34,11 +35,12 @@ class RoutingState(
}
}

def buildForwardingTree(): scala.collection.Map[Int, TreeMap[Long, RoutingNode[Any]]] = {
val rv = mutable.Map[Int, TreeMap[Long, RoutingNode[Any]]]()
def buildForwardingTree(): ConcurrentMap[Int, TreeMap[Long, RoutingNode[Any]]] = {
val rv = new ConcurrentHashMap[Int, TreeMap[Long, RoutingNode[Any]]]()

forwardings foreach { case Forwarding(tableId, baseId, rootShardId) =>
val tree = rv.getOrElseUpdate(tableId, new TreeMap[Long, RoutingNode[Any]])
rv.putIfAbsent(tableId, new TreeMap[Long, RoutingNode[Any]])
val tree = rv.get(tableId)
tree.put(baseId, constructRoutingNode(rootShardId, 1))
}

Expand All @@ -56,7 +58,8 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction
val shardRepository = new ShardRepository
val shardManager = new ShardManager(shard, shardRepository)

@volatile private var forwardingTree: scala.collection.Map[Int, TreeMap[Long, RoutingNode[Any]]] = _
@volatile private var forwardingTree: ConcurrentMap[Int, TreeMap[Long, RoutingNode[Any]]] = _
@volatile private var lastUpdatedSeq: Long = -1L

// Forwarders

Expand All @@ -80,38 +83,68 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction
forwarder
}

private def recreateInternalShardState() {
val infos = mutable.ArrayBuffer[ShardInfo]()
val links = mutable.ArrayBuffer[LinkInfo]()
val forwardings = mutable.ArrayBuffer[Forwarding]()
def reload() {
log.info("Loading name server configuration...")
synchronized {
shardManager.prepareReload()

val infos = mutable.ArrayBuffer[ShardInfo]()
val links = mutable.ArrayBuffer[LinkInfo]()
val forwardings = mutable.ArrayBuffer[Forwarding]()
val (states, updatedSeq) = shardManager.currentState()

states foreach { state =>
infos ++= state.shards
links ++= state.links
forwardings ++= state.forwardings
}

shardManager.currentState() foreach { state =>
infos ++= state.shards
links ++= state.links
forwardings ++= state.forwardings
}
val routes = new RoutingState(
shardRepository.instantiateNode,
infos,
links,
forwardings
)

val routes = new RoutingState(
shardRepository.instantiateNode,
infos,
links,
forwardings
)
forwardingTree = routes.buildForwardingTree()
lastUpdatedSeq = updatedSeq
}
log.info("Loading name server configuration is done.")
}

forwardingTree = routes.buildForwardingTree()
/**
 * Recursively builds the routing node rooted at `shardId`.
 *
 * The shard's info is fetched first, then one child node is built for every
 * downward link of the shard (each child carries its link's weight), and the
 * shard repository instantiates the node from those pieces.
 *
 * @param shardId id of the shard at the root of the subtree being built
 * @param weight  weight handed to the instantiated node (defaults to 1)
 */
private def constructRoutingNode(shardId: ShardId, weight: Int = 1) : RoutingNode[Any] = {
  val info = shardManager.getShard(shardId)
  val downLinks = shardManager.listDownwardLinks(shardId)
  val childNodes = for (downLink <- downLinks) yield {
    constructRoutingNode(downLink.downId, downLink.weight)
  }
  shardRepository.instantiateNode(info, weight, childNodes)
}

def reloadUpdatedForwardings() {
log.info("Loading updated name server configuration...")
recreateInternalShardState()
log.info("Loading updated name server configuration is done.")
}
synchronized {
if (forwardingTree == null) throw new NameserverUninitialized

val changes: NameServerChanges = shardManager.diffState(lastUpdatedSeq)
val updatedForwardingsByTableId = changes.updatedForwardings.groupBy(_.tableId)
val deletedForwardingsByTableId = changes.deletedForwardings.groupBy(_.tableId)
val tableIds = updatedForwardingsByTableId.keySet ++ deletedForwardingsByTableId.keySet

tableIds foreach { tableId =>
val newTreeMap = forwardingTree.get(tableId) match {
case null => new TreeMap[Long, RoutingNode[Any]]()
case treeMap => new TreeMap[Long, RoutingNode[Any]](treeMap) // create a shallow copy
}

deletedForwardingsByTableId.get(tableId).getOrElse(Nil) foreach { f => newTreeMap.remove(f.baseId) }
updatedForwardingsByTableId.get(tableId).getOrElse(Nil) foreach { f =>
newTreeMap.put(f.baseId, constructRoutingNode(f.shardId))
}

forwardingTree.put(tableId, newTreeMap)
}

def reload() {
log.info("Loading name server configuration...")
shardManager.reload()
recreateInternalShardState()
log.info("Loading name server configuration is done.")
lastUpdatedSeq = changes.updatedSeq
}
log.info("Loading updated name server configuration is done.")
}

// XXX: removing this causes CopyJobSpec to fail.
Expand All @@ -132,9 +165,9 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction

@throws(classOf[NonExistentShard])
def findCurrentForwarding[T](tableId: Int, id: Long): RoutingNode[T] = {
if(forwardingTree == null) throw new NameserverUninitialized
if (forwardingTree == null) throw new NameserverUninitialized

val rv = forwardingTree.get(tableId) flatMap { treeMap =>
val rv = Option(forwardingTree.get(tableId)) flatMap { treeMap =>
treeMap.floorEntry(mappingFunction(id)) match {
case null => None
case item => Some(item.getValue)
Expand All @@ -151,10 +184,10 @@ class NameServer(val shard: RoutingNode[ShardManagerSource], val mappingFunction
def findForwardings[T](tableId: Int): Seq[RoutingNode[T]] = {
import scala.collection.JavaConversions._

if(forwardingTree == null) throw new NameserverUninitialized
if (forwardingTree == null) throw new NameserverUninitialized

val rv = forwardingTree.get(tableId) map { bySourceIds =>
bySourceIds.values.toSeq
val rv = Option(forwardingTree.get(tableId)) map { treeMap =>
treeMap.values.toSeq
} getOrElse {
throw new NonExistentShard("No shards for tableId: %s".format(tableId))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.twitter.gizzard.nameserver

/**
 * A batch of changes to the NameServer state.
 *
 * The batch is not guaranteed to be assembled atomically: it holds every change
 * up to `updatedSeq`, and may additionally hold some changes newer than that.
 *
 * @param updatedForwardings forwarding entries that have been added or updated
 * @param deletedForwardings forwarding entries that have been marked "deleted"
 * @param updatedSeq         sequence number up to which this batch is complete
 */
case class NameServerChanges(
  updatedForwardings: Seq[Forwarding],
  deletedForwardings: Seq[Forwarding],
  updatedSeq: Long
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RemoteClusterManager(shard: RoutingNode[RemoteClusterManagerSource], relay
log.info("Loading remote cluster configuration...")

initialized = true
shard.write.foreach(_.reload)
shard.write.foreach(_.prepareReload)

val newRemoteClusters = mutable.Map[String, List[Host]]()

Expand Down Expand Up @@ -52,7 +52,7 @@ class RemoteClusterManager(shard: RoutingNode[RemoteClusterManagerSource], relay
}

trait RemoteClusterManagerSource {
@throws(classOf[ShardException]) def reload()
@throws(classOf[ShardException]) def prepareReload()
@throws(classOf[ShardException]) def addRemoteHost(h: Host)
@throws(classOf[ShardException]) def removeRemoteHost(h: String, p: Int)
@throws(classOf[ShardException]) def setRemoteHostStatus(h: String, p: Int, s: HostStatus.Value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import com.twitter.gizzard.thrift


class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository) {
def reload() = shard.write.foreach(_.reload)
def prepareReload() = shard.write.foreach(_.prepareReload)
def currentState() = shard.read.any(_.currentState)
def diffState(lastUpdatedSeq: Long) = shard.read.any(_.diffState(lastUpdatedSeq))
def dumpStructure(tableIds: Seq[Int]) = shard.read.any(_.dumpStructure(tableIds))

@throws(classOf[ShardException])
Expand Down Expand Up @@ -50,8 +51,9 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
}

trait ShardManagerSource {
@throws(classOf[ShardException]) def reload()
@throws(classOf[ShardException]) def currentState(): Seq[NameServerState]
@throws(classOf[ShardException]) def prepareReload()
@throws(classOf[ShardException]) def currentState(): (Seq[NameServerState], Long)
@throws(classOf[ShardException]) def diffState(lastUpdatedSeq: Long): NameServerChanges
@throws(classOf[ShardException]) def dumpStructure(tableIds: Seq[Int]) = {
import TreeUtils._

Expand Down
82 changes: 18 additions & 64 deletions src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,71 +218,30 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager

// Forwardings/Shard Management Read Methods

private def loadState() = dumpStructure(listTables)

private def updateState(state: Seq[NameServerState], updatedSequence: Long) = {
import TreeUtils._

val oldForwardings = state.flatMap(_.forwardings).map(f => (f.tableId, f.baseId) -> f).toMap
val oldLinks = state.flatMap(_.links).toSet
val oldLinksByUpId = mapOfSets(oldLinks)(_.upId)
val oldShards = state.flatMap(_.shards).map(s => s.id -> s).toMap
val oldShardIds = oldShards.keySet

val newForwardings = mutable.Map[(Int,Long), Forwarding]()
val deletedForwardings = mutable.Map[(Int,Long), Forwarding]()

queryEvaluator.select("SELECT * FROM forwardings WHERE updated_seq > ?", updatedSequence) { row =>
val f = rowToForwarding(row)
val fs = if (row.getBoolean("deleted")) deletedForwardings else newForwardings

fs += (f.tableId, f.baseId) -> f
}

val newRootIds = newForwardings.map(_._2.shardId).toSet
val newLinks = descendantLinks(newRootIds)(listDownwardLinks)
val newShardIds = newRootIds ++ newLinks.map(_.downId)
val newShards = newShardIds.toList.map(id => id -> getShard(id)).toMap

val purgeableRootIds = newRootIds ++ deletedForwardings.map(_._2.shardId)
val purgeableLinks = descendantLinks(purgeableRootIds)(oldLinksByUpId)
val purgeableShardIds = purgeableRootIds ++ purgeableLinks.map(_.downId)

val updatedForwardings = (oldForwardings -- deletedForwardings.keys) ++ newForwardings
val updatedLinks = (oldLinks -- purgeableLinks) ++ newLinks
val updatedShards = (oldShards -- purgeableShardIds) ++ newShards

val forwardingsByTableId = mapOfSets(updatedForwardings.map(_._2))(_.tableId)
val linksByUpId = mapOfSets(updatedLinks)(_.upId)
val tableIds = forwardingsByTableId.keySet

def extractor(id: Int) = NameServerState.extractTable(id)(forwardingsByTableId)(linksByUpId)(updatedShards)

tableIds.map(t => extractor(t)).toSeq
}

@volatile private var _forwardingUpdatedSeq = 0L
@volatile private var _currentState: Seq[NameServerState] = null

// Reads the `counter` column of the 'forwardings' row in update_counters;
// falls back to 0L when selectOne yields nothing.
private def latestUpdatedSeq() = {
  val counter = queryEvaluator.selectOne("SELECT counter FROM update_counters WHERE id = 'forwardings'") { row =>
    row.getLong("counter")
  }
  counter.getOrElse(0L)
}

def currentState() = {
synchronized {
val nextUpdatedSeq = latestUpdatedSeq()

if (_currentState eq null) {
_currentState = loadState()
} else {
_currentState = updateState(_currentState, _forwardingUpdatedSeq)
}
/**
 * Returns the result of `dumpStructure(listTables)` paired with the value of
 * `latestUpdatedSeq()` that was read before the dump was taken.
 */
def currentState() : (Seq[NameServerState], Long) = {
  // The sequence number is read first, then the structure is dumped.
  val seqBeforeDump = latestUpdatedSeq()
  val fullState = dumpStructure(listTables)
  (fullState, seqBeforeDump)
}

_forwardingUpdatedSeq = nextUpdatedSeq
def diffState(lastUpdatedSeq: Long) : NameServerChanges = {
val updatedForwardings = mutable.Buffer[Forwarding]()
val deletedForwardings = mutable.Buffer[Forwarding]()
val newUpdatedSeq = latestUpdatedSeq()

_currentState
// NOTE: There will be only one row per each forwarding, as forwardings table uses
// (table_id, base_source_id) as its primary key.
queryEvaluator.select("SELECT * FROM forwardings WHERE updated_seq > ?", lastUpdatedSeq) { row =>
val f = rowToForwarding(row)
val buffer = if (row.getBoolean("deleted")) deletedForwardings else updatedForwardings
buffer += f
}

NameServerChanges(updatedForwardings, deletedForwardings, newUpdatedSeq)
}

def getShard(id: ShardId) = {
Expand Down Expand Up @@ -388,13 +347,8 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
entryId
)

def reload() {
def prepareReload() {
try {
synchronized {
_currentState = null
_forwardingUpdatedSeq = 0L
}

SqlShard.DDLS.foreach {
case (name, ddl) => queryEvaluator.select("DESCRIBE " + name) { row => }
}
Expand Down Expand Up @@ -464,7 +418,7 @@ class SqlRemoteClusterManagerSource(queryEvaluator: QueryEvaluator) extends Remo
queryEvaluator.select("SELECT * FROM hosts WHERE cluster = ?", c)(rowToHost).toList
}

def reload() {
def prepareReload() {
try {
List("hosts").foreach { table =>
queryEvaluator.select("DESCRIBE " + table) { row => }
Expand Down
Loading

0 comments on commit ece0434

Please sign in to comment.