Skip to content

Commit

Permalink
stream: call helper function from push and unshift
Browse files Browse the repository at this point in the history
PR-URL: nodejs#50173
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
rluvaton authored and kumarrishav committed Oct 16, 2023
1 parent 068b548 commit b5b617b
Showing 1 changed file with 140 additions and 53 deletions.
193 changes: 140 additions & 53 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() {
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, false);
debug('push', chunk);

const state = this._readableState;
return (state[kState] & kObjectMode) === 0 ?
readableAddChunkPushByteMode(this, state, chunk, encoding) :
readableAddChunkPushObjectMode(this, state, chunk, encoding);
};

// Unshift should *always* be something directly out of read().
Readable.prototype.unshift = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, true);
debug('unshift', chunk);
const state = this._readableState;
return (state[kState] & kObjectMode) === 0 ?
readableAddChunkUnshiftByteMode(this, state, chunk, encoding) :
readableAddChunkUnshiftObjectMode(this, state, chunk);
};

function readableAddChunk(stream, chunk, encoding, addToFront) {
debug('readableAddChunk', chunk);
const state = stream._readableState;

let err;
if ((state[kState] & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
if (addToFront && state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);

return false;
}

if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
if (state.encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else {
chunk = Buffer.from(chunk, encoding);
}
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk != null) {
err = new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
}
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
} else if (chunk !== undefined && !(chunk instanceof Buffer)) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
return false;
}

if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {

if (!(chunk && chunk.length > 0)) {
return canPushMore(state);
}

return readableAddChunkUnshiftValue(stream, state, chunk);
}

function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
} else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed || state.errored) {
return false;
} else {
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
maybeReadMore(stream, state);
} else {
addChunk(stream, state, chunk, false);
}

return false;
}

return readableAddChunkUnshiftValue(stream, state, chunk);
}

function readableAddChunkUnshiftValue(stream, state, chunk) {
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
else
addChunk(stream, state, chunk, true);

return canPushMore(state);
}

function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);

return false;
}

if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
} else if (!addToFront) {
} else if (chunk instanceof Buffer) {
encoding = '';
} else if (Stream._isUint8Array(chunk)) {
chunk = Stream._uint8ArrayToBuffer(chunk);
encoding = '';
} else if (chunk !== undefined) {
errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
return false;
}

if (!chunk || chunk.length <= 0) {
state[kState] &= ~kReading;
maybeReadMore(stream, state);

return canPushMore(state);
}

if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());

return false;
}

if (state.destroyed || state.errored) {
return false;
}

state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (chunk.length === 0) {
maybeReadMore(stream, state);

return canPushMore(state);
}
}

addChunk(stream, state, chunk, false);
return canPushMore(state);
}

function readableAddChunkPushObjectMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);

return false;
}

if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
return false;
}

if (state.destroyed || state.errored) {
return false;
}

state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
}

addChunk(stream, state, chunk, false);
return canPushMore(state);
}

function canPushMore(state) {
// We can push more data if we are below the highWaterMark.
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.
Expand Down

0 comments on commit b5b617b

Please sign in to comment.