Skip to content

Commit

Permalink
Refactor common code for index strategy management into utility class
Browse files Browse the repository at this point in the history
  • Loading branch information
ALPSMAC committed Feb 11, 2019
1 parent 77682e8 commit 0ece030
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 367 deletions.
33 changes: 10 additions & 23 deletions cassandra/src/main/resources/reference.conf
Expand Up @@ -21,33 +21,20 @@ geotrellis.cassandra {
localDc = "datacenter1"
usedHostsPerRemoteDc = 0
allowRemoteDCsForLocalConsistencyLevel = false
# 'writeoptimized' or 'readoptimized', default is 'writeoptimized'.
# For 'readoptimized', clients can configure the maximum size of a partition
# read based on tiles in the partition. Careful tuning of this value can
# lead to optimized read performance when using key/partition caching.
# indexStrategy = "writeoptimized"
# Only relevant if indexStrategy is 'readoptimized' - configures the
# maximum number of tiles to store per partition.
indexStrategy = "readoptimized"
tilesPerPartition = 64
threads {
collection.read = default
rdd {
write = default
read = default
}
}
}

# Comment out the block above and uncomment this to run the Cassandra tests
# using the 'readoptimized' index strategy.
#geotrellis.cassandra {
# port = 9042
# catalog = "metadata"
# keyspace = "geotrellis"
# replicationStrategy = "SimpleStrategy"
# replicationFactor = 1
# localDc = "datacenter1"
# usedHostsPerRemoteDc = 0
# allowRemoteDCsForLocalConsistencyLevel = false
# indexStrategy = "readoptimized"
# tilesPerPartition = 64
# threads {
# collection.read = default
# rdd {
# write = default
# read = default
# }
# }
#}
}
Expand Up @@ -24,19 +24,11 @@ import geotrellis.spark.io.cassandra.conf.CassandraConfig
import geotrellis.spark.io.index.{KeyIndex, MergeQueue}
import geotrellis.spark.util.KryoWrapper
import org.apache.avro.Schema
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import java.math.BigInteger

import com.datastax.driver.core.{BoundStatement, PreparedStatement}
import spire.math.Interval
import spire.implicits._

import scala.collection.immutable.VectorBuilder

object CassandraCollectionReader {
final val defaultThreadCount = CassandraConfig.threads.collection.readThreads

Expand All @@ -54,6 +46,8 @@ object CassandraCollectionReader {
): Seq[(K, V)] = {
if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

val indexStrategy = new CassandraIndexing[K](keyIndex, instance.cassandraConfig.tilesPerPartition)

val includeKey = (key: K) => queryKeyBounds.includeKey(key)
val _recordCodec = KeyValueRecordCodec[K, V]
val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable
Expand All @@ -63,59 +57,20 @@ object CassandraCollectionReader {
else
queryKeyBounds.flatMap(decomposeBounds)

val query = instance.cassandraConfig.partitionStrategy match {
case conf.`Read-Optimized-Partitioner` =>
QueryBuilder.select("value")
.from(keyspace, table)
.where(eqs("name", layerId.name))
.and(eqs("zoom", layerId.zoom))
.and(eqs("zoombin", QueryBuilder.bindMarker()))
.and(eqs("key", QueryBuilder.bindMarker()))
.toString

case conf.`Write-Optimized-Partitioner` =>
QueryBuilder.select("value")
.from(keyspace, table)
.where(eqs("key", QueryBuilder.bindMarker()))
.and(eqs("name", layerId.name))
.and(eqs("zoom", layerId.zoom))
.toString
}

lazy val intervals: Vector[(Interval[BigInt], Int)] = {
val ranges = keyIndex.indexRanges(keyIndex.keyBounds)

val binRanges = ranges.toVector.map{ range =>
val vb = new VectorBuilder[Interval[BigInt]]()
cfor(range._1)(_ <= range._2, _ + instance.cassandraConfig.tilesPerPartition){ i =>
vb += Interval.openUpper(i, i + instance.cassandraConfig.tilesPerPartition)
}
vb.result()
}

binRanges.flatten.zipWithIndex
}

def zoomBin(key: BigInteger): java.lang.Integer = {
intervals.find{ case (interval, idx) => interval.contains(key) }.map {
_._2: java.lang.Integer
}.getOrElse(0: java.lang.Integer)
}

def bindQuery(statement: PreparedStatement, index: BigInteger): BoundStatement = {
instance.cassandraConfig.partitionStrategy match {
case conf.`Read-Optimized-Partitioner` =>
statement.bind(zoomBin(index), index)
case conf.`Write-Optimized-Partitioner` =>
statement.bind(index)
}
}
val query = indexStrategy.queryValueStatement(
instance.cassandraConfig.indexStrategy,
keyspace, table, layerId.name, layerId.zoom
)

instance.withSessionDo { session =>
val statement = session.prepare(query)
val statement = indexStrategy.prepareQuery(query)(session)

LayerReader.njoin[K, V](ranges.toIterator, threads){ index: BigInt =>
val row = session.execute(bindQuery(statement, index: BigInteger))
val row = session.execute(indexStrategy.bindQuery(
instance.cassandraConfig.indexStrategy,
statement, index: BigInteger
))

if (row.asScala.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
Expand Down

0 comments on commit 0ece030

Please sign in to comment.