Skip to content

Commit

Permalink
Merge e91488d into 2532faa
Browse files Browse the repository at this point in the history
  • Loading branch information
ericyhwang committed Aug 9, 2022
2 parents 2532faa + e91488d commit 95d4776
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 57 deletions.
71 changes: 40 additions & 31 deletions lib/agent.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
var hat = require('hat');
var ShareDBError = require('./error');
var logger = require('./logger');
var Action = require('./shared/message-actions').Action;
var types = require('./types');
var util = require('./util');
var logger = require('./logger');
var ShareDBError = require('./error');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -58,7 +59,7 @@ function Agent(backend, stream) {
this.custom = {};

// Send the legacy message to initialize old clients with the random agent Id
this.send(this._initMessage('init'));
this.send(this._initMessage(Action.InitLegacy));
}
module.exports = Agent;

Expand Down Expand Up @@ -174,7 +175,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query

var agent = this;
emitter.onExtra = function(extra) {
agent.send({a: 'q', id: queryId, extra: extra});
agent.send({a: Action.QueryReply, id: queryId, extra: extra});
};

emitter.onDiff = function(diff) {
Expand All @@ -186,7 +187,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query
}
// Consider stripping the collection out of the data we send here
// if it matches the query's collection.
agent.send({a: 'q', id: queryId, diff: diff});
agent.send({a: Action.QueryReply, id: queryId, diff: diff});
};

