Permalink
Browse files

Moving pubsub to tunguska

  • Loading branch information...
1 parent 07dd4ed commit c33f56b79b971fbc94ce003f05c82ded3624f0f4 @kriszyp committed May 21, 2010
Showing with 36 additions and 221 deletions.
  1. +36 −0 lib/pubsub.js
  2. +0 −71 lib/pubsub/connector.js
  3. +0 −150 lib/pubsub/hub.js
View
36 lib/pubsub.js
@@ -0,0 +1,36 @@
+/**
+* A pubsub hub for distributing messages. Intended to be run as a worker
+**/
+var subscribers = {};
+
+onmessage = function(event){
+ var request = JSON.parse(event.data);
+ switch(request.method.toUpperCase()){
+ case "POST":
+ var topic = request.channel;
+ notify(topic);
+ topic = topic.replace(/\/[^\/]$/,'');
+ notify(topic + "/*");
+ while(topic){
+ topic = topic.replace(/\/[^\/]$/,'');
+ notify(topic + "/**");
+ }
+ function notify(subscribersForTopic){
+ var subscribersForTopic = subscribers[topic];
+ for(var i =0; i< subscribersForTopic.length;i++){
+ subscribersForTopic[i].postMessage(request.body);
+ }
+ }
+ break;
+ case "SUBSCRIBE":
+ var topic = request.topic;
+ (subscribers[topic] = subscribers[topic] || []).push(event.ports[0]);
+ case "UNSUBSCRIBE":
+ var topic = request.topic;
+ var subscribersForTopic = subscribers[topic];
+ if(subscribersForTopic){
+ subscribersForTopic.splice(subscribersForTopic.indexOf(event.ports[0]), 1);
+ }
+ }
+
+}
View
71 lib/pubsub/connector.js
@@ -1,71 +0,0 @@
-/**
-* A connector to other pubsub hubs
-**/
-var hub = require("./hub");
-exports.Connector = function(obj){
- onSubscription.id = obj.clientId;
- hub.subscribe("**", "monitored", onSubscription);
- return function(message){
- message.clientId = onSubscription.id;
- hub.publish(message);
- }
- function onSubscription(message){
- if(message.monitored){
- obj.subscribe(message.channel, message.forEvent);
- }else{
- obj.unsubscribe(message.channel);
- }
- }
-};
-
-function GenericConnector(clientId, connection){
- var connector = exports.Connector({
- clientId: clientId,
- subscribe: function(channel, event, listener){
- connection.send({
- method: "subscribe",
- pathInfo: channel,
- subscribe: event || "*"
- });
- },
- });
- connection.onmessage = function(message){
- connector(message);
- }
-}
-exports.WorkerConnector = function(workerName){
- worker = new SharedWorker(workerName);
- worker.onmessage = function(event){
- connection.onmessage(event.data);
- }
- var connection = {
- send: function(data){
- worker.postMessage(data);
- }
- };
- GenericConnector("local-workers", connection);
-}
-exports.HttpConnector = function(url){
-
-}
-onmessage = function(event){
- var request = JSON.parse(event.data);
- var source = event.ports[0];
- var sourceName = source.name;
- var topic = request.pathInfo;
- switch(request.method.toLowerCase()){
- case "post":
- publish(sourceName, topic, request.body);
- break;
-
- case "subscribe":
- subscribe(sourceName, topic, function(){
- source.postMessage();
- });
- break;
-
- case "unsubscribe":
- unsubscribe(sourceName, topic);
- }
-
-}
View
150 lib/pubsub/hub.js
@@ -1,150 +0,0 @@
-/**
-* A pubsub hub for distributing messages. Supports delegation to other workers and servers
-**/
-var hub = [];
-try{
- var defer = require("promise").defer,
- enqueue = require("event-loop").enqueue;
-}catch(e){
- if(!defer){
- defer = function(){return {resolve:function(){}}};
- }
- enqueue = function(func){func()};
-}
-exports.publish= function(channel, message){
- if(arguments.length === 1){
- channel = channel.channel;
- message = channel;
- }
-/* channel = normalizeChannel(channel);
- var hubs = [hub];
- for(var i = 0; i < channel.length && hubs.length > 0; i++){
- var nextHubs = [];
- for(var j = 0; < hubs.length; j++){
- var thisHub = hubs[j];
- if(thisHub.channels){
- var nextHub = thisHub.channels[channel[i]];
- if(nextHub){
- nextHubs.push(nextHub);
- }
- var nextHub = thisHub.channels["*"];
- if(nextHub){
- nextHubs.push(nextHub);
- }
- }
- }
- hubs = nextHubs;
- }*/
- var responses = [];
-/* for(var j = 0; < hubs.length; j++){
- var thisHub = hubs[j];
- for(var i =0; i< thisHub.length;i++){
- var subscriber = thisHub[i];
- if(!clientId || clientId != subscriber.id){
- responses.push(subscriber(message));
- }
- }
- }*/
- var eventName = message.event;
- var clientId = message.clientId;
- notifyAll(channel);
- channel = channel.replace(/\/[^\/]$/,'');
- notifyAll(channel + "/*");
- while(channel){
- channel = channel.replace(/\/[^\/]$/,'');
- notifyAll(channel + "/**");
- }
- function notifyAll(subscribers){
- var subscribers = hub[channel];
- if(subscribers){
- if(eventName){
- subscribers = subscribers[eventName];
- if(!subscribers){
- return;
- }
- }
- subscribers.forEach(function(subscriber){
- if(!clientId || clientId != subscriber.id){
- var deferred = defer();
- responses.push(deferred.promise);
- enqueue(function(){
- deferred.resolve(subscriber(message));
- });
- }
- });
- }
- }
- return responses;
-};
-
-exports.subscribe= function(channel, /*String?*/eventName, listener){
- channel = normalizeChannel(channel);
- var newChannel = false;
- if(typeof eventName === "function"){
- listener = eventName;
- eventName = null;
- }
- var subscribers = hub[channel];
- if(!subscribers){
- subscribers = hub[channel] = [];
- newChannel = true;
- }
- /*for(var i = 0; i < channel.length && hubs.length > 0; i++){
- var channels = subscribers.channels;
- if(!channels){
- subscribers.channels = {};
- }
- subscribers = channels
- }*/
-
- if(eventName && eventName != "*"){
- subscribers = subscribers[eventName];
- if(!subscribers){
- subscribers = subscribers[eventName] = [];
- newChannel = true;
- }
- }
- if(newChannel){
- var responses = exports.publish(channel, {channel:channel, clientId: listener.id, event: "monitored", monitored: true, forEvent: eventName});
- }else{
- var responses = [];
- }
- subscribers.push(listener);
-/* // At some point we may publish these to the target channel with a special subscription message
- var responses = all(exports.routes[source].map(function(destination){
- return destination.subscribe(channel, function(channel, message){
- publish(destination.name, channel, message);
- });
- }));*/
- responses.unsubscribe = function(){
- publish(source, channel, {}, "unsubscribe");
- subscribers.splice(subscribers.indexOf(listener), 1);
- };
- return responses;
-};
-
-exports.unsubscribe= function(channel, listener){
- exports.publish({pathInfo:channel}, "unsubscribe");
- subscribers.splice(subscribers.indexOf(listener), 1);
-};
-
-exports.getChildHub = function(channel){
- return {
- publish: addPath(exports.publish),
- subscribe: addPath(exports.subscribe),
- unsubscribe: addPath(exports.unsubscribe),
- getChildHub: addPath(exports.getChildHub)
- };
- function addPath(func){
- return function(subChannel){
- arguments[0] = channel + '/' + subChannel;
- return func.apply(this, arguments);
- }
- }
-}
- function normalizeChannel(channel){
- return typeof channel == "string" ? channel.split("/") : channel;
- }
-
-
-

0 comments on commit c33f56b

Please sign in to comment.