Incremental Layer Updater #2396
Conversation
Force-pushed from 5e31ed8 to 9621407
Schema update logic relies on loading the entire layer into Spark memory, which is not a viable strategy for large layers.
Force-pushed from 9621407 to d224192
Removing duplicate arguments also factored out the update verification logic. Deprecate LayerUpdater.update and preserve the API.
Force-pushed from d224192 to 9a851a4
Helps run the Cassandra tests on Macs
To avoid Java serialization errors in Spark jobs
Mostly nitpick comments. My only concern from a global, long-term stability/complexity perspective is whether the tests are extensive enough; I don't have enough background to know.
val kvs1: Vector[(K,V)] = _kvs1.toVector
val kvs2: Vector[(K,V)] =
  if (mergeFunc != None) {
    val scanner = instance.connector.createScanner(table, new Authorizations())
I'm not much on the details of Accumulo, but is `new Authorizations()` the same as `Authorizations.EMPTY`? If so, I'd use the latter to make it clear/explicit that Authorizations aren't being used. At any rate, it caught my eye.
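A minimal sketch of the suggestion, assuming the Accumulo 1.x `Connector` API used in the diff (`makeScanner` is a hypothetical helper): `Authorizations.EMPTY` is a shared empty instance, so the intent that no authorizations are in play is explicit at the call site.

```scala
import org.apache.accumulo.core.client.Connector
import org.apache.accumulo.core.security.Authorizations

// Equivalent to createScanner(table, new Authorizations()), but the
// constant makes "no authorizations" explicit.
def makeScanner(connector: Connector, table: String) =
  connector.createScanner(table, Authorizations.EMPTY)
```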
.map { case (key, _kvs1) =>
  val kvs1: Vector[(K,V)] = _kvs1.toVector
  val kvs2: Vector[(K,V)] =
    if (mergeFunc != None) {
Minor nitpick, so ignore if you don't care, but below you do pattern matching on `Option[(V,V) => V]` and here use an identity boolean check. Consistency might help readability. I prefer pattern matching or `mergeFunc.nonEmpty` myself.
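For illustration, a standalone sketch of the styles under discussion; `mergeFunc` here is a stand-in for the PR's `Option[(V, V) => V]`, and all three checks are equivalent.

```scala
val mergeFunc: Option[(Int, Int) => Int] = Some(_ + _)

val viaIdentity: Boolean = mergeFunc != None   // identity check, as in the diff
val viaNonEmpty: Boolean = mergeFunc.nonEmpty  // explicit emptiness check
val viaMatch: Boolean = mergeFunc match {      // pattern matching
  case Some(_) => true
  case None    => false
}
```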
update(id, rdd, None)
}

def update[
These `LayerWriter`s seem to have a number of common delegate/convenience methods. Any way they can be abstracted over all/most of the cases?
We could expose the overload with `mergeFunc: Option[(V, V) => V]`, but it's a little iffy because having that as `None` turns the function into overwrite mode, which is drastically more dangerous. I hesitated to do that; do you think it would be helpful, though?
Not sure which way the thumb is going. Is it:
- "No, way too dangerous!"
- "Yes, let's expose the common API"
decomposeKey: K => Long,
keyspace: String,
table: String,
threads: Int = ConfigFactory.load().getThreads("geotrellis.cassandra.threads.rdd.write")
I suggest pulling the default number of threads into a final (constant) member of the object.
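A sketch of that suggestion with a hypothetical object name; `getThreads` is the helper already used in the diff, and its enclosing import is assumed to be in scope.

```scala
import com.typesafe.config.ConfigFactory

object CassandraWriterDefaults {
  // Read once, named once; parameter defaults reference this constant.
  final val DefaultThreads: Int =
    ConfigFactory.load().getThreads("geotrellis.cassandra.threads.rdd.write")
}

// The parameter default then becomes:
//   threads: Int = CassandraWriterDefaults.DefaultThreads
```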
} else None
})

def elaborateRow(row: (String, Vector[(K,V)])): Process[Task, (String, Vector[(K,V)])] = {
Looks like a similar pattern to the Cassandra case. Opportunity for refactoring?
@@ -4,7 +4,8 @@ docker pull cassandra:latest

docker run \
  --rm \
  --net=host \
  -p 9160:9160 \
  -p 9042:9042 \
👍
id: LayerId,
as: AttributeStore,
mergeFunc: Option[(V,V) => V],
indexInterval: Int = 4
Curious to know what's magic about 4... move it to an object constant with a comment?
@@ -65,12 +65,21 @@ trait LayerUpdateSpaceTimeTileSpec
  updater.update(layerId, sample)
}

it("should overwrite a layer") {
  updater.overwrite(layerId, sample)
Does this actually confirm that the update happened?
It only confirms it didn't throw; it's possible that an update was empty, either in bounds or in records, in which case it would be a NOOP.
Overall I agree it would be much more helpful if this returned the number of records updated/written, but that would require changing the interface of `_RDDWriter.write`... or (I'm realizing as I'm typing this) adding a new counting method and wrapping it to preserve the interface.
I would push this change off to 2.0, though; what do you think?
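For what it's worth, a sketch of the wrapping idea with hypothetical signatures: a counting variant reports how many records went out, and the existing `Unit`-returning interface is preserved by delegating to it and discarding the count.

```scala
object RDDWriterSketch {
  // Hypothetical counting variant: callers can assert an update wrote data.
  def writeWithCount[K, V](records: Iterator[(K, V)])(writeOne: ((K, V)) => Unit): Long = {
    var written = 0L
    records.foreach { kv => writeOne(kv); written += 1 }
    written
  }

  // Existing interface, unchanged for current callers.
  def write[K, V](records: Iterator[(K, V)])(writeOne: ((K, V)) => Unit): Unit = {
    writeWithCount(records)(writeOne); ()
  }
}
```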
it("should not update a layer (empty set)") { | ||
intercept[EmptyBoundsError] { | ||
updater.update(layerId, new ContextRDD[SpaceTimeKey, Tile, TileLayerMetadata[SpaceTimeKey]](sc.emptyRDD[(SpaceTimeKey, Tile)], emptyTileLayerMetadata)) | ||
} | ||
} | ||
|
||
it("should silently not overwrite a layer (empty set)") { | ||
updater.overwrite(layerId, new ContextRDD[SpaceTimeKey, Tile, TileLayerMetadata[SpaceTimeKey]](sc.emptyRDD[(SpaceTimeKey, Tile)], emptyTileLayerMetadata)) |
ditto...
Looks nice, though I have some questions.
val LayerAttributes(header, metadata, keyIndex, writerSchema) = try {
  attributeStore.readLayerAttributes[AccumuloLayerHeader, M, K](id)
} catch {
-  case e: AttributeNotFoundError => throw new LayerUpdateError(id).initCause(e)
+  case e: AttributeNotFoundError => throw new LayerReadError(id).initCause(e)
Wouldn't there be confusion that `LayerReadError` was thrown during `LayerUpdater` usage?
good catch.
V: AvroRecordCodec: ClassTag,
M: JsonFormat: GetComponent[?, Bounds[K]]: Mergable
](id: LayerId, rdd: RDD[(K, V)] with Metadata[M]): Unit = {
  val CassandraLayerHeader(_, _, keyspace, table) = attributeStore.readHeader[CassandraLayerHeader](id)
What will happen if there is no layer? (The question is about all `attributeStore.readHeader` calls.)
You'll get an `AttributeReadError` with `readHeader` in the stack trace. I think that's pretty descriptive.
Process eval Task ({
  val (key, kvs1) = row
  val kvs2 =
    if (mergeFunc != None) {
Use `mergeFunc.isDefined` everywhere in this PR where `mergeFunc != None` is used.
(kvs2 ++ kvs1)
  .groupBy({ case (k,v) => k })
  .map({ case (k, kvs) =>
    val vs = kvs.map({ case (k,v) => v }).toSeq
Is `.toSeq` necessary here?
Doesn't appear to be, but also it'll do basically nothing except waste a stack frame.
}

val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads) {
-  queries map write
+  rows flatMap elaborateRow flatMap rowToBytes map retire
Looks cool :D
I think you meant:
rows flatMap elaborateRow flatMap rowToBytes map buyBoat map retire
rows flatMap boat flatMap upstream flatMap map lifeTodream
@@ -37,71 +38,40 @@ class S3LayerUpdater(
) extends LayerUpdater[LayerId] with LazyLogging {

  def rddWriter: S3RDDWriter = S3RDDWriter
  def _rddWriter(): S3RDDWriter = rddWriter

  class InnerS3LayerWriter(
Maybe make this class private? Or add some overloads to make construction of `S3LayerWriter` with a custom `rddWriter` more convenient?
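A self-contained sketch of the overload idea, with all names hypothetical: the class keeps a private constructor, and `apply` overloads make both the default and a custom `rddWriter` convenient.

```scala
trait RDDWriterLike
object DefaultRDDWriter extends RDDWriterLike

class LayerWriterSketch private (val rddWriter: RDDWriterLike)

object LayerWriterSketch {
  // default writer
  def apply(): LayerWriterSketch = new LayerWriterSketch(DefaultRDDWriter)
  // custom writer, no inner subclass required
  def apply(custom: RDDWriterLike): LayerWriterSketch = new LayerWriterSketch(custom)
}
```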
override def rddWriter() = _rddWriter
}

val as = attributeStore.asInstanceOf[S3AttributeStore]
This cast looks very dirty and makes `LayerUpdater` incompatible with other backends; it's a normal use case for the `AttributeStore` and the catalog itself to live in different backends. What do you think about it? Or maybe make `AttributeStore` types more restrictive in 2.0?
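One possible middle ground, sketched against the 1.x types (the error message is illustrative): pattern match instead of casting, so a mismatched backend fails fast with a descriptive error rather than a `ClassCastException`.

```scala
import geotrellis.spark.io.AttributeStore
import geotrellis.spark.io.s3.S3AttributeStore

def requireS3AttributeStore(attributeStore: AttributeStore): S3AttributeStore =
  attributeStore match {
    case s3: S3AttributeStore => s3
    case other => throw new IllegalArgumentException(
      s"S3LayerUpdater requires an S3AttributeStore, got ${other.getClass.getName}")
  }
```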
Yes, the interface for `S3LayerUpdater` is not great. I'm going to make these patch fields private and call it a day. The updaters should be removed in 2.0, and the way they're constructed right now should never trigger the error case.
case e: AmazonS3Exception if e.getStatusCode == 503 => true
case _ => false
  }
}

-  val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { requests map write }(Strategy.Executor(pool))
+  val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads) {
+    rows flatMap elaborateRow flatMap rowToRequest map retire
Can't we abstract over it a bit more? Cassandra uses the same thing; or is that hardly possible?
final val DefaultIndexInterval = 4

// From https://github.com/apache/spark/blob/3b049abf102908ca72674139367e3b8d9ffcc283/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
private class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
I wonder if a lack of the `Serializable` trait has ever caused as much grief as in `Configuration`.
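For context, the shape of the standard workaround (after the Spark class linked in the diff, minus Spark's error-handling wrapper): mark the field `@transient` and hand-roll the Java serialization hooks around Hadoop's `Writable` support.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)        // Configuration implements Writable
  }
  private def readObject(in: ObjectInputStream): Unit = {
    value = new Configuration(false)
    value.readFields(in)    // rebuild the transient field on deserialization
  }
}
```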
Can't put it into `geotrellis.util` on account of the missing Hadoop dependency, but it has a home at `geotrellis.spark.util`.
yes, that's what I meant; thanks!
> I wonder if a lack of the `Serializable` trait has ever caused as much grief as in `Configuration`.

`CRS` not being `Product` has a similar feel.
@@ -24,6 +24,11 @@ import java.nio.file._

object Filesystem {

  def exists(path : String): Boolean = {
    Files.exists(Paths.get(path))
👍
raster.groupBy({ row => decomposeKey(row._1) }, numPartitions = raster.partitions.length)
  .foreachPartition { partition: Iterator[(Long, Iterable[(K, V)])] =>
    if(partition.nonEmpty) {
      instance.withConnectionDo { connection =>
        val mutator = connection.getBufferedMutator(table)
        val _table = instance.getConnection.getTable(table)
Ah, sorry, I missed it %) We already have a `connection`. Just change one line; it should be:

val _table = connection.getTable(table)

`withConnectionDo` is a function which closes the `connection` after the block of code is done:

def withConnectionDo[T](block: Connection => T): T = {
  val connection = getConnection
  try block(connection) finally connection.close()
}

I had to introduce this function to keep an eye on open connections, as they blew up HBase even in local tests.
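A hypothetical usage sketch of `withConnectionDo` with that fix applied, assuming the `instance` and `table` values from the diff: the connection is closed once the block finishes, whether or not it throws, while the `Table` handle is still the caller's to close.

```scala
instance.withConnectionDo { connection =>
  val _table = connection.getTable(table)
  try {
    // ... apply mutations through _table ...
  } finally _table.close()
}
```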
Force-pushed from 2d328b8 to 8e19fa9
Looks good to me!
Supersedes: #2357 #2369 #2370 #2371 #2378 #2379 #2384
- `LayerWriter` interface
- `LayerUpdater` trait
- `AccumuloLayerWriter.update` does not support HDFS write strategy