Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add multi channel query subscription. #587

Merged
merged 1 commit into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions docs/middleware/actions.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ This action has these additional `context` properties:

> The query's projection [fields]({{ site.baseurl }}{% link api/backend.md %}#addprojection)

`channel` -- string
`channel` -- string (deprecated)

> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channel the query will subscribe to. Defaults to its collection channel.
> This property is deprecated use `channels` instead. The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel.

`channels` -- string[]

> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel.

`query` -- Object

Expand Down
2 changes: 1 addition & 1 deletion docs/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ backend.use('commit', (context, next) => {

backend.use('query', (context, next) => {
// Set our query to only listen for changes on our user-specific channel
context.channel = userChannel(context)
context.channels = [userChannel(context)]
next()
})

Expand Down
49 changes: 43 additions & 6 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,25 +661,62 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba
'DB does not support subscribe'
));
}
backend.pubsub.subscribe(request.channel, function(err, stream) {
if (err) return callback(err);

var channels = request.channels;

if (request.channel) {
logger.warn(
'[DEPRECATED] "query" middleware\'s context.channel is deprecated, use context.channels instead. ' +
'Read more: https://share.github.io/sharedb/middleware/actions#query'
);
channels = [request.channel];
}

if (!channels || !channels.length) {
return callback(new ShareDBError(ERROR_CODE.ERR_QUERY_CHANNEL_MISSING, 'Required minimum one query channel.'));
}

var streams = [];

function destroyStreams() {
streams.forEach(function(stream) {
stream.destroy();
});
ericyhwang marked this conversation as resolved.
Show resolved Hide resolved
}

function createQueryEmitter() {
if (options.ids) {
var queryEmitter = new QueryEmitter(request, stream, options.ids);
var queryEmitter = new QueryEmitter(request, streams, options.ids);
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);
callback(null, queryEmitter);
return;
}
// Issue query on db to get our initial results
backend._query(agent, request, function(err, snapshots, extra) {
if (err) {
stream.destroy();
destroyStreams();
return callback(err);
}
var ids = pluckIds(snapshots);
var queryEmitter = new QueryEmitter(request, stream, ids, extra);
var queryEmitter = new QueryEmitter(request, streams, ids, extra);
backend.emit('timing', 'querySubscribe.initial', Date.now() - start, request);
callback(null, queryEmitter, snapshots, extra);
});
}

channels.forEach(function(channel) {
ericyhwang marked this conversation as resolved.
Show resolved Hide resolved
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) {
destroyStreams();
return callback(err);
}
streams.push(stream);

var subscribedToAllChannels = streams.length === channels.length;
if (subscribedToAllChannels) {
createQueryEmitter();
}
});
});
});
};
Expand All @@ -693,7 +730,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac
collection: collection,
projection: projection,
fields: fields,
channel: this.getCollectionChannel(collection),
channels: [this.getCollectionChannel(collection)],
query: query,
options: options,
db: null,
Expand Down
1 change: 1 addition & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ShareDBError.CODES = {
ERR_OT_OP_NOT_PROVIDED: 'ERR_OT_OP_NOT_PROVIDED',
ERR_PRESENCE_TRANSFORM_FAILED: 'ERR_PRESENCE_TRANSFORM_FAILED',
ERR_PROTOCOL_VERSION_NOT_SUPPORTED: 'ERR_PROTOCOL_VERSION_NOT_SUPPORTED',
ERR_QUERY_CHANNEL_MISSING: 'ERR_QUERY_CHANNEL_MISSING',
ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED',
/**
* A special error that a "readSnapshots" middleware implementation can use to indicate that it
Expand Down
57 changes: 34 additions & 23 deletions lib/query-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var util = require('./util');

var ERROR_CODE = ShareDBError.CODES;

function QueryEmitter(request, stream, ids, extra) {
function QueryEmitter(request, streams, ids, extra) {
this.backend = request.backend;
this.agent = request.agent;
this.db = request.db;
Expand All @@ -15,18 +15,20 @@ function QueryEmitter(request, stream, ids, extra) {
this.fields = request.fields;
this.options = request.options;
this.snapshotProjection = request.snapshotProjection;
this.stream = stream;
this.streams = streams;
this.ids = ids;
this.extra = extra;

this.skipPoll = this.options.skipPoll || util.doNothing;
this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
this.pollDebounce =
(typeof this.options.pollDebounce === 'number') ? this.options.pollDebounce :
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0;
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce :
streams.length > 1 ? 1000 : 0;
this.pollInterval =
(typeof this.options.pollInterval === 'number') ? this.options.pollInterval :
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0;
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval :
streams.length > 1 ? 1000 : 0;

this._polling = false;
this._pendingPoll = null;
Expand All @@ -41,23 +43,32 @@ QueryEmitter.prototype._open = function() {
this._defaultCallback = function(err) {
if (err) emitter.onError(err);
};
emitter.stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
emitter.stream.on('end', function() {
emitter.destroy();

emitter.streams.forEach(function(stream) {
stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
stream.on('end', function() {
emitter.destroy();
});
});

// Make sure we start polling if pollInterval is being used
this._flushPoll();
};

QueryEmitter.prototype.destroy = function() {
clearTimeout(this._pollDebounceId);
clearTimeout(this._pollIntervalId);
this.stream.destroy();

var stream;

while (stream = this.streams.pop()) {
stream.destroy();
}
};

QueryEmitter.prototype._emitTiming = function(action, start) {
Expand Down Expand Up @@ -140,8 +151,8 @@ QueryEmitter.prototype._flushPoll = function() {
if (this._pendingPoll) {
this.queryPoll();

// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
} else if (this.pollInterval) {
var emitter = this;
this._pollIntervalId = setTimeout(function() {
Expand Down Expand Up @@ -301,14 +312,14 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) {
// all messages are received and applied in order, so it is critical that none
// are dropped.
QueryEmitter.prototype.onError =
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};

function getInserted(diff) {
var inserted = [];
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"ot-json1": "^0.3.0",
"rich-text": "^4.1.0",
"sharedb-legacy": "npm:sharedb@=1.1.0",
"sinon": "^7.5.0"
"sinon": "^9.2.4"
},
"files": [
"lib/",
Expand Down
25 changes: 0 additions & 25 deletions test/client/presence/presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ describe('Presence', function() {
});

afterEach(function(done) {
sinon.restore();
connection1.close();
connection2.close();
backend.close(done);
Expand Down Expand Up @@ -147,30 +146,6 @@ describe('Presence', function() {
], errorHandler(done));
});

it('destroys old local presence but keeps new local presence when getting during destroy', function(done) {
presence2.create('presence-2');
var presence2a;

async.series([
presence2.subscribe.bind(presence2),
function(next) {
presence2.destroy(function() {
expect(presence2).to.equal(presence2a);
expect(Object.keys(presence2.localPresences)).to.eql(['presence-2a']);
done();
});
next();
},
function(next) {
presence2a = connection2.getPresence('test-channel');
presence2a.create('presence-2a');
presence2a.subscribe(function(error) {
next(error);
});
}
], errorHandler(done));
});

it('throws if trying to create local presence when wanting destroy', function(done) {
presence2.destroy(errorHandler(done));
expect(function() {
Expand Down
Loading