New opt-in partitioning strategy that may help with read optimization… #2855
Conversation
Force-pushed 43f07d7 to 7f2a7cf
Sorry about the PR churn... was fighting with gpg to get the signatures to work correctly.
# Comment out the block above and uncomment this to run the Cassandra tests
# using the new 'read-optimized-partitioner' strategy.
#geotrellis.cassandra {
We should probably remove this and just add in some commenting into the reference.conf to document the possibilities.
I think we need to remove it and add commented-out options to the original Cassandra config:
# partitionStrategy = "read-optimized-partitioner"
# tilesPerPartition = 64
Works for me. Happy to do whatever your project's convention is - just wasn't sure how to handle this.
binRanges.flatten.zipWithIndex
}

def zoomBin(key: BigInteger): java.lang.Integer = {
This bit is duplicative across several classes and should be refactored into a common trait (or something).
More concerned with validating the approach is sound right now... can make it pretty if it seems reasonable.
Basic idea is to produce a consistent binning strategy for partitioning in Cassandra so that we don't have too many or too few tiles in a given partition.
I believe this lets us take more advantage of Cassandra's partition key cache when we're fetching tiles landing in the same Zoom bin.
I'm not sure if these zoom bins map consistently based on the underlying calculations in the `intervals` val... I need some help understanding how that behaves to know if this is "safe" or not.
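The binning idea being discussed can be sketched in plain Scala. The names `binRanges` and `zoomBin` mirror the PR's intent, but this implementation is illustrative only, not the actual code: split each inclusive SFC index range into chunks of at most `tilesPerPartition` indices, number the chunks, and use a key's chunk number as its stable bin.

```scala
object ZoomBinSketch {
  // Split each inclusive (start, end) SFC index range into chunks of at most
  // tilesPerPartition indices, then number the chunks consecutively.
  def binRanges(indexRanges: Seq[(BigInt, BigInt)], tilesPerPartition: Int): Vector[((BigInt, BigInt), Int)] = {
    val step = BigInt(tilesPerPartition)
    val chunks = indexRanges.toVector.flatMap { case (start, end) =>
      (start to end by step).map(s => (s, (s + step - 1).min(end)))
    }
    chunks.zipWithIndex
  }

  // A key's bin is the index of the chunk it falls into; because the chunks are
  // derived deterministically from the ranges, the mapping is stable across runs.
  def zoomBin(bins: Vector[((BigInt, BigInt), Int)], key: BigInt): Int =
    bins.collectFirst { case ((s, e), i) if key >= s && key <= e => i }.getOrElse(0)
}
```

For example, the ranges (0, 9) and (100, 104) with `tilesPerPartition = 4` split into five bins: (0, 3), (4, 7), (8, 9), (100, 103), (104, 104), so no bin ever holds more than four tiles.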
@ALPSMAC look into the tests here: https://github.com/locationtech/sfcurve. I hope it's better than just words here under the issue. I don't think we need `zoomBin` at all.
The SFC gives us consistent enough ranges, so I think we can use just this:
instance.cassandraConfig.partitionStrategy match {
  case conf.ReadOptimizedPartitioner =>
session.execute(
SchemaBuilder.createTable(keyspace, table).ifNotExists()
.addPartitionKey("name", text)
.addPartitionKey("zoom", cint)
.addClusteringColumn("key", varint)
.addColumn("value", blob)
)
case conf.WriteOptimizedPartitioner =>
session.execute(
SchemaBuilder.createTable(keyspace, table).ifNotExists()
.addPartitionKey("key", varint)
.addClusteringColumn("name", text)
.addClusteringColumn("zoom", cint)
.addColumn("value", blob)
)
}
Hey @pomadchin - thanks for taking the time to review what I've got here so far. I appreciate your feedback!
It would be great news if, as you suggest, we could avoid the API change... this PR as it sits feels far more invasive than I'd like it to be. I'll poke around more in the sfcurve
repo as you suggested and see if the light-bulb flickers for me, but on first blush I'm not sure I follow your logic.
I don't want to jump to conclusions here - so please forgive my ignorance if I've grossly missed the mark, but...
Wouldn't creating the `ReadOptimizedPartitioner` schema as you suggested here result in large partition sizes for higher zoom levels irrespective of the clustering column? The partition size, as far as I know, is a function of the uniqueness of the partition keys: the more collisions of a partition key you have, the larger the partition grows. In your example schema the partition size wouldn't have anything to do with the ranges of the SFC, since that key is declared as only a clustering column (which determines order within a partition, but not the partition itself).
For instance - if I have a layer called "cloud_cover" and was indexing it at zoom level 15, wouldn't that mean that all of the tiles for zoom 15 for my cloud_cover layer would be stored in the same partition in Cassandra? That seems like an unrealistically large partition, especially if I'm storing large tiles or double array tiles that don't compress well...
Now - I am not at all a Cassandra expert (so perhaps I am completely out to lunch here), but Cassandra does warn about large partition sizes in its logs when they occur, and StackOverflow seems to agree (not that it's an authoritative source... but it's not a bad place to start): https://stackoverflow.com/questions/46272571/why-is-it-so-bad-to-have-large-partitions-in-cassandra
My premise in this PR was to give the client, if they so choose, an ability to tweak maximum partition sizes in a reasonable way to optimize performance for their given use case (with careful bench-marking). How would indexing as you suggest avoid unrealistically large partition sizes (which would seem to me to break that premise at higher zoom levels)?
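A rough back-of-envelope calculation illustrates the concern. All figures below are assumptions for illustration, not measurements from the PR: with `(name, zoom)` as the sole partition key, every tile of a layer at a given zoom lands in one partition.

```scala
// Worst case: a layer with full world coverage at zoom 15, stored as
// uncompressed 256x256 double-band tiles, all in ONE partition.
val zoom = 15
val tilesAtZoom = BigInt(2).pow(zoom) * BigInt(2).pow(zoom) // 2^30 tiles
val tileBytes = BigInt(256 * 256 * 8)                       // bytes per tile
val partitionBytes = tilesAtZoom * tileBytes                // bytes in that single partition
val guidanceBytes = BigInt(100) * 1024 * 1024               // the oft-cited ~100 MB partition guidance
```

`partitionBytes` comes out on the order of hundreds of terabytes, many orders of magnitude past the guidance, which is exactly the warning Cassandra logs about.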
Ah it makes sense, I was just confused about what it means; your answer clarifies everything.
Sending you here then.
Roger - thank you for the clarification :-)
So I'll proceed with code cleanup then as well as working up some benchmarks to either prove or disprove the utility of this PR.
}

lazy val intervals: Vector[(Interval[BigInt], Int)] = {
  val keyIndex = attributeStore.readKeyIndex[K](id)
This is what forced the change to the `delete` API for me... I'm not sure if we can fetch the `indexRanges` we need without it though... is there a better way that I'm missing?
@@ -41,10 +63,13 @@ case class CassandraConfig(
   localDc: String = "datacenter1",
   usedHostsPerRemoteDc: Int = 0,
   allowRemoteDCsForLocalConsistencyLevel: Boolean = false,
-  threads: CassandraThreadsConfig = CassandraThreadsConfig()
+  threads: CassandraThreadsConfig = CassandraThreadsConfig(),
+  partitionStrategy: CassandraPartitionStrategy = `Write-Optimized-Partitioner`,
The default preserves the behavior from before this change.
NOTE: Changing the `CassandraPartitionStrategy` requires dumping the old keyspaces associated with the layer; the schemas are not compatible.
)

object CassandraConfig extends CamelCaseConfig {
  private implicit lazy val partitionStrategyHint = new EnumCoproductHint[CassandraPartitionStrategy]
Required for the ADT encoding of the enum to parse reasonably in pureconfig.
@@ -39,6 +42,12 @@ trait CassandraTestEnvironment extends TestEnvironment { self: Suite =>
    println("\u001b[0;33mA script for setting up the Cassandra environment necessary to run these tests can be found at scripts/cassandraTestDB.sh - requires a working docker setup\u001b[m")
    cancel
  }
  startTime = System.currentTimeMillis()
We probably shouldn't keep this timing logic in here...
I'd love to contribute an IO benchmark for Cassandra layer access as well... but I have to admit I'm not quite sure where to start on that. I see the `bench` subproject, but most of that seems to be about logic within `geotrellis` itself and doesn't concern itself with the database IO.
Yes, I agree, probably benchmarks can be placed here: https://github.com/locationtech/geotrellis/tree/master/bench
Works for me - I'll see what I can do about adding some Read, Write, and Mixed benchmarks for Cassandra under both schema strategies.
Indeed; I'm wondering how differently a PK change would behave on reads in tests, since we're not performing range queries: https://github.com/locationtech/geotrellis/blob/master/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala#L80
Agreed entirely - I don't want to introduce more overhead or complexity if it's not to the benefit of performance.
My thought is that we may benefit from Cassandra's row cache even across multiple queries if those queries land in the same partition. I must admit though that I can't confirm whether or not that's actually a reasonable expectation.
docs/cassandra/cassandra-test.md
@@ -15,6 +15,11 @@ to launch tests. Before running Cassandra tests, be sure, that a local
Cassandra instance using Docker is provided
[here](https://github.com/pomadchin/geotrellis/blob/feature/cassandra-nmr/scripts/cassandraTestDB.sh).

One can also use [ccm](https://github.com/riptano/ccm) for running a Cassandra
I've tested this PR by changing the configuration in `reference.conf` and then running against a test `geotrellis` Cassandra cluster built by `ccm` with vnodes enabled and partition key caching on.
@@ -274,6 +274,10 @@ fast, column-based NoSQL database. It is likely the most performant of
our backends, although this has yet to be confirmed. To work with
GeoTrellis, it requires an external Cassandra process to be running.

The indexing strategy can be optimized for read-heavy or write-heavy loads by adjusting
the partition strategy applied - see the reference.conf in the `geotrellis-cassandra`
subproject on github for more details.
There's probably more to be said about this to document it clearly... Using the `reference.conf` configuration method for db access seems pretty common across the different tile backends, but I wasn't exactly sure where those configuration options get documented. Can you point me to where that should be recorded?
@@ -128,10 +128,35 @@ trait ConfigFormats {
  }
}

implicit val cassandraPartitionStrategyFormat = new JsonFormat[CassandraPartitionStrategy]{
I don't think Spray JSON has a nice way to automatically encode/decode enums encoded as ADTs. There may be other add-on libraries out there that know how to automate this boilerplate as a macro or something... not sure what this project's feeling is on that sort of thing.
@pomadchin FYI - I'm back from the holidays and am looking at code cleanup now. I'll also take a look at how to do a better performance benchmark with results that I can believe a little bit more, to convince ourselves we didn't introduce a feature that makes things worse ;-)
The main clarification is mostly related to the `zoomBin` parameter. It looks like we can avoid using it.
If it's not necessary then it's easy to avoid such a massive and undesired API change.
  .and(eqs("zoom", layerId.zoom))
  .toString
val query = instance.cassandraConfig.partitionStrategy match {
  case conf.`Read-Optimized-Partitioner` =>
`ReadOptimizedPartitioner` and `WriteOptimizedPartitioner` look better to me.
Yes - and in uncommitted work I've moved away from the idea of naming it "Partitioner" to begin with, since that conflates a different idea in Cassandra. We're not setting a partitioner... we're adjusting the strategy behind the schema optimization, so I've changed the name of this around a bit.
I'll remove the dashes though as you suggest :-)
… for cassandra by taking better advantage of the SFC locality guarantees.
…nfo on cassandra partitioner strategies. TODO: we should improve the reference.conf there to document the different strategies etc.
Force-pushed 0ece030 to c40619d
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
I squeezed in a bit more time to work on this today and got some of the duplicate code cleaned up and pulled into this `CassandraIndexing` class.
I'm going to try to dive into some sort of benchmarking thing to see if I can prove to myself whether or not this indexing change really has done anything to improve read speed next.
If nothing else, I like that this centralizes all of the schema-related stuff in one class so you don't have to hunt across the layering API to find it.
Thanks,
Andy
instance.withSessionDo { session =>
-  val statement = session.prepare(query)
+  val statement = indexStrategy.prepareQuery(query)(session)
Made the evaluation of `session` lazy since you might want to get clever with a function that evaluates to a `Session` only when in the proper scope for it to be bound to a particular Executor in Spark.
Maybe I'm overthinking this... but not evaluating my Cassandra sessions lazily has been something that's bitten me before in other Spark code.
//Value Classes - should be unwrapped at compile time:
case class ZoomBinIntervals(intervals: Vector[(Interval[BigInt], Int)]) extends AnyVal

final case class WriteValueStatement(statement: BuiltStatement) extends AnyVal with CassandraStatement
Trying to add in some type safety here... Would have reached for tagged types, but I wasn't sure how the project felt about them (or which library to use for them). I opted for the native-supported "value classes" construct instead... which relies upon the compiler to do the "right thing" a little too much for my taste, but should still avoid some of the (un)boxing if I understand correctly.
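The statement-wrapper idea can be sketched as follows. The names here are illustrative, not the PR's exact API, and the wrappers are shown as plain case classes for portability; in the PR they extend `AnyVal` (a value class with a single parameter), which lets the compiler elide the wrapper allocation at most call sites while the type distinction still holds at compile time.

```scala
// Distinct wrapper types around raw statements: a writer API typed against
// WriteStatement can no longer be handed a ReadStatement by accident.
sealed trait CassandraStatementLike { def cql: String }
final case class ReadStatement(cql: String) extends CassandraStatementLike
final case class WriteStatement(cql: String) extends CassandraStatementLike

// Only accepts write statements; passing a ReadStatement is a compile error.
def executeWrite(s: WriteStatement): String = s"executing: ${s.cql}"
```

The trade-off noted in the comment is real: value classes have restrictions (they must be top-level or members of statically accessible objects, and they re-box in generic or pattern-matching contexts), so the "no allocation" guarantee only holds when the compiler can see through the wrapper.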
  .from(header.keyspace, header.tileTable).allowFiltering()
  .where(eqs("name", id.name))
  .and(eqs("zoom", id.zoom))
val keyIndex = attributeStore.readKeyIndex[K](id)
Still not thrilled with having to go out and fetch the `KeyIndex` here, but I've yet to noodle through another way to fetch the range information I need in order to assign a consistent `zoomBin`.
-session.execute(statement.bind(entry.getVarint("key")))
+//NOTE: use `iterator()` when possible as opposed to `all()` since the latter forces
+// materialization of the entirety of the query results into on-heap memory.
+session.execute(squery).iterator().asScala.map { entry =>
Note the change to `iterator()` here from the `all()` call earlier. Should hopefully reduce some heap churn on large deletes.
This change might be worth making independent of the rest of this PR as it is a simple optimization.
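The distinction has a plain-Scala analogue, shown below. The Java driver's `ResultSet` is assumed to behave analogously: `all()` materializes every row into an on-heap list, while `iterator()` pages through rows lazily as they are consumed.

```scala
// Eager: the entire "result set" is built as a List before anything is processed.
def allRows(n: Int): List[Int] = (1 to n).toList

// Lazy: rows are produced one at a time as the consumer pulls them.
def rowIterator(n: Int): Iterator[Int] = (1 to n).iterator

// Even against a huge result set, only the consumed prefix is ever materialized.
val firstThree = rowIterator(10000000).take(3).toList
```

For a delete that streams keys and issues per-key statements, the lazy form keeps heap usage proportional to the in-flight batch rather than the full result set.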
…nds/evidence parameters on delete API
class AccumuloLayerDeleter(val attributeStore: AttributeStore, connector: Connector) extends LazyLogging with LayerDeleter[LayerId] {

-  def delete(id: LayerId): Unit = {
+  def delete[K: ClassTag](id: LayerId): Unit = {
Was able to reduce the number of required bounds/evidence parameters to just the ClassTag itself... evidently that's all that is required to look up the `KeyIndex`.
I'm curious as to whether the metadata in the attributes table that's already loaded in the `delete` implementation contains the required classname, so that we could materialize the class/type required at runtime rather than have to rely upon an API change.
Even if that information is available though, I may need an assist using it. I'm not sure how I'd go from a reflective call to look up a class by name to a type constructor argument required for the `KeyIndex` lookup.
…mark are not positive, so likely will abandon this effort.
@ALPSMAC have you tried to run some tests on a real (N nodes) Cassandra cluster?
@@ -0,0 +1,223 @@
package geotrellis.spark.io.cassandra.bench
So ultimately I wasn't sure exactly where to add some sort of benchmarking to this PR. I decided on this location, but the location might be moot. I think I am going to recommend declining this PR for now...
Results of benchmarking were not favorable unfortunately! I was really hoping that leveraging the locality guarantees of the SFC would net us better performance out of Cassandra than simply ignoring it like we do now. Unfortunately that does not seem to be the case!
The following output from this test case was performed with `ccm` with vnodes on, with 3 instances running on a 4-core (8 w/ hyperthreading) Xeon processor with 64GB of RAM:
Average write-time for READ optimized schema: 2451.9333333333334ms
Average write-time for WRITE optimized schema: 1119.6666666666667ms
STDDEV write-time for READ optimized schema: 973.7087495185041ms
STDDEV write-time for WRITE optimized schema: 183.95712060755415ms
...
Average read-time for READ optimized schema: 311.19ms
Average read-time for WRITE optimized schema: 135.7ms
STDDEV read-time for READ optimized schema: 170.76438123917995ms
STDDEV read-time for WRITE optimized schema: 23.697468219200122ms
The gap narrows if you run with replicationFactor = 2 as opposed to 1 by default, but the story still isn't good:
Average write-time for READ optimized schema: 1938.0ms
Average write-time for WRITE optimized schema: 1155.2666666666667ms
STDDEV write-time for READ optimized schema: 478.0826985644499ms
STDDEV write-time for WRITE optimized schema: 72.90584491124304ms
...
Average read-time for READ optimized schema: 262.4ms
Average read-time for WRITE optimized schema: 207.8ms
STDDEV read-time for READ optimized schema: 106.12699939223765ms
STDDEV read-time for WRITE optimized schema: 43.671729986342434ms
At this point I'm a little stuck... my intuition says there should be some way to leverage the locality guarantees provided by the SFC within Cassandra, but if there is, I don't think this approach is going to get us there.
So I don't want you guys to lose anything that's in this PR that might be worth keeping around... but my suspicion is that the main thrust of it isn't worth merging. I really appreciate your time and help though while I worked through this implementation and got to a place where I could start testing and getting some numbers to invalidate my changes.
Perhaps if nothing else it's worth recording that this experiment was conducted somewhere in your internal docs so you'll know in the future that the proverbial "juice isn't worth the squeeze". Let me know if there's anything further I can do to help in that regard or if there's something else you'd like to see tried as part of this PR before abandoning it.
Kind Regards,
Andy
@@ -0,0 +1,202 @@
# geotrelis.cassandra.optimization
@pomadchin - as per our gitter discussion, here's a summary doc. on what I did. Suggestions welcome.
That's great! Figuring out how to merge everything
Force-pushed 0feb792 to 5e034b3
Merging it into a separate branch! All docs will be backported to the master branch.
… for cassandra by taking better advantage of the SFC locality guarantees.
cc @pomadchin
Overview
Strawman PR for comments/feedback... code is still quite rough but looking for thoughts on indexing changes and API changes required.
Addresses thoughts in #2831
Checklist

- docs/CHANGELOG.rst updated, if necessary (not sure if necessary?)
- docs guides updated, if necessary

Notes
I'm hoping that:

1. You can help me avoid having to grab the range of the key via the `KeyIndex`... Is there another way to access this information that I'm missing? If so then I think we can avoid the change to the `LayerManager` and `LayerDeleter` APIs. My suspicion is that I've missed something fundamental here about the abstraction, but I'm not quite sure what.
2. You can help me understand the type signature of `KeyIndex.indexRanges`, as I'm not sure I'm making proper usage of it. I'm guessing that, because an SFC may have "breaks" in it where there are large contiguous blocks of its range that are not used, the `Seq[(BigInt, BigInt)]` return type is representing something like this:

|(begin SFC index range)---------|(unused part of range index)*|(more used index)--------|(more unused index)|------(end SFC index range)|

so that really what the `(BigInt, BigInt)` tuples are capturing is the coverage area over the range of the SFC like this:

|------|*****|------|****|------|
....0..............1............2...
Seq[(0th tuple, 1st tuple, 2nd tuple)]

Is this correct?
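Under that reading, the tuples can be treated as inclusive coverage segments over the curve's index space. A minimal sketch, with made-up ranges standing in for whatever `indexRanges` would actually return:

```scala
// Hypothetical coverage segments: used stretches of the SFC index space.
val ranges: Seq[(BigInt, BigInt)] =
  Seq((BigInt(0), BigInt(6)), (BigInt(12), BigInt(18)), (BigInt(23), BigInt(29)))

// A key is addressable iff it falls inside one of the used segments;
// the gaps (7-11 and 19-22 here) are the unused stretches of the range.
def covered(key: BigInt): Boolean =
  ranges.exists { case (s, e) => s <= key && key <= e }
```

If this interpretation is right, the tuples carry exactly the information a binning scheme needs: the total covered index count and where each covered stretch begins and ends.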
With the following (updated) configuration (note the new "read-optimized-partitioner"):
I got these timings from the test spec:
And with this (original) configuration:
I got these timings from the test spec:
All of the differences nominally seem to me to be within the margin of error... if anything I've made things a little slower, which only makes sense... more data = more time to write it :-)
Anyway - I'd be interested to know if you have any thoughts on how to more properly benchmark this.
Addresses #2831