Skip to content

Commit

Permalink
Add BufferedRead and BufferedReader
Browse files Browse the repository at this point in the history
The BufferedRead trait is a Read type that uses an internal buffer,
making reads more efficient and allowing extra operations that would
otherwise be too slow. BufferedReader is an implementation of this type,
supporting any Read type and using an in-memory buffer.

Changelog: added
  • Loading branch information
yorickpeterse committed Jul 29, 2023
1 parent 59f65dd commit 085ee32
Show file tree
Hide file tree
Showing 2 changed files with 338 additions and 2 deletions.
211 changes: 210 additions & 1 deletion std/src/std/io.inko
Expand Up @@ -4,17 +4,21 @@
# reading from and writing to a file.
import std.cmp.Equal
import std.fmt.(Format, Formatter)
import std.string.ToString
import std.iter.Stream
import std.libc.bsd.errors if bsd
import std.libc.linux.errors if linux
import std.libc.mac.errors if mac
import std.string.ToString

# The initial number of bytes to read in `Read.read_all`
let INITIAL_READ_ALL_SIZE = 1024

# The maximum number of bytes to read when using `Read.read_all`.
let MAX_READ_ALL_SIZE = 1024 * 1024

# The default size of the buffer maintained by `BufferedReader`.
let DEFAULT_BUFFER_SIZE = 8 * 1024

fn extern inko_last_error -> Int32

# An error type for I/O operations.
Expand Down Expand Up @@ -357,3 +361,208 @@ trait pub Seek {
# the end.
fn pub mut seek(position: Int) -> Result[Int, Error]
}

# A `Read` type that maintains an internal buffer, allowing more efficient
# reading and additional operations (such as reading a single byte or a line)
# that would otherwise be too slow.
trait pub BufferedRead: Read {
  # Fills the internal buffer by reading from the underlying source of data.
  #
  # Upon success, this method returns `Ok(n)` where `n` is the number of bytes
  # remaining in the buffer. If the underlying read fails, an `Error` is
  # returned.
  #
  # If there are bytes remaining in the buffer, calls to this method shouldn't
  # modify it.
  fn mut fill_buffer -> Result[Int, Error]

  # Reads up to `size` bytes from the internal buffer into `into`, returning the
  # number of bytes read.
  #
  # If the buffer relies on a cursor, the cursor must be advanced by this method
  # such that multiple calls to `read_buffer` don't read the same bytes.
  fn mut read_buffer(into: mut ByteArray, size: Int) -> Int

  # Reads and returns a single byte.
  #
  # If a byte is read, `Ok(Some(n))` is returned where `n` is the byte. An
  # `Ok(None)` indicates the end of the input.
  #
  # # Examples
  #
  #     import std.fs.file.ReadOnlyFile
  #     import std.io.BufferedReader
  #
  #     let file = ReadOnlyFile.new('README.md').unwrap
  #     let reader = BufferedReader.new(file)
  #
  #     reader.read_byte # => Result.Ok(Option.Some(35))
  fn pub mut read_byte -> Result[Option[Int], Error]

  # Reads bytes into `into` up to and including the byte specified in the
  # `byte` argument.
  #
  # Upon success, the return value is `Ok(n)` where `n` is the number of bytes
  # read into `into`. If the end of the input is reached before `byte` is
  # found, the bytes read so far are kept in `into` and counted in `n`.
  #
  # If reading a byte fails, the error is returned as-is. Bytes pushed into
  # `into` before the failure are left in place.
  #
  # # Examples
  #
  #     import std.fs.file.ReadOnlyFile
  #     import std.io.BufferedReader
  #
  #     let file = ReadOnlyFile.new('README.md').unwrap
  #     let reader = BufferedReader.new(file)
  #     let bytes = ByteArray.new
  #
  #     reader.read_until(byte: 0xA, into: bytes)
  fn pub mut read_until(byte: Int, into: mut ByteArray) -> Result[Int, Error] {
    let mut total = 0

    loop {
      match try read_byte {
        case Some(val) -> {
          total += 1
          into.push(val)

          # The terminator itself is included in the output.
          if byte == val { break }
        }
        # End of the input: return whatever was read so far.
        case _ -> break
      }
    }

    Result.Ok(total)
  }

  # Reads bytes into `into` up to and including the newline byte (0xA aka
  # `"\n"`).
  #
  # Upon success, the return value is `Ok(n)` where `n` is the number of bytes
  # read into `into`.
  #
  # # Examples
  #
  #     import std.fs.file.ReadOnlyFile
  #     import std.io.BufferedReader
  #
  #     let file = ReadOnlyFile.new('README.md').unwrap
  #     let reader = BufferedReader.new(file)
  #     let bytes = ByteArray.new
  #
  #     reader.read_line(into: bytes)
  fn pub mut read_line(into: mut ByteArray) -> Result[Int, Error] {
    read_until(byte: 0xA, into: into)
  }

