Skip to content

Commit

Permalink
馃攪 Fix sensitive memory leak warnings when using presence
Browse files Browse the repository at this point in the history
Fixes #584

At the moment, we [attach `Doc` listeners][1] for every remote client in
a document (as well as every local user).

This means `Doc`s can quickly hit the [default of 10][2] maximum event
listeners.

This change adds a `DocPresenceEmitter` class, which only registers any
given `Doc` listener once, regardless of how many presence instances
there are. It then forwards events on through its own emitters, which
have a much higher value of 1000 max listeners set, in order to avoid
too much alert spam, but also keep *some* checking for memory leaks.

[1]: https://github.com/share/sharedb/blob/cc0e3382bf3df4c38c7ccc6bc52da71aff8c9f74/lib/client/presence/remote-doc-presence.js#L42-L47
[2]: https://nodejs.org/api/events.html#eventsdefaultmaxlisteners
  • Loading branch information
alecgibson committed Feb 1, 2023
1 parent cc0e338 commit 4d17994
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 18 deletions.
2 changes: 2 additions & 0 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var ACTIONS = require('../message-actions').ACTIONS;
var types = require('../types');
var util = require('../util');
var logger = require('../logger');
var DocPresenceEmitter = require('./presence/doc-presence-emitter');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -51,6 +52,7 @@ function Connection(socket) {

// Maps from channel -> presence objects
this._presences = {};
this._docPresenceEmitter = new DocPresenceEmitter();

// Map from snapshot request ID -> snapshot request
this._snapshotRequests = {};
Expand Down
83 changes: 83 additions & 0 deletions lib/client/presence/doc-presence-emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
var util = require('../../util');
var EventEmitter = require('events').EventEmitter;

var EVENTS = [
'create',
'del',
'destroy',
'load',
'op'
];

module.exports = DocPresenceEmitter;

function DocPresenceEmitter() {
this._docs = {};
this._forwarders = {};
this._emitters = {};
}

DocPresenceEmitter.prototype.addEventListener = function(doc, event, listener) {
this._registerDoc(doc);
var emitter = util.dig(this._emitters, doc.collection, doc.id);
emitter.on(event, listener);
};

DocPresenceEmitter.prototype.removeEventListener = function(doc, event, listener) {
var emitter = util.dig(this._emitters, doc.collection, doc.id);
if (!emitter) return;
emitter.off(event, listener);
// We'll always have at least one, because of the destroy listener
if (emitter._eventsCount === 1) this._unregisterDoc(doc);
};

DocPresenceEmitter.prototype._registerDoc = function(doc) {
var alreadyRegistered = true;
var registered = util.digOrCreate(this._docs, doc.collection, doc.id, function() {
alreadyRegistered = false;
return doc;
});

if (registered !== doc) {
this._unregisterDoc(registered);
return this._registerDoc(doc);
}

if (alreadyRegistered) return;

var emitter = util.digOrCreate(this._emitters, doc.collection, doc.id, function() {
var e = new EventEmitter();
// Set a high limit to avoid unnecessary warnings, but still
// retain some degree of memory leak detection
e.setMaxListeners(1000);
return e;
});

var self = this;
EVENTS.forEach(function(event) {
var forwarder = util.digOrCreate(self._forwarders, doc.collection, doc.id, event, function() {
return function() {
Array.prototype.unshift.call(arguments, event);
emitter.emit.apply(emitter, arguments);
};
});

doc.on(event, forwarder);
});

this.addEventListener(doc, 'destroy', this._unregisterDoc.bind(this, doc));
};

DocPresenceEmitter.prototype._unregisterDoc = function(doc) {
var forwarders = util.dig(this._forwarders, doc.collection, doc.id);
for (var event in forwarders) {
doc.off(event, forwarders[event]);
}

var emitter = util.dig(this._emitters, doc.collection, doc.id);
emitter.removeAllListeners();

util.digAndRemove(this._forwarders, doc.collection, doc.id);
util.digAndRemove(this._emitters, doc.collection, doc.id);
util.digAndRemove(this._docs, doc.collection, doc.id);
};
21 changes: 11 additions & 10 deletions lib/client/presence/local-doc-presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ function LocalDocPresence(presence, presenceId) {
this.id = this.presence.id;

this._doc = this.connection.get(this.collection, this.id);
this._emitter = this.connection._docPresenceEmitter;
this._isSending = false;
this._docDataVersionByPresenceVersion = {};

Expand Down Expand Up @@ -43,11 +44,11 @@ LocalDocPresence.prototype.submit = function(value, callback) {
};

LocalDocPresence.prototype.destroy = function(callback) {
this._doc.removeListener('op', this._opHandler);
this._doc.removeListener('create', this._createOrDelHandler);
this._doc.removeListener('del', this._createOrDelHandler);
this._doc.removeListener('load', this._loadHandler);
this._doc.removeListener('destroy', this._destroyHandler);
this._emitter.removeEventListener(this._doc, 'op', this._opHandler);
this._emitter.removeEventListener(this._doc, 'create', this._createOrDelHandler);
this._emitter.removeEventListener(this._doc, 'del', this._createOrDelHandler);
this._emitter.removeEventListener(this._doc, 'load', this._loadHandler);
this._emitter.removeEventListener(this._doc, 'destroy', this._destroyHandler);

LocalPresence.prototype.destroy.call(this, callback);
};
Expand All @@ -72,11 +73,11 @@ LocalDocPresence.prototype._sendPending = function() {
};

LocalDocPresence.prototype._registerWithDoc = function() {
this._doc.on('op', this._opHandler);
this._doc.on('create', this._createOrDelHandler);
this._doc.on('del', this._createOrDelHandler);
this._doc.on('load', this._loadHandler);
this._doc.on('destroy', this._destroyHandler);
this._emitter.addEventListener(this._doc, 'op', this._opHandler);
this._emitter.addEventListener(this._doc, 'create', this._createOrDelHandler);
this._emitter.addEventListener(this._doc, 'del', this._createOrDelHandler);
this._emitter.addEventListener(this._doc, 'load', this._loadHandler);
this._emitter.addEventListener(this._doc, 'destroy', this._destroyHandler);
};

LocalDocPresence.prototype._transformAgainstOp = function(op, source) {
Expand Down
17 changes: 9 additions & 8 deletions lib/client/presence/remote-doc-presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ function RemoteDocPresence(presence, presenceId) {
this.presenceVersion = null;

this._doc = this.connection.get(this.collection, this.id);
this._emitter = this.connection._docPresenceEmitter;
this._pending = null;
this._opCache = null;
this._pendingSetPending = false;
Expand All @@ -31,19 +32,19 @@ RemoteDocPresence.prototype.receiveUpdate = function(message) {
};

RemoteDocPresence.prototype.destroy = function(callback) {
this._doc.removeListener('op', this._opHandler);
this._doc.removeListener('create', this._createDelHandler);
this._doc.removeListener('del', this._createDelHandler);
this._doc.removeListener('load', this._loadHandler);
this._emitter.removeEventListener(this._doc, 'op', this._opHandler);
this._emitter.removeEventListener(this._doc, 'create', this._createDelHandler);
this._emitter.removeEventListener(this._doc, 'del', this._createDelHandler);
this._emitter.removeEventListener(this._doc, 'load', this._loadHandler);

RemotePresence.prototype.destroy.call(this, callback);
};

RemoteDocPresence.prototype._registerWithDoc = function() {
this._doc.on('op', this._opHandler);
this._doc.on('create', this._createDelHandler);
this._doc.on('del', this._createDelHandler);
this._doc.on('load', this._loadHandler);
this._emitter.addEventListener(this._doc, 'op', this._opHandler);
this._emitter.addEventListener(this._doc, 'create', this._createDelHandler);
this._emitter.addEventListener(this._doc, 'del', this._createDelHandler);
this._emitter.addEventListener(this._doc, 'load', this._loadHandler);
};

RemoteDocPresence.prototype._setPendingPresence = function() {
Expand Down
100 changes: 100 additions & 0 deletions test/client/presence/doc-presence-emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
var Backend = require('../../../lib/backend');
var errorHandler = require('../../util').errorHandler;
var expect = require('chai').expect;

describe('DocPresenceEmitter', function() {
var backend;
var connection;
var doc;
var emitter;

beforeEach(function(done) {
backend = new Backend();
connection = backend.connect();
doc = connection.get('books', 'northern-lights');
doc.create({title: 'Northern Lights'}, done);
emitter = connection._docPresenceEmitter;
});

it('listens to an op event', function(done) {
emitter.addEventListener(doc, 'op', function(op) {
expect(op).to.eql([{p: ['author'], oi: 'Philip Pullman'}]);
done();
});

doc.submitOp([{p: ['author'], oi: 'Philip Pullman'}], errorHandler(done));
});

it('stops listening to events', function(done) {
var listener = function() {
done(new Error('should not reach'));
};

emitter.addEventListener(doc, 'op', listener);
emitter.removeEventListener(doc, 'op', listener);

doc.submitOp([{p: ['author'], oi: 'Philip Pullman'}], done);
});

it('removes the listener from the doc if there are no more listeners', function() {
expect(doc._eventsCount).to.equal(0);
var listener = function() {};

emitter.addEventListener(doc, 'op', listener);

expect(doc._eventsCount).to.be.greaterThan(0);
expect(emitter._docs).not.to.be.empty;
expect(emitter._emitters).not.to.be.empty;
expect(emitter._forwarders).not.to.be.empty;

emitter.removeEventListener(doc, 'op', listener);

expect(doc._eventsCount).to.equal(0);
expect(emitter._docs).to.be.empty;
expect(emitter._emitters).to.be.empty;
expect(emitter._forwarders).to.be.empty;
});

it('only registers a single listener on the doc', function() {
expect(doc._eventsCount).to.equal(0);
var listener = function() { };
emitter.addEventListener(doc, 'op', listener);
var count = doc._eventsCount;
emitter.addEventListener(doc, 'op', listener);
expect(doc._eventsCount).to.equal(count);
});

it('only triggers the given event', function(done) {
emitter.addEventListener(doc, 'op', function(op) {
expect(op).to.eql([{p: ['author'], oi: 'Philip Pullman'}]);
done();
});

emitter.addEventListener(doc, 'del', function() {
done(new Error('should not reach'));
});

doc.submitOp([{p: ['author'], oi: 'Philip Pullman'}], errorHandler(done));
});

it('removes listeners on destroy', function(done) {
expect(doc._eventsCount).to.equal(0);
var listener = function() { };

emitter.addEventListener(doc, 'op', listener);

expect(doc._eventsCount).to.be.greaterThan(0);
expect(emitter._docs).not.to.be.empty;
expect(emitter._emitters).not.to.be.empty;
expect(emitter._forwarders).not.to.be.empty;

doc.destroy(function(error) {
if (error) return done(error);
expect(doc._eventsCount).to.equal(0);
expect(emitter._docs).to.be.empty;
expect(emitter._emitters).to.be.empty;
expect(emitter._forwarders).to.be.empty;
done();
});
});
});
10 changes: 10 additions & 0 deletions test/client/presence/doc-presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -1025,4 +1025,14 @@ describe('DocPresence', function() {
}
], done);
});

it('does not trigger EventEmitter memory leak warnings', function() {
for (var i = 0; i < 100; i++) {
presence1.create();
}

expect(doc1._events.op.warned).not.to.be.ok;
var emitter = connection1._docPresenceEmitter._emitters[doc1.collection][doc1.id];
expect(emitter._events.op.warned).not.to.be.ok;
});
});

0 comments on commit 4d17994

Please sign in to comment.