Skip to content

Commit

Permalink
Throw errors inside IO (#3003)
Browse files Browse the repository at this point in the history
  • Loading branch information
moradology authored and pomadchin committed Jun 26, 2019
1 parent f6ac12c commit 7bbc18c
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 9 deletions.
Expand Up @@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig

import cats.effect.IO
import cats.syntax.apply._
import cats.syntax.either._

import scala.concurrent.ExecutionContext

Expand Down Expand Up @@ -143,7 +144,9 @@ case class SocketWriteStrategy(
.parJoin(threads)
.compile
.drain
.attempt
.unsafeRunSync()
.valueOr(throw _)
} finally {
writer.close(); pool.shutdown()
}
Expand Down
Expand Up @@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text

import cats.effect._
import cats.syntax.apply._
import cats.syntax.either._

import scala.concurrent.ExecutionContext
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -85,7 +86,9 @@ object AccumuloCollectionReader {
.compile
.toVector
.map(_.flatten)
.unsafeRunSync
.attempt
.unsafeRunSync()
.valueOr(throw _)
} finally pool.shutdown()
}
}
Expand Up @@ -33,6 +33,7 @@ import com.datastax.driver.core.schemabuilder.SchemaBuilder

import cats.effect.IO
import cats.syntax.apply._
import cats.syntax.either._

import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -165,7 +166,13 @@ object CassandraRDDWriter {
}
}

results.compile.drain.unsafeRunSync()
results
.compile
.drain
.attempt
.unsafeRunSync()
.valueOr(throw _)

pool.shutdown()
}
}
Expand Down
12 changes: 9 additions & 3 deletions docs/cassandra/CassandraIndexStrategySpec.scala
Expand Up @@ -171,7 +171,10 @@ class CassandraIndexStrategySpec extends FunSpec with Matchers with CassandraTes

val allTiles = tiles.parSequence

allTiles.unsafeRunSync()
allTiles
.attempt
.unsafeRunSync()
.valueOr(throw _)

() //don't care about results... minimize heap churn
}
Expand All @@ -184,7 +187,10 @@ class CassandraIndexStrategySpec extends FunSpec with Matchers with CassandraTes

val allTiles = tiles.parSequence

allTiles.unsafeRunSync()
allTiles
.attempt
.unsafeRunSync()
.valueOr(throw _)

() //don't care about results... minimize heap churn
}
Expand Down Expand Up @@ -221,4 +227,4 @@ object MathUtils{
}

/** Standard deviation of `xs`: the square root of its variance (delegates to `variance`). */
def stdDev[T: Numeric](xs: Iterable[T]): Double = {
  val varianceOfXs = variance(xs)
  math.sqrt(varianceOfXs)
}
}
}
Expand Up @@ -26,6 +26,7 @@ import geotrellis.spark.util.KryoWrapper

import cats.effect.{IO, Timer}
import cats.syntax.apply._
import cats.syntax.either._

import software.amazon.awssdk.services.s3.model.{S3Exception, PutObjectRequest, PutObjectResponse, GetObjectRequest}
import software.amazon.awssdk.services.s3.S3Client
Expand Down Expand Up @@ -145,7 +146,9 @@ class S3RDDWriter(
.parJoin(threads)
.compile
.drain
.attempt
.unsafeRunSync()
.valueOr(throw _)

pool.shutdown()
}
Expand Down
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD

import cats.effect.{IO, Timer}
import cats.syntax.apply._
import cats.syntax.either._

import scala.concurrent.ExecutionContext

Expand Down Expand Up @@ -118,7 +119,10 @@ object SaveToS3 {
.parJoin(threads)
.compile
.toVector
.attempt
.unsafeRunSync()
.valueOr(throw _)

pool.shutdown()
}
}
Expand Down
Expand Up @@ -19,6 +19,7 @@ package geotrellis.spark.store.hadoop.geotiff
import geotrellis.raster.io.geotiff._
import geotrellis.raster.{CellGrid, Raster, RasterExtent}
import geotrellis.raster.resample.{RasterResampleMethods, ResampleMethod}
import geotrellis.store.LayerId
import geotrellis.layer.{SpatialKey, ZoomedLayoutScheme}
import geotrellis.vector.{Extent, ProjectedExtent}
import geotrellis.raster.crop.Crop
Expand All @@ -28,13 +29,13 @@ import geotrellis.raster.reproject.RasterReprojectMethods
import geotrellis.raster.merge.RasterMergeMethods
import geotrellis.util.ByteReader
import geotrellis.util.annotations.experimental

import cats.effect.IO
import cats.syntax.apply._
import cats.syntax.either._

import java.net.URI
import java.util.concurrent.{ExecutorService, Executors}

import geotrellis.store.LayerId

import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag

Expand Down Expand Up @@ -94,7 +95,9 @@ import scala.reflect.ClassTag
(index flatMap readRecord)
.compile
.toVector.map(_.flatten.reduce(_ merge _))
.attempt
.unsafeRunSync()
.valueOr(throw _)
}

@experimental def readAll[
Expand Down Expand Up @@ -124,6 +127,8 @@ import scala.reflect.ClassTag
.parJoin(defaultThreads)
.compile
.toVector
.attempt
.unsafeRunSync()
.valueOr(throw _)
}
}
3 changes: 3 additions & 0 deletions store/src/main/scala/geotrellis/store/AsyncWriter.scala
Expand Up @@ -18,6 +18,7 @@ package geotrellis.store

import cats.effect.{IO, Timer}
import cats.syntax.apply._
import cats.syntax.either._

import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -83,7 +84,9 @@ abstract class AsyncWriter[Client, V, E](threads: Int) extends Serializable {
.parJoin(threads)
.compile
.toVector
.attempt
.unsafeRunSync()
.valueOr(throw _)

pool.shutdown()
}
Expand Down
5 changes: 4 additions & 1 deletion store/src/main/scala/geotrellis/store/util/IOUtils.scala
Expand Up @@ -81,8 +81,11 @@ object IOUtils {
.parJoin(threads)
.compile
.toVector
.unsafeRunSync
.attempt
.unsafeRunSync()
.valueOr(throw _)
.flatten

} finally pool.shutdown()
}
}

0 comments on commit 7bbc18c

Please sign in to comment.