diff --git a/lib/agent.js b/lib/agent.js index ebb3bc7d..eca8057e 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,8 +1,9 @@ var hat = require('hat'); +var ShareDBError = require('./error'); +var logger = require('./logger'); +var Action = require('./shared/message-actions').Action; var types = require('./types'); var util = require('./util'); -var logger = require('./logger'); -var ShareDBError = require('./error'); var ERROR_CODE = ShareDBError.CODES; @@ -58,7 +59,7 @@ function Agent(backend, stream) { this.custom = {}; // Send the legacy message to initialize old clients with the random agent Id - this.send(this._initMessage('init')); + this.send(this._initMessage(Action.InitLegacy)); } module.exports = Agent; @@ -174,7 +175,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query var agent = this; emitter.onExtra = function(extra) { - agent.send({a: 'q', id: queryId, extra: extra}); + agent.send({a: Action.QueryReply, id: queryId, extra: extra}); }; emitter.onDiff = function(diff) { @@ -186,7 +187,7 @@ Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query } // Consider stripping the collection out of the data we send here // if it matches the query's collection. - agent.send({a: 'q', id: queryId, diff: diff}); + agent.send({a: Action.QueryReply, id: queryId, diff: diff}); }; emitter.onError = function(err) { @@ -250,7 +251,7 @@ Agent.prototype.send = function(message) { Agent.prototype._sendOp = function(collection, id, op) { var message = { - a: 'op', + a: Action.Op, c: collection, d: id, v: op.v, @@ -354,22 +355,30 @@ Agent.prototype._open = function() { // Check a request to see if its valid. Returns an error if there's a problem. Agent.prototype._checkRequest = function(request) { - if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') { + if (request.a === Action.QueryFetch || request.a === Action.QuerySubscribe || request.a === Action.QueryUnsubscribe) { // Query messages need an ID property. if (typeof request.id !== 'number') return 'Missing query ID'; - } else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') { + } else if (request.a === Action.Op || + request.a === Action.Fetch || + request.a === Action.Subscribe || + request.a === Action.Unsubscribe || + request.a === Action.Presence) { // Doc-based request. if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (request.d != null && typeof request.d !== 'string') return 'Invalid id'; - if (request.a === 'op' || request.a === 'p') { + if (request.a === Action.Op || request.a === Action.Presence) { if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version'; } - if (request.a === 'p') { + if (request.a === Action.Presence) { if (typeof request.id !== 'string') return 'Missing presence ID'; } - } else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') { + } else if ( + request.a === Action.BulkFetch || + request.a === Action.BulkSubscribe || + request.a === Action.BulkUnsubscribe + ) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (typeof request.b !== 'object') return 'Invalid bulk subscribe data'; @@ -383,28 +392,28 @@ Agent.prototype._handleMessage = function(request, callback) { if (errMessage) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, errMessage)); switch (request.a) { - case 'hs': + case Action.Handshake: if (request.id) this.src = request.id; - return callback(null, this._initMessage('hs')); - case 'qf': + return callback(null, this._initMessage(Action.Handshake)); + case Action.QueryFetch: return this._queryFetch(request.id, request.c, request.q, getQueryOptions(request), callback); - case 'qs': + case Action.QuerySubscribe: return this._querySubscribe(request.id, request.c, request.q, getQueryOptions(request), callback); - case 'qu': + case Action.QueryUnsubscribe: return this._queryUnsubscribe(request.id, callback); - case 'bf': + case Action.BulkFetch: return this._fetchBulk(request.c, request.b, callback); - case 'bs': + case Action.BulkSubscribe: return this._subscribeBulk(request.c, request.b, callback); - case 'bu': + case Action.BulkUnsubscribe: return this._unsubscribeBulk(request.c, request.b, callback); - case 'f': + case Action.Fetch: return this._fetch(request.c, request.d, request.v, callback); - case 's': + case Action.Subscribe: return this._subscribe(request.c, request.d, request.v, callback); - case 'u': + case Action.Unsubscribe: return this._unsubscribe(request.c, request.d, callback); - case 'op': + case Action.Op: // Normalize the properties submitted var op = createClientOp(request, this._src()); if (op.seq >= util.MAX_SAFE_INTEGER) { @@ -415,11 +424,11 @@ Agent.prototype._handleMessage = function(request, callback) { } if (!op) return callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid op message')); return this._submit(request.c, request.d, op, callback); - case 'nf': + case Action.SnapshotFetch: return this._fetchSnapshot(request.c, request.d, request.v, callback); - case 'nt': + case Action.SnapshotFetchByTimestamp: return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); - case 'p': + case Action.Presence: if (!this.backend.presenceEnabled) return; var presence = this._createPresence(request); if (presence.t && !util.supportsPresence(types.map[presence.t])) { @@ -429,10 +438,10 @@ Agent.prototype._handleMessage = function(request, callback) { }); } return this._broadcastPresence(presence, callback); - case 'ps': + case Action.PresenceSubscribe: if (!this.backend.presenceEnabled) return; return this._subscribePresence(request.ch, request.seq, callback); - case 'pu': + case Action.PresenceUnsubscribe: return this._unsubscribePresence(request.ch, request.seq, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); @@ -761,7 +770,7 @@ Agent.prototype._broadcastPresence = function(presence, callback) { Agent.prototype._createPresence = function(request) { return { - a: 'p', + a: Action.Presence, ch: request.ch, src: this._src(), id: request.id, // Presence ID, not Doc ID (which is 'd') @@ -813,7 +822,7 @@ Agent.prototype._requestPresence = function(channel, callback) { Agent.prototype._handlePresenceData = function(presence) { if (presence.src === this._src()) return; - if (presence.r) return this.send({a: 'pr', ch: presence.ch}); + if (presence.r) return this.send({a: Action.PresenceRequest, ch: presence.ch}); var backend = this.backend; var context = { @@ -824,7 +833,7 @@ Agent.prototype._handlePresenceData = function(presence) { backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) { if (error) { if (backend.doNotForwardSendPresenceErrorsToClient) backend.errorHandler(error, {agent: agent}); - else agent.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); + else agent.send({a: Action.Presence, ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); return; } agent.send(presence); diff --git a/lib/client/connection.js b/lib/client/connection.js index d29570af..1b551af1 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -6,6 +6,7 @@ var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-reques var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request'); var emitter = require('../emitter'); var ShareDBError = require('../error'); +var Action = require('../shared/message-actions').Action; var types = require('../types'); var util = require('../util'); var logger = require('../logger'); @@ -194,25 +195,25 @@ Connection.prototype.handleMessage = function(message) { // Switch on the message action. Most messages are for documents and are // handled in the doc class. switch (message.a) { - case 'init': + case Action.InitLegacy: // Client initialization packet return this._handleLegacyInit(message); - case 'hs': + case Action.Handshake: return this._handleHandshake(err, message); - case 'qf': + case Action.QueryFetch: var query = this.queries[message.id]; if (query) query._handleFetch(err, message.data, message.extra); return; - case 'qs': + case Action.QuerySubscribe: var query = this.queries[message.id]; if (query) query._handleSubscribe(err, message.data, message.extra); return; - case 'qu': + case Action.QueryUnsubscribe: // Queries are removed immediately on calls to destroy, so we ignore // replies to query unsubscribes. Perhaps there should be a callback for // destroy, but this is currently unimplemented return; - case 'q': + case Action.QueryReply: // Query message. Pass this to the appropriate query object. var query = this.queries[message.id]; if (!query) return; @@ -221,36 +222,36 @@ Connection.prototype.handleMessage = function(message) { if (message.hasOwnProperty('extra')) query._handleExtra(message.extra); return; - case 'bf': + case Action.BulkFetch: return this._handleBulkMessage(err, message, '_handleFetch'); - case 'bs': - case 'bu': + case Action.BulkSubscribe: + case Action.BulkUnsubscribe: return this._handleBulkMessage(err, message, '_handleSubscribe'); - case 'nf': - case 'nt': + case Action.SnapshotFetch: + case Action.SnapshotFetchByTimestamp: return this._handleSnapshotFetch(err, message); - case 'f': + case Action.Fetch: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleFetch(err, message.data); return; - case 's': - case 'u': + case Action.Subscribe: + case Action.Unsubscribe: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleSubscribe(err, message.data); return; - case 'op': + case Action.Op: var doc = this.getExisting(message.c, message.d); if (doc) doc._handleOp(err, message); return; - case 'p': + case Action.Presence: return this._handlePresence(err, message); - case 'ps': + case Action.PresenceSubscribe: return this._handlePresenceSubscribe(err, message); - case 'pu': + case Action.PresenceUnsubscribe: return this._handlePresenceUnsubscribe(err, message); - case 'pr': + case Action.PresenceRequest: return this._handlePresenceRequest(err, message); default: @@ -434,22 +435,22 @@ Connection.prototype._sendAction = function(action, doc, version) { }; Connection.prototype.sendFetch = function(doc) { - return this._sendAction('f', doc, doc.version); + return this._sendAction(Action.Fetch, doc, doc.version); }; Connection.prototype.sendSubscribe = function(doc) { - return this._sendAction('s', doc, doc.version); + return this._sendAction(Action.Subscribe, doc, doc.version); }; Connection.prototype.sendUnsubscribe = function(doc) { - return this._sendAction('u', doc); + return this._sendAction(Action.Unsubscribe, doc); }; Connection.prototype.sendOp = function(doc, op) { // Ensure the doc is registered so that it receives the reply message this._addDoc(doc); var message = { - a: 'op', + a: Action.Op, c: doc.collection, d: doc.id, v: doc.version, @@ -553,7 +554,7 @@ Connection.prototype._destroyQuery = function(query) { // The callback should have the signature function(error, results, extra) // where results is a list of Doc objects. Connection.prototype.createFetchQuery = function(collection, q, options, callback) { - return this._createQuery('qf', collection, q, options, callback); + return this._createQuery(Action.QueryFetch, collection, q, options, callback); }; // Create a subscribe query. Subscribe queries return with the initial data @@ -563,7 +564,7 @@ Connection.prototype.createFetchQuery = function(collection, q, options, callbac // If present, the callback should have the signature function(error, results, extra) // where results is a list of Doc objects. Connection.prototype.createSubscribeQuery = function(collection, q, options, callback) { - return this._createQuery('qs', collection, q, options, callback); + return this._createQuery(Action.QuerySubscribe, collection, q, options, callback); }; Connection.prototype.hasPending = function() { @@ -716,7 +717,7 @@ Connection.prototype._handleLegacyInit = function(message) { }; Connection.prototype._initializeHandshake = function() { - this.send({a: 'hs', id: this.id}); + this.send({a: Action.Handshake, id: this.id}); }; Connection.prototype._handleHandshake = function(error, message) { diff --git a/lib/shared/message-actions.js b/lib/shared/message-actions.js new file mode 100644 index 00000000..33beec7e --- /dev/null +++ b/lib/shared/message-actions.js @@ -0,0 +1,21 @@ +exports.Action = { + InitLegacy: 'init', + Handshake: 'hs', + QueryFetch: 'qf', + QuerySubscribe: 'qs', + QueryUnsubscribe: 'qu', + QueryReply: 'q', + BulkFetch: 'bf', + BulkSubscribe: 'bs', + BulkUnsubscribe: 'bu', + Fetch: 'f', + Subscribe: 's', + Unsubscribe: 'u', + Op: 'op', + SnapshotFetch: 'nf', + SnapshotFetchByTimestamp: 'nt', + Presence: 'p', + PresenceSubscribe: 'ps', + PresenceUnsubscribe: 'pu', + PresenceRequest: 'pr' +};