Parallelize reads #1607

Merged (8 commits, merged on Aug 17, 2016)
Changes from 5 commits
@@ -2,6 +2,7 @@ package geotrellis.spark.io.cassandra

import geotrellis.spark._
import geotrellis.spark.io._

import com.datastax.driver.core.{ResultSet, Session}
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{set, eq => eqs}
@@ -6,11 +6,15 @@ import geotrellis.spark.{Boundable, KeyBounds, LayerId}
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.io.index.{IndexRanges, MergeQueue}

import scalaz.concurrent.Task
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs}
import org.apache.avro.Schema
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import java.util.concurrent.Executors

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -34,9 +38,9 @@ object CassandraRDDReader {
val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable

val ranges = if (queryKeyBounds.length > 1)
MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
else
queryKeyBounds.flatMap(decomposeBounds)

val bins = IndexRanges.bin(ranges, numPartitions.getOrElse(sc.defaultParallelism))

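For context on the lines above: decomposeBounds expands the query key bounds into (start, end) index ranges, MergeQueue collapses overlapping or adjacent ranges, and IndexRanges.bin spreads the merged ranges across the Spark partitions. Below is a simplified, illustrative sketch of those two steps; mergeRanges and binRanges are invented stand-ins, not the GeoTrellis implementations.

// Illustrative only: simplified stand-ins for what MergeQueue and IndexRanges.bin provide.
object RangeSketch {
  // Collapse overlapping or adjacent (start, end) ranges into a minimal sorted set.
  def mergeRanges(ranges: Seq[(Long, Long)]): Seq[(Long, Long)] =
    ranges.sortBy(_._1).foldLeft(List.empty[(Long, Long)]) {
      case ((s, e) :: acc, (s2, e2)) if s2 <= e + 1 => (s, math.max(e, e2)) :: acc
      case (acc, r)                                 => r :: acc
    }.reverse

  // Distribute ranges across `count` bins (naive round-robin here; the real binning is smarter).
  def binRanges(ranges: Seq[(Long, Long)], count: Int): Seq[Seq[(Long, Long)]] =
    ranges.zipWithIndex.groupBy(_._2 % count).values.map(_.map(_._1)).toSeq

  // Example: two overlapping ranges merge into one, then get spread over the bins.
  // mergeRanges(Seq((0L, 5L), (4L, 9L), (20L, 30L))) == Seq((0L, 9L), (20L, 30L))
}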
@@ -47,36 +51,42 @@ object CassandraRDDReader {
       .and(eqs("zoom", layerId.zoom))
       .toString

-    val rdd: RDD[(K, V)] =
-      sc.parallelize(bins, bins.size)
-        .mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
-          instance.withSession { session =>
-            val statement = session.prepare(query)
-
-            val tileSeq: Iterator[Seq[(K, V)]] =
-              for {
-                rangeList <- partition // Unpack the one element of this partition, the rangeList.
-                range <- rangeList
-                index <- range._1 to range._2
-              } yield {
-                val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
-                if (row.nonEmpty) {
-                  val bytes = row.one().getBytes("value").array()
-                  val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
-                  if (filterIndexOnly) recs
-                  else recs.filter { row => includeKey(row._1) }
-                } else {
-                  Seq.empty
-                }
-              }
-
-            /** Close partition session */
-            (tileSeq ++ Iterator({
-              session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)]
-            })).flatten
-          }
-        }
-
-    rdd
+    sc.parallelize(bins, bins.size)
+      .mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
+        instance.withSession { session =>
+          val statement = session.prepare(query)
+
+          val result = partition map { seq =>
+            val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter =>
+              if (iter.hasNext) {
+                val (start, end) = iter.next()
+                Some((start to end).toIterator, iter)
+              } else None
+            }
+
+            val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator =>
+              Process.unfold(iterator) { iter =>
+                if (iter.hasNext) {
+                  val index = iter.next()
+                  val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long]))
+                  if (row.nonEmpty) {
+                    val bytes = row.one().getBytes("value").array()
+                    val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
+                    if (filterIndexOnly) Some(recs, iter)
+                    else Some(recs.filter { row => includeKey(row._1) }, iter)
+                  } else Some(Vector.empty, iter)
+                } else None
+              }
+            }
+
+            nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read }.runFoldMap(identity).unsafePerformSync
+          }
+
+          /** Close partition session */
+          (result ++ Iterator({
+            session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)]
+          })).flatten
+        }
+      }
   }
 }
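The heart of this change is the switch from a sequential for-comprehension to scalaz-stream: each partition's range list becomes an outer Process of index iterators, each iterator is mapped to an inner Process that performs the blocking reads, and nondeterminism.njoin runs up to maxOpen inner streams concurrently while (roughly) maxQueued bounds how much output may buffer ahead of the consumer. Below is a self-contained sketch of that pattern outside GeoTrellis; the fetch function, the object name, and the example data are illustrative stand-ins only.

import scalaz.concurrent.Task
import scalaz.std.vector._          // Monoid[Vector[A]] for runFoldMap
import scalaz.stream.{Process, nondeterminism}

object NJoinSketch {
  // Stand-in for a blocking, per-index read (e.g. a Cassandra query or an S3 GET).
  def fetch(index: Long): Vector[(Long, String)] = Vector(index -> s"value-$index")

  def main(args: Array[String]): Unit = {
    val ranges: Seq[(Long, Long)] = Seq((0L, 9L), (100L, 109L))

    // Outer stream: one Iterator[Long] per (start, end) range.
    val indexIterators: Process[Task, Iterator[Long]] =
      Process.unfold(ranges.toIterator) { iter =>
        if (iter.hasNext) {
          val (start, end) = iter.next()
          Some(((start to end).toIterator, iter))
        } else None
      }

    // Inner stream: pull indices off one iterator and fetch them as Vector chunks.
    val read: Iterator[Long] => Process[Task, Vector[(Long, String)]] = { iterator =>
      Process.unfold(iterator) { iter =>
        if (iter.hasNext) Some((fetch(iter.next()), iter)) else None
      }
    }

    // Run up to 8 inner streams at once and fold every chunk into one Vector.
    val result: Vector[(Long, String)] =
      nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { indexIterators map read }
        .runFoldMap(identity)
        .unsafePerformSync

    println(result.size) // 20 records, 10 from each range
  }
}

Inside the RDD this whole expression runs once per Spark partition, so the read concurrency on an executor is roughly maxOpen on top of Spark's own task parallelism.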
@@ -3,10 +3,12 @@ package geotrellis.spark.io.cassandra
import geotrellis.spark.io.avro._
import geotrellis.spark.io.avro.codecs._
import geotrellis.spark.LayerId

import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.schemabuilder.SchemaBuilder
import com.datastax.driver.core.DataType._
import com.datastax.driver.core.ResultSet
import com.datastax.driver.core.ResultSetFuture
import org.apache.spark.rdd.RDD

import scalaz.concurrent.Task
@@ -87,7 +89,7 @@ object CassandraRDDWriter {
         Process eval Task {
           session.closeAsync()
           session.getCluster.closeAsync()
-        }
+        }(pool)
       }

     results.run.unsafePerformSync
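The one-line writer change above passes a dedicated ExecutorService to Task explicitly (the `}(pool)` suffix), so the session-closing effect runs on that pool instead of the default executor. A small sketch of the same idiom follows, with an arbitrary pool size and a placeholder body.

import java.util.concurrent.Executors
import scalaz.concurrent.Task

object PoolSketch {
  // A dedicated pool for blocking cleanup work.
  val pool = Executors.newFixedThreadPool(8)

  // Task.apply accepts the executor in a second parameter list, so it can be
  // supplied explicitly, as the writer does with `}(pool)`.
  val closeResources: Task[Unit] = Task {
    // e.g. session.closeAsync(); session.getCluster.closeAsync()
    ()
  }(pool)

  // Running it: closeResources.unsafePerformSync
}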
s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala (65 changes: 35 additions & 30 deletions)
@@ -2,19 +2,19 @@ package geotrellis.spark.io.s3

import geotrellis.spark._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.index.{MergeQueue, KeyIndex, IndexRanges}
import geotrellis.spark.io.index.{IndexRanges, MergeQueue}
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.util.KryoWrapper

import scalaz.concurrent.Task
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}
import com.amazonaws.services.s3.model.AmazonS3Exception
import com.typesafe.scalalogging.slf4j.LazyLogging
import org.apache.avro.Schema
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

trait S3RDDReader {

def getS3Client: () => S3Client
@@ -31,7 +31,7 @@
writerSchema: Option[Schema] = None,
numPartitions: Option[Int] = None
)(implicit sc: SparkContext): RDD[(K, V)] = {
-    if(queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)]
+    if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)]

val ranges = if (queryKeyBounds.length > 1)
MergeQueue(queryKeyBounds.flatMap(decomposeBounds))
@@ -45,37 +45,42 @@ trait S3RDDReader {
     val _getS3Client = getS3Client
     val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable

-    val rdd =
-      sc.parallelize(bins, bins.size)
-        .mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
-          val s3client = _getS3Client()
-
-          val tileSeq: Iterator[Seq[(K, V)]] =
-            for {
-              rangeList <- partition // Unpack the one element of this partition, the rangeList.
-              range <- rangeList
-              index <- range._1 to range._2
-            } yield {
-              val path = keyPath(index)
-              val getS3Bytes = () => IOUtils.toByteArray(s3client.getObject(bucket, path).getObjectContent)
-
-              try {
-                val bytes: Array[Byte] =
-                  getS3Bytes()
-                val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
-                if(filterIndexOnly)
-                  recs
-                else
-                  recs.filter { row => includeKey(row._1) }
-              } catch {
-                case e: AmazonS3Exception if e.getStatusCode == 404 => Seq.empty
-              }
-            }
-
-          tileSeq.flatten
-        }
-
-    rdd
+    sc.parallelize(bins, bins.size)
+      .mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
+        val s3client = _getS3Client()
+
+        partition flatMap { seq =>
+          val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter =>
+            if (iter.hasNext) {
+              val (start, end) = iter.next()
+              Some((start to end).toIterator, iter)
+            } else None
+          }
+
+          val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator =>
+            Process.unfold(iterator) { iter =>
+              if (iter.hasNext) {
+                val index = iter.next()
+                val path = keyPath(index)
+                val getS3Bytes = () => IOUtils.toByteArray(s3client.getObject(bucket, path).getObjectContent)
+
+                try {
+                  val bytes: Array[Byte] = getS3Bytes()
+                  val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
+                  if (filterIndexOnly) Some(recs, iter)
+                  else Some(recs.filter { row => includeKey(row._1) }, iter)
+                } catch {
+                  case e: AmazonS3Exception if e.getStatusCode == 404 => Some(Vector.empty, iter)
+                }
+              } else {
+                None
+              }
+            }
+          }
+
+          nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { range map read }.runFoldMap(identity).unsafePerformSync
Contributor: Notice that 8 is a magic number here, while 32 is used for Cassandra and File. Was it benchmarked to be best that way?

Member: It would be best if we could configure this somehow, rather than leave magic numbers that are impossible to un-magic.

Contributor: How do you feel about using Typesafe Config for this? This is probably a system-wide property rather than a per-action property.

Member (author): I just experimented with what makes the tests run faster on a local machine, though the values are still magical. It probably makes sense to make them configurable, since their effect depends on a particular machine's configuration.
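Following up on the Typesafe Config suggestion, here is a hedged sketch of how the maxOpen/maxQueued values could be read from configuration with a fallback to the current defaults. The key name geotrellis.io.read-parallelism is invented for illustration and is not an existing GeoTrellis setting.

import com.typesafe.config.ConfigFactory

object ReadParallelism {
  private val config = ConfigFactory.load()

  // Hypothetical key; fall back to the current hard-coded default when it is absent,
  // so existing deployments keep their behaviour.
  val maxOpen: Int =
    if (config.hasPath("geotrellis.io.read-parallelism"))
      config.getInt("geotrellis.io.read-parallelism")
    else 32

  // Usage sketch:
  //   nondeterminism.njoin(maxOpen = ReadParallelism.maxOpen, maxQueued = ReadParallelism.maxOpen) { ranges map read }
}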

+        }
+      }
   }
 }
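One S3-specific detail in the reader above: a 404 from getObject is treated as an empty chunk rather than a failure, so missing keys inside a range do not abort the whole stream. A compact sketch of that convention follows; fetchBytes and decode are hypothetical stand-ins for the S3 get and the Avro decode.

import com.amazonaws.services.s3.model.AmazonS3Exception

object S3MissSketch {
  // Treat "object not found" as "no records at this index"; any other S3 error propagates.
  def readOrEmpty[T](fetchBytes: () => Array[Byte])(decode: Array[Byte] => Vector[T]): Vector[T] =
    try decode(fetchBytes())
    catch {
      case e: AmazonS3Exception if e.getStatusCode == 404 => Vector.empty
    }
}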

spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala (49 changes: 25 additions & 24 deletions)
@@ -2,19 +2,18 @@ package geotrellis.spark.io.file

import geotrellis.spark._
import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec
import geotrellis.spark.io.index.{MergeQueue, KeyIndex, IndexRanges}
import geotrellis.spark.io.index.{MergeQueue, IndexRanges}
import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec}
import geotrellis.spark.util.KryoWrapper
import geotrellis.util.Filesystem

import scalaz.concurrent.Task
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}
import org.apache.avro.Schema
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import spire.syntax.cfor._
import scala.collection.mutable
import scala.reflect.ClassTag
import java.io.File

object FileRDDReader {
@@ -42,29 +41,31 @@

     sc.parallelize(bins, bins.size)
       .mapPartitions { partition: Iterator[Seq[(Long, Long)]] =>
-        val resultPartition = mutable.ListBuffer[(K, V)]()
-
-        for(
-          rangeList <- partition;
-          range <- rangeList
-        ) {
-          val (start, end) = range
-          cfor(start)(_ <= end, _ + 1) { index =>
-            val path = keyPath(index)
-            if(new File(path).exists) {
-              val bytes: Array[Byte] = Filesystem.slurp(path)
-              val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
-              resultPartition ++= {
-                if(filterIndexOnly)
-                  recs
-                else
-                  recs.filter { row => includeKey(row._1) }
-              }
-            }
-          }
-        }
-
-        resultPartition.iterator
+        partition flatMap { seq =>
+          val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter =>
+            if (iter.hasNext) {
+              val (start, end) = iter.next()
+              Some((start to end).toIterator, iter)
+            } else None
+          }
+
+          val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator =>
+            Process.unfold(iterator) { iter =>
+              if (iter.hasNext) {
+                val index = iter.next()
+                val path = keyPath(index)
+                if(new File(path).exists) {
+                  val bytes: Array[Byte] = Filesystem.slurp(path)
+                  val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec)
+                  if (filterIndexOnly) Some(recs, iter)
+                  else Some(recs.filter { row => includeKey(row._1) }, iter)
+                } else Some(Vector.empty, iter)
+              } else None
+            }
+          }
+
+          nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read }.runFoldMap(identity).unsafePerformSync
+        }
       }
   }
 }
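Returning to the review question about benchmarking the magic numbers: a rough, illustrative way to see how maxOpen bounds concurrency is to time njoin over artificially slow inner streams. The numbers in the comments are ballpark expectations, not measurements, and the actual speedup is bounded by the default strategy's thread pool.

import scalaz.concurrent.Task
import scalaz.std.vector._
import scalaz.stream.{Process, nondeterminism}

object ConcurrencySketch {
  // Sixteen inner streams that each sleep 100 ms before emitting one element.
  def timed(maxOpen: Int): Long = {
    val inner: Process[Task, Vector[Int]] =
      Process.eval(Task.delay { Thread.sleep(100); Vector(1) })
    val outer: Process[Task, Process[Task, Vector[Int]]] =
      Process.range(0, 16).map(_ => inner)

    val start = System.currentTimeMillis()
    nondeterminism.njoin(maxOpen = maxOpen, maxQueued = maxOpen)(outer)
      .runFoldMap(identity)
      .unsafePerformSync
    System.currentTimeMillis() - start
  }

  def main(args: Array[String]): Unit = {
    println(s"maxOpen = 1:  ${timed(1)} ms")   // roughly 16 x 100 ms, i.e. sequential
    println(s"maxOpen = 16: ${timed(16)} ms")  // substantially less, limited by the thread pool
  }
}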