Skip to content

Commit

Permalink
Update dependenies, including moving to Cats Effect 3
Browse files Browse the repository at this point in the history
  • Loading branch information
voidcontext committed Jul 6, 2022
1 parent b3c0f17 commit c3af92b
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 222 deletions.
48 changes: 25 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ This example downloads the content of the pre-generated test file into a file in

```scala
import cats.effect._
import cats.effect.concurrent.Ref
import cats.instances.list._
import fs2.Pipe
import fs2.compression.Compression
import vdx.fetchfile.Pipes._
import vdx.fetchfile._

Expand All @@ -31,34 +32,35 @@ import java.net.URL
object Main extends IOApp {
def run(args: List[String]): IO[ExitCode] = {

implicit val clock: MonotonicClock = MonotonicClock.system
implicit val clock: MonotonicClock = MonotonicClock.system()

val shaSum = Source.fromFile("docker/static-files/100MB.bin.sha256").mkString.trim()
val gzippedeShaSum = Source.fromFile("docker/static-files/100MB.bin.gz.sha256").mkString.trim()

val outFile = new File("/tmp/100MB.bin")

Blocker[IO].use { blocker =>
implicit val backend: HttpClient[IO] = HttpURLConnectionClient[IO](blocker, 1024 * 16)

val downloader = Downloader[IO](blocker, Progress.consoleProgress[IO])

for {
ref <- Ref.of[IO, List[Byte]](List.empty)
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin.gz"),
Resource.fromAutoCloseable(IO.delay(new FileOutputStream(outFile))),
pipes = List(
collectSHA256(ref), // Collect SHA before gunzip
fs2.compress.gunzip[IO](32 * 1024) // Unzip compressed stream
),
last = ensureSHA256(shaSum) // Ensure gunzipped SHA is correct
)
collecetedSha <- ref.get.map(_.map("%02x".format(_)).mkString)
} yield {
println(s"Collected SHA: $collecetedSha, expected gzipped SHA: $gzippedeShaSum")
ExitCode.Success
}
implicit val backend: HttpClient[IO] = HttpURLConnectionClient[IO](1024 * 16)

val downloader = Downloader[IO](Progress.consoleProgress[IO])

val gunzip: Pipe[IO, Byte, Byte] =
Compression[IO].gunzip(32 * 1024).andThen(_.flatMap(_.content))

for {
ref <- Ref.of[IO, List[Byte]](List.empty)
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin.gz"),
Resource.fromAutoCloseable(IO.delay(new FileOutputStream(outFile))),
pipes = List(
collectSHA256(ref), // Collect SHA before gunzip
gunzip // Unzip compressed stream
),
last = ensureSHA256(shaSum) // Ensure gunzipped SHA is correct
)
collecetedSha <- ref.get.map(_.map("%02x".format(_)).mkString)
} yield {
println(s"Collected SHA: $collecetedSha, expected gzipped SHA: $gzippedeShaSum")
ExitCode.Success
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import xerial.sbt.Sonatype._

ThisBuild / name := "fetch-file"
ThisBuild / scalaVersion := "2.13.1"
ThisBuild / scalaVersion := "2.13.8"
ThisBuild / organization := "com.gaborpihaj"
ThisBuild / dynverSonatypeSnapshots := true
ThisBuild / scalafixDependencies += "com.nequissimus" %% "sort-imports" % "0.3.2"

ThisBuild / publishTo := sonatypePublishToBundle.value

val catsEffectVersion = "2.1.2"
val fs2Version = "2.2.2"
val http4sVersion = "0.21.1"
val catsEffectVersion = "3.3.12"
val fs2Version = "3.2.8"
val http4sVersion = "0.23.12"

val scalaTestVersion = "3.1.1"
val scalaTestVersion = "3.2.12"

lazy val publishSettings = List(
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
Expand All @@ -29,11 +29,11 @@ lazy val fetchfile = (project in file("fetch-file"))
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalacheck" %% "scalacheck" % "1.14.1" % Test,
"org.scalatestplus" %% "scalacheck-1-14" % "3.1.1.1" % Test,
"org.typelevel" %% "claimant" % "0.1.3" % Test
"org.scalacheck" %% "scalacheck" % "1.16.0" % Test,
"org.scalatestplus" %% "scalacheck-1-16" % "3.2.12.0" % Test,
"org.typelevel" %% "claimant" % "0.2.0" % Test
),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.11.0" cross CrossVersion.full),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.2" cross CrossVersion.full),
addCompilerPlugin(scalafixSemanticdb)
)

Expand All @@ -47,7 +47,7 @@ lazy val fetchfileHttp4s = (project in file("fetch-file-http4s"))
"org.http4s" %% "http4s-blaze-client" % http4sVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.11.0" cross CrossVersion.full),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.2" cross CrossVersion.full),
addCompilerPlugin(scalafixSemanticdb)
)
.dependsOn(fetchfile)
Expand Down
48 changes: 25 additions & 23 deletions examples/src/main/scala/vdx/fetchfile/examples/Main.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package vdx.fetchfile.examples

import cats.effect._
import cats.effect.concurrent.Ref
import cats.instances.list._
import fs2.Pipe
import fs2.compression.Compression
import vdx.fetchfile.Pipes._
import vdx.fetchfile._

Expand All @@ -14,34 +15,35 @@ import java.net.URL
object Main extends IOApp {
def run(args: List[String]): IO[ExitCode] = {

implicit val clock: MonotonicClock = MonotonicClock.system
implicit val clock: MonotonicClock = MonotonicClock.system()

val shaSum = Source.fromFile("docker/static-files/100MB.bin.sha256").mkString.trim()
val gzippedeShaSum = Source.fromFile("docker/static-files/100MB.bin.gz.sha256").mkString.trim()

val outFile = new File("/tmp/100MB.bin")

Blocker[IO].use { blocker =>
implicit val backend: HttpClient[IO] = HttpURLConnectionClient[IO](blocker, 1024 * 16)

val downloader = Downloader[IO](blocker, Progress.consoleProgress[IO])

for {
ref <- Ref.of[IO, List[Byte]](List.empty)
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin.gz"),
Resource.fromAutoCloseable(IO.delay(new FileOutputStream(outFile))),
pipes = List(
collectSHA256(ref), // Collect SHA before gunzip
fs2.compress.gunzip[IO](32 * 1024) // Unzip compressed stream
),
last = ensureSHA256(shaSum) // Ensure gunzipped SHA is correct
)
collecetedSha <- ref.get.map(_.map("%02x".format(_)).mkString)
} yield {
println(s"Collected SHA: $collecetedSha, expected gzipped SHA: $gzippedeShaSum")
ExitCode.Success
}
implicit val backend: HttpClient[IO] = HttpURLConnectionClient[IO](1024 * 16)

val downloader = Downloader[IO](Progress.consoleProgress[IO])

val gunzip: Pipe[IO, Byte, Byte] =
Compression[IO].gunzip(32 * 1024).andThen(_.flatMap(_.content))

for {
ref <- Ref.of[IO, List[Byte]](List.empty)
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin.gz"),
Resource.fromAutoCloseable(IO.delay(new FileOutputStream(outFile))),
pipes = List(
collectSHA256(ref), // Collect SHA before gunzip
gunzip // Unzip compressed stream
),
last = ensureSHA256(shaSum) // Ensure gunzipped SHA is correct
)
collecetedSha <- ref.get.map(_.map("%02x".format(_)).mkString)
} yield {
println(s"Collected SHA: $collecetedSha, expected gzipped SHA: $gzippedeShaSum")
ExitCode.Success
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ object Http4sClient {
}

private[this] def contentLength(headers: Headers): Long =
headers.get(`Content-Length`).map(_.length).getOrElse(0L)
headers.get[`Content-Length`].map(_.length).getOrElse(0L)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package vdx.fetchfile
package http4s

import cats.effect._
import cats.syntax.semigroupal._
import org.http4s.client.blaze.BlazeClientBuilder
import cats.effect.unsafe.implicits.global
import org.http4s.blaze.client.BlazeClientBuilder
import org.scalatest.EitherValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.ExecutionContext.global
import scala.io.Source

import java.io.ByteArrayOutputStream
Expand All @@ -17,26 +16,20 @@ import java.security.MessageDigest

class Http4sClientSpec extends AnyFlatSpec with Matchers with EitherValues {

implicit val cs: ContextShift[IO] = IO.contextShift(global)

"Http4sClient" should "download the file correctly via the http4s client" in {

val downloadedBytes = (Blocker[IO]
.product(BlazeClientBuilder[IO](global).resource)
.use {
case (blocker, client) =>
implicit val backend = Http4sClient[IO](client)
val downloader = Downloader[IO](blocker)
val out = new ByteArrayOutputStream()
for {
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin"),
Resource.fromAutoCloseable(IO.delay(out))
)
content <- IO.delay(out.toByteArray())
} yield content
})
.unsafeRunSync()
val downloadedBytes = (BlazeClientBuilder[IO].resource.use { client =>
implicit val backend = Http4sClient[IO](client)
val downloader = Downloader[IO]()
val out = new ByteArrayOutputStream()
for {
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin"),
Resource.fromAutoCloseable(IO.delay(out))
)
content <- IO.delay(out.toByteArray())
} yield content
}).unsafeRunSync()

downloadedBytes.length should be(1024 * 1024 * 100)
val expectedShaSum = Source.fromFile("docker/static-files/100MB.bin.sha256").mkString.trim()
Expand All @@ -51,21 +44,17 @@ class Http4sClientSpec extends AnyFlatSpec with Matchers with EitherValues {
}

it should "raise an error when the HTTP request is not successful" in {
val attempt = (Blocker[IO]
.product(BlazeClientBuilder[IO](global).resource)
.use {
case (blocker, client) =>
implicit val backend = Http4sClient[IO](client)
val attempt = (BlazeClientBuilder[IO].resource.use { client =>
implicit val backend = Http4sClient[IO](client)

val downloader = Downloader[IO](blocker)
val out = new ByteArrayOutputStream()
val downloader = Downloader[IO]()
val out = new ByteArrayOutputStream()

downloader.fetch(
new URL("http://localhost:8088/nonexsitent"),
Resource.fromAutoCloseable(IO.delay(out))
)
})
.attempt
downloader.fetch(
new URL("http://localhost:8088/nonexsitent"),
Resource.fromAutoCloseable(IO.delay(out))
)
}).attempt
.unsafeRunSync()

attempt.left.value should be(an[HttpClientException])
Expand Down
7 changes: 3 additions & 4 deletions fetch-file/src/main/scala/vdx/fetchfile/Downloader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ object Downloader {
/**
* Creates a `Downloader` that is going to run all blocking operations on the given ExecutionContext.
*/
def apply[F[_]: Concurrent: ContextShift](
ec: Blocker,
progress: ContentLength => Pipe[F, Byte, Unit] = Progress.noop[F]
def apply[F[_]: Async](
progress: ContentLength => Pipe[F, Byte, Nothing] = Progress.noop[F]
)(implicit client: HttpClient[F]): Downloader[F] = new Downloader[F] {
def fetch[G[_]: Foldable](
url: URL,
Expand All @@ -68,7 +67,7 @@ object Downloader {
client(url) { (contentLength, body) =>
pipes
.foldLeft(body.observe(progress(contentLength)))(_ through _)
.observe(writeOutputStream[F](Concurrent[F].delay(outStream), ec))
.observe(writeOutputStream[F](Async[F].delay(outStream)))
.through(last)
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,28 @@ object HttpURLConnectionClient {
* The evaluation of the blocking HTTP call and the fs2.Stream creation from the java InputStream
* is using the given execution context wrapped in Blocker.
*/
def apply[F[_]: Sync: ContextShift](blocker: Blocker, chunkSize: Int): HttpClient[F] =
def apply[F[_]: Sync](chunkSize: Int): HttpClient[F] =
url =>
sink =>
makeConnectionResource(url, blocker).map {
makeConnectionResource(url).map {
case (contentLength, inputStream) =>
ContentLength(contentLength.toLong) -> readInputStream(Sync[F].delay(inputStream), chunkSize, blocker)
ContentLength(contentLength.toLong) -> readInputStream(Sync[F].delay(inputStream), chunkSize)
}.use(sink.tupled)

private[this] def makeConnectionResource[F[_]: Sync: ContextShift](
url: URL,
blocker: Blocker
private[this] def makeConnectionResource[F[_]: Sync](
url: URL
): Resource[F, (Int, InputStream)] =
Resource.make(makeConnection(url, blocker)) { case (_, inStream) => Sync[F].delay(inStream.close()) }
Resource.make(makeConnection(url)) { case (_, inStream) => Sync[F].delay(inStream.close()) }

@SuppressWarnings(Array("scalafix:DisableSyntax.asInstanceOf"))
private[this] def makeConnection[F[_]: Sync: ContextShift](url: URL, blocker: Blocker): F[(Int, InputStream)] =
ContextShift[F].blockOn(blocker)(
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getContentLength -> connection.getInputStream()
}
)
private[this] def makeConnection[F[_]: Sync](url: URL): F[(Int, InputStream)] =
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getContentLength -> connection.getInputStream()
}
}
15 changes: 9 additions & 6 deletions fetch-file/src/main/scala/vdx/fetchfile/Pipes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package vdx.fetchfile

import cats.Applicative
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.effect.{Async, Ref}
import cats.instances.string._
import cats.kernel.Semigroup
import cats.syntax.eq._
import cats.syntax.functor._
import cats.syntax.semigroup._
import fs2.{Pipe, RaiseThrowable, Stream}

Expand All @@ -22,18 +21,22 @@ object Pipes {
* @param shaRef The ref that holds the bytes of the SHA sum
* @param S The implicit Semigroup instance of G[Byte]
*/
def collectSHA256[F[_]: Concurrent, G[_]: Applicative](
def collectSHA256[F[_]: Async, G[_]: Applicative](
shaRef: Ref[F, G[Byte]]
)(implicit S: Semigroup[G[Byte]]): Pipe[F, Byte, Byte] =
_.observe(_.through(fs2.hash.sha256).evalTap(byte => shaRef.update(_ |+| Applicative[G].pure(byte))).void)
_.observe(
_.through(fs2.hash.sha256)
.evalTap(byte => shaRef.update(_ |+| Applicative[G].pure(byte)))
.drain
)

/**
* An fs2 Pipe that compares the stream's SHA-256 sum to the given `
* An fs2 Pipe that compares the stream's SHA-256 sum to the given
* @tparam F
* @param expectedSHA
* @param C
*/
def ensureSHA256[F[_]: RaiseThrowable](expectedSHA: String)(implicit C: Stream.Compiler[F, F]): Pipe[F, Byte, Unit] =
def ensureSHA256[F[_]: RaiseThrowable: Concurrent](expectedSHA: String): Pipe[F, Byte, Unit] =
stream =>
Stream
.eval(
Expand Down

0 comments on commit c3af92b

Please sign in to comment.