Skip to content

Commit

Permalink
Merge 55fa1f3 into 2532faa
Browse files Browse the repository at this point in the history
  • Loading branch information
ericyhwang committed Aug 9, 2022
2 parents 2532faa + 55fa1f3 commit 581b40d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 64 deletions.
75 changes: 44 additions & 31 deletions lib/agent.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
var hat = require('hat');
var ShareDBError = require('./error');
var logger = require('./logger');
var ACTIONS = require('./message-actions').ACTIONS;
var types = require('./types');
var util = require('./util');
var logger = require('./logger');
var ShareDBError = require('./error');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -58,7 +59,7 @@ function Agent(backend, stream) {
this.custom = {};

// Send the legacy message to initialize old clients with the random agent Id
this.send(this._initMessage('init'));
this.send(this._initMessage(ACTIONS.initLegacy));
}
module.exports = Agent;

Expand Down Expand Up @@ -174,7 +175,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query

var agent = this;
emitter.onExtra = function(extra) {
agent.send({a: 'q', id: queryId, extra: extra});
agent.send({a: ACTIONS.queryUpdate, id: queryId, extra: extra});
};

emitter.onDiff = function(diff) {
Expand All @@ -186,7 +187,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query
}
// Consider stripping the collection out of the data we send here
// if it matches the query's collection.
agent.send({a: 'q', id: queryId, diff: diff});
agent.send({a: ACTIONS.queryUpdate, id: queryId, diff: diff});
};

