From 7bbc18c465f3dab04dc136264906577125c2f504 Mon Sep 17 00:00:00 2001 From: Nathan Zimmerman Date: Wed, 26 Jun 2019 07:33:44 -0400 Subject: [PATCH] Throw errors inside IO (#3003) --- .../spark/store/accumulo/AccumuloWriteStrategy.scala | 3 +++ .../store/accumulo/AccumuloCollectionReader.scala | 5 ++++- .../spark/store/cassandra/CassandraRDDWriter.scala | 9 ++++++++- docs/cassandra/CassandraIndexStrategySpec.scala | 12 +++++++++--- .../geotrellis/spark/store/s3/S3RDDWriter.scala | 3 +++ .../scala/geotrellis/spark/store/s3/SaveToS3.scala | 4 ++++ .../store/hadoop/geotiff/GeoTiffLayerReader.scala | 11 ++++++++--- .../main/scala/geotrellis/store/AsyncWriter.scala | 3 +++ .../main/scala/geotrellis/store/util/IOUtils.scala | 5 ++++- 9 files changed, 46 insertions(+), 9 deletions(-) diff --git a/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala b/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala index 0821e74946..b94e40d757 100644 --- a/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala +++ b/accumulo-spark/src/main/scala/geotrellis/spark/store/accumulo/AccumuloWriteStrategy.scala @@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig import cats.effect.IO import cats.syntax.apply._ +import cats.syntax.either._ import scala.concurrent.ExecutionContext @@ -143,7 +144,9 @@ case class SocketWriteStrategy( .parJoin(threads) .compile .drain + .attempt .unsafeRunSync() + .valueOr(throw _) } finally { writer.close(); pool.shutdown() } diff --git a/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala b/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala index 237d8f476c..ddcb0ea3fb 100644 --- a/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala +++ b/accumulo/src/main/scala/geotrellis/store/accumulo/AccumuloCollectionReader.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text import cats.effect._ import cats.syntax.apply._ +import cats.syntax.either._ import scala.concurrent.ExecutionContext import scala.collection.JavaConverters._ @@ -85,7 +86,9 @@ object AccumuloCollectionReader { .compile .toVector .map(_.flatten) - .unsafeRunSync + .attempt + .unsafeRunSync() + .valueOr(throw _) } finally pool.shutdown() } } diff --git a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala index ddb1c52818..9df0baa5a2 100644 --- a/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala +++ b/cassandra-spark/src/main/scala/geotrellis/spark/store/cassandra/CassandraRDDWriter.scala @@ -33,6 +33,7 @@ import com.datastax.driver.core.schemabuilder.SchemaBuilder import cats.effect.IO import cats.syntax.apply._ +import cats.syntax.either._ import org.apache.avro.Schema import org.apache.spark.rdd.RDD @@ -165,7 +166,13 @@ object CassandraRDDWriter { } } - results.compile.drain.unsafeRunSync() + results + .compile + .drain + .attempt + .unsafeRunSync() + .valueOr(throw _) + pool.shutdown() } } diff --git a/docs/cassandra/CassandraIndexStrategySpec.scala b/docs/cassandra/CassandraIndexStrategySpec.scala index 2d705591ec..1c72d8af40 100644 --- a/docs/cassandra/CassandraIndexStrategySpec.scala +++ b/docs/cassandra/CassandraIndexStrategySpec.scala @@ -171,7 +171,10 @@ class CassandraIndexStrategySpec extends FunSpec with Matchers with CassandraTes val allTiles = tiles.parSequence - allTiles.unsafeRunSync() + allTiles + .attempt + .unsafeRunSync() + .valueOr(throw _) () //don't care about results... minimize heap churn } @@ -184,7 +187,10 @@ class CassandraIndexStrategySpec extends FunSpec with Matchers with CassandraTes val allTiles = tiles.parSequence - allTiles.unsafeRunSync() + allTiles + .attempt + .unsafeRunSync() + .valueOr(throw _) () //don't care about results... minimize heap churn } @@ -221,4 +227,4 @@ object MathUtils{ } def stdDev[T: Numeric](xs: Iterable[T]): Double = math.sqrt(variance(xs)) -} \ No newline at end of file +} diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala index 11082e9376..9b51edf33d 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3RDDWriter.scala @@ -26,6 +26,7 @@ import geotrellis.spark.util.KryoWrapper import cats.effect.{IO, Timer} import cats.syntax.apply._ +import cats.syntax.either._ import software.amazon.awssdk.services.s3.model.{S3Exception, PutObjectRequest, PutObjectResponse, GetObjectRequest} import software.amazon.awssdk.services.s3.S3Client @@ -145,7 +146,9 @@ class S3RDDWriter( .parJoin(threads) .compile .drain + .attempt .unsafeRunSync() + .valueOr(throw _) pool.shutdown() } diff --git a/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala b/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala index 951c107a39..9e9592a8a6 100644 --- a/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala +++ b/s3-spark/src/main/scala/geotrellis/spark/store/s3/SaveToS3.scala @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD import cats.effect.{IO, Timer} import cats.syntax.apply._ +import cats.syntax.either._ import scala.concurrent.ExecutionContext @@ -118,7 +119,10 @@ object SaveToS3 { .parJoin(threads) .compile .toVector + .attempt .unsafeRunSync() + .valueOr(throw _) + pool.shutdown() } } diff --git a/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala b/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala index 70e534c025..1003cc3a06 100644 --- a/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala +++ b/spark/src/main/scala/geotrellis/spark/store/hadoop/geotiff/GeoTiffLayerReader.scala @@ -19,6 +19,7 @@ package geotrellis.spark.store.hadoop.geotiff import geotrellis.raster.io.geotiff._ import geotrellis.raster.{CellGrid, Raster, RasterExtent} import geotrellis.raster.resample.{RasterResampleMethods, ResampleMethod} +import geotrellis.store.LayerId import geotrellis.layer.{SpatialKey, ZoomedLayoutScheme} import geotrellis.vector.{Extent, ProjectedExtent} import geotrellis.raster.crop.Crop @@ -28,13 +29,13 @@ import geotrellis.raster.reproject.RasterReprojectMethods import geotrellis.raster.merge.RasterMergeMethods import geotrellis.util.ByteReader import geotrellis.util.annotations.experimental + import cats.effect.IO import cats.syntax.apply._ +import cats.syntax.either._ + import java.net.URI import java.util.concurrent.{ExecutorService, Executors} - -import geotrellis.store.LayerId - import scala.concurrent.ExecutionContext import scala.reflect.ClassTag @@ -94,7 +95,9 @@ import scala.reflect.ClassTag (index flatMap readRecord) .compile .toVector.map(_.flatten.reduce(_ merge _)) + .attempt .unsafeRunSync() + .valueOr(throw _) } @experimental def readAll[ @@ -124,6 +127,8 @@ import scala.reflect.ClassTag .parJoin(defaultThreads) .compile .toVector + .attempt .unsafeRunSync() + .valueOr(throw _) } } diff --git a/store/src/main/scala/geotrellis/store/AsyncWriter.scala b/store/src/main/scala/geotrellis/store/AsyncWriter.scala index a9bda9df57..a6088857c6 100644 --- a/store/src/main/scala/geotrellis/store/AsyncWriter.scala +++ b/store/src/main/scala/geotrellis/store/AsyncWriter.scala @@ -18,6 +18,7 @@ package geotrellis.store import cats.effect.{IO, Timer} import cats.syntax.apply._ +import cats.syntax.either._ import scala.util.{Failure, Success, Try} import scala.concurrent.ExecutionContext @@ -83,7 +84,9 @@ abstract class AsyncWriter[Client, V, E](threads: Int) extends Serializable { .parJoin(threads) .compile .toVector + .attempt .unsafeRunSync() + .valueOr(throw _) pool.shutdown() } diff --git a/store/src/main/scala/geotrellis/store/util/IOUtils.scala b/store/src/main/scala/geotrellis/store/util/IOUtils.scala index 3f2ba01007..3e9d3e1d78 100644 --- a/store/src/main/scala/geotrellis/store/util/IOUtils.scala +++ b/store/src/main/scala/geotrellis/store/util/IOUtils.scala @@ -81,8 +81,11 @@ object IOUtils { .parJoin(threads) .compile .toVector - .unsafeRunSync + .attempt + .unsafeRunSync() + .valueOr(throw _) .flatten + } finally pool.shutdown() } }