Permalink
Browse files

tests of replication

  • Loading branch information...
1 parent b3071e5 commit dcf49835acc1fdb668ed4da3631c7c79ff29ee4c Nick Kallen committed Apr 6, 2010
@@ -8,7 +8,6 @@ import com.twitter.xrayspecs.TimeConversions._
import net.lag.logging.Logger
import com.twitter.gizzard.shards.ShardInfo
import com.twitter.gizzard.nameserver.Forwarding
-import com.twitter.gizzard.thrift.conversions.ShardInfo._
object RowzSpec extends Specification with Eventually {
@@ -17,24 +16,15 @@ object RowzSpec extends Specification with Eventually {
Time.freeze()
val state = Rowz(config, w3c, databaseFactory)
val rowzService = state.rowzService
- val shardManagerService = new ShardManagerService(state.nameServer, state.copyFactory, state.prioritizingScheduler(Priority.Medium.id))
+ val shardInfo = new ShardInfo("com.twitter.rowz.SqlShard", "shard_a", "localhost")
+ val queryEvaluator = queryEvaluatorFactory("localhost", null, config("rowz.db.username"), config("rowz.db.password"))
doBefore {
state.nameServer.rebuildSchema()
- val partitions = 1
- (0 until partitions) foreach { i =>
- val shardInfoA = new ShardInfo("com.twitter.rowz.SqlShard", "shard_a" + i, "localhost")
- val shardInfoB = new ShardInfo("com.twitter.rowz.SqlShard", "shard_b" + i, "localhost")
- val replicatingShardInfo = new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard", "replicating_" + i, "localhost")
- val shardIdA = state.nameServer.createShard(shardInfoA)
- val shardIdB = state.nameServer.createShard(shardInfoB)
- val replicatingShardId = state.nameServer.createShard(replicatingShardInfo)
+ queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name"))
- val weight = 1
- state.nameServer.addChildShard(replicatingShardId, shardIdA, weight)
- state.nameServer.addChildShard(replicatingShardId, shardIdB, weight)
- state.nameServer.setForwarding(new Forwarding(0, Math.MIN_LONG + (Math.MAX_LONG - Math.MIN_LONG) / partitions * i, replicatingShardId))
- }
+ val shardId = state.nameServer.createShard(shardInfo)
+ state.nameServer.setForwarding(new Forwarding(0, Math.MIN_LONG, shardId))
state.start()
}
@@ -46,16 +36,5 @@ object RowzSpec extends Specification with Eventually {
rowzService.destroy(row, 1.second.fromNow.inSeconds)
rowzService.read(id) must eventually(throwA[Exception])
}
-
- "shard migration" in {
- val id = rowzService.create("row", Time.now.inSeconds)
- rowzService.read(id) must eventually(not(throwA[Exception]))
-
- val sourceShardInfo = new ShardInfo("com.twitter.rowz.SqlShard", "shard_a" + 0, "localhost")
- val destinationShardInfo = new ShardInfo("com.twitter.rowz.SqlShard", "shard_c" + 0, "localhost")
- val migration = shardManagerService.setup_migration(sourceShardInfo.toThrift, destinationShardInfo.toThrift)
- shardManagerService.reload_forwardings()
- shardManagerService.migrate_shard(migration)
- }
}
}
@@ -0,0 +1,71 @@
+package com.twitter.rowz.integration
+
+import com.twitter.gizzard.thrift.ShardManagerService
+import org.specs.Specification
+import com.twitter.gizzard.scheduler.Priority
+import com.twitter.xrayspecs.{Time, Eventually}
+import com.twitter.xrayspecs.TimeConversions._
+import net.lag.logging.Logger
+import com.twitter.gizzard.shards.ShardInfo
+import com.twitter.gizzard.nameserver.Forwarding
+import com.twitter.gizzard.thrift.conversions.ShardInfo._
+
+
+object ShardManagerSpec extends Specification with Eventually {
+ "ShardManager" should {
+ import Database._
+ Time.freeze()
+ val state = Rowz(config, w3c, databaseFactory)
+ val rowzService = state.rowzService
+ val shardManagerService = new ShardManagerService(state.nameServer, state.copyFactory, state.prioritizingScheduler(Priority.Medium.id))
+ val shardInfoA = new ShardInfo("com.twitter.rowz.SqlShard", "shard_a", "localhost")
+ val shardInfoB = new ShardInfo("com.twitter.rowz.SqlShard", "shard_b", "localhost")
+ val replicatingShardInfo = new ShardInfo("com.twitter.gizzard.shards.ReplicatingShard", "replicating", "localhost")
+ val queryEvaluator = queryEvaluatorFactory("localhost", null, config("rowz.db.username"), config("rowz.db.password"))
+ var replicatingShardId = 0
+ var shardIdA = 0
+ var shardIdB = 0
+
+ doBefore {
+ state.nameServer.rebuildSchema()
+ queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name"))
+
+ shardIdA = state.nameServer.createShard(shardInfoA)
+ shardIdB = state.nameServer.createShard(shardInfoB)
+ replicatingShardId = state.nameServer.createShard(replicatingShardInfo)
+
+ val weight = 1
+ state.nameServer.addChildShard(replicatingShardId, shardIdA, weight)
+ state.nameServer.addChildShard(replicatingShardId, shardIdB, weight)
+ state.nameServer.setForwarding(new Forwarding(0, Math.MIN_LONG, replicatingShardId))
+ state.start()
+ }
+
+ "replication" in {
+ val id = rowzService.create("row", Time.now.inSeconds)
+ rowzService.read(id) must eventually(not(throwA[Exception]))
+
+ shardManagerService.replace_forwarding(replicatingShardId, shardIdA)
+ shardManagerService.reload_forwardings()
+ rowzService.read(id) must eventually(not(throwA[Exception]))
+
+ shardManagerService.replace_forwarding(replicatingShardId, shardIdB)
+ shardManagerService.reload_forwardings()
+ rowzService.read(id) must eventually(not(throwA[Exception]))
+ }
+
+ "shard migration" in {
+ val id = rowzService.create("row", Time.now.inSeconds)
+ rowzService.read(id) must eventually(not(throwA[Exception]))
+
+ val sourceShardInfo = shardInfoA
+ val destinationShardInfo = new ShardInfo("com.twitter.rowz.SqlShard", "shard_c" + 0, "localhost")
+ val migration = shardManagerService.setup_migration(sourceShardInfo.toThrift, destinationShardInfo.toThrift)
+ shardManagerService.reload_forwardings()
+ shardManagerService.migrate_shard(migration)
+ shardManagerService.replace_forwarding(replicatingShardId, migration.destination_shard_id)
+ shardManagerService.reload_forwardings()
+ rowzService.read(id) must eventually(not(throwA[Exception]))
+ }
+ }
+}
@@ -15,9 +15,9 @@ object SqlShard extends Specification with JMocker with ClassMocker {
val shardInfo = new ShardInfo("com.twitter.service.flock.edges.SqlShard",
"table_001", "localhost", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal, 1)
val sqlShard = shardFactory.instantiate(shardInfo, 1, List[Shard]())
- val queryEvaluator = queryEvaluatorFactory(shardInfo.hostname, null, config("rowz.db.username"), config("rowz.db.password"))
val row = new Row(1, "a row", Time.now, Time.now, State.Normal)
val row2 = new Row(2, "another row", Time.now, Time.now, State.Normal)
+ val queryEvaluator = queryEvaluatorFactory("localhost", null, config("rowz.db.username"), config("rowz.db.password"))
doBefore {
queryEvaluator.execute("DROP DATABASE IF EXISTS " + config("rowz.db.name"))

0 comments on commit dcf4983

Please sign in to comment.