Skip to content

Commit

Permalink
✨ Add multi channel query subscription.
Browse files Browse the repository at this point in the history
At the moment we can only specify one channel the specific query, would be able to listen to, as per docs:
```javascript
backend.use('query', (context, next) => {
  // Set our query to only listen for changes on our user-specific channel
  context.channel = userChannel(context)
  next()
})
```

However let's imagine the situation where the user wants to query all
the posts, the where posted by him and all his friends. Now we would need
new query every for friend separately to make sure the proper scalability is
preserved and we do not receive all the changes to posts collection. This change allows to listen for multiple channels, so if we
want to query all user friends posts. We can do it by this:
```javascript
backend.use('query', (context, next) => {
  // Set our query to only listen for changes on our user-specific channel
  context.channels = [userChannel(context), friendChannel(context))]
  next()
})
```

Now this query would only listen to all the changes that were made to the user
posts and his friends.
  • Loading branch information
Dawidpol committed Feb 7, 2023
1 parent cc0e338 commit a12d3af
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 38 deletions.
2 changes: 1 addition & 1 deletion docs/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ backend.use('commit', (context, next) => {

backend.use('query', (context, next) => {
// Set our query to only listen for changes on our user-specific channel
context.channel = userChannel(context)
context.channels = [userChannel(context)]
next()
})

Expand Down
46 changes: 40 additions & 6 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,25 +661,59 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba
'DB does not support subscribe'
));
}
backend.pubsub.subscribe(request.channel, function(err, stream) {
if (err) return callback(err);

var channels = request.channels;

if (request.channel) {
logger.warn('[DEPRECATED] Do no use channel for query, use channels instead.');
channels = [request.channel];
}

if (!channels || !channels.length) {
return callback(new Error('Required minimum one query channel.'));
}

var streams = [];

function cleanStreams() {
streams.forEach(function(stream) {
stream.destroy();
});
}

function createQueryEmitter() {
if (options.ids) {
var queryEmitter = new QueryEmitter(request, stream, options.ids);
var queryEmitter = new QueryEmitter(request, streams, options.ids);
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);
callback(null, queryEmitter);
return;
}
// Issue query on db to get our initial results
backend._query(agent, request, function(err, snapshots, extra) {
if (err) {
stream.destroy();
cleanStreams();
return callback(err);
}
var ids = pluckIds(snapshots);
var queryEmitter = new QueryEmitter(request, stream, ids, extra);
var queryEmitter = new QueryEmitter(request, streams, ids, extra);
backend.emit('timing', 'querySubscribe.initial', Date.now() - start, request);
callback(null, queryEmitter, snapshots, extra);
});
}

channels.forEach(function(channel) {
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) {
cleanStreams();
return callback(err);
}
streams.push(stream);

var subscribedToAllChannels = streams.length === channels.length;
if (subscribedToAllChannels) {
createQueryEmitter();
}
});
});
});
};
Expand All @@ -693,7 +727,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac
collection: collection,
projection: projection,
fields: fields,
channel: this.getCollectionChannel(collection),
channels: [this.getCollectionChannel(collection)],
query: query,
options: options,
db: null,
Expand Down
55 changes: 32 additions & 23 deletions lib/query-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var util = require('./util');

var ERROR_CODE = ShareDBError.CODES;

function QueryEmitter(request, stream, ids, extra) {
function QueryEmitter(request, streams, ids, extra) {
this.backend = request.backend;
this.agent = request.agent;
this.db = request.db;
Expand All @@ -15,18 +15,20 @@ function QueryEmitter(request, stream, ids, extra) {
this.fields = request.fields;
this.options = request.options;
this.snapshotProjection = request.snapshotProjection;
this.stream = stream;
this.streams = streams;
this.ids = ids;
this.extra = extra;

this.skipPoll = this.options.skipPoll || util.doNothing;
this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
this.pollDebounce =
(typeof this.options.pollDebounce === 'number') ? this.options.pollDebounce :
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0;
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce :
streams.length > 1 ? 1000 : 0;
this.pollInterval =
(typeof this.options.pollInterval === 'number') ? this.options.pollInterval :
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0;
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval :
streams.length > 1 ? 1000 : 0;

this._polling = false;
this._pendingPoll = null;
Expand All @@ -41,23 +43,30 @@ QueryEmitter.prototype._open = function() {
this._defaultCallback = function(err) {
if (err) emitter.onError(err);
};
emitter.stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
emitter.stream.on('end', function() {
emitter.destroy();

emitter.streams.forEach(function(stream) {
stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
stream.on('end', function() {
emitter.destroy();
});
});

// Make sure we start polling if pollInterval is being used
this._flushPoll();
};

QueryEmitter.prototype.destroy = function() {
clearTimeout(this._pollDebounceId);
clearTimeout(this._pollIntervalId);
this.stream.destroy();

this.streams.forEach(function(stream) {
stream.destroy();
});
};

QueryEmitter.prototype._emitTiming = function(action, start) {
Expand Down Expand Up @@ -140,8 +149,8 @@ QueryEmitter.prototype._flushPoll = function() {
if (this._pendingPoll) {
this.queryPoll();

// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
} else if (this.pollInterval) {
var emitter = this;
this._pollIntervalId = setTimeout(function() {
Expand Down Expand Up @@ -301,14 +310,14 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) {
// all messages are received and applied in order, so it is critical that none
// are dropped.
QueryEmitter.prototype.onError =
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};

function getInserted(diff) {
var inserted = [];
Expand Down

0 comments on commit a12d3af

Please sign in to comment.