Permalink
Browse files

Batch update API for the nameserver

  • Loading branch information...
1 parent 7be22ac commit 4f85ede7630dfd111bb45da108243e94ae7ea797 John Corwin committed Jan 20, 2012
@@ -16,7 +16,7 @@ class MemoryShardManagerSource extends ShardManagerSource {
val shardTable = new mutable.ListBuffer[ShardInfo]()
val parentTable = new mutable.ListBuffer[LinkInfo]()
val forwardingTable = new mutable.ListBuffer[Forwarding]()
- val updateVersion = new AtomicLong(0L)
+ val stateVersion = new AtomicLong(0L)
private def find(info: ShardInfo): Option[ShardInfo] = {
shardTable.find { x =>
@@ -156,10 +156,12 @@ class MemoryShardManagerSource extends ShardManagerSource {
forwardingTable.map(_.tableId).toSet.toSeq.sortWith((a,b) => a < b)
}
- def getUpdateVersion() : Long = updateVersion.get()
+ def getCurrentStateVersion() : Long = stateVersion.get()
- def incrementVersion() {
- updateVersion.incrementAndGet()
+ def getMasterStateVersion() : Long = stateVersion.get()
+
+ def incrementStateVersion() {
+ stateVersion.incrementAndGet()
}
def batchExecute(commands : Seq[BatchedCommand]) {
@@ -10,35 +10,36 @@ class PollingUpdater(nameServer : NameServer, pollInterval : Duration) {
private val threadFactory = new ThreadFactoryBuilder().setDaemon(true).
setNameFormat("gizzard-polling-updater-%d").build()
private val scheduledExecutor = Executors.newScheduledThreadPool(1, threadFactory)
- @volatile private var lastVersion = 0L
private[nameserver] def poll() {
- val currentVersion = try {
- nameServer.shardManager.getUpdateVersion()
+ val (currentVersion, masterVersion) = try {
+ (nameServer.shardManager.getCurrentStateVersion(),
+ nameServer.shardManager.getMasterStateVersion())
} catch {
case e: Exception => {
- log.error(e, "[PollingUpdater] failed to read version from name server")
- lastVersion
+ log.error(e, "[PollingUpdater] failed to read state version from name server")
+ return
}
}
- log.ifDebug("[PollingUpdater] current version %d".format(currentVersion))
- if (currentVersion > lastVersion) {
+ log.ifDebug("[PollingUpdater] current version: %d, master version: %d".
+ format(currentVersion, masterVersion))
+ if (currentVersion < masterVersion) {
log.info("[PollingUpdater] detected version change. Old version: %d. New version %d. Reloading config",
- lastVersion, currentVersion)
- lastVersion = currentVersion
+ currentVersion)
try {
nameServer.reload()
} catch {
case e: Exception =>
log.error(e, "[PollingUpdater] exception while reloading config")
}
+ } else if (currentVersion > masterVersion) {
+ log.critical("[PollingUpdater] current version %d is greater than master version %d".
+ format(currentVersion, masterVersion))
}
}
def start() {
log.info("[PollingUpdater] starting. Poll interval: %s", pollInterval)
- lastVersion = nameServer.shardManager.getUpdateVersion()
- log.info("[PollingUpdater] initial version %d", lastVersion)
val intervalSec = pollInterval.inSeconds
val pollTask = new Runnable() {
override def run() {
@@ -46,8 +46,9 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def listHostnames() = shard.read.any(_.listHostnames())
def listTables() = shard.read.any(_.listTables())
- def getUpdateVersion() = shard.read.any(_.getUpdateVersion())
- def incrementVersion() { shard.write.foreach(_.incrementVersion()) }
+ def incrementStateVersion() { shard.write.foreach(_.incrementStateVersion()) }
+ def getCurrentStateVersion() = shard.read.any(_.getCurrentStateVersion())
+ def getMasterStateVersion() = shard.read.any(_.getMasterStateVersion())
def batchExecute(commands : Seq[BatchedCommand]) { shard.write.foreach(_.batchExecute(commands)) }
}
@@ -96,8 +97,9 @@ trait ShardManagerSource {
@throws(classOf[ShardException]) def listHostnames(): Seq[String]
@throws(classOf[ShardException]) def listTables(): Seq[Int]
- @throws(classOf[ShardException]) def getUpdateVersion() : Long
- @throws(classOf[ShardException]) def incrementVersion()
+ @throws(classOf[ShardException]) def getCurrentStateVersion() : Long
+ @throws(classOf[ShardException]) def getMasterStateVersion() : Long
+ @throws(classOf[ShardException]) def incrementStateVersion()
@throws(classOf[ShardException]) def batchExecute(commands : Seq[BatchedCommand])
}
@@ -1,6 +1,7 @@
package com.twitter.gizzard.nameserver
import java.sql.{ResultSet, SQLException, SQLIntegrityConstraintViolationException}
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import com.twitter.logging.Logger
import com.twitter.querulous.evaluator.{QueryEvaluator, Transaction}
@@ -184,14 +185,16 @@ class SqlShardManagerTransaction(transaction : Transaction) extends ShardManager
ancestorIds.foreach(id => replaceForwarding(id, id))
}
-
- // Version methods
- def getUpdateVersion() : Long = {
+ def getMasterStateVersion() : Long = {
val query = "SELECT counter FROM update_counters WHERE id = 'version'"
transaction.selectOne(query)(_.getLong("counter")).getOrElse(0L)
}
- def incrementVersion() {
+ def getCurrentStateVersion() : Long = {
+ throw new UnsupportedOperationException("Transactional contexts do not have an independent state version")
+ }
+
+ def incrementStateVersion() {
transaction.execute(
"INSERT INTO update_counters (id, counter) VALUES ('version', 1) ON DUPLICATE KEY UPDATE counter = counter + 1")
}
@@ -262,13 +265,12 @@ class SqlShardManagerTransaction(transaction : Transaction) extends ShardManager
transaction.select("SELECT * FROM shards where busy != 0")(SqlShard.rowToShardInfo).toList
}
- def reload() {
- throw new UnsupportedOperationException("reload not supported within a transactional context")
+ def latestUpdatedSeq() = {
+ val query = "SELECT counter FROM update_counters WHERE id = 'forwardings'"
+ transaction.selectOne(query)(_.getLong("counter")).getOrElse(0L)
}
- def currentState(): Seq[NameServerState] = {
- throw new UnsupportedOperationException("shard state not supported within a transactional context")
- }
+ def loadState() = dumpStructure(listTables)
def batchExecute(commands : Seq[BatchedCommand]) {
for (cmd <- commands) {
@@ -282,15 +284,73 @@ class SqlShardManagerTransaction(transaction : Transaction) extends ShardManager
}
}
}
+
+ def updateState(state: Seq[NameServerState], updatedSequence: Long) = {
+ import TreeUtils._
+
+ val oldForwardings = state.flatMap(_.forwardings).map(f => (f.tableId, f.baseId) -> f).toMap
+ val oldLinks = state.flatMap(_.links).toSet
+ val oldLinksByUpId = mapOfSets(oldLinks)(_.upId)
+ val oldShards = state.flatMap(_.shards).map(s => s.id -> s).toMap
+ val oldShardIds = oldShards.keySet
+
+ val newForwardings = mutable.Map[(Int,Long), Forwarding]()
+ val deletedForwardings = mutable.Map[(Int,Long), Forwarding]()
+
+ transaction.select("SELECT * FROM forwardings WHERE updated_seq > ?", updatedSequence) { row =>
+ val f = SqlShard.rowToForwarding(row)
+ val fs = if (row.getBoolean("deleted")) deletedForwardings else newForwardings
+
+ fs += (f.tableId, f.baseId) -> f
+ }
+
+ val newRootIds = newForwardings.map(_._2.shardId).toSet
+ val newLinks = descendantLinks(newRootIds)(listDownwardLinks)
+ val newShardIds = newRootIds ++ newLinks.map(_.downId)
+ val newShards = newShardIds.toList.map(id => id -> getShard(id)).toMap
+
+ val purgeableRootIds = newRootIds ++ deletedForwardings.map(_._2.shardId)
+ val purgeableLinks = descendantLinks(purgeableRootIds)(oldLinksByUpId)
+ val purgeableShardIds = purgeableRootIds ++ purgeableLinks.map(_.downId)
+
+ val updatedForwardings = (oldForwardings -- deletedForwardings.keys) ++ newForwardings
+ val updatedLinks = (oldLinks -- purgeableLinks) ++ newLinks
+ val updatedShards = (oldShards -- purgeableShardIds) ++ newShards
+
+ val forwardingsByTableId = mapOfSets(updatedForwardings.map(_._2))(_.tableId)
+ val linksByUpId = mapOfSets(updatedLinks)(_.upId)
+ val tableIds = forwardingsByTableId.keySet
+
+ def extractor(id: Int) = NameServerState.extractTable(id)(forwardingsByTableId)(linksByUpId)(updatedShards)
+
+ tableIds.map(t => extractor(t)).toSeq
+ }
+
+ def reload() {
+ throw new UnsupportedOperationException("reload not supported within a transactional context")
+ }
+
+ def currentState(): Seq[NameServerState] = {
+ throw new UnsupportedOperationException("shard state not supported within a transactional context")
+ }
}
class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManagerSource {
private val log = Logger.get(getClass.getName)
- private def withTransaction [T] (f : ShardManagerSource => T) : T = {
+ private def withTransaction [T] (f : SqlShardManagerTransaction => T) : T = {
queryEvaluator.transaction(t => f(new SqlShardManagerTransaction(t)))
}
+ private def updateWithTransaction [T] (f : SqlShardManagerTransaction => T) : T = {
+ queryEvaluator.transaction(t => {
+ val shardTransaction = new SqlShardManagerTransaction(t)
+ val result = f(shardTransaction)
+ shardTransaction.incrementStateVersion()
+ result
+ })
+ }
+
def reload() {
try {
synchronized {
@@ -315,32 +375,32 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
queryEvaluator.execute(SqlShard.UPDATE_COUNTER_DDL)
}
- def createShard(shardInfo: ShardInfo) { withTransaction(_.createShard(shardInfo)) }
- def deleteShard(id: ShardId) { withTransaction(_.deleteShard(id)) }
- def markShardBusy(id: ShardId, busy: Busy.Value) { withTransaction(_.markShardBusy(id, busy)) }
+ def createShard(shardInfo: ShardInfo) { updateWithTransaction(_.createShard(shardInfo)) }
+ def deleteShard(id: ShardId) { updateWithTransaction(_.deleteShard(id)) }
+ def markShardBusy(id: ShardId, busy: Busy.Value) { updateWithTransaction(_.markShardBusy(id, busy)) }
def getShard(id: ShardId) = withTransaction(_.getShard(id))
def shardsForHostname(hostname: String) = withTransaction(_.shardsForHostname(hostname))
def listShards() = withTransaction(_.listShards())
def getBusyShards() = withTransaction(_.getBusyShards())
def addLink(upId: ShardId, downId: ShardId, weight: Int) {
- withTransaction(_.addLink(upId, downId, weight))
+ updateWithTransaction(_.addLink(upId, downId, weight))
}
def removeLink(upId: ShardId, downId: ShardId) {
- withTransaction(_.removeLink(upId, downId))
+ updateWithTransaction(_.removeLink(upId, downId))
}
def listUpwardLinks(id: ShardId) = withTransaction(_.listUpwardLinks(id))
def listDownwardLinks(id: ShardId) = withTransaction(_.listDownwardLinks(id))
def listLinks() = withTransaction(_.listLinks())
- def setForwarding(forwarding: Forwarding) { withTransaction(_.setForwarding(forwarding)) }
- def removeForwarding(forwarding: Forwarding) { withTransaction(_.removeForwarding(forwarding)) }
+ def setForwarding(forwarding: Forwarding) { updateWithTransaction(_.setForwarding(forwarding)) }
+ def removeForwarding(forwarding: Forwarding) { updateWithTransaction(_.removeForwarding(forwarding)) }
def replaceForwarding(oldId: ShardId, newId: ShardId) {
- withTransaction(_.replaceForwarding(oldId, newId))
+ updateWithTransaction(_.replaceForwarding(oldId, newId))
}
def getForwarding(tableId: Int, baseId: Long) = withTransaction(_.getForwarding(tableId, baseId))
@@ -351,79 +411,37 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
def listHostnames() = withTransaction(_.listHostnames())
def listTables() = withTransaction(_.listTables())
- def getUpdateVersion() = withTransaction(_.getUpdateVersion())
- def incrementVersion() { withTransaction(_.incrementVersion()) }
+ def getMasterStateVersion() = withTransaction(_.getMasterStateVersion())
+ def incrementStateVersion() { updateWithTransaction(_ => ()) }
- def batchExecute(commands : Seq[BatchedCommand]) { withTransaction(_.batchExecute(commands)) }
+ def batchExecute(commands : Seq[BatchedCommand]) { updateWithTransaction(_.batchExecute(commands)) }
// Forwardings/Shard Management Read Methods
- private def loadState() = dumpStructure(listTables)
-
- private def updateState(state: Seq[NameServerState], updatedSequence: Long) = {
- import TreeUtils._
-
- val oldForwardings = state.flatMap(_.forwardings).map(f => (f.tableId, f.baseId) -> f).toMap
- val oldLinks = state.flatMap(_.links).toSet
- val oldLinksByUpId = mapOfSets(oldLinks)(_.upId)
- val oldShards = state.flatMap(_.shards).map(s => s.id -> s).toMap
- val oldShardIds = oldShards.keySet
-
- val newForwardings = mutable.Map[(Int,Long), Forwarding]()
- val deletedForwardings = mutable.Map[(Int,Long), Forwarding]()
-
- queryEvaluator.select("SELECT * FROM forwardings WHERE updated_seq > ?", updatedSequence) { row =>
- val f = SqlShard.rowToForwarding(row)
- val fs = if (row.getBoolean("deleted")) deletedForwardings else newForwardings
-
- fs += (f.tableId, f.baseId) -> f
- }
-
- val newRootIds = newForwardings.map(_._2.shardId).toSet
- val newLinks = descendantLinks(newRootIds)(listDownwardLinks)
- val newShardIds = newRootIds ++ newLinks.map(_.downId)
- val newShards = newShardIds.toList.map(id => id -> getShard(id)).toMap
-
- val purgeableRootIds = newRootIds ++ deletedForwardings.map(_._2.shardId)
- val purgeableLinks = descendantLinks(purgeableRootIds)(oldLinksByUpId)
- val purgeableShardIds = purgeableRootIds ++ purgeableLinks.map(_.downId)
-
- val updatedForwardings = (oldForwardings -- deletedForwardings.keys) ++ newForwardings
- val updatedLinks = (oldLinks -- purgeableLinks) ++ newLinks
- val updatedShards = (oldShards -- purgeableShardIds) ++ newShards
-
- val forwardingsByTableId = mapOfSets(updatedForwardings.map(_._2))(_.tableId)
- val linksByUpId = mapOfSets(updatedLinks)(_.upId)
- val tableIds = forwardingsByTableId.keySet
-
- def extractor(id: Int) = NameServerState.extractTable(id)(forwardingsByTableId)(linksByUpId)(updatedShards)
-
- tableIds.map(t => extractor(t)).toSeq
- }
-
@volatile private var _forwardingUpdatedSeq = 0L
@volatile private var _currentState: Seq[NameServerState] = null
-
- private def latestUpdatedSeq() = {
- val query = "SELECT counter FROM update_counters WHERE id = 'forwardings'"
- queryEvaluator.selectOne(query)(_.getLong("counter")).getOrElse(0L)
- }
+ private val _stateVersion = new AtomicLong(0L)
def currentState() = {
synchronized {
- val nextUpdatedSeq = latestUpdatedSeq()
+ withTransaction(t => {
+ val nextUpdatedSeq = t.latestUpdatedSeq()
- if (_currentState eq null) {
- _currentState = loadState()
- } else {
- _currentState = updateState(_currentState, _forwardingUpdatedSeq)
- }
+ if (_currentState eq null) {
+ _currentState = t.loadState()
+ } else {
+ _currentState = t.updateState(_currentState, _forwardingUpdatedSeq)
+ }
- _forwardingUpdatedSeq = nextUpdatedSeq
+ _forwardingUpdatedSeq = nextUpdatedSeq
+ _stateVersion.set(t.getMasterStateVersion())
- _currentState
+ _currentState
+ })
}
}
+
+ def getCurrentStateVersion() : Long = _stateVersion.get()
}
class SqlRemoteClusterManagerSource(queryEvaluator: QueryEvaluator) extends RemoteClusterManagerSource {
@@ -128,6 +128,18 @@ extends Manager.Iface {
wrapEx(shardManager.batchExecute(commands.map(nameserver.BatchedCommand.apply)))
}
+ def increment_state_version() {
+ wrapEx(shardManager.incrementStateVersion())
+ }
+
+ def get_current_state_version() : Long = {
+ wrapEx(shardManager.getCurrentStateVersion())
+ }
+
+ def get_master_state_version() : Long = {
+ wrapEx(shardManager.getMasterStateVersion())
+ }
+
// Job Scheduler Management
def retry_errors() = wrapEx(scheduler.retryErrors())
Oops, something went wrong.

0 comments on commit 4f85ede

Please sign in to comment.