Skip to content

Commit

Permalink
✨ Add multi channel query subscription.
Browse files Browse the repository at this point in the history
At the moment we can only specify one channel the specific query, would be able to listen to, as per docs:
```javascript
backend.use('query', (context, next) => {
  // Set our query to only listen for changes on our user-specific channel
  context.channel = userChannel(context)
  next()
})
```

However let's imagine the situation where the user wants to query all
the posts, the where posted by him and all his friends. Now we would need
new query every for friend separately to make sure the proper scalability is
preserved and we do not receive all the changes to posts collection. This change allows to listen for multiple channels, so if we
want to query all user friends posts. We can do it by this:
```javascript
backend.use('query', (context, next) => {
  // Set our query to only listen for changes on our user-specific channel
  context.channels = [userChannel(context), friendChannel(context))]
  next()
})
```

Now this query would only listen to all the changes that were made to the user
posts and his friends.
  • Loading branch information
Dawidpol committed Feb 14, 2023
1 parent cc0e338 commit c5dd7d7
Show file tree
Hide file tree
Showing 5 changed files with 567 additions and 528 deletions.
2 changes: 1 addition & 1 deletion docs/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ backend.use('commit', (context, next) => {

backend.use('query', (context, next) => {
// Set our query to only listen for changes on our user-specific channel
context.channel = userChannel(context)
context.channels = [userChannel(context)]
next()
})

Expand Down
46 changes: 40 additions & 6 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,25 +661,59 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba
'DB does not support subscribe'
));
}
backend.pubsub.subscribe(request.channel, function(err, stream) {
if (err) return callback(err);

var channels = request.channels;

if (request.channel) {
logger.warn('[DEPRECATED] Do no use channel for query, use channels instead.');
channels = [request.channel];
}

if (!channels || !channels.length) {
return callback(new ShareDBError(ERROR_CODE.ERR_MISSING_QUERY_CHANNEL, 'Required minimum one query channel.'));
}

var streams = [];

function destroyStreams() {
streams.forEach(function(stream) {
stream.destroy();
});
}

function createQueryEmitter() {
if (options.ids) {
var queryEmitter = new QueryEmitter(request, stream, options.ids);
var queryEmitter = new QueryEmitter(request, streams, options.ids);
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);
callback(null, queryEmitter);
return;
}
// Issue query on db to get our initial results
backend._query(agent, request, function(err, snapshots, extra) {
if (err) {
stream.destroy();
destroyStreams();
return callback(err);
}
var ids = pluckIds(snapshots);
var queryEmitter = new QueryEmitter(request, stream, ids, extra);
var queryEmitter = new QueryEmitter(request, streams, ids, extra);
backend.emit('timing', 'querySubscribe.initial', Date.now() - start, request);
callback(null, queryEmitter, snapshots, extra);
});
}

channels.forEach(function(channel) {
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) {
destroyStreams();
return callback(err);
}
streams.push(stream);

var subscribedToAllChannels = streams.length === channels.length;
if (subscribedToAllChannels) {
createQueryEmitter();
}
});
});
});
};
Expand All @@ -693,7 +727,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac
collection: collection,
projection: projection,
fields: fields,
channel: this.getCollectionChannel(collection),
channels: [this.getCollectionChannel(collection)],
query: query,
options: options,
db: null,
Expand Down
3 changes: 2 additions & 1 deletion lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ ShareDBError.CODES = {
ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND',
ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED',
ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE',
ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR'
ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR',
ERR_MISSING_QUERY_CHANNEL: 'ERR_MISSING_QUERY_CHANNEL'
};

module.exports = ShareDBError;
57 changes: 34 additions & 23 deletions lib/query-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var util = require('./util');

var ERROR_CODE = ShareDBError.CODES;

function QueryEmitter(request, stream, ids, extra) {
function QueryEmitter(request, streams, ids, extra) {
this.backend = request.backend;
this.agent = request.agent;
this.db = request.db;
Expand All @@ -15,18 +15,20 @@ function QueryEmitter(request, stream, ids, extra) {
this.fields = request.fields;
this.options = request.options;
this.snapshotProjection = request.snapshotProjection;
this.stream = stream;
this.streams = streams;
this.ids = ids;
this.extra = extra;

this.skipPoll = this.options.skipPoll || util.doNothing;
this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
this.pollDebounce =
(typeof this.options.pollDebounce === 'number') ? this.options.pollDebounce :
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0;
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce :
streams.length > 1 ? 1000 : 0;
this.pollInterval =
(typeof this.options.pollInterval === 'number') ? this.options.pollInterval :
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0;
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval :
streams.length > 1 ? 1000 : 0;

this._polling = false;
this._pendingPoll = null;
Expand All @@ -41,23 +43,32 @@ QueryEmitter.prototype._open = function() {
this._defaultCallback = function(err) {
if (err) emitter.onError(err);
};
emitter.stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
emitter.stream.on('end', function() {
emitter.destroy();

emitter.streams.forEach(function(stream) {
stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
stream.on('end', function() {
emitter.destroy();
});
});

// Make sure we start polling if pollInterval is being used
this._flushPoll();
};

QueryEmitter.prototype.destroy = function() {
clearTimeout(this._pollDebounceId);
clearTimeout(this._pollIntervalId);
this.stream.destroy();

var stream;

while (stream = this.streams.pop()) {
stream.destroy();
}
};

QueryEmitter.prototype._emitTiming = function(action, start) {
Expand Down Expand Up @@ -140,8 +151,8 @@ QueryEmitter.prototype._flushPoll = function() {
if (this._pendingPoll) {
this.queryPoll();

// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
} else if (this.pollInterval) {
var emitter = this;
this._pollIntervalId = setTimeout(function() {
Expand Down Expand Up @@ -301,14 +312,14 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) {
// all messages are received and applied in order, so it is critical that none
// are dropped.
QueryEmitter.prototype.onError =
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};

function getInserted(diff) {
var inserted = [];
Expand Down

0 comments on commit c5dd7d7

Please sign in to comment.