  # Returns an iterator that yields the bytes in `self`.
  #
  # Each byte is wrapped in a `Result`, as reading may fail. The iterator
  # stops once the end of the input is reached.
  #
  # # Examples
  #
  #     import std.fs.file.ReadOnlyFile
  #     import std.io.BufferedReader
  #
  #     let file = ReadOnlyFile.new('README.md').unwrap
  #     let reader = BufferedReader.new(file)
  #
  #     reader.bytes.next # => Option.Some(Result.Ok(35))
  fn pub mut bytes -> Stream[Result[Int, Error]] {
    Stream.new fn move {
      match read_byte {
        case Ok(Some(num)) -> Option.Some(Result.Ok(num))
        case Ok(None) -> Option.None
        case Error(err) -> Option.Some(Result.Error(err))
      }
    }
  }
}

# A buffering wrapper around a `Read` type.
#
# Reading from a `Read` type directly can be inefficient, as many calls to
# `Read.read` may translate into many system calls. A `BufferedReader` reads
# data from the wrapped reader into an in-memory buffer and serves reads from
# that buffer, trading some memory for fewer system calls.
class pub BufferedReader[T: Read + mut] {
  # The reader that data is read from.
  let @reader: T

  # The bytes read from `@reader` that are buffered.
  let @buffer: ByteArray

  # The number of bytes to request from `@reader` when filling the buffer.
  let @capacity: Int

  # The index in `@buffer` of the next byte to hand out.
  let @offset: Int

  # Returns a buffered reader wrapping `reader`, using a buffer of
  # `DEFAULT_BUFFER_SIZE` bytes.
  fn pub static new(reader: T) -> BufferedReader[T] {
    with_capacity(reader, size: DEFAULT_BUFFER_SIZE)
  }

  # Returns a buffered reader wrapping `reader`, using a buffer of `size`
  # bytes.
  #
  # The `reader` argument can be any `Read` type, provided it allows mutation
  # (e.g. a `ref Reader` isn't valid).
  #
  # # Panics
  #
  # This method panics if `size` is zero or negative.
  fn pub static with_capacity(reader: T, size: Int) -> BufferedReader[T] {
    if size < 1 { panic('The buffer size must be greater than zero') }

    # The buffer starts out empty; it's filled upon the first read.
    BufferedReader {
      @reader = reader,
      @buffer = ByteArray.new,
      @capacity = size,
      @offset = 0,
    }
  }
}

impl BufferedRead for BufferedReader {
  # Makes sure the buffer contains unread bytes, refilling it from the wrapped
  # reader if all buffered bytes have been consumed.
  #
  # Returns `Ok(n)` where `n` is the number of unread bytes in the buffer. If
  # unread bytes remain, the buffer is left untouched. An `Ok(0)` means the
  # wrapped reader produced no more data. Errors of the wrapped reader are
  # returned as-is.
  fn mut fill_buffer -> Result[Int, Error] {
    # The cursor is compared against the number of bytes actually buffered and
    # not against the capacity: the wrapped reader may return fewer bytes than
    # requested, and once those are consumed the buffer must be refilled
    # instead of reporting zero remaining bytes (which callers treat as the
    # end of the input).
    if @offset < @buffer.size { return Result.Ok(@buffer.size - @offset) }

    @buffer.clear
    @offset = 0
    @reader.read(into: @buffer, size: @capacity)
  }

  # Copies up to `size` unread bytes from the buffer into `into`, advancing
  # the cursor by the number of bytes copied and returning that number.
  fn mut read_buffer(into: mut ByteArray, size: Int) -> Int {
    let copied = into.copy_from(@buffer, at: @offset, size: size)

    @offset += copied
    copied
  }

  # Returns the next byte, or `Ok(None)` if the end of the input is reached.
  fn pub mut read_byte -> Result[Option[Int], Error] {
    match try fill_buffer {
      case 0 -> Result.Ok(Option.None)
      # `:=` assigns the new offset and returns the old one, which is the
      # index of the byte to return.
      case _ -> Result.Ok(Option.Some(@buffer.get(@offset := @offset + 1)))
    }
  }
}

