Skip to content
This repository
Browse code

split implementation

  • Loading branch information...
commit 9aac7378fa647a54391d2e2c696725d4995d51fe 1 parent 74bb5ce
authored December 19, 2010
377  src/main/scala/com/twitter/flockdb/jobs/Split.scala
... ...
@@ -1,253 +1,124 @@
1  
-// /*
2  
-//  * Copyright 2010 Twitter, Inc.
3  
-//  *
4  
-//  * Licensed under the Apache License, Version 2.0 (the "License"); you may
5  
-//  * not use this file except in compliance with the License. You may obtain
6  
-//  * a copy of the License at
7  
-//  *
8  
-//  *     http://www.apache.org/licenses/LICENSE-2.0
9  
-//  *
10  
-//  * Unless required by applicable law or agreed to in writing, software
11  
-//  * distributed under the License is distributed on an "AS IS" BASIS,
12  
-//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  
-//  * See the License for the specific language governing permissions and
14  
-//  * limitations under the License.
15  
-//  */
16  
-// 
17  
-// package com.twitter.flockdb.jobs
18  
-// 
19  
-// import com.twitter.gizzard.scheduler._
20  
-// import com.twitter.gizzard.shards.ShardId
21  
-// import com.twitter.gizzard.nameserver.NameServer
22  
-// import com.twitter.ostrich.Stats
23  
-// import com.twitter.util.TimeConversions._
24  
-// import conversions.Numeric._
25  
-// import shards.Shard
26  
-// 
27  
-// 
28  
-// object Split {
29  
-//   type SplitCursor = (Cursor, Cursor)
30  
-// 
31  
-//   val START = (Cursor.Start, Cursor.Start)
32  
-//   val END = (Cursor.End, Cursor.End)
33  
-//   val COUNT = 10000
34  
-// }
35  
-// 
36  
-// case class Destination(shardId: ShardId, baseId: Long)
37  
-// 
38  
-// trait DestinationParser {
39  
-//   def parseDestinations(attributes: Map[String, Any], baseDestinationShard: ShardId) = {
40  
-//     destinations = new ListBuffer[Destination]
41  
-//     destinations += Destination(base, 0)
42  
-//     var i = 0
43  
-//     while(attributes.contains("destination_" + i + "_hostname")) {
44  
-//       val prefix = "destination_" + i
45  
-//       val baseId = attributes(prefix + "_base_id").asInstanceOf[AnyVal].toLong
46  
-//       val shardId = ShardId(attributes(prefix + "_shard_hostname").toString, attributes(prefix + "_shard_table_prefix").toString)
47  
-//       destinations += Destination(baseId, shardId)
48  
-//       i += 1
49  
-//     }
50  
-//     
51  
-//     destinations.toList
52  
-//   }
53  
-// }
54  
-// 
55  
-// class SplitParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
56  
-//       extends CopyJobParser[Shard] with DestinationParser {
57  
-//   def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
58  
-//     val cursor = (Cursor(attributes("cursor1").asInstanceOf[AnyVal].toLong),
59  
-//                   Cursor(attributes("cursor2").asInstanceOf[AnyVal].toLong))
60  
-//     
61  
-//     val destinations = parseDestinations(attributes, destinationId)            
62  
-//     new Split(sourceId, destinations, cursor, count, nameServer, scheduler)
63  
-//   }
64  
-// }
65  
-// 
66  
-// class Split(sourceShardId: ShardId, destinations: List[Destination], cursor: Copy.CopyCursor,
67  
-//            count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
68  
-//       extends CopyJob[Shard](sourceShardId, destinations(0).shardId, count, nameServer, scheduler) {
69  
-//         
70  
-//   def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
71  
-//     val (items, newCursor) = sourceShard.selectAll(cursor, count)
72  
-//     destinationShard.writeCopies(items)
73  
-//     Stats.incr("edges-copy", items.size)
74  
-//     if (newCursor == Copy.END) {
75  
-//       None
76  
-//     } else {
77  
-//       Some(new Copy(sourceShardId, destinationShardId, newCursor, count, nameServer, scheduler))
78  
-//     }
79  
-//   }
80  
-// 
81  
-//   def serialize = Map("cursor1" -> cursor._1.position, "cursor2" -> cursor._2.position)
82  
-// }
83  
-// 
84  
-// object MetadataCopy {
85  
-//   type CopyCursor = Cursor
86  
-//   val START = Cursor.Start
87  
-//   val END = Cursor.End
88  
-// }
89  
-// 
90  
-// class MetadataCopyParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
91  
-//       extends CopyJobParser[Shard] {
92  
-//   def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
93  
-//     val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
94  
-//     new MetadataCopy(sourceId, destinationId, cursor, count, nameServer, scheduler)
95  
-//   }
96  
-// }
97  
-// 
98  
-// class MetadataCopy(sourceShardId: ShardId, destinationShardId: ShardId, cursor: MetadataCopy.CopyCursor,
99  
-//                    count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
100  
-//       extends CopyJob[Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) {
101  
-//   def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
102  
-//     val (items, newCursor) = sourceShard.selectAllMetadata(cursor, count)
103  
-//     items.foreach { destinationShard.writeMetadata(_) }
104  
-//     Stats.incr("edges-copy", items.size)
105  
-//     if (newCursor == MetadataCopy.END)
106  
-//       Some(new Copy(sourceShardId, destinationShardId, Copy.START, Copy.COUNT, nameServer, scheduler))
107  
-//     else
108  
-//       Some(new MetadataCopy(sourceShardId, destinationShardId, newCursor, count, nameServer, scheduler))
109  
-//   }
110  
-// 
111  
-//   def serialize = Map("cursor" -> cursor.position)
112  
-// }
113  
-// 
114  
-// 
115  
-// // 
116  
-// // /*
117  
-// //  * Copyright 2010 Twitter, Inc.
118  
-// //  *
119  
-// //  * Licensed under the Apache License, Version 2.0 (the "License"); you may
120  
-// //  * not use this file except in compliance with the License. You may obtain
121  
-// //  * a copy of the License at
122  
-// //  *
123  
-// //  *     http://www.apache.org/licenses/LICENSE-2.0
124  
-// //  *
125  
-// //  * Unless required by applicable law or agreed to in writing, software
126  
-// //  * distributed under the License is distributed on an "AS IS" BASIS,
127  
-// //  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
128  
-// //  * See the License for the specific language governing permissions and
129  
-// //  * limitations under the License.
130  
-// //  */
131  
-// // 
132  
-// // package com.twitter.flockdb.jobs
133  
-// // 
134  
-// // import java.util.TreeMap
135  
-// // import collection.mutable.ListBuffer
136  
-// // import com.twitter.gizzard.scheduler._
137  
-// // import com.twitter.gizzard.shards.ShardId
138  
-// // import com.twitter.gizzard.nameserver.{NameServer}
139  
-// // import com.twitter.ostrich.Stats
140  
-// // import com.twitter.util.TimeConversions._
141  
-// // import conversions.Numeric._
142  
-// // import shards.Shard
143  
-// // 
144  
-// // object Split {
145  
-// //   type SplitCursor = (Cursor, Cursor)
146  
-// // 
147  
-// //   val START = (Cursor.Start, Cursor.Start)
148  
-// //   val END = (Cursor.End, Cursor.End)
149  
-// //   val COUNT = 10000
150  
-// // }
151  
-// // 
152  
-// // 
153  
-// // case class IdForwarding(baseId: Long, shardId: ShardId)
154  
-// // case class Forwarding(shard: Shard, baseId: Long)
155  
-// // 
156  
-// // class SplitFactory(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob]) {
157  
-// //   def apply(sourceShardId: ShardId, destinations: List[IdForwarding]) =
158  
-// //     new MetadataSplit(sourceShardId, destinations, MetadataCopy.START, Copy.COUNT,
159  
-// //                      nameServer, scheduler)
160  
-// // }
161  
-// // 
162  
-// // class SplitParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
163  
-// //       extends JsonJobParser {
164  
-// //   def apply(attributes: Map[String, Any]): JsonJob = {
165  
-// //     val sourceId = ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString)
166  
-// //     val count = attributes("count").asInstanceOf[{def toInt: Int}].toInt
167  
-// //     
168  
-// //     var i = 0
169  
-// //     val destinations = new ListBuffer[IdForwarding]
170  
-// //     while(attributes.contains("destination_" + i + "_hostname")) {
171  
-// //       val prefix = "destination_" + i
172  
-// //       val baseId = attributes(prefix + "_base_id").asInstanceOf[{def toInt: Int}].toInt
173  
-// //       val shardId = ShardId(attributes(prefix + "_shard_hostname").toString, attributes(prefix + "_shard_table_prefix").toString)
174  
-// //       destinations += IdForwarding(baseId, shardId)
175  
-// //       i += 1
176  
-// //     }
177  
-// // 
178  
-// //     val cursor = (Cursor(attributes("cursor1").asInstanceOf[AnyVal].toLong),
179  
-// //                   Cursor(attributes("cursor2").asInstanceOf[AnyVal].toLong))
180  
-// //     new Split(sourceId, destinations.toList, cursor, count, nameServer, scheduler)
181  
-// //   }
182  
-// // }
183  
-// // 
184  
-// // class Split(sourceShardId: ShardId, destinations: List[IdForwarding], cursor: Split.SplitCursor,
185  
-// //            count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
186  
-// //       extends JsonJob {
187  
-// //   def copyPage(sourceShard: Shard, destinationShards: List[Forwarding], count: Int) = {
188  
-// //     val (items, newCursor) = sourceShard.selectAll(cursor, count)
189  
-// //     
190  
-// //     val byBaseIds = new TreeMap[Long, (Shard, ListBuffer[Edge])]()
191  
-// //     destinationShards.foreach { d => 
192  
-// //       byBaseIds.put(d.baseId, ((d.shard, new ListBuffer[Edge])))
193  
-// //     }
194  
-// //     
195  
-// //     items.foreach { item => 
196  
-// //       byBaseIds.floorEntry(nameServer.mappingFunction(item.sourceId)).getValue._2 += item
197  
-// //     }
198  
-// //     
199  
-// //     val iter = byBaseIds.values.iterator
200  
-// //     while(iter.hasNext) {
201  
-// //       val (dest, list) = iter.next
202  
-// //       dest.writeCopies(list)
203  
-// //       Stats.incr("edges-split", list.size)
204  
-// //     }
205  
-// //     
206  
-// //     if (newCursor == Split.END) {
207  
-// //       None
208  
-// //     } else {
209  
-// //       Some(new Split(sourceShardId, destinations, newCursor, count, nameServer, scheduler))
210  
-// //     }
211  
-// //   }
212  
-// // 
213  
-// //   def serialize = Map("cursor1" -> cursor._1.position, "cursor2" -> cursor._2.position)
214  
-// // }
215  
-// // 
216  
-// // object MetadataSplit {
217  
-// //   type SplitCursor = Cursor
218  
-// //   val START = Cursor.Start
219  
-// //   val END = Cursor.End
220  
-// // }
221  
-// // 
222  
-// // class MetadataSplitParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
223  
-// //       extends JsonJobParser {
224  
-// //   def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinations: List[IdForwarding], count: Int) = {
225  
-// //     val cursor = Cursor(attributes("cursor").asInstanceOf[AnyVal].toLong)
226  
-// //     new MetadataSplit(sourceId, destinations, cursor, count, nameServer, scheduler)
227  
-// //   }
228  
-// // }
229  
-// // 
230  
-// // class MetadataSplit(sourceShardId: ShardId, destinations: List[IdForwarding], cursor: MetadataSplit.SplitCursor,
231  
-// //                    count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
232  
-// //       extends Job {
233  
-// //   def copyPage(sourceShard: Shard, destinationShards: List[Forwarding], count: Int) = {
234  
-// //     val (items, newCursor) = sourceShard.selectAllMetadata(cursor, count)
235  
-// //     
236  
-// //     val byBaseIds = new TreeMap[Long, Shard]()
237  
-// //     destinationShards.foreach { d => byBaseIds.put(d.baseId, d.shard) }
238  
-// //     
239  
-// //     items.foreach { item => 
240  
-// //       val shard = byBaseIds.floorEntry(nameServer.mappingFunction(item.sourceId)).getValue
241  
-// //       shard.writeMetadata(item)
242  
-// //     }
243  
-// //     
244  
-// //     Stats.incr("edges-metadata-split", items.size)
245  
-// //     if (newCursor == MetadataSplit.END)
246  
-// //       Some(new Split(sourceShardId, destinations, Split.START, Split.COUNT, nameServer, scheduler))
247  
-// //     else
248  
-// //       Some(new MetadataSplit(sourceShardId, destinations, newCursor, count, nameServer, scheduler))
249  
-// //   }
250  
-// // 
251  
-// //   def serialize = Map("cursor" -> cursor.position)
252  
-// // }
253  
-// // 
  1
