Skip to content

Commit

Permalink
Make more customisable via pipes (e.g. SHA sum, gunzip, etc)
Browse files Browse the repository at this point in the history
  • Loading branch information
voidcontext committed May 14, 2020
1 parent 9b880ae commit 9b956cc
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 75 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
docker/static-files/10MB.bin*
docker/static-files/100MB.bin*
target/
13 changes: 7 additions & 6 deletions build.sbt
Expand Up @@ -25,12 +25,13 @@ lazy val fetchfile = (project in file("fetch-file"))
.settings(
name := "fetch-file",
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % catsEffectVersion,
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalacheck" %% "scalacheck" % "1.14.1" % Test,
"org.scalatestplus" %% "scalatestplus-scalacheck" % "3.1.0.0-RC2" % Test
"org.typelevel" %% "cats-effect" % catsEffectVersion,
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalacheck" %% "scalacheck" % "1.14.1" % Test,
"org.scalatestplus" %% "scalacheck-1-14" % "3.1.1.1" % Test,
"org.typelevel" %% "claimant" % "0.1.3" % Test
),
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.11.0" cross CrossVersion.full),
addCompilerPlugin(scalafixSemanticdb)
Expand Down
3 changes: 3 additions & 0 deletions docker/static-files/Dockerfile
@@ -1,6 +1,9 @@
FROM nginx:stable

COPY ./10MB.bin /var/www/10MB.bin
COPY ./10MB.bin.gz /var/www/10MB.bin.gz
COPY ./100MB.bin /var/www/100MB.bin
COPY ./100MB.bin.gz /var/www/100MB.bin.gz
COPY ./nginx.conf /etc/nginx/conf.d/default.conf

CMD ["nginx", "-g", "daemon off;"]
9 changes: 7 additions & 2 deletions docker/static-files/pre_build.sh
Expand Up @@ -4,5 +4,10 @@ DIR="$(dirname $0)"

echo "Destination dir: $DIR"

head -c 104857600 </dev/urandom > $DIR/100MB.bin
shasum -a 256 $DIR/100MB.bin | awk '{print $1}' > $DIR/100MB.bin.sha256
for size in 10 100; do
filename=${size}MB.bin
head -c $(($size * 1048576)) </dev/urandom > $DIR/$filename
gzip -k -f $filename
shasum -a 256 $DIR/$filename | awk '{print $1}' > $DIR/$filename.sha256
shasum -a 256 $DIR/$filename.gz | awk '{print $1}' > $DIR/$filename.gz.sha256
done
31 changes: 25 additions & 6 deletions examples/src/main/scala/vdx/fetchfile/examples/Main.scala
@@ -1,9 +1,13 @@
package vdx.fetchfile.examples

import cats.effect._
import cats.syntax.functor._
import cats.effect.concurrent.Ref
import cats.instances.list._
import vdx.fetchfile.Pipes._
import vdx.fetchfile._

import scala.io.Source

import java.io.{File, FileOutputStream}
import java.net.URL

Expand All @@ -12,17 +16,32 @@ object Main extends IOApp {

implicit val clock: MonotonicClock = MonotonicClock.system

val shaSum = Source.fromFile("docker/static-files/100MB.bin.sha256").mkString.trim()
val gzippedeShaSum = Source.fromFile("docker/static-files/100MB.bin.gz.sha256").mkString.trim()

val outFile = new File("/tmp/100MB.bin")

Blocker[IO].use { blocker =>
implicit val backend: HttpClient[IO] = HttpURLConnectionClient[IO](blocker, 1024 * 16)

Downloader[IO](blocker, Progress.consoleProgress[IO])
.fetch(
new URL("http://localhost:8088/100MB.bin"),
Resource.fromAutoCloseable(IO.delay(new FileOutputStream(outFile)))
val downloader = Downloader[IO](blocker, Progress.consoleProgress[IO])

for {
ref <- Ref.of[IO, List[Byte]](List.empty)
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin.gz"),
Resource.fromAutoCloseable(IO.delay(new FileOutputStream(outFile))),
pipes = List(
collectSHA256(ref), // Collect SHA before gunzip
fs2.compress.gunzip[IO](32 * 1024) // Unzip compressed stream
),
last = ensureSHA256(shaSum) // Ensure gunzipped SHA is correct
)
.as(ExitCode.Success)
collecetedSha <- ref.get.map(_.map("%02x".format(_)).mkString)
} yield {
println(s"Collected SHA: $collecetedSha, expected gzipped SHA: $gzippedeShaSum")
ExitCode.Success
}
}
}
}
57 changes: 31 additions & 26 deletions fetch-file/src/main/scala/vdx/fetchfile/Downloader.scala
@@ -1,11 +1,12 @@
package vdx.fetchfile

