Skip to content

Commit

Permalink
Merge ead4de4 into 9c0f1c6
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidreedsy authored Feb 20, 2023
2 parents 9c0f1c6 + ead4de4 commit 2c8fc0f
Show file tree
Hide file tree
Showing 6 changed files with 816 additions and 530 deletions.
8 changes: 6 additions & 2 deletions docs/middleware/actions.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ This action has these additional `context` properties:

> The query's projection [fields]({{ site.baseurl }}{% link api/backend.md %}#addprojection)
`channel` -- string
`channel` -- string (deprecated)

> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channel the query will subscribe to. Defaults to its collection channel.
> This property is deprecated use `channels` instead. The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel.
`channels` -- string[]

> The [Pub/Sub]({{ site.baseurl }}{% link adapters/pub-sub.md %}) channels the query will subscribe to. Defaults to its collection channel.
`query` -- Object

Expand Down
2 changes: 1 addition & 1 deletion docs/queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ backend.use('commit', (context, next) => {

backend.use('query', (context, next) => {
// Set our query to only listen for changes on our user-specific channel
context.channel = userChannel(context)
context.channels = [userChannel(context)]
next()
})

Expand Down
49 changes: 43 additions & 6 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,25 +661,62 @@ Backend.prototype.querySubscribe = function(agent, index, query, options, callba
'DB does not support subscribe'
));
}
backend.pubsub.subscribe(request.channel, function(err, stream) {
if (err) return callback(err);

var channels = request.channels;

if (request.channel) {
logger.warn(
'[DEPRECATED] "query" middleware\'s context.channel is deprecated, use context.channels instead.' +
'Read more: https://share.github.io/sharedb/middleware/actions#query'
);
channels = [request.channel];
}

if (!channels || !channels.length) {
return callback(new ShareDBError(ERROR_CODE.ERR_QUERY_CHANNEL_MISSING, 'Required minimum one query channel.'));
}

var streams = [];

function destroyStreams() {
streams.forEach(function(stream) {
stream.destroy();
});
}

function createQueryEmitter() {
if (options.ids) {
var queryEmitter = new QueryEmitter(request, stream, options.ids);
var queryEmitter = new QueryEmitter(request, streams, options.ids);
backend.emit('timing', 'querySubscribe.reconnect', Date.now() - start, request);
callback(null, queryEmitter);
return;
}
// Issue query on db to get our initial results
backend._query(agent, request, function(err, snapshots, extra) {
if (err) {
stream.destroy();
destroyStreams();
return callback(err);
}
var ids = pluckIds(snapshots);
var queryEmitter = new QueryEmitter(request, stream, ids, extra);
var queryEmitter = new QueryEmitter(request, streams, ids, extra);
backend.emit('timing', 'querySubscribe.initial', Date.now() - start, request);
callback(null, queryEmitter, snapshots, extra);
});
}

channels.forEach(function(channel) {
backend.pubsub.subscribe(channel, function(err, stream) {
if (err) {
destroyStreams();
return callback(err);
}
streams.push(stream);

var subscribedToAllChannels = streams.length === channels.length;
if (subscribedToAllChannels) {
createQueryEmitter();
}
});
});
});
};
Expand All @@ -693,7 +730,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac
collection: collection,
projection: projection,
fields: fields,
channel: this.getCollectionChannel(collection),
channels: [this.getCollectionChannel(collection)],
query: query,
options: options,
db: null,
Expand Down
1 change: 1 addition & 0 deletions lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ShareDBError.CODES = {
ERR_OT_OP_NOT_PROVIDED: 'ERR_OT_OP_NOT_PROVIDED',
ERR_PRESENCE_TRANSFORM_FAILED: 'ERR_PRESENCE_TRANSFORM_FAILED',
ERR_PROTOCOL_VERSION_NOT_SUPPORTED: 'ERR_PROTOCOL_VERSION_NOT_SUPPORTED',
ERR_QUERY_CHANNEL_MISSING: 'ERR_QUERY_CHANNEL_MISSING',
ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED',
/**
* A special error that a "readSnapshots" middleware implementation can use to indicate that it
Expand Down
57 changes: 34 additions & 23 deletions lib/query-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var util = require('./util');

var ERROR_CODE = ShareDBError.CODES;

function QueryEmitter(request, stream, ids, extra) {
function QueryEmitter(request, streams, ids, extra) {
this.backend = request.backend;
this.agent = request.agent;
this.db = request.db;
Expand All @@ -15,18 +15,20 @@ function QueryEmitter(request, stream, ids, extra) {
this.fields = request.fields;
this.options = request.options;
this.snapshotProjection = request.snapshotProjection;
this.stream = stream;
this.streams = streams;
this.ids = ids;
this.extra = extra;

this.skipPoll = this.options.skipPoll || util.doNothing;
this.canPollDoc = this.db.canPollDoc(this.collection, this.query);
this.pollDebounce =
(typeof this.options.pollDebounce === 'number') ? this.options.pollDebounce :
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce : 0;
(typeof this.db.pollDebounce === 'number') ? this.db.pollDebounce :
streams.length > 1 ? 1000 : 0;
this.pollInterval =
(typeof this.options.pollInterval === 'number') ? this.options.pollInterval :
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval : 0;
(typeof this.db.pollInterval === 'number') ? this.db.pollInterval :
streams.length > 1 ? 1000 : 0;

this._polling = false;
this._pendingPoll = null;
Expand All @@ -41,23 +43,32 @@ QueryEmitter.prototype._open = function() {
this._defaultCallback = function(err) {
if (err) emitter.onError(err);
};
emitter.stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
emitter.stream.on('end', function() {
emitter.destroy();

emitter.streams.forEach(function(stream) {
stream.on('data', function(data) {
if (data.error) {
return emitter.onError(data.error);
}
emitter._update(data);
});
stream.on('end', function() {
emitter.destroy();
});
});

// Make sure we start polling if pollInterval is being used
this._flushPoll();
};

QueryEmitter.prototype.destroy = function() {
clearTimeout(this._pollDebounceId);
clearTimeout(this._pollIntervalId);
this.stream.destroy();

var stream;

while (stream = this.streams.pop()) {
stream.destroy();
}
};

QueryEmitter.prototype._emitTiming = function(action, start) {
Expand Down Expand Up @@ -140,8 +151,8 @@ QueryEmitter.prototype._flushPoll = function() {
if (this._pendingPoll) {
this.queryPoll();

// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
// If a pollInterval is specified, poll if the query doesn't get polled in
// the time of the interval
} else if (this.pollInterval) {
var emitter = this;
this._pollIntervalId = setTimeout(function() {
Expand Down Expand Up @@ -301,14 +312,14 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) {
// all messages are received and applied in order, so it is critical that none
// are dropped.
QueryEmitter.prototype.onError =
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};
QueryEmitter.prototype.onDiff =
QueryEmitter.prototype.onExtra =
QueryEmitter.prototype.onOp = function() {
throw new ShareDBError(
ERROR_CODE.ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED,
'Required QueryEmitter listener not assigned'
);
};

function getInserted(diff) {
var inserted = [];
Expand Down
Loading

0 comments on commit 2c8fc0f

Please sign in to comment.