+/*
  2
+ * Copyright 2010 Twitter, Inc.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
  5
+ * not use this file except in compliance with the License. You may obtain
  6
+ * a copy of the License at
  7
+ *
  8
+ *     http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License.
  15
+ */
  16
+
  17
+package com.twitter.flockdb.jobs
  18
+
  19
+import java.util.TreeMap
  20
+import collection.mutable.ListBuffer
  21
+import com.twitter.gizzard.scheduler._
  22
+import com.twitter.gizzard.shards.ShardId
  23
+import com.twitter.gizzard.nameserver.NameServer
  24
+import com.twitter.ostrich.Stats
  25
+import com.twitter.util.TimeConversions._
  26
+import conversions.Numeric._
  27
+import shards.Shard
  28
+
  29
+
  30
// Constants shared by the edge pass of a shard split. A split walks a pair of
// cursors over the source shard's edges (selectAll uses a two-part cursor).
object Split {
  type SplitCursor = (Cursor, Cursor)

  val START = (Cursor.Start, Cursor.Start)
  val END = (Cursor.End, Cursor.End)
  // Number of edges processed per page / per scheduled job iteration.
  val COUNT = 10000
}
  37
+
  38
// Entry point for kicking off a shard split. A split always begins with the
// metadata pass; MetadataSplit schedules the edge pass once its cursor is done.
class SplitFactory(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
      extends CopyJobFactory[Shard] {
  def apply(sourceShardId: ShardId, destinations: List[CopyDestination]) = {
    new MetadataSplit(
      sourceShardId, destinations, MetadataSplit.START, Split.COUNT, nameServer, scheduler)
  }
}
  44
+
  45
// Deserializes a Split job from its JSON attribute map; the two cursor halves
// are stored under "cursor1" and "cursor2".
class SplitParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
      extends CopyJobParser[Shard] {
  def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinations: List[CopyDestination], count: Int) = {
    val cursor1 = Cursor(attributes("cursor1").asInstanceOf[AnyVal].toLong)
    val cursor2 = Cursor(attributes("cursor2").asInstanceOf[AnyVal].toLong)
    new Split(sourceId, destinations, (cursor1, cursor2), count, nameServer, scheduler)
  }
}
  53
