Skip to content

Commit

Permalink
First draft of the console progress
Browse files Browse the repository at this point in the history
  • Loading branch information
voidcontext committed Mar 12, 2020
1 parent 08f5d0d commit 7502628
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 12 deletions.
5 changes: 4 additions & 1 deletion build.sbt
Expand Up @@ -13,5 +13,8 @@ lazy val fetchfile = (project in file("fetch-file"))
)
)

lazy val examples = (project in file("examples"))
.dependsOn(fetchfile)

lazy val root = (project in file("."))
.aggregate(fetchfile)
.aggregate(fetchfile, examples)
25 changes: 25 additions & 0 deletions examples/src/main/scala/vdx/fetchfile/examples/Main.scala
@@ -0,0 +1,25 @@
package vdx.fetchfile
package examples

import cats.effect._
import cats.syntax.functor._
import java.net.URL
import java.io.ByteArrayOutputStream

object Main extends IOApp {
def run(args: List[String]): IO[ExitCode] = {

implicit val backend: Backend[IO] = HttpURLConnectionBackend[IO]

Blocker[IO].use { blocker =>
val out = new ByteArrayOutputStream()
Downloader[IO].fetch(
new URL("http://localhost:8088/100MB.bin"),
Resource.fromAutoCloseable(IO.delay(out)),
blocker,
1024,
Progress.consoleProgress
).as(ExitCode.Success)
}
}
}
8 changes: 4 additions & 4 deletions fetch-file/src/main/scala/vdx/fetchfile/Downloader.scala
Expand Up @@ -15,7 +15,7 @@ trait Downloader[F[_]] {
out: Resource[F, OutputStream],
ec: Blocker,
chunkSize: Int,
progress: Pipe[F, Byte, Unit]
progress: Int => Pipe[F, Byte, Unit] = Progress.noop[F]
)(implicit backend: Backend[F]): F[Unit]
}

Expand All @@ -27,12 +27,12 @@ object Downloader {
out: Resource[F, OutputStream],
ec: Blocker,
chunkSize: Int,
progress: Pipe[F, Byte, Unit]
progress: Int => Pipe[F, Byte, Unit] = Progress.noop
)(implicit backend: Backend[F]): F[Unit] = {
backend(url).product(out).use {
case (inStream, outStream) =>
case ((inStream, contentLength), outStream) =>
readInputStream[F](Sync[F].delay(inStream), chunkSize, ec)
.observe(progress)
.observe(progress(contentLength))
.through(writeOutputStream[F](Sync[F].delay(outStream), ec))
.compile
.drain
Expand Down
Expand Up @@ -5,15 +5,17 @@ import java.net.HttpURLConnection

object HttpURLConnectionBackend {
def apply[F[_]: Sync]: Backend[F] =
url => Resource.fromAutoCloseable {
url => Resource.make {
Sync[F].delay {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setRequestProperty(
"User-Agent",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11"
)
connection.connect()
connection.getInputStream()
connection.getInputStream -> connection.getContentLength
}
} {
case (inStream, _) => Sync[F].delay(inStream.close())
}
}
62 changes: 62 additions & 0 deletions fetch-file/src/main/scala/vdx/fetchfile/Progress.scala
@@ -0,0 +1,62 @@
package vdx.fetchfile

import cats.effect.Sync
import fs2._

import scala.concurrent.duration.{Duration, NANOSECONDS}

import java.util.concurrent.atomic.AtomicLong
import java.util.Locale

object Progress {
def noop[F[_]]: Int => Pipe[F, Byte, Unit] = _ => _.map(_ => ())

def consoleProgress[F[_]: Sync]: Int => Pipe[F, Byte, Unit] =
custom[F] { (downloadedBytes, contentLength, _, downloadSpeed) =>
println(
s"\u001b[1A\u001b[100D\u001b[0KDownloaded ${bytesToString(downloadedBytes)} of" +
s" ${bytesToString(contentLength.toLong)} | ${bytesToString(downloadSpeed)}/s"
)
}

def custom[F[_]: Sync](f: (Long, Int, Duration, Long) => Unit): Int => Pipe[F, Byte, Unit] =
contentLength => { s =>
Stream.eval(Sync[F].delay(System.nanoTime()))
.flatMap { startTime =>
val downloadedBytes = new AtomicLong(0)

s.chunks.map { chunk =>
val down = downloadedBytes.addAndGet(chunk.size.toLong)
val elapsedTime = Duration(System.nanoTime() - startTime, NANOSECONDS)
val speed = (down * 1000) / Math.max(elapsedTime.toMillis, 1)

f(down, contentLength, elapsedTime, speed)


}
}
}

// https://stackoverflow.com/questions/45885151/bytes-in-human-readable-format-with-idiomatic-scala
def bytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10

val (value, unit) = {
if (size >= 2 * TB) {
(size.asInstanceOf[Double] / TB, "TB")
} else if (size >= 2 * GB) {
(size.asInstanceOf[Double] / GB, "GB")
} else if (size >= 2 * MB) {
(size.asInstanceOf[Double] / MB, "MB")
} else if (size >= 2 * KB) {
(size.asInstanceOf[Double] / KB, "KB")
} else {
(size.asInstanceOf[Double], "B")
}
}
"%.1f %s".formatLocal(Locale.US, value, unit)
}
}
2 changes: 1 addition & 1 deletion fetch-file/src/main/scala/vdx/fetchfile/package.scala
Expand Up @@ -7,5 +7,5 @@ import java.io.InputStream

package object fetchfile {

type Backend[F[_]] = URL => Resource[F, InputStream]
type Backend[F[_]] = URL => Resource[F, (InputStream, Int)]
}
10 changes: 6 additions & 4 deletions fetch-file/src/test/scala/vdx/fetchfile/DownloaderSpec.scala
@@ -1,7 +1,6 @@
package vdx.fetchfile

import cats.effect._
import fs2._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand All @@ -17,7 +16,12 @@ class DownloaderSpec extends AnyFlatSpec with Matchers {
"fetch" should "use the given backend to create the input stream" in {

implicit val backend: Backend[IO] =
(url: URL) => Resource.make(IO.delay(new ByteArrayInputStream(url.toString.getBytes())))(s => IO.delay(s.close()))
(url: URL) =>
Resource.make {
IO.delay((new ByteArrayInputStream(url.toString.getBytes()), url.toString().getBytes().length))
} {
case (s, _) => IO.delay(s.close())
}

val downloader = Downloader[IO]

Expand All @@ -30,7 +34,6 @@ class DownloaderSpec extends AnyFlatSpec with Matchers {
Resource.fromAutoCloseable(IO.delay(out)),
blocker,
1,
(s: Stream[IO, Byte]) => s.map(_ => ())
)
content <- IO.delay(out.toString)
} yield content
Expand All @@ -49,7 +52,6 @@ class DownloaderSpec extends AnyFlatSpec with Matchers {
Resource.fromAutoCloseable(IO.delay(out)),
blocker,
1024 * 64,
(s: Stream[IO, Byte]) => s.map(_ => ())
)
content <- IO.delay(out.toByteArray())
} yield content
Expand Down

0 comments on commit 7502628

Please sign in to comment.