Permalink
Browse files

More specs (all passing), lots of work to get multiple channels sendi…

…ng in one poll request
  • Loading branch information...
1 parent 8021f34 commit ca700995b6c712c0b0c5cbb80879ece45c60570c @winton committed Jan 18, 2011
View
17 lib/puggernaut/client.rb
@@ -22,15 +22,16 @@ def close
end
def push(messages)
- messages.each do |channel, message|
- message =
- if message.is_a?(::Array)
- message.collect { |m| "#{channel}|#{m}" }.join("\n")
- else
- "#{channel}|#{message}"
- end
+ messages = messages.collect do |channel, message|
+ if message.is_a?(::Array)
+ message.collect { |m| "#{channel}|#{m}" }.join("\n")
+ else
+ "#{channel}|#{message}"
+ end
+ end
+ unless messages.empty?
@servers.each do |(host, port)|
- send host, port, message
+ send host, port, messages.join("\n")
end
end
end
View
16 lib/puggernaut/server.rb
@@ -7,29 +7,26 @@ class Server
include Logger
- class <<self
- attr_accessor :channels
- end
-
def initialize(http_port=8000, tcp_port=http_port+1)
- puts "Puggernaut is starting on #{http_port} (HTTP) and #{tcp_port} (TCP)"
- puts "*snort*"
+ puts "\nPuggernaut is starting on #{http_port} (HTTP) and #{tcp_port} (TCP)"
+ puts "*snort*\n\n"
errors = 0
while errors <= 10
begin
+ Channel.channels = []
GC.start
- self.class.channels = {}
EM.epoll if EM.epoll?
EM.run do
logger.info "Server#initialize - Starting HTTP - #{http_port}"
EM.start_server '0.0.0.0', http_port, Http
logger.info "Server#initialize - Starting TCP - #{tcp_port}"
EM.start_server '0.0.0.0', tcp_port, Tcp
+
+ errors = 0
end
- errors = 0
rescue Interrupt
logger.info "Server#initialize - Shutting down"
exit
@@ -39,6 +36,9 @@ def initialize(http_port=8000, tcp_port=http_port+1)
logger.error "\t" + $!.backtrace.join("\n\t")
end
end
+
+ puts "Exiting because of too many consecutive errors :("
+ puts "Check #{Dir.pwd}/log/puggernaut.log\n\n"
end
end
end
View
83 lib/puggernaut/server/channel.rb
@@ -4,35 +4,66 @@ class Channel < EM::Channel
include Logger
- attr_reader :messages, :channel
-
- def initialize(channel)
- @messages = []
- @channel = channel
+ attr_reader :channels
+
+ def initialize(channels)
+ @channels = channels
super()
end
- def all_messages_after(identifier)
- found = false
- (
- @messages.select { |(id, message)|
- found = true if id == identifier
- found
- }[1..-1] || []
-
- ).collect { |message|
- "#{@channel}|#{message.join '|'}"
- }
- end
-
- def say(messages)
- push messages.split("\n").collect { |message|
- message = [ rand.to_s[2..-1], message ]
- @messages << message
- @messages.shift if @messages.length > 100
- logger.info "Server::Channel#say - #{@channel} - #{message[0]} - #{message[1]}"
- "#{@channel}|#{message.join '|'}"
- }.join("\n")
+ class <<self
+
+ attr_accessor :channels
+
+ def create(channels)
+ channel = self.new(channels)
+ @channels ||= []
+ @channels << channel
+ channel
+ end
+
+ def all_messages_after(channel, identifier)
+ if @messages && @messages[channel]
+ found = false
+ (
+ @messages[channel].select { |(id, message)|
+ found = true if id == identifier
+ found
+ }[1..-1] || []
+
+ ).collect { |message|
+ "#{channel}|#{message.join '|'}"
+ }
+ else
+ []
+ end
+ end
+
+ def say(messages)
+ @messages ||= {}
+ messages = messages.inject({}) do |hash, (channel_name, messages)|
+ messages = messages.collect do |message|
+ [ rand.to_s[2..-1], message ]
+ end
+ @messages[channel_name] ||= []
+ @messages[channel_name] += messages
+ if @messages[channel_name].length > 100
+ @messages[channel_name] = @messages[channel_name][-100..-1]
+ end
+ hash[channel_name] = messages
+ hash
+ end
+ @channels.each do |channel|
+ push = channel.channels.collect do |channel_name|
+ if messages[channel_name]
+ messages[channel_name].collect { |message|
+ "#{channel_name}|#{message.join('|')}"
+ }.join("\n")
+ end
+ end
+ channel.push push.compact.join("\n")
+ end
+ end
end
end
end
View
36 lib/puggernaut/server/http.rb
@@ -21,25 +21,23 @@ def receive_data(data)
end
if path == '/'
- if query && !query['channel'].empty?
- @channels = query['channel'].collect do |channel|
- Puggernaut::Server.channels[channel] ||= Channel.new(channel)
- end
- if query['last'] && !query['last'].empty?
- last = query['last'].dup
- last = @channels.inject([]) { |array, channel|
- array += channel.all_messages_after(last.shift)
+ channels = query['channel'].dup rescue []
+ lasts = query['last'].dup rescue []
+
+ unless channels.empty?
+ @channel = Channel.create(channels)
+ unless lasts.empty?
+ lasts = channels.inject([]) { |array, channel|
+ array += Channel.all_messages_after(channel, lasts.shift)
array
}.join("\n")
end
- if last && !last.empty?
- respond last
+ unless lasts.empty?
+ respond lasts
else
EM::Timer.new(30) { respond }
- @subscription_ids = @channels.collect do |channel|
- logger.info "Server::Http#receive_data - Subscribed - #{channel.channel}"
- channel.subscribe { |str| respond str }
- end
+ logger.info "Server::Channel#create - Subscribed - #{@channel.channels.join(", ")}"
+ @subscription_id = @channel.subscribe { |str| respond str }
end
else
respond "no channel specified", 500
@@ -64,12 +62,10 @@ def respond(body='', status=200, content_type='text/plain; charset=utf-8')
end
def unbind
- if @subscription_ids
- @subscription_ids.each do |id|
- channel = @channels.shift
- channel.unsubscribe(id)
- logger.info "Sever::Http#unbind - #{channel.channel} - #{id}"
- end
+ if @subscription_id
+ @channel.unsubscribe(@subscription_id)
+ Channel.channels.delete @channel
+ logger.info "Sever::Http#unbind - Unsubscribe - #{@subscription_id}"
end
end
end
View
5 lib/puggernaut/server/tcp.rb
@@ -11,10 +11,7 @@ def receive_data(data)
hash[channel] << message
hash
end
- messages.each do |channel, messages|
- channel = Puggernaut::Server.channels[channel] ||= Channel.new(channel)
- channel.say messages.join("\n")
- end
+ Channel.say messages
send_data "OK\n"
end
end
View
10 lib/puggernaut/spec_server.rb
@@ -53,4 +53,14 @@ class SpecServer < Sinatra::Base
e.message
end
end
+
+ get '/multiple/channels' do
+ begin
+ client = Puggernaut::Client.new("localhost:8001")
+ client.push :single => "single message", :multiple => [ "multiple message 1", "multiple message 2" ]
+ client.close
+ rescue Exception => e
+ e.message
+ end
+ end
end
View
9 public/puggernaut.js
@@ -9,18 +9,23 @@ var Puggernaut = new function() {
this.watch = watch;
var channels = {};
+ var errors = 0;
var events = $('<div/>');
var started = false;
function ajax() {
- if (channelLength() > 0 && !self.disabled) {
+ if (channelLength() > 0 && !self.disabled && errors <= 10) {
started = true;
$.ajax({
cache: false,
data: params(),
dataType: 'text',
- error: ajax,
+ error: function() {
+ errors += 1;
+ ajax();
+ },
success: function(data) {
+ errors = 0;
$.each(data.split("\n"), function(i, line) {
line = line.split('|', 3);
if (typeof channels[line[0]] != 'undefined')
View
47 public/spec.js
@@ -1,6 +1,6 @@
$(function() {
- module("Single", {
+ module("Single message", {
setup: function() {
Puggernaut.watch('single', function(e, message) {
equals(message, 'single message');
@@ -15,7 +15,7 @@ $(function() {
$.get('/single');
});
- module("Multiple", {
+ module("Multiple messages", {
setup: function() {
var executions = 0;
Puggernaut.watch('multiple', function(e, message) {
@@ -34,12 +34,10 @@ $(function() {
$.get('/multiple');
});
- module("Last", {
+ module("Last message", {
setup: function() {
- var ran = false;
Puggernaut.watch('last', function(e, message) {
- if (ran == false) {
- ran = true;
+ if (message != 'last message 2') {
equals(message, 'last message 1');
Puggernaut.disabled = true;
$.get('/last/2', function() {
@@ -55,8 +53,43 @@ $(function() {
}
});
- test("should receive multiple messages", function() {
+ test("should pick up last message", function() {
stop();
$.get('/last/1');
});
+
+ module("Multiple channels");
+
+ test("should receive a message", function() {
+ stop();
+
+ var executions = 0;
+ var total_runs = 0;
+
+ Puggernaut.disabled = true;
+
+ Puggernaut
+ .watch('single', function(e, message) {
+ total_runs += 1;
+ equals(message, 'single message');
+ Puggernaut.unwatch('single');
+ if (total_runs == 3)
+ start();
+ });
+
+ Puggernaut.disabled = false;
+
+ Puggernaut
+ .watch('multiple', function(e, message) {
+ executions += 1;
+ total_runs += 1;
+ equals(message, 'multiple message ' + executions);
+ if (executions == 2)
+ Puggernaut.unwatch('multiple');
+ if (total_runs == 3)
+ start();
+ });
+
+ $.get('/multiple/channels');
+ });
});

0 comments on commit ca70099

Please sign in to comment.