Permalink
Browse files

Batch update API for the nameserver

  • Loading branch information...
1 parent 3f46666 commit 62d364ad984abd6054a50052688531704a1d9eed John Corwin committed with stuhood Jan 19, 2012
@@ -8,13 +8,15 @@ import com.twitter.gizzard.shards._
/**
* NameServer implementation that doesn't actually store anything anywhere.
* Useful for tests or stubbing out the partitioning scheme.
+ *
+ * Note: batch_execute commands are not executed atomically in this implementation.
*/
class MemoryShardManagerSource extends ShardManagerSource {
val shardTable = new mutable.ListBuffer[ShardInfo]()
val parentTable = new mutable.ListBuffer[LinkInfo]()
val forwardingTable = new mutable.ListBuffer[Forwarding]()
- val updateVersion = new AtomicLong(0L)
+ val stateVersion = new AtomicLong(0L)
private def find(info: ShardInfo): Option[ShardInfo] = {
shardTable.find { x =>
@@ -167,10 +169,12 @@ class MemoryShardManagerSource extends ShardManagerSource {
forwardingTable.map(_.tableId).toSet.toSeq.sortWith((a,b) => a < b)
}
- def getUpdateVersion() : Long = updateVersion.get()
+ def getCurrentStateVersion() : Long = stateVersion.get()
- def incrementVersion() {
- updateVersion.incrementAndGet()
+ def getMasterStateVersion() : Long = stateVersion.get()
+
+ def incrementStateVersion() {
+ stateVersion.incrementAndGet()
}
def reload() { }
@@ -10,35 +10,36 @@ class PollingUpdater(nameServer : NameServer, pollInterval : Duration) {
private val threadFactory = new ThreadFactoryBuilder().setDaemon(true).
setNameFormat("gizzard-polling-updater-%d").build()
private val scheduledExecutor = Executors.newScheduledThreadPool(1, threadFactory)
- @volatile private var lastVersion = 0L
private[nameserver] def poll() {
- val currentVersion = try {
- nameServer.shardManager.getUpdateVersion()
+ val (currentVersion, masterVersion) = try {
+ (nameServer.shardManager.getCurrentStateVersion(),
+ nameServer.shardManager.getMasterStateVersion())
} catch {
case e: Exception => {
- log.error(e, "[PollingUpdater] failed to read version from name server")
- lastVersion
+ log.error(e, "[PollingUpdater] failed to read state version from name server")
+ return
}
}
- log.ifDebug("[PollingUpdater] current version %d".format(currentVersion))
- if (currentVersion > lastVersion) {
+ log.ifDebug("[PollingUpdater] current version: %d, master version: %d".
+ format(currentVersion, masterVersion))
+ if (currentVersion < masterVersion) {
log.info("[PollingUpdater] detected version change. Old version: %d. New version %d. Reloading config",
- lastVersion, currentVersion)
- lastVersion = currentVersion
+ currentVersion)
try {
nameServer.reload()
} catch {
case e: Exception =>
log.error(e, "[PollingUpdater] exception while reloading config")
}
+ } else if (currentVersion > masterVersion) {
+ log.critical("[PollingUpdater] current version %d is greater than master version %d".
+ format(currentVersion, masterVersion))
}
}
def start() {
log.info("[PollingUpdater] starting. Poll interval: %s", pollInterval)
- lastVersion = nameServer.shardManager.getUpdateVersion()
- log.info("[PollingUpdater] initial version %d", lastVersion)
val intervalSec = pollInterval.inSeconds
val pollTask = new Runnable() {
override def run() {
@@ -47,8 +47,9 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def listHostnames() = shard.read.any(_.listHostnames())
def listTables() = shard.read.any(_.listTables())
- def getUpdateVersion() = shard.read.any(_.getUpdateVersion())
- def incrementVersion() { shard.write.foreach(_.incrementVersion()) }
+ def incrementStateVersion() { shard.write.foreach(_.incrementStateVersion()) }
+ def getCurrentStateVersion() = shard.read.any(_.getCurrentStateVersion())
+ def getMasterStateVersion() = shard.read.any(_.getMasterStateVersion())
}
trait ShardManagerSource {
@@ -97,6 +98,7 @@ trait ShardManagerSource {
@throws(classOf[ShardException]) def listHostnames(): Seq[String]
@throws(classOf[ShardException]) def listTables(): Seq[Int]
- @throws(classOf[ShardException]) def getUpdateVersion() : Long
- @throws(classOf[ShardException]) def incrementVersion()
+ @throws(classOf[ShardException]) def getCurrentStateVersion() : Long
+ @throws(classOf[ShardException]) def getMasterStateVersion() : Long
+ @throws(classOf[ShardException]) def incrementStateVersion()
}
Oops, something went wrong.

0 comments on commit 62d364a

Please sign in to comment.