emitter.onError = function(err) {
Expand Down Expand Up @@ -250,7 +251,7 @@ Agent.prototype.send = function(message) {

Agent.prototype._sendOp = function(collection, id, op) {
var message = {
a: 'op',
a: ACTIONS.op,
c: collection,
d: id,
v: op.v,
Expand Down Expand Up @@ -354,22 +355,34 @@ Agent.prototype._open = function() {

// Check a request to see if its valid. Returns an error if there's a problem.
Agent.prototype._checkRequest = function(request) {
if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') {
if (
request.a === ACTIONS.queryFetch ||
request.a === ACTIONS.querySubscribe ||
request.a === ACTIONS.queryUnsubscribe
) {
// Query messages need an ID property.
if (typeof request.id !== 'number') return 'Missing query ID';
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') {
} else if (request.a === ACTIONS.op ||
request.a === ACTIONS.fetch ||
request.a === ACTIONS.subscribe ||
request.a === ACTIONS.unsubscribe ||
request.a === ACTIONS.presence) {
// Doc-based request.
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
if (request.d != null && typeof request.d !== 'string') return 'Invalid id';

if (request.a === 'op' || request.a === 'p') {
if (request.a === ACTIONS.op || request.a === ACTIONS.presence) {
if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version';
}

if (request.a === 'p') {
if (request.a === ACTIONS.presence) {
if (typeof request.id !== 'string') return 'Missing presence ID';
}
} else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') {
} else if (
request.a === ACTIONS.bulkFetch ||
request.a === ACTIONS.bulkSubscribe ||
request.a === ACTIONS.bulkUnsubscribe
) {
// Bulk request
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
if (typeof request.b !== 'object') return 'Invalid bulk subscribe data';
Expand All @@ -383,28 +396,28 @@ Agent.prototype._handleMessage = function(request, callback) {
if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage));

switch (request.a) {
case 'hs':
case ACTIONS.handshake:
if (request.id) this.src = request.id;
return callback(null, this._initMessage('hs'));
case 'qf':
return callback(null, this._initMessage(ACTIONS.handshake));
case ACTIONS.queryFetch:
return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback);
case 'qs':
case ACTIONS.querySubscribe:
return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback);
case 'qu':
case ACTIONS.queryUnsubscribe:
return this._queryUnsubscribe(request.id, callback);
case 'bf':
case ACTIONS.bulkFetch:
return this._fetchBulk(request.c, request.b, callback);
case 'bs':
case ACTIONS.bulkSubscribe:
return this._subscribeBulk(request.c, request.b, callback);
case 'bu':
case ACTIONS.bulkUnsubscribe:
return this._unsubscribeBulk(request.c, request.b, callback);
case 'f':
case ACTIONS.fetch:
return this._fetch(request.c, request.d, request.v, callback);
case 's':
case ACTIONS.subscribe:
return this._subscribe(request.c, request.d, request.v, callback);
case 'u':
case ACTIONS.unsubscribe:
return this._unsubscribe(request.c, request.d, callback);
case 'op':
case ACTIONS.op:
// Normalize the properties submitted
var op = createClientOp(request, this._src());
if (op.seq >= util.MAX_SAFE_INTEGER) {
Expand All @@ -415,11 +428,11 @@ Agent.prototype._handleMessage = function(request, callback) {
}
if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message'));
return this._submit(request.c, request.d, op, callback);
case 'nf':
case ACTIONS.snapshotFetch:
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case 'nt':
case ACTIONS.snapshotFetchByTimestamp:
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
case 'p':
case ACTIONS.presence:
if (!this.backend.presenceEnabled) return;
var presence = this._createPresence(request);
if (presence.t && !util.supportsPresence(types.map[presence.t])) {
Expand All @@ -429,10 +442,10 @@ Agent.prototype._handleMessage = function(request, callback) {
});
}
return this._broadcastPresence(presence, callback);
case 'ps':
case ACTIONS.presenceSubscribe:
if (!this.backend.presenceEnabled) return;
return this._subscribePresence(request.ch, request.seq, callback);
case 'pu':
case ACTIONS.presenceUnsubscribe:
return this._unsubscribePresence(request.ch, request.seq, callback);
default:
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
Expand Down Expand Up @@ -761,7 +774,7 @@ Agent.prototype._broadcastPresence = function(presence, callback) {

Agent.prototype._createPresence = function(request) {
return {
a: 'p',
a: ACTIONS.presence,
ch: request.ch,
src: this._src(),
id: request.id, // Presence ID, not Doc ID (which is 'd')
Expand Down Expand Up @@ -813,7 +826,7 @@ Agent.prototype._requestPresence = function(channel, callback) {
Agent.prototype._handlePresenceData = function(presence) {
if (presence.src === this._src()) return;

if (presence.r) return this.send({a: 'pr', ch: presence.ch});
if (presence.r) return this.send({a: ACTIONS.presenceRequest, ch: presence.ch});

var backend = this.backend;
var context = {
Expand All @@ -824,7 +837,7 @@ Agent.prototype._handlePresenceData = function(presence) {
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
if (error) {
if (backend.doNotForwardSendPresenceErrorsToClient) backend.errorHandler(error, {agent: agent});
else agent.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
else agent.send({a: ACTIONS.presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
return;
}
agent.send(presence);
Expand Down
55 changes: 28 additions & 27 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-reques
var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var ACTIONS = require('../message-actions').ACTIONS;
var types = require('../types');
var util = require('../util');
var logger = require('../logger');
Expand Down Expand Up @@ -194,25 +195,25 @@ Connection.prototype.handleMessage = function(message) {
// Switch on the message action. Most messages are for documents and are
// handled in the doc class.
switch (message.a) {
case 'init':
case ACTIONS.initLegacy:
// Client initialization packet
return this._handleLegacyInit(message);
case 'hs':
case ACTIONS.handshake:
return this._handleHandshake(err, message);
case 'qf':
case ACTIONS.queryFetch:
var query = this.queries[message.id];
if (query) query._handleFetch(err, message.data, message.extra);
return;
case 'qs':
case ACTIONS.querySubscribe:
var query = this.queries[message.id];
if (query) query._handleSubscribe(err, message.data, message.extra);
return;
case 'qu':
case ACTIONS.queryUnsubscribe:
// Queries are removed immediately on calls to destroy, so we ignore
// replies to query unsubscribes. Perhaps there should be a callback for
// destroy, but this is currently unimplemented
return;
case 'q':
case ACTIONS.queryUpdate:
// Query message. Pass this to the appropriate query object.
var query = this.queries[message.id];
if (!query) return;
Expand All @@ -221,36 +222,36 @@ Connection.prototype.handleMessage = function(message) {
if (message.hasOwnProperty('extra')) query._handleExtra(message.extra);
return;

case 'bf':
case ACTIONS.bulkFetch:
return this._handleBulkMessage(err, message, '_handleFetch');
case 'bs':
case 'bu':
case ACTIONS.bulkSubscribe:
case ACTIONS.bulkUnsubscribe:
return this._handleBulkMessage(err, message, '_handleSubscribe');

case 'nf':
case 'nt':
case ACTIONS.snapshotFetch:
case ACTIONS.snapshotFetchByTimestamp:
return this._handleSnapshotFetch(err, message);

case 'f':
case ACTIONS.fetch:
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleFetch(err, message.data);
return;
case 's':
case 'u':
case ACTIONS.subscribe:
case ACTIONS.unsubscribe:
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleSubscribe(err, message.data);
return;
case 'op':
case ACTIONS.op:
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleOp(err, message);
return;
case 'p':
case ACTIONS.presence:
return this._handlePresence(err, message);
case 'ps':
case ACTIONS.presenceSubscribe:
return this._handlePresenceSubscribe(err, message);
case 'pu':
case ACTIONS.presenceUnsubscribe:
return this._handlePresenceUnsubscribe(err, message);
case 'pr':
case ACTIONS.presenceRequest:
return this._handlePresenceRequest(err, message);

default:
Expand Down Expand Up @@ -416,7 +417,7 @@ Connection.prototype._sendBulk = function(action, collection, values) {
}
};

Connection.prototype._sendAction = function(action, doc, version) {
Connection.prototype._sendActions = function(action, doc, version) {
// Ensure the doc is registered so that it receives the reply message
this._addDoc(doc);
if (this.bulk) {
Expand All @@ -434,22 +435,22 @@ Connection.prototype._sendAction = function(action, doc, version) {
};

Connection.prototype.sendFetch = function(doc) {
return this._sendAction('f', doc, doc.version);
return this._sendActions(ACTIONS.fetch, doc, doc.version);
};

Connection.prototype.sendSubscribe = function(doc) {
return this._sendAction('s', doc, doc.version);
return this._sendActions(ACTIONS.subscribe, doc, doc.version);
};

Connection.prototype.sendUnsubscribe = function(doc) {
return this._sendAction('u', doc);
return this._sendActions(ACTIONS.unsubscribe, doc);
};

Connection.prototype.sendOp = function(doc, op) {
// Ensure the doc is registered so that it receives the reply message
this._addDoc(doc);
var message = {
a: 'op',
a: ACTIONS.op,
c: doc.collection,
d: doc.id,
v: doc.version,
Expand Down Expand Up @@ -553,7 +554,7 @@ Connection.prototype._destroyQuery = function(query) {
// The callback should have the signature function(error, results, extra)
// where results is a list of Doc objects.
Connection.prototype.createFetchQuery = function(collection, q, options, callback) {
return this._createQuery('qf', collection, q, options, callback);
return this._createQuery(ACTIONS.queryFetch, collection, q, options, callback);
};

// Create a subscribe query. Subscribe queries return with the initial data
Expand All @@ -563,7 +564,7 @@ Connection.prototype.createFetchQuery = function(collection, q, options, callbac
// If present, the callback should have the signature function(error, results, extra)
// where results is a list of Doc objects.
Connection.prototype.createSubscribeQuery = function(collection, q, options, callback) {
return this._createQuery('qs', collection, q, options, callback);
return this._createQuery(ACTIONS.querySubscribe, collection, q, options, callback);
};

Connection.prototype.hasPending = function() {
Expand Down Expand Up @@ -716,7 +717,7 @@ Connection.prototype._handleLegacyInit = function(message) {
};

Connection.prototype._initializeHandshake = function() {
this.send({a: 'hs', id: this.id});
this.send({a: ACTIONS.handshake, id: this.id});
};

Connection.prototype._handleHandshake = function(error, message) {
Expand Down
3 changes: 2 additions & 1 deletion lib/client/presence/local-presence.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var emitter = require('../../emitter');
var ACTIONS = require('../../message-actions').ACTIONS;
var util = require('../../util');

module.exports = LocalPresence;
Expand Down Expand Up @@ -59,7 +60,7 @@ LocalPresence.prototype._ack = function(error, presenceVersion) {

LocalPresence.prototype._message = function() {
return {
a: 'p',
a: ACTIONS.presence,
ch: this.presence.channel,
id: this.presenceId,
p: this.value,
Expand Down
3 changes: 2 additions & 1 deletion lib/client/presence/presence.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ var RemotePresence = require('./remote-presence');
var util = require('../../util');
var async = require('async');
var hat = require('hat');
var ACTIONS = require('../../message-actions').ACTIONS;

module.exports = Presence;
function Presence(connection, channel) {
Expand Down Expand Up @@ -70,7 +71,7 @@ Presence.prototype.destroy = function(callback) {

Presence.prototype._sendSubscriptionAction = function(wantSubscribe, callback) {
this.wantSubscribe = !!wantSubscribe;
var action = this.wantSubscribe ? 'ps' : 'pu';
var action = this.wantSubscribe ? ACTIONS.presenceSubscribe : ACTIONS.presenceUnsubscribe;
var seq = this.connection._presenceSeq++;
this._subscriptionCallbacksBySeq[seq] = callback;
if (this.connection.canSend) {
Expand Down
5 changes: 3 additions & 2 deletions lib/client/query.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var emitter = require('../emitter');
var ACTIONS = require('../message-actions').ACTIONS;
var util = require('../util');

// Queries are live requests to the database for particular sets of fields.
Expand Down Expand Up @@ -75,8 +76,8 @@ Query.prototype.send = function() {
// Destroy the query object. Any subsequent messages for the query will be
// ignored by the connection.
Query.prototype.destroy = function(callback) {
if (this.connection.canSend && this.action === 'qs') {
this.connection.send({a: 'qu', id: this.id});
if (this.connection.canSend && this.action === ACTIONS.querySubscribe) {
this.connection.send({a: ACTIONS.queryUnsubscribe, id: this.id});
}
this.connection._destroyQuery(this);
// There is a callback for consistency, but we don't actually wait for the
Expand Down
Loading

0 comments on commit 581b40d

Please sign in to comment.