Permalink
Browse files

Adding ability to retrieve messages after a certain time, specify and…

… retrieve user ids of those currently inhabiting a room
  • Loading branch information...
1 parent 357c441 commit 6c812118daa11e43373df18264586eae89f69d12 @winton committed Feb 25, 2011
Showing with 206 additions and 62 deletions.
  1. +1 −1 README.md
  2. +7 −3 lib/puggernaut/client.rb
  3. +34 −11 lib/puggernaut/server/channel.rb
  4. +22 −7 lib/puggernaut/server/http.rb
  5. +4 −4 lib/puggernaut/spec_server.rb
  6. +66 −30 public/puggernaut.js
  7. +72 −6 public/spec.js
View
@@ -79,7 +79,7 @@ Include [jQuery](http://jquery.com) and [puggernaut.js](https://github.com/winto
Javascript client example:
<pre>
-Puggernaut.path = '/long_poll';
+Puggernaut.path = '/long_poll'; // (default)
Puggernaut
.watch('channel', function(e, message) {
View
@@ -1,5 +1,6 @@
require "#{File.dirname(__FILE__)}/logger"
require 'socket'
+require 'timeout'
module Puggernaut
class Client
@@ -43,9 +44,12 @@ def send(host, port, data, try_retry=true)
begin
host_port = "#{host}:#{port}"
logger.info "Client#send - #{host_port} - #{data}"
- connection = @connections[host_port] ||= TCPSocket.open(host, port)
- connection.print(data)
- response = connection.gets
+ response = nil
+ Timeout.timeout(10) do
+ connection = @connections[host_port] ||= TCPSocket.open(host, port)
+ connection.print(data)
+ response = connection.gets
+ end
raise 'not ok' if !response || !response.include?('OK')
rescue Exception => e
logger.info "Client#send - Exception - #{e.message} - #{host_port} - #{data}"
@@ -2,31 +2,32 @@ module Puggernaut
class Server
class Channel < EM::Channel
- include Logger
+ attr_reader :channels, :user_id
- attr_reader :channels
-
- def initialize(channels)
+ def initialize(channels, user_id)
@channels = channels
+ @user_id = user_id
super()
end
class <<self
+
+ include Logger
attr_accessor :channels
- def create(channels)
- channel = self.new(channels)
+ def create(channels, user_id)
+ channel = self.new(channels, user_id)
@channels ||= []
@channels << channel
channel
end
- def all_messages_after(channel, identifier)
+ def all_messages_after_id(channel, identifier)
if @messages && @messages[channel]
found = false
(
- @messages[channel].select { |(id, message)|
+ @messages[channel].select { |(id, message, time)|
found = true if id == identifier
found
}[1..-1] || []
@@ -38,17 +39,39 @@ def all_messages_after(channel, identifier)
[]
end
end
+
+ def all_messages_after_time(channel, after_time)
+ if @messages && @messages[channel]
+ (
+ @messages[channel].select { |(id, message, time)|
+ after_time < time
+ } || []
+
+ ).collect { |message|
+ "#{channel}|#{message.join '|'}"
+ }
+ else
+ []
+ end
+ end
+
+ def inhabitants(channel_name)
+ user_ids = @channels.collect do |channel|
+ channel.user_id if channel.channels.include?(channel_name)
+ end
+ user_ids.compact
+ end
def say(messages)
@messages ||= {}
messages = messages.inject({}) do |hash, (channel_name, messages)|
messages = messages.collect do |message|
- [ rand.to_s[2..-1], message ]
+ [ rand.to_s[2..-1], message, Time.now ]
end
@messages[channel_name] ||= []
@messages[channel_name] += messages
- if @messages[channel_name].length > 100
- @messages[channel_name] = @messages[channel_name][-100..-1]
+ @messages[channel_name] = @messages[channel_name].select do |message|
+ message[2] >= Time.now - 2 * 60 * 60
end
hash[channel_name] = messages
hash
@@ -1,4 +1,5 @@
require 'cgi'
+require 'time'
module Puggernaut
class Server
@@ -23,25 +24,39 @@ def receive_data(data)
if path == '/'
channels = query['channel'].dup rescue []
lasts = query['last'].dup rescue []
+ time = query['time'].dup[0] rescue nil
+ user_id = query['user_id'].dup[0] rescue nil
unless channels.empty?
- @channel = Channel.create(channels)
- unless lasts.empty?
- lasts = channels.inject([]) { |array, channel|
- array += Channel.all_messages_after(channel, lasts.shift)
+ @channel = Channel.create(channels, user_id)
+ messages = []
+ if time
+ messages = channels.inject([]) { |array, channel|
+ array += Channel.all_messages_after_time(channel, Time.parse(time))
+ array
+ }.join("\n")
+ elsif !lasts.empty?
+ messages = channels.inject([]) { |array, channel|
+ array += Channel.all_messages_after_id(channel, lasts.shift)
array
}.join("\n")
end
- unless lasts.empty?
- respond lasts
+ unless messages.empty?
+ respond messages
else
EM::Timer.new(30) { respond }
- logger.info "Server::Channel#create - Subscribed - #{@channel.channels.join(", ")}"
+ logger.info "Server::Http#receive_data - Subscribed - #{@channel.channels.join(", ")}"
@subscription_id = @channel.subscribe { |str| respond str }
end
else
respond "no channel specified", 500
end
+ elsif path == '//inhabitants'
+ channels = query['channel'].dup rescue []
+ user_ids = channels.collect do |c|
+ Channel.inhabitants(c)
+ end
+ respond user_ids.flatten.uniq.join('|')
else
respond "not found", 404
end
@@ -26,7 +26,7 @@ class SpecServer < Sinatra::Base
get '/single' do
begin
- client = Puggernaut::Client.new("localhost:8001")
+ client = Puggernaut::Client.new("localhost:8101")
client.push :single => "single message"
client.close
rescue Exception => e
@@ -36,7 +36,7 @@ class SpecServer < Sinatra::Base
get '/multiple' do
begin
- client = Puggernaut::Client.new("localhost:8001")
+ client = Puggernaut::Client.new("localhost:8101")
client.push :multiple => [ "multiple message 1", "multiple message 2" ]
client.close
rescue Exception => e
@@ -46,7 +46,7 @@ class SpecServer < Sinatra::Base
get '/last/:count' do
begin
- client = Puggernaut::Client.new("localhost:8001")
+ client = Puggernaut::Client.new("localhost:8101")
client.push :last => "last message #{params[:count]}"
client.close
rescue Exception => e
@@ -56,7 +56,7 @@ class SpecServer < Sinatra::Base
get '/multiple/channels' do
begin
- client = Puggernaut::Client.new("localhost:8001")
+ client = Puggernaut::Client.new("localhost:8101")
client.push :single => "single message", :multiple => [ "multiple message 1", "multiple message 2" ]
client.close
rescue Exception => e
View
@@ -5,34 +5,41 @@ var Puggernaut = new function() {
this.disabled = false;
this.path = '/long_poll';
+ this.inhabitants = inhabitants;
this.unwatch = unwatch;
this.watch = watch;
var channels = {};
var errors = 0;
var events = $('<div/>');
var started = false;
-
- function ajax() {
+ var request;
+
+ function ajax(time, user_id) {
if (channelLength() > 0 && !self.disabled && errors <= 10) {
started = true;
- $.ajax({
+ request = $.ajax({
cache: false,
- data: params(),
+ data: params(time, user_id),
dataType: 'text',
- error: function() {
- errors += 1;
- ajax();
+ error: function(xhr, status, error) {
+ if (started && status != 'abort') {
+ errors += 1;
+ ajax();
+ }
},
- success: function(data) {
- errors = 0;
- $.each(data.split("\n"), function(i, line) {
- line = line.split('|', 3);
- if (typeof channels[line[0]] != 'undefined')
- channels[line[0]] = line[1];
- events.trigger(line[0], line[2]);
- });
- ajax();
+ success: function(data, status, xhr) {
+ if (started) {
+ errors = 0;
+ $.each(data.split("\n"), function(i, line) {
+ line = line.split('|', 4);
+ if (line[0] && typeof channels[line[0]] != 'undefined') {
+ channels[line[0]] = line[1];
+ events.trigger(line[0], [ line[2], new Date(line[3]) ]);
+ }
+ });
+ ajax();
+ }
},
timeout: 100000,
traditional: true,
@@ -49,31 +56,53 @@ var Puggernaut = new function() {
});
return length;
}
+
+ function inhabitants() {
+ var args = $.makeArray(arguments);
+ var fn = args.pop();
+ $.ajax({
+ cache: false,
+ data: { channel: args },
+ dataType: 'text',
+ success: function(data, status, xhr) {
+ fn(data.split('|'));
+ },
+ traditional: true,
+ url: self.path + '/inhabitants'
+ });
+ }
- function params() {
+ function params(time, user_id) {
var ch = [];
var la = [];
+
$.each(channels, function(channel, last) {
ch.push(channel);
- la.push(last);
+ if (last)
+ la.push(last);
});
- return { channel: ch, last: la };
+
+ var data = { channel: ch };
+
+ if (la.length)
+ data.last = la;
+ if (time)
+ data.time = time + '';
+ if (user_id)
+ data.user_id = user_id;
+
+ return data;
}
function unwatch() {
var args = $.makeArray(arguments);
+ started = false;
+ request.abort();
if (args.length) {
- if (args[args.length-1].constructor == String)
- $.each(args, function(i, item) {
- delete channels[item];
- });
- args = $.map(args, function(item) {
- if (item.constructor == String)
- return 'watch.' + item;
- else
- return item;
+ $.each(args, function(i, item) {
+ delete channels[item];
+ events.unbind(item);
});
- events.unbind.apply(events, args);
} else
events.unbind();
return this;
@@ -82,6 +111,13 @@ var Puggernaut = new function() {
function watch() {
var ch = $.makeArray(arguments);
var fn = ch.pop();
+ var user_id, time;
+
+ if (ch[ch.length-1] && ch[ch.length-1].constructor === Object) {
+ var options = ch.pop();
+ time = options.time;
+ user_id = options.user_id;
+ }
if (ch.length && fn) {
$.each(ch, function(i, item) {
@@ -90,7 +126,7 @@ var Puggernaut = new function() {
});
if (!started)
- ajax();
+ ajax(time, user_id);
}
return this;
Oops, something went wrong.

0 comments on commit 6c81211

Please sign in to comment.