From e767fc0562ac36c9cb4b08f6442e91b1ee827af6 Mon Sep 17 00:00:00 2001 From: Ed Ceaser Date: Tue, 16 Mar 2010 15:17:54 -0700 Subject: [PATCH] Pull CopyManager / Migration stuff out of NameServer --- .../gizzard/nameserver/NameServer.scala | 4 - .../nameserver/ReadWriteNameServer.scala | 4 - .../gizzard/nameserver/ShardMigration.scala | 35 ++++++ .../gizzard/nameserver/SqlNameServer.scala | 44 +------- .../gizzard/thrift/ShardManagerService.scala | 10 +- .../gizzard/nameserver/NameServerSpec.scala | 102 +----------------- .../shards/ShardsIntegrationSpec.scala | 4 +- .../thrift/ShardManagerServiceSpec.scala | 66 ++++++++++-- 8 files changed, 100 insertions(+), 169 deletions(-) diff --git a/src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala b/src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala index 30cecf36..d102dd28 100644 --- a/src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala +++ b/src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala @@ -13,10 +13,6 @@ trait NameServer[S <: Shard] extends Shard { def replaceChildShard(oldChildShardId: Int, newChildShardId: Int) def listShardChildren(shardId: Int): Seq[ChildInfo] def markShardBusy(shardId: Int, busy: Busy.Value) - def copyShard(sourceShardId: Int, destinationShardId: Int) - def setupMigration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo): ShardMigration - def migrateShard(migration: ShardMigration) - def finishMigration(migration: ShardMigration) def setForwarding(forwarding: Forwarding) def replaceForwarding(oldShardId: Int, newShardId: Int) def getForwarding(tableId: List[Int], baseId: Long): ShardInfo diff --git a/src/main/scala/com/twitter/gizzard/nameserver/ReadWriteNameServer.scala b/src/main/scala/com/twitter/gizzard/nameserver/ReadWriteNameServer.scala index 7f85bdfc..ab630700 100644 --- a/src/main/scala/com/twitter/gizzard/nameserver/ReadWriteNameServer.scala +++ b/src/main/scala/com/twitter/gizzard/nameserver/ReadWriteNameServer.scala @@ -32,8 +32,4 @@ trait ReadWriteNameServer[S <: Shard] extends NameServer[S] with ReadWriteShard[ def findShardById(shardId: Int, weight: Int) = readOperation(_.findShardById(shardId, weight)) def reload() = readOperation(_.reload()) def reloadForwardings() = readOperation(_.reloadForwardings()) - def finishMigration(migration: ShardMigration) = readOperation(_.finishMigration(migration)) // wtf - def migrateShard(migration: ShardMigration) = readOperation(_.migrateShard(migration)) // wtf - def setupMigration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo) = readOperation(_.setupMigration(sourceShardInfo, destinationShardInfo)) - def copyShard(sourceShardId: Int, destinationShardId: Int) = readOperation(_.copyShard(sourceShardId, destinationShardId)) } diff --git a/src/main/scala/com/twitter/gizzard/nameserver/ShardMigration.scala b/src/main/scala/com/twitter/gizzard/nameserver/ShardMigration.scala index b8f3344b..c52362d1 100644 --- a/src/main/scala/com/twitter/gizzard/nameserver/ShardMigration.scala +++ b/src/main/scala/com/twitter/gizzard/nameserver/ShardMigration.scala @@ -1,5 +1,40 @@ package com.twitter.gizzard.nameserver +import shards._ + + +object ShardMigration { + def setupMigration[S <: Shard](sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo, nameServer: NameServer[S]): ShardMigration = { + val lastDot = sourceShardInfo.className.lastIndexOf('.') + val packageName = if (lastDot >= 0) sourceShardInfo.className.substring(0, lastDot + 1) else "" + val sourceShardId = nameServer.findShard(sourceShardInfo) + val destinationShardId = nameServer.createShard(destinationShardInfo) + + val writeOnlyShard = new ShardInfo(packageName + "WriteOnlyShard", + sourceShardInfo.tablePrefix + "_migrate_write_only", "localhost", "", "", Busy.Normal, 0) + val writeOnlyShardId = nameServer.createShard(writeOnlyShard) + nameServer.addChildShard(writeOnlyShardId, destinationShardId, 1) + + val replicatingShard = new ShardInfo(packageName + "ReplicatingShard", + sourceShardInfo.tablePrefix + "_migrate_replicating", "localhost", "", "", Busy.Normal, 0) + val replicatingShardId = nameServer.createShard(replicatingShard) + nameServer.replaceChildShard(sourceShardId, replicatingShardId) + nameServer.addChildShard(replicatingShardId, sourceShardId, 1) + nameServer.addChildShard(replicatingShardId, writeOnlyShardId, 1) + + nameServer.replaceForwarding(sourceShardId, replicatingShardId) + new ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId) + } + + def finishMigration[S <: Shard](migration: ShardMigration, nameServer: NameServer[S]) { + nameServer.removeChildShard(migration.writeOnlyShardId, migration.destinationShardId) + nameServer.replaceChildShard(migration.replicatingShardId, migration.destinationShardId) + nameServer.replaceForwarding(migration.replicatingShardId, migration.destinationShardId) + nameServer.deleteShard(migration.replicatingShardId) + nameServer.deleteShard(migration.writeOnlyShardId) + nameServer.deleteShard(migration.sourceShardId) + } +} case class ShardMigration(sourceShardId: Int, destinationShardId: Int, replicatingShardId: Int, writeOnlyShardId: Int) diff --git a/src/main/scala/com/twitter/gizzard/nameserver/SqlNameServer.scala b/src/main/scala/com/twitter/gizzard/nameserver/SqlNameServer.scala index fae2b53c..1f1de3f4 100644 --- a/src/main/scala/com/twitter/gizzard/nameserver/SqlNameServer.scala +++ b/src/main/scala/com/twitter/gizzard/nameserver/SqlNameServer.scala @@ -71,10 +71,10 @@ CREATE TABLE shard_children ( } class SqlNameServer[S <: Shard](queryEvaluator: QueryEvaluator, shardRepository: ShardRepository[S], - tablePrefix: String, mappingFunction: Long => Long, copyManager: CopyManager[S]) + tablePrefix: String, mappingFunction: Long => Long) extends NameServer[S] { val children = List() - val shardInfo = new ShardInfo("asdf", "asdf", "asdf") + val shardInfo = new ShardInfo("com.twitter.gizzard.nameserver.SqlNameServer", tablePrefix, "") val weight = 1 // hardcode for now val FORWARDINGS_DDL = """ @@ -191,46 +191,6 @@ CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence ( } } - def copyShard(sourceShardId: Int, destinationShardId: Int) { - copyManager.newCopyJob(sourceShardId, destinationShardId).start(this, copyManager.scheduler) - } - - def setupMigration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo) = { - val lastDot = sourceShardInfo.className.lastIndexOf('.') - val packageName = if (lastDot >= 0) sourceShardInfo.className.substring(0, lastDot + 1) else "" - val sourceShardId = findShard(sourceShardInfo) - val destinationShardId = createShard(destinationShardInfo) - - val writeOnlyShard = new ShardInfo(packageName + "WriteOnlyShard", - sourceShardInfo.tablePrefix + "_migrate_write_only", "localhost", "", "", Busy.Normal, 0) - val writeOnlyShardId = createShard(writeOnlyShard) - addChildShard(writeOnlyShardId, destinationShardId, 1) - - val replicatingShard = new ShardInfo(packageName + "ReplicatingShard", - sourceShardInfo.tablePrefix + "_migrate_replicating", "localhost", "", "", Busy.Normal, 0) - val replicatingShardId = createShard(replicatingShard) - replaceChildShard(sourceShardId, replicatingShardId) - addChildShard(replicatingShardId, sourceShardId, 1) - addChildShard(replicatingShardId, writeOnlyShardId, 1) - - replaceForwarding(sourceShardId, replicatingShardId) - new ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId) - } - - def migrateShard(migration: ShardMigration) { - copyManager.newMigrateJob(migration).start(this, copyManager.scheduler) - } - - // to be called by Migrate jobs when they're finished. but also available as an RPC. - def finishMigration(migration: ShardMigration) { - removeChildShard(migration.writeOnlyShardId, migration.destinationShardId) - replaceChildShard(migration.replicatingShardId, migration.destinationShardId) - replaceForwarding(migration.replicatingShardId, migration.destinationShardId) - deleteShard(migration.replicatingShardId) - deleteShard(migration.writeOnlyShardId) - deleteShard(migration.sourceShardId) - } - def setForwarding(forwarding: Forwarding) { if (queryEvaluator.execute("UPDATE " + forwardingTable + " SET shard_id = ? WHERE base_source_id = ? AND table_id = ?", forwarding.shardId, forwarding.baseId, tableIdDbString(forwarding.tableId)) == 0) { diff --git a/src/main/scala/com/twitter/gizzard/thrift/ShardManagerService.scala b/src/main/scala/com/twitter/gizzard/thrift/ShardManagerService.scala index 73cef131..1cc61075 100644 --- a/src/main/scala/com/twitter/gizzard/thrift/ShardManagerService.scala +++ b/src/main/scala/com/twitter/gizzard/thrift/ShardManagerService.scala @@ -11,7 +11,7 @@ import com.twitter.gizzard.nameserver._ import net.lag.logging.Logger -class ShardManagerService[S <: Shard](nameServer: NameServer[S]) extends ShardManager.Iface { +class ShardManagerService[S <: Shard](nameServer: NameServer[S], copyManager: CopyManager[S]) extends ShardManager.Iface { val log = Logger.get(getClass.getName) def create_shard(shard: ShardInfo) = { @@ -55,19 +55,19 @@ class ShardManagerService[S <: Shard](nameServer: NameServer[S]) extends ShardMa } def copy_shard(sourceShardId: Int, destinationShardId: Int) { - nameServer.copyShard(sourceShardId, destinationShardId) + copyManager.newCopyJob(sourceShardId, destinationShardId).start(nameServer, copyManager.scheduler) } def setup_migration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo) = { - nameServer.setupMigration(sourceShardInfo.fromThrift, destinationShardInfo.fromThrift).toThrift + nameserver.ShardMigration.setupMigration(sourceShardInfo.fromThrift, destinationShardInfo.fromThrift, nameServer).toThrift } def migrate_shard(migration: ShardMigration) { - nameServer.migrateShard(migration.fromThrift) + copyManager.newMigrateJob(migration.fromThrift).start(nameServer, copyManager.scheduler) } def finish_migration(migration: ShardMigration) { - nameServer.finishMigration(migration.fromThrift) + nameserver.ShardMigration.finishMigration(migration.fromThrift, nameServer) } def set_forwarding(forwarding: Forwarding) { diff --git a/src/test/scala/com/twitter/gizzard/nameserver/NameServerSpec.scala b/src/test/scala/com/twitter/gizzard/nameserver/NameServerSpec.scala index 9cfb1c45..83613e14 100644 --- a/src/test/scala/com/twitter/gizzard/nameserver/NameServerSpec.scala +++ b/src/test/scala/com/twitter/gizzard/nameserver/NameServerSpec.scala @@ -36,8 +36,7 @@ object NameServerSpec extends Specification with JMocker with ClassMocker with D } shardRepository = mock[ShardRepository[Shard]] - copyManager = mock[CopyManager[Shard]] - nameServer = new SqlNameServer(queryEvaluator, shardRepository, "test", (id: Long) => id, copyManager) + nameServer = new SqlNameServer(queryEvaluator, shardRepository, "test", (id: Long) => id) nameServer.reload() @@ -169,105 +168,6 @@ object NameServerSpec extends Specification with JMocker with ClassMocker with D nameServer.getShard(shardId).busy mustEqual Busy.Busy } - "copy shard" in { - val copyJob = mock[CopyMachine[Shard]] - val scheduler = mock[JobScheduler] - - expect { - one(copyManager).newCopyJob(10, 20) willReturn copyJob - one(copyManager).scheduler willReturn scheduler - one(copyJob).start(nameServer, scheduler) - } - - nameServer.copyShard(10, 20) - } - - "setup migration" in { - val sourceShardInfo = new ShardInfo("com.example.SqlShard", "forward", "localhost") - val destinationShardInfo = new ShardInfo("com.example.SqlShard", "forward", "remotehost") - val writeOnlyShardInfo = new ShardInfo("com.example.WriteOnlyShard", - "forward_migrate_write_only", "localhost") - val replicatingShardInfo = new ShardInfo("com.example.ReplicatingShard", - "forward_migrate_replicating", "localhost") - - expect { - one(shardRepository).create(sourceShardInfo) - } - - val sourceShardId = nameServer.createShard(sourceShardInfo) - nameServer.setForwarding(new Forwarding(List(0), 1, sourceShardId)) - - expect { - one(shardRepository).create(destinationShardInfo) - one(shardRepository).create(writeOnlyShardInfo) - one(shardRepository).create(replicatingShardInfo) - } - - val migration = nameServer.setupMigration(sourceShardInfo, destinationShardInfo) - nameServer.getShard(migration.sourceShardId).shardId mustEqual sourceShardId - nameServer.findShard(destinationShardInfo) mustEqual migration.destinationShardId - nameServer.findShard(writeOnlyShardInfo) mustEqual migration.writeOnlyShardId - nameServer.findShard(replicatingShardInfo) mustEqual migration.replicatingShardId - nameServer.getForwardingForShard(migration.sourceShardId) must throwA[ShardException] - nameServer.getForwardingForShard(migration.replicatingShardId) mustNot throwA[ShardException] - - nameServer.listShardChildren(migration.replicatingShardId).map { _.shardId } mustEqual - List(migration.sourceShardId, migration.writeOnlyShardId) - nameServer.listShardChildren(migration.writeOnlyShardId).map { _.shardId } mustEqual - List(migration.destinationShardId) - } - - "migrate shard" in { - val migration = new ShardMigration(1, 2, 3, 4) - val migrateJob = mock[CopyMachine[Shard]] - val scheduler = mock[JobScheduler] - - expect { - one(copyManager).newMigrateJob(migration) willReturn migrateJob - one(copyManager).scheduler willReturn scheduler - one(migrateJob).start(nameServer, scheduler) - } - - nameServer.migrateShard(migration) - } - - "finish migration" in { - // tediously set this up. - val sourceShardInfo = new ShardInfo("com.example.SqlShard", "forward", "localhost") - val destinationShardInfo = new ShardInfo("com.example.SqlShard", "forward", "remotehost") - val writeOnlyShardInfo = new ShardInfo("com.example.WriteOnlyShard", - "forward_migrate_write_only", "localhost") - val replicatingShardInfo = new ShardInfo("com.example.ReplicatingShard", - "forward_migrate_replicating", "localhost") - - expect { - one(shardRepository).create(sourceShardInfo) - one(shardRepository).create(destinationShardInfo) - one(shardRepository).create(writeOnlyShardInfo) - one(shardRepository).create(replicatingShardInfo) - } - - val sourceShardId = nameServer.createShard(sourceShardInfo) - val destinationShardId = nameServer.createShard(destinationShardInfo) - val writeOnlyShardId = nameServer.createShard(writeOnlyShardInfo) - val replicatingShardId = nameServer.createShard(replicatingShardInfo) - nameServer.addChildShard(replicatingShardId, sourceShardId, 10) - nameServer.addChildShard(replicatingShardId, writeOnlyShardId, 10) - nameServer.addChildShard(writeOnlyShardId, destinationShardId, 20) - - val migration = new ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId) - - expect { - } - - nameServer.finishMigration(migration) - - nameServer.getShard(sourceShardId) must throwA[Exception] - nameServer.getShard(destinationShardId) - nameServer.getShard(writeOnlyShardId) must throwA[Exception] - nameServer.getShard(replicatingShardId) must throwA[Exception] - } - "forwarding changes" in { var shardId: Int = 0 var forwarding: Forwarding = null diff --git a/src/test/scala/com/twitter/gizzard/shards/ShardsIntegrationSpec.scala b/src/test/scala/com/twitter/gizzard/shards/ShardsIntegrationSpec.scala index 0883a61a..f81cc4c5 100644 --- a/src/test/scala/com/twitter/gizzard/shards/ShardsIntegrationSpec.scala +++ b/src/test/scala/com/twitter/gizzard/shards/ShardsIntegrationSpec.scala @@ -39,14 +39,12 @@ object ShardsIntegrationSpec extends Specification with JMocker with ClassMocker "Shards" should { var shardRepository: ShardRepository[UserShard] = null var nameServer: NameServer[UserShard] = null - var copyManager: CopyManager[UserShard] = null doBefore { shardRepository = new ShardRepository shardRepository += (("com.example.UserShard", factory)) shardRepository += (("com.example.SqlShard", factory)) - copyManager = mock[CopyManager[UserShard]] - nameServer = new SqlNameServer[UserShard](queryEvaluator, shardRepository, "test", (id: Long) => id, copyManager) + nameServer = new SqlNameServer[UserShard](queryEvaluator, shardRepository, "test", (id: Long) => id) nameServer.createShard(shardInfo1) nameServer.createShard(shardInfo2) diff --git a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala index 43217e1c..0174e14a 100644 --- a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala +++ b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala @@ -3,12 +3,16 @@ package com.twitter.gizzard.thrift import org.specs.mock.{ClassMocker, JMocker} import org.specs.Specification import com.twitter.gizzard.thrift.conversions.Sequences._ +import com.twitter.gizzard.thrift.conversions.ShardInfo._ +import com.twitter.gizzard.thrift.conversions.ShardMigration._ import shards.Busy +import jobs.{CopyMachine, JobScheduler} object ShardManagerServiceSpec extends Specification with JMocker with ClassMocker { val nameServer = mock[nameserver.NameServer[shards.Shard]] - val manager = new thrift.ShardManagerService(nameServer) + val copyManager = mock[nameserver.CopyManager[shards.Shard]] + val manager = new thrift.ShardManagerService(nameServer, copyManager) val thriftShardInfo1 = new thrift.ShardInfo("com.example.SqlShard", "table_prefix", "hostname", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal.id, 1) val shardInfo1 = new shards.ShardInfo("com.example.SqlShard", @@ -106,31 +110,73 @@ object ShardManagerServiceSpec extends Specification with JMocker with ClassMock } "copy_shard" in { + val copyJob = mock[CopyMachine[shards.Shard]] + val scheduler = mock[JobScheduler] + expect { - one(nameServer).copyShard(1, 2) + one(copyManager).newCopyJob(10, 20) willReturn copyJob + one(copyManager).scheduler willReturn scheduler + one(copyJob).start(nameServer, scheduler) } - manager.copy_shard(1, 2) + + manager.copy_shard(10, 20) } "setup_migration" in { + val writeOnlyShard = capturingParam[shards.ShardInfo] + val replicatingShard = capturingParam[shards.ShardInfo] + val sourceShardId = 1 + val destinationShardId = 2 + val writeOnlyShardId = 3 + val replicatingShardId = 4 + expect { - one(nameServer).setupMigration(shardInfo1, shardInfo2) willReturn new nameserver.ShardMigration(1, 2, 3, 4) + one(nameServer).findShard(thriftShardInfo1.fromThrift) willReturn sourceShardId + one(nameServer).createShard(thriftShardInfo2.fromThrift) willReturn destinationShardId + one(nameServer).createShard(writeOnlyShard.capture) willReturn writeOnlyShardId + one(nameServer).addChildShard(writeOnlyShardId, destinationShardId, 1) + one(nameServer).createShard(replicatingShard.capture) willReturn replicatingShardId + one(nameServer).replaceChildShard(sourceShardId, replicatingShardId) + one(nameServer).addChildShard(replicatingShardId, sourceShardId, 1) + one(nameServer).addChildShard(replicatingShardId, writeOnlyShardId, 1) + one(nameServer).replaceForwarding(sourceShardId, replicatingShardId) } - manager.setup_migration(thriftShardInfo1, thriftShardInfo2) mustEqual new thrift.ShardMigration(1, 2, 3, 4) + + val migration = new thrift.ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId) + manager.setup_migration(thriftShardInfo1, thriftShardInfo2) mustEqual migration } - "migrate_shard" in { + "migrate shard" in { + val migration = new ShardMigration(1, 2, 3, 4) + val migrateJob = mock[CopyMachine[shards.Shard]] + val scheduler = mock[JobScheduler] + expect { - one(nameServer).migrateShard(new nameserver.ShardMigration(1, 2, 3, 4)) + one(copyManager).newMigrateJob(migration.fromThrift) willReturn migrateJob + one(copyManager).scheduler willReturn scheduler + one(migrateJob).start(nameServer, scheduler) } - manager.migrate_shard(new thrift.ShardMigration(1, 2, 3, 4)) + + manager.migrate_shard(migration) } "finish_migration" in { + val sourceShardId = 1 + val destinationShardId = 2 + val writeOnlyShardId = 3 + val replicatingShardId = 4 + expect { - one(nameServer).finishMigration(new nameserver.ShardMigration(1, 2, 3, 4)) + one(nameServer).removeChildShard(writeOnlyShardId, destinationShardId) + one(nameServer).replaceChildShard(replicatingShardId, destinationShardId) + one(nameServer).replaceForwarding(replicatingShardId, destinationShardId) + one(nameServer).deleteShard(replicatingShardId) + one(nameServer).deleteShard(writeOnlyShardId) + one(nameServer).deleteShard(sourceShardId) } - manager.finish_migration(new thrift.ShardMigration(1, 2, 3, 4)) + + val migration = new thrift.ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId) + manager.finish_migration(migration) } "set_forwarding" in {