DS 133: Serverside rollback log support #94

Merged
merged 16 commits from ds-133 into master

2 participants

@stuhood
Collaborator

Adds support for rollback logs, which are persisted in the nameserver database.
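
For context, a minimal sketch of how a client might drive the new rollback-log endpoints once this lands. The method names come from the Manager.thrift changes below; the `manager` client handle, log name, and shard id are illustrative assumptions:

```scala
import java.nio.ByteBuffer
import com.twitter.gizzard.thrift.{LogEntry, ShardId, TransformOperation}

// Assumed: `manager` is a connected Manager.Iface thrift client.
// Create a named log; the returned id is an opaque 16-byte UUID.
val logId: ByteBuffer = manager.log_create("migrate-table-7")

// Record one undo command (hypothetical shard id).
val undo = new TransformOperation()
undo.setDelete_shard(new ShardId("localhost", "t0_000"))
manager.log_entry_push(logId, new LogEntry(1, undo))

// To roll back: peek at the newest entry, apply it, then pop it by id.
val last = manager.log_entry_peek(logId, 1).get(0)
manager.log_entry_pop(logId, last.getId())
```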

@stuhood
Collaborator

Refreshed to execute transform operations directly. e0aa443 and 0e28edc are cherry-picks from the ds-61 branch, which will consume these same TransformOperation objects.
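
As a rough illustration of what a batch of those objects looks like on the wire, a hedged sketch reusing the hypothetical `manager` client from above (hostnames and ids are made up; the union fields match the Manager.thrift diff below):

```scala
import scala.collection.JavaConversions._
import com.twitter.gizzard.thrift.{AddLinkRequest, RemoveLinkRequest, ShardId, TransformOperation}

// Two operations, applied in order by batch_execute.
val addLink = new TransformOperation()
addLink.setAdd_link(new AddLinkRequest(
  new ShardId("host1", "t0"), new ShardId("host2", "t0_copy"), 1))

val dropLink = new TransformOperation()
dropLink.setRemove_link(new RemoveLinkRequest(
  new ShardId("host1", "t0"), new ShardId("host3", "t0_old")))

manager.batch_execute(Seq(addLink, dropLink))
```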

@jcorwin

#shipit
Looks good overall. As we discussed, I think we should have automatic server-side logging of all operations into a designated log.
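
(One purely hypothetical shape for that suggestion, not part of this PR: push every batched command to a designated log before applying it. This assumes it sits alongside ManagerService with the same managers in scope; all names below are illustrative.)

```scala
import java.nio.ByteBuffer
import com.twitter.gizzard.thrift

// Hypothetical follow-up sketch, not in this change: log each command
// before executing it. Entry ids here just count up from 1; a real
// implementation would need ids that continue from the log's last entry.
def loggedBatchExecute(logId: ByteBuffer, commands: Seq[thrift.TransformOperation]) {
  var entryId = 1
  for (cmd <- commands) {
    rollbackLogManager.entryPush(logId, new thrift.LogEntry(entryId, cmd))
    entryId += 1
  }
  shardManager.batchExecute(commands.map(TransformOperation.apply))
}
```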

@stuhood stuhood merged commit 0e28edc into master
Commits on Feb 28, 2012
  1. @stuhood
  2. @stuhood
Commits on Mar 7, 2012
  1. @stuhood
  2. @stuhood

    Bump

    stuhood authored
  3. @stuhood
Commits on Mar 13, 2012
  1. @stuhood
Commits on Mar 14, 2012
  1. @stuhood

Add and test multi-peek

    stuhood authored
Commits on Mar 15, 2012
  1. @stuhood

    Merge branch 'master' into ds-133

    stuhood authored
    Conflicts:
    	project/build.properties
Commits on Mar 20, 2012
  1. @stuhood

    Merge branch 'master' into ds-133

    stuhood authored
    Conflicts:
    	project/build.properties
    	src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
    	src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
Commits on Mar 21, 2012
  1. @stuhood

    Lower max log name size, to account for the unique index size limit of 768 bytes. Thanks MySQL!

    stuhood authored
  2. @stuhood

    Add TransformCommands to be used in batches and logging

    stuhood authored
    cherry-picked from da6f60c
    Conflicts:
    	src/main/thrift/Manager.thrift
Commits on Mar 22, 2012
  1. @stuhood
  2. @stuhood
  3. @stuhood
Commits on Mar 24, 2012
  1. @stuhood

    Slightly more logging

    stuhood authored
Commits on Mar 25, 2012
  1. @stuhood
2  project/build.properties
@@ -3,6 +3,6 @@
project.organization=com.twitter
project.name=gizzard
sbt.version=0.7.4
-project.version=3.0.8-SNAPSHOT
+project.version=3.0.8-SNAPSHOT-3-stuhood
build.scala.versions=2.8.1
project.initialize=false
1  project/build/GizzardProject.scala
@@ -20,6 +20,7 @@ with SubversionPublisher {
val finagleThrift = "com.twitter" % "finagle-thrift" % finagleVersion
val finagleOstrich4 = "com.twitter" % "finagle-ostrich4" % finagleVersion
+ val utilThrift = "com.twitter" % "util-thrift" % "2.0.0"
val jackson = "org.codehaus.jackson" % "jackson-core-asl" % "1.9.2"
val jacksonMap = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.2"
5 src/main/scala/com/twitter/gizzard/GizzardServer.scala
@@ -3,7 +3,7 @@ package com.twitter.gizzard
import com.twitter.util.Duration
import com.twitter.conversions.time._
import com.twitter.logging.Logger
-import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager}
+import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager, RollbackLogManager}
import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.config.{GizzardServer => ServerConfig}
import com.twitter.gizzard.proxy.LoggingProxy
@@ -27,6 +27,7 @@ abstract class GizzardServer(config: ServerConfig) {
val nameServer = config.buildNameServer()
val remoteClusterManager = config.buildRemoteClusterManager()
val shardManager = nameServer.shardManager
+ val rollbackLogManager = new RollbackLogManager(nameServer.shard)
lazy val adminJobManager = new AdminJobManager(nameServer.shardRepository, shardManager, jobScheduler(copyPriority))
// job wiring
@@ -49,7 +50,7 @@ abstract class GizzardServer(config: ServerConfig) {
// service wiring
- lazy val managerServer = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, jobScheduler)
+ lazy val managerServer = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, rollbackLogManager, jobScheduler)
lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer))
lazy val jobInjectorServer = new thrift.JobInjectorService(jobCodec, jobScheduler)
70 src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
@@ -1,18 +1,33 @@
package com.twitter.gizzard.nameserver
+import java.nio.ByteBuffer
import scala.collection.mutable
import com.twitter.gizzard.shards._
+import com.twitter.gizzard.thrift
/**
* NameServer implementation that doesn't actually store anything anywhere.
* Useful for tests or stubbing out the partitioning scheme.
*/
+object MemoryShardManagerSource {
+ class LogEntry(val logId: ByteBuffer, val id: Int, val command: thrift.TransformOperation, var deleted: Boolean) {
+ override def equals(o: Any) = o match {
+ case that: LogEntry if that.logId == this.logId && that.id == this.id => true
+ case _ => false
+ }
+
+ override def hashCode() = logId.hashCode() + (31 * id)
+ }
+}
class MemoryShardManagerSource extends ShardManagerSource {
+ import MemoryShardManagerSource._
val shardTable = new mutable.ListBuffer[ShardInfo]()
val parentTable = new mutable.ListBuffer[LinkInfo]()
val forwardingTable = new mutable.ListBuffer[Forwarding]()
+ val logs = new mutable.ListBuffer[(ByteBuffer,String)]()
+ val logEntries = new mutable.ListBuffer[LogEntry]()
private def find(info: ShardInfo): Option[ShardInfo] = {
shardTable.find { x =>
@@ -148,6 +163,19 @@ class MemoryShardManagerSource extends ShardManagerSource {
parentTable.toList
}
+ def batchExecute(commands : Seq[TransformOperation]) {
+ for (cmd <- commands) {
+ cmd match {
+ case CreateShard(shardInfo) => createShard(shardInfo)
+ case DeleteShard(shardId) => deleteShard(shardId)
+ case AddLink(upId, downId, weight) => addLink(upId, downId, weight)
+ case RemoveLink(upId, downId) => removeLink(upId, downId)
+ case SetForwarding(forwarding) => setForwarding(forwarding)
+ case RemoveForwarding(forwarding) => removeForwarding(forwarding)
+ }
+ }
+ }
+
def getBusyShards(): Seq[ShardInfo] = {
shardTable.filter { _.busy.id > 0 }.toList
}
@@ -156,6 +184,48 @@ class MemoryShardManagerSource extends ShardManagerSource {
forwardingTable.map(_.tableId).toSet.toSeq.sortWith((a,b) => a < b)
}
+ def logCreate(id: Array[Byte], logName: String): Unit = {
+ val pair = (ByteBuffer.wrap(id), logName)
+ if (!logs.contains(pair))
+ logs += pair
+ else
+ throw new RuntimeException("Log already exists: " + pair)
+ }
+
+ def logGet(logName: String): Option[Array[Byte]] =
+ logs.collect {
+ case (id, `logName`) => id.array
+ }.headOption
+
+ def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit = {
+ val le = new LogEntry(ByteBuffer.wrap(logId), entry.id, entry.command, false)
+ if (!logEntries.contains(le))
+ logEntries += le
+ else
+ throw new RuntimeException("Log entry already exists: " + le)
+ }
+
+ def logEntryPeek(rawLogId: Array[Byte], count: Int): Seq[thrift.LogEntry] = {
+ val logId = ByteBuffer.wrap(rawLogId)
+ val peeked =
+ logEntries.reverseIterator.collect {
+ case e if e.logId == logId && !e.deleted => e
+ }.take(count)
+ peeked.map { e =>
+ new thrift.LogEntry().setId(e.id).setCommand(e.command)
+ }.toSeq
+ }
+
+ def logEntryPop(rawLogId: Array[Byte], entryId: Int): Unit = {
+ val logId = ByteBuffer.wrap(rawLogId)
+ val entry = logEntries.collect {
+ case e if e.logId == logId && e.id == entryId => e
+ }.headOption.getOrElse { throw new RuntimeException(entryId + " not found for " + logId) }
+
+ // side effect: mark deleted
+ entry.deleted = true
+ }
+
def prepareReload() { }
}
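
The in-memory implementation makes the push/peek/pop contract easy to check by hand. A quick, hedged usage sketch (the log id and name are arbitrary):

```scala
import com.twitter.gizzard.nameserver.MemoryShardManagerSource
import com.twitter.gizzard.thrift

val source = new MemoryShardManagerSource()
val logId = Array.fill[Byte](16)(0)  // 16-byte id; RollbackLogManager normally derives this from a UUID

source.logCreate(logId, "demo-log")
source.logEntryPush(logId, new thrift.LogEntry(1, null))

// Peek returns live entries newest-first; pop only marks them deleted.
val peeked = source.logEntryPeek(logId, 1)
source.logEntryPop(logId, peeked.head.getId())
assert(source.logEntryPeek(logId, 1).isEmpty)
```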
103 src/main/scala/com/twitter/gizzard/nameserver/RollbackLogManager.scala
@@ -0,0 +1,103 @@
+package com.twitter.gizzard.nameserver
+
+import java.util.UUID
+import java.nio.ByteBuffer
+import scala.collection.mutable
+import scala.math.Ordering
+
+import com.twitter.logging.Logger
+import com.twitter.gizzard.shards.RoutingNode
+import com.twitter.gizzard.thrift
+
+
+class RollbackLogManager(shard: RoutingNode[ShardManagerSource]) {
+ import RollbackLogManager._
+
+ def create(logName: String): ByteBuffer = {
+ val idArray = new Array[Byte](UUID_BYTES)
+ val id = ByteBuffer.wrap(idArray)
+ val uuid = UUID.randomUUID
+ id.asLongBuffer
+ .put(uuid.getMostSignificantBits)
+ .put(uuid.getLeastSignificantBits)
+ shard.write.foreach(_.logCreate(idArray, logName))
+ log.info("Created new rollback log for name '%s' with id %s", logName, uuid)
+ id
+ }
+
+ def get(logName: String): Option[ByteBuffer] =
+ shard.read.any(_.logGet(logName).map(ByteBuffer.wrap))
+
+ def entryPush(logId: ByteBuffer, entry: thrift.LogEntry): Unit =
+ shard.write.foreach(_.logEntryPush(unwrap(logId), entry))
+
+ /**
+ * @return the entries with the highest ids on all replicas: note that this means
+ * that logs cannot be rolled back while any nameserver replicas are unavailable.
+ */
+ def entryPeek(logId: ByteBuffer, count: Int): Seq[thrift.LogEntry] = {
+ // collect k from each shard
+ val descendingEntryLists = shard.read.map(_.logEntryPeek(unwrap(logId), count))
+ // and take the top k
+ RollbackLogManager.topK(descendingEntryLists, count)
+ }
+
+ def entryPop(logId: ByteBuffer, entryId: Int): Unit =
+ shard.write.foreach(_.logEntryPop(unwrap(logId), entryId))
+
+ private def unwrap(bb: ByteBuffer): Array[Byte] = {
+ if (bb.hasArray && bb.remaining == bb.capacity) {
+ bb.array
+ } else {
+ val arr = new Array[Byte](bb.remaining)
+ bb.duplicate.get(arr)
+ arr
+ }
+ }
+}
+
+object RollbackLogManager {
+ private val log = Logger.get(getClass.getName)
+
+ val UUID_BYTES = 16
+
+ /**
+ * Calculates the top-k entries by id.
+ * TODO: this is not the hot path, but we should eventually do a real linear merge.
+ */
+ def topK(
+ descendingEntryLists: Iterable[Seq[thrift.LogEntry]],
+ count: Int
+ ): Seq[thrift.LogEntry] = count match {
+ case 1 =>
+ val heads = descendingEntryLists.flatMap(_.lastOption)
+ // no optional form of max
+ if (!heads.isEmpty)
+ Seq(heads.max(EntryOrdering))
+ else
+ Nil
+ case _ =>
+ val q = new mutable.PriorityQueue[thrift.LogEntry]()(EntryOrdering)
+ val result = mutable.Buffer[thrift.LogEntry]()
+ descendingEntryLists.foreach { q ++= _ }
+ val iterator = q.iterator
+ // take the first k non-equal entries
+ var gathered = 0
+ var lastId = Int.MinValue
+ while (gathered < count && iterator.hasNext) {
+ val entry = iterator.next
+ if (entry.id != lastId) {
+ result += entry
+ lastId = entry.id
+ gathered += 1
+ }
+ }
+ result
+ }
+
+ // TODO: Scala 2.8.1 doesn't have maxBy
+ object EntryOrdering extends Ordering[thrift.LogEntry] {
+ override def compare(a: thrift.LogEntry, b: thrift.LogEntry) =
+ Ordering.Int.compare(a.id, b.id)
+ }
+}
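
To make the merge semantics concrete, a small sketch of the count == 1 path of topK, building entries the same way the spec below does (null commands, ids only):

```scala
import com.twitter.gizzard.nameserver.RollbackLogManager.topK
import com.twitter.gizzard.thrift

def entry(id: Int) = new thrift.LogEntry(id, null)

// Three replicas, each reporting at most one live entry, one lagging (empty):
// the merged peek yields the single highest entry id seen on any replica.
val replicas = Seq(Seq(entry(3)), Seq(entry(2)), Seq.empty)
assert(topK(replicas, 1).map(_.getId()) == Seq(3))
```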
15 src/main/scala/com/twitter/gizzard/nameserver/ShardManager.scala
@@ -3,6 +3,7 @@ package com.twitter.gizzard.nameserver
import scala.collection.mutable
import com.twitter.gizzard.shards._
import com.twitter.gizzard.util.TreeUtils
+import com.twitter.gizzard.thrift
class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository) {
@@ -11,6 +12,8 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def diffState(lastUpdatedSeq: Long) = shard.read.any(_.diffState(lastUpdatedSeq))
def dumpStructure(tableIds: Seq[Int]) = shard.read.any(_.dumpStructure(tableIds))
+ def batchExecute(commands : Seq[TransformOperation]) { shard.write.foreach(_.batchExecute(commands)) }
+
@throws(classOf[ShardException])
def createAndMaterializeShard(shardInfo: ShardInfo) {
shard.write.foreach(_.createShard(shardInfo))
@@ -27,7 +30,6 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def listShards() = shard.read.any(_.listShards())
def getBusyShards() = shard.read.any(_.getBusyShards())
-
def addLink(upId: ShardId, downId: ShardId, weight: Int) { shard.write.foreach(_.addLink(upId, downId, weight)) }
def removeLink(upId: ShardId, downId: ShardId) { shard.write.foreach(_.removeLink(upId, downId)) }
@@ -65,6 +67,8 @@ trait ShardManagerSource {
tableIds.map(extractor)
}
+ @throws(classOf[ShardException]) def batchExecute(commands : Seq[TransformOperation])
+
@throws(classOf[ShardException]) def createShard(shardInfo: ShardInfo)
@throws(classOf[ShardException]) def deleteShard(id: ShardId)
@throws(classOf[ShardException]) def markShardBusy(id: ShardId, busy: Busy.Value)
@@ -93,4 +97,13 @@ trait ShardManagerSource {
@throws(classOf[ShardException]) def listHostnames(): Seq[String]
@throws(classOf[ShardException]) def listTables(): Seq[Int]
+
+ @throws(classOf[ShardException]) def logCreate(id: Array[Byte], logName: String): Unit
+ @throws(classOf[ShardException]) def logGet(logName: String): Option[Array[Byte]]
+ @throws(classOf[ShardException]) def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit
+ @throws(classOf[ShardException]) def logEntryPeek(logId: Array[Byte], count: Int): Seq[thrift.LogEntry]
+ @throws(classOf[ShardException]) def logEntryPop(logId: Array[Byte], entryId: Int): Unit
+
+ /** For JMocker. TODO: switch to a better mocking framework */
+ override def toString() = "<%s>".format(this.getClass)
}
224 src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
@@ -1,80 +1,107 @@
package com.twitter.gizzard.nameserver
+import java.nio.ByteBuffer
import java.sql.{ResultSet, SQLException, SQLIntegrityConstraintViolationException}
import scala.collection.mutable
import com.twitter.querulous.evaluator.QueryEvaluator
import com.twitter.gizzard.util.TreeUtils
import com.twitter.gizzard.shards._
+import com.twitter.gizzard.thrift
+import com.twitter.logging.Logger
object SqlShard {
- val SHARDS_DDL = """
-CREATE TABLE IF NOT EXISTS shards (
- class_name VARCHAR(125) NOT NULL,
- hostname VARCHAR(125) NOT NULL,
- table_prefix VARCHAR(125) NOT NULL,
- source_type VARCHAR(125),
- destination_type VARCHAR(125),
- busy TINYINT NOT NULL DEFAULT 0,
-
- PRIMARY KEY (hostname, table_prefix),
- INDEX idx_busy (busy)
-) ENGINE=INNODB
-"""
-
- val SHARD_CHILDREN_DDL = """
-CREATE TABLE IF NOT EXISTS shard_children (
- parent_hostname VARCHAR(125) NOT NULL,
- parent_table_prefix VARCHAR(125) NOT NULL,
- child_hostname VARCHAR(125) NOT NULL,
- child_table_prefix VARCHAR(125) NOT NULL,
- weight INT NOT NULL DEFAULT 1,
-
- PRIMARY KEY (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
- INDEX idx_parent (parent_hostname, parent_table_prefix, weight),
- INDEX idx_child (child_hostname, child_table_prefix, weight)
-) ENGINE=INNODB
-"""
-
- val FORWARDINGS_DDL = """
-CREATE TABLE IF NOT EXISTS forwardings (
- table_id INT NOT NULL,
- base_source_id BIGINT NOT NULL,
- shard_hostname VARCHAR(125) NOT NULL,
- shard_table_prefix VARCHAR(125) NOT NULL,
- deleted TINYINT NOT NULL DEFAULT 0,
- updated_seq BIGINT NOT NULL,
-
- PRIMARY KEY (table_id, base_source_id),
- UNIQUE uni_shard (shard_hostname, shard_table_prefix),
- INDEX idx_updated (updated_seq)
-) ENGINE=INNODB;
-"""
-
- val UPDATE_COUNTER_DDL = """
-CREATE TABLE IF NOT EXISTS update_counters (
- id VARCHAR(25) NOT NULL,
- counter BIGINT NOT NULL DEFAULT 0,
-
- PRIMARY KEY (id)
-) ENGINE=INNODB;
-"""
-
- val HOSTS_DDL = """
-CREATE TABLE IF NOT EXISTS hosts (
- hostname VARCHAR(125) NOT NULL,
- port INT NOT NULL,
- cluster VARCHAR(125) NOT NULL,
- status INT NOT NULL DEFAULT 0,
-
- PRIMARY KEY (hostname, port),
- INDEX idx_cluster (cluster, status)
-) ENGINE=INNODB;
-"""
+ // ordered (name -> ddl), and a map for convenience
+ val DDLS =
+ Seq(
+ "shards" -> """
+ CREATE TABLE IF NOT EXISTS shards (
+ class_name VARCHAR(125) NOT NULL,
+ hostname VARCHAR(125) NOT NULL,
+ table_prefix VARCHAR(125) NOT NULL,
+ source_type VARCHAR(125),
+ destination_type VARCHAR(125),
+ busy TINYINT NOT NULL DEFAULT 0,
+
+ PRIMARY KEY (hostname, table_prefix),
+ INDEX idx_busy (busy)
+ ) ENGINE=INNODB
+ """,
+ "shard_children" -> """
+ CREATE TABLE IF NOT EXISTS shard_children (
+ parent_hostname VARCHAR(125) NOT NULL,
+ parent_table_prefix VARCHAR(125) NOT NULL,
+ child_hostname VARCHAR(125) NOT NULL,
+ child_table_prefix VARCHAR(125) NOT NULL,
+ weight INT NOT NULL DEFAULT 1,
+
+ PRIMARY KEY (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
+ INDEX idx_parent (parent_hostname, parent_table_prefix, weight),
+ INDEX idx_child (child_hostname, child_table_prefix, weight)
+ ) ENGINE=INNODB
+ """,
+ "forwardings" -> """
+ CREATE TABLE IF NOT EXISTS forwardings (
+ table_id INT NOT NULL,
+ base_source_id BIGINT NOT NULL,
+ shard_hostname VARCHAR(125) NOT NULL,
+ shard_table_prefix VARCHAR(125) NOT NULL,
+ deleted TINYINT NOT NULL DEFAULT 0,
+ updated_seq BIGINT NOT NULL,
+
+ PRIMARY KEY (table_id, base_source_id),
+ UNIQUE uni_shard (shard_hostname, shard_table_prefix),
+ INDEX idx_updated (updated_seq)
+ ) ENGINE=INNODB;
+ """,
+ "update_counters" -> """
+ CREATE TABLE IF NOT EXISTS update_counters (
+ id VARCHAR(25) NOT NULL,
+ counter BIGINT NOT NULL DEFAULT 0,
+
+ PRIMARY KEY (id)
+ ) ENGINE=INNODB;
+ """,
+ "hosts" -> """
+ CREATE TABLE IF NOT EXISTS hosts (
+ hostname VARCHAR(125) NOT NULL,
+ port INT NOT NULL,
+ cluster VARCHAR(125) NOT NULL,
+ status INT NOT NULL DEFAULT 0,
+
+ PRIMARY KEY (hostname, port),
+ INDEX idx_cluster (cluster, status)
+ ) ENGINE=INNODB;
+ """,
+ // logs have a UUID id in order to avoid coordination between nameservers
+ "logs" -> """
+ CREATE TABLE IF NOT EXISTS logs (
+ id BINARY(16) NOT NULL,
+ name VARCHAR(128) NOT NULL,
+
+ PRIMARY KEY (id),
+ UNIQUE KEY id (name)
+ ) ENGINE=INNODB;
+ """,
+ "log_entries" -> """
+ CREATE TABLE IF NOT EXISTS log_entries (
+ log_id BINARY(16) NOT NULL,
+ id INT NOT NULL,
+ command VARBINARY(1024) NOT NULL,
+ deleted BOOL NOT NULL DEFAULT FALSE,
+
+ PRIMARY KEY (log_id, id),
+ FOREIGN KEY (log_id) REFERENCES logs(id) ON DELETE CASCADE
+ ) ENGINE=INNODB;
+ """
+ )
+ val DDLS_MAP = DDLS.toMap
+
+ val log = Logger.get(getClass)
}
-
class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManagerSource {
+ import SqlShard._
private def rowToShardInfo(row: ResultSet) = {
ShardInfo(ShardId(row.getString("hostname"), row.getString("table_prefix")), row.getString("class_name"),
@@ -286,10 +313,49 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
queryEvaluator.select("SELECT * FROM shards where busy IN (1, 2, 3)")(rowToShardInfo).toList
}
+ def logCreate(id: Array[Byte], logName: String): Unit =
+ queryEvaluator.execute(
+ "INSERT INTO logs (id, name) VALUES (?, ?)",
+ id,
+ logName
+ )
+
+ def logGet(logName: String): Option[Array[Byte]] =
+ queryEvaluator.selectOne("SELECT id FROM logs WHERE name = ?", logName) { res =>
+ res.getBytes("id")
+ }
+
+ def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit = {
+ val commandBuffer = TransformOperation.serialize(entry.command)
+ queryEvaluator.execute(
+ "INSERT INTO log_entries (log_id, id, command) VALUES (?, ?, ?)",
+ logId,
+ entry.id,
+ commandBuffer
+ )
+ }
+
+ def logEntryPeek(logId: Array[Byte], count: Int): Seq[thrift.LogEntry] =
+ // select the 'last' K live entries
+ queryEvaluator.select(
+ "SELECT id, command FROM log_entries WHERE log_id = ? AND deleted = false ORDER BY log_id, id DESC LIMIT ?",
+ logId,
+ count
+ ) { row =>
+ new thrift.LogEntry(row.getInt("id"), TransformOperation.deserialize(row.getBytes("command")))
+ }
+
+ def logEntryPop(logId: Array[Byte], entryId: Int): Unit =
+ queryEvaluator.execute(
+ "UPDATE log_entries SET deleted = true WHERE log_id = ? AND id = ? LIMIT 1",
+ logId,
+ entryId
+ )
+
def prepareReload() {
try {
- List("shards", "shard_children", "forwardings", "update_counters", "hosts").foreach { table =>
- queryEvaluator.select("DESCRIBE " + table) { row => }
+ SqlShard.DDLS.foreach {
+ case (name, ddl) => queryEvaluator.select("DESCRIBE " + name) { row => }
}
} catch {
case e: SQLException =>
@@ -299,10 +365,25 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
}
def rebuildSchema() {
- queryEvaluator.execute(SqlShard.SHARDS_DDL)
- queryEvaluator.execute(SqlShard.SHARD_CHILDREN_DDL)
- queryEvaluator.execute(SqlShard.FORWARDINGS_DDL)
- queryEvaluator.execute(SqlShard.UPDATE_COUNTER_DDL)
+ log.info("Rebuilding schemas...")
+ SqlShard.DDLS.foreach {
+ case (name, ddl) =>
+ log.info("Schema: " + name)
+ queryEvaluator.execute(ddl)
+ }
+ }
+
+ def batchExecute(commands : Seq[TransformOperation]) {
+ for (cmd <- commands) {
+ cmd match {
+ case CreateShard(shardInfo) => createShard(shardInfo)
+ case DeleteShard(shardId) => deleteShard(shardId)
+ case AddLink(upId, downId, weight) => addLink(upId, downId, weight)
+ case RemoveLink(upId, downId) => removeLink(upId, downId)
+ case SetForwarding(forwarding) => setForwarding(forwarding)
+ case RemoveForwarding(forwarding) => removeForwarding(forwarding)
+ }
+ }
}
}
@@ -371,7 +452,6 @@ class SqlRemoteClusterManagerSource(queryEvaluator: QueryEvaluator) extends Remo
}
def rebuildSchema() {
- queryEvaluator.execute(SqlShard.HOSTS_DDL)
+ queryEvaluator.execute(SqlShard.DDLS_MAP("hosts"))
}
-
}
47 src/main/scala/com/twitter/gizzard/nameserver/TransformOperation.scala
@@ -0,0 +1,47 @@
+package com.twitter.gizzard.nameserver
+
+import com.twitter.gizzard.thrift.{TransformOperation => ThriftTransformOperation}
+import com.twitter.gizzard.thrift.TransformOperation._Fields._
+import com.twitter.gizzard.thrift.conversions.Forwarding._
+import com.twitter.gizzard.thrift.conversions.ShardId._
+import com.twitter.gizzard.thrift.conversions.ShardInfo._
+import com.twitter.gizzard.shards.{ShardId, ShardInfo}
+import com.twitter.util.CompactThriftSerializer
+
+sealed abstract class TransformOperation
+case class CreateShard(shardInfo : ShardInfo) extends TransformOperation
+case class DeleteShard(shardId : ShardId) extends TransformOperation
+case class AddLink(upId : ShardId, downId : ShardId, weight : Int) extends TransformOperation
+case class RemoveLink(upId : ShardId, downId : ShardId) extends TransformOperation
+case class SetForwarding(forwarding : Forwarding) extends TransformOperation
+case class RemoveForwarding(forwarding : Forwarding) extends TransformOperation
+case object Commit extends TransformOperation
+
+object TransformOperation {
+ def apply(thriftCommand : ThriftTransformOperation) : TransformOperation = {
+ thriftCommand.getSetField() match {
+ case CREATE_SHARD => CreateShard(thriftCommand.getCreate_shard().fromThrift)
+ case DELETE_SHARD => DeleteShard(thriftCommand.getDelete_shard().fromThrift)
+ case ADD_LINK => {
+ val addLink = thriftCommand.getAdd_link()
+ AddLink(addLink.getUp_id().fromThrift, addLink.getDown_id().fromThrift, addLink.getWeight())
+ }
+ case REMOVE_LINK => {
+ val removeLink = thriftCommand.getRemove_link()
+ RemoveLink(removeLink.getUp_id().fromThrift, removeLink.getDown_id().fromThrift)
+ }
+ case SET_FORWARDING => SetForwarding(thriftCommand.getSet_forwarding().fromThrift)
+ case REMOVE_FORWARDING => RemoveForwarding(thriftCommand.getRemove_forwarding().fromThrift)
+ case COMMIT => Commit
+ }
+ }
+
+ def serialize(thriftCommand: ThriftTransformOperation): Array[Byte] =
+ new CompactThriftSerializer().toBytes(thriftCommand)
+
+ def deserialize(buffer: Array[Byte]): ThriftTransformOperation = {
+ val thriftCommand = new ThriftTransformOperation()
+ new CompactThriftSerializer().fromBytes(thriftCommand, buffer)
+ thriftCommand
+ }
+}
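
A hedged round-trip sketch for the serializer above, matching how SqlShardManagerSource stores the log_entries.command column (the ShardId values are arbitrary):

```scala
import com.twitter.gizzard.nameserver.TransformOperation
import com.twitter.gizzard.thrift.{ShardId, TransformOperation => ThriftTransformOperation}

// Compact-protocol round-trip: the bytes written to the command column
// deserialize back to a structurally equal union.
val op = new ThriftTransformOperation()
op.setDelete_shard(new ShardId("localhost", "t0_000"))

val bytes: Array[Byte] = TransformOperation.serialize(op)
assert(TransformOperation.deserialize(bytes) == op)
```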
3  src/main/scala/com/twitter/gizzard/shards/NodeSet.scala
@@ -194,6 +194,9 @@ trait NodeIterable[+T] {
def foreach[U](f: T => U) {
this foreach { (_, t) => f(t) }
}
+
+ override def toString() =
+ "<NodeIterable(%s)>".format(rootInfo)
}
class NodeSet[+T](
8 src/main/scala/com/twitter/gizzard/shards/RoutingNode.scala
@@ -130,4 +130,12 @@ abstract class RoutingNode[T] {
}
override def hashCode() = children.hashCode
+
+ override def toString() =
+ "<RoutingNode(%s, %s, weight=%d, childCount=%d)>".format(
+ shardType,
+ shardInfo,
+ weight,
+ children.size
+ )
}
23 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
@@ -1,5 +1,6 @@
package com.twitter.gizzard.thrift
+import java.nio.ByteBuffer
import java.util.{List => JList}
import scala.reflect.Manifest
import scala.collection.JavaConversions._
@@ -12,6 +13,7 @@ import com.twitter.gizzard.thrift.conversions.Forwarding._
import com.twitter.gizzard.thrift.conversions.Host._
import com.twitter.gizzard.shards._
import com.twitter.gizzard.scheduler._
+import com.twitter.gizzard.nameserver
import com.twitter.gizzard.nameserver._
import com.twitter.logging.Logger
@@ -21,6 +23,7 @@ class ManagerService(
shardManager: ShardManager,
adminJobManager: AdminJobManager,
remoteClusterManager: RemoteClusterManager,
+ rollbackLogManager: RollbackLogManager,
scheduler: PrioritizingJobScheduler)
extends Manager.Iface {
@@ -33,7 +36,9 @@ extends Manager.Iface {
}
def reload_updated_forwardings() = wrapEx {
- nameServer.reloadUpdatedForwardings() }
+ nameServer.reloadUpdatedForwardings()
+ }
+
def reload_config() = wrapEx {
nameServer.reload()
remoteClusterManager.reload()
@@ -111,6 +116,10 @@ extends Manager.Iface {
def dump_nameserver(tableIds: JList[java.lang.Integer]) = wrapEx(shardManager.dumpStructure(tableIds.toList).map(_.toThrift))
+ def batch_execute(commands : JList[TransformOperation]) {
+ wrapEx(shardManager.batchExecute(commands.map(nameserver.TransformOperation.apply)))
+ }
+
def copy_shard(shardIds: JList[ShardId]) = {
wrapEx(adminJobManager.scheduleCopyJob(shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift)))
}
@@ -143,6 +152,18 @@ extends Manager.Iface {
def add_fanout_for(priority: Int, suffix: String) = wrapEx(scheduler(priority).addFanout(suffix))
def remove_fanout_for(priority: Int, suffix: String) = wrapEx(scheduler(priority).removeFanout(suffix))
+ // Rollback log management
+
+ def log_create(log_name: String): ByteBuffer =
+ rollbackLogManager.create(log_name)
+ def log_get(log_name: String): ByteBuffer =
+ rollbackLogManager.get(log_name).orNull
+ def log_entry_push(log_id: ByteBuffer, entry: LogEntry): Unit =
+ rollbackLogManager.entryPush(log_id, entry)
+ def log_entry_peek(log_id: ByteBuffer, count: Int): JList[LogEntry] =
+ rollbackLogManager.entryPeek(log_id, count)
+ def log_entry_pop(log_id: ByteBuffer, log_entry_id: Int): Unit =
+ rollbackLogManager.entryPop(log_id, log_entry_id)
// Remote Host Cluster Management
44 src/main/thrift/Manager.thrift
@@ -55,6 +55,35 @@ struct Host {
4: HostStatus status
}
+struct AddLinkRequest {
+ 1: required ShardId up_id
+ 2: required ShardId down_id
+ 3: required i32 weight
+}
+
+struct RemoveLinkRequest {
+ 1: required ShardId up_id
+ 2: required ShardId down_id
+}
+
+# a 'commit' object is really just a placeholder, so we represent it here as 1 byte
+typedef bool Commit
+
+union TransformOperation {
+ 1: optional ShardInfo create_shard
+ 2: optional ShardId delete_shard
+ 3: optional AddLinkRequest add_link
+ 4: optional RemoveLinkRequest remove_link
+ 5: optional Forwarding set_forwarding
+ 6: optional Forwarding remove_forwarding
+ 7: optional Commit commit
+}
+
+struct LogEntry {
+ 1: required i32 id
+ 2: required TransformOperation command
+}
+
service Manager {
void reload_updated_forwardings() throws(1: GizzardException ex)
void reload_config() throws(1: GizzardException ex)
@@ -96,6 +125,8 @@ service Manager {
list<NameServerState> dump_nameserver(1: list<i32> table_id) throws(1: GizzardException ex)
+ void batch_execute(1: list<TransformOperation> commands) throws (1: GizzardException ex)
+
// job scheduler management
void retry_errors() throws(1: GizzardException ex)
@@ -117,6 +148,19 @@ service Manager {
void add_fanout_for(1: i32 priority, 2: string suffix) throws(1: GizzardException ex)
void remove_fanout_for(1: i32 priority, 2: string suffix) throws(1: GizzardException ex)
+ // rollback log management
+
+ // create a new log for the given name (must not already exist), and return a log_id
+ binary log_create(1: string log_name)
+ // return the log_id for the given log_name, which must exist
+ binary log_get(1: string log_name)
+ // push the given command log entry to the end of the given log
+ void log_entry_push(1: binary log_id, 2: LogEntry log_entry)
+ // peek at (but don't remove) the last entry in the log
+ list<LogEntry> log_entry_peek(1: binary log_id, 2: i32 count)
+ // pop (remove) the last entry in the log, which must match the given id
+ void log_entry_pop(1: binary log_id, 2: i32 log_entry_id)
+
// remote host cluster management
void add_remote_host(1: Host host) throws(1: GizzardException ex)
37 src/test/scala/com/twitter/gizzard/nameserver/RollbackLogManagerSpec.scala
@@ -0,0 +1,37 @@
+package com.twitter.gizzard.nameserver
+
+import java.nio.ByteBuffer
+import scala.collection.generic.CanBuild
+
+import com.twitter.gizzard.ConfiguredSpecification
+import com.twitter.gizzard.shards.{NodeSet, RoutingNode}
+import com.twitter.gizzard.thrift
+import org.specs.Specification
+import org.specs.mock.{ClassMocker, JMocker}
+
+
+object RollbackLogManagerSpec extends ConfiguredSpecification {
+
+ "RollbackLogManager" should {
+ "implement topK for multiple sources" in {
+ import RollbackLogManager.topK
+ val (shared, second, third) = (logEntry(), logEntry(), logEntry())
+ val inputs = Seq(
+ Seq(),
+ Seq(shared),
+ Seq(shared, second),
+ Seq(shared, third)
+ )
+ topK(inputs, 0) must beLike { case Seq() => true}
+ topK(inputs, 1) must beLike { case Seq(shared) => true }
+ topK(inputs, 2) must beLike { case Seq(shared, second) => true }
+ topK(inputs, 3) must beLike { case Seq(shared, second, third) => true }
+ topK(inputs, 4) must beLike { case Seq(shared, second, third) => true }
+ }
+ }
+
+ val IdGen = new java.util.concurrent.atomic.AtomicInteger(0)
+ def logEntry(
+ id: Int = IdGen.getAndIncrement()
+ ) = new thrift.LogEntry(id, null)
+}
3  src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala
@@ -17,10 +17,11 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas
val shardManager = mock[nameserver.ShardManager]
val adminJobManager = mock[AdminJobManager]
val remoteClusterManager = mock[nameserver.RemoteClusterManager]
+ val rollbackLogManager = mock[nameserver.RollbackLogManager]
val copier = mock[CopyJobFactory[AnyRef]]
val scheduler = mock[PrioritizingJobScheduler]
val subScheduler = mock[JobScheduler]
- val manager = new ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, scheduler)
+ val manager = new ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, rollbackLogManager, scheduler)
val shard = mock[RoutingNode[Nothing]]
val thriftShardInfo1 = new thrift.ShardInfo(new thrift.ShardId("hostname", "table_prefix"),