Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

WebSocket support

  • Loading branch information...
commit a15db0f3d1887709b5fa026bd012e638f820b105 1 parent e81cb85
@winton authored
View
3  lib/puggernaut.rb
@@ -1,8 +1,9 @@
require File.dirname(__FILE__) + '/puggernaut/gems'
-Puggernaut::Gems.activate %w(eventmachine)
+Puggernaut::Gems.activate %w(eventmachine em-websocket)
require 'eventmachine'
+require 'em-websocket'
$:.unshift File.dirname(__FILE__)
View
56 lib/puggernaut/server.rb
@@ -1,44 +1,42 @@
+require 'puggernaut/server/shared'
require 'puggernaut/server/http'
require 'puggernaut/server/channel'
require 'puggernaut/server/tcp'
+require 'puggernaut/server/websocket'
module Puggernaut
class Server
include Logger
- def initialize(http_port=8100, tcp_port=http_port.to_i+1)
- puts "\nPuggernaut is starting on #{http_port} (HTTP) and #{tcp_port} (TCP)"
+ def initialize(http_port=8100, tcp_port=http_port.to_i+1, ws_port=tcp_port.to_i+1)
+ puts "\nPuggernaut is starting on #{http_port} (Long Poll HTTP), #{tcp_port} (Puggernaut TCP), and #{ws_port} (WebSocket TCP)"
puts "*snort*\n\n"
-
- errors = 0
-
- while errors <= 10
- begin
- Channel.channels = []
- GC.start
- EM.epoll if EM.epoll?
- EM.run do
- logger.info "Server#initialize - Starting HTTP - #{http_port}"
- EM.start_server '0.0.0.0', http_port, Http
-
- logger.info "Server#initialize - Starting TCP - #{tcp_port}"
- EM.start_server '0.0.0.0', tcp_port, Tcp
-
- errors = 0
- end
- rescue Interrupt
- logger.info "Server#initialize - Shutting down"
- exit
- rescue
- errors += 1
- logger.error "Server#initialize - Error - #{$!.message}"
- logger.error "\t" + $!.backtrace.join("\n\t")
+
+ begin
+ Channel.channels = []
+ GC.start
+ EM.epoll if EM.epoll?
+ EM.run do
+ logger.info "Server#initialize - Starting HTTP - #{http_port}"
+ EM.start_server '0.0.0.0', http_port, Http
+
+ logger.info "Server#initialize - Starting TCP - #{tcp_port}"
+ EM.start_server '0.0.0.0', tcp_port, Tcp
+
+ logger.info "Server#initialize - Starting WebSocket - #{ws_port}"
+ Websocket.new '0.0.0.0', ws_port
+
+ errors = 0
end
+ rescue Interrupt
+ logger.info "Server#initialize - Shutting down"
+ exit
+ rescue
+ errors += 1
+ logger.error "Server#initialize - Error - #{$!.message}"
+ logger.error "\t" + $!.backtrace.join("\n\t")
end
-
- puts "Exiting because of too many consecutive errors :("
- puts "Check #{Dir.pwd}/log/puggernaut.log\n\n"
end
end
end
View
31 lib/puggernaut/server/http.rb
@@ -6,6 +6,7 @@ class Server
module Http
include Logger
+ include Shared
def receive_data(data)
lines = data.split(/[\r\n]+/)
@@ -22,33 +23,14 @@ def receive_data(data)
end
if path == '/'
- @join_leave = query['join_leave'].dup[0] rescue nil
-
- channels = query['channel'].dup rescue []
- lasts = query['last'].dup rescue []
- time = query['time'].dup[0] rescue nil
- user_id = query['user_id'].dup[0] rescue nil
+ channels, @join_leave, lasts, time, user_id = query_defaults(query)
unless channels.empty?
@channel = Channel.create(channels, user_id)
if @join_leave && user_id
- Channel.say channels.inject({}) { |hash, channel|
- hash[channel] = "!PUGJOIN#{user_id}"
- hash
- }, user_id
- end
- messages = []
- if time
- messages = channels.inject([]) { |array, channel|
- array += Channel.all_messages_after_time(channel, Time.parse(time))
- array
- }.join("\n")
- elsif !lasts.empty?
- messages = channels.inject([]) { |array, channel|
- array += Channel.all_messages_after_id(channel, lasts.shift)
- array
- }.join("\n")
+ join_channels(channels, user_id)
end
+ messages = gather_messages(channels, lasts, time)
unless messages.empty?
respond messages
else
@@ -89,10 +71,7 @@ def unbind
@channel.unsubscribe(@subscription_id)
logger.info "Sever::Http#unbind - Unsubscribe - #{@channel.channels.join(", ")}"
if @join_leave && @channel.user_id
- Channel.say @channel.channels.inject({}) { |hash, channel|
- hash[channel] = "!PUGLEAVE#{@channel.user_id}"
- hash
- }, @channel.user_id
+ leave_channel(channel)
end
Channel.channels.delete @channel
end
View
47 lib/puggernaut/server/shared.rb
@@ -0,0 +1,47 @@
+module Puggernaut
+ class Server
+ module Shared
+
+ def gather_messages(channels, lasts, time)
+ if time
+ channels.inject([]) { |array, channel|
+ array += Channel.all_messages_after_time(channel, Time.parse(time))
+ array
+ }.join("\n")
+ elsif !lasts.empty?
+ channels.inject([]) { |array, channel|
+ array += Channel.all_messages_after_id(channel, lasts.shift)
+ array
+ }.join("\n")
+ else
+ ''
+ end
+ end
+
+ def join_channels(channels, user_id)
+ Channel.say channels.inject({}) { |hash, channel|
+ hash[channel] = "!PUGJOIN#{user_id}"
+ hash
+ }, user_id
+ end
+
+ def leave_channel(channel)
+ Channel.say channel.channels.inject({}) { |hash, c|
+ hash[c] = "!PUGLEAVE#{channel.user_id}"
+ hash
+ }, channel.user_id
+ end
+
+ def query_defaults(query)
+ logger.info query.inspect
+ [
+ (query['channel'].dup rescue []),
+ (query['join_leave'].dup[0] rescue nil),
+ (query['last'].dup rescue []),
+ (query['time'].dup[0] rescue nil),
+ (query['user_id'].dup[0] rescue nil)
+ ]
+ end
+ end
+ end
+end
View
47 lib/puggernaut/server/websocket.rb
@@ -0,0 +1,47 @@
+module Puggernaut
+ class Server
+ class Websocket
+
+ include Logger
+ include Shared
+
+ def initialize(host, port)
+ EventMachine::WebSocket.start(:host => host, :port => port) do |ws|
+ ws.onopen do
+ logger.info "Server::Websocket#initialize - Open"
+ channel = nil
+ join_leave = nil
+ joined = nil
+ subscription_id = nil
+ ws.onmessage do |msg|
+ logger.info "Server::Websocket#initialize - Message - #{msg}"
+ channels, join_leave, lasts, time, user_id = query_defaults(CGI.parse(msg))
+ channel ||= Channel.create(channels, user_id)
+ if join_leave && user_id && !joined
+ joined = true
+ join_channels(channels, user_id)
+ end
+ messages = gather_messages(channels, lasts, time)
+ unless messages.empty?
+ ws.send messages
+ else
+ logger.info "Server::Websocket#initialize - Subscribed - #{channel.channels.join(", ")}"
+ subscription_id = channel.subscribe { |str| ws.send str }
+ end
+ end
+ ws.onclose do
+ if subscription_id
+ channel.unsubscribe(subscription_id)
+ logger.info "Sever::Websocket#initialize - Unsubscribe - #{channel.channels.join(", ")}"
+ if join_leave && channel.user_id
+ leave_channel(channel)
+ end
+ Channel.channels.delete channel
+ end
+ end
+ end
+ end
+ end
+ end
+ end
+end
View
85 public/puggernaut.js
@@ -17,6 +17,7 @@ var Puggernaut = new function() {
var leaves = {};
var started = false;
var request;
+ var request_id;
function ajax(join_leave, time, user_id) {
if (channelLength() > 0 && !self.disabled && errors <= 10) {
@@ -35,24 +36,7 @@ var Puggernaut = new function() {
if (started) {
errors = 0;
$.each(data.split("\n"), function(i, line) {
- line = line.split('|', 4);
- if (line[0] && typeof channels[line[0]] != 'undefined') {
- channels[line[0]] = line[1];
- if (line[2].substring(0, 8) == '!PUGJOIN') {
- var id = line[2].substring(8)
- if (leaves[id]) {
- delete leaves[id];
- clearTimeout(leaves[id]);
- } else
- events.trigger('join_' + line[0], id);
- } else if (line[2].substring(0, 9) == '!PUGLEAVE') {
- var id = line[2].substring(9)
- leaves[id] = setTimeout(function() {
- events.trigger('leave_' + line[0], id);
- }, 10000);
- } else
- events.trigger(line[0], [ line[2], new Date(line[3]) ]);
- }
+ processMessage(line);
});
ajax(join_leave, null, user_id);
}
@@ -112,10 +96,35 @@ var Puggernaut = new function() {
return data;
}
+ function processMessage(line) {
+ line = line.split('|', 4);
+ if (line[0] && typeof channels[line[0]] != 'undefined') {
+ var id;
+ channels[line[0]] = line[1];
+ if (line[2].substring(0, 8) == '!PUGJOIN') {
+ id = line[2].substring(8);
+ if (leaves[id]) {
+ delete leaves[id];
+ clearTimeout(leaves[id]);
+ } else
+ events.trigger('join_' + line[0], id);
+ } else if (line[2].substring(0, 9) == '!PUGLEAVE') {
+ id = line[2].substring(9);
+ leaves[id] = setTimeout(function() {
+ events.trigger('leave_' + line[0], id);
+ }, 10000);
+ } else
+ events.trigger(line[0], [ line[2], new Date(line[3]) ]);
+ }
+ }
+
function unwatch() {
var args = $.makeArray(arguments);
started = false;
- request.abort();
+ if (request.abort)
+ request.abort();
+ if (request.close)
+ request.close();
if (args.length) {
$.each(args, function(i, item) {
delete channels[item];
@@ -147,8 +156,12 @@ var Puggernaut = new function() {
events.bind(item, fn);
});
- if (!started)
- ajax(join_leave, time, user_id);
+ if (!started) {
+ if (window.WebSocket)
+ websocket(join_leave, time, user_id);
+ else
+ ajax(join_leave, time, user_id);
+ }
}
return this;
@@ -171,4 +184,34 @@ var Puggernaut = new function() {
});
return this;
}
+
+ function websocket(join_leave, time, user_id) {
+ if (channelLength() > 0 && !self.disabled && errors <= 10) {
+ started = true;
+ request = new WebSocket("ws://localhost:8102/");
+ request.onopen = function() {
+ errors = 0;
+ if (started)
+ request.send($.param(params(join_leave, time, user_id), true));
+ };
+ request.onmessage = function(evt) {
+ errors = 0;
+ if (started) {
+ $.each(evt.data.split("\n"), function(i, line) {
+ processMessage(line);
+ });
+ request.send($.param(params(join_leave, null, user_id), true));
+ }
+ };
+ request.onerror = function() {
+ errors += 1;
+ if (started)
+ websocket(join_leave, null, user_id);
+ };
+ request.onclose = function() {
+ if (started)
+ websocket(join_leave, null, user_id);
+ };
+ }
+ }
};
View
39 public/spec.js
@@ -58,19 +58,23 @@ $(function() {
module("Single message inhabitants", {
setup: function() {
- Puggernaut.watch('single', { user_id: 'test' }, function(e, message, time) {});
+ Puggernaut.watch('single', { user_id: 'test' }, function(e, message, time) {
+ setTimeout(function() {
+ Puggernaut.inhabitants('single', function(users) {
+ equals(users[0], 'test');
+ equals(users.length, 1);
+ Puggernaut.unwatch('single');
+ start();
+ });
+ }, 1000);
+ });
}
});
test("should receive a message", function() {
stop();
expect(2);
- Puggernaut.inhabitants('single', function(users) {
- equals(users[0], 'test');
- equals(users.length, 1);
- Puggernaut.unwatch('single');
- start();
- });
+ $.get('/single');
});
module("Single message join/leave/join", {
@@ -120,19 +124,14 @@ $(function() {
module("Last message", {
setup: function() {
Puggernaut.watch('last', function(e, message, time) {
- if (message != 'last message 2') {
- equals(message, 'last message 1');
+ if (message == 'last message 1') {
ok(time.constructor === Date);
- Puggernaut.disabled = true;
- $.get('/last/2', function() {
- Puggernaut.disabled = false;
- Puggernaut.watch('last', function(e, message, time) {
- equals(message, 'last message 2');
- ok(time.constructor === Date);
- Puggernaut.unwatch('last');
- start();
- });
- });
+ $.get('/last/2');
+ }
+ if (message == 'last message 2') {
+ ok(time.constructor === Date);
+ Puggernaut.unwatch('last');
+ start();
}
});
}
@@ -140,7 +139,7 @@ $(function() {
test("should pick up last message", function() {
stop();
- expect(4);
+ expect(2);
$.get('/last/1');
});
Please sign in to comment.
Something went wrong with that request. Please try again.