+
  54
// Edge pass of a shard split: pages through the source shard's edges and
// routes each edge to the destination whose base id covers its mapped source id.
class Split(sourceShardId: ShardId, destinations: List[CopyDestination], cursor: Split.SplitCursor,
           count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
      extends CopyJob[Shard](sourceShardId, destinations, count, nameServer, scheduler) {

  /**
   * Copies one page of edges from the source shard, partitioning them among the
   * destination shards.
   *
   * Returns Some(next Split job) to continue from the new cursor, or None when
   * the source cursor is exhausted.
   */
  def copyPage(sourceShard: Shard, destinationShards: List[CopyDestinationShard[Shard]], count: Int) = {
    val (items, newCursor) = sourceShard.selectAll(cursor, count)

    // base id -> (destination shard, buffer of edges bound for it).
    // A destination without an explicit baseId defaults to 0L, i.e. it covers
    // the keyspace from the bottom.
    val byBaseIds = new TreeMap[Long, (Shard, ListBuffer[Edge])]()
    destinationShards.foreach { d =>
      byBaseIds.put(d.baseId.getOrElse(0L), (d.shard, new ListBuffer[Edge]))
    }

    // floorEntry picks the destination with the greatest base id <= the mapped
    // source id, i.e. the range the edge falls into.
    items.foreach { item =>
      byBaseIds.floorEntry(nameServer.mappingFunction(item.sourceId)).getValue._2 += item
    }

    val iter = byBaseIds.values.iterator
    while (iter.hasNext) {
      val (dest, list) = iter.next
      dest.writeCopies(list.toList)
    }

    Stats.incr("edges-split", items.size)
    if (newCursor == Split.END) {
      None
    } else {
      Some(new Split(sourceShardId, destinations, newCursor, count, nameServer, scheduler))
    }
  }

  def serialize = Map("cursor1" -> cursor._1.position, "cursor2" -> cursor._2.position)
}
  87
+
  88
// Cursor constants for the metadata pass of a shard split (single cursor,
// unlike the edge pass's cursor pair).
object MetadataSplit {
  type SplitCursor = Cursor
  val START = Cursor.Start
  val END = Cursor.End
}
  93
+
  94
// Deserializes a MetadataSplit job from its JSON attribute map; the cursor
// position is stored under "cursor".
class MetadataSplitParser(nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
      extends CopyJobParser[Shard] {
  def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinations: List[CopyDestination], count: Int) = {
    val position = attributes("cursor").asInstanceOf[AnyVal].toLong
    new MetadataSplit(sourceId, destinations, Cursor(position), count, nameServer, scheduler)
  }
}
  101
+
  102
// Metadata pass of a shard split: pages through the source shard's metadata
// rows and writes each to the destination covering its mapped source id.
// When metadata is exhausted it chains into the edge pass (Split).
class MetadataSplit(sourceShardId: ShardId, destinations: List[CopyDestination], cursor: MetadataSplit.SplitCursor,
                   count: Int, nameServer: NameServer[Shard], scheduler: JobScheduler[JsonJob])
      extends CopyJob[Shard](sourceShardId, destinations, count, nameServer, scheduler) {

  /**
   * Copies one page of metadata, routing each row to its destination shard.
   *
   * Always returns Some: either the edge-pass Split (when the metadata cursor
   * is exhausted) or a continuation MetadataSplit at the new cursor.
   */
  def copyPage(sourceShard: Shard, destinationShards: List[CopyDestinationShard[Shard]], count: Int) = {
    val (items, newCursor) = sourceShard.selectAllMetadata(cursor, count)

    // base id -> destination shard; missing baseId defaults to 0L (bottom of keyspace).
    val byBaseIds = new TreeMap[Long, Shard]()
    destinationShards.foreach { d => byBaseIds.put(d.baseId.getOrElse(0L), d.shard) }

    items.foreach { item =>
      // Greatest base id <= the mapped source id selects the owning destination.
      val shard = byBaseIds.floorEntry(nameServer.mappingFunction(item.sourceId)).getValue
      shard.writeMetadata(item)
    }

    // NOTE(review): this shares the "edges-split" counter with the edge pass;
    // a distinct "edges-metadata-split" stat may be intended — confirm.
    Stats.incr("edges-split", items.size)
    if (newCursor == MetadataSplit.END)
      Some(new Split(sourceShardId, destinations, Split.START, Split.COUNT, nameServer, scheduler))
    else
      Some(new MetadataSplit(sourceShardId, destinations, newCursor, count, nameServer, scheduler))
  }

  def serialize = Map("cursor" -> cursor.position)
}
96  src/test/scala/com/twitter/flockdb/unit/SplitSpec.scala
... ...
@@ -0,0 +1,96 @@
  1
+/*
  2
+ * Copyright 2010 Twitter, Inc.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
  5
+ * not use this file except in compliance with the License. You may obtain
  6
+ * a copy of the License at
  7
+ *
  8
+ *     http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS,
  12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ * See the License for the specific language governing permissions and
  14
+ * limitations under the License.
  15
+ */
  16
+
  17
+package com.twitter.flockdb.unit
  18
+
  19
+import com.twitter.gizzard.nameserver.NameServer
  20
+import com.twitter.gizzard.scheduler._
  21
+import com.twitter.gizzard.shards.{Busy, ShardId, ShardTimeoutException}
  22
+import com.twitter.gizzard.thrift.conversions.Sequences._
  23
+import com.twitter.util.Time
  24
+import com.twitter.util.TimeConversions._
  25
+import org.specs.mock.{ClassMocker, JMocker}
  26
+import jobs._
  27
+import shards.{Metadata, Shard}
  28
+
  29
// Unit spec for the Split / MetadataSplit jobs: verifies that each page of
// edges (or metadata rows) is partitioned to the destination shard whose
// base-id range covers the edge's mapped source id.
class SplitSpec extends ConfiguredSpecification with JMocker with ClassMocker {
  val shard1Id = ShardId("test", "shard1")
  // The two destination ids must be distinct (they were both "shard2", which
  // made the findShardById/markShardBusy expectations indistinguishable).
  val shard2aId = ShardId("test", "shard2a")
  val shard2bId = ShardId("test", "shard2b")
  // Destination base ids: shard2a covers [0, 50), shard2b covers [50, ...).
  val dests = List(CopyDestination(shard2aId, Some(0)), CopyDestination(shard2bId, Some(50)))
  val count = 2300
  val shard1 = mock[Shard]
  val shard2a = mock[Shard]
  val shard2b = mock[Shard]
  val nameServer = mock[NameServer[Shard]]
  val scheduler = mock[JobScheduler[JsonJob]]

  "Split" should {
    val cursor1 = Cursor(0)
    val cursor2 = Cursor(0)

    "apply" in {
      val job = new Split(shard1Id, dests, (cursor1, cursor2), count, nameServer, scheduler)
      val edgeA = new Edge(1L, 2L, 3L, Time.now, 5, State.Normal)
      val edgeB = new Edge(2L, 2L, 3L, Time.now, 5, State.Normal)
      val edgeC = new Edge(100L, 2L, 3L, Time.now, 5, State.Normal)

      "write the correct partial to each shard" >> {
        expect {
          // Identity mapping: source ids 1, 2 fall in [0, 50) -> shard2a; 100 -> shard2b.
          allowing(nameServer).mappingFunction willReturn ((a: Long) => a)
          one(nameServer).markShardBusy(shard2aId, Busy.Busy)
          one(nameServer).markShardBusy(shard2bId, Busy.Busy)
          one(nameServer).findShardById(shard1Id) willReturn shard1
          one(nameServer).findShardById(shard2aId) willReturn shard2a
          one(nameServer).findShardById(shard2bId) willReturn shard2b
          one(shard1).selectAll((cursor1, cursor2), count) willReturn (List(edgeA, edgeB, edgeC), (cursor1, Cursor(cursor2.position + 1)))
          one(shard2a).writeCopies(List(edgeA, edgeB))
          one(shard2b).writeCopies(List(edgeC))
          // A non-END cursor schedules a continuation Split.
          one(scheduler).put(new Split(shard1Id, dests, (cursor1, Cursor(cursor2.position + 1)), count, nameServer, scheduler))
        }
        job.apply()
      }
    }

  }

  "MetadataSplit" should {
    val cursor = Cursor(0)
    val job = new MetadataSplit(shard1Id, dests, cursor, count, nameServer, scheduler)
    val metaA = Metadata(1L, State.Normal, 1, Time.now)
    val metaB = Metadata(2L, State.Normal, 1, Time.now)
    val metaC = Metadata(100L, State.Normal, 1, Time.now)

    "apply" in {
      "write the correct partial to each shard" >> {
        expect {
          allowing(nameServer).mappingFunction willReturn ((a: Long) => a)
          one(nameServer).markShardBusy(shard2aId, Busy.Busy)
          one(nameServer).markShardBusy(shard2bId, Busy.Busy)
          one(nameServer).findShardById(shard1Id) willReturn shard1
          one(nameServer).findShardById(shard2aId) willReturn shard2a
          one(nameServer).findShardById(shard2bId) willReturn shard2b
          one(shard1).selectAllMetadata(cursor, count) willReturn (List(metaA, metaB, metaC), Cursor(cursor.position + 1))
          one(shard2a).writeMetadata(metaA)
          one(shard2a).writeMetadata(metaB)
          one(shard2b).writeMetadata(metaC)
          // MetadataSplit.copyPage schedules another MetadataSplit (not a
          // MetadataCopy) when the cursor is not yet END.
          one(scheduler).put(new MetadataSplit(shard1Id, dests, Cursor(cursor.position + 1), count, nameServer, scheduler))
        }
        job.apply()
      }
    }
  }
}

0 notes on commit 9aac737

Please sign in to comment.
Something went wrong with that request. Please try again.