Skip to content

Commit

Permalink
Minor tweaks to File and I/O internals (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski committed May 21, 2024
1 parent 9174a1f commit 926f9e2
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions core/src/main/scala/ox/channels/SourceCompanionIOOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import scala.util.control.NonFatal

trait SourceCompanionIOOps:

/** Converts a [[java.io.InputStream]] into a `Source[Chunk[Bytes]]`.
/** Converts a [[java.io.InputStream]] into a `Source[Chunk[Bytes]]`. Implicit `StageCapacity` can be used to control the number of
* buffered chunks.
*
* @param is
* an `InputStream` to read bytes from.
Expand All @@ -22,19 +23,19 @@ trait SourceCompanionIOOps:
* @return
* a `Source` of chunks of bytes.
*/
def fromInputStream(is: InputStream, chunkSize: Int = 1024)(using Ox): Source[Chunk[Byte]] =
def fromInputStream(is: InputStream, chunkSize: Int = 1024)(using Ox, StageCapacity): Source[Chunk[Byte]] =
val chunks = StageCapacity.newChannel[Chunk[Byte]]
fork {
try
repeatWhile {
val a = new Array[Byte](chunkSize)
val r = is.read(a)
if r == -1 then
val buf = new Array[Byte](chunkSize)
val readBytes = is.read(buf)
if readBytes == -1 then
chunks.done()
false
else
val chunk = if r == chunkSize then Chunk.fromArray(a) else Chunk.fromArray(a.take(r))
chunks.send(chunk)
if readBytes > 0 then
chunks.send(if readBytes == chunkSize then Chunk.fromArray(buf) else Chunk.fromArray(buf.take(readBytes)))
true
}
catch
Expand All @@ -48,7 +49,8 @@ trait SourceCompanionIOOps:
}
chunks

/** Creates a `Source` that emits byte chunks read from a file.
/** Creates a `Source` that emits byte chunks read from a file. Implicit `StageCapacity` can be used to control the number of buffered
* chunks.
*
* @param path
* path the file to read from.
Expand All @@ -61,7 +63,7 @@ trait SourceCompanionIOOps:
* @throws SecurityException
* If SecurityManager error occurs when opening the file.
*/
def fromFile(path: Path, chunkSize: Int = 1024)(using Ox): Source[Chunk[Byte]] =
def fromFile(path: Path, chunkSize: Int = 1024)(using Ox, StageCapacity): Source[Chunk[Byte]] =
if Files.isDirectory(path) then throw new IOException(s"Path $path is a directory")
val chunks = StageCapacity.newChannel[Chunk[Byte]]
val jFileChannel =
Expand All @@ -79,11 +81,8 @@ trait SourceCompanionIOOps:
if readBytes < 0 then
chunks.done()
false
else if readBytes == 0 then
chunks.send(Chunk.empty)
true
else
chunks.send(Chunk.fromArray(if readBytes == chunkSize then buf.array else buf.array.take(readBytes)))
if readBytes > 0 then chunks.send(Chunk.fromArray(if readBytes == chunkSize then buf.array else buf.array.take(readBytes)))
true
}
} catch case e => chunks.errorOrClosed(e).discard
Expand Down

0 comments on commit 926f9e2

Please sign in to comment.