Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,44 @@ write('hello', () => {

A `Writable` stream in object mode will always ignore the `encoding` argument.

##### `writable.writeAhead(chunk[, encoding][, callback])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string|null} The encoding, if `chunk` is a string. **Default:** `'utf8'`
* `callback` {Function} Callback for when this chunk of data is flushed.
* Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write
additional data; otherwise `true`.

When using `writable.writeAhead()`, it's recommended to buffer
the stream using [`writable.cork()`][] first since the stream might
consume the chunk before writing ahead.

For example:

```js
const fs = require('node:fs');
const file = fs.createWriteStream('sentence.txt');
// file.cork();
file.write('World!');
setTimeout(() => {
file.writeAhead('hello');
file.end();
}, 1_000);
```

Running the above code would write `World!hello` to the file,
by uncommenting the `file.cork()`, the code would write `helloWorld!` instead.

### Readable streams

Readable streams are an abstraction for a _source_ from which data is
Expand Down
29 changes: 15 additions & 14 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -290,20 +290,17 @@ Writable.prototype.pipe = function() {
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};

function _write(stream, chunk, encoding, cb) {
function _write(stream, chunk, encoding, ahead, cb) {
const state = stream._writableState;

if (typeof encoding === 'function') {
cb = encoding;
cb = [encoding, ahead, cb]
.find((item) => typeof item === 'function') ?? nop;
ahead = [encoding, ahead]
.find((item) => typeof item === 'boolean') ?? false;
if (!encoding || typeof encoding !== 'string')
encoding = state.defaultEncoding;
} else {
if (!encoding)
encoding = state.defaultEncoding;
else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
throw new ERR_UNKNOWN_ENCODING(encoding);
if (typeof cb !== 'function')
cb = nop;
}
if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
throw new ERR_UNKNOWN_ENCODING(encoding);

if (chunk === null) {
throw new ERR_STREAM_NULL_VALUES();
Expand Down Expand Up @@ -337,13 +334,17 @@ function _write(stream, chunk, encoding, cb) {
return err;
}
state.pendingcb++;
return writeOrBuffer(stream, state, chunk, encoding, cb);
return writeOrBuffer(stream, state, chunk, encoding, ahead, cb);
}

Writable.prototype.write = function(chunk, encoding, cb) {
return _write(this, chunk, encoding, cb) === true;
};

Writable.prototype.writeAhead = function(chunk, encoding, cb) {
return _write(this, chunk, encoding, true, cb) === true;
};

Writable.prototype.cork = function() {
this._writableState.corked++;
};
Expand Down Expand Up @@ -372,7 +373,7 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
// If we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, callback) {
function writeOrBuffer(stream, state, chunk, encoding, ahead, callback) {
const len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand All @@ -384,7 +385,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
state.needDrain = true;

if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
state.buffered[ahead ? 'unshift' : 'push']({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
}
Expand Down
53 changes: 53 additions & 0 deletions test/parallel/test-stream-writable-write-ahead.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict';
const common = require('../common');
const assert = require('assert');

const { Writable } = require('stream');

{
const w = new Writable({
writev: common.mustCall((chunks, cb) => {
assert.deepStrictEqual(
Buffer('4213'),
Buffer.concat(chunks.map(({ chunk }) => chunk)),
);
cb();
})
});
w.cork();
w.write('1');
w.writeAhead('2');
w.write('3');
w.writeAhead('4');
w.end();
}

{
const items = [
{ a: 1 },
{ b: 2 },
{ c: 3 },
{ d: 4 },
];
const w = new Writable({
objectMode: true,
writev: common.mustCall((chunks, cb) => {
assert.deepStrictEqual(
[
items[2],
items[0],
items[1],
items[3],
],
chunks.map(({ chunk }) => chunk),
);
cb();
}),
});
w.cork();
w.writeAhead(items[0]);
w.write(items[1]);
w.writeAhead(items[2]);
w.write(items[3]);
w.end();
}