New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark-Enabled Cost-Distance #1999

Merged
merged 28 commits into from Mar 31, 2017

Conversation

Projects
None yet
4 participants
@jamesmcclain
Member

jamesmcclain commented Feb 2, 2017

  • Simplified cost-distance implementation
  • Implementation of distributed cost-distance algorithm
  • Unit Tests
  • Comments

Fixes #1980

screenshot from 2017-02-06 13 14 25

screenshot from 2017-02-06 13 22 14

screenshot from 2017-02-14 14 10 36

screenshot from 2017-02-14 14 10 40

@jamesmcclain jamesmcclain force-pushed the jamesmcclain:feature/cost-distance branch from 5f51e4b to afd4dd0 Feb 2, 2017

@lossyrob

This comment has been minimized.

Member

lossyrob commented Feb 2, 2017

party like it's PR 1999

@jamesmcclain

This comment has been minimized.

Member

jamesmcclain commented Feb 2, 2017

🎉 🎉 🎉 🎉 🎉 🎉 🎉

*
* 2. https://github.com/ngageoint/mrgeo/blob/0c6ed4a7e66bb0923ec5c570b102862aee9e885e/mrgeo-mapalgebra/mrgeo-mapalgebra-costdistance/src/main/scala/org/mrgeo/mapalgebra/CostDistanceMapOp.scala
*/
object MrGeoCostDistance {

This comment has been minimized.

@lossyrob

lossyrob Feb 6, 2017

Member

I think the reference in the doc string is sufficient pointing back to Mr Geo, the type names should probably just be CostDistance

(col, row, friction, 0.0)
})
accumulator.add((key, costs))
})

This comment has been minimized.

@lossyrob

lossyrob Feb 6, 2017

Member

I see the loop version does a count to force the execution and cache the RDD. Is it possible to avoid this foreach with a similar method, where this accumulator is moved to the map, that RDD persisted, and the RDD counted to force execution? Perhaps not the best optimization, but your opinion on this would probably help me understand the logic better.

This comment has been minimized.

@jamesmcclain

jamesmcclain Feb 6, 2017

Member

The foreach is strictly for the side-effects (to set the accumulator).

This comment has been minimized.

@lossyrob

lossyrob Feb 6, 2017

Member

Gotcha, but it causes a double iteration of the RDD. The accumulator could be set in the map, and then the map executed with caching, so that the RDD transformation is cached and iterated over once, and the accumulator value is set.

This comment has been minimized.

@jamesmcclain

jamesmcclain Feb 6, 2017

Member

Oh right

@jamesmcclain jamesmcclain changed the title from [WiP] Spark-Enabled Cost-Distance to Spark-Enabled Cost-Distance Feb 9, 2017

@jamesmcclain

This comment has been minimized.

Member

jamesmcclain commented Feb 9, 2017

All comments addressed.

@lossyrob

Looking good, the last thing is the performance question of the single threaded cost distance changes

