Skip to content

Commit

Permalink
Merge pull request #49 from gafiatulin/s3-block-on-blocking-ec
Browse files Browse the repository at this point in the history
S3Store: block on blocking ec
  • Loading branch information
matthewgraf committed Oct 15, 2019
2 parents 2c58ce8 + f72266e commit cd7ebd2
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions s3/src/main/scala/blobstore/s3/S3Store.scala
Expand Up @@ -54,19 +54,19 @@ final case class S3Store[F[_]](transferManager: TransferManager, objectAcl: Opti
// to unfold the stream we need to emit a Chunk for the current ObjectListing received from state function
// if returned ObjectListing is not truncated we emit the last Chunk and set up state function to return None
// and stop unfolding in the next iteration
getObjectListing => F.delay {
getObjectListing => CS.evalOn(blockingExecutionContext)(F.delay{
getObjectListing().map { ol =>
if (ol.isTruncated)
(_chunk(ol), () => Option(s3.listNextBatchOfObjects(ol)))
else
(_chunk(ol), () => None)
}
}
})
}
}

override def get(path: Path, chunkSize: Int): Stream[F, Byte] = {
val is: F[InputStream] = F.delay(s3.getObject(path.root, path.key).getObjectContent)
val is: F[InputStream] = CS.evalOn(blockingExecutionContext)(F.delay(s3.getObject(path.root, path.key).getObjectContent))
fs2.io.readInputStream(is, chunkSize, closeAfterUse = true, blockingExecutionContext = blockingExecutionContext)
}

Expand All @@ -78,14 +78,14 @@ final case class S3Store[F[_]](transferManager: TransferManager, objectAcl: Opti
}

val consume: ((PipedOutputStream, PipedInputStream)) => Stream[F, Unit] = ios => {
val putToS3 = Stream.eval(F.delay {
val putToS3 = Stream.eval(CS.evalOn(blockingExecutionContext)(F.delay {
val meta = new ObjectMetadata()
path.size.foreach(meta.setContentLength)
sseAlgorithm.foreach(meta.setSSEAlgorithm)
transferManager.upload(path.root, path.key, ios._2, meta).waitForCompletion()
objectAcl.foreach(acl => s3.setObjectAcl(path.root, path.key, acl))
()
})
}))

val writeBytes: Stream[F, Unit] =
_writeAllToOutputStream1(in, ios._1).stream ++ Stream.eval(F.delay(ios._1.close()))
Expand All @@ -106,14 +106,14 @@ final case class S3Store[F[_]](transferManager: TransferManager, objectAcl: Opti
_ <- remove(src)
} yield ()

override def copy(src: Path, dst: Path): F[Unit] = F.delay {
override def copy(src: Path, dst: Path): F[Unit] = CS.evalOn(blockingExecutionContext)(F.delay {
val meta = new ObjectMetadata()
sseAlgorithm.foreach(meta.setSSEAlgorithm)
val req = new CopyObjectRequest(src.root, src.key, dst.root, dst.key).withNewObjectMetadata(meta)
transferManager.copy(req).waitForCompletion()
}
})

override def remove(path: Path): F[Unit] = F.delay(s3.deleteObject(path.root, path.key))
override def remove(path: Path): F[Unit] = CS.evalOn(blockingExecutionContext)(F.delay(s3.deleteObject(path.root, path.key)))
}

object S3Store {
Expand Down

0 comments on commit cd7ebd2

Please sign in to comment.