impl Read for BufferedReader {
  # Reads up to `size` bytes into `into`, returning the number of bytes read.
  #
  # Reads that fit in the buffer are served from it, refilling the buffer as
  # needed. Reads larger than the buffer first take what is still buffered
  # and then read the remainder directly from the wrapped reader.
  #
  # Errors produced by the wrapped reader are returned as-is.
  fn pub mut read(into: mut ByteArray, size: Int) -> Result[Int, Error] {
    # Buffering a read that exceeds the buffer's capacity is pointless: hand
    # out the bytes still in the buffer, then let the wrapped reader produce
    # the rest in one go.
    if size > @capacity {
      let buffered = if @buffer.size > 0 and @offset < @capacity {
        read_buffer(into, size)
      } else {
        0
      }

      let direct = try @reader.read(into: into, size: size - buffered)

      return Result.Ok(buffered + direct)
    }

    let mut total = 0

    loop {
      if total >= size { break }

      # Zero bytes remaining after a fill means the input is exhausted.
      if (try fill_buffer) == 0 { break }

      let copied = read_buffer(into, size - total)

      if copied == 0 { break }

      total += copied
    }

    Result.Ok(total)
  }
}
129 changes: 128 additions & 1 deletion std/test/std/test_io.inko
@@ -1,5 +1,5 @@
import helpers.(fmt)
import std.io.(Error, Read, Write)
import std.io.(DEFAULT_BUFFER_SIZE, BufferedReader, Error, Read, Write)
import std.libc.bsd.errors if bsd
import std.libc.linux.errors if linux
import std.libc.mac.errors if mac
Expand All @@ -12,6 +12,10 @@ class Reader {
fn static new -> Reader {
Reader { @index = 0, @bytes = ByteArray.from_array([1, 2, 3]) }
}

# Returns a `Reader` with its index set to zero, backed by a `ByteArray` built
# from the given bytes.
fn static from_array(bytes: Array[Int]) -> Reader {
  let data = ByteArray.from_array(bytes)

  Reader { @index = 0, @bytes = data }
}
}

impl Read for Reader {
Expand All @@ -31,6 +35,14 @@ impl Read for Reader {
}
}

# A `Read` type for which every read fails, used for testing how errors are
# handled.
class ErrorReader {}

impl Read for ErrorReader {
# Always returns `Error.TimedOut`, ignoring both arguments.
fn pub mut read(into: mut ByteArray, size: Int) -> Result[Int, Error] {
Result.Error(Error.TimedOut)
}
}

