Skip to content

Commit

Permalink
Modularized pubsub code, and added the ability for server to pubsub l…
Browse files Browse the repository at this point in the history
…ocally
  • Loading branch information
ysimonson committed Jul 19, 2010
1 parent b0e7542 commit 5eeacc8
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 246 deletions.
10 changes: 5 additions & 5 deletions apps/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ busServer.addListener('close', function(endpoint, clientId) {
sys.puts('close: ' + clientId);
});

busServer.addListener('receive', function(clientId, json) {
sys.puts('receive: ' + clientId);
busServer.addListener('receive', function(clientId, eventName, payload) {
sys.puts('receive: ' + clientId + ": " + eventName + ", " + payload);
});

busServer.addListener('listen', function(clientId, eventName) {
sys.puts('listen: ' + clientId + ': ' + eventName);
sys.puts('listen: ' + clientId + ": " + eventName);
});

busServer.addListener('unlisten', function(clientId, eventName) {
sys.puts('unlisten: ' + clientId + ': ' + eventName);
busServer.addListener('unlisten', function(clientId, eventName, payload) {
sys.puts('unlisten: ' + clientId + ": " + eventName);
});

httpServer.listen(8080);
6 changes: 2 additions & 4 deletions apps/static/chat/js/chat.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
// bus.client.js is already in the dom.
// url: (string)
// URL (relative or absolute) to the node-bus service.

this.bus = new Bus(host);
this.$messages = $messages;

this.bus.sub("chat/login", this, this.handleLogin);
this.bus.sub("chat/message", this, this.handleMessage);
this.bus.subscribe("chat/login", this, this.handleLogin);
this.bus.subscribe("chat/message", this, this.handleMessage);
}
Chat.prototype = {
// bus: Object
Expand Down Expand Up @@ -50,7 +49,6 @@
// Sends a message to the chat service.
// message: String
// Message to send.

this.bus.pub("chat/message", {username: this.username, message: message});
},

Expand Down
257 changes: 158 additions & 99 deletions apps/static/lib/bus.client.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,148 @@ THE SOFTWARE.*/

this.comet = comet;
})(window);
function TransformerEngine() {
this.transformers = [];

this.process = function(obj) {
var transformers = this.transformers;

for(var i=0, len=transformers.length; i<len; i++) {
var result = transformers[i](obj);
if(!result) return null;
obj = result;
}

return obj;
};

this.register = function(transformer) {
this.transformers.append(transformer);
};
}

function PubSubClient() {
this.subscriptions = {};

this.subscribe = function(eventName /*, scope (optional), callback*/) {
// summary:
// Subscribes to an event when given a callback and an
// optional scope for the callback.
// eventName: String
// Name of the event.
// scope: Object (Optional)
// Calling-scope of the listener callback.
// callback: Function
// Function that is listening for the eventName event.
// return:
// The subscription handle.

var scope = this;
var callback = null;

//Find what kind of arguments we're dealing with
if(arguments.length == 3){
scope = arguments[1];
callback = arguments[2];
} else if (arguments.length == 2){
callback = arguments[1];
} else {
throw new Error("subscribe(..) requires two or three arguments: event name, [callback scope,] callback");
}

//Get the container for the event
var container = this.subscriptions[eventName];
var isFirstSubscription = false;

//Create a container if one doesn't exist
if(!container) {
//This is the first callback associated with the event; notify
//the server
this.subscriptions[eventName] = container = [];
isFirstSubscription = true;
}

//Add the listener to the container and return it to allow for
//future unsubscriptions
var callee = [scope, callback];
container.push(callee);

return {
isFirstSubscription: isFirstSubscription,
handler: {eventName: eventName, callee: callee}
};
};

this.unsubscribe = function(handle) {
//Find the container for the subscription
var results = {
isLastSubscription: false,
removed: false
};

var container = this.subscriptions[handle.eventName];
if(!container) return results;

//What to search for in the container
var callee = handle.callee;

for(var i=0, len=container.length; i<len; i++) {
var item = container[i];

//If the item is found...
if(item === callee) {
//Cut it out
container.splice(i, 1);

if(container.length == 0) {
//Perform a hard-core delete to prevent memory leaks
results.isLastSubscription = true;
delete this.subscriptions[handle.eventName];
}

delete item;
results.removed = true;
return results;
}
}

return results;
};

this.fireEvent = function(eventName, payload) {
var container = this.subscriptions[eventName];
var payload = [payload];

if(container) {
for(var i = 0, len = container.length; i < len; i++){
var sub = container[i];

//Call wrapped in a setTimeout to provide "cooperative
//multitasking" - the callback's execution will be delayed
//if there are other things the browser wants to respond
//to right now
setTimeout(function() {
sub[1].apply(sub[0], payload);
}, 0);
}
}
}
}

//Try to export the public members for node.js. This will fail in a browser
//environment, so wrap it in a try/catch.
try {
exports.TransformerEngine = TransformerEngine;
exports.PubSubClient = PubSubClient;
} catch(e) {}
;(function(){
function Bus(busUrl) {
this.comet = comet(busUrl);
this.pubsub = new PubSubClient();
var self = this;

this.comet.onmessage = function(json) {
self._fireEvent(json.name, json.payload);
self.pubsub.fireEvent(json.name, json.payload);
};

this.comet.onerror = function(name, message) {
Expand All @@ -371,34 +506,13 @@ THE SOFTWARE.*/
};

Bus.prototype = {
// subscriptions: Object
// Map of event names to arrays of callback function/scope
// pairs.
subscriptions: {},
// comet: Object
// Instance of the node-comet connection.
comet: null,

_fireEvent: function(eventName, payload){
// summary:
// Fire off the event to the listeners.
// eventName:
// Name of the event.
// args: Array
// The arguments to send to subscribers.
var container = this.subscriptions[eventName];

if(container) {
for(var i = 0, len = container.length; i < len; i++){
var sub = container[i];

//Call wrapped in a setTimeout to provide "cooperative
//multitasking" - the callback's execution will be delayed
//if there are other things the browser wants to respond
//to right now
setTimeout(function() {
sub[1].apply(sub[0], [payload]);
}, 0);
}
}
},
// pubsub: Object
// Instance of the pubsub client.
pubsub: null,

subscribe: function(eventName /*, scope (optional), callback*/){
// summary:
Expand All @@ -411,40 +525,14 @@ THE SOFTWARE.*/
// callback: Function
// Function that is listening for the eventName event.
// return:
// Whether or not the subscription was made.
var scope = window;
var callback = null;
// The subscription handle.
var results = this.pubsub.subscribe.apply(this.pubsub, arguments);

//Find what kind of arguments we're dealing with
if(arguments.length == 2){
callback = arguments[1];
} else if (arguments.length == 3){
scope = arguments[1];
callback = arguments[2];
} else {
throw new Error("Bus.subscribe(..) requires two or three arguments: event name, [callback scope,] callback");
if(results.isFirstSubscription) {
this.publish('__node-bus__/listen', eventName);
}

//Get the container for the event
var container = this.subscriptions[eventName];

//Create a container if one doesn't exist
if(!container) {
this.subscriptions[eventName] = container = [];

//This is the first callback associated with the event; notify
//the server
this.comet.send({
type: 'listen',
name: eventName
});
}

//Add the listener to the container and return it to allow for
//future unsubscriptions
var callee = [scope, callback];
container.push(callee);
return {eventName: eventName, callee: callee};
return results.handle;
},

unsubscribe: function(handle){
Expand All @@ -457,39 +545,13 @@ THE SOFTWARE.*/
// If the function and scope were not found as a listener
// to the event, false will be returned.

//Find the container for the subscription
var container = this.subscriptions[handle.eventName];
if(!container) return false;
var results = this.pubsub.unsubscribe(handle);

//What to search for in the container
var callee = handle.callee;

for(var i=0, len=container.length; i<len; i++) {
var item = container[i];

//If the item is found...
if(item === callee) {
//Cut it out
container.splice(i, 1);

if(container.length == 0) {
//There are no callbacks listening to this event; notify
//the server
this.comet.send({
type: 'unlisten',
name: handle.eventName
});

//Perform a hard-core delete to prevent memory leaks
delete this.subscriptions[handle.eventName];
}

delete item;
return true;
}
if(results.isLastSubscription) {
this.publish('__node-bus__/unlisten', eventName);
}
return false;

return results.removed;
},

publish: function(eventName, payload){
Expand All @@ -508,13 +570,10 @@ THE SOFTWARE.*/
});
},

// Shortcut function for subscribe.
sub: function(){ return this.subscribe.apply(this, Array.prototype.slice.call(arguments)); },
// Shortcut function for unsubscribe.
unsub: function(){ return this.unsubscribe.apply(this, Array.prototype.slice.call(arguments)); },
// Shortcut function for publish.
pub: function pub(){ return this.publish.apply(this, Array.prototype.slice.call(arguments)); }
sub: function() { return this.subscribe.apply(this, arguments); },
unsub: function() { return this.unsubscribe.apply(this, arguments); },
pub: function() { return this.publish.apply(this, arguments); }
};

window.Bus = Bus;
})();
this.Bus = Bus;
})(window);
2 changes: 1 addition & 1 deletion apps/static/unittests/index.html
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<html>
<head>
<link rel="stylesheet" href="/lib/qunit/qunit.css" media="all" />
<script type="text/javascript" src="/lib/jquery/jquery-1.4.2.js"></script>
<script type="text/javascript" src="/lib/qunit/qunit.js"></script>
<script type="text/javascript" src="/lib/bus.client.js"></script>
<script type="text/javascript">

</script>
</head>
<body>
Expand Down
1 change: 1 addition & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<string value="/*${copyright}*/" />
<path>
<pathelement location="./lib/node-comet/src/comet.client.js"/>
<pathelement location="./src/util.js"/>
<pathelement location="./src/bus.client.js"/>
</path>
</concat>
Expand Down
Loading

0 comments on commit 5eeacc8

Please sign in to comment.