Skip to content

Commit

Permalink
stream: writable state bitmap
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Sep 27, 2023
1 parent c19b2a7 commit de32558
Showing 1 changed file with 88 additions and 35 deletions.
123 changes: 88 additions & 35 deletions lib/internal/streams/writable.js
Expand Up @@ -73,6 +73,8 @@ ObjectSetPrototypeOf(Writable, Stream);
function nop() {}

const kOnFinished = Symbol('kOnFinished');
const kErrored = Symbol('kErrored');
const kCorked = Symbol('kCorked');

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
Expand All @@ -94,6 +96,12 @@ const kBufferProcessing = 1 << 16;
const kPrefinished = 1 << 17;
const kAllBuffers = 1 << 18;
const kAllNoop = 1 << 19;
const kHasOnFinished = 1 << 20;
const kHasErrored = 1 << 21;

const kCorkedShift = 22;
const kCorkedMask = 0b1111
const kCorked = kCorkedMask << kCorkedShift; // 4 bits

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -176,6 +184,46 @@ ObjectDefineProperties(WritableState.prototype, {

allBuffers: makeBitMapDescriptor(kAllBuffers),
allNoop: makeBitMapDescriptor(kAllNoop),

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
// This is/should be a cold path.
errored: {
enumerable: false,
get() { return (this.state & kHasErrored) !== 0 ? this[kErrored] : null; },
set(value) {
if (value) {
this[kErrored] = value;
this.state |= kHasErrored;
} else {
delete this[kErrored];
this.state &= ~kHasErrored;
}
},
},

// When true all writes will be buffered until .uncork() call.
// This is/should be a cold path.
corked: {
enumerable: false,
get() {
const val = (this.state >>> kCorkedShift) & kCorkedMask;
return val < kCorkedMask ? val : this[kCorked];
},
set(value) {
if (value >= kCorkedMask) {
this[kCorked] = value;
this.state |= kCorkedMask << kCorkedShift;
} else {
if ((this.state >>> kCorkedShift) === kCorkedMask) {
delete this[kCorked];
}
this.state &= ~kCorked;
this.state |= value << kCorkedShift;
}
},
}
});

