Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #199 from ryedin/master

use promises in socket.io + channels
  • Loading branch information...
commit 17902c80ed0129669e77660fa4f9b9e0b01339c0 2 parents a1248de + 1198802
@ryedin ryedin authored
View
96 lib/channels.js
@@ -2,7 +2,8 @@ var _ = require("underscore")._,
Registry = require("./registry"),
uuid = require("node-uuid"), //TODO migrate usage to uuid-pool later
inherits = require("inherits"),
- cache = require("./simple-cache");
+ cache = require("./simple-cache"),
+ Promise = require('./promise').Promise;
/**
* @namespace Channels
@@ -54,6 +55,16 @@ var errorTypes = {
}
};
+//helper method to resolve promise instance
+function resolvePromiseInstance(promise) {
+ var _promise = promise && promise.isPromise ? promise : new Promise();
+ if (_promise !== promise && typeof promise == 'function') {
+ _promise.then(promise);
+ }
+ promise = _promise;
+ return promise;
+}
+
/**
* @class Channels allow events to be published to one or more clients who have subscribed.
* Clients are added to a channel on successful subscription.
@@ -97,22 +108,24 @@ Channel.prototype = {
* @param {Object} client
* @param {Object} messageData
*/
- handleMessage: function(client, messageData, cb) {
+ handleMessage: function(client, messageData, promise) {
+ promise = resolvePromiseInstance(promise);
+
var message = messageData.message,
data = messageData.data;
//special case 'subscribe' since all other actions require subscription first
if (message === "subscribe") {
- this.add(client, data, cb);
+ this.add(client, data, promise);
} else {
if (this.itemCache[client.id]) {
switch (message) {
case "unsubscribe":
this.remove(client);
- cb && cb();
+ promise.resolve();
break;
default:
- this.send(client, message, data, messageData.toClients, cb);
+ this.send(client, message, data, messageData.toClients, promise);
break;
}
} else {
@@ -120,7 +133,7 @@ Channel.prototype = {
client,
errorTypes.NOT_SUBSCRIBED
);
- cb && cb();
+ promise.resolve();
}
}
},
@@ -130,7 +143,9 @@ Channel.prototype = {
* @param {Object} client
* @param {Object} data
*/
- add: function(client, data, cb) {
+ add: function(client, data, promise) {
+ promise = resolvePromiseInstance(promise);
+
if (client.id === this.channelClient.id) {
return Channel.super.prototype.add.call(this, client);
}
@@ -139,8 +154,10 @@ Channel.prototype = {
var me = this,
hookArgs = {
client: client,
- data: data
+ data: data,
+ promise: promise
};
+
me.doHook("subscribe", hookArgs, function(err, hookData) {
if (err && err !== nohook) {
me.error(
@@ -148,6 +165,7 @@ Channel.prototype = {
{type: "Hook Error", message: "Subscribe error: " + err},
{hook: "subscribe", error: err}
);
+ promise.resolve();
} else {
data = err === nohook ? data : hookData;
//create a channel specific uuid to avoid exposing underlying socket.io ids to other clients
@@ -159,6 +177,23 @@ Channel.prototype = {
me.crossIndexedClients[_uuid] = client; //enable fast lookups by channel-specific uuid
me.clientsBySessionId[client.session.id] = client;
+ client.on("disconnect", function() {
+ me.remove(client);
+ });
+
+ //tell the client it has successfully subscribed (and let it know its own channel specific id)
+ promise.then(function() {
+ client.json.send({
+ type: "channel",
+ channelId: me.id,
+ message: "subscribe",
+ data: {
+ clientId: _uuid,
+ data: data
+ }
+ });
+ });
+
if (me.options.announceConnections) {
me.doHook("connect", hookArgs, function(err, hookData) {
if (err && err !== nohook) {
@@ -167,6 +202,8 @@ Channel.prototype = {
{type: "Hook Error", message: "Connect error: " + err},
{hook: "connect", error: err}
);
+
+ promise.resolve();
} else {
data = err === nohook ? data : hookData;
var eventData = {
@@ -189,26 +226,12 @@ Channel.prototype = {
//fire connection message event for any local code that cares
me.fire('connection', eventData);
- cb && cb();
+ promise.resolve();
}
});
+ } else {
+ promise.resolve()
}
-
- client.on("disconnect", function() {
- me.remove(client);
- console.log('client was disconnected -------------------------- clientId: ' + client.id);
- });
-
- //lastly, tell the client it has successfully subscribed (and let it know its own channel specific id)
- client.json.send({
- type: "channel",
- channelId: me.id,
- message: "subscribe",
- data: {
- clientId: _uuid,
- data: data
- }
- });
}
}
});
@@ -275,8 +298,11 @@ Channel.prototype = {
* @param {Object} data
* @param {Object} toClients
*/
- send: function(client, message, data, toClients, cb) {
+ send: function(client, message, data, toClients, promise) {
+ promise = resolvePromiseInstance(promise);
+
var me = this;
+
//swallow reserved messages (only the server can initiate those)
if (reservedMessages.indexOf(message.toLowerCase()) > -1) {
me.error(
@@ -284,12 +310,14 @@ Channel.prototype = {
errorTypes.UNSUPPORTED_MESSAGE,
{message: message, data: data}
);
+ promise.resolve();
} else if (toClients && toClients.length && !me.options.allowDirectMessaging) {
me.error(
client,
errorTypes.DIRECT_MESSAGE_NOT_ALLOWED,
{message: message, data: data, toClients: toClients}
);
+ promise.resolve();
} else {
//apply message restrictions if they exist
if (!me.options.messages || me.options.messages.indexOf(message) > -1) {
@@ -297,8 +325,10 @@ Channel.prototype = {
client: client,
message: message,
data: data,
- toClients: toClients
+ toClients: toClients,
+ promise: promise
};
+
me.doHook("message", hookArgs, function(err, hookData, hookToClients) {
if (err && err !== nohook) {
me.error(
@@ -306,6 +336,7 @@ Channel.prototype = {
{type: "Hook Error", message: "Message error: " + err},
{hook: "message", error: err}
);
+ promise.resolve();
} else {
data = err === nohook
? data
@@ -340,7 +371,7 @@ Channel.prototype = {
//fire event
me.fire('message', eventData, (hookToClients || toClients));
- cb && cb();
+ promise.resolve();
}
});
} else {
@@ -349,6 +380,7 @@ Channel.prototype = {
errorTypes.UNSUPPORTED_MESSAGE,
{message: message, data: data}
);
+ promise.resolve();
}
}
},
@@ -359,12 +391,12 @@ Channel.prototype = {
: _client;
},
- sendMessage: function(message, data, toClients) {
- this.send(this.channelClient, message, data, toClients);
+ sendMessage: function(message, data, toClients, promise) {
+ this.send(this.channelClient, message, data, toClients, promise);
},
- sendBySessionId: function(sessionId, message, data) {
- this.sendMessage(message, data, [sessionId]);
+ sendBySessionId: function(sessionId, message, data, promise) {
+ this.sendMessage(message, data, [sessionId], promise);
},
/**
View
9 lib/feather.js
@@ -23,7 +23,8 @@ var simpleId = require("./simple-id"),
connectRouter = require("./router_connect"),
fs = require("fs"),
nodePath = require('path'),
- Parser = require('./parser');
+ Parser = require('./parser'),
+ Promise = require('./promise');
/**
* @namespace serves as the root namespace for the entire framework
@@ -379,7 +380,11 @@ var feather = module.exports = /** @lends feather */ {
/**
* Framework access to {@link Semaphore}
*/
- Semaphore: Semaphore
+ Semaphore: Semaphore,
+ /**
+ * Promise class from Mozilla's GCLI project
+ */
+ Promise: Promise
},
/**
View
1  lib/middleware.js
@@ -90,7 +90,6 @@ exports.getMiddleware = function(options, cb) {
//a redirect handler in case the router changes the url
function(req, res, next) {
if (req.url !== req.originalUrl) {
- debugger;
//do the redirect
res.statusCode = 303;
res.setHeader("Location", req.url);
View
277 lib/promise.js
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2009-2011 Mozilla Foundation and contributors
+ * Licensed under the New BSD license. See LICENSE.txt or:
+ * http://opensource.org/licenses/BSD-3-Clause
+ */
+
+ /*
+ * NOTE: this file was copied from the promise implementation in Mozilla's GCLI project
+ * The original license is in-tact above.
+ */
+
+
+/**
+ * Create an unfulfilled promise
+ * @constructor
+ */
+function Promise() {
+ this._status = Promise.PENDING;
+ this._value = undefined;
+ this._onSuccessHandlers = [];
+ this._onErrorHandlers = [];
+
+ // Debugging help
+ this._id = Promise._nextId++;
+ Promise._outstanding[this._id] = this;
+}
+
+/**
+ * We give promises and ID so we can track which are outstanding
+ */
+Promise._nextId = 0;
+
+/**
+ * Outstanding promises. Handy list for debugging only
+ */
+Promise._outstanding = [];
+
+/**
+ * Recently resolved promises. Also for debugging only
+ */
+Promise._recent = [];
+
+/**
+ * A promise can be in one of 2 states.
+ * The ERROR and SUCCESS states are terminal, the PENDING state is the only
+ * start state.
+ */
+Promise.ERROR = -1;
+Promise.PENDING = 0;
+Promise.SUCCESS = 1;
+
+/**
+ * Yeay for RTTI
+ */
+Promise.prototype.isPromise = true;
+
+/**
+ * Have we either been resolve()ed or reject()ed?
+ */
+Promise.prototype.isComplete = function() {
+ return this._status != Promise.PENDING;
+};
+
+/**
+ * Have we resolve()ed?
+ */
+Promise.prototype.isResolved = function() {
+ return this._status == Promise.SUCCESS;
+};
+
+/**
+ * Have we reject()ed?
+ */
+Promise.prototype.isRejected = function() {
+ return this._status == Promise.ERROR;
+};
+
+/**
+ * Take the specified action of fulfillment of a promise, and (optionally)
+ * a different action on promise rejection
+ */
+Promise.prototype.then = function(onSuccess, onError) {
+ if (typeof onSuccess === 'function') {
+ if (this._status === Promise.SUCCESS) {
+ onSuccess.call(null, this._value);
+ }
+ else if (this._status === Promise.PENDING) {
+ this._onSuccessHandlers.push(onSuccess);
+ }
+ }
+
+ if (typeof onError === 'function') {
+ if (this._status === Promise.ERROR) {
+ onError.call(null, this._value);
+ }
+ else if (this._status === Promise.PENDING) {
+ this._onErrorHandlers.push(onError);
+ }
+ }
+
+ return this;
+};
+
+/**
+ * Like then() except that rather than returning <tt>this</tt> we return
+ * a promise which resolves when the original promise resolves
+ */
+Promise.prototype.chainPromise = function(onSuccess) {
+ var chain = new Promise();
+ chain._chainedFrom = this;
+ this.then(function(data) {
+ try {
+ chain.resolve(onSuccess(data));
+ }
+ catch (ex) {
+ chain.reject(ex);
+ }
+ }, function(ex) {
+ chain.reject(ex);
+ });
+ return chain;
+};
+
+/**
+ * Supply the fulfillment of a promise
+ */
+Promise.prototype.resolve = function(data) {
+ return this._complete(this._onSuccessHandlers,
+ Promise.SUCCESS, data, 'resolve');
+};
+
+/**
+ * Renege on a promise
+ */
+Promise.prototype.reject = function(data) {
+ return this._complete(this._onErrorHandlers, Promise.ERROR, data, 'reject');
+};
+
+/**
+ * Internal method to be called on resolve() or reject()
+ */
+Promise.prototype._complete = function(list, status, data, name) {
+ // Complain if we've already been completed
+ if (this._status != Promise.PENDING) {
+ Promise._error('Promise complete. Attempted ' + name + '() with ', data);
+ Promise._error('Prev status = ', this._status, ', value = ', this._value);
+ throw new Error('Promise already complete');
+ }
+ else if (list.length == 0 && status == Promise.ERROR) {
+ // Complain if a rejection is ignored
+ // (this is the equivalent of an empty catch-all clause)
+ Promise._error("Promise rejection ignored and silently dropped");
+ Promise._error(data);
+ var frame;
+ if (data.stack) {
+ // This is an exception or an exception-like value
+ Promise._error("Printing original stack");
+ for (frame = data.stack; frame; frame = frame.caller) {
+ Promise._error(frame);
+ }
+ }
+ else if (data.fileName && data.lineNumber) {
+ Promise._error("Error originating at " + data.fileName + ", line "
+ + data.lineNumber);
+ }
+ else if (typeof Components !== "undefined") {
+ try {
+ if (Components.stack) {
+ Promise._error("Original stack not available. Printing current stack");
+ for (frame = Components.stack; frame; frame = frame.caller) {
+ Promise._error(frame);
+ }
+ }
+ }
+ catch (ex) {
+ // Ignore failure to read Components.stack
+ }
+ }
+ }
+
+ Promise._setTimeout(function() {
+ this._status = status;
+ this._value = data;
+
+ // Call all the handlers, and then delete them
+ list.forEach(function(handler) {
+ handler.call(null, this._value);
+ }, this);
+ delete this._onSuccessHandlers;
+ delete this._onErrorHandlers;
+
+ // Remove the given {promise} from the _outstanding list, and add it to the
+ // _recent list, pruning more than 20 recent promises from that list
+ delete Promise._outstanding[this._id];
+ // The web version of this code includes this very useful debugging aid,
+ // however there is concern that it will create a memory leak, so we leave it
+ // out when embedded in Mozilla.
+ //*
+ Promise._recent.push(this);
+ while (Promise._recent.length > 20) {
+ Promise._recent.shift();
+ }
+ //*/
+ }.bind(this), 1);
+
+ return this;
+};
+
+/**
+ * Minimal debugging.
+ */
+Promise.prototype.toString = function() {
+ return "[Promise " + this._id + "]";
+};
+
+/**
+ * Takes an array of promises and returns a promise that that is fulfilled once
+ * all the promises in the array are fulfilled
+ * @param promiseList The array of promises
+ * @return the promise that is fulfilled when all the array is fulfilled
+ */
+Promise.group = function(promiseList) {
+ if (!Array.isArray(promiseList)) {
+ promiseList = Array.prototype.slice.call(arguments);
+ }
+
+ // If the original array has nothing in it, return now to avoid waiting
+ if (promiseList.length === 0) {
+ return new Promise().resolve([]);
+ }
+
+ var groupPromise = new Promise();
+ var results = [];
+ var fulfilled = 0;
+
+ var onSuccessFactory = function(index) {
+ return function(data) {
+ results[index] = data;
+ fulfilled++;
+ // If the group has already failed, silently drop extra results
+ if (groupPromise._status !== Promise.ERROR) {
+ if (fulfilled === promiseList.length) {
+ groupPromise.resolve(results);
+ }
+ }
+ };
+ };
+
+ promiseList.forEach(function(promise, index) {
+ var onSuccess = onSuccessFactory(index);
+ var onError = groupPromise.reject.bind(groupPromise);
+ promise.then(onSuccess, onError);
+ });
+
+ return groupPromise;
+};
+
+/**
+ * Executes a code snippet or a function after specified delay.
+ * @param callback is the function you want to execute after the delay.
+ * @param delay is the number of milliseconds that the function call should
+ * be delayed by. Note that the actual delay may be longer, see Notes below.
+ * @return the ID of the timeout
+ */
+Promise._setTimeout = function(callback, delay) {
+ return setTimeout(callback, delay);
+};
+
+/**
+ * This implementation of promise also runs in a browser.
+ * Promise._error allows us to redirect error messages to the console with
+ * minimal changes.
+ */
+Promise._error = console.warn.bind(console);
+
+
+exports.Promise = Promise;
View
41 lib/socket.js
@@ -6,7 +6,8 @@ var io = require("socket.io"),
cache = require("./simple-cache"),
channels = require("./channels"),
fs = require('fs'),
- Connect = require('connect');
+ Connect = require('connect'),
+ Promise = require('./promise').Promise;
/**
* @namespace The main socket object, which will also be used to route
@@ -25,9 +26,9 @@ socket.addChannel = channels.addChannel;
socket.getChannel = channels.getChannel;
//TODO: migrate all these to a channel implementation
-function doRpc(client, message, cb) {
+function doRpc(client, message, promise) {
Widget.doRpc(message.request, client, message.data, function(err, _result) {
- cb && cb();
+
var result = {
messageId: message.id,
type: "rpc",
@@ -36,10 +37,12 @@ function doRpc(client, message, cb) {
result: _result
};
client.json.send(result);
+
+ promise.resolve();
});
}
-function doEvent(client, message, cb) {
+function doEvent(client, message, promise) {
//secure the global busChannel to this client
//(in other words, this conversation is just between the server and this client)
if (message.data.busName === "bus:feather.sys:" + client.id) {
@@ -55,33 +58,36 @@ function doEvent(client, message, cb) {
}
});
}
+
+ promise.resolve();
}
-function doChannel(client, message, cb) {
+function doChannel(client, message, promise) {
var channel = channels.getChannel(message.data.channelId);
if (channel) {
- channel.handleMessage(client, message.data, cb);
+ channel.handleMessage(client, message.data, promise);
} else {
- //TODO: error condition; log and handle
+ promise.resolve();
}
}
-function handleMessage(client, message, cb) {
+function handleMessage(client, message, promise) {
switch (message.type) {
case "rpc":
- doRpc(client, message, cb);
+ doRpc(client, message, promise);
break;
case "event":
- doEvent(client, message, cb);
+ doEvent(client, message, promise);
break;
case "channel":
- doChannel(client, message, cb);
+ doChannel(client, message, promise);
break;
case "sessionId":
client.json.send({
messageId: message.id,
sessionId: client.id
});
+ promise.resolve();
break;
}
}
@@ -200,12 +206,19 @@ var init = socket.init = function(options, cb, mirror) {
//build "request" object to not break downstream APIs (legacy support)
message.request = {session: client.session, sessionId: client.session.id};
- handleMessage(client, message, function() {
- if (message.request && message.request.session) {
+ //create a promise to pass down the handler layers
+ var promise = new Promise();
+ var resolve = function() {
+ if (client.session) {
// Re-store the session in case they modified it.
httpServer.sessionStore.set(client.session.id, client.session);
}
- });
+ };
+
+ //store session whether the promise is resolved or rejected
+ promise.then(resolve, resolve);
+
+ handleMessage(client, message, promise);
});
});
});
Please sign in to comment.
Something went wrong with that request. Please try again.