Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: construct #29656

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 164 additions & 7 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,7 @@ added: v9.3.0

* {number}

Return the value of `highWaterMark` passed when constructing this
`Writable`.
Return the value of `highWaterMark` passed when creating this `Writable`.
yorkie marked this conversation as resolved.
Show resolved Hide resolved

##### `writable.writableLength`
<!-- YAML
Expand Down Expand Up @@ -1193,8 +1192,7 @@ added: v9.3.0

* {number}

Returns the value of `highWaterMark` passed when constructing this
`Readable`.
Returns the value of `highWaterMark` passed when creating this `Readable`.

##### `readable.readableLength`
<!-- YAML
Expand Down Expand Up @@ -1792,7 +1790,7 @@ expectations.
added: v1.2.0
-->

For many simple cases, it is possible to construct a stream without relying on
For many simple cases, it is possible to create a stream without relying on
inheritance. This can be accomplished by directly creating instances of the
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
objects and passing appropriate methods as constructor options.
Expand All @@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
const { Writable } = require('stream');

const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
ronag marked this conversation as resolved.
Show resolved Hide resolved
// Free resources...
}
});
```
Expand Down Expand Up @@ -1861,6 +1865,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][writable-_construct] method.
ronag marked this conversation as resolved.
Show resolved Hide resolved
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.

Expand Down Expand Up @@ -1906,6 +1912,56 @@ const myWritable = new Writable({
});
```

#### `writable._construct(callback)`
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called in a tick after the stream constructor
has returned, delaying any `_write`, `_final` and `_destroy` calls until
`callback` is called. This is useful to initialize state or asynchronously
initialize resources before the stream can be used.

```js
const { Writable } = require('stream');
const fs = require('fs');

class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = fd;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### `writable._write(chunk, encoding, callback)`
<!-- YAML
changes:
Expand Down Expand Up @@ -2130,6 +2186,8 @@ changes:
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.

Expand Down Expand Up @@ -2172,6 +2230,63 @@ const myReadable = new Readable({
});
```

#### `readable._construct(callback)`
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Readable`
class methods only.

This optional function will be scheduled in the next tick by the stream
constructor, delaying any `_read` and `_destroy` calls until `callback` is
called. This is useful to initialize state or asynchronously initialize
resources before the stream can be used.

```js
const { Readable } = require('stream');
const fs = require('fs');

class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### `readable._read(size)`
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
});
```

When using pipeline:

```js
const { Transform, pipeline } = require('stream');
const fs = require('fs');

pipeline(
fs.createReadStream('object.json')
.setEncoding('utf-8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
} catch (err) {
callback(err);
}
}
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
}
);
ronag marked this conversation as resolved.
Show resolved Hide resolved
```

#### An Example Duplex Stream

The following illustrates a simple example of a `Duplex` stream that wraps a
Expand Down Expand Up @@ -2706,8 +2861,8 @@ unhandled post-destroy errors.

#### Creating Readable Streams with Async Generators

We can construct a Node.js Readable Stream from an asynchronous generator
using the `Readable.from()` utility method:
A Node.js Readable Stream can be created from an asynchronous generator using
ronag marked this conversation as resolved.
Show resolved Hide resolved
the `Readable.from()` utility method:

```js
const { Readable } = require('stream');
Expand Down Expand Up @@ -2960,6 +3115,7 @@ contain multi-byte characters.
[http-incoming-message]: http.html#http_class_http_incomingmessage
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[object-mode]: #stream_object_mode
[readable-_construct]: #stream_readable_construct_callback
[readable-_destroy]: #stream_readable_destroy_err_callback
[readable-destroy]: #stream_readable_destroy_error
[stream-_final]: #stream_writable_final_callback
Expand All @@ -2976,6 +3132,7 @@ contain multi-byte characters.
[stream-uncork]: #stream_writable_uncork
[stream-write]: #stream_writable_write_chunk_encoding_callback
[Stream Three States]: #stream_three_states
[writable-_construct]: #stream_writable_construct_callback
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[writable-new]: #stream_constructor_new_stream_writable_options
Expand Down
24 changes: 19 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
Expand Down Expand Up @@ -197,9 +203,16 @@ function Readable(options) {

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this, options);

destroyImpl.construct(this, () => {
maybeReadMore(this, this._readableState);
});
}

Readable.prototype.destroy = destroyImpl.destroy;
Expand Down Expand Up @@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
}

// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
// reading, then it's unnecessary, if we're constructing we have to wait,
// and if we're destroyed or errored, then it's not allowed,
if (state.ended || state.reading || state.destroyed || state.errored ||
!state.constructed) {
doRead = false;
debug('reading or ended', doRead);
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
Expand Down Expand Up @@ -587,7 +601,7 @@ function emitReadable_(stream) {
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
if (!state.readingMore) {
if (!state.readingMore && state.constructed) {
state.readingMore = true;
process.nextTick(maybeReadMore_, stream, state);
}
Expand Down
27 changes: 25 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// Emit prefinish if the only thing we're waiting for is _write cbs
// This is relevant for synchronous Transform streams.
this.prefinished = false;
Expand Down Expand Up @@ -249,9 +255,22 @@ function Writable(options) {

if (typeof options.final === 'function')
this._final = options.final;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this, options);

destroyImpl.construct(this, () => {
const state = this._writableState;

if (!state.writing) {
clearBuffer(this, state);
}

finishMaybe(this, state);
});
}

// Otherwise people can pipe Writable streams, which is just wrong.
Expand Down Expand Up @@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

state.length += len;

if (state.writing || state.corked || state.errored) {
if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
Expand Down Expand Up @@ -492,7 +511,10 @@ function errorBuffer(state, err) {

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked || state.bufferProcessing || state.destroyed) {
if (state.corked ||
state.bufferProcessing ||
state.destroyed ||
!state.constructed) {
return;
}

Expand Down Expand Up @@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {

function needFinish(state) {
return (state.ending &&
state.constructed &&
state.length === 0 &&
!state.errored &&
state.buffered.length === 0 &&
Expand Down
Loading