import cats.Foldable
import cats.effect._
import cats.instances.string._
import cats.syntax.eq._
import cats.instances.list._
import cats.syntax.foldable._
import cats.syntax.functor._
import fs2.Pipe
import fs2.io.writeOutputStream
import fs2.{Pipe, Stream}

import java.io.OutputStream
import java.net.URL
Expand All @@ -27,7 +28,24 @@ trait Downloader[F[_]] {
/**
* Fetches the given URL and popuplates the given output stream.
*/
def fetch(url: URL, out: Resource[F, OutputStream], sha256Sum: Option[String] = None): F[Unit]
def fetch(url: URL, out: Resource[F, OutputStream]): F[Unit] =
fetch[List](url, out, List.empty, streamUnit)

def fetch[G[_]: Foldable](url: URL, out: Resource[F, OutputStream], pipes: G[Pipe[F, Byte, Byte]]): F[Unit] =
fetch[G](url, out, pipes, streamUnit)

def fetch(url: URL, out: Resource[F, OutputStream], last: Pipe[F, Byte, Unit]): F[Unit] =
fetch[List](url, out, List.empty, last)

def fetch[G[_]: Foldable](
url: URL,
out: Resource[F, OutputStream],
pipes: G[Pipe[F, Byte, Byte]],
last: Pipe[F, Byte, Unit]
): F[Unit]

private def streamUnit[A]: Pipe[F, A, Unit] =
_.void
}

object Downloader {
Expand All @@ -40,34 +58,21 @@ object Downloader {
ec: Blocker,
progress: ContentLength => Pipe[F, Byte, Unit] = Progress.noop[F]
)(implicit client: HttpClient[F]): Downloader[F] = new Downloader[F] {
def fetch(url: URL, out: Resource[F, OutputStream], sha256Sum: Option[String] = None): F[Unit] =
def fetch[G[_]: Foldable](
url: URL,
out: Resource[F, OutputStream],
pipes: G[Pipe[F, Byte, Byte]],
last: Pipe[F, Byte, Unit]
): F[Unit] =
out.use { outStream =>
client(url) { (contentLength, body) =>
body
.observe(progress(contentLength))
// The writeOutputStream pipe returns Unit so it is safe to write the final output using observe
pipes
.foldLeft(body.observe(progress(contentLength)))(_ through _)
.observe(writeOutputStream[F](Concurrent[F].delay(outStream), ec))
.through(maybeCompareSHA(sha256Sum))
.through(last)
.compile
.drain
}
}

def maybeCompareSHA(sha256: Option[String]): Pipe[F, Byte, Unit] =
stream =>
sha256
.map[Stream[F, Unit]] { expectedSHA =>
Stream
.eval(
// We'll compute the sh256 hash of the downloaded file
stream.through(fs2.hash.sha256).compile.toVector
)
.flatMap { hashBytes =>
val hash = hashBytes.map("%02x".format(_)).mkString
if (hash === expectedSHA.toLowerCase()) Stream.emit(()).covary[F]
else Stream.raiseError(new Exception(s"Sha256 sum doesn't match (expected: $expectedSHA, got: $hash)"))
}
}
.getOrElse(stream.void)
}
}
47 changes: 47 additions & 0 deletions fetch-file/src/main/scala/vdx/fetchfile/Pipes.scala
@@ -0,0 +1,47 @@
package vdx.fetchfile

import cats.Applicative
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
import cats.instances.string._
import cats.kernel.Semigroup
import cats.syntax.eq._
import cats.syntax.functor._
import cats.syntax.semigroup._
import fs2.{Pipe, RaiseThrowable, Stream}

object Pipes {

/**
* An fs2 Pipe that collects the SHA-256 sum of the stream as a
* series of bytes. The result container can be anything that has an
* Applicative and a Semigroup instance.
*
* @tparam F The Concurrent effect of the Stream
* @tparam G The type of the contianer that holds the collected bytes
* @param shaRef The ref that holds the bytes of the SHA sum
* @param S The implicit Semigroup instance of G[Byte]
*/
def collectSHA256[F[_]: Concurrent, G[_]: Applicative](
shaRef: Ref[F, G[Byte]]
)(implicit S: Semigroup[G[Byte]]): Pipe[F, Byte, Byte] =
_.observe(_.through(fs2.hash.sha256).evalTap(byte => shaRef.update(_ |+| Applicative[G].pure(byte))).void)

/**
* An fs2 Pipe that compares the stream's SHA-256 sum to the given `
* @tparam F
* @param expectedSHA
* @param C
*/
def ensureSHA256[F[_]: RaiseThrowable](expectedSHA: String)(implicit C: Stream.Compiler[F, F]): Pipe[F, Byte, Unit] =
stream =>
Stream
.eval(
stream.through(fs2.hash.sha256).compile.toVector
)
.map(_.map("%02x".format(_)).mkString)
.flatMap { hash =>
if (hash === expectedSHA.toLowerCase()) Stream.emit(()).covary[F]
else Stream.raiseError(new Exception(s"Sha256 sum doesn't match (expected: $expectedSHA, got: $hash)"))
}
}
37 changes: 2 additions & 35 deletions fetch-file/src/test/scala/vdx/fetchfile/DownloaderSpec.scala
@@ -1,30 +1,22 @@
package vdx.fetchfile

