Skip to content

Commit

Permalink
events: try to improve stream creation perf
Browse files Browse the repository at this point in the history
  • Loading branch information
rluvaton committed May 12, 2024
1 parent b59447c commit e787353
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
42 changes: 29 additions & 13 deletions lib/internal/events/event_emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {
kMaxEventTargetListeners,
kMaxEventTargetListenersWarned,
kImpl,
kPreInitiated,
kIsFastPath,
kSwitchToSlowPath,
kInitialEvents,
Expand Down Expand Up @@ -222,6 +223,13 @@ ObjectDefineProperties(EventEmitter.prototype, {
configurable: false,
writable: true,
},
[kPreInitiated]: {
__proto__: null,
value: false,
enumerable: false,
configurable: false,
writable: true,
},
[kIsFastPath]: {
__proto__: null,
value: undefined,
Expand All @@ -240,7 +248,10 @@ ObjectDefineProperties(EventEmitter.prototype, {
__proto__: null,
enumerable: true,
get: function() {
return this[kImpl]?._events;
if(this[kImpl] === undefined) {
return undefined;
}
return this[kImpl]._events;
},
set: function(arg) {
// If using the _events setter move to slow path to avoid bugs with incorrect shape or functions
Expand Down Expand Up @@ -347,6 +358,23 @@ EventEmitter.setMaxListeners =
EventEmitter.init = function(opts) {
let thisPrototype;

this._maxListeners = this._maxListeners || undefined;


if (opts?.captureRejections) {
validateBoolean(opts.captureRejections, 'options.captureRejections');
this[kCapture] = Boolean(opts.captureRejections);
} else {
// Assigning the kCapture property directly saves an expensive
// prototype lookup in a very sensitive hot path.
this[kCapture] = EventEmitter.prototype[kCapture];
}

if(this[kPreInitiated] === true) {
this[kPreInitiated] = false;
return;
}

if (this[kImpl] === undefined || (thisPrototype = ObjectGetPrototypeOf(this))[kImpl] === this[kImpl]) {
this[kImpl] = new FastEventEmitter(this);
this[kIsFastPath] = true;
Expand All @@ -368,18 +396,6 @@ EventEmitter.init = function(opts) {
impl._eventsCount = 0;
impl[kShapeMode] = false;
}

this._maxListeners = this._maxListeners || undefined;


if (opts?.captureRejections) {
validateBoolean(opts.captureRejections, 'options.captureRejections');
this[kCapture] = Boolean(opts.captureRejections);
} else {
// Assigning the kCapture property directly saves an expensive
// prototype lookup in a very sensitive hot path.
this[kCapture] = EventEmitter.prototype[kCapture];
}
};

EventEmitter.prototype[kSwitchToSlowPath] = function() {
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/events/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const kMaxEventTargetListenersWarned =
const kWatermarkData = SymbolFor('nodejs.watermarkData');

const kImpl = Symbol('kImpl');
const kPreInitiated = Symbol('kPreInitiated');
const kIsFastPath = Symbol('kIsFastPath');
const kSwitchToSlowPath = Symbol('kSwitchToSlowPath');
const kRejection = SymbolFor('nodejs.rejection');
Expand All @@ -31,6 +32,7 @@ module.exports = {
kMaxEventTargetListenersWarned,
kWatermarkData,
kImpl,
kPreInitiated,
kIsFastPath,
kSwitchToSlowPath,
kRejection,
Expand Down
15 changes: 10 additions & 5 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Readable.ReadableState = ReadableState;

const EE = require('events');
const { Stream, prependListener } = require('internal/streams/legacy');
const { kInitialEvents } = require('internal/events/symbols');
const { kInitialEvents, kImpl, kShapeMode, kIsFastPath, kPreInitiated} = require('internal/events/symbols');
const { Buffer } = require('buffer');

const {
Expand Down Expand Up @@ -92,6 +92,7 @@ const FastBuffer = Buffer[SymbolSpecies];

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
const {FastEventEmitter} = require("internal/events/fast_event_emitter");

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
Expand Down Expand Up @@ -320,9 +321,9 @@ function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);

const thisEvents = this._events;
if (thisEvents === undefined || thisEvents === null) {
this[kInitialEvents] ??= {
if (this[kImpl] === undefined && this._events === undefined) {
this[kIsFastPath] = true;
this[kImpl] = new FastEventEmitter(this, this[kInitialEvents] !== undefined ? this[kInitialEvents] :{
close: undefined,
error: undefined,
data: undefined,
Expand All @@ -335,7 +336,10 @@ function Readable(options) {
// unpipe: undefined,
// [destroyImpl.kConstruct]: undefined,
// [destroyImpl.kDestroy]: undefined,
};
});
this[kImpl][kShapeMode] = true;
} else {
this[kPreInitiated] = false;
}

this._readableState = new ReadableState(options, this, false);
Expand Down Expand Up @@ -363,6 +367,7 @@ function Readable(options) {
}
}

Readable.prototype[kPreInitiated] = true;
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
Expand Down

0 comments on commit e787353

Please sign in to comment.