Skip to content

Commit

Permalink
Return a Resource from HttpBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
voidcontext committed Mar 21, 2020
1 parent 0a49616 commit 9c7aa14
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 23 deletions.
5 changes: 3 additions & 2 deletions examples/src/main/scala/vdx/fetchfile/examples/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import java.io.{File, FileOutputStream}
object Main extends IOApp {
def run(args: List[String]): IO[ExitCode] = {

implicit val backend: Backend[IO] = HttpURLConnectionBackend[IO]
implicit val clock: Clock = Clock.system

val outFile = new File("/tmp/100MB.bin")

Blocker[IO].use { blocker =>
Downloader[IO](blocker, 1024 * 8, Progress.consoleProgress[IO])
implicit val backend: HttpBackend[IO] = HttpURLConnectionBackend[IO](blocker, 1024 * 16)

Downloader[IO](blocker, Progress.consoleProgress[IO])
.fetch(
new URL("http://localhost:8088/100MB.bin"),
Resource.fromAutoCloseable(IO.delay(new FileOutputStream(outFile)))
Expand Down
4 changes: 2 additions & 2 deletions fetch-file/src/main/scala/vdx/fetchfile/Downloader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ object Downloader {
(
for {
outStream <- out
lengthAndInputStream <- Resource.liftF(backend(url))
lengthAndInputStream <- backend(url)
} yield (outStream, lengthAndInputStream)
) .use {
).use {
case (outStream, (contentLength, inputStream)) =>
inputStream.observe(progress(contentLength))
.through(writeOutputStream[F](Concurrent[F].delay(outStream), ec))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,32 @@ package vdx.fetchfile

import cats.effect._
import fs2.io.{readInputStream}
import fs2.Stream

import java.net.HttpURLConnection
import java.net.{HttpURLConnection, URL}
import java.io.InputStream


object HttpURLConnectionBackend {
def apply[F[_]: Sync: ContextShift: Bracket[*[_], Throwable]](blocker: Blocker, chunkSize: Int): HttpBackend[F] =
url =>
Resource.make {
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getContentLength -> connection.getInputStream
}
} {
case (_, inStream) => Sync[F].delay(inStream.close())
} use[F, (Int, Stream[F, Byte])] {
case (contentLength, inStream) => Sync[F].pure(contentLength -> readInputStream(Sync[F].delay(inStream), chunkSize, blocker))
}
for {
connResource <- makeConnectionResource(url)
(contentLength, inStream) = connResource
fs2Stream <- Resource.liftF(Sync[F].delay(readInputStream(Sync[F].delay(inStream), chunkSize, blocker)))
} yield contentLength -> fs2Stream


private[this] def makeConnectionResource[F[_]: Sync](url: URL): Resource[F, (Int, InputStream)] =
Resource.make(makeConnection(url)) { case (_, inStream) => Sync[F].delay(inStream.close())}

private[this] def makeConnection[F[_]: Sync](url: URL): F[(Int, InputStream)] =
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getContentLength -> connection.getInputStream()
}
}
4 changes: 3 additions & 1 deletion fetch-file/src/main/scala/vdx/fetchfile/package.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package vdx

import cats.effect.Resource
import fs2.Stream

import java.net.URL

package object fetchfile {
type HttpBackend[F[_]] = URL => F[(Int, Stream[F, Byte])] // What about the content length?

type HttpBackend[F[_]] = URL => Resource[F, (Int, Stream[F, Byte])]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class DownloaderSpec extends AnyFlatSpec with Matchers {
(url: URL) => {
val bytes = url.toString.getBytes()

IO.delay(bytes.length -> Stream.emits(bytes.toList).covary[IO])
Resource.pure[IO, (Int, Stream[IO, Byte])](bytes.length -> Stream.emits(bytes.toList).covary[IO])
}

(Blocker[IO].use { blocker =>
Expand Down

0 comments on commit 9c7aa14

Please sign in to comment.