Skip to content

Commit

Permalink
Added StreamReader util
Browse files Browse the repository at this point in the history
  • Loading branch information
viktor-podzigun committed Mar 31, 2022
1 parent f95ccfe commit 1d25616
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 5 deletions.
9 changes: 8 additions & 1 deletion core/src/main/scala/scommons/nodejs/FS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import scommons.nodejs.raw._
import scala.concurrent.{Future, Promise}
import scala.scalajs.js
import scala.scalajs.js.typedarray.Uint8Array
import scala.scalajs.js.|

trait FS {

Expand Down Expand Up @@ -96,8 +97,14 @@ trait FS {
raw.FS.writeFileSync(file, data, options)
}

def createReadStream(path: String,
options: js.UndefOr[String | CreateReadStreamOptions] = js.undefined): ReadStream = {

raw.FS.createReadStream(path, options)
}

def createWriteStream(path: String,
options: js.UndefOr[CreateWriteStreamOptions] = js.undefined): WriteStream = {
options: js.UndefOr[String | CreateWriteStreamOptions] = js.undefined): WriteStream = {

raw.FS.createWriteStream(path, options)
}
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/scala/scommons/nodejs/StreamReader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package scommons.nodejs

import scommons.nodejs.raw.Readable

import scala.concurrent.{Future, Promise}
import scala.scalajs.js
import scala.scalajs.js.typedarray.Uint8Array

class StreamReader(readable: Readable) {

import scala.concurrent.ExecutionContext.Implicits.global

private var ready = Promise[Unit]()
private var isEnd = false

private val readableListener: js.Function = { () =>
ready.trySuccess(())
}
private val errorListener: js.Function = { error: js.Error =>
ready.tryFailure(js.JavaScriptException(error))
}
private val endListener: js.Function0[Unit] = { () =>
isEnd = true
ready.trySuccess(())
readable.removeListener("readable", readableListener)
readable.removeListener("error", errorListener)
}
readable.on("readable", readableListener)
readable.on("error", errorListener)
readable.once("end", endListener)
readable.once("close", endListener)

def readNextBytes(size: Int): Future[Option[Uint8Array]] = {

def loop(): Future[Option[Uint8Array]] = {
ready.future.flatMap { _ =>
if (isEnd) Future.successful(None)
else {
val content = readable.read(size)
if (content != null) Future.successful(Some(content))
else {
ready = Promise[Unit]()
loop()
}
}
}
}

loop()
}
}
5 changes: 4 additions & 1 deletion core/src/main/scala/scommons/nodejs/raw/FS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ object FS extends js.Object {
data: String,
options: js.UndefOr[FileOptions]): Unit = js.native

def createReadStream(path: String | URL,
options: js.UndefOr[String | CreateReadStreamOptions] = js.native): ReadStream = js.native

def createWriteStream(path: String | URL,
options: js.UndefOr[CreateWriteStreamOptions]): WriteStream = js.native
options: js.UndefOr[String | CreateWriteStreamOptions] = js.native): WriteStream = js.native
}

/**
Expand Down
21 changes: 20 additions & 1 deletion core/src/main/scala/scommons/nodejs/raw/FSStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package scommons.nodejs.raw
import scala.scalajs.js

/**
* https://nodejs.org/docs/latest-v9.x/api/fs.html#fs_class_fs_writestream
* https://nodejs.org/docs/latest-v12.x/api/fs.html#fs_class_fs_writestream
*/
@js.native
trait WriteStream extends Writable
Expand All @@ -17,3 +17,22 @@ trait CreateWriteStreamOptions extends js.Object {
val autoClose: js.UndefOr[Boolean] = js.undefined
val start: js.UndefOr[Int] = js.undefined
}

/**
* https://nodejs.org/docs/latest-v12.x/api/fs.html#fs_class_fs_readstream
*/
@js.native
trait ReadStream extends Readable

