Skip to content

feature: Add streaming I/O methods to FileSystem#458

Merged
xerial merged 5 commits into
mainfrom
feature/add-streaming-io
Apr 3, 2026
Merged

feature: Add streaming I/O methods to FileSystem#458
xerial merged 5 commits into
mainfrom
feature/add-streaming-io

Conversation

@xerial
Copy link
Copy Markdown
Member

@xerial xerial commented Mar 31, 2026

Summary

  • Add cross-platform streaming I/O APIs to FileSystem for processing large files without loading them entirely into memory
  • New sync methods: readLinesLazy (lazy Iterator[String]), readChunks (lazy Iterator[Array[Byte]]), readStream (InputStream), writeStream (OutputStream)
  • New Rx methods: readLinesRx and readChunksRx for reactive streaming
  • JVM/Native iterators are truly lazy with AutoCloseable support; JS uses eager fallback since Node.js sync APIs are inherently eager

Test plan

  • All 30 FileSystemTest tests pass on JVM (10 new streaming tests)
  • CI passes on all platforms (JVM, JS, Native)
  • Verify readLinesLazy doesn't load entire file on JVM
  • Verify readChunks produces correct chunk sizes and counts

🤖 Generated with Claude Code

Add cross-platform streaming I/O APIs for processing large files
without loading them entirely into memory:

- readLinesLazy: lazy line-by-line Iterator (truly lazy on JVM/Native)
- readChunks: fixed-size byte chunk Iterator
- readStream/writeStream: InputStream/OutputStream access
- readLinesRx/readChunksRx: Rx-based reactive streaming

JVM/Native iterators implement AutoCloseable for resource safety.
JS falls back to eager reads since Node.js sync APIs are inherently
eager.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added the feature New feature label Mar 31, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request adds streaming and reactive I/O support to the FileSystem trait for JVM, JS, and Native platforms, including lazy line reading and chunked access. The review feedback highlights that the reactive methods currently load entire files into memory, defeating the purpose of streaming. Other identified issues include an inefficient append implementation in JavaScript, a lack of explicit UTF-8 encoding in the Native platform, and resource management concerns in the custom iterators regarding eager initialization and error handling.

* Reads the file line by line as a reactive stream. Each line is emitted as an OnNext event,
* followed by OnCompletion.
*/
def readLinesRx(path: IOPath): Rx[String] = Rx.fromSeq(readLinesLazy(path).toSeq)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The implementation of readLinesRx is not truly streaming because it calls .toSeq on the lazy iterator. This exhausts the iterator and loads the entire file content into memory before creating the reactive stream, which defeats the purpose of using a lazy iterator for large files. If Rx supports it, consider using a method that creates a stream directly from an Iterator or a producer.

Comment on lines +255 to +257
def readChunksRx(path: IOPath, chunkSize: Int = 8192): Rx[Array[Byte]] = Rx.fromSeq(
readChunks(path, chunkSize).toSeq
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to readLinesRx, readChunksRx calls .toSeq on the chunk iterator, causing the entire file to be loaded into memory as a sequence of byte arrays. This is inefficient for large files. Use a streaming producer for Rx if available.

Comment on lines +652 to +654
case WriteMode.Append =>
val existing = FileSystemJS.readBytes(path)
FileSystemJS.writeBytes(path, existing ++ bytes, WriteMode.Create)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The Append mode implementation in FlushToFileOutputStream is highly inefficient as it reads the entire existing file into memory (readBytes), appends the new data, and then writes it all back. Since FileSystemJS.writeBytes already handles WriteMode.Append (which uses Node's appendFileSync or a browser-specific append), you should delegate to it directly.

        case WriteMode.Append =>
          FileSystemJS.writeBytes(path, bytes, WriteMode.Append)

override def readLines(path: IOPath): Seq[String] = readString(path).split("\n").toSeq

override def readLinesLazy(path: IOPath): Iterator[String] =
val reader = BufferedReader(FileReader(toJavaFile(path)))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

FileReader uses the default platform encoding, which may not be UTF-8. To ensure cross-platform consistency as specified in the FileSystemBase trait, you should explicitly use UTF-8.

    val reader = BufferedReader(InputStreamReader(FileInputStream(toJavaFile(path)), StandardCharsets.UTF_8))

private[io] class CloseableLineIterator(reader: BufferedReader)
extends Iterator[String]
with AutoCloseable:
private var nextLine: String | Null = reader.readLine()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The iterator performs an eager read in the constructor. If the iterator is created but never consumed (or hasNext is never called), the first line is still read and the file handle remains open until GC or an explicit close. It is better to defer the first read until hasNext or next is invoked.

val line = nextLine
if line == null then
throw java.util.NoSuchElementException("No more lines")
nextLine = reader.readLine()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If reader.readLine() throws an IOException, the underlying reader will not be closed because the close() call is only triggered when hasNext returns false. Consider wrapping the read in a try-catch block that closes the resource on failure.

xerial and others added 4 commits March 31, 2026 11:08
- Move CloseableLineIterator/CloseableChunkIterator to shared sources
  (uni-core/src/main/scala) to eliminate JVM/Native duplication
- Fix JS FlushToFileOutputStream append to use appendFileSync directly
  instead of read-concatenate-rewrite (avoids loading entire file)
- Fix Rx tests to use run() instead of toSeq (unsupported on Scala.js)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Use InputStreamReader with UTF-8 in Native readLinesLazy instead of
  FileReader which uses platform default encoding
- Simplify JS FlushToFileOutputStream to delegate directly to writeBytes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
readLinesRx/readChunksRx called .toSeq on the iterator, loading
everything into memory before creating the Rx stream. This defeats
the purpose of streaming, so remove them.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@xerial xerial merged commit 1795662 into main Apr 3, 2026
14 checks passed
@xerial xerial deleted the feature/add-streaming-io branch April 3, 2026 23:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature New feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant