Skip to content

Commit

Permalink
Refactor Backend to return fs2 Stream instead of java InputStream - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
voidcontext committed Mar 21, 2020
1 parent a6906b8 commit 0a49616
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 46 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ lazy val fetchfile = (project in file("fetch-file"))
"org.scalatest" %% "scalatest" % "3.1.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.14.1" % "test",
"org.scalatestplus" %% "scalatestplus-scalacheck" % "3.1.0.0-RC2" % Test
)
),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.11.0" cross CrossVersion.full)
)

lazy val examples = (project in file("examples"))
Expand Down
23 changes: 12 additions & 11 deletions fetch-file/src/main/scala/vdx/fetchfile/Downloader.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package vdx.fetchfile

import cats.effect.{Blocker, Concurrent, Resource, Sync}
import cats.syntax.semigroupal._
import cats.effect._
import fs2.Pipe
import fs2.io.{readInputStream, writeOutputStream}
import fs2.io.writeOutputStream

import java.io.OutputStream
import java.net.URL
import cats.effect.ContextShift

trait Downloader[F[_]] {
def fetch(url: URL, out: Resource[F, OutputStream]): F[Unit]
Expand All @@ -17,15 +15,18 @@ object Downloader {

def apply[F[_]: Concurrent: ContextShift](
ec: Blocker,
chunkSize: Int,
progress: Int => Pipe[F, Byte, Unit] = Progress.noop[F]
)(implicit backend: Backend[F]): Downloader[F] = new Downloader[F] {
)(implicit backend: HttpBackend[F]): Downloader[F] = new Downloader[F] {
def fetch(url: URL, out: Resource[F, OutputStream]): F[Unit] =
backend(url).product(out).use {
case ((inStream, contentLength), outStream) =>
readInputStream[F](Sync[F].delay(inStream), chunkSize, ec)
.observe(progress(contentLength))
.through(writeOutputStream[F](Sync[F].delay(outStream), ec))
(
for {
outStream <- out
lengthAndInputStream <- Resource.liftF(backend(url))
} yield (outStream, lengthAndInputStream)
) .use {
case (outStream, (contentLength, inputStream)) =>
inputStream.observe(progress(contentLength))
.through(writeOutputStream[F](Concurrent[F].delay(outStream), ec))
.compile
.drain
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@

package vdx.fetchfile

import cats.effect.{Resource, Sync}
import cats.effect._
import fs2.io.{readInputStream}
import fs2.Stream

import java.net.HttpURLConnection

object HttpURLConnectionBackend {
def apply[F[_]: Sync]: Backend[F] =
url => Resource.make {
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getInputStream -> connection.getContentLength
def apply[F[_]: Sync: ContextShift: Bracket[*[_], Throwable]](blocker: Blocker, chunkSize: Int): HttpBackend[F] =
url =>
Resource.make {
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getContentLength -> connection.getInputStream
}
} {
case (_, inStream) => Sync[F].delay(inStream.close())
} use[F, (Int, Stream[F, Byte])] {
case (contentLength, inStream) => Sync[F].pure(contentLength -> readInputStream(Sync[F].delay(inStream), chunkSize, blocker))
}
} {
case (inStream, _) => Sync[F].delay(inStream.close())
}
}
6 changes: 2 additions & 4 deletions fetch-file/src/main/scala/vdx/fetchfile/package.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package vdx

import cats.effect.Resource
import fs2.Stream

import java.net.URL
import java.io.InputStream

package object fetchfile {

type Backend[F[_]] = URL => Resource[F, (InputStream, Int)]
type HttpBackend[F[_]] = URL => F[(Int, Stream[F, Byte])] // What about the content length?
}
29 changes: 13 additions & 16 deletions fetch-file/src/test/scala/vdx/fetchfile/DownloaderSpec.scala
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
package vdx.fetchfile

import cats.effect._
import fs2.Stream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.ByteArrayOutputStream
import java.net.URL
import scala.io.Source
import java.security.MessageDigest

import scala.io.Source


class DownloaderSpec extends AnyFlatSpec with Matchers {

implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)

"fetch" should "use the given backend to create the input stream" in {

implicit val backend: Backend[IO] =
(url: URL) =>
Resource.make {
IO.delay((new ByteArrayInputStream(url.toString.getBytes()), url.toString().getBytes().length))
} {
case (s, _) => IO.delay(s.close())
}

implicit val backend: HttpBackend[IO] =
(url: URL) => {
val bytes = url.toString.getBytes()

IO.delay(bytes.length -> Stream.emits(bytes.toList).covary[IO])
}

(Blocker[IO].use { blocker =>
val out = new ByteArrayOutputStream()
val downloader = Downloader[IO](
blocker,
1,
)
val downloader = Downloader[IO](blocker)

for {
_ <- downloader.fetch(
Expand All @@ -45,8 +42,8 @@ class DownloaderSpec extends AnyFlatSpec with Matchers {
it should "download the file correctly through the HttpURLConnectionBackend" in {

val downloadedBytes = (Blocker[IO].use { blocker =>
implicit val backend = HttpURLConnectionBackend[IO]
val downloader = Downloader[IO](blocker, 1024 * 8)
implicit val backend = HttpURLConnectionBackend[IO](blocker, 1024 * 8)
val downloader = Downloader[IO](blocker)
val out = new ByteArrayOutputStream()
for {
_ <- downloader.fetch(
Expand Down

0 comments on commit 0a49616

Please sign in to comment.