Skip to content

Commit

Permalink
✨ Add multi channel query subscription.
Browse files Browse the repository at this point in the history
At the moment we can only specify one channel the specific query, would be able to listen to, as per docs:
```javascript
backend.use('query', (context, next) => {
  // Set our query to only listen for changes on our user-specific channel
  context.channel = userChannel(context)
  next()
})
```

However let's imagine the situation where the user wants to query all
the posts, the where posted by him and all his friends. Now we would need
new query every for friend separately to make sure the proper scalability is
preserved and we do not receive all the changes to posts collection. This change allows to listen for multiple channels, so if we
want to query all user friends posts. We can do it by this:
```javascript
backend.use('query', (context, next) => {
  // Set our query to only listen for changes on our user-specific channel
  context.channels = [userChannel(context), friendChannel(context))]
  next()
})
```

Now this query would only listen to all the changes that were made to the user
posts and his friends.
  • Loading branch information
Dawidpol committed Feb 14, 2023
1 parent cc0e338 commit 2ca6a2f
Show file tree
Hide file tree
Showing 6 changed files with 573 additions and 530 deletions.
8 changes: 6 additions & 2 deletions docs/middleware/actions.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ This action has these additional `context` properties:

> The query's projection [fields]({{ site.baseurl }}{% link api/backend.md %}#addprojection)
`channel` -- string
`channel` -- string (deprecated)

> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channel the query will subscribe to. Defaults to its collection channel.
> This property is deprecated use `channels` instead. The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel.
`channels` -- string[]

> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel.
`query` -- Object

Expand Down
2 changes: 1 addition & 1 deletion docs/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ backend.use('commit', (context, next) => {

backend.use('query', (context, next) => {
// Set our query to only listen for changes on our user-specific channel
context.channel = userChannel(context)
context.channels = [userChannel(context)]
next()
})

Expand Down
46 changes: 40 additions & 6 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,25 +661,59 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba
'DB does not support subscribe'
));
}
backend.pubsub.subscribe(request.channel, function(err, stream) {
if (err) return callback(err);

var channels = request.channels;

if (request.channel) {
logger.warn('[DEPRECATED] Do not use channel for query, use channels instead.');
channels = [request.channel];
}

if (!channels || !channels.length) {
return callback(new ShareDBError(ERROR_CODE.ERR_MISSING_QUERY_CHANNEL, 'Required minimum one query channel.'));
}

var streams = [];

function destroyStreams() {
streams.forEach(function(stream) {
stream.destroy();
});
}

function createQueryEmitter() {
if (options.ids) {
var queryEmitter = new QueryEmitter(request, stream, options.ids);
var queryEmitter = new QueryEmitter(request, streams, options.ids);
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);
callback(null, queryEmitter);
return;
}
// Issue query on db to get our initial results
backend._query(agent, request, function(err, snapshots, extra) {
if (err) {
stream.destroy();
destroyStreams();
return callback(err);
}
var ids = pluckIds(snapshots);
var queryEmitter = new QueryEmitter(request, stream, ids, extra);
var queryEmitter = new QueryEmitter(request, streams, ids, extra);
backend.emit('timing', 'querySubscribe.initial', Date.now() - start, request);
callback(null, queryEmitter, snapshots, extra);
});
}

channels.forEach(function(channel) {
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) {
destroyStreams();
return callback(err);
}
streams.push(stream);

var subscribedToAllChannels = streams.length === channels.length;
if (subscribedToAllChannels) {
createQueryEmitter();
}
});
});
});
};
Expand All @@ -693,7 +727,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac
collection: collection,
projection: projection,
fields: fields,
channel: this.getCollectionChannel(collection),
channels: [this.getCollectionChannel(collection)],
query: query,
options: options,
db: null,
Expand Down
3 changes: 2 additions & 1 deletion lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ ShareDBError.CODES = {
ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND',
ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED',
ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE',
ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR'
ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR',
ERR_MISSING_QUERY_CHANNEL: 'ERR_MISSING_QUERY_CHANNEL'
};

module.exports = ShareDBError;
57 changes: 34 additions & 23 deletions lib/query-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var util = require('./util');

var ERROR_CODE = ShareDBError.CODES;

function QueryEmitter(request, stream, ids, extra) {
function QueryEmitter(request, streams, ids, extra) {
this.backend = request.backend;
this.agent = request.agent;
this.db = request.db;
Expand All @@ -15,18 +15,20 @@ function QueryEmitter(request, stream, ids, extra) {
this.fields = request.fields;
this.options = request.options;
this.snapshotProjection = request.snapshotProjection;
this.stream = stream;
this.streams = streams;
this.ids = ids;
this.extra = extra;

this.skipPoll = this.options.skipPoll || util.doNothing;
this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
this.pollDebounce =
(typeof this.options.pollDebounce === 'number') ? this.options.pollDebounce :
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0;
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce :
streams.length > 1 ? 1000 : 0;
this.pollInterval =
(typeof this.options.pollInterval === 'number') ? this.options.pollInterval :
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0;
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval :
streams.length > 1 ? 1000 : 0;

this._polling = false;
this._pendingPoll = null;
Expand All @@ -41,23 +43,32 @@ QueryEmitter.prototype._open = function() {
this._defaultCallback = function(err) {
if (err) emitter.onError(err);
};
emitter.stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
emitter.stream.on('end', function() {
emitter.destroy();

emitter.streams.forEach(function(stream) {
stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
stream.on('end', function() {
emitter.destroy();
});
});

// Make sure we start polling if pollInterval is being used
this._flushPoll();
};

QueryEmitter.prototype.destroy = function() {
clearTimeout(this._pollDebounceId);
clearTimeout(this._pollIntervalId);
this.stream.destroy();

var stream;

while (stream = this.streams.pop()) {
stream.destroy();
}
};

QueryEmitter.prototype._emitTiming = function(action, start) {
Expand Down Expand Up @@ -140,8 +151,8 @@ QueryEmitter.prototype._flushPoll = function() {
if (this._pendingPoll) {
this.queryPoll();

// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
} else if (this.pollInterval) {
var emitter = this;
this._pollIntervalId = setTimeout(function() {
Expand Down Expand Up @@ -301,14 +312,14 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) {
// all messages are received and applied in order, so it is critical that none
// are dropped.
QueryEmitter.prototype.onError =
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};

function getInserted(diff) {
var inserted = [];
Expand Down
Loading

0 comments on commit 2ca6a2f

Please sign in to comment.