Skip to content

Commit

Permalink
Added transformers feature, and consequently cleaned up some of the s…
Browse files Browse the repository at this point in the history
…erver-side code
  • Loading branch information
ysimonson committed Oct 26, 2010
1 parent a836814 commit 305a696
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 71 deletions.
12 changes: 10 additions & 2 deletions apps/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@ httpServer.addListener('request', function (req, res) {
}
});

busServer.addListener('receive', function(clientId, eventName, payload) {
sys.puts('receive: ' + eventName + ", " + payload);
busServer.addListener('receive', function(clientId, obj) {
sys.puts('receive: ' + obj);
});

busServer.addListener('receiveInvalidMessage', function(clientId, obj) {
sys.puts('receive invalid message: ' + obj);
});

busServer.addListener('receiveInvalidJSON', function(obj) {
sys.puts('receive invalid JSON: ' + obj);
});

busServer.addListener('listen', function(clientId, eventName) {
Expand Down
1 change: 1 addition & 0 deletions src/client/bus.client.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

this.socket.addEvent('message', function(data) {
var json = JSON.parse(data);
json = self.transformers.process(json);
self.pubsub.fireEvent(json.name, json.payload);
});
};
Expand Down
144 changes: 80 additions & 64 deletions src/server/bus.server.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,40 @@ var sys = require('sys'),

var RESERVED_EVENT_PREFIX = '__node-bus__';

// summary:
// Initialize the Bus system.
// description:
// Sets up listeners for events on the httpServer object when they
// match the given pattern.
// httpServer: http.Server
// Http Server object.
// pattern: RegExp
// URL pattern to match.
// returns:
// Nothing.
function BusServer(httpServer) {
// summary:
// Initialize the Bus system.
// description:
// Sets up listeners for events on the httpServer object when they
// match the given pattern.
// httpServer: http.Server
// Http Server object.
// pattern: RegExp
// URL pattern to match.
// returns:
// Nothing.

events.EventEmitter.call(this);
var self = this;

//handles executing callbacks when an event occurs
this.pubsub = new PubSubClient();
self.pubsub = new PubSubClient();

//mapping of client id => {event name => handle}
this.clientSubscriptions = {};
self.clientSubscriptions = {};

//the socket.io server
this.socketServer = io.listen(httpServer);
self.socketServer = io.listen(httpServer);

//performs transformations on events
//this.transformerEngine = new transformers.TransformerEngine();
self.transformers = new TransformerEngine();

//Called when a message is received
this._handleMessage = function(client, eventName, payload) {
self.emit('receive', client, eventName, payload);
self._receive = function(client, obj) {
//Transform the object
obj = self.transformers.process(obj);
if(!obj) return;

var eventName = obj.name, payload = obj.payload;

//if this is a special event...
if(eventName.substr(0, RESERVED_EVENT_PREFIX.length) == RESERVED_EVENT_PREFIX) {
Expand All @@ -48,18 +51,13 @@ function BusServer(httpServer) {
}
}

//broadcast the message to everyone
//TODO: optimize this so that it is only broadcast to listeners
/*self.socketServer.broadcast(JSON.stringify({
name: eventName,
payload: payload
}));*/

self.pubsub.fireEvent(eventName, payload);
};

this._handleListen = function(client, eventName) {
//Called when a client wants to listen to a new event
self._handleListen = function(client, eventName) {
if(!client) return;

//unlisten the client if it's somehow listening already
self._handleUnlisten(client, eventName);

Expand All @@ -77,72 +75,90 @@ function BusServer(httpServer) {
container[eventName] = subscription.handle;
};

this._handleUnlisten = function(client, eventName) {
//Called when a client wants to unlisten to an event
self._handleUnlisten = function(client, eventName) {
if(!client) return;

var container = self.clientSubscriptions[client.sessionId];
if(!container) return;

var handle = container[eventName];
if(!handle) return;

if(handle) {
self.pubsub.unsubscribe(handle);
delete container[eventName];
}
};

this.publish = function(eventName, payload) {
self._handleMessage(null, eventName, payload);
};

this.subscribe = function() {
var results = self.pubsub.subscribe.apply(self.pubsub, arguments);
return results.handle;
self.pubsub.unsubscribe(handle);
delete container[eventName];
};

this.unsubscribe = function(handle) {
var results = self.pubsub.unsubscribe(handle);
return results.removed;
};

/*this.addTransformer = function(transformer) {
this.transformerEngine.register(transformer);
};
this.removeTransformer = function(transformer) {
this.transformerEngine.unregister(transformer);
};*/

this.addListener('listen', this._handleListen);
this.addListener('unlisten', this._handleUnlisten);

this.socketServer.addListener('connection', function(client) {
//Called when a user connects
self._handleConnection = function(client) {
//Called when a message is received
client.addListener('message', function(message) {
try {
var json = JSON.parse(message);
} catch(e) {
self.emit('receiveInvalidMessage', client, message);
return;
}

var eventName = json.name, eventPayload = json.payload;
if(typeof(eventName) != 'string' || !eventPayload) return;

self._handleMessage(client, eventName, eventPayload);

self.emit('receive', client, json);
self._receive(client, json);
});


//Called when a user disconnects
client.addListener('disconnect', function() {
var container = self.clientSubscriptions[client.sessionId];
if(!container) return;

var pubsub = self.pubsub;

for(var eventName in container) {
var handle = container[eventName];
pubsub.unsubscribe(handle);
}

delete self.clientSubscriptions[client.sessionId];
});
};

self.publish = function(eventName, payload) {
self._receive(null, {
name: eventName,
payload: payload
});
};

self.subscribe = function() {
var results = self.pubsub.subscribe.apply(self.pubsub, arguments);
return results.handle;
};

self.unsubscribe = function(handle) {
var results = self.pubsub.unsubscribe(handle);
return results.removed;
};

self.addTransformer = function(transformer) {
self.transformers.register(transformer);
};

self.removeTransformer = function(transformer) {
self.transformers.unregister(transformer);
};

self.addListener('listen', self._handleListen);
self.addListener('unlisten', self._handleUnlisten);
self.socketServer.addListener('connection', self._handleConnection);

//Transformer to ensure that a message is valid
self.transformers.register(function(obj) {
var name = obj.name, payload = obj.payload;

if(typeof(name) != 'string' || !payload) {
self.emit('receiveInvalidJSON', obj);
return null;
}

return obj;
});
}

Expand Down
9 changes: 4 additions & 5 deletions src/shared/util.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
function TransformerEngine() {
this.transformers = {};
this.transformers = [];

this.process = function(obj) {
// summary:
// Processes an object, running it against each of the
// transformers.
// obj: Object
// The object to transform.
// The event object.
// return:
// The transformed object.

var transformers = this.transformers;

for(var i=0, len=transformers.length; i<len; i++) {
if(obj == null) break;
var result = transformers[i](obj);
obj = result;
obj = transformers[i](obj);
}

return obj;
Expand All @@ -26,7 +25,7 @@ function TransformerEngine() {
// Registers a new transformer.
// transformer: Function
// The transformer to register.
this.transformers.append(transformer);
this.transformers.push(transformer);
};

this.unregister = function(transformer) {
Expand Down

0 comments on commit 305a696

Please sign in to comment.