Skip to content

Commit

Permalink
stream: avoid tick in writable hot path
Browse files Browse the repository at this point in the history
PR-URL: #49966
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
ronag authored Oct 1, 2023
1 parent 099e2f7 commit 5de25de
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -612,24 +612,31 @@ function onwrite(stream, er) {
}

if (sync) {
const needDrain = state.length === 0 && (state.state & kNeedDrain) !== 0;
const needTick = needDrain || (state.state & kDestroyed !== 0) || cb !== nop;

// It is a common case that the callback passed to .write() is always
// the same. In that case, we do not schedule a new nextTick(), but
// rather just increase a counter, to improve performance and avoid
// memory allocations.
if (cb === nop) {
if ((state.state & kAfterWritePending) === 0) {
if ((state.state & kAfterWritePending) === 0 && needTick) {
process.nextTick(afterWrite, stream, state, 1, cb);
state.state |= kAfterWritePending;
} else {
state.pendingcb -= 1;
state.pendingcb--;
finishMaybe(stream, state, true);
}
} else if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.count++;
} else if ((state.state & kAfterWriteTickInfo) !== 0 &&
state[kAfterWriteTickInfoValue].cb === cb) {
state[kAfterWriteTickInfoValue].count++;
} else if (needTick) {
state[kAfterWriteTickInfoValue] = { count: 1, cb, stream, state };
process.nextTick(afterWriteTick, state[kAfterWriteTickInfoValue]);
state.state |= (kAfterWritePending | kAfterWriteTickInfo);
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
state.state |= kAfterWritePending;
state.pendingcb--;
finishMaybe(stream, state, true);
}
} else {
afterWrite(stream, state, 1, cb);
Expand All @@ -638,7 +645,8 @@ function onwrite(stream, er) {
}

function afterWriteTick({ stream, state, count, cb }) {
state.afterWriteTickInfo = null;
state.state &= ~kAfterWriteTickInfo;
state[kAfterWriteTickInfoValue] = null;
return afterWrite(stream, state, count, cb);
}

Expand Down Expand Up @@ -795,6 +803,8 @@ Writable.prototype.end = function(chunk, encoding, cb) {
if (typeof cb === 'function') {
if (err) {
process.nextTick(cb, err);
} else if ((state.state & kErrored) !== 0) {
process.nextTick(cb, state[kErroredValue]);
} else if ((state.state & kFinished) !== 0) {
process.nextTick(cb, null);
} else {
Expand Down

0 comments on commit 5de25de

Please sign in to comment.