
New opt-in partitioning strategy that may help with read optimization… #2855

@@ -16,19 +16,21 @@

package geotrellis.spark.io.accumulo

import geotrellis.spark.LayerId
import geotrellis.spark.{Boundable, LayerId}
import geotrellis.spark.io._

import com.typesafe.scalalogging.LazyLogging
import geotrellis.spark.io.avro.AvroRecordCodec
import org.apache.accumulo.core.client.{BatchWriterConfig, Connector}
import org.apache.accumulo.core.security.Authorizations
import org.apache.accumulo.core.data.{Range => AccumuloRange}
import spray.json.JsonFormat

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AccumuloLayerDeleter(val attributeStore: AttributeStore, connector: Connector) extends LazyLogging with LayerDeleter[LayerId] {

def delete(id: LayerId): Unit = {
def delete[K: ClassTag](id: LayerId): Unit = {
Author's review comment:

I was able to reduce the number of required bounds/evidence parameters to just the ClassTag itself... evidently that's all that is required to look up the KeyIndex.

I'm curious whether the metadata in the attributes table, which the delete implementation already loads, contains the required class name, so that we could materialize the class/type at runtime rather than rely on an API change.

Even if that information is available, though, I may need an assist using it: I'm not sure how I'd go from a reflective class-by-name lookup to the type constructor argument required for the KeyIndex lookup.
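
For context, a minimal sketch of the lookup this signature enables, assuming the existing `AttributeStore.readKeyIndex[K: ClassTag]` method is what satisfies the requirement; the object and method names below are illustrative only, not part of this changeset:

```scala
import scala.reflect.ClassTag

import geotrellis.spark.LayerId
import geotrellis.spark.io.AttributeStore
import geotrellis.spark.io.index.KeyIndex

object KeyIndexLookupSketch {
  // With nothing more than ClassTag evidence for K, the layer's KeyIndex can
  // be read back from the attribute store, which is why delete[K: ClassTag]
  // needs no further bounds/evidence parameters.
  def lookupKeyIndex[K: ClassTag](attributeStore: AttributeStore, id: LayerId): KeyIndex[K] =
    attributeStore.readKeyIndex[K](id)
}
```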

try {
val header = attributeStore.readHeader[AccumuloLayerHeader](id)
val numThreads = 1
@@ -32,8 +32,8 @@ import scala.reflect.ClassTag

class AccumuloLayerManager(attributeStore: AccumuloAttributeStore, instance: AccumuloInstance)(implicit sc: SparkContext)
extends LayerManager[LayerId]{
def delete(id: LayerId): Unit =
AccumuloLayerDeleter(attributeStore, instance).delete(id)
def delete[K: ClassTag](id: LayerId): Unit =
AccumuloLayerDeleter(attributeStore, instance).delete[K](id)

def copy[
K: AvroRecordCodec: Boundable: JsonFormat: ClassTag,
@@ -75,9 +75,9 @@ class AccumuloLayerReindexer(
val layerCopier = AccumuloLayerCopier(attributeStore, layerReader, layerWriter)

layerWriter.write(tmpId, layerReader.read[K, V, M](id), keyIndex)
layerDeleter.delete(id)
layerDeleter.delete[K](id)
layerCopier.copy[K, V, M](tmpId, id)
layerDeleter.delete(tmpId)
layerDeleter.delete[K](tmpId)
}

def reindex[
@@ -99,8 +99,8 @@ class AccumuloLayerReindexer(
val layerCopier = AccumuloLayerCopier(attributeStore, layerReader, layerWriter)

layerWriter.write(tmpId, layerReader.read[K, V, M](id), keyIndexMethod.createIndex(existingKeyIndex.keyBounds))
layerDeleter.delete(id)
layerDeleter.delete[K](id)
layerCopier.copy[K, V, M](tmpId, id)
layerDeleter.delete(tmpId)
layerDeleter.delete[K](tmpId)
}
}
cassandra/src/main/resources/reference.conf (24 changes: 16 additions & 8 deletions)
@@ -13,19 +13,27 @@
# limitations under the License.

geotrellis.cassandra {
port = 9042
catalog = "metadata"
keyspace = "geotrellis"
replicationStrategy = "SimpleStrategy"
replicationFactor = 1
localDc = "datacenter1"
usedHostsPerRemoteDc = 0
allowRemoteDCsForLocalConsistencyLevel = false
# 'writeoptimized' or 'readoptimized'; the default is 'writeoptimized'.
# With 'readoptimized', clients can configure the maximum size of a partition,
# measured in the number of tiles it holds. Careful tuning of this value can
# lead to optimized read performance when using key/partition caching.
indexStrategy = "writeoptimized"
# Only relevant if indexStrategy is 'readoptimized' - configures the
# maximum number of tiles to store per partition.
# tilesPerPartition = 64
threads {
collection.read = default
rdd {
write = default
read = default
}
}
}
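
As a sketch of how these settings might be consumed, the snippet below reads the two keys with Typesafe Config directly; the object name, the fallback default of 64, and the key name tilesPerPartition (taken from the Scala usage further down) are assumptions for illustration rather than part of this changeset:

```scala
import com.typesafe.config.ConfigFactory

object CassandraIndexSettingsSketch {
  def main(args: Array[String]): Unit = {
    // Loads reference.conf / application.conf from the classpath.
    val cfg = ConfigFactory.load().getConfig("geotrellis.cassandra")

    // 'writeoptimized' (the default) or 'readoptimized'.
    val indexStrategy = cfg.getString("indexStrategy")

    // Only meaningful when indexStrategy = "readoptimized"; fall back to 64
    // when the key is left commented out, as in the reference block above.
    val tilesPerPartition =
      if (cfg.hasPath("tilesPerPartition")) cfg.getInt("tilesPerPartition") else 64

    println(s"indexStrategy=$indexStrategy, tilesPerPartition=$tilesPerPartition")
  }
}
```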
@@ -45,7 +45,7 @@ class CassandraCollectionLayerReader(val attributeStore: AttributeStore, instanc

val decompose = (bounds: KeyBounds[K]) => keyIndex.indexRanges(bounds)

val seq = CassandraCollectionReader.read[K, V](instance, header.keyspace, header.tileTable, id, queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema))
val seq = CassandraCollectionReader.read[K, V](instance, header.keyspace, header.tileTable, id, queryKeyBounds, decompose, filterIndexOnly, Some(writerSchema), keyIndex)
new ContextCollection(seq, layerMetadata)
}
}
@@ -21,16 +21,12 @@ import geotrellis.spark.io._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.cassandra.conf.CassandraConfig
import geotrellis.spark.io.index.MergeQueue
import geotrellis.spark.io.index.{KeyIndex, MergeQueue}
import geotrellis.spark.util.KryoWrapper

import org.apache.avro.Schema
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import java.math.BigInteger

object CassandraCollectionReader {
@@ -45,10 +45,13 @@
decomposeBounds: KeyBounds[K] => Seq[(BigInt, BigInt)],
filterIndexOnly: Boolean,
writerSchema: Option[Schema] = None,
keyIndex: KeyIndex[K],
threads: Int = defaultThreadCount
): Seq[(K, V)] = {
if (queryKeyBounds.isEmpty) return Seq.empty[(K, V)]

val indexStrategy = new CassandraIndexing[K](keyIndex, instance.cassandraConfig.tilesPerPartition)

val includeKey = (key: K) => queryKeyBounds.includeKey(key)
val _recordCodec = KeyValueRecordCodec[K, V]
val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable
@@ -58,18 +57,20 @@
else
queryKeyBounds.flatMap(decomposeBounds)

val query = QueryBuilder.select("value")
.from(keyspace, table)
.where(eqs("key", QueryBuilder.bindMarker()))
.and(eqs("name", layerId.name))
.and(eqs("zoom", layerId.zoom))
.toString
val query = indexStrategy.queryValueStatement(
instance.cassandraConfig.indexStrategy,
keyspace, table, layerId.name, layerId.zoom
)

instance.withSessionDo { session =>
val statement = session.prepare(query)
val statement = indexStrategy.prepareQuery(query)(session)
Author's review comment:

I made the evaluation of the session lazy, since you might want to get clever with a function that evaluates to a Session only when it's in the proper scope to be bound to a particular executor in Spark.

Maybe I'm overthinking this... but not evaluating my Cassandra sessions lazily is something that has bitten me before in other Spark code.
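
A minimal sketch of the curried shape being described here, assuming the Datastax 3.x Session and PreparedStatement types already used by this reader; the object and method names are illustrative, not the actual CassandraIndexing API:

```scala
import com.datastax.driver.core.{PreparedStatement, Session}

object LazyPrepareSketch {
  // Returning a Session => PreparedStatement defers all work against the
  // cluster until a Session is actually supplied, e.g. inside the executor
  // scope where that Session is valid.
  def prepareQuery(query: String): Session => PreparedStatement =
    (session: Session) => session.prepare(query)
}

// Usage sketch: nothing touches Cassandra until the returned function is applied.
// val prepared = LazyPrepareSketch.prepareQuery(query)(session)
```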


LayerReader.njoin[K, V](ranges.toIterator, threads){ index: BigInt =>
val row = session.execute(statement.bind(index: BigInteger))
val row = session.execute(indexStrategy.bindQuery(
instance.cassandraConfig.indexStrategy,
statement, index: BigInteger
))

if (row.asScala.nonEmpty) {
val bytes = row.one().getBytes("value").array()
val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)