val cost2 = calcCost(c1, r1, dir(c, r), cost)
if (cost2.isDefined) {
curMinCost = math.min(curMinCost, source + cost1.get + cost2.get)
def compute(

This comment has been minimized.

@lossyrob

lossyrob Feb 21, 2017

Member

Have you benchmarked the single-threaded case for this change? It would be good to get numbers to prove this method works faster/just as fast.

This comment has been minimized.

@jamesmcclain

jamesmcclain Feb 21, 2017

Member

I have not run any benchmarks, but (informally) I did not detect any noticeable speed difference.

That having been said, it is possible (even likely) that there is a difference, because the changes that I made were very self-consciously deoptimizations.

That was necessary because the previous algorithm seemed to avoid putting elements into the priority queue whenever it could. That was laudable in the single-threaded case, but in order to maintain coherence in the case were points need to be transferred from some adjacent tile to the present one, I think that it makes sense to have one and only one place for points to enter (to ensure that points coming in from adjacent tiles and ponits that were already in the tile are treated exactly the same).

After getting the basics working, I then took another pass an re-added some mild optimizations. Generally speaking, I am pretty comfortable with were the single-threaded version is.

*/
def apply[K: (? => SpatialKey), V: (? => Tile)](
friction: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
points: Seq[Point],

This comment has been minimized.

@lossyrob

lossyrob Feb 21, 2017

Member

Discussion point, not a change request: What would it take to create an option to pass in an RDD of points? I'm curious if there are use cases for this, and if it would be possible with the current algorithm.

This comment has been minimized.

@jamesmcclain

jamesmcclain Feb 21, 2017

Member

I think that there would be some mechanical changes needed in order to support an RDD of points, probably the friction layer and the points would need to be joined (either logically or literally) as the first step.

As a practical matter, I do not think that supporting RDDs of points would add any real capability. If someone really did have so many points that they do not fit into memory on one machine and some appreciable percentage of those points have unique projections into the raster, then it is likely that the raster layer is so large (in terms of pixels) that this algorithm would run out of driver memory before even reaching the second iteration (driver memory requirements are a function of the total layer size -- this is unavoidable because propagation across tile boundaries must be coordinated by the driver).

On the other hand, if someone has a source RDD whose points can be clustered (or otherwise reduced) into a fairly small list, then that would be viable option (but that would be unrelated to this work).

@jamesmcclain

This comment has been minimized.

@jamesmcclain jamesmcclain force-pushed the jamesmcclain:feature/cost-distance branch from 713f6ec to 065398c Mar 3, 2017

@lossyrob lossyrob added this to the 1.2 milestone Mar 12, 2017

@pomadchin pomadchin self-assigned this Mar 17, 2017

@jamesmcclain jamesmcclain force-pushed the jamesmcclain:feature/cost-distance branch from 065398c to 7116d89 Mar 20, 2017

@pomadchin

I'm wondering about ident in atomic expressions like 1+1; should it be fixed to 1 + 1? (code style question)

*/
def generateEmptyQueue(cols: Int, rows: Int): Q = {
new PriorityQueue(
(cols*16 + rows*16), new java.util.Comparator[Cost] {

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

Unnecessary parentheses.

* "Propagating radial waves of travel cost in a grid."
* International Journal of Geographical Information Science 24.9 (2010): 1391-1413.
*
* @param friction Friction tile; pixels are interpreted as "second per meter"

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

@param frictionTile

val costTile = generateEmptyCostTile(cols, rows)
val q: Q = generateEmptyQueue(cols, rows)
points.foreach({ case (col, row) =>

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

cfor here would be a bit faster (according to benchmarks on aspect-tif.tif):

cfor(0)(_ < points.length, _ + 1) { i =>
  val (col, row) = points(i)
  q.add((col, row, frictionTile.getDouble(col, row), 0.0))
}
require(frictionTile.dimensions == costTile.dimensions)
def inTile(col: Int, row: Int): Boolean =
((0 <= col && col < cols) && (0 <= row && row < rows))

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

Unnecessary parentheses here too, and in methods below.

* @param col The column of the given location
* @param row The row of the given location
* @param friction1 The instantaneous cost (friction) at the neighboring location
* @param cost The length of the best-known path from a source to the neighboring location

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

@param neighborCost

costs.count
previous.unpersist()
} while (accumulator.value.size > 0)

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

accumulator.value.nonEmpty

val resolution = computeResolution(friction)
logger.debug(s"Computed resolution: $resolution meters/pixel")
val bounds = friction.metadata.bounds.asInstanceOf[KeyBounds[K]]

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

Eh, dirty cast here, you can use pattern match on Bounds[K], and to throw Exception (for example) on EmptyBounds

logger.debug(s"Computed resolution: $resolution meters/pixel")
val bounds = friction.metadata.bounds.asInstanceOf[KeyBounds[K]]
val minKey = implicitly[SpatialKey](bounds.minKey)

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

instead of the direct implicitly call, it is possible to write smth like:

val KeyBounds(minKey: SpatialKey, maxKey: SpatialKey) = bounds
val (minKeyCol, minKeyRow) = minKey

everywhere below can be changed, as there are lots of direct implicitly calls and tuple._{1/2} usages. I'll omit all comments below about the same thing.

// Construct return value and return it
val metadata = TileLayerMetadata(DoubleCellType, md.layout, md.extent, md.crs, md.bounds)
val rdd = costs.map({ case (k, _, cost) => (k, cost.asInstanceOf[Tile]) })

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

it's possible just to upcast DoubleArrayTile up to Tile:

(k, cost: Tile)
val mt = md.mapTransform
val kv = friction.first
val key = implicitly[SpatialKey](kv._1)
val tile = implicitly[Tile](kv._2)

This comment has been minimized.

@pomadchin

pomadchin Mar 22, 2017

Member

to avoid the direct implicitly call it's possible provide explicit type:

val (key: SpatialKey, tile: Tile) = kv

@pomadchin pomadchin assigned jamesmcclain and unassigned pomadchin Mar 22, 2017

@pomadchin

This comment has been minimized.

Member

pomadchin commented Mar 22, 2017

Benchmarks. Let me know if that's not enough and i need to test more, but i think it is already obvious that it's significantly slower.

@jamesmcclain

This comment has been minimized.

Member

jamesmcclain commented Mar 22, 2017

No, I'll just pull the original one out of version control and use that for the single-tile case.

Changes addressed in grisha's review

@jamesmcclain jamesmcclain force-pushed the jamesmcclain:feature/cost-distance branch 4 times, most recently from 2e2307c to 91777a6 Mar 27, 2017

jamesmcclain added some commits Feb 1, 2017

Remove Excess Whitespace
Signed-off-by: James McClain <jmcclain@azavea.com>
Bump Initial Priority Queue Size
Increase size by one binary order of magnitude.  Still proportional to
the square root of the area of the typical expected tile.

jamesmcclain added some commits Feb 3, 2017

First Iteration
This code implements the first iteration of the n-iteration distributed
cost-distance algorithm.
Allow Serial Cost-Distance To Terminate
Force cost values up as quickly as possible to allow paths to be pruned.
nth Iteration
05 Feb 19:44:24 INFO [cdistance.CostDistance$] - MILLIS: 155272
05 Feb 19:48:29 INFO [cdistance.CostDistance$] - MILLIS: 153500
Do Not Name Accumulator
Naming this accumulator adds a lot of clutter to the Spark UI.
Perform Cost-Distance Over Geometries
Previously, only points were accepted.

@jamesmcclain jamesmcclain force-pushed the jamesmcclain:feature/cost-distance branch from 91777a6 to b7b3580 Mar 27, 2017

@jamesmcclain jamesmcclain force-pushed the jamesmcclain:feature/cost-distance branch from b7b3580 to 30fa62f Mar 27, 2017

val costTile = generateEmptyCostTile(cols, rows)
val q: Q = generateEmptyQueue(cols, rows)
var i = 0; while (i < points.length) {

This comment has been minimized.

@pomadchin

pomadchin Mar 27, 2017

Member

Why don't to use just cfor? It's a common dep for the whole raster package. A bit less ugly ^^'.

val keys = mutable.ArrayBuffer.empty[SpatialKey]
val bounds = md.layout.mapTransform(g.envelope)
var row = bounds.rowMin; while (row <= bounds.rowMax) {

This comment has been minimized.

@pomadchin

pomadchin Mar 27, 2017

Member

Here cfor can be used too

other
}
def add(pair: KeyCostPair): Unit = {
this.synchronized { list.append(pair) }

This comment has been minimized.

@pomadchin

pomadchin Mar 30, 2017

Member

I totally forgot about it, but @echeipesh noticed these locks and reminded about it. @jamesmcclain, have you considered any java concurrent collection usage here, instead of synchronized everywhere?

This comment has been minimized.

@jamesmcclain

jamesmcclain Mar 31, 2017

Member

The most appropriate structure that I found was CopyOnWriteArrayList. The documentation for that class says the following:

This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads. The "snapshot" style iterator method uses a reference to the state of the array at the point that the iterator was created. This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException. The iterator will not reflect additions, removals, or changes to the list since the iterator was created. Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException.

(I am pointing in particular to the first sentence.)

I do not think that a synchronized concurrent data structure is warranted here since there is only one place for new data to go (the end of the list).

@echeipesh echeipesh modified the milestones: 1.1, 1.2 Mar 31, 2017

@echeipesh echeipesh merged commit d3943c8 into locationtech:master Mar 31, 2017

2 checks passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
ip-validation
Details

@jamesmcclain jamesmcclain deleted the jamesmcclain:feature/cost-distance branch Mar 31, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment