Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Pull CopyManager / Migration stuff out of NameServer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ed Ceaser committed Mar 16, 2010
1 parent 2f2efec commit e767fc0
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ trait NameServer[S <: Shard] extends Shard {
def replaceChildShard(oldChildShardId: Int, newChildShardId: Int)
def listShardChildren(shardId: Int): Seq[ChildInfo]
def markShardBusy(shardId: Int, busy: Busy.Value)
def copyShard(sourceShardId: Int, destinationShardId: Int)
def setupMigration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo): ShardMigration
def migrateShard(migration: ShardMigration)
def finishMigration(migration: ShardMigration)
def setForwarding(forwarding: Forwarding)
def replaceForwarding(oldShardId: Int, newShardId: Int)
def getForwarding(tableId: List[Int], baseId: Long): ShardInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,4 @@ trait ReadWriteNameServer[S <: Shard] extends NameServer[S] with ReadWriteShard[
def findShardById(shardId: Int, weight: Int) = readOperation(_.findShardById(shardId, weight))
def reload() = readOperation(_.reload())
def reloadForwardings() = readOperation(_.reloadForwardings())
def finishMigration(migration: ShardMigration) = readOperation(_.finishMigration(migration)) // wtf
def migrateShard(migration: ShardMigration) = readOperation(_.migrateShard(migration)) // wtf
def setupMigration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo) = readOperation(_.setupMigration(sourceShardInfo, destinationShardInfo))
def copyShard(sourceShardId: Int, destinationShardId: Int) = readOperation(_.copyShard(sourceShardId, destinationShardId))
}
35 changes: 35 additions & 0 deletions src/main/scala/com/twitter/gizzard/nameserver/ShardMigration.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,40 @@
package com.twitter.gizzard.nameserver

import shards._


object ShardMigration {
def setupMigration[S <: Shard](sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo, nameServer: NameServer[S]): ShardMigration = {
val lastDot = sourceShardInfo.className.lastIndexOf('.')
val packageName = if (lastDot >= 0) sourceShardInfo.className.substring(0, lastDot + 1) else ""
val sourceShardId = nameServer.findShard(sourceShardInfo)
val destinationShardId = nameServer.createShard(destinationShardInfo)

val writeOnlyShard = new ShardInfo(packageName + "WriteOnlyShard",
sourceShardInfo.tablePrefix + "_migrate_write_only", "localhost", "", "", Busy.Normal, 0)
val writeOnlyShardId = nameServer.createShard(writeOnlyShard)
nameServer.addChildShard(writeOnlyShardId, destinationShardId, 1)

val replicatingShard = new ShardInfo(packageName + "ReplicatingShard",
sourceShardInfo.tablePrefix + "_migrate_replicating", "localhost", "", "", Busy.Normal, 0)
val replicatingShardId = nameServer.createShard(replicatingShard)
nameServer.replaceChildShard(sourceShardId, replicatingShardId)
nameServer.addChildShard(replicatingShardId, sourceShardId, 1)
nameServer.addChildShard(replicatingShardId, writeOnlyShardId, 1)

nameServer.replaceForwarding(sourceShardId, replicatingShardId)
new ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId)
}

def finishMigration[S <: Shard](migration: ShardMigration, nameServer: NameServer[S]) {
nameServer.removeChildShard(migration.writeOnlyShardId, migration.destinationShardId)
nameServer.replaceChildShard(migration.replicatingShardId, migration.destinationShardId)
nameServer.replaceForwarding(migration.replicatingShardId, migration.destinationShardId)
nameServer.deleteShard(migration.replicatingShardId)
nameServer.deleteShard(migration.writeOnlyShardId)
nameServer.deleteShard(migration.sourceShardId)
}
}

case class ShardMigration(sourceShardId: Int, destinationShardId: Int, replicatingShardId: Int,
writeOnlyShardId: Int)
44 changes: 2 additions & 42 deletions src/main/scala/com/twitter/gizzard/nameserver/SqlNameServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ CREATE TABLE shard_children (
}

class SqlNameServer[S <: Shard](queryEvaluator: QueryEvaluator, shardRepository: ShardRepository[S],
tablePrefix: String, mappingFunction: Long => Long, copyManager: CopyManager[S])
tablePrefix: String, mappingFunction: Long => Long)
extends NameServer[S] {
val children = List()
val shardInfo = new ShardInfo("asdf", "asdf", "asdf")
val shardInfo = new ShardInfo("com.twitter.gizzard.nameserver.SqlNameServer", tablePrefix, "")
val weight = 1 // hardcode for now

val FORWARDINGS_DDL = """
Expand Down Expand Up @@ -191,46 +191,6 @@ CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence (
}
}

def copyShard(sourceShardId: Int, destinationShardId: Int) {
copyManager.newCopyJob(sourceShardId, destinationShardId).start(this, copyManager.scheduler)
}

def setupMigration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo) = {
val lastDot = sourceShardInfo.className.lastIndexOf('.')
val packageName = if (lastDot >= 0) sourceShardInfo.className.substring(0, lastDot + 1) else ""
val sourceShardId = findShard(sourceShardInfo)
val destinationShardId = createShard(destinationShardInfo)

val writeOnlyShard = new ShardInfo(packageName + "WriteOnlyShard",
sourceShardInfo.tablePrefix + "_migrate_write_only", "localhost", "", "", Busy.Normal, 0)
val writeOnlyShardId = createShard(writeOnlyShard)
addChildShard(writeOnlyShardId, destinationShardId, 1)

val replicatingShard = new ShardInfo(packageName + "ReplicatingShard",
sourceShardInfo.tablePrefix + "_migrate_replicating", "localhost", "", "", Busy.Normal, 0)
val replicatingShardId = createShard(replicatingShard)
replaceChildShard(sourceShardId, replicatingShardId)
addChildShard(replicatingShardId, sourceShardId, 1)
addChildShard(replicatingShardId, writeOnlyShardId, 1)

replaceForwarding(sourceShardId, replicatingShardId)
new ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId)
}

def migrateShard(migration: ShardMigration) {
copyManager.newMigrateJob(migration).start(this, copyManager.scheduler)
}

// to be called by Migrate jobs when they're finished. but also available as an RPC.
def finishMigration(migration: ShardMigration) {
removeChildShard(migration.writeOnlyShardId, migration.destinationShardId)
replaceChildShard(migration.replicatingShardId, migration.destinationShardId)
replaceForwarding(migration.replicatingShardId, migration.destinationShardId)
deleteShard(migration.replicatingShardId)
deleteShard(migration.writeOnlyShardId)
deleteShard(migration.sourceShardId)
}

def setForwarding(forwarding: Forwarding) {
if (queryEvaluator.execute("UPDATE " + forwardingTable + " SET shard_id = ? WHERE base_source_id = ? AND table_id = ?",
forwarding.shardId, forwarding.baseId, tableIdDbString(forwarding.tableId)) == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.twitter.gizzard.nameserver._
import net.lag.logging.Logger


class ShardManagerService[S <: Shard](nameServer: NameServer[S]) extends ShardManager.Iface {
class ShardManagerService[S <: Shard](nameServer: NameServer[S], copyManager: CopyManager[S]) extends ShardManager.Iface {
val log = Logger.get(getClass.getName)

def create_shard(shard: ShardInfo) = {
Expand Down Expand Up @@ -55,19 +55,19 @@ class ShardManagerService[S <: Shard](nameServer: NameServer[S]) extends ShardMa
}

def copy_shard(sourceShardId: Int, destinationShardId: Int) {
nameServer.copyShard(sourceShardId, destinationShardId)
copyManager.newCopyJob(sourceShardId, destinationShardId).start(nameServer, copyManager.scheduler)
}

def setup_migration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo) = {
nameServer.setupMigration(sourceShardInfo.fromThrift, destinationShardInfo.fromThrift).toThrift
nameserver.ShardMigration.setupMigration(sourceShardInfo.fromThrift, destinationShardInfo.fromThrift, nameServer).toThrift
}

def migrate_shard(migration: ShardMigration) {
nameServer.migrateShard(migration.fromThrift)
copyManager.newMigrateJob(migration.fromThrift).start(nameServer, copyManager.scheduler)
}

def finish_migration(migration: ShardMigration) {
nameServer.finishMigration(migration.fromThrift)
nameserver.ShardMigration.finishMigration(migration.fromThrift, nameServer)
}

def set_forwarding(forwarding: Forwarding) {
Expand Down
102 changes: 1 addition & 101 deletions src/test/scala/com/twitter/gizzard/nameserver/NameServerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ object NameServerSpec extends Specification with JMocker with ClassMocker with D
}

shardRepository = mock[ShardRepository[Shard]]
copyManager = mock[CopyManager[Shard]]
nameServer = new SqlNameServer(queryEvaluator, shardRepository, "test", (id: Long) => id, copyManager)
nameServer = new SqlNameServer(queryEvaluator, shardRepository, "test", (id: Long) => id)

nameServer.reload()

Expand Down Expand Up @@ -169,105 +168,6 @@ object NameServerSpec extends Specification with JMocker with ClassMocker with D
nameServer.getShard(shardId).busy mustEqual Busy.Busy
}

"copy shard" in {
val copyJob = mock[CopyMachine[Shard]]
val scheduler = mock[JobScheduler]

expect {
one(copyManager).newCopyJob(10, 20) willReturn copyJob
one(copyManager).scheduler willReturn scheduler
one(copyJob).start(nameServer, scheduler)
}

nameServer.copyShard(10, 20)
}

"setup migration" in {
val sourceShardInfo = new ShardInfo("com.example.SqlShard", "forward", "localhost")
val destinationShardInfo = new ShardInfo("com.example.SqlShard", "forward", "remotehost")
val writeOnlyShardInfo = new ShardInfo("com.example.WriteOnlyShard",
"forward_migrate_write_only", "localhost")
val replicatingShardInfo = new ShardInfo("com.example.ReplicatingShard",
"forward_migrate_replicating", "localhost")

expect {
one(shardRepository).create(sourceShardInfo)
}

val sourceShardId = nameServer.createShard(sourceShardInfo)
nameServer.setForwarding(new Forwarding(List(0), 1, sourceShardId))

expect {
one(shardRepository).create(destinationShardInfo)
one(shardRepository).create(writeOnlyShardInfo)
one(shardRepository).create(replicatingShardInfo)
}

val migration = nameServer.setupMigration(sourceShardInfo, destinationShardInfo)
nameServer.getShard(migration.sourceShardId).shardId mustEqual sourceShardId
nameServer.findShard(destinationShardInfo) mustEqual migration.destinationShardId
nameServer.findShard(writeOnlyShardInfo) mustEqual migration.writeOnlyShardId
nameServer.findShard(replicatingShardInfo) mustEqual migration.replicatingShardId
nameServer.getForwardingForShard(migration.sourceShardId) must throwA[ShardException]
nameServer.getForwardingForShard(migration.replicatingShardId) mustNot throwA[ShardException]

nameServer.listShardChildren(migration.replicatingShardId).map { _.shardId } mustEqual
List(migration.sourceShardId, migration.writeOnlyShardId)
nameServer.listShardChildren(migration.writeOnlyShardId).map { _.shardId } mustEqual
List(migration.destinationShardId)
}

"migrate shard" in {
val migration = new ShardMigration(1, 2, 3, 4)
val migrateJob = mock[CopyMachine[Shard]]
val scheduler = mock[JobScheduler]

expect {
one(copyManager).newMigrateJob(migration) willReturn migrateJob
one(copyManager).scheduler willReturn scheduler
one(migrateJob).start(nameServer, scheduler)
}

nameServer.migrateShard(migration)
}

"finish migration" in {
// tediously set this up.
val sourceShardInfo = new ShardInfo("com.example.SqlShard", "forward", "localhost")
val destinationShardInfo = new ShardInfo("com.example.SqlShard", "forward", "remotehost")
val writeOnlyShardInfo = new ShardInfo("com.example.WriteOnlyShard",
"forward_migrate_write_only", "localhost")
val replicatingShardInfo = new ShardInfo("com.example.ReplicatingShard",
"forward_migrate_replicating", "localhost")

expect {
one(shardRepository).create(sourceShardInfo)
one(shardRepository).create(destinationShardInfo)
one(shardRepository).create(writeOnlyShardInfo)
one(shardRepository).create(replicatingShardInfo)
}

val sourceShardId = nameServer.createShard(sourceShardInfo)
val destinationShardId = nameServer.createShard(destinationShardInfo)
val writeOnlyShardId = nameServer.createShard(writeOnlyShardInfo)
val replicatingShardId = nameServer.createShard(replicatingShardInfo)
nameServer.addChildShard(replicatingShardId, sourceShardId, 10)
nameServer.addChildShard(replicatingShardId, writeOnlyShardId, 10)
nameServer.addChildShard(writeOnlyShardId, destinationShardId, 20)

val migration = new ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId)

expect {
}

nameServer.finishMigration(migration)

nameServer.getShard(sourceShardId) must throwA[Exception]
nameServer.getShard(destinationShardId)
nameServer.getShard(writeOnlyShardId) must throwA[Exception]
nameServer.getShard(replicatingShardId) must throwA[Exception]
}

"forwarding changes" in {
var shardId: Int = 0
var forwarding: Forwarding = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ object ShardsIntegrationSpec extends Specification with JMocker with ClassMocker
"Shards" should {
var shardRepository: ShardRepository[UserShard] = null
var nameServer: NameServer[UserShard] = null
var copyManager: CopyManager[UserShard] = null

doBefore {
shardRepository = new ShardRepository
shardRepository += (("com.example.UserShard", factory))
shardRepository += (("com.example.SqlShard", factory))
copyManager = mock[CopyManager[UserShard]]
nameServer = new SqlNameServer[UserShard](queryEvaluator, shardRepository, "test", (id: Long) => id, copyManager)
nameServer = new SqlNameServer[UserShard](queryEvaluator, shardRepository, "test", (id: Long) => id)

nameServer.createShard(shardInfo1)
nameServer.createShard(shardInfo2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package com.twitter.gizzard.thrift
import org.specs.mock.{ClassMocker, JMocker}
import org.specs.Specification
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.thrift.conversions.ShardInfo._
import com.twitter.gizzard.thrift.conversions.ShardMigration._
import shards.Busy
import jobs.{CopyMachine, JobScheduler}


object ShardManagerServiceSpec extends Specification with JMocker with ClassMocker {
val nameServer = mock[nameserver.NameServer[shards.Shard]]
val manager = new thrift.ShardManagerService(nameServer)
val copyManager = mock[nameserver.CopyManager[shards.Shard]]
val manager = new thrift.ShardManagerService(nameServer, copyManager)
val thriftShardInfo1 = new thrift.ShardInfo("com.example.SqlShard",
"table_prefix", "hostname", "INT UNSIGNED", "INT UNSIGNED", Busy.Normal.id, 1)
val shardInfo1 = new shards.ShardInfo("com.example.SqlShard",
Expand Down Expand Up @@ -106,31 +110,73 @@ object ShardManagerServiceSpec extends Specification with JMocker with ClassMock
}

"copy_shard" in {
val copyJob = mock[CopyMachine[shards.Shard]]
val scheduler = mock[JobScheduler]

expect {
one(nameServer).copyShard(1, 2)
one(copyManager).newCopyJob(10, 20) willReturn copyJob
one(copyManager).scheduler willReturn scheduler
one(copyJob).start(nameServer, scheduler)
}
manager.copy_shard(1, 2)

manager.copy_shard(10, 20)
}

"setup_migration" in {
val writeOnlyShard = capturingParam[shards.ShardInfo]
val replicatingShard = capturingParam[shards.ShardInfo]
val sourceShardId = 1
val destinationShardId = 2
val writeOnlyShardId = 3
val replicatingShardId = 4

expect {
one(nameServer).setupMigration(shardInfo1, shardInfo2) willReturn new nameserver.ShardMigration(1, 2, 3, 4)
one(nameServer).findShard(thriftShardInfo1.fromThrift) willReturn sourceShardId
one(nameServer).createShard(thriftShardInfo2.fromThrift) willReturn destinationShardId
one(nameServer).createShard(writeOnlyShard.capture) willReturn writeOnlyShardId
one(nameServer).addChildShard(writeOnlyShardId, destinationShardId, 1)
one(nameServer).createShard(replicatingShard.capture) willReturn replicatingShardId
one(nameServer).replaceChildShard(sourceShardId, replicatingShardId)
one(nameServer).addChildShard(replicatingShardId, sourceShardId, 1)
one(nameServer).addChildShard(replicatingShardId, writeOnlyShardId, 1)
one(nameServer).replaceForwarding(sourceShardId, replicatingShardId)
}
manager.setup_migration(thriftShardInfo1, thriftShardInfo2) mustEqual new thrift.ShardMigration(1, 2, 3, 4)

val migration = new thrift.ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId)
manager.setup_migration(thriftShardInfo1, thriftShardInfo2) mustEqual migration
}

"migrate_shard" in {
"migrate shard" in {
val migration = new ShardMigration(1, 2, 3, 4)
val migrateJob = mock[CopyMachine[shards.Shard]]
val scheduler = mock[JobScheduler]

expect {
one(nameServer).migrateShard(new nameserver.ShardMigration(1, 2, 3, 4))
one(copyManager).newMigrateJob(migration.fromThrift) willReturn migrateJob
one(copyManager).scheduler willReturn scheduler
one(migrateJob).start(nameServer, scheduler)
}
manager.migrate_shard(new thrift.ShardMigration(1, 2, 3, 4))

manager.migrate_shard(migration)
}

"finish_migration" in {
val sourceShardId = 1
val destinationShardId = 2
val writeOnlyShardId = 3
val replicatingShardId = 4

expect {
one(nameServer).finishMigration(new nameserver.ShardMigration(1, 2, 3, 4))
one(nameServer).removeChildShard(writeOnlyShardId, destinationShardId)
one(nameServer).replaceChildShard(replicatingShardId, destinationShardId)
one(nameServer).replaceForwarding(replicatingShardId, destinationShardId)
one(nameServer).deleteShard(replicatingShardId)
one(nameServer).deleteShard(writeOnlyShardId)
one(nameServer).deleteShard(sourceShardId)
}
manager.finish_migration(new thrift.ShardMigration(1, 2, 3, 4))

val migration = new thrift.ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId)
manager.finish_migration(migration)
}

"set_forwarding" in {
Expand Down

0 comments on commit e767fc0

Please sign in to comment.