Permalink
Browse files

add roster

  • Loading branch information...
maccman committed Sep 12, 2010
1 parent 8b18c2d commit 5a5996a7f111304d73fed2565ed70037c05f0ca2
View
@@ -18,4 +18,6 @@ task :build do
tempfile.close
FileUtils.mv(tempfile.path, APP_PATH)
-end
+end
+
+task :default => :build
View
@@ -0,0 +1,10 @@
+clients = {}
+
+Juggernaut.subscribe do |event, data|
+ case event
+ when :subscribe
+ clients[data.session_id] = data
+ when :unsubscribe
+ clients.delete(data.session_id)
+ end
+end
View
@@ -2,18 +2,36 @@
require "json"
module Juggernaut
- def redis
- @redis ||= Redis.new
- end
-
- def redis=(val)
- @redis = val
+ def redis_options
+ @redis_options ||= {}
end
def publish(channels, data, options = {})
message = ({:channels => Array(channels), :data => data}).merge(options)
- redis.publish(:juggernaut, message.to_json)
+ redis.publish(key, message.to_json)
+ end
+
+ def publish(channels, data, options = {})
+ message = ({:channels => Array(channels), :data => data}).merge(options)
+ redis.publish(key, message.to_json)
+ end
+
+ def subscribe
+ Redis.new(redis_options).subscribe(key(:subscribe), key(:unsubscribe)) do |on|
+ on.message do |type, msg|
+ yield(type.gsub(/^juggernaut:/, "").to_sym, JSON.parse(msg))
+ end
+ end
end
+
+ protected
+ def redis
+ @redis ||= Redis.new(redis_options)
+ end
+
+ def key(*args)
+ args.unshift(:juggernaut).join(":")
+ end
- extend self
+ extend self
end
@@ -1,6 +1,8 @@
var sys = require("sys"),
utils = require("utils");
+var Events = require("./events");
+
var SuperClass = require("superclass");
Channel = module.exports = new SuperClass;
@@ -45,9 +47,11 @@ Channel.include({
subscribe: function(client){
this.clients.push(client);
+ Events.subscribe(this, client);
},
unsubscribe: function(client){
this.clients.delete(client)
+ Events.unsubscribe(this, client);
}
});
View
@@ -8,7 +8,11 @@ Client = module.exports = new SuperClass;
Client.include({
init: function(conn){
this.connection = conn;
- this.sessionID = this.connection.sessionID;
+ this.session_id = this.connection.session_id;
+ },
+
+ setMeta: function(value){
+ this.meta = value;
},
subscribe: function(name){
@@ -28,7 +32,7 @@ Client.include({
write: function(message){
if (message.except) {
except = JUtils.makeArray(message.except)
- if (except.indexOf(this.sessionID) != -1)
+ if (except.indexOf(this.session_id) != -1)
return false;
}
@@ -37,6 +41,6 @@ Client.include({
disconnect: function(){
// Unsubscribe from all channels
- Channel.unsubscribe();
+ Channel.unsubscribe(this);
}
});
@@ -5,9 +5,9 @@ Connection = module.exports = new SuperClass;
Connection.include({
init: function(stream){
- this.stream = stream;
- this.sessionID = this.stream.sessionId;
- this.client = new Client(this);
+ this.stream = stream;
+ this.session_id = this.stream.sessionId;
+ this.client = new Client(this);
this.stream.on("message", this.proxy(this.onmessage));
this.stream.on("disconnect", this.proxy(this.ondisconnect));
@@ -20,10 +20,13 @@ Connection.include({
switch (message.type){
case "subscribe":
- this.client.subscribe(message.getChannels());
+ this.client.subscribe(message.getChannel());
break;
case "unsubscribe":
- this.client.unsubscribe(message.getChannels());
+ this.client.unsubscribe(message.getChannel());
+ break;
+ case "meta":
+ this.client.setMeta(message.data);
break;
default:
throw "Unknown type"
View
@@ -0,0 +1,34 @@
+var redis = require("redis-client");
+
+Events = module.exports = {};
+
+Events.client = redis.createClient();
+
+Events.publish = function(key, value){
+ this.client.publish(
+ "juggernaut:" + key,
+ JSON.stringify(value)
+ );
+};
+
+Events.subscribe = function(channel, client) {
+ this.publish(
+ "subscribe",
+ {
+ channel: channel.name,
+ meta: client.meta,
+ session_id: client.session_id
+ }
+ );
+};
+
+Events.unsubscribe = function(channel, client) {
+ this.publish(
+ "unsubscribe",
+ {
+ channel: channel.name,
+ meta: client.meta,
+ session_id: client.session_id
+ }
+ );
+};
@@ -19,4 +19,8 @@ Message.prototype.toJSON = function(){
Message.prototype.getChannels = function(){
return(JUtils.makeArray(this.channels || this.channel));
-}
+};
+
+Message.prototype.getChannel = function(){
+ return(this.getChannels()[0]);
+};
View

Large diffs are not rendered by default.

Oops, something went wrong.
View
@@ -9,12 +9,13 @@ if ("WebSocket" in window)
var Juggernaut = function(host, port, options){
this.host = host || "localhost";
- this.port = 8080;
+ this.port = port || 8080;
this.options = options || {};
this.handlers = {};
this.state = "disconnected";
+ this.meta = this.options.meta;
this.socket = new io.Socket(this.host,
{rememberTransport: false, port: this.port}
@@ -23,6 +24,8 @@ var Juggernaut = function(host, port, options){
this.socket.on("connect", this.proxy(this.onconnect));
this.socket.on("message", this.proxy(this.onmessage));
this.socket.on("disconnect", this.proxy(this.ondisconnect));
+
+ this.on("connect", this.proxy(this.writeMeta));
};
// Helper methods
@@ -91,6 +94,14 @@ Juggernaut.fn.trigger = function(){
callbacks[i].apply(this, args);
};
+Juggernaut.fn.writeMeta = function(){
+ if ( !this.meta ) return;
+ var message = new Juggernaut.Message;
+ message.type = "meta";
+ message.data = this.meta;
+ this.write(message);
+};
+
Juggernaut.fn.onconnect = function(){
this.sessionID = this.socket.transport.sessionid;
this.state = "connected";

0 comments on commit 5a5996a

Please sign in to comment.