Skip to content
This repository
Browse code

merge master

  • Loading branch information...
commit 0e0b2a1217f2bcfd1859110ab24dcab0109db75f 2 parents 00a2785 + 47e5efa
authored March 26, 2012
1  project/build/GizzardProject.scala
@@ -20,6 +20,7 @@ with SubversionPublisher {
20 20
   val finagleThrift   = "com.twitter"          % "finagle-thrift"     % finagleVersion
21 21
   val finagleOstrich4 = "com.twitter"          % "finagle-ostrich4"   % finagleVersion
22 22
 
  23
+  val utilThrift      = "com.twitter"          % "util-thrift"        % "2.0.0"
23 24
   val jackson         = "org.codehaus.jackson" % "jackson-core-asl"   % "1.9.2"
24 25
   val jacksonMap      = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.2"
25 26
 
5  src/main/scala/com/twitter/gizzard/GizzardServer.scala
@@ -3,7 +3,7 @@ package com.twitter.gizzard
3 3
 import com.twitter.util.Duration
4 4
 import com.twitter.conversions.time._
5 5
 import com.twitter.logging.Logger
6  
-import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager}
  6
+import com.twitter.gizzard.nameserver.{NameServer, RemoteClusterManager, RollbackLogManager}
7 7
 import com.twitter.gizzard.scheduler._
8 8
 import com.twitter.gizzard.config.{GizzardServer => ServerConfig}
9 9
 import com.twitter.gizzard.proxy.LoggingProxy
@@ -27,6 +27,7 @@ abstract class GizzardServer(config: ServerConfig) {
27 27
   val nameServer           = config.buildNameServer()
28 28
   val remoteClusterManager = config.buildRemoteClusterManager()
29 29
   val shardManager         = nameServer.shardManager
  30
+  val rollbackLogManager   = new RollbackLogManager(nameServer.shard)
30 31
   lazy val adminJobManager = new AdminJobManager(nameServer.shardRepository, shardManager, jobScheduler(copyPriority))
31 32
 
32 33
   // job wiring
@@ -49,7 +50,7 @@ abstract class GizzardServer(config: ServerConfig) {
49 50
 
50 51
   // service wiring
51 52
 
52  
-  lazy val managerServer       = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, jobScheduler)
  53
+  lazy val managerServer       = new thrift.ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, rollbackLogManager, jobScheduler)
53 54
   lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer))
54 55
 
55 56
   lazy val jobInjectorServer       = new thrift.JobInjectorService(jobCodec, jobScheduler)
70  src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
... ...
@@ -1,18 +1,33 @@
1 1
 package com.twitter.gizzard.nameserver
2 2
 
  3
+import java.nio.ByteBuffer
3 4
 import scala.collection.mutable
4 5
 import com.twitter.gizzard.shards._
  6
+import com.twitter.gizzard.thrift
5 7
 
6 8
 
7 9
 /**
8 10
  * NameServer implementation that doesn't actually store anything anywhere.
9 11
  * Useful for tests or stubbing out the partitioning scheme.
10 12
  */
  13
