
Ingesting large geotiff from s3 doesn't seem to be partitioning data properly #2469

Closed
jmelching opened this issue Nov 3, 2017 · 4 comments

jmelching commented Nov 3, 2017

I've been testing 1.2.0-RC1 with some existing code that ingests one year of the USDA's cropland dataset, and I'm seeing strange behavior... It appears that the entire geotiff (13 GB) might be being read by a single task as it tries to crop the tiff into tiles.
Here's the thread dump of the only running executor:

```
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.read(InputRecord.java:503)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983) => holding Monitor(java.lang.Object@1820339677)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding Monitor(sun.security.ssl.AppInputStream@1648873021)
org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:200)
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
java.io.FilterInputStream.read(FilterInputStream.java:107)
org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1792)
org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1769)
org.apache.commons.io.IOUtils.copy(IOUtils.java:1744)
org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:462)
geotrellis.spark.io.s3.AmazonS3Client.readRange(AmazonS3Client.scala:93)
geotrellis.spark.io.s3.util.S3RangeReader.readClippedRange(S3RangeReader.scala:48)
geotrellis.util.RangeReader$class.readRange(RangeReader.scala:36)
geotrellis.spark.io.s3.util.S3RangeReader.readRange(S3RangeReader.scala:38)
geotrellis.util.StreamingByteReader$$anonfun$1.apply(StreamingByteReader.scala:90)
geotrellis.util.StreamingByteReader$$anonfun$1.apply(StreamingByteReader.scala:90)
geotrellis.util.StreamingByteReader$Chunk.data(StreamingByteReader.scala:43)
geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98)
geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104)
geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81)
geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
geotrellis.raster.io.geotiff.GeoTiffTile.crop(GeoTiffTile.scala:541)
geotrellis.spark.io.RasterReader$$anon$1$$anonfun$readWindows$2.apply(RasterReader.scala:191)
geotrellis.spark.io.RasterReader$$anon$1$$anonfun$readWindows$2.apply(RasterReader.scala:191)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
geotrellis.spark.io.RasterReader$$anon$1.readWindows(RasterReader.scala:191)
geotrellis.spark.io.RasterReader$$anon$1.readWindows(RasterReader.scala:173)
geotrellis.spark.io.s3.S3GeoTiffRDD$$anonfun$1.apply(S3GeoTiffRDD.scala:182)
geotrellis.spark.io.s3.S3GeoTiffRDD$$anonfun$1.apply(S3GeoTiffRDD.scala:181)
scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1011)
org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1009)
org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
```

I have run this successfully before with a 1.2.0 milestone version; I originally moved to 1.2.0-M1 to fix some issues I had with BigTIFFs.
I have tried Spark 2.1 and 2.2.
I am using the standard Spark ETL machinery and just calling `Etl.ingest[ProjectedExtent, SpatialKey, Tile]` with
input:

```json
{
  "format": "geotiff",
  "name": "cropland_crop_raw_melch.tif",
  "cache": "NONE",
  "maxTileSize": 512,
  "numPartitions": 10000,
  "backend": {
    "type": "s3",
    "path": "s3://---.analytics/dsw/data/staging/v2.1/cropland/tiff/melch.tif"
  }
}
```

and output:

```json
{
  "backend": {
    "type": "s3",
    "path": "s3://---.services/goliath/scratch/melching/catalog"
  },
  "encoding": "geotiff",
  "reprojectMethod": "buffered",
  "pyramid": false,
  "tileSize": 256,
  "keyIndexMethod": { "type": "zorder" },
  "resampleMethod": "nearest-neighbor",
  "layoutScheme": "floating",
  "crs": "EPSG:5070"
}
```

@pomadchin pomadchin added the bug label Nov 3, 2017
@pomadchin pomadchin added this to the 1.2 milestone Nov 3, 2017

pomadchin commented Nov 7, 2017

Bug investigation results:

The regression was introduced in #2402 and partially corrected in #2439, though that probably made the problem even harder to notice. I missed that #2402 replaced the M1 partitioning logic, which packed segments into windows while taking segment locations and sizes into account, with different partitioning logic.

Here is the code that was applied to track down the issue:

```scala
// How window bytes are calculated now:
val windowBytes = gb.sizeLong * depth // depth depends on the cell type
//> 523264

// What actually happens (bytes required to perform a crop on such a window):
val segmentBytes =
  md.segmentLayout.intersectingSegments(gb).map { i =>
    md.segmentBytes.getSegmentByteCount(i) * md.bandCount
  }.sum
//> 157502464

// All in all: 111 partitions,
// 258 windows in each,
// each window ~157502464 bytes.
// The size of each partition would be:
// 157502464 * 256 = 40320630784 bytes ≈ 40 GB to fetch per partition.

// How this function worked before #2402 (and how it should work in theory):
// 111 partitions in total,
// 873 windows each, where each window is a segment,
// each segment is 153811 bytes:
// 153811 * 873 = 134277003 bytes ≈ 134 MB per partition.
// That means it reads the whole ~13-14 GB into Spark memory overall,
// but only ~134 MB per partition.
```
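To make the mismatch concrete, here is a quick back-of-the-envelope check in plain Scala, using the two byte counts from the snippet above (nothing GeoTrellis-specific is needed):

```scala
// Bytes the partitioner budgets per window vs. bytes a crop actually fetches.
val assumedWindowBytes = 523264L     // gb.sizeLong * depth, from above
val actualSegmentBytes = 157502464L  // sum over intersecting segments, from above

val blowUp = actualSegmentBytes.toDouble / assumedWindowBytes
// blowUp = 301.0: every window costs ~301x more S3 I/O than the
// partitioner assumes, which is why partitions balloon to ~40 GB.
```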

What happens: the partitioning is not optimal. It picks up segments and simply generates windows, without regard to TIFF segment boundaries or segment sizes. #2402 also reintroduced the problem of double segment reads.

The solution is to roll back to the M1 implementation or to improve the logic with @jamesmcclain's help; a sketch of the packing idea follows below.
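To make the proposed fix concrete, here is a minimal sketch of segment-aware packing, assuming we greedily group windows by their estimated segment byte size. `packWindows` and `maxPartitionBytes` are hypothetical names for illustration, not the actual GeoTrellis API.

```scala
import geotrellis.raster.GridBounds
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: greedily pack (window, estimatedBytes) pairs into
// partitions so that each partition fetches at most `maxPartitionBytes`
// from S3, instead of packing a fixed number of windows per partition.
def packWindows(
  windows: Seq[(GridBounds, Long)],
  maxPartitionBytes: Long
): Seq[Seq[GridBounds]] = {
  val partitions = ArrayBuffer(ArrayBuffer.empty[GridBounds])
  var currentBytes = 0L
  for ((gb, bytes) <- windows) {
    // Start a new partition once the current one would exceed the budget.
    if (currentBytes + bytes > maxPartitionBytes && partitions.last.nonEmpty) {
      partitions += ArrayBuffer.empty[GridBounds]
      currentBytes = 0L
    }
    partitions.last += gb
    currentBytes += bytes
  }
  partitions.map(_.toSeq).toSeq
}
```

Feeding it per-segment counts from `md.segmentBytes.getSegmentByteCount` (as in the diagnostic snippet above) would cap each partition near the byte budget instead of the ~40 GB observed.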

The 13 GB tiff provided by @jmelching: s3://bigtiffs-test/2469/2013_30m_cdls.tif


pomadchin commented Nov 7, 2017

Hope this is enough proof.

master: m3.xlarge
slaves (2): m3.xlarge

Tests with the M1 function:

```scala
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.s3._

implicit val _sc = sc
val rdd = S3GeoTiffRDD.spatial("bigtiffs-test", "2469/2013_30m_cdls.tif")

rdd.count()

// ==================================
// Repartition into 111 partitions.
// ==================================
// res7: Long = 96523
```
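For reference, the window size and partition count from the ETL config can also be passed directly when building the RDD. A sketch assuming the 1.x `S3GeoTiffRDD.Options` fields `maxTileSize` and `numPartitions`; verify the exact field names against your GeoTrellis version.

```scala
import geotrellis.spark.io.s3.S3GeoTiffRDD

// Assumes an implicit SparkContext is in scope, as above.
// maxTileSize / numPartitions are assumed 1.x option names.
val options = S3GeoTiffRDD.Options(
  maxTileSize = Some(512),
  numPartitions = Some(10000)
)
val windowed = S3GeoTiffRDD.spatial("bigtiffs-test", "2469/2013_30m_cdls.tif", options)
```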

[screenshot: Spark UI, 2017-11-07 at 16:03:28]

Tests on RC1 / master:

// Could not wait for this to finish, but it clearly runs for a very long time.

[screenshot: Spark UI, 2017-11-07 at 18:07:34]

@echeipesh (Contributor) commented

@jmelching Thank you for testing and for the report. We've cut RC2, which should resolve this issue; please let us know if it behaves for you.
