Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

thrift support for shard splitting and other fancy copy-like operations

  • Loading branch information...
commit 447689355fc6e3076153ff9e385402cece56402a 1 parent bd1ea66
Kyle Maxwell authored
View
45 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
@@ -7,22 +7,51 @@ import com.twitter.gizzard.thrift.conversions.LinkInfo._
import com.twitter.gizzard.thrift.conversions.ShardId._
import com.twitter.gizzard.thrift.conversions.ShardInfo._
import com.twitter.gizzard.thrift.conversions.Forwarding._
+import com.twitter.gizzard.thrift.conversions.CopyDestination._
import com.twitter.gizzard.thrift.conversions.Host._
import com.twitter.gizzard.shards._
-import com.twitter.gizzard.scheduler.{CopyDestination, CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler}
+import com.twitter.gizzard.scheduler.{CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler}
+import com.twitter.gizzard.scheduler.{CopyDestination => SCopyDestination}
import com.twitter.gizzard.nameserver._
import net.lag.logging.Logger
+import collection.mutable
import java.util.{List => JList}
-class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S], copier: CopyJobFactory[S], scheduler: PrioritizingJobScheduler[J], copyScheduler: JobScheduler[JsonJob]) extends Manager.Iface {
+class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S], copier: CopyJobFactory[S], alternateCopiers: Seq[CopyJobFactory[S]], scheduler: PrioritizingJobScheduler[J], copyScheduler: JobScheduler[JsonJob]) extends Manager.Iface {
+
+ // back-compat constructor
+ def this(nameServer: NameServer[S], copier: CopyJobFactory[S], scheduler: PrioritizingJobScheduler[J], copyScheduler: JobScheduler[JsonJob]) = this(nameServer, copier, Seq.empty, scheduler, copyScheduler)
+
+ val defaultCopier = copier
+ val copiers = mutable.Map(copier.getClass.getName -> copier)
+ alternateCopiers.foreach(addCopyFactory(_))
+
+ def copierForName(name: String) = {
+ copiers.getOrElse(name, throw new thrift.GizzardException("CopyFactory not found: " + name))
+ }
+
+ def addCopyFactory(copier: CopyJobFactory[S]): Unit = {
+ addCopyFactory(copier.getClass.getName, copier)
+ }
+
+ def addCopyFactory(name: String, copier: CopyJobFactory[S]): Unit = {
+ copiers(name) = copier
+ }
+
val log = Logger.get(getClass.getName)
def wrapEx[A](f: => A): A = try { f } catch {
- case ex: Throwable => throw new thrift.GizzardException(ex.getMessage)
+ case ex: thrift.GizzardException => throw ex
+ case ex: Throwable => {
+ if(ex.getMessage != null) {
+ throw new thrift.GizzardException(ex.getMessage)
+ } else {
+ throw new thrift.GizzardException(ex.toString)
+ }
+ }
}
-
def reload_config() = wrapEx {
log.info("Reloading forwardings...")
nameServer.reload()
@@ -94,8 +123,14 @@ class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S],
def mark_shard_busy(id: ShardId, busy: Int) = {
wrapEx(nameServer.markShardBusy(id.fromThrift, busy.fromThrift))
}
+
def copy_shard(sourceId: ShardId, destinationId: ShardId) = {
- wrapEx(copyScheduler.put(copier(sourceId.fromThrift, List(CopyDestination(destinationId.fromThrift, None)))))
+ wrapEx(copyScheduler.put(copier(sourceId.fromThrift, List(SCopyDestination(destinationId.fromThrift, None)))))
+ }
+
+ def multicopy_shard(factoryName: String, sourceId: ShardId, thriftDestinations: java.util.List[thrift.CopyDestination]) = {
+ val destinations = thriftDestinations.toSeq.toList.map(_.fromThrift)
+ wrapEx(copyScheduler.put(copierForName(factoryName)(sourceId.fromThrift, destinations)))
}
def dump_nameserver(tableId: Int) = wrapEx(nameServer.dumpStructure(tableId).toThrift)
View
22 src/main/scala/com/twitter/gizzard/thrift/conversions/CopyDestination.scala
@@ -0,0 +1,22 @@
+package com.twitter.gizzard.thrift.conversions
+
+import com.twitter.gizzard.thrift.conversions.ShardId._
+
+object CopyDestination {
+ class RichCopyDestination(dest: scheduler.CopyDestination) {
+ def toThrift = {
+ val out = new thrift.CopyDestination(dest.shardId.toThrift)
+ dest.baseId.foreach { out.setBase_id(_) }
+ out
+ }
+ }
+ implicit def destToRichCopyDestination(dest: scheduler.CopyDestination) = new RichCopyDestination(dest)
+
+ class RichThriftCopyDestination(dest: thrift.CopyDestination) {
+ def fromThrift = {
+ val baseId = if(dest.isSetBase_id) Some(dest.base_id) else None
+ new scheduler.CopyDestination(dest.shard_id.fromThrift, baseId)
+ }
+ }
+ implicit def thriftCopyDestinationToRichCopyDestination(dest: thrift.CopyDestination) = new RichThriftCopyDestination(dest)
+}
View
7 src/main/thrift/Manager.thrift
@@ -12,6 +12,11 @@ struct ShardId {
2: string table_prefix
}
+struct CopyDestination {
+ 1: ShardId shard_id
+ 2: optional i64 base_id
+}
+
struct ShardInfo {
1: ShardId id
2: string class_name
@@ -90,6 +95,8 @@ service Manager {
void mark_shard_busy(1: ShardId id, 2: i32 busy) throws(1: GizzardException ex)
void copy_shard(1: ShardId source_id, 2: ShardId destination_id) throws(1: GizzardException ex)
+ void multicopy_shard(1: string factoryName, 2: ShardId source_id, 3: list<CopyDestination> destinations) throws(1: GizzardException ex)
+
NameserverState dump_nameserver(1: i32 table_id) throws(1: GizzardException ex)
// job scheduler management
View
23 src/test/scala/com/twitter/gizzard/thrift/ManagerServiceSpec.scala
@@ -5,8 +5,10 @@ import org.specs.Specification
import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.thrift.conversions.ShardId._
import com.twitter.gizzard.thrift.conversions.ShardInfo._
+import com.twitter.gizzard.thrift.conversions.CopyDestination._
import shards.{Busy, Shard}
-import scheduler.{CopyJob, CopyDestination, CopyJobFactory, JobScheduler, PrioritizingJobScheduler, JsonJob}
+import scheduler.{CopyJob, CopyJobFactory, JobScheduler, PrioritizingJobScheduler, JsonJob}
+import scheduler.{CopyDestination => SCopyDestination}
@@ -110,7 +112,7 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas
"copy_shard" in {
val shardId1 = new shards.ShardId("hostname1", "table1")
val shardId2 = new shards.ShardId("hostname2", "table2")
- val dests = List(CopyDestination(shardId2, None))
+ val dests = List(SCopyDestination(shardId2, None))
val copyJob = mock[CopyJob[Shard]]
expect {
@@ -121,6 +123,23 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas
manager.copy_shard(shardId1.toThrift, shardId2.toThrift)
}
+ "multicopy_shard" in {
+ val shardId1 = new shards.ShardId("hostname1", "table1")
+ val shardId2a = new shards.ShardId("hostname2", "table2a")
+ val shardId2b = new shards.ShardId("hostname2", "table2b")
+ val dests = List(SCopyDestination(shardId2a, Some(0)), SCopyDestination(shardId2b, Some(100)))
+ val copyJob = mock[CopyJob[Shard]]
+
+ expect {
+ one(copier).apply(shardId1, dests) willReturn copyJob
+ one(copyScheduler).put(copyJob)
+ }
+
+ manager.addCopyFactory("fake.job.Name", copier)
+ manager.multicopy_shard("fake.job.Name", shardId1.toThrift, dests.map(_.toThrift).toJavaList)
+ }
+
+
"set_forwarding" in {
expect {
one(nameServer).setForwarding(forwarding)
Please sign in to comment.
Something went wrong with that request. Please try again.