From 2daf66562565ed3e0c725b942e3cfcdf7f8af989 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 2 Aug 2016 17:46:09 +0300 Subject: [PATCH 1/8] sim reads for s3 and for cassandra --- .../cassandra/CassandraAttributeStore.scala | 1 + .../io/cassandra/CassandraRDDReader.scala | 52 +++++++++++----- .../io/cassandra/CassandraRDDWriter.scala | 9 +++ .../geotrellis/spark/io/s3/S3RDDReader.scala | 62 ++++++++++++------- 4 files changed, 83 insertions(+), 41 deletions(-) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraAttributeStore.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraAttributeStore.scala index ace8e8bbf6..b153fbc382 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraAttributeStore.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraAttributeStore.scala @@ -2,6 +2,7 @@ package geotrellis.spark.io.cassandra import geotrellis.spark._ import geotrellis.spark.io._ + import com.datastax.driver.core.{ResultSet, Session} import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.querybuilder.QueryBuilder.{set, eq => eqs} diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala index 60b39d3149..8cd53d2ad4 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala @@ -6,13 +6,19 @@ import geotrellis.spark.{Boundable, KeyBounds, LayerId} import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.spark.io.index.{IndexRanges, MergeQueue} +import scalaz.concurrent.Task +import scalaz.stream.{Process, nondeterminism} import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs} import org.apache.avro.Schema import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import java.util.concurrent.Executors + +import com.datastax.driver.core.ResultSetFuture import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag object CassandraRDDReader { @@ -52,27 +58,39 @@ object CassandraRDDReader { .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => instance.withSession { session => val statement = session.prepare(query) + val pool = Executors.newFixedThreadPool(8) + + val result = partition map { range => + val ranges = Process.unfold(range.toIterator) { iter: Iterator[(Long, Long)] => + if (iter.hasNext) Some(iter.next(), iter) + else None + } - val tileSeq: Iterator[Seq[(K, V)]] = - for { - rangeList <- partition // Unpack the one element of this partition, the rangeList. 
- range <- rangeList - index <- range._1 to range._2 - } yield { - val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long])) - if (row.nonEmpty) { - val bytes = row.one().getBytes("value").array() - val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) - if (filterIndexOnly) recs - else recs.filter { row => includeKey(row._1) } - } else { - Seq.empty - } + val read: ((Long, Long)) => Process[Task, List[(K, V)]] = { + case (start, end) => + Process eval { + Task.gatherUnordered(for { + index <- start to end + } yield Task { + val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long])) + if (row.nonEmpty) { + val bytes = row.one().getBytes("value").array() + val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) + if (filterIndexOnly) recs + else recs.filter { row => includeKey(row._1) } + } else { + Seq.empty + } + }(pool)).map(_.flatten) + } } + nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { ranges map read }.runLog.map(_.flatten).unsafePerformSync + } + /** Close partition session */ - (tileSeq ++ Iterator({ - session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] + (result ++ Iterator({ + pool.shutdown(); session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] })).flatten } } diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala index 0c8932aa28..bda43a3d25 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala @@ -3,10 +3,12 @@ package geotrellis.spark.io.cassandra import geotrellis.spark.io.avro._ import geotrellis.spark.io.avro.codecs._ import geotrellis.spark.LayerId + import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.schemabuilder.SchemaBuilder import com.datastax.driver.core.DataType._ import com.datastax.driver.core.ResultSet +import com.datastax.driver.core.ResultSetFuture import org.apache.spark.rdd.RDD import scalaz.concurrent.Task @@ -58,6 +60,13 @@ object CassandraRDDWriter { instance.withSession { session => val statement = session.prepare(query) + /*partition map { recs => + val id: java.lang.Long = recs._1 + val pairs = recs._2.toVector + val bytes = ByteBuffer.wrap(AvroEncoder.toBinary(pairs)(codec)) + session.executeAsync(statement.bind(id, bytes)) + } map { _.getUninterruptibly() }*/ + val queries: Process[Task, (java.lang.Long, ByteBuffer)] = Process.unfold(partition) { iter => if (iter.hasNext) { diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala index 683ca8a19c..72f28012dc 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala @@ -2,18 +2,19 @@ package geotrellis.spark.io.s3 import geotrellis.spark._ import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec -import geotrellis.spark.io.index.{MergeQueue, KeyIndex, IndexRanges} +import geotrellis.spark.io.index.{IndexRanges, MergeQueue} import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.spark.util.KryoWrapper +import scalaz.concurrent.Task +import scalaz.stream.{Process, nondeterminism} import com.amazonaws.services.s3.model.AmazonS3Exception -import 
com.typesafe.scalalogging.slf4j.LazyLogging import org.apache.avro.Schema import org.apache.commons.io.IOUtils import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import scala.reflect.ClassTag +import java.util.concurrent.Executors trait S3RDDReader { @@ -49,30 +50,43 @@ trait S3RDDReader { sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => val s3client = _getS3Client() + val pool = Executors.newFixedThreadPool(8) - val tileSeq: Iterator[Seq[(K, V)]] = - for { - rangeList <- partition // Unpack the one element of this partition, the rangeList. - range <- rangeList - index <- range._1 to range._2 - } yield { - val path = keyPath(index) - val getS3Bytes = () => IOUtils.toByteArray(s3client.getObject(bucket, path).getObjectContent) - - try { - val bytes: Array[Byte] = - getS3Bytes() - val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) - if(filterIndexOnly) - recs - else - recs.filter { row => includeKey(row._1) } - } catch { - case e: AmazonS3Exception if e.getStatusCode == 404 => Seq.empty - } + val result = partition flatMap { range => + val ranges = Process.unfold(range.toIterator) { iter: Iterator[(Long, Long)] => + if (iter.hasNext) Some(iter.next(), iter) + else None } - tileSeq.flatten + val read: ((Long, Long)) => Process[Task, List[(K, V)]] = { + case (start, end) => + Process eval { + Task.gatherUnordered(for { + index <- start to end + } yield Task { + val path = keyPath(index) + val getS3Bytes = () => IOUtils.toByteArray(s3client.getObject(bucket, path).getObjectContent) + + try { + val bytes: Array[Byte] = + getS3Bytes() + val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) + if(filterIndexOnly) + recs + else + recs.filter { row => includeKey(row._1) } + } catch { + case e: AmazonS3Exception if e.getStatusCode == 404 => Seq.empty + } + }(pool)).map(_.flatten) + } + } + + nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { ranges map read }.runLog.map(_.flatten).unsafePerformSync + } + + pool.shutdown() + result } rdd From adc937e68f96c0a86bf1e536b95e1985718a240c Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 2 Aug 2016 20:07:56 +0300 Subject: [PATCH 2/8] fix s3 rdd reader thread pool close --- .../main/scala/geotrellis/spark/io/s3/S3RDDReader.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala index 72f28012dc..8cb139a740 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala @@ -52,7 +52,7 @@ trait S3RDDReader { val s3client = _getS3Client() val pool = Executors.newFixedThreadPool(8) - val result = partition flatMap { range => + val result = partition map { range => val ranges = Process.unfold(range.toIterator) { iter: Iterator[(Long, Long)] => if (iter.hasNext) Some(iter.next(), iter) else None @@ -85,8 +85,10 @@ trait S3RDDReader { nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { ranges map read }.runLog.map(_.flatten).unsafePerformSync } - pool.shutdown() - result + /** Close partition pool */ + (result ++ Iterator({ + pool.shutdown(); Seq.empty[(K, V)] + })).flatten } rdd From c737e48b1335fe258efff0a42311a71200e8de05 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 2 Aug 2016 22:26:31 +0300 Subject: [PATCH 3/8] fix sim reads --- 
.../io/cassandra/CassandraRDDReader.scala | 37 +++++++-------- .../geotrellis/spark/io/s3/S3RDDReader.scala | 46 ++++++++----------- 2 files changed, 39 insertions(+), 44 deletions(-) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala index 8cd53d2ad4..31ead4bd4e 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala @@ -7,6 +7,7 @@ import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.spark.io.index.{IndexRanges, MergeQueue} import scalaz.concurrent.Task +import scalaz.std.vector._ import scalaz.stream.{Process, nondeterminism} import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.querybuilder.QueryBuilder.{eq => eqs} @@ -15,10 +16,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import java.util.concurrent.Executors -import com.datastax.driver.core.ResultSetFuture - import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ import scala.reflect.ClassTag object CassandraRDDReader { @@ -58,39 +56,42 @@ object CassandraRDDReader { .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => instance.withSession { session => val statement = session.prepare(query) - val pool = Executors.newFixedThreadPool(8) val result = partition map { range => - val ranges = Process.unfold(range.toIterator) { iter: Iterator[(Long, Long)] => - if (iter.hasNext) Some(iter.next(), iter) + val ranges: Process[Task, Iterator[Long]] = Process.unfold(range.toIterator) { iter => + if (iter.hasNext) { + val (start, end) = iter.next() + Some((start to end).toIterator, iter) + } else None } - val read: ((Long, Long)) => Process[Task, List[(K, V)]] = { - case (start, end) => - Process eval { - Task.gatherUnordered(for { - index <- start to end - } yield Task { + val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { + case iterator => + Process.unfold(iterator) { iter => + if(iter.hasNext) { + val index = iter.next() val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long])) if (row.nonEmpty) { val bytes = row.one().getBytes("value").array() val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) - if (filterIndexOnly) recs - else recs.filter { row => includeKey(row._1) } + if (filterIndexOnly) Some(recs, iter) + else Some(recs.filter { row => includeKey(row._1) }, iter) } else { - Seq.empty + Some(Vector.empty, iter) } - }(pool)).map(_.flatten) + } else { + None + } } } - nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { ranges map read }.runLog.map(_.flatten).unsafePerformSync + nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { ranges map read }.runFoldMap(identity).unsafePerformSync } /** Close partition session */ (result ++ Iterator({ - pool.shutdown(); session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] + session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] })).flatten } } diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala index 8cb139a740..df3f640766 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala @@ -7,6 +7,7 @@ import geotrellis.spark.io.avro.{AvroEncoder, 
AvroRecordCodec} import geotrellis.spark.util.KryoWrapper import scalaz.concurrent.Task +import scalaz.std.vector._ import scalaz.stream.{Process, nondeterminism} import com.amazonaws.services.s3.model.AmazonS3Exception import org.apache.avro.Schema @@ -14,8 +15,6 @@ import org.apache.commons.io.IOUtils import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import java.util.concurrent.Executors - trait S3RDDReader { def getS3Client: () => S3Client @@ -50,45 +49,40 @@ trait S3RDDReader { sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => val s3client = _getS3Client() - val pool = Executors.newFixedThreadPool(8) - val result = partition map { range => - val ranges = Process.unfold(range.toIterator) { iter: Iterator[(Long, Long)] => - if (iter.hasNext) Some(iter.next(), iter) + partition flatMap { range => + val ranges: Process[Task, Iterator[Long]] = Process.unfold(range.toIterator) { iter => + if (iter.hasNext) { + val (start, end) = iter.next() + Some((start to end).toIterator, iter) + } else None } - val read: ((Long, Long)) => Process[Task, List[(K, V)]] = { - case (start, end) => - Process eval { - Task.gatherUnordered(for { - index <- start to end - } yield Task { + val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { + case iterator => + Process.unfold(iterator) { iter => + if(iter.hasNext) { + val index = iter.next() val path = keyPath(index) val getS3Bytes = () => IOUtils.toByteArray(s3client.getObject(bucket, path).getObjectContent) try { - val bytes: Array[Byte] = - getS3Bytes() + val bytes: Array[Byte] = getS3Bytes() val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) - if(filterIndexOnly) - recs - else - recs.filter { row => includeKey(row._1) } + if(filterIndexOnly) Some(recs, iter) + else Some(recs.filter { row => includeKey(row._1) }, iter) } catch { - case e: AmazonS3Exception if e.getStatusCode == 404 => Seq.empty + case e: AmazonS3Exception if e.getStatusCode == 404 => Some(Vector.empty, iter) } - }(pool)).map(_.flatten) + } else { + None + } } } - nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { ranges map read }.runLog.map(_.flatten).unsafePerformSync + nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { ranges map read }.runFoldMap(identity).unsafePerformSync } - - /** Close partition pool */ - (result ++ Iterator({ - pool.shutdown(); Seq.empty[(K, V)] - })).flatten } rdd From 1c26b0f2bfbc83dbe8e3fb9f1b912b0c29dd1072 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 2 Aug 2016 23:06:43 +0300 Subject: [PATCH 4/8] code refactor --- .../io/cassandra/CassandraRDDReader.scala | 75 ++++++++----------- .../geotrellis/spark/io/s3/S3RDDReader.scala | 65 ++++++++-------- 2 files changed, 63 insertions(+), 77 deletions(-) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala index 31ead4bd4e..0f676c22d8 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala @@ -38,9 +38,9 @@ object CassandraRDDReader { val kwWriterSchema = KryoWrapper(writerSchema) //Avro Schema is not Serializable val ranges = if (queryKeyBounds.length > 1) - MergeQueue(queryKeyBounds.flatMap(decomposeBounds)) - else - queryKeyBounds.flatMap(decomposeBounds) + MergeQueue(queryKeyBounds.flatMap(decomposeBounds)) + else + 
queryKeyBounds.flatMap(decomposeBounds) val bins = IndexRanges.bin(ranges, numPartitions.getOrElse(sc.defaultParallelism)) @@ -51,51 +51,42 @@ object CassandraRDDReader { .and(eqs("zoom", layerId.zoom)) .toString - val rdd: RDD[(K, V)] = - sc.parallelize(bins, bins.size) - .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => - instance.withSession { session => - val statement = session.prepare(query) + sc.parallelize(bins, bins.size) + .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => + instance.withSession { session => + val statement = session.prepare(query) - val result = partition map { range => - val ranges: Process[Task, Iterator[Long]] = Process.unfold(range.toIterator) { iter => - if (iter.hasNext) { - val (start, end) = iter.next() - Some((start to end).toIterator, iter) - } - else None - } + val result = partition map { seq => + val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => + if (iter.hasNext) { + val (start, end) = iter.next() + Some((start to end).toIterator, iter) + } else None + } - val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { - case iterator => - Process.unfold(iterator) { iter => - if(iter.hasNext) { - val index = iter.next() - val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long])) - if (row.nonEmpty) { - val bytes = row.one().getBytes("value").array() - val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) - if (filterIndexOnly) Some(recs, iter) - else Some(recs.filter { row => includeKey(row._1) }, iter) - } else { - Some(Vector.empty, iter) - } - } else { - None - } - } + val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator => + Process.unfold(iterator) { iter => + if (iter.hasNext) { + val index = iter.next() + val row = session.execute(statement.bind(index.asInstanceOf[java.lang.Long])) + if (row.nonEmpty) { + val bytes = row.one().getBytes("value").array() + val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) + if (filterIndexOnly) Some(recs, iter) + else Some(recs.filter { row => includeKey(row._1) }, iter) + } else Some(Vector.empty, iter) + } else None } - - nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { ranges map read }.runFoldMap(identity).unsafePerformSync } - /** Close partition session */ - (result ++ Iterator({ - session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] - })).flatten + nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read }.runFoldMap(identity).unsafePerformSync } - } - rdd + /** Close partition session */ + (result ++ Iterator({ + session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] + })).flatten + } + } } } diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala index df3f640766..754cfa393e 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala @@ -31,7 +31,7 @@ trait S3RDDReader { writerSchema: Option[Schema] = None, numPartitions: Option[Int] = None )(implicit sc: SparkContext): RDD[(K, V)] = { - if(queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)] + if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)] val ranges = if (queryKeyBounds.length > 1) MergeQueue(queryKeyBounds.flatMap(decomposeBounds)) @@ -45,47 +45,42 @@ trait S3RDDReader { val _getS3Client = getS3Client val kwWriterSchema = 
KryoWrapper(writerSchema) //Avro Schema is not Serializable - val rdd = - sc.parallelize(bins, bins.size) - .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => - val s3client = _getS3Client() + sc.parallelize(bins, bins.size) + .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => + val s3client = _getS3Client() - partition flatMap { range => - val ranges: Process[Task, Iterator[Long]] = Process.unfold(range.toIterator) { iter => - if (iter.hasNext) { - val (start, end) = iter.next() - Some((start to end).toIterator, iter) - } - else None - } + partition flatMap { seq => + val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => + if (iter.hasNext) { + val (start, end) = iter.next() + Some((start to end).toIterator, iter) + } else None + } - val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { - case iterator => - Process.unfold(iterator) { iter => - if(iter.hasNext) { - val index = iter.next() - val path = keyPath(index) - val getS3Bytes = () => IOUtils.toByteArray(s3client.getObject(bucket, path).getObjectContent) + val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator => + Process.unfold(iterator) { iter => + if (iter.hasNext) { + val index = iter.next() + val path = keyPath(index) + val getS3Bytes = () => IOUtils.toByteArray(s3client.getObject(bucket, path).getObjectContent) - try { - val bytes: Array[Byte] = getS3Bytes() - val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) - if(filterIndexOnly) Some(recs, iter) - else Some(recs.filter { row => includeKey(row._1) }, iter) - } catch { - case e: AmazonS3Exception if e.getStatusCode == 404 => Some(Vector.empty, iter) - } - } else { - None - } + try { + val bytes: Array[Byte] = getS3Bytes() + val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) + if (filterIndexOnly) Some(recs, iter) + else Some(recs.filter { row => includeKey(row._1) }, iter) + } catch { + case e: AmazonS3Exception if e.getStatusCode == 404 => Some(Vector.empty, iter) } + } else { + None + } } - - nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { ranges map read }.runFoldMap(identity).unsafePerformSync } - } - rdd + nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { range map read }.runFoldMap(identity).unsafePerformSync + } + } } } From 0d29af0de613e1f6b295ba09006b4c9433c32c1b Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 2 Aug 2016 23:07:02 +0300 Subject: [PATCH 5/8] +file rdd reader par --- .../io/cassandra/CassandraRDDWriter.scala | 9 +--- .../spark/io/file/FileRDDReader.scala | 49 ++++++++++--------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala index bda43a3d25..a3dda3ce49 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala @@ -60,13 +60,6 @@ object CassandraRDDWriter { instance.withSession { session => val statement = session.prepare(query) - /*partition map { recs => - val id: java.lang.Long = recs._1 - val pairs = recs._2.toVector - val bytes = ByteBuffer.wrap(AvroEncoder.toBinary(pairs)(codec)) - session.executeAsync(statement.bind(id, bytes)) - } map { _.getUninterruptibly() }*/ - val queries: Process[Task, (java.lang.Long, ByteBuffer)] = Process.unfold(partition) { iter 
=> if (iter.hasNext) { @@ -96,7 +89,7 @@ object CassandraRDDWriter { Process eval Task { session.closeAsync() session.getCluster.closeAsync() - } + }(pool) } results.run.unsafePerformSync diff --git a/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala b/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala index f9ad4448eb..87b8ffd0dc 100644 --- a/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala @@ -2,19 +2,18 @@ package geotrellis.spark.io.file import geotrellis.spark._ import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec -import geotrellis.spark.io.index.{MergeQueue, KeyIndex, IndexRanges} +import geotrellis.spark.io.index.{MergeQueue, IndexRanges} import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.spark.util.KryoWrapper import geotrellis.util.Filesystem +import scalaz.concurrent.Task +import scalaz.std.vector._ +import scalaz.stream.{Process, nondeterminism} import org.apache.avro.Schema -import org.apache.commons.io.IOUtils import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import spire.syntax.cfor._ -import scala.collection.mutable -import scala.reflect.ClassTag import java.io.File object FileRDDReader { @@ -42,29 +41,31 @@ object FileRDDReader { sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => - val resultPartition = mutable.ListBuffer[(K, V)]() + partition flatMap { seq => + val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => + if (iter.hasNext) { + val (start, end) = iter.next() + Some((start to end).toIterator, iter) + } else None + } - for( - rangeList <- partition; - range <- rangeList - ) { - val (start, end) = range - cfor(start)(_ <= end, _ + 1) { index => - val path = keyPath(index) - if(new File(path).exists) { - val bytes: Array[Byte] = Filesystem.slurp(path) - val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) - resultPartition ++= { - if(filterIndexOnly) - recs - else - recs.filter { row => includeKey(row._1) } - } + val read: Iterator[Long] => Process[Task, Vector[(K, V)]] = { iterator => + Process.unfold(iterator) { iter => + if (iter.hasNext) { + val index = iter.next() + val path = keyPath(index) + if(new File(path).exists) { + val bytes: Array[Byte] = Filesystem.slurp(path) + val recs = AvroEncoder.fromBinary(kwWriterSchema.value.getOrElse(_recordCodec.schema), bytes)(_recordCodec) + if (filterIndexOnly) Some(recs, iter) + else Some(recs.filter { row => includeKey(row._1) }, iter) + } else Some(Vector.empty, iter) + } else None } } - } - resultPartition.iterator + nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read }.runFoldMap(identity).unsafePerformSync + } } } } From 4b9bf7e7eb64ea42503d83ea1d90b5ba9cfb6e47 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 9 Aug 2016 14:58:19 +0300 Subject: [PATCH 6/8] fixed thread pool control in writes and reads --- .../io/accumulo/AccumuloWriteStrategy.scala | 21 ++++++------------- .../io/cassandra/CassandraRDDReader.scala | 7 ++++--- .../io/cassandra/CassandraRDDWriter.scala | 5 ++--- .../spark/io/hbase/HBaseSpaceTimeSpec.scala | 1 + .../spark/io/hbase/HBaseSpatialSpec.scala | 1 + .../hbase/HBaseTileFeatureSpaceTimeSpec.scala | 1 + .../hbase/HBaseTileFeatureSpatialSpec.scala | 1 + project/Version.scala | 2 +- .../geotrellis/spark/io/s3/S3RDDReader.scala | 10 +++++++-- 
.../geotrellis/spark/io/s3/S3RDDWriter.scala | 9 ++++---- 10 files changed, 30 insertions(+), 28 deletions(-) diff --git a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala index cc395536dd..0ef6b918b1 100644 --- a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala +++ b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala @@ -1,30 +1,20 @@ package geotrellis.spark.io.accumulo -import java.util.UUID - -import geotrellis.spark._ -import geotrellis.spark.io._ -import geotrellis.spark.io.index._ import geotrellis.spark.util._ import geotrellis.spark.io.hadoop._ -import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.fs.Path - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD - import org.apache.accumulo.core.data.{Key, Mutation, Value} import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat import org.apache.accumulo.core.client.BatchWriterConfig - -import scala.collection.JavaConversions._ - -import scalaz.concurrent.Task +import scalaz.concurrent.{Strategy, Task} import scalaz.stream._ +import java.util.UUID +import java.util.concurrent.Executors + object AccumuloWriteStrategy { def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest") } @@ -104,6 +94,7 @@ case class SocketWriteStrategy( def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = { val serializeWrapper = KryoWrapper(config) // BatchWriterConfig is not java serializable kvPairs.foreachPartition { partition => + val pool = Executors.newFixedThreadPool(32) val (config) = serializeWrapper.value val writer = instance.connector.createBatchWriter(table, config) @@ -121,7 +112,7 @@ case class SocketWriteStrategy( val writeChannel = channel.lift { (mutation: Mutation) => Task { writer.addMutation(mutation) } } val writes = mutations.tee(writeChannel)(tee.zipApply).map(Process.eval) - nondeterminism.njoin(maxOpen = 32, maxQueued = 32)(writes).run.unsafePerformSync + nondeterminism.njoin(maxOpen = 32, maxQueued = 32)(writes)(Strategy.Executor(pool)).run.unsafePerformSync writer.close() } } diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala index 0f676c22d8..729e6c852e 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala @@ -6,7 +6,7 @@ import geotrellis.spark.{Boundable, KeyBounds, LayerId} import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.spark.io.index.{IndexRanges, MergeQueue} -import scalaz.concurrent.Task +import scalaz.concurrent.{Strategy, Task} import scalaz.std.vector._ import scalaz.stream.{Process, nondeterminism} import com.datastax.driver.core.querybuilder.QueryBuilder @@ -55,6 +55,7 @@ object CassandraRDDReader { .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => instance.withSession { session => val statement = session.prepare(query) + val pool = Executors.newFixedThreadPool(32) val result = partition map { seq => val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => @@ -79,12 +80,12 @@ object CassandraRDDReader { } } - nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read 
}.runFoldMap(identity).unsafePerformSync + nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync } /** Close partition session */ (result ++ Iterator({ - session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] + pool.shutdown(); session.closeAsync(); session.getCluster.closeAsync(); Seq.empty[(K, V)] })).flatten } } diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala index a3dda3ce49..233ece81c7 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala @@ -8,10 +8,9 @@ import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.schemabuilder.SchemaBuilder import com.datastax.driver.core.DataType._ import com.datastax.driver.core.ResultSet -import com.datastax.driver.core.ResultSetFuture import org.apache.spark.rdd.RDD -import scalaz.concurrent.Task +import scalaz.concurrent.{Strategy, Task} import scalaz.stream.{Process, nondeterminism} import java.nio.ByteBuffer import java.util.concurrent.Executors @@ -85,7 +84,7 @@ object CassandraRDDWriter { val results = nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { queries map write - } onComplete { + }(Strategy.Executor(pool)) onComplete { Process eval Task { session.closeAsync() session.getCluster.closeAsync() diff --git a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpaceTimeSpec.scala b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpaceTimeSpec.scala index 9b496f4fa3..ec569e1068 100644 --- a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpaceTimeSpec.scala +++ b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpaceTimeSpec.scala @@ -19,6 +19,7 @@ class HBaseSpaceTimeSpec instance.getAdmin.disableTable("tiles") instance.getAdmin.deleteTable("metadata") instance.getAdmin.deleteTable("tiles") + instance.getAdmin.close() } lazy val instance = HBaseInstance(Seq("localhost"), "localhost") diff --git a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpatialSpec.scala b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpatialSpec.scala index ff41c55450..ef90a0d1b4 100644 --- a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpatialSpec.scala +++ b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseSpatialSpec.scala @@ -18,6 +18,7 @@ class HBaseSpatialSpec instance.getAdmin.disableTable("tiles") instance.getAdmin.deleteTable("metadata") instance.getAdmin.deleteTable("tiles") + instance.getAdmin.close() } lazy val instance = HBaseInstance(Seq("localhost"), "localhost") diff --git a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpaceTimeSpec.scala b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpaceTimeSpec.scala index d201baf33a..5c8c6016d2 100644 --- a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpaceTimeSpec.scala +++ b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpaceTimeSpec.scala @@ -19,6 +19,7 @@ class HBaseTileFeatureSpaceTimeSpec instance.getAdmin.disableTable("tiles") instance.getAdmin.deleteTable("metadata") instance.getAdmin.deleteTable("tiles") + instance.getAdmin.close() } lazy val instance = HBaseInstance(Seq("localhost"), "localhost") diff --git a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpatialSpec.scala 
b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpatialSpec.scala index 1b00f3af24..77f6afd102 100644 --- a/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpatialSpec.scala +++ b/hbase/src/test/scala/geotrellis/spark/io/hbase/HBaseTileFeatureSpatialSpec.scala @@ -18,6 +18,7 @@ class HBaseTileFeatureSpatialSpec instance.getAdmin.disableTable("tiles") instance.getAdmin.deleteTable("metadata") instance.getAdmin.deleteTable("tiles") + instance.getAdmin.close() } lazy val instance = HBaseInstance(Seq("localhost"), "localhost") diff --git a/project/Version.scala b/project/Version.scala index 0cba449056..c8af4951e8 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -31,7 +31,7 @@ object Version { val monocle = "1.2.1" val accumulo = "1.7.1" val cassandra = "3.0.3" - val hbase = "1.2.1" + val hbase = "1.2.2" lazy val hadoop = Environment.hadoopVersion lazy val spark = Environment.sparkVersion } diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala index 754cfa393e..7691407166 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala @@ -6,7 +6,7 @@ import geotrellis.spark.io.index.{IndexRanges, MergeQueue} import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.spark.util.KryoWrapper -import scalaz.concurrent.Task +import scalaz.concurrent.{Strategy, Task} import scalaz.std.vector._ import scalaz.stream.{Process, nondeterminism} import com.amazonaws.services.s3.model.AmazonS3Exception @@ -15,6 +15,8 @@ import org.apache.commons.io.IOUtils import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import java.util.concurrent.Executors + trait S3RDDReader { def getS3Client: () => S3Client @@ -48,6 +50,7 @@ trait S3RDDReader { sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => val s3client = _getS3Client() + val pool = Executors.newFixedThreadPool(8) partition flatMap { seq => val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => @@ -78,7 +81,10 @@ trait S3RDDReader { } } - nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { range map read }.runFoldMap(identity).unsafePerformSync + val result = nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync + pool.shutdown() + + result } } } diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala index a0cadc81fc..50e8b8c010 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala @@ -5,13 +5,14 @@ import geotrellis.spark._ import geotrellis.spark.io._ import geotrellis.spark.io.avro._ import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec -import geotrellis.spark.io.index.{ZCurveKeyIndexMethod, KeyIndexMethod, KeyIndex} +import geotrellis.spark.io.index.{KeyIndex, KeyIndexMethod, ZCurveKeyIndexMethod} import geotrellis.spark.util.KryoWrapper -import com.amazonaws.services.s3.model.{AmazonS3Exception, PutObjectResult, ObjectMetadata, PutObjectRequest} +import com.amazonaws.services.s3.model.{AmazonS3Exception, ObjectMetadata, PutObjectRequest, PutObjectResult} import com.typesafe.scalalogging.slf4j._ import org.apache.spark.rdd.RDD -import scalaz.concurrent.Task + +import scalaz.concurrent.{Strategy, Task} import 
scalaz.stream.{Process, nondeterminism} import spray.json._ import spray.json.DefaultJsonProtocol._ @@ -78,7 +79,7 @@ trait S3RDDWriter { } } - val results = nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { requests map write } + val results = nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { requests map write } (Strategy.Executor(pool)) results.run.unsafePerformSync pool.shutdown() } From 44d4ad30fe77886fb834b78fb23ee17f173cd9cb Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 16 Aug 2016 12:54:13 +0300 Subject: [PATCH 7/8] reads/writes threads are now confiurable --- accumulo/src/main/resources/reference.conf | 5 ++++- .../io/accumulo/AccumuloWriteStrategy.scala | 18 +++++++++++------ cassandra/src/main/resources/reference.conf | 6 ++++++ .../io/cassandra/CassandraRDDReader.scala | 9 ++++++--- .../io/cassandra/CassandraRDDWriter.scala | 11 +++++----- s3/src/main/resources/reference.conf | 6 ++++++ .../geotrellis/spark/io/s3/S3RDDReader.scala | 16 ++++++++------- .../geotrellis/spark/io/s3/S3RDDWriter.scala | 20 +++++++------------ spark/src/main/resources/reference.conf | 3 +++ .../spark/io/file/FileRDDReader.scala | 20 +++++++++++++------ 10 files changed, 73 insertions(+), 41 deletions(-) create mode 100644 s3/src/main/resources/reference.conf create mode 100644 spark/src/main/resources/reference.conf diff --git a/accumulo/src/main/resources/reference.conf b/accumulo/src/main/resources/reference.conf index e83f830905..0364dd9eef 100644 --- a/accumulo/src/main/resources/reference.conf +++ b/accumulo/src/main/resources/reference.conf @@ -1 +1,4 @@ -geotrellis.accumulo.catalog = "metadata" \ No newline at end of file +geotrellis.accumulo { + catalog = "metadata" + threads.rdd.write = 32 +} \ No newline at end of file diff --git a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala index 0ef6b918b1..3131e680d6 100644 --- a/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala +++ b/accumulo/src/main/scala/geotrellis/spark/io/accumulo/AccumuloWriteStrategy.scala @@ -9,6 +9,7 @@ import org.apache.spark.rdd.RDD import org.apache.accumulo.core.data.{Key, Mutation, Value} import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat import org.apache.accumulo.core.client.BatchWriterConfig +import com.typesafe.config.ConfigFactory import scalaz.concurrent.{Strategy, Task} import scalaz.stream._ @@ -16,6 +17,8 @@ import java.util.UUID import java.util.concurrent.Executors object AccumuloWriteStrategy { + val threads = ConfigFactory.load().getInt("geotrellis.accumulo.threads.rdd.write") + def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest") } @@ -89,13 +92,16 @@ object HdfsWriteStrategy { * @param config Configuration for the BatchWriters */ case class SocketWriteStrategy( - config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(32) + config: BatchWriterConfig = new BatchWriterConfig().setMaxMemory(128*1024*1024).setMaxWriteThreads(AccumuloWriteStrategy.threads), + threads: Int = AccumuloWriteStrategy.threads ) extends AccumuloWriteStrategy { def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = { val serializeWrapper = KryoWrapper(config) // BatchWriterConfig is not java serializable + val kwThreads = KryoWrapper(threads) kvPairs.foreachPartition { partition => - val pool = Executors.newFixedThreadPool(32) - val (config) = serializeWrapper.value + val 
poolSize = kwThreads.value + val pool = Executors.newFixedThreadPool(poolSize) + val config = serializeWrapper.value val writer = instance.connector.createBatchWriter(table, config) val mutations: Process[Task, Mutation] = @@ -105,15 +111,15 @@ case class SocketWriteStrategy( val mutation = new Mutation(key.getRow) mutation.put(key.getColumnFamily, key.getColumnQualifier, System.currentTimeMillis(), value) Some(mutation, iter) - } else { + } else { None } } val writeChannel = channel.lift { (mutation: Mutation) => Task { writer.addMutation(mutation) } } val writes = mutations.tee(writeChannel)(tee.zipApply).map(Process.eval) - nondeterminism.njoin(maxOpen = 32, maxQueued = 32)(writes)(Strategy.Executor(pool)).run.unsafePerformSync - writer.close() + nondeterminism.njoin(maxOpen = poolSize, maxQueued = poolSize)(writes)(Strategy.Executor(pool)).run.unsafePerformSync + writer.close(); pool.shutdown() } } } diff --git a/cassandra/src/main/resources/reference.conf b/cassandra/src/main/resources/reference.conf index 90fb733e4d..4c8c68c675 100644 --- a/cassandra/src/main/resources/reference.conf +++ b/cassandra/src/main/resources/reference.conf @@ -6,4 +6,10 @@ geotrellis.cassandra { localDc = "datacenter1" usedHostsPerRemoteDc = 0 allowRemoteDCsForLocalConsistencyLevel = false + threads = { + rdd = { + write = 32 + read = 32 + } + } } diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala index 729e6c852e..b3026a32dc 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDReader.scala @@ -16,6 +16,8 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import java.util.concurrent.Executors +import com.typesafe.config.ConfigFactory + import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -29,7 +31,8 @@ object CassandraRDDReader { decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, - numPartitions: Option[Int] = None + numPartitions: Option[Int] = None, + threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.read") )(implicit sc: SparkContext): RDD[(K, V)] = { if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)] @@ -55,7 +58,7 @@ object CassandraRDDReader { .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => instance.withSession { session => val statement = session.prepare(query) - val pool = Executors.newFixedThreadPool(32) + val pool = Executors.newFixedThreadPool(threads) val result = partition map { seq => val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => @@ -80,7 +83,7 @@ object CassandraRDDReader { } } - nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync + nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync } /** Close partition session */ diff --git a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala index 233ece81c7..5fc739e62b 100644 --- a/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala +++ b/cassandra/src/main/scala/geotrellis/spark/io/cassandra/CassandraRDDWriter.scala @@ -9,9 +9,10 
@@ import com.datastax.driver.core.schemabuilder.SchemaBuilder import com.datastax.driver.core.DataType._ import com.datastax.driver.core.ResultSet import org.apache.spark.rdd.RDD - +import com.typesafe.config.ConfigFactory import scalaz.concurrent.{Strategy, Task} import scalaz.stream.{Process, nondeterminism} + import java.nio.ByteBuffer import java.util.concurrent.Executors @@ -25,7 +26,8 @@ object CassandraRDDWriter { layerId: LayerId, decomposeKey: K => Long, keyspace: String, - table: String + table: String, + threads: Int = ConfigFactory.load().getInt("geotrellis.cassandra.threads.rdd.write") ): Unit = { implicit val sc = raster.sparkContext @@ -72,8 +74,7 @@ object CassandraRDDWriter { } } - /** magic number 32; for no reason; just because */ - val pool = Executors.newFixedThreadPool(32) + val pool = Executors.newFixedThreadPool(threads) val write: ((java.lang.Long, ByteBuffer)) => Process[Task, ResultSet] = { case (id, value) => @@ -82,7 +83,7 @@ object CassandraRDDWriter { }(pool) } - val results = nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { + val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { queries map write }(Strategy.Executor(pool)) onComplete { Process eval Task { diff --git a/s3/src/main/resources/reference.conf b/s3/src/main/resources/reference.conf new file mode 100644 index 0000000000..46ae4d38a0 --- /dev/null +++ b/s3/src/main/resources/reference.conf @@ -0,0 +1,6 @@ +geotrellis.s3.threads { + rdd = { + write = 8 + read = 8 + } +} diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala index 7691407166..f1a645e4bd 100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDReader.scala @@ -14,6 +14,7 @@ import org.apache.avro.Schema import org.apache.commons.io.IOUtils import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import com.typesafe.config.ConfigFactory import java.util.concurrent.Executors @@ -31,7 +32,8 @@ trait S3RDDReader { decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, - numPartitions: Option[Int] = None + numPartitions: Option[Int] = None, + threads: Int = ConfigFactory.load().getInt("geotrellis.s3.threads.rdd.read") )(implicit sc: SparkContext): RDD[(K, V)] = { if (queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)] @@ -50,9 +52,9 @@ trait S3RDDReader { sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => val s3client = _getS3Client() - val pool = Executors.newFixedThreadPool(8) + val pool = Executors.newFixedThreadPool(threads) - partition flatMap { seq => + val result = partition map { seq => val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => if (iter.hasNext) { val (start, end) = iter.next() @@ -81,11 +83,11 @@ trait S3RDDReader { } } - val result = nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync - pool.shutdown() - - result + nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync } + + /** Close partition pool */ + (result ++ Iterator({ pool.shutdown(); Vector.empty[(K, V)] })).flatten } } } diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala index 50e8b8c010..d8dca37146 
100644 --- a/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala +++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3RDDWriter.scala @@ -1,24 +1,17 @@ package geotrellis.spark.io.s3 -import geotrellis.raster.Tile -import geotrellis.spark._ -import geotrellis.spark.io._ import geotrellis.spark.io.avro._ import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec -import geotrellis.spark.io.index.{KeyIndex, KeyIndexMethod, ZCurveKeyIndexMethod} -import geotrellis.spark.util.KryoWrapper import com.amazonaws.services.s3.model.{AmazonS3Exception, ObjectMetadata, PutObjectRequest, PutObjectResult} -import com.typesafe.scalalogging.slf4j._ import org.apache.spark.rdd.RDD - +import com.typesafe.config.ConfigFactory import scalaz.concurrent.{Strategy, Task} import scalaz.stream.{Process, nondeterminism} -import spray.json._ -import spray.json.DefaultJsonProtocol._ import java.io.ByteArrayInputStream import java.util.concurrent.Executors + import scala.reflect._ trait S3RDDWriter { @@ -29,7 +22,8 @@ trait S3RDDWriter { rdd: RDD[(K, V)], bucket: String, keyPath: K => String, - putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p } + putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p }, + threads: Int = ConfigFactory.load().getInt("geotrellis.s3.threads.rdd.write") ): Unit = { val codec = KeyValueRecordCodec[K, V] val schema = codec.schema @@ -67,7 +61,7 @@ trait S3RDDWriter { } } - val pool = Executors.newFixedThreadPool(8) + val pool = Executors.newFixedThreadPool(threads) val write: PutObjectRequest => Process[Task, PutObjectResult] = { request => Process eval Task { @@ -79,7 +73,7 @@ trait S3RDDWriter { } } - val results = nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { requests map write } (Strategy.Executor(pool)) + val results = nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { requests map write } (Strategy.Executor(pool)) results.run.unsafePerformSync pool.shutdown() } @@ -87,5 +81,5 @@ trait S3RDDWriter { } object S3RDDWriter extends S3RDDWriter { - def getS3Client: () => S3Client = () => S3Client.default + def getS3Client: () => S3Client = () => S3Client.default } diff --git a/spark/src/main/resources/reference.conf b/spark/src/main/resources/reference.conf new file mode 100644 index 0000000000..9a5ba3ccdb --- /dev/null +++ b/spark/src/main/resources/reference.conf @@ -0,0 +1,3 @@ +geotrellis.file.threads { + rdd.read = 32 +} diff --git a/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala b/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala index 87b8ffd0dc..a95bf6a1cd 100644 --- a/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala +++ b/spark/src/main/scala/geotrellis/spark/io/file/FileRDDReader.scala @@ -2,19 +2,21 @@ package geotrellis.spark.io.file import geotrellis.spark._ import geotrellis.spark.io.avro.codecs.KeyValueRecordCodec -import geotrellis.spark.io.index.{MergeQueue, IndexRanges} +import geotrellis.spark.io.index.{IndexRanges, MergeQueue} import geotrellis.spark.io.avro.{AvroEncoder, AvroRecordCodec} import geotrellis.spark.util.KryoWrapper import geotrellis.util.Filesystem -import scalaz.concurrent.Task +import scalaz.concurrent.{Strategy, Task} import scalaz.std.vector._ import scalaz.stream.{Process, nondeterminism} import org.apache.avro.Schema import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import com.typesafe.config.ConfigFactory import java.io.File +import java.util.concurrent.Executors object FileRDDReader { def read[K: AvroRecordCodec: Boundable, V: 
AvroRecordCodec]( @@ -23,7 +25,8 @@ decomposeBounds: KeyBounds[K] => Seq[(Long, Long)], filterIndexOnly: Boolean, writerSchema: Option[Schema] = None, - numPartitions: Option[Int] = None + numPartitions: Option[Int] = None, + threads: Int = ConfigFactory.load().getInt("geotrellis.file.threads.rdd.read") )(implicit sc: SparkContext): RDD[(K, V)] = { if(queryKeyBounds.isEmpty) return sc.emptyRDD[(K, V)] @@ -41,7 +44,9 @@ sc.parallelize(bins, bins.size) .mapPartitions { partition: Iterator[Seq[(Long, Long)]] => - partition flatMap { seq => + val pool = Executors.newFixedThreadPool(threads) + + val result: Iterator[Vector[(K, V)]] = partition map { seq => val range: Process[Task, Iterator[Long]] = Process.unfold(seq.toIterator) { iter => if (iter.hasNext) { val (start, end) = iter.next() @@ -64,8 +69,11 @@ } } - nondeterminism.njoin(maxOpen = 32, maxQueued = 32) { range map read }.runFoldMap(identity).unsafePerformSync + nondeterminism.njoin(maxOpen = threads, maxQueued = threads) { range map read }(Strategy.Executor(pool)).runFoldMap(identity).unsafePerformSync } + + /** Close partition pool */ + (result ++ Iterator({ pool.shutdown(); Vector.empty[(K, V)] })).flatten } } -} +} \ No newline at end of file From 4b7acccefc0bbac34e4a3380b579d10ef4278f27 Mon Sep 17 00:00:00 2001 From: Grigory Pomadchin Date: Tue, 16 Aug 2016 16:18:33 +0300 Subject: [PATCH 8/8] add readers / writers thread pool docs --- cassandra/src/main/resources/reference.conf | 4 +-- docs/spark/spark-io.md | 32 +++++++++++++++++++++ s3/src/main/resources/reference.conf | 2 +- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/cassandra/src/main/resources/reference.conf b/cassandra/src/main/resources/reference.conf index 4c8c68c675..12c4eaca59 100644 --- a/cassandra/src/main/resources/reference.conf +++ b/cassandra/src/main/resources/reference.conf @@ -6,8 +6,8 @@ geotrellis.cassandra { localDc = "datacenter1" usedHostsPerRemoteDc = 0 allowRemoteDCsForLocalConsistencyLevel = false - threads = { - rdd = { + threads { + rdd { write = 32 read = 32 } } } diff --git a/docs/spark/spark-io.md b/docs/spark/spark-io.md index 4f475c832b..b00d356a82 100644 --- a/docs/spark/spark-io.md +++ b/docs/spark/spark-io.md @@ -200,3 +200,35 @@ val tile: Tile = nlcdReader.read(SpatialKey(1,2)) ``` The idea is similar to the `LayerReader.reader` method except in this case we're producing a reader for single tiles. Additionally it must be noted that the layer metadata is accessed during the construction of the `Reader[SpatialKey, Tile]` and saved for all future calls to read a tile. + +## Readers threads + +Cassandra and S3 Layer RDDReaders / RDDWriters are configurable by the number of threads they use. This is a program setting that can differ from machine to machine, depending on the resources available. It can be set in the `reference.conf` / `application.conf` file of your application; the default settings live in the `reference.conf` file of each backend subproject (we use [TypeSafe Config](https://github.com/typesafehub/config)). +For the File backend only the RDDReader is configurable; for Accumulo, only the RDDWriter (Socket Strategy). For all backends, CollectionReaders are configurable as well.
+ +Default configuration example: + +```conf +geotrellis.accumulo.threads { + rdd.write = 32 + collection.read = 32 +} +geotrellis.file.threads { + rdd.read = 32 + collection.read = 32 +} +geotrellis.cassandra.threads { + collection.read = 32 + rdd { + write = 32 + read = 32 + } +} +geotrellis.s3.threads { + collection.read = 8 + rdd { + write = 8 + read = 8 + } +} +``` diff --git a/s3/src/main/resources/reference.conf b/s3/src/main/resources/reference.conf index 46ae4d38a0..0d61e3e81f 100644 --- a/s3/src/main/resources/reference.conf +++ b/s3/src/main/resources/reference.conf @@ -1,5 +1,5 @@ geotrellis.s3.threads { - rdd = { + rdd { write = 8 read = 8 }
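Every reader and writer touched by this series ends up with the same shape: a fixed `java.util.concurrent` thread pool sized from Typesafe Config, the per-key work wrapped in scalaz `Task`s, those tasks merged with scalaz-stream's `nondeterminism.njoin` running on `Strategy.Executor(pool)`, and an explicit `pool.shutdown()` once the partition is exhausted. The sketch below illustrates that pattern outside Spark and GeoTrellis, against the scalaz-stream API the patches already use; `ThreadedReadSketch`, `readAll`, and `readOne` are hypothetical names, and the config key is only an example of the `geotrellis.<backend>.threads.*` paths added above.

```scala
import java.util.concurrent.Executors

import com.typesafe.config.ConfigFactory
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.{Process, nondeterminism}

object ThreadedReadSketch {
  // Example key only; the patches add keys such as geotrellis.s3.threads.rdd.read per backend.
  val threads: Int = ConfigFactory.load().getInt("geotrellis.s3.threads.rdd.read")

  /** Run `readOne` for every index on a bounded, explicitly managed pool and collect the results. */
  def readAll[V](indices: Seq[Long])(readOne: Long => V): Vector[V] = {
    val pool = Executors.newFixedThreadPool(threads)

    // One inner Process per index; each element is produced by a Task scheduled on `pool`.
    val sources: Process[Task, Process[Task, V]] =
      Process.emitAll(indices).map { index => Process.eval(Task(readOne(index))(pool)) }

    try {
      // njoin keeps at most `threads` inner processes running at once, mirroring the RDD readers above.
      nondeterminism.njoin(maxOpen = threads, maxQueued = threads)(sources)(Strategy.Executor(pool))
        .runLog
        .unsafePerformSync
        .toVector
    } finally pool.shutdown()
  }
}
```

Keeping `maxOpen` and `maxQueued` equal to the pool size, as the patches do, bounds both the number of in-flight backend requests and the amount of fetched data queued in memory at any one time.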