Permalink
Browse files

Add the batch mutation method to execute TransformOperations

  • Loading branch information...
1 parent 18dc783 commit 0e28edc1bfc577abd1631937e7ffa7d680fec6d3 @stuhood stuhood committed Mar 25, 2012
View
13 src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
@@ -163,6 +163,19 @@ class MemoryShardManagerSource extends ShardManagerSource {
parentTable.toList
}
+ def batchExecute(commands : Seq[TransformOperation]) {
+ for (cmd <- commands) {
+ cmd match {
+ case CreateShard(shardInfo) => createShard(shardInfo)
+ case DeleteShard(shardId) => deleteShard(shardId)
+ case AddLink(upId, downId, weight) => addLink(upId, downId, weight)
+ case RemoveLink(upId, downId) => removeLink(upId, downId)
+ case SetForwarding(forwarding) => setForwarding(forwarding)
+ case RemoveForwarding(forwarding) => removeForwarding(forwarding)
+ }
+ }
+ }
+
def getBusyShards(): Seq[ShardInfo] = {
shardTable.filter { _.busy.id > 0 }.toList
}
View
5 src/main/scala/com/twitter/gizzard/nameserver/ShardManager.scala
@@ -12,6 +12,8 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def diffState(lastUpdatedSeq: Long) = shard.read.any(_.diffState(lastUpdatedSeq))
def dumpStructure(tableIds: Seq[Int]) = shard.read.any(_.dumpStructure(tableIds))
+ def batchExecute(commands : Seq[TransformOperation]) { shard.write.foreach(_.batchExecute(commands)) }
+
@throws(classOf[ShardException])
def createAndMaterializeShard(shardInfo: ShardInfo) {
shard.write.foreach(_.createShard(shardInfo))
@@ -28,7 +30,6 @@ class ShardManager(shard: RoutingNode[ShardManagerSource], repo: ShardRepository
def listShards() = shard.read.any(_.listShards())
def getBusyShards() = shard.read.any(_.getBusyShards())
-
def addLink(upId: ShardId, downId: ShardId, weight: Int) { shard.write.foreach(_.addLink(upId, downId, weight)) }
def removeLink(upId: ShardId, downId: ShardId) { shard.write.foreach(_.removeLink(upId, downId)) }
@@ -66,6 +67,8 @@ trait ShardManagerSource {
tableIds.map(extractor)
}
+ @throws(classOf[ShardException]) def batchExecute(commands : Seq[TransformOperation])
+
@throws(classOf[ShardException]) def createShard(shardInfo: ShardInfo)
@throws(classOf[ShardException]) def deleteShard(id: ShardId)
@throws(classOf[ShardException]) def markShardBusy(id: ShardId, busy: Busy.Value)
View
13 src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
@@ -372,6 +372,19 @@ class SqlShardManagerSource(queryEvaluator: QueryEvaluator) extends ShardManager
queryEvaluator.execute(ddl)
}
}
+
+ def batchExecute(commands : Seq[TransformOperation]) {
+ for (cmd <- commands) {
+ cmd match {
+ case CreateShard(shardInfo) => createShard(shardInfo)
+ case DeleteShard(shardId) => deleteShard(shardId)
+ case AddLink(upId, downId, weight) => addLink(upId, downId, weight)
+ case RemoveLink(upId, downId) => removeLink(upId, downId)
+ case SetForwarding(forwarding) => setForwarding(forwarding)
+ case RemoveForwarding(forwarding) => removeForwarding(forwarding)
+ }
+ }
+ }
}
class SqlRemoteClusterManagerSource(queryEvaluator: QueryEvaluator) extends RemoteClusterManagerSource {
View
5 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
@@ -13,6 +13,7 @@ import com.twitter.gizzard.thrift.conversions.Forwarding._
import com.twitter.gizzard.thrift.conversions.Host._
import com.twitter.gizzard.shards._
import com.twitter.gizzard.scheduler._
+import com.twitter.gizzard.nameserver
import com.twitter.gizzard.nameserver._
import com.twitter.logging.Logger
@@ -115,6 +116,10 @@ extends Manager.Iface {
def dump_nameserver(tableIds: JList[java.lang.Integer]) = wrapEx(shardManager.dumpStructure(tableIds.toList).map(_.toThrift))
+ def batch_execute(commands : JList[TransformOperation]) {
+ wrapEx(shardManager.batchExecute(commands.map(nameserver.TransformOperation.apply)))
+ }
+
def copy_shard(shardIds: JList[ShardId]) = {
wrapEx(adminJobManager.scheduleCopyJob(shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift)))
}
View
2 src/main/thrift/Manager.thrift
@@ -125,6 +125,8 @@ service Manager {
list<NameServerState> dump_nameserver(1: list<i32> table_id) throws(1: GizzardException ex)
+ void batch_execute(1: list<TransformOperation> commands) throws (1: GizzardException ex)
+
// job scheduler management
void retry_errors() throws(1: GizzardException ex)

0 comments on commit 0e28edc

Please sign in to comment.