class Writer {
let @buffer: ByteArray

Expand Down Expand Up @@ -142,4 +154,119 @@ fn pub tests(t: mut Tests) {

t.equal(writer.buffer, "foo\n".to_byte_array)
}

# Each call must stop right after the delimiter (or at the end of the input),
# appending to the output instead of replacing it.
t.test('BufferedRead.read_until') fn (t) {
  let source = Reader.from_array([1, 0xA, 2, 0xA, 3])
  let input = BufferedReader.new(source)
  let out = ByteArray.new

  # First chunk: up to and including the first delimiter.
  t.equal(input.read_until(byte: 0xA, into: out), Result.Ok(2))
  t.equal(out, ByteArray.from_array([1, 0xA]))

  # Second chunk: up to and including the second delimiter.
  t.equal(input.read_until(byte: 0xA, into: out), Result.Ok(2))
  t.equal(out, ByteArray.from_array([1, 0xA, 2, 0xA]))

  # Final chunk: no delimiter left, so reading stops at the end of the input.
  t.equal(input.read_until(byte: 0xA, into: out), Result.Ok(1))
  t.equal(out, ByteArray.from_array([1, 0xA, 2, 0xA, 3]))
}

# Lines are terminated by 0xA, and the terminator is part of the output.
t.test('BufferedRead.read_line') fn (t) {
  let source = Reader.from_array([1, 0xA, 2, 0xA, 3])
  let input = BufferedReader.new(source)
  let out = ByteArray.new

  # First line, including its newline.
  t.equal(input.read_line(out), Result.Ok(2))
  t.equal(out, ByteArray.from_array([1, 0xA]))

  # Second line, including its newline.
  t.equal(input.read_line(out), Result.Ok(2))
  t.equal(out, ByteArray.from_array([1, 0xA, 2, 0xA]))

  # Trailing data without a newline is still returned.
  t.equal(input.read_line(out), Result.Ok(1))
  t.equal(out, ByteArray.from_array([1, 0xA, 2, 0xA, 3]))
}

# The iterator must yield every byte of the input, each wrapped in an `Ok`.
# The test is named after the method it exercises (`bytes`), so it doesn't
# share a name with the `read_line` test.
t.test('BufferedRead.bytes') fn (t) {
  let reader = BufferedReader.new(Reader.new)

  t.equal(reader.bytes.to_array, [Result.Ok(1), Result.Ok(2), Result.Ok(3)])
}

# Without an explicit size, the default buffer size is used.
t.test('BufferedReader.new') fn (t) {
  let buffered = BufferedReader.new(Reader.new)

  t.equal(buffered.capacity, DEFAULT_BUFFER_SIZE)
}

# An explicit size is stored as the capacity.
t.test('BufferedReader.with_capacity') fn (t) {
  let buffered = BufferedReader.with_capacity(Reader.new, size: 32)

  t.equal(buffered.capacity, 32)
}

# A buffer size of zero is rejected with a panic.
t.panic('BufferedReader.with_capacity with an invalid size') fn {
BufferedReader.with_capacity(Reader.new, size: 0)
}

# Covers the three states of the buffer: empty, filled, and fully consumed.
t.test('BufferedReader.fill_buffer') fn (t) {
  let buffered = BufferedReader.with_capacity(Reader.new, size: 3)

  # An empty buffer is filled from the wrapped reader.
  t.equal(buffered.fill_buffer, Result.Ok(3))
  t.equal(buffered.buffer, ByteArray.from_array([1, 2, 3]))

  # Filling again without consuming anything must leave the buffer as-is.
  t.equal(buffered.fill_buffer, Result.Ok(3))
  t.equal(buffered.buffer, ByteArray.from_array([1, 2, 3]))

  # Once every buffered byte is consumed, filling resets the buffer and the
  # cursor.
  buffered.read_buffer(into: ByteArray.new, size: 3)
  buffered.fill_buffer

  t.true(buffered.buffer.empty?)
  t.equal(buffered.offset, 0)
}

# Reading from the buffer copies the requested bytes and advances the cursor.
t.test('BufferedReader.read_buffer') fn (t) {
  let buffered = BufferedReader.new(Reader.new)
  let out = ByteArray.new

  # The buffer must contain data before it can be read from.
  buffered.fill_buffer

  t.equal(buffered.read_buffer(into: out, size: 2), 2)
  t.equal(buffered.offset, 2)
  t.equal(out, ByteArray.from_array([1, 2]))
}

# Bytes are returned one at a time, followed by `None` at the end of the
# input. Errors of the wrapped reader are passed through.
t.test('BufferedReader.read_byte') fn (t) {
  let working = BufferedReader.new(Reader.new)
  let failing = BufferedReader.new(ErrorReader {})

  t.equal(working.read_byte, Result.Ok(Option.Some(1)))
  t.equal(working.read_byte, Result.Ok(Option.Some(2)))
  t.equal(working.read_byte, Result.Ok(Option.Some(3)))
  t.equal(working.read_byte, Result.Ok(Option.None))
  t.equal(failing.read_byte, Result.Error(Error.TimedOut))
}

# Reads smaller than the buffer are served from the buffer.
t.test('BufferedReader.read with a small read size') fn (t) {
  let reader = BufferedReader.new(Reader.new)
  let bytes = ByteArray.new

  # The first read fills the buffer with all the input, handing out only the
  # requested two bytes.
  t.equal(reader.read(into: bytes, size: 2), Result.Ok(2))
  t.equal(bytes, ByteArray.from_array([1, 2]))
  t.equal(reader.buffer, ByteArray.from_array([1, 2, 3]))
  t.equal(reader.offset, 2)

  # Only one byte is left, so the second read is a short read.
  t.equal(reader.read(into: bytes, size: 2), Result.Ok(1))
  t.equal(bytes, ByteArray.from_array([1, 2, 3]))

  # What the buffer and cursor look like once the input is exhausted is an
  # implementation detail, so the observable behaviour is asserted instead:
  # further reads produce no data and leave the output untouched.
  t.equal(reader.read(into: bytes, size: 2), Result.Ok(0))
  t.equal(bytes, ByteArray.from_array([1, 2, 3]))
}

# A read requesting more bytes than the buffer's capacity must still return
# all remaining data.
t.test('BufferedReader.read with a size larger than the buffer size') fn (t) {
  let buffered = BufferedReader.with_capacity(Reader.new, size: 1)
  let out = ByteArray.new

  # This read fits in the single-byte buffer.
  t.equal(buffered.read(into: out, size: 1), Result.Ok(1))

  # This read exceeds the capacity, and yields the remaining two bytes.
  t.equal(buffered.read(into: out, size: 10), Result.Ok(2))
  t.equal(out, ByteArray.from_array([1, 2, 3]))
}
}

0 comments on commit 085ee32

Please sign in to comment.