Streams API Binary Extension

Readable Byte Stream API

This extended API allows efficient access to a data source available for read via the POSIX non-blocking read(2) API or similar.

We map non-blocking read(2) to the non-blocking method ReadableByteStream.prototype.readInto(), and asynchronous notification of I/O events to the stream's state and promises.

ReadableByteStream has the same states as ReadableStream. State transitions happen when the underlying source calls notifyReady() and when the stream calls the underlying source's readInto().

When a ReadableByteStream is constructed, the underlying source watches for events on the file descriptor from which data is read using some I/O event loop built with select(2), poll(2), etc. ReadableByteStream provides a function notifyReady() to the underlying source. It is intended to be called when the file descriptor is ready. It moves the stream into the "readable" state. The "readable" state doesn't necessarily mean some bytes are available for read. It just means that ReadableByteStream.prototype.readInto() can be called. We need to call read(2) to know what kind of event has actually happened (new data available for read or the EOF or an error), so we enter "readable" and let the user call readInto().

ReadableByteStream's constructor takes a readInto() function from the underlying source instead of taking pull() and providing [[enqueue]]. A user calls ReadableByteStream.prototype.readInto() with an ArrayBuffer the user has prepared, and the available data is written into that ArrayBuffer. The method calls the readInto() of the underlying source with the ArrayBuffer. The underlying source's readInto() calls read(2) to write the data it reads directly into the ArrayBuffer. The stream translates the return value of that function into the next state of the stream and returns the number of bytes written to the given ArrayBuffer as follows:

  • If there are bytes available for read,
    • The underlying source's readInto() should write the bytes into the ArrayBuffer and return the number of bytes written.
    • Then, the stream stays in the "readable" state and stream.readInto() will return the number of bytes written.
  • If the file descriptor has nothing available for non-blocking read, e.g. read(2) returning EAGAIN, EWOULDBLOCK, etc.,
    • The underlying source's readInto() should write nothing into the ArrayBuffer and return -2.
    • Then, the stream enters the "waiting" state and stream.readInto() will return 0.
  • If the file descriptor reached the EOF,
    • The underlying source's readInto() should write nothing into the ArrayBuffer and return -1.
    • Then, the stream enters the "closed" state and stream.readInto() will return 0.
  • If the read(2) fails,
    • The underlying source's readInto() should write nothing into the ArrayBuffer and throw.
    • Then, the stream enters the "errored" state and stream.readInto() rethrows the error.
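
Seen from the underlying source, the mapping above might look like the following sketch. `FakeFd` is a hypothetical in-memory stand-in for a non-blocking file descriptor, and `makeReadInto` is an illustrative helper name; neither is part of this proposal. A real source would call read(2) through whatever binding its platform offers.

```javascript
// Hypothetical stand-in for a non-blocking file descriptor. Each script entry
// is a Uint8Array (data), "EOF", or an errno name such as "EAGAIN" or "EIO".
class FakeFd {
  constructor(script) {
    this.script = script.slice();
  }

  // Mimics read(2): returns the byte count, 0 at EOF, or throws an Error
  // whose `code` is the errno name. An empty script behaves like EAGAIN.
  read(target) {
    const next = this.script.length > 0 ? this.script[0] : "EAGAIN";
    if (next === "EOF") return 0;
    if (typeof next === "string") {
      this.script.shift();
      throw Object.assign(new Error(next), { code: next });
    }
    const n = Math.min(next.length, target.length);
    target.set(next.subarray(0, n));
    if (n < next.length) this.script[0] = next.subarray(n);
    else this.script.shift();
    return n;
  }
}

// Builds the readInto() function an underlying source would hand to the
// ReadableByteStream constructor.
function makeReadInto(fd) {
  return function readInto(arrayBuffer, offset, size) {
    if (size === 0) return 0; // a zero-length read must not be mistaken for EOF
    const target = new Uint8Array(arrayBuffer, offset, size);
    let n;
    try {
      n = fd.read(target);
    } catch (e) {
      // Nothing available right now: the stream should enter "waiting".
      if (e.code === "EAGAIN" || e.code === "EWOULDBLOCK") return -2;
      throw e; // a real failure: the stream should enter "errored"
    }
    return n === 0 ? -1 : n; // -1 signals EOF, otherwise bytes written
  };
}
```

The zero-size guard is a choice made for this sketch: read(2) also returns 0 for a zero-length request, which would otherwise be indistinguishable from EOF.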

Because readInto() is allowed to return without having written any data into the ArrayBuffer, ReadableByteStream satisfies the semantics of ReadableStream.

ReadableByteStream

class ReadableByteStream {
    constructor({
        function start = () => {},
        function readInto = () => {},
        function cancel = () => {},
        readBufferSize = undefined
    })

    ArrayBuffer read()
    any readInto(arrayBuffer, offset, size)
    Promise<undefined> cancel(any reason)

    get ReadableStreamState state

    get Promise<undefined> ready
    get Promise<undefined> closed

    // Internal slots
    [[state]] = "waiting"
    [[storedError]]
    [[readBufferSize]]

    [[readyPromise]]
    [[closedPromise]]

    // Holders for stuff given by the underlying source
    [[onReadInto]]
    [[onCancel]]

    // Internal methods for use by the underlying source
    [[notifyReady]]()
    [[error]](any e)
}
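
From the consumer's side, the interface above suggests a read loop along the following lines. This is only a sketch: `drainReadable` and `readAll` are hypothetical helper names, and the code assumes a `stream` object exposing `state`, `readInto()`, `ready`, and `closed` as described in this document.

```javascript
// Calls readInto() for as long as the stream stays "readable" and returns
// the bytes collected so far. `scratch` is a caller-owned ArrayBuffer.
function drainReadable(stream, scratch) {
  const out = [];
  while (stream.state === "readable") {
    const n = stream.readInto(scratch, 0, scratch.byteLength);
    out.push(...new Uint8Array(scratch, 0, n));
  }
  return out;
}

// Reads until the stream closes, waiting on `ready` whenever the stream is
// in the "waiting" state. Errors surface as a rejected promise.
async function readAll(stream, scratchSize = 1024) {
  const scratch = new ArrayBuffer(scratchSize);
  const all = [];
  for (;;) {
    all.push(...drainReadable(stream, scratch));
    if (stream.state === "waiting") {
      await stream.ready;
      continue;
    }
    if (stream.state === "errored") {
      await stream.closed; // rejects with the stored error
    }
    return Uint8Array.from(all);
  }
}
```

Reusing one scratch buffer across calls is what readInto() makes possible; the copy into `out` is only there to keep the sketch simple.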

Abstract Operations For ReadableByteStream Objects

Notify Ready Function

A notify ready function is an anonymous built-in function that has a [[Stream]] internal slot.

When a notify ready function F is called, the following steps are taken:

  1. Let stream be the value of F's [[Stream]] internal slot.
  2. If stream.[[state]] is not "waiting", return.
  3. Set stream.[[state]] to "readable".
  4. Resolve stream.[[readyPromise]] with undefined.

ErrorReadableByteStream( stream, error )
  1. If stream.[[state]] is "errored" or "closed", return.
  2. If stream.[[state]] is "waiting", reject stream.[[readyPromise]] with error.
  3. If stream.[[state]] is "readable", let stream.[[readyPromise]] be a new promise rejected with error.
  4. Set stream.[[state]] to "errored".
  5. Set stream.[[storedError]] to error.
  6. Reject stream.[[closedPromise]] with error.

Error Function

An error function is an anonymous built-in function that has a [[Stream]] internal slot.

When an error function F is called with argument error, the following steps are taken:

  1. Let stream be the value of F's [[Stream]] internal slot.
  2. ErrorReadableByteStream(stream, error).
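
The notify ready function, ErrorReadableByteStream, and the error function can be sketched together as closures over a plain record. The record shape and the `deferred`, `createStreamRecord`, `createNotifyReady`, and `createError` helpers are assumptions made for illustration; they stand in for the internal slots and built-in function objects described above.

```javascript
// Small helper: a promise together with its resolve and reject functions.
function deferred() {
  let resolve, reject;
  const promise = new Promise((res, rej) => { resolve = res; reject = rej; });
  promise.catch(() => {}); // keep rejections in this sketch from being reported as unhandled
  return { promise, resolve, reject };
}

// Hypothetical record standing in for the stream's internal slots.
function createStreamRecord() {
  return { state: "waiting", storedError: undefined, ready: deferred(), closed: deferred() };
}

// Mirrors the ErrorReadableByteStream steps.
function errorReadableByteStream(stream, error) {
  if (stream.state === "errored" || stream.state === "closed") return;
  if (stream.state === "waiting") {
    stream.ready.reject(error);
  } else if (stream.state === "readable") {
    // The old ready promise is already resolved, so replace it with a rejected one.
    stream.ready = deferred();
    stream.ready.reject(error);
  }
  stream.state = "errored";
  stream.storedError = error;
  stream.closed.reject(error);
}

// Mirrors the notify ready function: only "waiting" streams become "readable".
function createNotifyReady(stream) {
  return function notifyReady() {
    if (stream.state !== "waiting") return;
    stream.state = "readable";
    stream.ready.resolve(undefined);
  };
}

// Mirrors the error function: a thin wrapper bound to one stream.
function createError(stream) {
  return function error(e) {
    errorReadableByteStream(stream, e);
  };
}
```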

Properties of the ReadableByteStream Prototype Object

constructor({ start, readInto, cancel, readBufferSize })
  1. Let stream be the this value.
  2. If IsCallable(start) is false, then throw a TypeError exception.
  3. If IsCallable(readInto) is false, then throw a TypeError exception.
  4. If IsCallable(cancel) is false, then throw a TypeError exception.
  5. If readBufferSize is not undefined,
    1. Let readBufferSize be ToInteger(readBufferSize).
    2. If readBufferSize < 0, throw a RangeError exception.
  6. Set stream.[[onReadInto]] to readInto.
  7. Set stream.[[onCancel]] to cancel.
  8. Set stream.[[readBufferSize]] to readBufferSize.
  9. Let stream.[[readyPromise]] be a new promise.
  10. Let stream.[[closedPromise]] be a new promise.
  11. Let stream.[[notifyReady]] be a new built-in function object as defined in Notify Ready Function with its [[Stream]] internal slot set to stream.
  12. Let stream.[[error]] be a new built-in function object as defined in Error Function with its [[Stream]] internal slot set to stream.
  13. Let startResult be the result of calling the [[Call]] internal method of start with undefined as thisArgument and (stream.[[notifyReady]], stream.[[error]]) as argumentsList.
  14. ReturnIfAbrupt(startResult).

ReadableByteStream.prototype.read ()
  1. If this.[[readBufferSize]] is undefined, throw a TypeError exception.
  2. Let arrayBuffer be a new array buffer with length equal to this.[[readBufferSize]].
  3. Let bytesRead be Invoke(this, "readInto", (arrayBuffer, 0, this.[[readBufferSize]])).
  4. Let resizedArrayBuffer be a new array buffer with length equal to bytesRead created by transferring arrayBuffer using the semantics of the proposed ArrayBuffer.transfer.
  5. Return resizedArrayBuffer.
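
As a rough illustration, read() can be approximated as below. The sketch assumes a `stream` object with a `readBufferSize` property and a `readInto()` method, and it uses `ArrayBuffer.prototype.slice`, which copies, as a stand-in for the transfer semantics named in step 4.

```javascript
// Approximation of read(): allocate a buffer, fill it via readInto(), and
// return a buffer trimmed to the number of bytes actually read.
function read(stream) {
  if (stream.readBufferSize === undefined) {
    throw new TypeError("readBufferSize was not given to the constructor");
  }
  const arrayBuffer = new ArrayBuffer(stream.readBufferSize);
  const bytesRead = stream.readInto(arrayBuffer, 0, stream.readBufferSize);
  // slice() copies the bytes; a transfer-based implementation would avoid the copy.
  return arrayBuffer.slice(0, bytesRead);
}
```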

ReadableByteStream.prototype.readInto ( arrayBuffer, offset, size )
  1. If this.[[state]] is "waiting" or "closed", throw a TypeError exception.
  2. If this.[[state]] is "errored", throw this.[[storedError]].
  3. Assert: this.[[state]] is "readable".
  4. If Type(arrayBuffer) is not Object, throw a TypeError exception.
  5. If arrayBuffer does not have an [[ArrayBufferData]] internal slot, throw a TypeError exception.
  6. If the value of arrayBuffer's [[ArrayBufferData]] internal slot is undefined, then throw a TypeError exception.
  7. If IsNeuteredBuffer(arrayBuffer) is true, then throw a TypeError exception.
  8. Let bufferLength be the value of arrayBuffer's [[ArrayBufferByteLength]] internal slot.
  9. If offset is undefined, let offset be 0.
  10. Otherwise,
    1. Let offset be ToInteger(offset).
    2. ReturnIfAbrupt(offset).
    3. If offset < 0, throw a RangeError exception.
  11. If size is undefined, let size be bufferLength - offset.
  12. Otherwise,
    1. Let size be ToInteger(size).
    2. ReturnIfAbrupt(size).
  13. If size < 0 or offset + size > bufferLength, throw a RangeError exception.
  14. Let bytesRead be the result of calling the [[Call]] internal method of this.[[onReadInto]] with undefined as thisArgument and (arrayBuffer, offset, size) as argumentsList.
  15. If bytesRead is an abrupt completion,
    1. ErrorReadableByteStream(this, bytesRead.[[value]]).
    2. Return bytesRead.
  16. Let bytesRead be ToNumber(bytesRead).
  17. If bytesRead is NaN or bytesRead < -2 or bytesRead > bufferLength,
    1. Let error be a RangeError exception.
    2. ErrorReadableByteStream(this, error).
    3. Throw error.
  18. If bytesRead is -2,
    1. Set this.[[state]] to "waiting".
    2. Let this.[[readyPromise]] be a new promise.
    3. Return 0.
  19. If bytesRead is -1,
    1. Set this.[[state]] to "closed".
    2. Resolve this.[[closedPromise]] with undefined.
    3. Return 0.
  20. Return bytesRead.
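
The steps above can be sketched as a free function over a plain record. Several things here are simplifications or assumptions: `createRecord`, `markErrored`, and `toInteger` are hypothetical helpers, `markErrored` updates only the state and stored error (the full ErrorReadableByteStream also rejects the promises), and the neutered-buffer check is replaced by a plain `instanceof` test.

```javascript
// ToInteger as used in the steps: NaN becomes 0, otherwise truncate toward zero.
function toInteger(value) {
  const n = Number(value);
  return Number.isNaN(n) ? 0 : Math.trunc(n);
}

// Hypothetical record standing in for the stream's internal slots.
function createRecord(onReadInto) {
  const record = { state: "readable", storedError: undefined, onReadInto };
  record.closedPromise = new Promise((resolve) => { record.resolveClosed = resolve; });
  return record;
}

// Simplified stand-in for ErrorReadableByteStream (promise handling omitted).
function markErrored(record, error) {
  record.state = "errored";
  record.storedError = error;
}

function readInto(record, arrayBuffer, offset, size) {
  if (record.state === "waiting" || record.state === "closed") {
    throw new TypeError("stream is not readable");
  }
  if (record.state === "errored") throw record.storedError;
  if (!(arrayBuffer instanceof ArrayBuffer)) throw new TypeError("not an ArrayBuffer");

  const bufferLength = arrayBuffer.byteLength;
  offset = offset === undefined ? 0 : toInteger(offset);
  if (offset < 0) throw new RangeError("offset must be non-negative");
  size = size === undefined ? bufferLength - offset : toInteger(size);
  if (size < 0 || offset + size > bufferLength) throw new RangeError("size out of range");

  let result;
  try {
    result = record.onReadInto(arrayBuffer, offset, size);
  } catch (e) {
    markErrored(record, e); // the source threw: error the stream and rethrow
    throw e;
  }
  const bytesRead = Number(result);
  if (Number.isNaN(bytesRead) || bytesRead < -2 || bytesRead > bufferLength) {
    const error = new RangeError("underlying source returned an invalid value");
    markErrored(record, error);
    throw error;
  }
  if (bytesRead === -2) { // nothing available: wait for the next notifyReady()
    record.state = "waiting";
    record.readyPromise = new Promise((resolve) => { record.resolveReady = resolve; });
    return 0;
  }
  if (bytesRead === -1) { // EOF
    record.state = "closed";
    record.resolveClosed(undefined);
    return 0;
  }
  return bytesRead;
}
```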

ReadableByteStream.prototype.cancel ( reason )
  1. If this.[[state]] is "closed", return a new promise resolved with undefined.
  2. If this.[[state]] is "errored", return a new promise rejected with this.[[storedError]].
  3. If this.[[state]] is "waiting", resolve this.[[readyPromise]] with undefined.
  4. Set this.[[state]] to "closed".
  5. Resolve this.[[closedPromise]] with undefined.
  6. Let cancelPromise be a new promise.
  7. Let sourceCancelPromise be the result of promise-calling this.[[onCancel]](reason).
  8. Upon fulfillment of sourceCancelPromise, resolve cancelPromise with undefined.
  9. Upon rejection of sourceCancelPromise with reason r, reject cancelPromise with r.
  10. Return cancelPromise.
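
A minimal sketch of these cancel steps follows. `promiseCall` is one possible reading of "promise-calling" (call the function and always produce a promise, turning a synchronous throw into a rejection), and `createRecord` is a hypothetical stand-in for the stream's internal slots.

```javascript
// Call fn and always hand back a promise; a synchronous throw becomes a rejection.
function promiseCall(fn, ...args) {
  try {
    return Promise.resolve(fn(...args));
  } catch (e) {
    return Promise.reject(e);
  }
}

// Hypothetical record standing in for the stream's internal slots.
function createRecord(onCancel) {
  const record = { state: "waiting", storedError: undefined, onCancel };
  record.readyPromise = new Promise((resolve) => { record.resolveReady = resolve; });
  record.closedPromise = new Promise((resolve) => { record.resolveClosed = resolve; });
  return record;
}

function cancel(record, reason) {
  if (record.state === "closed") return Promise.resolve(undefined);
  if (record.state === "errored") return Promise.reject(record.storedError);
  if (record.state === "waiting") record.resolveReady(undefined);
  record.state = "closed";
  record.resolveClosed(undefined);
  // Fulfill with undefined whatever the source resolves with; rejections pass through.
  return promiseCall(record.onCancel, reason).then(() => undefined);
}
```

Note that the stream is marked "closed" before the underlying source's cancel has finished, so a second cancel() call returns immediately without calling the source again.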

get ReadableByteStream.prototype.state
  1. Let stream be the this value.
  2. Return stream.[[state]].

get ReadableByteStream.prototype.ready
  1. Let stream be the this value.
  2. Return stream.[[readyPromise]].

get ReadableByteStream.prototype.closed
  1. Let stream be the this value.
  2. Return stream.[[closedPromise]].