function WritableState(options, stream, isDuplex) {
Expand Down Expand Up @@ -226,9 +274,6 @@ function WritableState(options, stream, isDuplex) {
// socket or file.
this.length = 0;

// When true all writes will be buffered until .uncork() call.
this.corked = 0;

// The callback that's passed to _write(chunk, cb).
this.onwrite = onwrite.bind(undefined, stream);

Expand All @@ -247,13 +292,6 @@ function WritableState(options, stream, isDuplex) {
// Number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0;

// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = null;

this[kOnFinished] = [];
}

function resetBuffer(state) {
Expand Down Expand Up @@ -394,13 +432,21 @@ Writable.prototype.write = function(chunk, encoding, cb) {
};

Writable.prototype.cork = function() {
this._writableState.corked++;
const state = this._writableState;

if ((state & kCorked) === 0) {
this.state |= 1 << kCorkedShift;
} else {
// TODO: Optimize
this._writableState.corked++;
}
};

Writable.prototype.uncork = function() {
const state = this._writableState;

if (state.corked) {
if ((state.state & kCorked) !== 0) {
// TODO: Optimize
state.corked--;

if ((state.state & kWriting) === 0)
Expand Down Expand Up @@ -432,7 +478,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
if (!ret)
state.state |= kNeedDrain;

if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) {
if ((state.state & kWriting) !== 0 || (state.state & (kHasErrored | kCorked)) || (state.state & kConstructed) === 0) {
state.buffered.push({ chunk, encoding, callback });
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
state.state &= ~kAllBuffers;
Expand All @@ -450,7 +496,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && (state.state & kDestroyed) === 0;
return ret && (state.state & kHasErrored) === 0 && (state.state & kDestroyed) === 0;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
Expand Down Expand Up @@ -498,7 +544,7 @@ function onwrite(stream, er) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
er.stack; // eslint-disable-line no-unused-expressions

if (!state.errored) {
if ((state.state & kHasErrored) === 0) {
state.errored = er;
}

Expand Down Expand Up @@ -573,18 +619,19 @@ function errorBuffer(state) {
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
}

const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
if ((state.state & kHasOnFinished) !== 0) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
}
}

resetBuffer(state);
}

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked ||
(state.state & (kDestroyed | kBufferProcessing)) !== 0 ||
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
(state.state & kConstructed) === 0) {
return;
}
Expand Down Expand Up @@ -669,14 +716,14 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}

// .end() fully uncorks.
if (state.corked) {
if ((state.state & kCorked) !== 0) {
state.corked = 1;
this.uncork();
}

if (err) {
// Do nothing...
} else if (!state.errored && (state.state & kEnding) === 0) {
} else if ((state.state & kErrored) === 0 && (state.state & kEnding) === 0) {
// This is forgiving in terms of unnecessary calls to end() and can hide
// logic errors. However, usually such errors are harmless and causing a
// hard error can be disproportionately destructive. It is not always
Expand All @@ -698,6 +745,8 @@ Writable.prototype.end = function(chunk, encoding, cb) {
} else if ((state.state & kFinished) !== 0) {
process.nextTick(cb, null);
} else {
state.state |= kHasOnFinished;
state[kOnFinished] ??= [];
state[kOnFinished].push(cb);
}
}
Expand All @@ -715,10 +764,10 @@ function needFinish(state) {
kFinished |
kWriting |
kErrorEmitted |
kCloseEmitted
kCloseEmitted |
kHasErrored
)) === (kEnding | kConstructed) &&
state.length === 0 &&
!state.errored &&
state.buffered.length === 0);
}

Expand All @@ -734,9 +783,11 @@ function callFinal(stream, state) {

state.pendingcb--;
if (err) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](err);
if ((state.state & kHasOnFinished) !== 0) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](err);
}
}
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
} else if (needFinish(state)) {
Expand Down Expand Up @@ -799,9 +850,11 @@ function finish(stream, state) {
state.pendingcb--;
state.state |= kFinished;

const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](null);
if ((state.state & kHasOnFinished) !== 0) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](null);
}
}

stream.emit('finish');
Expand Down Expand Up @@ -853,8 +906,8 @@ ObjectDefineProperties(Writable.prototype, {
// where the writable side was disabled upon construction.
// Compat. The user might manually disable writable side through
// deprecated setter.
return !!w && w.writable !== false && !w.errored &&
(w.state & (kEnding | kEnded | kDestroyed)) === 0;
return !!w && w.writable !== false &&
(w.state & (kEnding | kEnded | kDestroyed | kHasErrored)) === 0;
},
set(val) {
// Backwards compatible.
Expand Down Expand Up @@ -928,7 +981,7 @@ ObjectDefineProperties(Writable.prototype, {
__proto__: null,
enumerable: false,
get() {
return this._writableState ? this._writableState.errored : null;
return this._writableState && (this._writableState.state & kHasErrored) !== 0 ? this._writableState.errored : null;
},
},

Expand All @@ -938,7 +991,7 @@ ObjectDefineProperties(Writable.prototype, {
get: function() {
return !!(
this._writableState.writable !== false &&
((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) &&
(this._writableState.state & (kDestroyed | kHasErrored)) !== 0 &&
(this._writableState.state & kFinished) === 0
);
},
Expand All @@ -952,7 +1005,7 @@ Writable.prototype.destroy = function(err, cb) {
// Invoke pending callbacks.
if ((state.state & kDestroyed) === 0 &&
(state.bufferedIndex < state.buffered.length ||
state[kOnFinished].length)) {
(((state.state & kHasOnFinished) !== 0) && state[kOnFinished].length))) {
process.nextTick(errorBuffer, state);
}

Expand Down

0 comments on commit de32558

Please sign in to comment.