Skip to content
Permalink
Browse files

Remove `segmentsByPartitionBytes`

  • Loading branch information
jamesmcclain authored and echeipesh committed Oct 11, 2017
1 parent 62321f5 commit 88155254541a2d32534bbe0697b7c4462b52f59b
Showing with 0 additions and 121 deletions.
  1. +0 −121 spark/src/main/scala/geotrellis/spark/io/GeoTiffInfoReader.scala
@@ -120,125 +120,4 @@ private [geotrellis] trait GeoTiffInfoReader extends LazyLogging {
allPartitions.toArray.map({ array => (uri, array) })
})
}

/**
 * Calculates a split of GeoTiff segments into groups, to minimize segment reads.
 *
 * For every URI in `geoTiffInfoRdd` the GeoTiff info is read, the raster is cut into
 * desired windows via `RasterReader.listWindows`, and the segments intersecting
 * each window are packed into groups. Segments that were not placed through any window
 * are packed in a second pass.
 *
 * A group is closed when its accumulated byte size has exceeded `partitionBytes` or, for
 * striped layouts, when its accumulated segment size has exceeded the window size. Both checks
 * are made before the next segment is appended, so a group may exceed the limit by one segment.
 *
 * @param partitionBytes desired size of a group of segments, in bytes
 * @param maxTileSize    forwarded to `RasterReader.listWindows` to build the desired windows
 * @return RDD of pairs: (URI, Array[GridBounds]) where GridBounds are grid bounds of a
 *         particular segment; each segment can only be in a single group.
 */
def segmentsByPartitionBytes(
  partitionBytes: Long = Long.MaxValue,
  maxTileSize: Option[Int] = None
)(implicit sc: SparkContext): RDD[(String, Array[GridBounds])] = {
  geoTiffInfoRdd.flatMap { uri =>
    val md = getGeoTiffInfo(uri)
    // indices of the segments which were not assigned to any group yet
    val allSegments = mutable.Set(md.segmentBytes.indices: _*)
    val allSegmentsInitialSize = allSegments.size

    val layout = md.segmentLayout
    val segmentBytes = md.segmentBytes

    // list of desired windows, we'll try to pack them with segments if it's possible
    val windows = RasterReader.listWindows(layout.totalCols, layout.totalRows, maxTileSize)
    // resulting (uri, segment bounds) groups
    val buf: mutable.ListBuffer[(String, Array[GridBounds])] = mutable.ListBuffer()

    // walk through all desired windows
    windows.foreach { gb =>
      // closed groups of segments for this window
      val windowsBuffer: mutable.ListBuffer[Array[GridBounds]] = mutable.ListBuffer()
      // the group which is being filled at the moment
      val currentBuffer: mutable.ListBuffer[GridBounds] = mutable.ListBuffer()
      // Long accumulators: they are compared with Long limits and must not wrap around
      var currentSize = 0L
      var currentBoundsLength = 0L

      // go through all segments which intersect the desired bounds and were not put into any group yet
      layout.intersectingSegments(gb).intersect(allSegments.toSeq).foreach { i =>
        val segmentSize = layout.getSegmentSize(i)
        // widen before multiplying so that the product can't overflow
        val segmentSizeBytes = segmentBytes.getSegmentByteCount(i).toLong * md.bandCount
        val segmentGb = layout.getGridBounds(i)

        // if segment is inside the window
        if ((gb.contains(segmentGb) && layout.isTiled) || (segmentSize <= gb.sizeLong && layout.isStriped)) {
          // if segment fits partition
          if (segmentSizeBytes <= partitionBytes) {
            // check if we still want to put segment into the same group
            if (currentSize <= partitionBytes && (layout.isTiled || layout.isStriped && currentBoundsLength <= gb.sizeLong)) {
              currentSize += segmentSizeBytes
              currentBuffer += segmentGb
              currentBoundsLength += segmentSize
            } else { // or close the current group and start a new one with this segment
              windowsBuffer += currentBuffer.toArray
              currentBuffer.clear()
              currentSize = segmentSizeBytes
              currentBoundsLength = segmentSize
              currentBuffer += segmentGb
            }
          } else {
            // case when segment size is bigger than a desired partition size
            // it is better to use a different strategy for these purposes
            logger.warn("Segment size is bigger than a desired partition size, " +
              "though it fits the window size. You can consider a different partitioning strategy.")
            windowsBuffer += Array(segmentGb)
          }
          allSegments -= i
        }
      }

      // if we have something left in the current group
      if (currentBuffer.nonEmpty) windowsBuffer += currentBuffer.toArray

      windowsBuffer.foreach { indices => buf += (uri -> indices) }
    }

    // some segments may have not been placed through any window
    if (allSegments.nonEmpty) {
      logger.warn(s"Some segments don't fit windows (${allSegments.size} of $allSegmentsInitialSize).")

      val windowsBuffer: mutable.ListBuffer[Array[GridBounds]] = mutable.ListBuffer()
      val currentBuffer: mutable.ListBuffer[GridBounds] = mutable.ListBuffer()
      // size of the first window is used as the limit for striped layouts;
      // without any window there is no limit to apply
      val gbSize = windows.headOption.map(_.sizeLong).getOrElse(Long.MaxValue)
      var currentSize = 0L
      var currentBoundsLength = 0L

      allSegments.foreach { i =>
        val segmentSize = layout.getSegmentSize(i)
        val segmentSizeBytes = segmentBytes.getSegmentByteCount(i).toLong * md.bandCount
        val segmentGb = layout.getGridBounds(i)

        // the segment itself (not the running total) has to fit the partition,
        // the same way as in the per-window pass above
        if (segmentSizeBytes <= partitionBytes) {
          if (currentSize <= partitionBytes && (layout.isTiled || layout.isStriped && currentBoundsLength <= gbSize)) {
            currentSize += segmentSizeBytes
            currentBuffer += segmentGb
            currentBoundsLength += segmentSize
          } else {
            windowsBuffer += currentBuffer.toArray
            currentBuffer.clear()
            currentSize = segmentSizeBytes
            currentBoundsLength = segmentSize
            currentBuffer += segmentGb
          }
        } else {
          // case when segment size is bigger than a desired partition size
          // it is better to use a different strategy for these purposes
          logger.warn("Segment size is bigger than a desired partition size, " +
            "and it doesn't fit window a desired window size. You can consider a different partitioning strategy.")
          windowsBuffer += Array(segmentGb)
        }
      }

      // if we have something left in the current group
      if (currentBuffer.nonEmpty) windowsBuffer += currentBuffer.toArray

      windowsBuffer.foreach { indices => buf += (uri -> indices) }
    }

    buf
  }
}
}

0 comments on commit 8815525

Please sign in to comment.
You can’t perform that action at this time.