CassandraRDDWriter.scala · 100 lines (85 loc) · 3.22 KB
package geotrellis.spark.io.cassandra

import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs._
import geotrellis.spark.LayerId

import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.schemabuilder.SchemaBuilder
import com.datastax.driver.core.DataType._
import com.datastax.driver.core.ResultSet
import com.datastax.driver.core.ResultSetFuture
import org.apache.spark.rdd.RDD

import scalaz.concurrent.Task
import scalaz.stream.{Process, nondeterminism}

import java.nio.ByteBuffer
import java.util.concurrent.Executors

import scala.collection.JavaConversions._
object CassandraRDDWriter {
  def write[K: AvroRecordCodec, V: AvroRecordCodec](
    raster: RDD[(K, V)],
    instance: CassandraInstance,
    layerId: LayerId,
    decomposeKey: K => Long,
    keyspace: String,
    table: String
  ): Unit = {
    implicit val sc = raster.sparkContext

    val codec = KeyValueRecordCodec[K, V]
    val schema = codec.schema

    // Ensure the destination table exists: the partition key is the Long produced by
    // decomposeKey, the clustering columns are layer name and zoom, and "value" holds
    // the Avro-encoded records.
    instance.withSessionDo {
      _.execute(
        SchemaBuilder.createTable(keyspace, table).ifNotExists()
          .addPartitionKey("key", bigint)
          .addClusteringColumn("name", text)
          .addClusteringColumn("zoom", cint)
          .addColumn("value", blob)
      )
    }
    // Prepared INSERT statement; "key" and "value" are bind markers filled in per row.
    val query =
      QueryBuilder
        .insertInto(keyspace, table)
        .value("name", layerId.name)
        .value("zoom", layerId.zoom)
        .value("key", QueryBuilder.bindMarker())
        .value("value", QueryBuilder.bindMarker())
        .toString
    // Call groupBy with numPartitions; if called without that argument or a partitioner,
    // groupBy will reuse the partitioner on the parent RDD if it is set, which could be typed
    // on a key type that may no longer be valid for the key type of the resulting RDD.
    raster.groupBy({ row => decomposeKey(row._1) }, numPartitions = raster.partitions.length)
      .foreachPartition { partition =>
        instance.withSession { session =>
          val statement = session.prepare(query)

          // Lazily turn the partition's iterator into a stream of
          // (index, Avro-encoded byte buffer) pairs.
          val queries: Process[Task, (java.lang.Long, ByteBuffer)] =
            Process.unfold(partition) { iter =>
              if (iter.hasNext) {
                val recs = iter.next()
                val id = recs._1
                val pairs = recs._2.toVector
                val bytes = ByteBuffer.wrap(AvroEncoder.toBinary(pairs)(codec))
                Some((id, bytes), iter)
              } else {
                None
              }
            }
          // Fixed-size pool bounding the threads used for writes; 32 is an arbitrary cap.
          val pool = Executors.newFixedThreadPool(32)
          val write: ((java.lang.Long, ByteBuffer)) => Process[Task, ResultSet] = {
            case (id, value) =>
              Process eval Task {
                session.execute(statement.bind(id, value))
              }(pool)
          }

          // Run up to 32 inserts concurrently; when the stream completes,
          // close the session and its cluster asynchronously.
          val results = nondeterminism.njoin(maxOpen = 32, maxQueued = 32) {
            queries map write
          } onComplete {
            Process eval Task {
              session.closeAsync()
              session.getCluster.closeAsync()
            }(pool)
          }

          results.run.unsafePerformSync
          pool.shutdown()
        }
      }
  }
}
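
/**
 * Hypothetical usage sketch (not part of the original file). It shows one way the
 * `write` method above might be invoked for an RDD of spatial tiles. The bit-packing
 * `toIndex` function, the layer name/zoom, and the keyspace/table names are
 * illustrative assumptions; a real layer would normally derive the Long index from a
 * space-filling-curve KeyIndex, and the implicit Avro codecs are assumed to come from
 * the `geotrellis.spark.io.avro.codecs._` import at the top of this file.
 */
object CassandraRDDWriterUsageExample {
  import geotrellis.raster.Tile
  import geotrellis.spark.SpatialKey

  def save(tiles: RDD[(SpatialKey, Tile)], instance: CassandraInstance): Unit = {
    // Assumed indexing scheme for illustration: pack (col, row) into a single Long.
    val toIndex: SpatialKey => Long =
      key => (key.col.toLong << 32) | (key.row.toLong & 0xFFFFFFFFL)

    CassandraRDDWriter.write(
      raster       = tiles,
      instance     = instance,
      layerId      = LayerId("example-layer", zoom = 10),
      decomposeKey = toIndex,
      keyspace     = "geotrellis",
      table        = "tiles"
    )
  }
}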