Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Batch update API for the nameserver
Browse files Browse the repository at this point in the history
  • Loading branch information
John Corwin authored and Stu Hood committed Mar 25, 2012
1 parent 3f46666 commit 62d364a
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 164 deletions.
12 changes: 8 additions & 4 deletions src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import com.twitter.gizzard.shards._
/**
* NameServer implementation that doesn't actually store anything anywhere.
* Useful for tests or stubbing out the partitioning scheme.
*
* Note: batch_execute commands are not executed atomically in this implementation.
*/
class MemoryShardManagerSource extends ShardManagerSource {

val shardTable = new mutable.ListBuffer[ShardInfo]()
val parentTable = new mutable.ListBuffer[LinkInfo]()
val forwardingTable = new mutable.ListBuffer[Forwarding]()
val updateVersion = new AtomicLong(0L)
val stateVersion = new AtomicLong(0L)

private def find(info: ShardInfo): Option[ShardInfo] = {
shardTable.find { x =>
Expand Down Expand Up @@ -167,10 +169,12 @@ class MemoryShardManagerSource extends ShardManagerSource {
forwardingTable.map(_.tableId).toSet.toSeq.sortWith((a,b) => a < b)
}

def getUpdateVersion() : Long = updateVersion.get()
def getCurrentStateVersion() : Long = stateVersion.get()

def incrementVersion() {
updateVersion.incrementAndGet()
def getMasterStateVersion() : Long = stateVersion.get()

def incrementStateVersion() {
stateVersion.incrementAndGet()
}

def reload() { }
Expand Down
23 changes: 12 additions & 11 deletions src/main/scala/com/twitter/gizzard/nameserver/PollingUpdater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,36 @@ class PollingUpdater(nameServer : NameServer, pollInterval : Duration) {
private val threadFactory = new ThreadFactoryBuilder().setDaemon(true).
setNameFormat("gizzard-polling-updater-%d").build()
private val scheduledExecutor = Executors.newScheduledThreadPool(1, threadFactory)
@volatile private var lastVersion = 0L

private[nameserver] def poll() {
val currentVersion = try {
nameServer.shardManager.getUpdateVersion()
val (currentVersion, masterVersion) = try {
(nameServer.shardManager.getCurrentStateVersion(),
nameServer.shardManager.getMasterStateVersion())
} catch {
case e: Exception => {
log.error(e, "[PollingUpdater] failed to read version from name server")
lastVersion
log.error(e, "[PollingUpdater] failed to read state version from name server")
return
}
}
log.ifDebug("[PollingUpdater] current version %d".format(currentVersion))
if (currentVersion > lastVersion) {
log.ifDebug("[PollingUpdater] current version: %d, master version: %d".
format(currentVersion, masterVersion))
if (currentVersion < masterVersion) {
log.info("[PollingUpdater] detected version change. Old version: %d. New version %d. Reloading config",
lastVersion, currentVersion)
lastVersion = currentVersion
currentVersion)
try {
nameServer.reload()
} catch {
case e: Exception =>
log.error(e, "[PollingUpdater] exception while reloading config")
}
} else if (currentVersion > masterVersion) {
log.critical("[PollingUpdater] current version %d is greater than master version %d".
format(currentVersion, masterVersion))
}
}

def start() {
log.info("[PollingUpdater] starting. Poll interval: %s", pollInterval)
lastVersion = nameServer.shardManager.getUpdateVersion()
log.info("[PollingUpdater] initial version %d", lastVersion)
val intervalSec = pollInterval.inSeconds
val pollTask = new Runnable() {
override def run() {
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/com/twitter/gizzard/nameserver/ShardManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def listHostnames() = shard.read.any(_.listHostnames())
def listTables() = shard.read.any(_.listTables())

def getUpdateVersion() = shard.read.any(_.getUpdateVersion())
def incrementVersion() { shard.write.foreach(_.incrementVersion()) }
def incrementStateVersion() { shard.write.foreach(_.incrementStateVersion()) }
def getCurrentStateVersion() = shard.read.any(_.getCurrentStateVersion())
def getMasterStateVersion() = shard.read.any(_.getMasterStateVersion())
}

trait ShardManagerSource {
Expand Down Expand Up @@ -97,6 +98,7 @@ trait ShardManagerSource {
@throws(classOf[ShardException]) def listHostnames(): Seq[String]
@throws(classOf[ShardException]) def listTables(): Seq[Int]

@throws(classOf[ShardException]) def getUpdateVersion() : Long
@throws(classOf[ShardException]) def incrementVersion()
@throws(classOf[ShardException]) def getCurrentStateVersion() : Long
@throws(classOf[ShardException]) def getMasterStateVersion() : Long
@throws(classOf[ShardException]) def incrementStateVersion()
}
Loading

0 comments on commit 62d364a

Please sign in to comment.