+object MemoryShardManagerSource {
  14
+  class LogEntry(val logId: ByteBuffer, val id: Int, val command: thrift.TransformOperation, var deleted: Boolean) {
  15
+    override def equals(o: Any) = o match {
  16
+      case that: LogEntry if that.logId == this.logId && that.id == this.id => true
  17
+      case _ => false
  18
+    }
  19
+
  20
+    override def hashCode() = logId.hashCode() + (31 * id)
  21
+  }
  22
+}
11 23
 class MemoryShardManagerSource extends ShardManagerSource {
  24
+  import MemoryShardManagerSource._
12 25
 
13 26
   val shardTable = new mutable.ListBuffer[ShardInfo]()
14 27
   val parentTable = new mutable.ListBuffer[LinkInfo]()
15 28
   val forwardingTable = new mutable.ListBuffer[Forwarding]()
  29
+  val logs = new mutable.ListBuffer[(ByteBuffer,String)]()
  30
+  val logEntries = new mutable.ListBuffer[LogEntry]()
16 31
 
17 32
   private def find(info: ShardInfo): Option[ShardInfo] = {
18 33
     shardTable.find { x =>
@@ -148,6 +163,19 @@ class MemoryShardManagerSource extends ShardManagerSource {
148 163
     parentTable.toList
149 164
   }
150 165
 
  166
+  def batchExecute(commands : Seq[TransformOperation]) {
  167
+    for (cmd <- commands) {
  168
+      cmd match {
  169
+        case CreateShard(shardInfo) => createShard(shardInfo)
  170
+        case DeleteShard(shardId) => deleteShard(shardId)
  171
+        case AddLink(upId, downId, weight) => addLink(upId, downId, weight)
  172
+        case RemoveLink(upId, downId) => removeLink(upId, downId)
  173
+        case SetForwarding(forwarding) => setForwarding(forwarding)
  174
+        case RemoveForwarding(forwarding) => removeForwarding(forwarding)
  175
+      }
  176
+    }
  177
+  }
  178
+
151 179
   def getBusyShards(): Seq[ShardInfo] = {
152 180
     shardTable.filter { _.busy.id > 0 }.toList
153 181
   }
@@ -156,6 +184,48 @@ class MemoryShardManagerSource extends ShardManagerSource {
156 184
     forwardingTable.map(_.tableId).toSet.toSeq.sortWith((a,b) => a < b)
157 185
   }
158 186
 
  187
+  def logCreate(id: Array[Byte], logName: String): Unit = {
  188
+    val pair = (ByteBuffer.wrap(id), logName)
  189
+    if (!logs.contains(pair))
  190
+      logs += pair
  191
+    else
  192
+      throw new RuntimeException("Log already exists: " + pair)
  193
+  }
  194
+
  195
+  def logGet(logName: String): Option[Array[Byte]] =
  196
+    logs.collect {
  197
+      case (id, `logName`) => id.array
  198
+    }.headOption
  199
+
  200
+  def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit = {
  201
+    val le = new LogEntry(ByteBuffer.wrap(logId), entry.id, entry.command, false)
  202
+    if (!logEntries.contains(le))
  203
+      logEntries += le
  204
+    else
  205
+      throw new RuntimeException("Log entry already exists: " + le)
  206
+  }
  207
+
  208
+  def logEntryPeek(rawLogId: Array[Byte], count: Int): Seq[thrift.LogEntry] = {
  209
+    val logId = ByteBuffer.wrap(rawLogId)
  210
+    val peeked =
  211
+      logEntries.reverseIterator.collect {
  212
+        case e if e.logId == logId && !e.deleted => e
  213
+      }.take(count)
  214
+    peeked.map { e =>
  215
+      new thrift.LogEntry().setId(e.id).setCommand(e.command)
  216
+    }.toSeq
  217
+  }
  218
+
  219
+  def logEntryPop(rawLogId: Array[Byte], entryId: Int): Unit = {
  220
+    val logId = ByteBuffer.wrap(rawLogId)
  221
+    val entry = logEntries.collect {
  222
+      case e if e.logId == logId && e.id == entryId => e
  223
+    }.headOption.getOrElse { throw new RuntimeException(entryId + " not found for " + logId) }
  224
+    
  225
+    // side effect: mark deleted
  226
+    entry.deleted = true
  227
+  }
  228
+
159 229
   def prepareReload() { }
160 230
 }
161 231
 
103  src/main/scala/com/twitter/gizzard/nameserver/RollbackLogManager.scala
... ...
@@ -0,0 +1,103 @@
  1
+package com.twitter.gizzard.nameserver
  2
+
  3
+import java.util.UUID
  4
+import java.nio.ByteBuffer
  5
+import scala.collection.mutable
  6
+import scala.math.Ordering
  7
+
  8
+import com.twitter.logging.Logger
  9
+import com.twitter.gizzard.shards.RoutingNode
  10
+import com.twitter.gizzard.thrift
  11
+
  12
+
  13
+class RollbackLogManager(shard: RoutingNode[ShardManagerSource]) {
  14
+  import RollbackLogManager._
  15
+
  16
+  def create(logName: String): ByteBuffer = {
  17
+    val idArray = new Array[Byte](UUID_BYTES)
  18
+    val id = ByteBuffer.wrap(idArray)
  19
+    val uuid = UUID.randomUUID
  20
+    id.asLongBuffer
  21
+      .put(uuid.getMostSignificantBits)
  22
+      .put(uuid.getLeastSignificantBits)
  23
+    shard.write.foreach(_.logCreate(idArray, logName))
  24
+    log.info("Created new rollback log for name '%s' with id %s", logName, uuid)
  25
+    id
  26
+  }
  27
+
  28
+  def get(logName: String): Option[ByteBuffer] =
  29
+    shard.read.any(_.logGet(logName).map(ByteBuffer.wrap))
  30
+
  31
+  def entryPush(logId: ByteBuffer, entry: thrift.LogEntry): Unit =
  32
+    shard.write.foreach(_.logEntryPush(unwrap(logId), entry))
  33
+
  34
+  /**
  35
+   * @return the entries with the highest ids on all replicas: note that this means
  36
+   * that logs cannot be rolled back while any nameserver replicas are unavailable.
  37
+   */
  38
+  def entryPeek(logId: ByteBuffer, count: Int): Seq[thrift.LogEntry] = {
  39
+    // collect k from each shard
  40
+    val descendingEntryLists = shard.read.map(_.logEntryPeek(unwrap(logId), count))
  41
+    // and take the top k
  42
+    RollbackLogManager.topK(descendingEntryLists, count)
  43
+  }
  44
+
  45
+  def entryPop(logId: ByteBuffer, entryId: Int): Unit =
  46
+    shard.write.foreach(_.logEntryPop(unwrap(logId), entryId))
  47
+
  48
+  private def unwrap(bb: ByteBuffer): Array[Byte] = {
  49
+    if (bb.hasArray && bb.remaining == bb.capacity) {
  50
+      bb.array
  51
+    } else {
  52
+      val arr = new Array[Byte](bb.remaining)
  53
+      bb.duplicate.get(arr)
  54
+      arr
  55
+    }
  56
+  }
  57
+}
  58
+
  59
+object RollbackLogManager {
  60
+  private val log = Logger.get(getClass.getName)
  61
+
  62
+  val UUID_BYTES = 16
  63
+
  64
+  /**
  65
+   * Calculates the top-k entries by id.
  66
+   * TODO: this is not the hot path, but we should eventually do a real linear merge.
  67
+   */
  68
+  def topK(
  69
+    descendingEntryLists: Iterable[Seq[thrift.LogEntry]],
  70
+    count: Int
  71
+  ): Seq[thrift.LogEntry] = count match {
  72
+    case 1 =>
  73
+      val heads = descendingEntryLists.flatMap(_.lastOption)
  74
+      // no optional form of max
  75
+      if (!heads.isEmpty)
  76
+        Seq(heads.max(EntryOrdering))
  77
+      else
  78
+        Nil
  79
+    case _ =>
  80
+      val q = new mutable.PriorityQueue[thrift.LogEntry]()(EntryOrdering)
  81
+      val result = mutable.Buffer[thrift.LogEntry]()
  82
+      descendingEntryLists.foreach { q ++= _ }
  83
+      val iterator = q.iterator
  84
+      // take the first k non-equal entries
  85
+      var gathered = 0
  86
+      var lastId = Int.MinValue
  87
+      while (gathered < count && iterator.hasNext) {
  88
+        val entry = iterator.next
  89
+        if (entry.id != lastId) {
  90
+          result += entry
  91
+          lastId = entry.id
  92
+          gathered += 1
  93
+        }
  94
+      }
  95
+      result
  96
+  }
  97
+
  98
+  // TODO: Scala 2.8.1 doesn't have maxBy
  99
+  object EntryOrdering extends Ordering[thrift.LogEntry] {
  100
+    override def compare(a: thrift.LogEntry, b: thrift.LogEntry) =
  101
+      Ordering.Int.compare(a.id, b.id)
  102
+  }
  103
+}
15  src/main/scala/com/twitter/gizzard/nameserver/ShardManager.scala
@@ -3,6 +3,7 @@ package com.twitter.gizzard.nameserver
3 3
 import scala.collection.mutable
4 4
 import com.twitter.gizzard.shards._
5 5
 import com.twitter.gizzard.util.TreeUtils
  6
+import com.twitter.gizzard.thrift
6 7
 
7 8
 
8 9
 class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository) {
@@ -11,6 +12,8 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
11 12
   def diffState(lastUpdatedSeq: Long)   = shard.read.any(_.diffState(lastUpdatedSeq))
12 13
   def dumpStructure(tableIds: Seq[Int]) = shard.read.any(_.dumpStructure(tableIds))
13 14
 
  15
+  def batchExecute(commands : Seq[TransformOperation]) { shard.write.foreach(_.batchExecute(commands)) }
  16
+
14 17
   @throws(classOf[ShardException])
15 18
   def createAndMaterializeShard(shardInfo: ShardInfo) {
16 19
     shard.write.foreach(_.createShard(shardInfo))
@@ -27,7 +30,6 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
27 30
   def listShards()                        = shard.read.any(_.listShards())
28 31
   def getBusyShards()                     = shard.read.any(_.getBusyShards())
29 32
 
30  
-
31 33
   def addLink(upId: ShardId, downId: ShardId, weight: Int) { shard.write.foreach(_.addLink(upId, downId, weight)) }
32 34
   def removeLink(upId: ShardId, downId: ShardId)           { shard.write.foreach(_.removeLink(upId, downId)) }
33 35
 
@@ -65,6 +67,8 @@ trait ShardManagerSource {
65 67
     tableIds.map(extractor)
66 68
   }
67 69
 
  70
+  @throws(classOf[ShardException]) def batchExecute(commands : Seq[TransformOperation])
  71
+
68 72
   @throws(classOf[ShardException]) def createShard(shardInfo: ShardInfo)
69 73
   @throws(classOf[ShardException]) def deleteShard(id: ShardId)
70 74
   @throws(classOf[ShardException]) def markShardBusy(id: ShardId, busy: Busy.Value)
@@ -93,4 +97,13 @@ trait ShardManagerSource {
93 97
 
94 98
   @throws(classOf[ShardException]) def listHostnames(): Seq[String]
95 99
   @throws(classOf[ShardException]) def listTables(): Seq[Int]
  100
+
  101
+  @throws(classOf[ShardException]) def logCreate(id: Array[Byte], logName: String): Unit
  102
+  @throws(classOf[ShardException]) def logGet(logName: String): Option[Array[Byte]]
  103
+  @throws(classOf[ShardException]) def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit
  104
+  @throws(classOf[ShardException]) def logEntryPeek(logId: Array[Byte], count: Int): Seq[thrift.LogEntry]
  105
+  @throws(classOf[ShardException]) def logEntryPop(logId: Array[Byte], entryId: Int): Unit
  106
+
  107
+  /** For JMocker. TODO: switch to a better mocking framework */
  108
+  override def toString() = "<%s>".format(this.getClass)
96 109
 }
224  src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
... ...
@@ -1,80 +1,107 @@
1 1
 package com.twitter.gizzard.nameserver
2 2
 
  3
+import java.nio.ByteBuffer
3 4
 import java.sql.{ResultSet, SQLException, SQLIntegrityConstraintViolationException}
4 5
 import scala.collection.mutable
5 6
 import com.twitter.querulous.evaluator.QueryEvaluator
6 7
 import com.twitter.gizzard.util.TreeUtils
7 8
 import com.twitter.gizzard.shards._
  9
+import com.twitter.gizzard.thrift
  10
+import com.twitter.logging.Logger
8 11
 
9 12
 
10 13
 object SqlShard {
11  
-  val SHARDS_DDL = """
12  
-CREATE TABLE IF NOT EXISTS shards (
13  
-    class_name              VARCHAR(125) NOT NULL,
14  
-    hostname                VARCHAR(125) NOT NULL,
15  
-    table_prefix            VARCHAR(125) NOT NULL,
16  
-    source_type             VARCHAR(125),
17  
-    destination_type        VARCHAR(125),
18  
-    busy                    TINYINT      NOT NULL DEFAULT 0,
19  
-
20  
-   PRIMARY KEY (hostname, table_prefix),
21  
-   INDEX idx_busy (busy)
22  
-) ENGINE=INNODB
23  
-"""
24  
-
25  
-  val SHARD_CHILDREN_DDL = """
26  
-CREATE TABLE IF NOT EXISTS shard_children (
27  
-    parent_hostname         VARCHAR(125) NOT NULL,
28  
-    parent_table_prefix     VARCHAR(125) NOT NULL,
29  
-    child_hostname          VARCHAR(125) NOT NULL,
30  
-    child_table_prefix      VARCHAR(125) NOT NULL,
31  
-    weight                  INT          NOT NULL DEFAULT 1,
32  
-
33  
-    PRIMARY KEY (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
34  
-    INDEX idx_parent (parent_hostname, parent_table_prefix, weight),
35  
-    INDEX idx_child (child_hostname, child_table_prefix, weight)
36  
-) ENGINE=INNODB
37  
-"""
38  
-
39  
-  val FORWARDINGS_DDL = """
40  
-CREATE TABLE IF NOT EXISTS forwardings (
41  
-    table_id                INT                     NOT NULL,
42  
-    base_source_id          BIGINT                  NOT NULL,
43  
-    shard_hostname          VARCHAR(125)            NOT NULL,
44  
-    shard_table_prefix      VARCHAR(125)            NOT NULL,
45  
-    deleted                 TINYINT                 NOT NULL DEFAULT 0,
46  
-    updated_seq             BIGINT                  NOT NULL,
47  
-
48  
-    PRIMARY KEY (table_id, base_source_id),
49  
-    UNIQUE uni_shard (shard_hostname, shard_table_prefix),
50  
-    INDEX  idx_updated  (updated_seq)
51  
-) ENGINE=INNODB;
52  
-"""
53  
-
54  
-  val UPDATE_COUNTER_DDL = """
55  
-CREATE TABLE IF NOT EXISTS update_counters (
56  
-    id                      VARCHAR(25) NOT NULL,
57  
-    counter                 BIGINT      NOT NULL DEFAULT 0,
58  
-
59  
-    PRIMARY KEY (id)
60  
-) ENGINE=INNODB;
61  
-"""
62  
-
63  
-  val HOSTS_DDL = """
64  
-CREATE TABLE IF NOT EXISTS hosts (
65  
-    hostname                VARCHAR(125) NOT NULL,
66  
-    port                    INT          NOT NULL,
67  
-    cluster                 VARCHAR(125) NOT NULL,
68  
-    status                  INT          NOT NULL DEFAULT 0,
69  
-
70  
-    PRIMARY KEY (hostname, port),
71  
-    INDEX idx_cluster (cluster, status)
72  
-) ENGINE=INNODB;
73  
-"""
  14
+  // ordered (name -> ddl), and a map for convenience
  15
+  val DDLS =
  16
+    Seq(
  17
+    "shards" -> """
  18
+      CREATE TABLE IF NOT EXISTS shards (
  19
+          class_name              VARCHAR(125) NOT NULL,
  20
+          hostname                VARCHAR(125) NOT NULL,
  21
+          table_prefix            VARCHAR(125) NOT NULL,
  22
+          source_type             VARCHAR(125),
  23
+          destination_type        VARCHAR(125),
  24
+          busy                    TINYINT      NOT NULL DEFAULT 0,
  25
+
  26
+        PRIMARY KEY (hostname, table_prefix),
  27
+        INDEX idx_busy (busy)
  28
+      ) ENGINE=INNODB
  29
+      """,
  30
+    "shard_children" -> """
  31
+      CREATE TABLE IF NOT EXISTS shard_children (
  32
+          parent_hostname         VARCHAR(125) NOT NULL,
  33
+          parent_table_prefix     VARCHAR(125) NOT NULL,
  34
+          child_hostname          VARCHAR(125) NOT NULL,
  35
+          child_table_prefix      VARCHAR(125) NOT NULL,
  36
+          weight                  INT          NOT NULL DEFAULT 1,
  37
+
  38
+          PRIMARY KEY (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
  39
+          INDEX idx_parent (parent_hostname, parent_table_prefix, weight),
  40
+          INDEX idx_child (child_hostname, child_table_prefix, weight)
  41
+      ) ENGINE=INNODB
  42
+      """,
  43
+    "forwardings" -> """
  44
+      CREATE TABLE IF NOT EXISTS forwardings (
  45
+          table_id                INT                     NOT NULL,
  46
+          base_source_id          BIGINT                  NOT NULL,
  47
+          shard_hostname          VARCHAR(125)            NOT NULL,
  48
+          shard_table_prefix      VARCHAR(125)            NOT NULL,
  49
+          deleted                 TINYINT                 NOT NULL DEFAULT 0,
  50
+          updated_seq             BIGINT                  NOT NULL,
  51
+
  52
+          PRIMARY KEY (table_id, base_source_id),
  53
+          UNIQUE uni_shard (shard_hostname, shard_table_prefix),
  54
+          INDEX  idx_updated  (updated_seq)
  55
+      ) ENGINE=INNODB;
  56
+      """,
  57
+    "update_counters" -> """
  58
+      CREATE TABLE IF NOT EXISTS update_counters (
  59
+          id                      VARCHAR(25) NOT NULL,
  60
+          counter                 BIGINT      NOT NULL DEFAULT 0,
  61
+
  62
+          PRIMARY KEY (id)
  63
+      ) ENGINE=INNODB;
  64
+      """,
  65
+    "hosts" -> """
  66
+      CREATE TABLE IF NOT EXISTS hosts (
  67
+          hostname                VARCHAR(125) NOT NULL,
  68
+          port                    INT          NOT NULL,
  69
+          cluster                 VARCHAR(125) NOT NULL,
  70
+          status                  INT          NOT NULL DEFAULT 0,
  71
+
  72
+          PRIMARY KEY (hostname, port),
  73
+          INDEX idx_cluster (cluster, status)
  74
+      ) ENGINE=INNODB;
  75
+      """,
  76
+    // logs have a UUID id in order to avoid coordination between nameservers
  77
+    "logs" -> """
  78
+      CREATE TABLE IF NOT EXISTS logs (
  79
+          id                        BINARY(16)   NOT NULL,
  80
+          name                      VARCHAR(128) NOT NULL,
  81
+
  82
+          PRIMARY KEY (id),
  83
+          UNIQUE KEY id (name)
  84
+      ) ENGINE=INNODB;
  85
+      """,
  86
+    "log_entries" -> """
  87
+      CREATE TABLE IF NOT EXISTS log_entries (
  88
+          log_id               BINARY(16)      NOT NULL,
  89
+          id                   INT             NOT NULL,
  90
+          command              VARBINARY(1024) NOT NULL,
  91
+          deleted              BOOL            NOT NULL DEFAULT FALSE,
  92
+
  93
+          PRIMARY KEY (log_id, id),
  94
+          FOREIGN KEY (log_id) REFERENCES logs(id) ON DELETE CASCADE
  95
+      ) ENGINE=INNODB;
  96
+      """
  97
+    )
  98
+  val DDLS_MAP = DDLS.toMap
  99
+
  100
+  val log = Logger.get(getClass)
74 101
 }
75 102
 
76  
-
77 103
 class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManagerSource {
  104
+  import SqlShard._
78 105
 
79 106
   private def rowToShardInfo(row: ResultSet) = {
80 107
     ShardInfo(ShardId(row.getString("hostname"), row.getString("table_prefix")), row.getString("class_name"),
@@ -286,10 +313,49 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
286 313
     queryEvaluator.select("SELECT * FROM shards where busy IN (1, 2, 3)")(rowToShardInfo).toList
287 314
   }
288 315
 
  316
+  def logCreate(id: Array[Byte], logName: String): Unit =
  317
+    queryEvaluator.execute(
  318
+      "INSERT INTO logs (id, name) VALUES (?, ?)",
  319
+      id,
  320
+      logName
  321
+    )
  322
+
  323
+  def logGet(logName: String): Option[Array[Byte]] =
  324
+    queryEvaluator.selectOne("SELECT id FROM logs WHERE name = ?", logName) { res =>
  325
+      res.getBytes("id")
  326
+    }
  327
+
  328
+  def logEntryPush(logId: Array[Byte], entry: thrift.LogEntry): Unit = {
  329
+    val commandBuffer = TransformOperation.serialize(entry.command)
  330
+    queryEvaluator.execute(
  331
+      "INSERT INTO log_entries (log_id, id, command) VALUES (?, ?, ?)",
  332
+      logId,
  333
+      entry.id,
  334
+      commandBuffer
  335
+    )
  336
+  }
  337
+
  338
+  def logEntryPeek(logId: Array[Byte], count: Int): Seq[thrift.LogEntry] =
  339
+    // select the 'last' K live entries
  340
+    queryEvaluator.select(
  341
+      "SELECT id, command FROM log_entries WHERE log_id = ? AND deleted = false ORDER BY log_id, id DESC LIMIT ?",
  342
+      logId,
  343
+      count
  344
+    ) { row =>
  345
+      new thrift.LogEntry(row.getInt("id"), TransformOperation.deserialize(row.getBytes("command")))
  346
+    }
  347
+
  348
+  def logEntryPop(logId: Array[Byte], entryId: Int): Unit =
  349
+    queryEvaluator.execute(
  350
+      "UPDATE log_entries SET deleted = true WHERE log_id = ? AND id = ? LIMIT 1",
  351
+      logId,
  352
+      entryId
  353
+    )
  354
+
289 355
   def prepareReload() {
290 356
     try {
291  
-      List("shards", "shard_children", "forwardings", "update_counters", "hosts").foreach { table =>
292  
-        queryEvaluator.select("DESCRIBE " + table) { row => }
  357
+      SqlShard.DDLS.foreach {
  358
+        case (name, ddl) => queryEvaluator.select("DESCRIBE " + name) { row => }
293 359
       }
294 360
     } catch {
295 361
       case e: SQLException =>
@@ -299,10 +365,25 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
299 365
   }
300 366
 
301 367
   def rebuildSchema() {
302  
-    queryEvaluator.execute(SqlShard.SHARDS_DDL)
303  
-    queryEvaluator.execute(SqlShard.SHARD_CHILDREN_DDL)
304  
-    queryEvaluator.execute(SqlShard.FORWARDINGS_DDL)
305  
-    queryEvaluator.execute(SqlShard.UPDATE_COUNTER_DDL)
  368
+    log.info("Rebuilding schemas...")
  369
+    SqlShard.DDLS.foreach {
  370
+      case (name, ddl) =>
  371
+        log.info("Schema: " + name)
  372
+        queryEvaluator.execute(ddl)
  373
+    }
  374
+  }
  375
+
  376
+  def batchExecute(commands : Seq[TransformOperation]) {
  377
+    for (cmd <- commands) {
  378
+      cmd match {
  379
+        case CreateShard(shardInfo) => createShard(shardInfo)
  380
+        case DeleteShard(shardId) => deleteShard(shardId)
  381
+        case AddLink(upId, downId, weight) => addLink(upId, downId, weight)
  382
+        case RemoveLink(upId, downId) => removeLink(upId, downId)
  383
+        case SetForwarding(forwarding) => setForwarding(forwarding)
  384
+        case RemoveForwarding(forwarding) => removeForwarding(forwarding)
  385
+      }
  386
+    }
306 387
   }
307 388
 }
308 389
 
@@ -371,7 +452,6 @@ class SqlRemoteClusterManagerSource(queryEvaluator: QueryEvaluator) extends Remo
371 452
   }
372 453
 
373 454
   def rebuildSchema() {
374  
-    queryEvaluator.execute(SqlShard.HOSTS_DDL)
  455
+    queryEvaluator.execute(SqlShard.DDLS_MAP("hosts"))
375 456
   }
376  
-
377 457
 }
47  src/main/scala/com/twitter/gizzard/nameserver/TransformOperation.scala
... ...
@@ -0,0 +1,47 @@
  1
+package com.twitter.gizzard.nameserver
  2
+
  3
+import com.twitter.gizzard.thrift.{TransformOperation => ThriftTransformOperation}
  4
+import com.twitter.gizzard.thrift.TransformOperation._Fields._
  5
+import com.twitter.gizzard.thrift.conversions.Forwarding._
  6
+import com.twitter.gizzard.thrift.conversions.ShardId._
  7
+import com.twitter.gizzard.thrift.conversions.ShardInfo._
  8
+import com.twitter.gizzard.shards.{ShardId, ShardInfo}
  9
+import com.twitter.util.CompactThriftSerializer
  10
+
  11
+sealed abstract class TransformOperation
  12
+case class CreateShard(shardInfo : ShardInfo) extends TransformOperation
  13
+case class DeleteShard(shardId : ShardId) extends TransformOperation
  14
+case class AddLink(upId : ShardId, downId : ShardId, weight : Int) extends TransformOperation
  15
+case class RemoveLink(upId : ShardId, downId : ShardId) extends TransformOperation
  16
+case class SetForwarding(forwarding : Forwarding) extends TransformOperation
  17
+case class RemoveForwarding(forwarding : Forwarding) extends TransformOperation
  18
+case object Commit extends TransformOperation
  19
+
  20
+object TransformOperation {
  21
+  def apply(thriftCommand : ThriftTransformOperation) : TransformOperation = {
  22
+    thriftCommand.getSetField() match {
  23
+      case CREATE_SHARD => CreateShard(thriftCommand.getCreate_shard().fromThrift)
  24
+      case DELETE_SHARD => DeleteShard(thriftCommand.getDelete_shard().fromThrift)
  25
+      case ADD_LINK => {
  26
+        val addLink = thriftCommand.getAdd_link()
  27
+        AddLink(addLink.getUp_id().fromThrift, addLink.getDown_id().fromThrift, addLink.getWeight())
  28
+      }
  29
+      case REMOVE_LINK => {
  30
+        val removeLink = thriftCommand.getRemove_link()
  31
+        RemoveLink(removeLink.getUp_id().fromThrift, removeLink.getDown_id().fromThrift)
  32
+      }
  33
+      case SET_FORWARDING => SetForwarding(thriftCommand.getSet_forwarding().fromThrift)
  34
+      case REMOVE_FORWARDING => RemoveForwarding(thriftCommand.getRemove_forwarding().fromThrift)
  35
+      case COMMIT => Commit
  36
+    }
  37
+  }
  38
+
  39
+  def serialize(thriftCommand: ThriftTransformOperation): Array[Byte] =
  40
+    new CompactThriftSerializer().toBytes(thriftCommand)
  41
+
  42
+  def deserialize(buffer: Array[Byte]): ThriftTransformOperation = {
  43
+    val thriftCommand = new ThriftTransformOperation()
  44
+    new CompactThriftSerializer().fromBytes(thriftCommand, buffer)
  45
+    thriftCommand
  46
+  }
  47
+}
3  src/main/scala/com/twitter/gizzard/shards/NodeSet.scala
@@ -194,6 +194,9 @@ trait NodeIterable[+T] {
194 194
   def foreach[U](f: T => U) {
195 195
     this foreach { (_, t) => f(t) }
196 196
   }
  197
+
  198
+  override def toString() =
  199
+    "<NodeIterable(%s)>".format(rootInfo)
197 200
 }
198 201
 
199 202
 class NodeSet[+T](
8  src/main/scala/com/twitter/gizzard/shards/RoutingNode.scala
@@ -130,4 +130,12 @@ abstract class RoutingNode[T] {
130 130
   }
131 131
 
132 132
   override def hashCode() = children.hashCode
  133
+
  134
+  override def toString() =
  135
+    "<RoutingNode(%s, %s, weight=%d, childCount=%d)>".format(
  136
+      shardType,
  137
+      shardInfo,
  138
+      weight,
  139
+      children.size
  140
+    )
133 141
 }
23  src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
... ...
@@ -1,5 +1,6 @@
1 1
 package com.twitter.gizzard.thrift
2 2
 
  3
+import java.nio.ByteBuffer
3 4
 import java.util.{List => JList}
4 5
 import scala.reflect.Manifest
5 6
 import scala.collection.JavaConversions._
@@ -12,6 +13,7 @@ import com.twitter.gizzard.thrift.conversions.Forwarding._
12 13
 import com.twitter.gizzard.thrift.conversions.Host._
13 14
 import com.twitter.gizzard.shards._
14 15
 import com.twitter.gizzard.scheduler._
  16
+import com.twitter.gizzard.nameserver
15 17
 import com.twitter.gizzard.nameserver._
16 18
 import com.twitter.logging.Logger
17 19
 
@@ -21,6 +23,7 @@ class ManagerService(
21 23
   shardManager: ShardManager,
22 24
   adminJobManager: AdminJobManager,
23 25
   remoteClusterManager: RemoteClusterManager,
  26
+  rollbackLogManager: RollbackLogManager,
24 27
   scheduler: PrioritizingJobScheduler)
25 28
 extends Manager.Iface {
26 29
 
@@ -33,7 +36,9 @@ extends Manager.Iface {
33 36
   }
34 37
 
35 38
   def reload_updated_forwardings() = wrapEx {
36  
-    nameServer.reloadUpdatedForwardings()  }
  39
+    nameServer.reloadUpdatedForwardings()
  40
+  }
  41
+
37 42
   def reload_config() = wrapEx {
38 43
     nameServer.reload()
39 44
     remoteClusterManager.reload()
@@ -111,6 +116,10 @@ extends Manager.Iface {
111 116
 
112 117
   def dump_nameserver(tableIds: JList[java.lang.Integer]) = wrapEx(shardManager.dumpStructure(tableIds.toList).map(_.toThrift))
113 118
 
  119
+  def batch_execute(commands : JList[TransformOperation]) {
  120
+    wrapEx(shardManager.batchExecute(commands.map(nameserver.TransformOperation.apply)))
  121
+  }
  122
+
114 123
   def copy_shard(shardIds: JList[ShardId]) = {
115 124
     wrapEx(adminJobManager.scheduleCopyJob(shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift)))
116 125
   }
@@ -143,6 +152,18 @@ extends Manager.Iface {
143 152
   def add_fanout_for(priority: Int, suffix: String)        = wrapEx(scheduler(priority).addFanout(suffix))
144 153
   def remove_fanout_for(priority: Int, suffix: String)     = wrapEx(scheduler(priority).removeFanout(suffix))
145 154
 
  155
+  // Rollback log management
  156
+
  157
+  def log_create(log_name: String): ByteBuffer =
  158
+    rollbackLogManager.create(log_name)
  159
+  def log_get(log_name: String): ByteBuffer =
  160
+    rollbackLogManager.get(log_name).orNull
  161
+  def log_entry_push(log_id: ByteBuffer, entry: LogEntry): Unit =
  162
+    rollbackLogManager.entryPush(log_id, entry)
  163
+  def log_entry_peek(log_id: ByteBuffer, count: Int): JList[LogEntry] =
  164
+    rollbackLogManager.entryPeek(log_id, count)
  165
+  def log_entry_pop(log_id: ByteBuffer, log_entry_id: Int): Unit =
  166
+    rollbackLogManager.entryPop(log_id, log_entry_id)
146 167
 
147 168
   // Remote Host Cluster Management
148 169
 
44  src/main/thrift/Manager.thrift
@@ -55,6 +55,35 @@ struct Host {
55 55
   4: HostStatus status
56 56
 }
57 57
 
  58
+struct AddLinkRequest {
  59
+  1: required ShardId up_id
  60
+  2: required ShardId down_id
  61
+  3: required i32 weight
  62
+}
  63
+
  64
+struct RemoveLinkRequest {
  65
+  1: required ShardId up_id
  66
+  2: required ShardId down_id
  67
+}
  68
+
  69
+# a 'commit' object is really just a placeholder, so we represent it here as 1 byte
  70
+typedef bool Commit
  71
+
  72
+union TransformOperation {
  73
+  1: optional ShardInfo create_shard
  74
+  2: optional ShardId delete_shard
  75
+  3: optional AddLinkRequest add_link
  76
+  4: optional RemoveLinkRequest remove_link
  77
+  5: optional Forwarding set_forwarding
  78
+  6: optional Forwarding remove_forwarding
  79
+  7: optional Commit commit
  80
+}
  81
+
  82
+struct LogEntry {
  83
+  1: required i32 id
  84
+  2: required TransformOperation command
  85
+}
  86
+
58 87
 service Manager {
59 88
   void reload_updated_forwardings() throws(1: GizzardException ex)
60 89
   void reload_config() throws(1: GizzardException ex)
@@ -96,6 +125,8 @@ service Manager {
96 125
 
97 126
   list<NameServerState> dump_nameserver(1: list<i32> table_id) throws(1: GizzardException ex)
98 127
 
  128
+  void batch_execute(1: list<TransformOperation> commands) throws (1: GizzardException ex)
  129
+
99 130
   // job scheduler management
100 131
 
101 132
   void retry_errors() throws(1: GizzardException ex)
@@ -117,6 +148,19 @@ service Manager {
117 148
   void add_fanout_for(1: i32 priority, 2: string suffix) throws(1: GizzardException ex)
118 149
   void remove_fanout_for(1: i32 priority, 2: string suffix) throws(1: GizzardException ex)
119 150
 
  151
+  // rollback log management
  152
+
  153
+  // create a new log for the given name (must not already exist), and return a log_id
  154
+  binary log_create(1: string log_name)
  155
+  // return the log_id for the given log_name, which must exist
  156
+  binary log_get(1: string log_name)
  157
+  // push the given command log entry to the end of the given log
  158
+  void log_entry_push(1: binary log_id, 2: LogEntry log_entry)
  159
+  // peek at (but don't remove) the last entry in the log
  160
+  list<LogEntry> log_entry_peek(1: binary log_id, 2: i32 count)
  161
+  // pop (remove) the last entry in the log, which must match the given id
  162
+  void log_entry_pop(1: binary log_id, 2: i32 log_entry_id)
  163
+
120 164
   // remote host cluster management
121 165
 
122 166
   void add_remote_host(1: Host host) throws(1: GizzardException ex)
37  src/test/scala/com/twitter/gizzard/nameserver/RollbackLogManagerSpec.scala
... ...
@@ -0,0 +1,37 @@
  1
+package com.twitter.gizzard.nameserver
  2
+
  3
+import java.nio.ByteBuffer
  4
+import scala.collection.generic.CanBuild
  5
+
  6
+import com.twitter.gizzard.ConfiguredSpecification
  7
+import com.twitter.gizzard.shards.{NodeSet, RoutingNode}
  8
+import com.twitter.gizzard.thrift
  9
+import org.specs.Specification
  10
+import org.specs.mock.{ClassMocker, JMocker}
  11
+
  12
+
  13
+object RollbackLogManagerSpec extends ConfiguredSpecification {
  14
+
  15
+  "RollbackLogManager" should {
  16
+    "implement topK for multiple sources" in {
  17
+      import RollbackLogManager.topK
  18
+      val (shared, second, third) = (logEntry(), logEntry(), logEntry())
  19
+      val inputs = Seq(
  20
+        Seq(),
  21
+        Seq(shared),
  22
+        Seq(shared, second),
  23
+        Seq(shared, third)
  24
+      )
  25
+      topK(inputs, 0) must beLike { case Seq() => true}
  26
+      topK(inputs, 1) must beLike { case Seq(shared) => true }
  27
+      topK(inputs, 2) must beLike { case Seq(shared, second) => true }
  28
+      topK(inputs, 3) must beLike { case Seq(shared, second, third) => true }
  29
+      topK(inputs, 4) must beLike { case Seq(shared, second, third) => true }
  30
+    }
  31
+  }
  32
+  
  33
+  val IdGen = new java.util.concurrent.atomic.AtomicInteger(0)
  34
+  def logEntry(
  35
+    id: Int = IdGen.getAndIncrement()
  36
+  ) = new thrift.LogEntry(id, null)
  37
+}
3  src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala
@@ -17,10 +17,11 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas
17 17
   val shardManager         = mock[nameserver.ShardManager]
18 18
   val adminJobManager      = mock[AdminJobManager]
19 19
   val remoteClusterManager = mock[nameserver.RemoteClusterManager]
  20
+  val rollbackLogManager = mock[nameserver.RollbackLogManager]
20 21
   val copier               = mock[CopyJobFactory[AnyRef]]
21 22
   val scheduler            = mock[PrioritizingJobScheduler]
22 23
   val subScheduler         = mock[JobScheduler]
23  
-  val manager              = new ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, scheduler)
  24
+  val manager              = new ManagerService(nameServer, shardManager, adminJobManager, remoteClusterManager, rollbackLogManager, scheduler)
24 25
 
25 26
   val shard = mock[RoutingNode[Nothing]]
26 27
   val thriftShardInfo1 = new thrift.ShardInfo(new thrift.ShardId("hostname", "table_prefix"),

0 notes on commit 0e0b2a1

Please sign in to comment.
Something went wrong with that request. Please try again.