emitter.onError = function(err) {
Expand Down Expand Up @@ -250,7 +251,7 @@ Agent.prototype.send = function(message) {

Agent.prototype._sendOp = function(collection, id, op) {
var message = {
a: 'op',
a: Action.Op,
c: collection,
d: id,
v: op.v,
Expand Down Expand Up @@ -354,22 +355,30 @@ Agent.prototype._open = function() {

// Check a request to see if its valid. Returns an error if there's a problem.
Agent.prototype._checkRequest = function(request) {
if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') {
if (request.a === Action.QueryFetch || request.a === Action.QuerySubscribe || request.a === Action.QueryUnsubscribe) {
// Query messages need an ID property.
if (typeof request.id !== 'number') return 'Missing query ID';
} else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') {
} else if (request.a === Action.Op ||
request.a === Action.Fetch ||
request.a === Action.Subscribe ||
request.a === Action.Unsubscribe ||
request.a === Action.Presence) {
// Doc-based request.
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
if (request.d != null && typeof request.d !== 'string') return 'Invalid id';

if (request.a === 'op' || request.a === 'p') {
if (request.a === Action.Op || request.a === Action.Presence) {
if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version';
}

if (request.a === 'p') {
if (request.a === Action.Presence) {
if (typeof request.id !== 'string') return 'Missing presence ID';
}
} else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') {
} else if (
request.a === Action.BulkFetch ||
request.a === Action.BulkSubscribe ||
request.a === Action.BulkUnsubscribe
) {
// Bulk request
if (request.c != null && typeof request.c !== 'string') return 'Invalid collection';
if (typeof request.b !== 'object') return 'Invalid bulk subscribe data';
Expand All @@ -383,28 +392,28 @@ Agent.prototype._handleMessage = function(request, callback) {
if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage));

switch (request.a) {
case 'hs':
case Action.Handshake:
if (request.id) this.src = request.id;
return callback(null, this._initMessage('hs'));
case 'qf':
return callback(null, this._initMessage(Action.Handshake));
case Action.QueryFetch:
return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback);
case 'qs':
case Action.QuerySubscribe:
return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback);
case 'qu':
case Action.QueryUnsubscribe:
return this._queryUnsubscribe(request.id, callback);
case 'bf':
case Action.BulkFetch:
return this._fetchBulk(request.c, request.b, callback);
case 'bs':
case Action.BulkSubscribe:
return this._subscribeBulk(request.c, request.b, callback);
case 'bu':
case Action.BulkUnsubscribe:
return this._unsubscribeBulk(request.c, request.b, callback);
case 'f':
case Action.Fetch:
return this._fetch(request.c, request.d, request.v, callback);
case 's':
case Action.Subscribe:
return this._subscribe(request.c, request.d, request.v, callback);
case 'u':
case Action.Unsubscribe:
return this._unsubscribe(request.c, request.d, callback);
case 'op':
case Action.Op:
// Normalize the properties submitted
var op = createClientOp(request, this._src());
if (op.seq >= util.MAX_SAFE_INTEGER) {
Expand All @@ -415,11 +424,11 @@ Agent.prototype._handleMessage = function(request, callback) {
}
if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message'));
return this._submit(request.c, request.d, op, callback);
case 'nf':
case Action.SnapshotFetch:
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case 'nt':
case Action.SnapshotFetchByTimestamp:
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
case 'p':
case Action.Presence:
if (!this.backend.presenceEnabled) return;
var presence = this._createPresence(request);
if (presence.t && !util.supportsPresence(types.map[presence.t])) {
Expand All @@ -429,10 +438,10 @@ Agent.prototype._handleMessage = function(request, callback) {
});
}
return this._broadcastPresence(presence, callback);
case 'ps':
case Action.PresenceSubscribe:
if (!this.backend.presenceEnabled) return;
return this._subscribePresence(request.ch, request.seq, callback);
case 'pu':
case Action.PresenceUnsubscribe:
return this._unsubscribePresence(request.ch, request.seq, callback);
default:
callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message'));
Expand Down Expand Up @@ -761,7 +770,7 @@ Agent.prototype._broadcastPresence = function(presence, callback) {

Agent.prototype._createPresence = function(request) {
return {
a: 'p',
a: Action.Presence,
ch: request.ch,
src: this._src(),
id: request.id, // Presence ID, not Doc ID (which is 'd')
Expand Down Expand Up @@ -813,7 +822,7 @@ Agent.prototype._requestPresence = function(channel, callback) {
Agent.prototype._handlePresenceData = function(presence) {
if (presence.src === this._src()) return;

if (presence.r) return this.send({a: 'pr', ch: presence.ch});
if (presence.r) return this.send({a: Action.PresenceRequest, ch: presence.ch});

var backend = this.backend;
var context = {
Expand All @@ -824,7 +833,7 @@ Agent.prototype._handlePresenceData = function(presence) {
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
if (error) {
if (backend.doNotForwardSendPresenceErrorsToClient) backend.errorHandler(error, {agent: agent});
else agent.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
else agent.send({a: Action.Presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
return;
}
agent.send(presence);
Expand Down
53 changes: 27 additions & 26 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-reques
var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var Action = require('../shared/message-actions').Action;
var types = require('../types');
var util = require('../util');
var logger = require('../logger');
Expand Down Expand Up @@ -194,25 +195,25 @@ Connection.prototype.handleMessage = function(message) {
// Switch on the message action. Most messages are for documents and are
// handled in the doc class.
switch (message.a) {
case 'init':
case Action.InitLegacy:
// Client initialization packet
return this._handleLegacyInit(message);
case 'hs':
case Action.Handshake:
return this._handleHandshake(err, message);
case 'qf':
case Action.QueryFetch:
var query = this.queries[message.id];
if (query) query._handleFetch(err, message.data, message.extra);
return;
case 'qs':
case Action.QuerySubscribe:
var query = this.queries[message.id];
if (query) query._handleSubscribe(err, message.data, message.extra);
return;
case 'qu':
case Action.QueryUnsubscribe:
// Queries are removed immediately on calls to destroy, so we ignore
// replies to query unsubscribes. Perhaps there should be a callback for
// destroy, but this is currently unimplemented
return;
case 'q':
case Action.QueryReply:
// Query message. Pass this to the appropriate query object.
var query = this.queries[message.id];
if (!query) return;
Expand All @@ -221,36 +222,36 @@ Connection.prototype.handleMessage = function(message) {
if (message.hasOwnProperty('extra')) query._handleExtra(message.extra);
return;

case 'bf':
case Action.BulkFetch:
return this._handleBulkMessage(err, message, '_handleFetch');
case 'bs':
case 'bu':
case Action.BulkSubscribe:
case Action.BulkUnsubscribe:
return this._handleBulkMessage(err, message, '_handleSubscribe');

case 'nf':
case 'nt':
case Action.SnapshotFetch:
case Action.SnapshotFetchByTimestamp:
return this._handleSnapshotFetch(err, message);

case 'f':
case Action.Fetch:
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleFetch(err, message.data);
return;
case 's':
case 'u':
case Action.Subscribe:
case Action.Unsubscribe:
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleSubscribe(err, message.data);
return;
case 'op':
case Action.Op:
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleOp(err, message);
return;
case 'p':
case Action.Presence:
return this._handlePresence(err, message);
case 'ps':
case Action.PresenceSubscribe:
return this._handlePresenceSubscribe(err, message);
case 'pu':
case Action.PresenceUnsubscribe:
return this._handlePresenceUnsubscribe(err, message);
case 'pr':
case Action.PresenceRequest:
return this._handlePresenceRequest(err, message);

default:
Expand Down Expand Up @@ -434,22 +435,22 @@ Connection.prototype._sendAction = function(action, doc, version) {
};

Connection.prototype.sendFetch = function(doc) {
return this._sendAction('f', doc, doc.version);
return this._sendAction(Action.Fetch, doc, doc.version);
};

Connection.prototype.sendSubscribe = function(doc) {
return this._sendAction('s', doc, doc.version);
return this._sendAction(Action.Subscribe, doc, doc.version);
};

Connection.prototype.sendUnsubscribe = function(doc) {
return this._sendAction('u', doc);
return this._sendAction(Action.Unsubscribe, doc);
};

Connection.prototype.sendOp = function(doc, op) {
// Ensure the doc is registered so that it receives the reply message
this._addDoc(doc);
var message = {
a: 'op',
a: Action.Op,
c: doc.collection,
d: doc.id,
v: doc.version,
Expand Down Expand Up @@ -553,7 +554,7 @@ Connection.prototype._destroyQuery = function(query) {
// The callback should have the signature function(error, results, extra)
// where results is a list of Doc objects.
Connection.prototype.createFetchQuery = function(collection, q, options, callback) {
return this._createQuery('qf', collection, q, options, callback);
return this._createQuery(Action.QueryFetch, collection, q, options, callback);
};

// Create a subscribe query. Subscribe queries return with the initial data
Expand All @@ -563,7 +564,7 @@ Connection.prototype.createFetchQuery = function(collection, q, options, callbac
// If present, the callback should have the signature function(error, results, extra)
// where results is a list of Doc objects.
Connection.prototype.createSubscribeQuery = function(collection, q, options, callback) {
return this._createQuery('qs', collection, q, options, callback);
return this._createQuery(Action.QuerySubscribe, collection, q, options, callback);
};

Connection.prototype.hasPending = function() {
Expand Down Expand Up @@ -716,7 +717,7 @@ Connection.prototype._handleLegacyInit = function(message) {
};

Connection.prototype._initializeHandshake = function() {
this.send({a: 'hs', id: this.id});
this.send({a: Action.Handshake, id: this.id});
};

Connection.prototype._handleHandshake = function(error, message) {
Expand Down
21 changes: 21 additions & 0 deletions lib/shared/message-actions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
exports.Action = {
InitLegacy: 'init',
Handshake: 'hs',
QueryFetch: 'qf',
QuerySubscribe: 'qs',
QueryUnsubscribe: 'qu',
QueryReply: 'q',
BulkFetch: 'bf',
BulkSubscribe: 'bs',
BulkUnsubscribe: 'bu',
Fetch: 'f',
Subscribe: 's',
Unsubscribe: 'u',
Op: 'op',
SnapshotFetch: 'nf',
SnapshotFetchByTimestamp: 'nt',
Presence: 'p',
PresenceSubscribe: 'ps',
PresenceUnsubscribe: 'pu',
PresenceRequest: 'pr'
};

0 comments on commit 95d4776

Please sign in to comment.