trait CreateReadStreamOptions extends js.Object {

val flags: js.UndefOr[String] = js.undefined
val encoding: js.UndefOr[String] = js.undefined
val fd: js.UndefOr[Int] = js.undefined
val mode: js.UndefOr[Int] = js.undefined
val autoClose: js.UndefOr[Boolean] = js.undefined
val emitClose: js.UndefOr[Boolean] = js.undefined
val start: js.UndefOr[Int] = js.undefined
val end: js.UndefOr[Int] = js.undefined
val highWaterMark: js.UndefOr[Int] = js.undefined
}
2 changes: 2 additions & 0 deletions core/src/main/scala/scommons/nodejs/raw/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ trait Readable extends EventEmitter {
def resume(): Readable = js.native

def read(size: js.UndefOr[Int] = js.native): Uint8Array = js.native

def destroy(error: js.UndefOr[js.Error] = js.native): Readable = js.native
}
113 changes: 111 additions & 2 deletions showcase/src/test/scala/scommons/nodejs/StreamSpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package scommons.nodejs

import scommons.nodejs.raw.{CreateReadStreamOptions, FileOptions}
import scommons.nodejs.test.AsyncTestSpec

import scala.concurrent.Promise
import scala.concurrent.{Future, Promise}
import scala.scalajs.js

class StreamSpec extends AsyncTestSpec {
Expand All @@ -24,12 +25,15 @@ class StreamSpec extends AsyncTestSpec {

p.success(())
}: js.Function1[js.Error, Unit])
}: js.Function1[js.Error, Unit])
}: js.Function1[js.Error, Unit]) shouldBe true
val resultF = p.future

//then
resultF.map { _ =>
fs.existsSync(tmpFile) shouldBe true
fs.readFileSync(tmpFile, new FileOptions {
override val encoding = "utf8"
}) shouldBe "hello, world!"

//cleanup
fs.unlinkSync(tmpFile)
Expand All @@ -39,4 +43,109 @@ class StreamSpec extends AsyncTestSpec {
fs.existsSync(tmpDir) shouldBe false
}
}

it should "fail with error when read data" in {
//given
val tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "scommons-nodejs-"))
val file = path.join(tmpDir, "example.txt")
fs.writeFileSync(file, "hello, World!!!")
val readable = fs.createReadStream(file)
val reader = new StreamReader(readable)

def loop(result: String): Future[String] = {
reader.readNextBytes(5).flatMap {
case None => Future.successful(result)
case Some(content) =>

//when
readable.destroy(js.Error("test error"))

loop(result + content.toString)
}
}
val resultF = loop("")

//then
resultF.failed.map { ex =>
ex.toString should include ("test error")

//cleanup
fs.unlinkSync(file)
fs.existsSync(file) shouldBe false

fs.rmdirSync(tmpDir)
fs.existsSync(tmpDir) shouldBe false
}
}

it should "end stream without error when read data" in {
//given
val tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "scommons-nodejs-"))
val file = path.join(tmpDir, "example.txt")
fs.writeFileSync(file, "hello, World!!!")
val readable = fs.createReadStream(file, new CreateReadStreamOptions {
override val highWaterMark: js.UndefOr[Int] = 5
})
val reader = new StreamReader(readable)

def loop(result: String): Future[String] = {
reader.readNextBytes(5).flatMap {
case None => Future.successful(result)
case Some(content) =>
content.length shouldBe 5

//when
readable.destroy()

loop(result + content.toString)
}
}
val resultF = loop("")

//then
resultF.map { result =>
result shouldBe "hello"

//cleanup
fs.unlinkSync(file)
fs.existsSync(file) shouldBe false

fs.rmdirSync(tmpDir)
fs.existsSync(tmpDir) shouldBe false
}
}

it should "read data from file" in {
//given
val tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "scommons-nodejs-"))
val file = path.join(tmpDir, "example.txt")
fs.writeFileSync(file, "hello, World!!!")
val reader = new StreamReader(fs.createReadStream(file, new CreateReadStreamOptions {
override val highWaterMark: js.UndefOr[Int] = 2
}))

def loop(result: String): Future[String] = {
reader.readNextBytes(4).flatMap {
case None => Future.successful(result)
case Some(content) =>
content.length should be <= 4
loop(result + content.toString)
}
}

//when
val resultF = loop("")

//then
resultF.map { result =>
result shouldBe "hello, World!!!"

//cleanup
fs.unlinkSync(file)
fs.existsSync(file) shouldBe false

fs.rmdirSync(tmpDir)
fs.existsSync(tmpDir) shouldBe false
}
}
}

0 comments on commit 1d25616

Please sign in to comment.