Implement simple progress tracker that prints to stdout

voidcontext committed Mar 14, 2020
1 parent c221f79 commit ac2a8bb
Showing 6 changed files with 103 additions and 19 deletions.
2 changes: 2 additions & 0 deletions build.sbt
@@ -10,6 +10,8 @@ lazy val fetchfile = (project in file("fetch-file"))
       "co.fs2" %% "fs2-io" % "2.2.2",

       "org.scalatest" %% "scalatest" % "3.1.1" % "test",
+      "org.scalacheck" %% "scalacheck" % "1.14.1" % "test",
+      "org.scalatestplus" %% "scalatestplus-scalacheck" % "3.1.0.0-RC2" % Test
     )
   )

5 changes: 3 additions & 2 deletions examples/src/main/scala/vdx/fetchfile/examples/Main.scala
@@ -1,7 +1,7 @@
 package vdx.fetchfile
 package examples

-import cats.effect._
+import cats.effect.{Blocker, IO, IOApp, ExitCode, Resource}
 import cats.syntax.functor._
 import java.net.URL
 import java.io.ByteArrayOutputStream
@@ -10,6 +10,7 @@ object Main extends IOApp {
   def run(args: List[String]): IO[ExitCode] = {

     implicit val backend: Backend[IO] = HttpURLConnectionBackend[IO]
+    implicit val clock: Clock = Clock.system

     Blocker[IO].use { blocker =>
       val out = new ByteArrayOutputStream()
@@ -18,7 +19,7 @@
         Resource.fromAutoCloseable(IO.delay(out)),
         blocker,
         1024,
-        Progress.consoleProgress
+        Progress.consoleProgress[IO]
       ).as(ExitCode.Success)
     }
   }
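
The hunk above cuts off the beginning of the call that the progress pipe is passed to. Purely as an illustration of where the pieces fit together, a hypothetical reconstruction is sketched below; the fetch method name, argument order and URL are assumptions, not part of this commit.

// Hypothetical sketch, not part of the diff: a guess at the elided start of the call above.
// Downloader[IO] is borrowed from DownloaderSpec below; everything else mirrors the visible lines.
Downloader[IO].fetch(
  new URL("http://example.com/test.file"),   // placeholder URL
  Resource.fromAutoCloseable(IO.delay(out)),
  blocker,
  1024,
  Progress.consoleProgress[IO]
).as(ExitCode.Success)
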
20 changes: 20 additions & 0 deletions fetch-file/src/main/scala/vdx/fetchfile/Clock.scala
@@ -0,0 +1,20 @@
package vdx.fetchfile

/**
 * Representation of a clock.
 * With cats.effect.Clock[F], evaluating an effect for each timing read makes the stream slightly slower.
 * To avoid this performance impact while still keeping the progress tracker testable, we introduce our own clock interface.
 */
trait Clock {
  def nanoTime(): Long
}

object Clock {
  /**
   * A simple clock implementation based on System.nanoTime().
   */
  def system(): Clock = new Clock {
    def nanoTime(): Long = System.nanoTime()
  }
}
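
Because Clock is a plain trait rather than an effect, tests can substitute a deterministic implementation while production code keeps using the system clock. A minimal sketch, assuming a hypothetical fixed-step clock (the spec further down uses the same idea):

import vdx.fetchfile.Clock

object ClockExamples {
  // Hypothetical fixed-step clock: every nanoTime() call advances by stepNanos,
  // making elapsed-time and speed calculations fully deterministic in tests.
  def steppingClock(stepNanos: Long): Clock = new Clock {
    private var now: Long = 0L
    def nanoTime(): Long = { now += stepNanos; now }
  }

  // Production code simply relies on the system clock:
  implicit val systemClock: Clock = Clock.system()
}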

41 changes: 25 additions & 16 deletions fetch-file/src/main/scala/vdx/fetchfile/Progress.scala
@@ -3,40 +3,49 @@ package vdx.fetchfile
 import cats.effect.Sync
 import fs2._

-import scala.concurrent.duration.{Duration, NANOSECONDS}
-
 import java.util.concurrent.atomic.AtomicLong
 import java.util.Locale

 object Progress {
   def noop[F[_]]: Int => Pipe[F, Byte, Unit] = _ => _.map(_ => ())

-  def consoleProgress[F[_]: Sync]: Int => Pipe[F, Byte, Unit] =
-    custom[F] { (downloadedBytes, contentLength, _, downloadSpeed) =>
+  def consoleProgress[F[_]: Sync](implicit clock: Clock): Int => Pipe[F, Byte, Unit] =
+    custom[F] { (downloadedBytes, contentLength, elapsedTime, downloadSpeed) =>
       println(
-        s"\u001b[1A\u001b[100D\u001b[0KDownloaded ${bytesToString(downloadedBytes)} of" +
-          s" ${bytesToString(contentLength.toLong)} | ${bytesToString(downloadSpeed)}/s"
+        s"\u001b[1A\u001b[100D\u001b[0KDownloaded ${bytesToString(downloadedBytes)} of " +
+          s"${bytesToString(contentLength.toLong)} | " +
+          s"${bytesToString(downloadSpeed)}/s | " +
+          s"Time: ${millisToString(elapsedTime)}"
       )
     }

-  def custom[F[_]: Sync](f: (Long, Int, Duration, Long) => Unit): Int => Pipe[F, Byte, Unit] =
+  def custom[F[_]: Sync](
+    f: (Long, Int, Long, Long) => Unit,
+    chunkLimit: Option[Int] = None
+  )(implicit clock: Clock): Int => Pipe[F, Byte, Unit] =
     contentLength => { s =>
-      Stream.eval(Sync[F].delay(System.nanoTime()))
+      Stream.eval(Sync[F].delay(clock.nanoTime()))
         .flatMap { startTime =>
           val downloadedBytes = new AtomicLong(0)

-          s.chunks.map { chunk =>
-            val down = downloadedBytes.addAndGet(chunk.size.toLong)
-            val elapsedTime = Duration(System.nanoTime() - startTime, NANOSECONDS)
-            val speed = (down * 1000) / Math.max(elapsedTime.toMillis, 1)
-
-            f(down, contentLength, elapsedTime, speed)
-
-          }
+          chunkLimit
+            .map(s.chunkLimit(_))
+            .getOrElse(s.chunks)
+            .map { chunk =>
+              val down = downloadedBytes.addAndGet(chunk.size.toLong)
+              val elapsedTime = (clock.nanoTime() - startTime) / 1000000
+              val speed = (down * 1000) / Math.max(elapsedTime, 1)
+
+              f(down, contentLength, elapsedTime, speed)
+            }
         }
     }

+  def millisToString(millis: Long): String =
+    if (millis > 1000) s"${millis.toFloat / 1000} s"
+    else s"${millis} ms"
+
   // https://stackoverflow.com/questions/45885151/bytes-in-human-readable-format-with-idiomatic-scala
   def bytesToString(size: Long): String = {
     val TB = 1L << 40
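
Progress.custom only needs a callback over the four tracked values (downloaded bytes, content length, elapsed milliseconds, bytes per second), so callers can plug in their own reporting. A minimal sketch of such a tracker; the percentage formatting is illustrative and not part of this commit:

import cats.effect.Sync
import fs2.Pipe
import vdx.fetchfile.{Clock, Progress}

object PercentProgress {
  // Illustrative tracker: prints percentage, throughput and elapsed time after every chunk.
  def apply[F[_]: Sync](implicit clock: Clock): Int => Pipe[F, Byte, Unit] =
    Progress.custom[F]({ (downloaded, contentLength, elapsedMillis, bytesPerSecond) =>
      val percent = if (contentLength > 0) downloaded * 100 / contentLength else 0L
      println(s"$percent% ($downloaded of $contentLength bytes, $bytesPerSecond B/s, $elapsedMillis ms)")
    })
}

Such a tracker can then be passed wherever Progress.consoleProgress[IO] is used in the example above.
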
2 changes: 1 addition & 1 deletion fetch-file/src/test/scala/vdx/fetchfile/DownloaderSpec.scala
@@ -40,7 +40,7 @@ class DownloaderSpec extends AnyFlatSpec with Matchers {
     }).unsafeRunSync() should be("http://example.com/test.file")
   }

-  it should "download the file correctly with the provided backend" in {
+  it should "download the file correctly through the HttpURLConnectionBackend" in {
     val downloader = Downloader[IO]

     val downloadedBytes = (Blocker[IO].use { blocker =>
52 changes: 52 additions & 0 deletions fetch-file/src/test/scala/vdx/fetchfile/ProgressSpec.scala
@@ -0,0 +1,52 @@
package vdx.fetchfile

import cats.effect.IO
import fs2.Stream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatestplus.scalacheck.Checkers

import cats.effect.ContextShift
import org.scalacheck.{Gen, Prop}
import scala.concurrent.ExecutionContext

class ProgressSpec extends AnyFlatSpec with Checkers {

  val byte = Gen.choose(1, 255).map(_.toByte)
  val bytes = Gen.containerOf[List, Byte](byte)

  "custom" should "create a custom progress tracker that calls the given function after each chunk" in {
    check(
      Prop.forAll(bytes) { bs =>

        implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
        var elapsedTime: Long = 0
        implicit val clock: Clock = new Clock {
          def nanoTime(): Long = {
            elapsedTime += 1000000
            elapsedTime
          }
        }

        var chunksOK = true

        val progress = (downloadedBytes: Long, contentLength: Int, elapsedTime: Long, downloadSpeed: Long) => {
          chunksOK = chunksOK &&
            contentLength == bs.length &&
            elapsedTime == downloadedBytes &&
            downloadSpeed == (downloadedBytes * 1000) / elapsedTime
        }

        val pipe = Progress.custom[IO](progress, Some(1))

        Stream.emits[IO, Byte](bs)
          .observe(pipe(bs.length))
          .compile
          .drain
          .unsafeRunSync()

        chunksOK
      }
    )
  }
}
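
To spell out the arithmetic the property relies on: chunkLimit = Some(1) makes every chunk a single byte, and the fake clock advances one millisecond per nanoTime() call (one call for the start time, then one per chunk). A sketch of the expected callback values for a three-byte input:

// chunk 1: downloadedBytes = 1, elapsedTime = 2 ms - 1 ms = 1 ms, speed = 1 * 1000 / 1 = 1000 B/s
// chunk 2: downloadedBytes = 2, elapsedTime = 3 ms - 1 ms = 2 ms, speed = 2 * 1000 / 2 = 1000 B/s
// chunk 3: downloadedBytes = 3, elapsedTime = 4 ms - 1 ms = 3 ms, speed = 3 * 1000 / 3 = 1000 B/s
// hence elapsedTime == downloadedBytes and downloadSpeed == downloadedBytes * 1000 / elapsedTime hold for every chunk.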
