Skip to content

Commit

Permalink
Make HTTP call on a blocking execution context in HttpURLConnectionBa…
Browse files Browse the repository at this point in the history
…ckend
  • Loading branch information
voidcontext committed Mar 22, 2020
1 parent c164199 commit 2f88ea7
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Currently only Scala 2.13.x is supported.

## Usage

This example downloads the content of the pre-generated test file into a file in `/tmp`.
This example downloads the content of the pre-generated test file into a file in `/tmp`.

```scala
import cats.effect._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,43 @@ import java.net.{HttpURLConnection, URL}
import java.io.InputStream


/**
* A HttpBackend implementation based on java.net.HttpURLConnection.
*/
object HttpURLConnectionBackend {
def apply[F[_]: Sync: ContextShift: Bracket[*[_], Throwable]](blocker: Blocker, chunkSize: Int): HttpBackend[F] =
/**
* Creates a HttpBackend instance that is using java.net.HttpUrlConnection to make a HTTP request.
*
* The evaluation of the blocking HTTP call and the fs2.Stream creation from the java InputStream
* is using the given execution context wrapped in Blocker.
*/
def apply[F[_]: Sync: ContextShift](blocker: Blocker, chunkSize: Int): HttpBackend[F] =
url =>
for {
connResource <- makeConnectionResource(url)
connResource <- makeConnectionResource(url, blocker)
(contentLength, inStream) = connResource
fs2Stream <- Resource.liftF(Sync[F].delay(readInputStream(Sync[F].delay(inStream), chunkSize, blocker)))
fs2Stream <- Resource.liftF(
Sync[F].delay(readInputStream(Sync[F].delay(inStream), chunkSize, blocker))
)
} yield contentLength -> fs2Stream


private[this] def makeConnectionResource[F[_]: Sync](url: URL): Resource[F, (Int, InputStream)] =
Resource.make(makeConnection(url)) { case (_, inStream) => Sync[F].delay(inStream.close())}

private[this] def makeConnection[F[_]: Sync](url: URL): F[(Int, InputStream)] =
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getContentLength -> connection.getInputStream()
}
private[this] def makeConnectionResource[F[_]: Sync: ContextShift](
url: URL,
blocker: Blocker
): Resource[F, (Int, InputStream)] =
Resource.make(makeConnection(url, blocker)) { case (_, inStream) => Sync[F].delay(inStream.close())}

private[this] def makeConnection[F[_]: Sync: ContextShift](url: URL, blocker: Blocker): F[(Int, InputStream)] =
ContextShift[F].blockOn(blocker)(
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getContentLength -> connection.getInputStream()
}
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package vdx.fetchfile

import cats.effect._
import cats.effect.Sync.catsWriterTSync
import cats.instances.vector._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import cats.data.WriterT
import java.net.URL
import scala.concurrent.ExecutionContext

class HttpURLConnectionBackendSpec extends AnyFlatSpec with Matchers {
// val contextShiftIO = ContextShift[IO]

"HttpURLConnectionBackend" should "evaluate the connection on the given blocking context" in {
type TestIO[A] = WriterT[IO, Vector[String], A]

implicit val syncTestIO: Sync[TestIO] = catsWriterTSync[IO, Vector[String]]

implicit val contextShiftTestIO: ContextShift[TestIO] = new ContextShift[TestIO] {

override def shift: TestIO[Unit] = WriterT.tell(Vector("ContextShift.shift"))

override def evalOn[A](ec: ExecutionContext)(fa: TestIO[A]): TestIO[A] =
WriterT.tell[IO, Vector[String]](Vector(s"ContextShift.evalOn")).flatMap[A](_ => fa)
}

Blocker[IO].use[IO, Unit] { blocker =>
val backend = HttpURLConnectionBackend[TestIO](blocker, 1024 * 8)

val (logs, _) = backend(new URL("http://localhost:8088/100MB.bin")).use[TestIO, Unit](_ => WriterT.value(())).run.unsafeRunSync()

logs should be(Vector("ContextShift.evalOn"))

IO.unit
}.unsafeRunSync()

}
}

0 comments on commit 2f88ea7

Please sign in to comment.