Multicopy #41

Open
wants to merge 6 commits

2 participants

@fizx

I added splitting integration tests. Re-pulling against master, not multi_cluster

@robey

just import mutable, not both mutable and mutable.ListBuffer.
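
For reference, a minimal sketch of the single-import style being suggested here: one collection.mutable import, with ListBuffer and Map qualified through it. The object and method below are illustrative only, not part of the diff:

    import scala.collection.mutable

    // sketch only: one `mutable` import; ListBuffer and Map are reached through it
    object ImportStyleSketch {
      def dedupe(items: List[String]): List[String] = {
        val seen = mutable.Map[String, Boolean]()
        val out  = new mutable.ListBuffer[String]
        items.foreach { item =>
          if (!seen.contains(item)) {
            seen(item) = true
            out += item
          }
        }
        out.toList
      }
    }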

missing space on line 45: should be "while ("

won't this have the side effect of making old copy jobs fail? (they won't have a destination_shard that the copy job can parse.) i guess this changes the api too. i guess that's okay since we're bumping to 1.6 and all the servers will have to change anyway.
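
Roughly what this comment is describing, as a sketch: a job serialized by the old toMap carries destination_shard_* keys, while the new parser scans for numbered destination_0_*, destination_1_*, … keys, so an old job left in the queue has nothing the new parser can read. Hostnames and prefixes below are illustrative:

    // old format (before this change); values are made up for the example
    val oldJob = Map(
      "source_shard_hostname"          -> "db001",
      "source_shard_table_prefix"      -> "t0_0",
      "destination_shard_hostname"     -> "db002",
      "destination_shard_table_prefix" -> "t0_1",
      "count"                          -> 500)

    // new format written by destinationsToMap: one numbered block per destination
    val newJob = Map(
      "source_shard_hostname"       -> "db001",
      "source_shard_table_prefix"   -> "t0_0",
      "destination_0_hostname"      -> "db002",
      "destination_0_table_prefix"  -> "t0_1",
      "destination_0_base_id"       -> 0L,
      "count"                       -> 500)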

line 49 is eeeeeevil! you know that will cause a reflection call, right? :) i guess it doesn't matter in this rarely-called code, but eeeeeeevil!

looks good aside from the 2 code style fixes.

@fizx

Robey: which line 49?

@robey

the one with the cast-to-existential-type. :)
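
For context, the line being referred to is the attributes("count").asInstanceOf[{def toInt: Int}].toInt cast in CopyJobParser.apply: a cast to a structural type, which scalac implements with a reflective method lookup at runtime. A self-contained sketch, with an illustrative reflection-free alternative that is not part of this PR:

    object ReflectiveCastSketch {
      // illustrative stand-in for a deserialized json attribute map
      val attributes: Map[String, Any] = Map("count" -> BigInt(500))

      // the pattern robey is objecting to: the structural-type cast becomes a
      // reflective lookup of `toInt` on the value's runtime class
      val viaReflection: Int = attributes("count").asInstanceOf[{def toInt: Int}].toInt

      // a reflection-free alternative (illustrative, not what the code does):
      // match on the concrete types the deserializer can produce
      val viaMatch: Int = attributes("count") match {
        case b: BigInt => b.toInt
        case i: Int    => i
        case other     => throw new IllegalArgumentException("unexpected count: " + other)
      }
    }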

Commits on Dec 19, 2010
  1. (compiling, not tested) let copy job take multiple destination shards with forwarding offsets

    Kyle Maxwell authored
  2. fix mark busy bug, clean up tests

    Kyle Maxwell authored
  3. new build version

    Kyle Maxwell authored
  4. fix formatting

    Kyle Maxwell authored
  5. fix formatting

    Kyle Maxwell authored
Commits on Jan 4, 2011
  1. integration tests and merged master

    Kyle Maxwell authored
2  project/build.properties
@@ -3,7 +3,7 @@
project.organization=com.twitter
project.name=gizzard
sbt.version=0.7.4
-project.version=1.6-mc-SNAPSHOT
+project.version=1.6-splittable-SNAPSHOT
def.scala.version=2.7.7
build.scala.versions=2.7.7
project.initialize=false
81 src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala
@@ -3,6 +3,8 @@ package com.twitter.gizzard.scheduler
import com.twitter.ostrich.Stats
import com.twitter.util.TimeConversions._
import net.lag.logging.Logger
+import collection.mutable.ListBuffer
+import collection.mutable
import nameserver.{NameServer, NonExistentShard}
import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException}
@@ -14,7 +16,10 @@ object CopyJob {
* A factory for creating a new copy job (with default count and a starting cursor) from a source
* and destination shard ID.
*/
-trait CopyJobFactory[S <: Shard] extends ((ShardId, ShardId) => CopyJob[S])
+trait CopyJobFactory[S <: Shard] extends ((ShardId, List[CopyDestination]) => CopyJob[S])
+
+case class CopyDestination(shardId: ShardId, baseId: Option[Long])
+case class CopyDestinationShard[S](shard: S, baseId: Option[Long])
/**
* A parser that creates a copy job out of json. The basic attributes (source shard ID, destination)
@@ -23,14 +28,35 @@ trait CopyJobFactory[S <: Shard] extends ((ShardId, ShardId) => CopyJob[S])
*/
trait CopyJobParser[S <: Shard] extends JsonJobParser {
def deserialize(attributes: Map[String, Any], sourceId: ShardId,
- destinationId: ShardId, count: Int): CopyJob[S]
+ destinations: List[CopyDestination], count: Int): CopyJob[S]
def apply(attributes: Map[String, Any]): JsonJob = {
+ val sourceId = ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString)
+
deserialize(attributes,
- ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString),
- ShardId(attributes("destination_shard_hostname").toString, attributes("destination_shard_table_prefix").toString),
+ sourceId,
+ parseDestinations(attributes).toList,
attributes("count").asInstanceOf[{def toInt: Int}].toInt)
}
+
+ private def parseDestinations(attributes: Map[String, Any]) = {
+ val destinations = new ListBuffer[CopyDestination]
+ var i = 0
+ while(attributes.contains("destination_" + i + "_hostname")) {
+ val prefix = "destination_" + i
+ val baseKey = prefix + "_base_id"
+ val baseId = if (attributes.contains(baseKey)) {
+ Some(attributes(baseKey).asInstanceOf[{def toLong: Long}].toLong)
+ } else {
+ None
+ }
+ val shardId = ShardId(attributes(prefix + "_shard_hostname").toString, attributes(prefix + "_shard_table_prefix").toString)
+ destinations += CopyDestination(shardId, baseId)
+ i += 1
+ }
+
+ destinations.toList
+ }
}
/**
@@ -43,7 +69,7 @@ trait CopyJobParser[S <: Shard] extends JsonJobParser {
* the next chunk of work to do, or None if the entire copying job is complete.
*/
abstract case class CopyJob[S <: Shard](sourceId: ShardId,
- destinationId: ShardId,
+ destinations: List[CopyDestination],
var count: Int,
nameServer: NameServer[S],
scheduler: JobScheduler[JsonJob])
@@ -55,29 +81,50 @@ abstract case class CopyJob[S <: Shard](sourceId: ShardId,
def toMap = {
Map("source_shard_hostname" -> sourceId.hostname,
"source_shard_table_prefix" -> sourceId.tablePrefix,
- "destination_shard_hostname" -> destinationId.hostname,
- "destination_shard_table_prefix" -> destinationId.tablePrefix,
"count" -> count
- ) ++ serialize
+ ) ++ serialize ++ destinationsToMap
+ }
+
+ private def destinationsToMap = {
+ var i = 0
+ val map = mutable.Map[String, Any]()
+ destinations.foreach { destination =>
+ map("destination_" + i + "_hostname") = destination.shardId.hostname
+ map("destination_" + i + "_table_prefix") = destination.shardId.tablePrefix
+ destination.baseId.foreach { id =>
+ map("destination_" + i + "_base_id") = id
+ }
+ i += 1
+ }
+ map
}
def finish() {
- nameServer.markShardBusy(destinationId, shards.Busy.Normal)
+ destinations.foreach { dest =>
+ nameServer.markShardBusy(dest.shardId, shards.Busy.Normal)
+ }
log.info("Copying finished for (type %s) from %s to %s",
- getClass.getName.split("\\.").last, sourceId, destinationId)
+ getClass.getName.split("\\.").last, sourceId, destinations)
Stats.clearGauge(gaugeName)
}
def apply() {
try {
log.info("Copying shard block (type %s) from %s to %s: state=%s",
- getClass.getName.split("\\.").last, sourceId, destinationId, toMap)
+ getClass.getName.split("\\.").last, sourceId, destinations, toMap)
val sourceShard = nameServer.findShardById(sourceId)
- val destinationShard = nameServer.findShardById(destinationId)
- // do this on each iteration, so it happens in the queue and can be retried if the db is busy:
- nameServer.markShardBusy(destinationId, shards.Busy.Busy)
- val nextJob = copyPage(sourceShard, destinationShard, count)
+ val destinationShards = destinations.map { dest =>
+ CopyDestinationShard[S](nameServer.findShardById(dest.shardId), dest.baseId)
+ }
+
+ // do this on each iteration, so it happens in the queue and can be retried if the db is busy:
+ destinations.foreach { dest =>
+ nameServer.markShardBusy(dest.shardId, shards.Busy.Busy)
+ }
+
+ val nextJob = copyPage(sourceShard, destinationShards, count)
+
nextJob match {
case Some(job) =>
incrGauge
@@ -106,10 +153,10 @@ abstract case class CopyJob[S <: Shard](sourceId: ShardId,
}
private def gaugeName = {
- "x-copying-" + sourceId + "-" + destinationId
+ "x-copying-" + sourceId + "-" + destinations
}
- def copyPage(sourceShard: S, destinationShard: S, count: Int): Option[CopyJob[S]]
+ def copyPage(sourceShard: S, destinationShards: List[CopyDestinationShard[S]], count: Int): Option[CopyJob[S]]
def serialize: Map[String, Any]
}
6 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
@@ -9,7 +9,7 @@ import com.twitter.gizzard.thrift.conversions.ShardInfo._
import com.twitter.gizzard.thrift.conversions.Forwarding._
import com.twitter.gizzard.thrift.conversions.Host._
import com.twitter.gizzard.shards._
-import com.twitter.gizzard.scheduler.{CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler}
+import com.twitter.gizzard.scheduler.{CopyDestination, CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler}
import com.twitter.gizzard.nameserver._
import net.lag.logging.Logger
import java.util.{List => JList}
@@ -95,9 +95,9 @@ class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S],
wrapEx(nameServer.markShardBusy(id.fromThrift, busy.fromThrift))
}
def copy_shard(sourceId: ShardId, destinationId: ShardId) = {
- wrapEx(copyScheduler.put(copier(sourceId.fromThrift, destinationId.fromThrift)))
+ wrapEx(copyScheduler.put(copier(sourceId.fromThrift, List(CopyDestination(destinationId.fromThrift, None)))))
}
-
+
def dump_nameserver(tableId: Int) = wrapEx(nameServer.dumpStructure(tableId).toThrift)
14 src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala
@@ -22,7 +22,7 @@ trait IntegrationSpecification extends Specification {
trait TestServerFacts {
def enum: Int; def nsDatabaseName: String; def databaseName: String
def basePort: Int; def injectorPort: Int; def managerPort: Int
- def sqlShardInfo: shards.ShardInfo; def forwarding: nameserver.Forwarding
+ def sqlShardInfos: List[shards.ShardInfo]; def forwardings: List[nameserver.Forwarding]
def kestrelQueues: Seq[String]
}
@@ -36,9 +36,11 @@ trait IntegrationSpecification extends Specification {
val basePort = port
val injectorPort = port + 1
val managerPort = port + 2
- val sqlShardInfo = shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
- "TestShard", "int", "int", shards.Busy.Normal)
- val forwarding = nameserver.Forwarding(0, 0, sqlShardInfo.id)
+ val sqlShardInfos = List(shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
+ "TestShard", "int", "int", shards.Busy.Normal))
+
+ val forwardings = List(nameserver.Forwarding(0, 0, sqlShardInfos.first.id))
+
val kestrelQueues = Seq("gizzard_test_"+name+"_high_queue",
"gizzard_test_"+name+"_high_queue_errors",
"gizzard_test_"+name+"_low_queue",
@@ -81,8 +83,8 @@ trait IntegrationSpecification extends Specification {
servers.foreach { s =>
createTestServerDBs(s)
s.nameServer.rebuildSchema()
- s.nameServer.setForwarding(s.forwarding)
- s.nameServer.createShard(s.sqlShardInfo)
+ s.forwardings.foreach { f => s.nameServer.setForwarding(f) }
+ s.sqlShardInfos.foreach { ssi => s.nameServer.createShard(ssi) }
s.nameServer.reload()
}
}
95 src/test/scala/com/twitter/gizzard/integration/MulticopyIntegrationSpec.scala
@@ -0,0 +1,95 @@
+package com.twitter.gizzard.integration
+
+import com.twitter.gizzard.thrift.conversions.Sequences._
+import testserver._
+import testserver.config.TestServerConfig
+import testserver.thrift.TestResult
+import java.io.File
+import scheduler.CopyDestination
+import java.util.concurrent.atomic.AtomicInteger
+import org.specs.mock.{ClassMocker, JMocker}
+import net.lag.configgy.{Config => CConfig}
+import com.twitter.util.TimeConversions._
+import com.twitter.gizzard.thrift.{JobInjectorService, TThreadServer, JobInjector}
+import nameserver.{Host, HostStatus, JobRelay}
+
+class MulticopyIntegrationSpec extends IntegrationSpecification with ConfiguredSpecification {
+ override def testServer(i: Int) = {
+ val port = 8000 + (i - 1) * 3
+ val name = "testserver" + i
+ new TestServer(TestServerConfig(name, port)) with TestServerFacts {
+ val enum = i
+ val nsDatabaseName = "gizzard_test_"+name+"_ns"
+ val databaseName = "gizzard_test_"+name
+ val basePort = port
+ val injectorPort = port + 1
+ val managerPort = port + 2
+ val sqlShardInfos = List(
+ shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
+ "TestShard", "int", "int", shards.Busy.Normal),
+ shards.ShardInfo(shards.ShardId("localhost", "t0_1"),
+ "TestShard", "int", "int", shards.Busy.Normal),
+ shards.ShardInfo(shards.ShardId("localhost", "t0_2"),
+ "TestShard", "int", "int", shards.Busy.Normal)
+ )
+ val forwardings = List(
+ nameserver.Forwarding(0, 0, sqlShardInfos.first.id)
+ )
+ val kestrelQueues = Seq("gizzard_test_"+name+"_high_queue",
+ "gizzard_test_"+name+"_high_queue_errors",
+ "gizzard_test_"+name+"_low_queue",
+ "gizzard_test_"+name+"_low_queue_errors")
+ }
+ }
+
+ "Multicopy" should {
+ val server = testServer(1)
+ val client = testServerClient(server)
+
+ doBefore {
+ resetTestServerDBs(server)
+ setupServers(server)
+ server.nameServer.reload()
+ }
+
+ doAfter { stopServers(server) }
+
+ "copy to multiple locations" in {
+ startServers(server)
+ val nameserver = server.nameServer
+
+ val sourceInfo :: dest0info :: dest2info :: _ = server.sqlShardInfos
+
+ client.put(0, "foo")
+ client.put(1, "bar")
+ client.put(2, "baz")
+ client.put(3, "bonk")
+ client.get(3) must eventually(be_==(List(new TestResult(3, "bonk", 1)).toJavaList))
+
+ val dest0 = CopyDestination(dest0info.id, Some(0))
+ val dest2 = CopyDestination(dest2info.id, Some(2))
+
+ val copy = new TestSplitFactory(server.nameServer, server.jobScheduler(Priority.Low.id))(sourceInfo.id, List(dest0, dest2))
+
+ copy()
+
+ val sourceShard = nameserver.findShardById(sourceInfo.id)
+ val dest0Shard = nameserver.findShardById(dest0info.id)
+ val dest2Shard = nameserver.findShardById(dest2info.id)
+
+ dest0Shard.get(0) must eventually(be_==(Some((0, "foo", 1))))
+ dest0Shard.get(1) must eventually(be_==(Some((1, "bar", 1))))
+ dest0Shard.get(2) must eventually(be_==(None))
+ dest0Shard.get(3) must eventually(be_==(None))
+
+ dest2Shard.get(0) must eventually(be_==(None))
+ dest2Shard.get(1) must eventually(be_==(None))
+ dest2Shard.get(2) must eventually(be_==(Some((2, "baz", 1))))
+ dest2Shard.get(3) must eventually(be_==(Some((3, "bonk", 1))))
+ }
+
+
+
+ }
+}
+
53 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
@@ -1,10 +1,12 @@
package com.twitter.gizzard.testserver
import java.sql.{ResultSet, SQLException}
+import java.util.TreeMap
import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
import com.twitter.querulous.config.Connection
import com.twitter.querulous.query.SqlQueryTimeoutException
import gizzard.GizzardServer
+import com.twitter.gizzard.scheduler.{CopyDestination, CopyDestinationShard}
import nameserver.NameServer
import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException}
import scheduler.{JobScheduler, JsonJob, CopyJob, CopyJobParser, CopyJobFactory, JsonJobParser, PrioritizingJobScheduler}
@@ -217,31 +219,60 @@ class PutJob(key: Int, value: String, forwarding: Long => TestShard) extends Jso
class TestCopyFactory(ns: NameServer[TestShard], s: JobScheduler[JsonJob])
extends CopyJobFactory[TestShard] {
- def apply(src: ShardId, dest: ShardId) = new TestCopy(src, dest, 0, 500, ns, s)
+ def apply(src: ShardId, dests: List[CopyDestination]) = new TestCopy(src, dests, -1, 500, ns, s)
}
class TestCopyParser(ns: NameServer[TestShard], s: JobScheduler[JsonJob])
extends CopyJobParser[TestShard] {
- def deserialize(m: Map[String, Any], src: ShardId, dest: ShardId, count: Int) = {
+ def deserialize(m: Map[String, Any], src: ShardId, dests: List[CopyDestination], count: Int) = {
val cursor = m("cursor").asInstanceOf[Int]
val count = m("count").asInstanceOf[Int]
- new TestCopy(src, dest, cursor, count, ns, s)
+ new TestCopy(src, dests, cursor, count, ns, s)
}
}
-class TestCopy(srcId: ShardId, destId: ShardId, cursor: Int, count: Int,
+class TestCopy(srcId: ShardId, destinations: List[CopyDestination], cursor: Int, count: Int,
ns: NameServer[TestShard], s: JobScheduler[JsonJob])
-extends CopyJob[TestShard](srcId, destId, count, ns, s) {
- def copyPage(src: TestShard, dest: TestShard, count: Int) = {
+extends CopyJob[TestShard](srcId, destinations, count, ns, s) {
+ def copyPage(src: TestShard, dests: List[CopyDestinationShard[TestShard]], count: Int) = {
val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) }
- if (rows.isEmpty) {
- None
- } else {
- dest.putAll(rows)
- Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s))
+ dests.foreach(_.shard.putAll(rows))
+
+ if (rows.isEmpty) None
+ else Some(new TestCopy(srcId, destinations, rows.last._1, count, ns, s))
+ }
+
+ def serialize = Map("cursor" -> cursor)
+}
+
+class TestSplitFactory(ns: NameServer[TestShard], s: JobScheduler[JsonJob])
+extends CopyJobFactory[TestShard] {
+ def apply(src: ShardId, dests: List[CopyDestination]) = new TestSplit(src, dests, -1, 500, ns, s)
+}
+
+class TestSplit(srcId: ShardId, destinations: List[CopyDestination], cursor: Int, count: Int,
+ ns: NameServer[TestShard], s: JobScheduler[JsonJob])
+extends CopyJob[TestShard](srcId, destinations, count, ns, s) {
+ def copyPage(src: TestShard, dests: List[CopyDestinationShard[TestShard]], count: Int) = {
+ val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) }
+
+ val byBaseIds = new TreeMap[Long, TestShard]()
+ dests.foreach { d => byBaseIds.put(d.baseId.getOrElse(0), d.shard) }
+
+ rows.foreach { row =>
+ val shard = byBaseIds.floorEntry(nameServer.mappingFunction(row._1)).getValue
+ if(shard != null) {
+ shard.put(row._1, row._2)
+ } else {
+ println("wtf: " + row._1 + " mapped to " + nameServer.mappingFunction(row._1) + " returned null")
+ }
}
+
+ if (rows.isEmpty) None
+ else Some(new TestCopy(srcId, destinations, rows.last._1, count, ns, s))
}
def serialize = Map("cursor" -> cursor)
}
+
26 src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala
@@ -5,18 +5,19 @@ import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
-class FakeCopy(val sourceShardId: shards.ShardId, val destinationShardId: shards.ShardId, count: Int,
+class FakeCopy(val sourceShardId: shards.ShardId, val dests: List[CopyDestination], count: Int,
nameServer: nameserver.NameServer[shards.Shard], scheduler: JobScheduler[JsonJob])(nextJob: => Option[FakeCopy])
- extends CopyJob[shards.Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) {
+ extends CopyJob[shards.Shard](sourceShardId, dests, count, nameServer, scheduler) {
def serialize = Map("cursor" -> 1)
@throws(classOf[Exception])
- def copyPage(sourceShard: shards.Shard, destinationShard: shards.Shard, count: Int) = nextJob
+ def copyPage(sourceShard: shards.Shard, destinationShards: List[CopyDestinationShard[shards.Shard]], count: Int) = {
+ nextJob
+ }
override def equals(that: Any) = that match {
case that: FakeCopy =>
- this.sourceShardId == that.sourceShardId &&
- this.destinationShardId == that.destinationShardId
+ this.sourceShardId == that.sourceShardId
case _ => false
}
}
@@ -25,11 +26,12 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"CopyJob" should {
val sourceShardId = shards.ShardId("testhost", "1")
val destinationShardId = shards.ShardId("testhost", "2")
+ val destinations = List(CopyDestination(destinationShardId, None))
val count = CopyJob.MIN_COPY + 1
val nextCopy = mock[FakeCopy]
val nameServer = mock[nameserver.NameServer[shards.Shard]]
val jobScheduler = mock[JobScheduler[JsonJob]]
- val makeCopy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(_)
+ val makeCopy = new FakeCopy(sourceShardId, destinations, count, nameServer, jobScheduler)(_)
val shard1 = mock[shards.Shard]
val shard2 = mock[shards.Shard]
@@ -38,8 +40,8 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
copy.toMap mustEqual Map(
"source_shard_hostname" -> sourceShardId.hostname,
"source_shard_table_prefix" -> sourceShardId.tablePrefix,
- "destination_shard_hostname" -> destinationShardId.hostname,
- "destination_shard_table_prefix" -> destinationShardId.tablePrefix,
+ "destination_0_hostname" -> destinationShardId.hostname,
+ "destination_0_table_prefix" -> destinationShardId.tablePrefix,
"count" -> count
) ++ copy.serialize
}
@@ -50,8 +52,8 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
json mustMatch "Copy"
json mustMatch "\"source_shard_hostname\":\"%s\"".format(sourceShardId.hostname)
json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(sourceShardId.tablePrefix)
- json mustMatch "\"destination_shard_hostname\":\"%s\"".format(destinationShardId.hostname)
- json mustMatch "\"destination_shard_table_prefix\":\"%s\"".format(destinationShardId.tablePrefix)
+ json mustMatch "\"destination_0_hostname\":\"%s\"".format(destinationShardId.hostname)
+ json mustMatch "\"destination_0_table_prefix\":\"%s\"".format(destinationShardId.tablePrefix)
json mustMatch "\"count\":" + count
}
@@ -86,7 +88,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
one(jobScheduler).put(copy)
}
-
+
copy.apply()
copy.toMap("count") mustEqual (count * 0.9).toInt
}
@@ -118,7 +120,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"after too many retries" in {
val count = CopyJob.MIN_COPY - 1
- val copy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId))
+ val copy = new FakeCopy(sourceShardId, destinations, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId))
expect {
one(nameServer).findShardById(sourceShardId) willReturn shard1
5 ...zard/thrift/ShardManagerServiceSpec.scala → ...r/gizzard/thrift/ManagerServiceSpec.scala
@@ -6,7 +6,7 @@ import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.gizzard.thrift.conversions.ShardId._
import com.twitter.gizzard.thrift.conversions.ShardInfo._
import shards.{Busy, Shard}
-import scheduler.{CopyJob, CopyJobFactory, JobScheduler, PrioritizingJobScheduler, JsonJob}
+import scheduler.{CopyJob, CopyDestination, CopyJobFactory, JobScheduler, PrioritizingJobScheduler, JsonJob}
@@ -110,10 +110,11 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas
"copy_shard" in {
val shardId1 = new shards.ShardId("hostname1", "table1")
val shardId2 = new shards.ShardId("hostname2", "table2")
+ val dests = List(CopyDestination(shardId2, None))
val copyJob = mock[CopyJob[Shard]]
expect {
- one(copier).apply(shardId1, shardId2) willReturn copyJob
+ one(copier).apply(shardId1, dests) willReturn copyJob
one(copyScheduler).put(copyJob)
}