forked from twitter-archive/Rowz
/
Copy.scala
45 lines (38 loc) · 1.34 KB
/
Copy.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.twitter.rowz
package jobs
import com.twitter.gizzard.nameserver.NameServer
import com.twitter.gizzard.shards.ShardId
import com.twitter.gizzard.scheduler._
import com.twitter.rowz.RowzShard.Cursor
/**
 * Builds the first [[RowzCopyJob]] for copying one shard into another.
 *
 * @param nameServer   handed unchanged to every job this factory creates
 * @param scheduler    handed unchanged to every job this factory creates
 * @param defaultCount count given to the first job; defaults to 500
 */
class RowzCopyFactory(nameServer: NameServer[RowzShard], scheduler: JobScheduler, defaultCount: Int = 500)
  extends CopyJobFactory[RowzShard] {

  /**
   * Creates a copy job from `source` to `dest` whose cursor is 0.
   *
   * @param source id of the shard to copy from
   * @param dest   id of the shard to copy to
   * @return a new [[RowzCopyJob]] with cursor 0 and `defaultCount` as its count
   */
  def apply(source: ShardId, dest: ShardId) = {
    // A fresh copy always begins at cursor 0.
    val startCursor = 0
    new RowzCopyJob(source, dest, startCursor, defaultCount, nameServer, scheduler)
  }
}
/**
 * Rebuilds a [[RowzCopyJob]] from the attribute map of a stored copy job.
 *
 * @param nameServer handed unchanged to every job this parser creates
 * @param scheduler  handed unchanged to every job this parser creates
 */
class RowzCopyParser(nameServer: NameServer[RowzShard], scheduler: JobScheduler)
  extends CopyJobParser[RowzShard] {

  /**
   * Creates the copy job described by `attributes`.
   *
   * @param attributes job attributes; must contain a numeric `"cursor"` entry that fits in an Int
   * @param source     id of the shard to copy from
   * @param dest       id of the shard to copy to
   * @param count      count for the rebuilt job, as supplied by the caller
   * @return a [[RowzCopyJob]] positioned at the stored cursor
   * @throws NoSuchElementException if `attributes` has no `"cursor"` key
   * @throws ClassCastException     if `"cursor"` is not a number representable as an Int
   */
  def deserialize(attributes: Map[String, Any], source: ShardId, dest: ShardId, count: Int) = {
    // Accept any boxed numeric type (Integer, Long, ...): a plain asInstanceOf[Int] fails on
    // everything except java.lang.Integer. The guard rejects values that would be truncated.
    // NOTE(review): the cursor is kept as an Int, as before; confirm against RowzShard.Cursor.
    val cursor = attributes("cursor") match {
      case n: Number if n.longValue == n.intValue => n.intValue
      case other =>
        throw new ClassCastException("copy job attribute \"cursor\" is not an Int-sized number: " + other)
    }
    // `count` is the parameter itself; it is deliberately not re-read from `attributes` into a
    // local of the same name, which would shadow (and discard) the caller's value.
    new RowzCopyJob(source, dest, cursor, count, nameServer, scheduler)
  }
}
/**
 * A single step of a shard-to-shard copy, positioned at `cursor`.
 *
 * @param sourceId      id of the shard being copied from; carried into the follow-up job
 * @param destinationId id of the shard being copied to; carried into the follow-up job
 * @param cursor        position passed to `selectAll` when reading the source shard
 * @param count         count this job was created with
 * @param nameServer    carried unchanged into the follow-up job
 * @param scheduler     carried unchanged into the follow-up job
 */
class RowzCopyJob(
  sourceId: ShardId,
  destinationId: ShardId,
  cursor: Cursor,
  count: Int,
  nameServer: NameServer[RowzShard],
  scheduler: JobScheduler)
  extends CopyJob[RowzShard] {

  /**
   * Copies one page of rows from `source` to `dest`.
   *
   * @param source shard to read rows from
   * @param dest   shard to write the rows to
   * @param count  count passed to `selectAll` and to the follow-up job; this parameter
   *               shadows the constructor's `count` inside this method
   * @return `None` when `source` yields no rows; otherwise `Some` job whose cursor is the
   *         id of the last row that was just written
   */
  def copyPage(source: RowzShard, dest: RowzShard, count: Int) = {
    val page = source.selectAll(cursor, count)
    // An empty page has no last row, so nothing is written and no follow-up job is produced.
    page.lastOption.map { lastRow =>
      dest.write(page)
      new RowzCopyJob(sourceId, destinationId, lastRow.id, count, nameServer, scheduler)
    }
  }
}