import cats.effect._
import fs2.Stream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import vdx.fetchfile.Downloader.ContentLength

import scala.io.Source

import java.io.ByteArrayOutputStream
import java.net.URL
import java.security.MessageDigest

class DownloaderSpec extends AnyFlatSpec with Matchers {
class DownloaderSpec extends AnyFlatSpec with Matchers with TestHttpClient {

implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)

"fetch" should "use the given client to create the stream representing the content" in {

implicit val client: HttpClient[IO] =
url =>
sink => {
val bytes = url.toString.getBytes()

sink(ContentLength(bytes.length.toLong), Stream.emits(bytes.toList).covary[IO])
}
implicit val client: HttpClient[IO] = makeClient(_.toString().getBytes())

(Blocker[IO].use { blocker =>
val out = new ByteArrayOutputStream()
Expand Down Expand Up @@ -66,29 +58,4 @@ class DownloaderSpec extends AnyFlatSpec with Matchers {

shaSum should be(expectedShaSum)
}

it should "be successful when the given shasum is correct" in {
val expectedShaSum = Source.fromFile("docker/static-files/100MB.bin.sha256").mkString.trim()

download(expectedShaSum).attempt.unsafeRunSync() should be(Right(()))
}

it should "be fail when the given shasum is incorrect" in {
download("some-wrong-sha-sum").attempt.unsafeRunSync() should be(a[Left[_, _]])
}

def download(shaSum: String): IO[Unit] =
Blocker[IO].use { blocker =>
implicit val client = HttpURLConnectionClient[IO](blocker, 1024 * 8)
val downloader = Downloader[IO](blocker)
val out = new ByteArrayOutputStream()
for {
_ <- downloader.fetch(
new URL("http://localhost:8088/100MB.bin"),
Resource.fromAutoCloseable(IO.delay(out)),
sha256Sum = Option(shaSum)
)
_ <- IO.delay(out.toByteArray())
} yield ()
}
}
88 changes: 88 additions & 0 deletions fetch-file/src/test/scala/vdx/fetchfile/PipesSpec.scala
@@ -0,0 +1,88 @@
package vdx.fetchfile

import cats.effect.ContextShift
import cats.effect.IO
import cats.effect.concurrent.Ref
import cats.instances.list._
import fs2.Stream
import org.scalacheck.Arbitrary
import org.scalacheck.Gen
import org.scalacheck.Prop
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatestplus.scalacheck.Checkers
import org.typelevel.claimant.Claim

import scala.concurrent.ExecutionContext

import java.security.MessageDigest

@SuppressWarnings(Array("scalafix:DisableSyntax.=="))
class PipesSpec extends AnyWordSpec with Matchers with TestHttpClient with Checkers {
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val testText = "This is a test text"
val expectedShaSum = "c62968ebcd6b7c706a8ac082db862682fa8be407106cb7eaa050c713a4e969d7"

val byteArray: Gen[Array[Byte]] = Gen.listOf(Arbitrary.arbByte.arbitrary).map(_.toArray)

"ensureSHA256" should {
"do nothing when the given shaSum is correct" in {
check(
Prop.forAllNoShrink(byteArray) { bytes =>
Stream
.emits(bytes)
.covary[IO]
.through(Pipes.ensureSHA256(calcHash("SHA-256", bytes)))
.compile
.drain
.attempt
.unsafeRunSync()
.isRight
}
)
}

"fail the stream when the given shasum is not correct" in {
check(
Prop.forAllNoShrink(byteArray) { bytes =>
Stream
.emits(bytes)
.covary[IO]
.through(Pipes.ensureSHA256("some-wrong-sha"))
.compile
.drain
.attempt
.unsafeRunSync()
.isLeft
}
)
}
}

"collectSHA256" should {
"collect the shasum without modifying the stream" in {
check(
Prop.forAllNoShrink(byteArray) { bytes =>
(for {
ref <- Ref.of[IO, List[Byte]](List.empty)
result <- Stream.emits(bytes).covary[IO].through(Pipes.collectSHA256(ref)).compile.toList
collected <- ref.get
} yield {
Claim(
result == bytes.toList &&
collected.map("%02x".format(_)).mkString == calcHash("SHA-256", bytes)
)
}).unsafeRunSync()
}
)
}
}

def calcHash(algo: String, bytes: Array[Byte]): String =
MessageDigest
.getInstance(algo)
.digest(bytes)
.map("%02x".format(_))
.mkString
}

0 comments on commit 9b956cc

Please sign in to comment.