Browse files

moving trunk over

git-svn-id: http://juggernaut.rubyforge.org/svn/trunk/gem@97 7766aa5d-c317-0410-8704-802fc6bb4297
  • Loading branch information...
0 parents commit 31fa85874d32c3773775db07f3056dd6d3720cce maccman committed Feb 2, 2008
Showing with 1,024 additions and 0 deletions.
  1. +5 −0 History.txt
  2. +12 −0 Manifest.txt
  3. +48 −0 README.txt
  4. +19 −0 Rakefile
  5. +4 −0 bin/juggernaut
  6. +148 −0 lib/juggernaut.rb
  7. +156 −0 lib/juggernaut/client.rb
  8. +19 −0 lib/juggernaut/message.rb
  9. +27 −0 lib/juggernaut/miscel.rb
  10. +208 −0 lib/juggernaut/runner.rb
  11. +367 −0 lib/juggernaut/server.rb
  12. +11 −0 lib/juggernaut/utils.rb
5 History.txt
@@ -0,0 +1,5 @@
+== 1.0.0 / 2008-01-26
+
+* 1 major enhancement
+ * Birthday!
+
12 Manifest.txt
@@ -0,0 +1,12 @@
+History.txt
+Manifest.txt
+README.txt
+Rakefile
+bin/juggernaut
+lib/juggernaut.rb
+lib/juggernaut/client.rb
+lib/juggernaut/message.rb
+lib/juggernaut/miscel.rb
+lib/juggernaut/runner.rb
+lib/juggernaut/server.rb
+lib/juggernaut/utils.rb
48 README.txt
@@ -0,0 +1,48 @@
+Juggernaut
+ by Alex MacCaw
+ http://www.eribium.org
+
+== DESCRIPTION:
+
+FIX (describe your package)
+
+== FEATURES/PROBLEMS:
+
+* FIX (list of features or problems)
+
+== SYNOPSIS:
+
+ FIX (code sample of usage)
+
+== REQUIREMENTS:
+
+* FIX (list of requirements)
+
+== INSTALL:
+
+* FIX (sudo gem install, anything else)
+
+== LICENSE:
+
+(The MIT License)
+
+Copyright (c) 2008 Alex MacCaw
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+'Software'), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
19 Rakefile
@@ -0,0 +1,19 @@
+# -*- ruby -*-
+
+require 'rubygems'
+require 'hoe'
+require './lib/juggernaut.rb'
+
+Hoe.new('juggernaut', Juggernaut::VERSION) do |p|
+ p.rubyforge_name = 'juggernaut'
+ p.author = 'Alex MacCaw'
+ p.email = 'info@eribium.org'
+ # p.summary = 'FIX'
+ # p.description = p.paragraphs_of('README.txt', 2..5).join("\n\n")
+ p.url = 'http://juggernaut.rubyforge.org'
+ p.changes = p.paragraphs_of('History.txt', 0..1).join("\n\n")
+ p.extra_deps << ['eventmachine', '>=0.10.0']
+ p.extra_deps << ['json', '>=1.1.2']
+end
+
+# vim: syntax=Ruby
4 bin/juggernaut
@@ -0,0 +1,4 @@
+#!/usr/bin/env ruby
+
+require File.join(File.dirname(__FILE__), '..', 'lib', 'juggernaut')
+Juggernaut::Runner.run
148 lib/juggernaut.rb
@@ -0,0 +1,148 @@
+require 'logger'
+require 'eventmachine'
+require 'json'
+$:.unshift(File.dirname(__FILE__))
+
+module Juggernaut
+ VERSION = '0.2'
+
+ class JuggernautError < StandardError #:nodoc:
+ end
+
+ @@options = {}
+
+ DEFAULT_CONFIG_FILE = <<-EOF
+ # ======================
+ # Juggernaut Options
+ # ======================
+
+ # === Subscription authentication ===
+ # Leave all subscription options uncommented to allow anyone to subscribe.
+
+ # If specified, subscription_url is called everytime a client subscribes.
+ # Parameters passed are: session_id, client_id and an array of channels.
+ #
+ # The server should check that the session_id matches up to the client_id
+ # and that the client is allowed to access the specified channels.
+ #
+ # If a status code other than 200 is encountered, the subscription_request fails
+ # and the client is disconnected.
+ #
+ # :subscription_url: http://localhost:3000/sessions/juggernaut_subscription
+
+ # === Broadcast and query authentication ===
+ # Leave all broadcast/query options uncommented to allow anyone to broadcast/query.
+ #
+ # Broadcast authentication in a production environment is very importantant since broadcasters
+ # can execute JavaScript on subscribed clients, leaving you vulnerable to cross site scripting
+ # attacks if broadcasters aren't authenticated.
+
+ # 1) Via IP address
+ #
+ # If specified, if a client has an ip that is specified in allowed_ips, than it is automatically
+ # authenticated, even if a secret_key isn't provided.
+ #
+ # This is the recommended method for broadcast authentication.
+ #
+ :allowed_ips:
+ - 127.0.0.1
+ # - 192.168.0.1
+
+ # 2) Via HTTP request
+ #
+ # If specified, if a client attempts a broadcast/query, without a secret_key or using an IP
+ # no included in allowed_ips, then broadcast_query_login_url will be called.
+ # Parameters passed, if given, are: session_id, client_id, channels and type.
+ #
+ # The server should check that the session_id matches up to the client id, and the client
+ # is allowed to perform that particular type of broadcast/query.
+ #
+ # If a status code other than 200 is encountered, the broadcast_query_login_url fails
+ # and the client is disconnected.
+ #
+ # :broadcast_query_login_url: http://localhost:3000/sessions/juggernaut_broadcast
+
+ # 3) Via shared secret key
+ #
+ # This secret key must be sent with any query/broadcast commands.
+ # It must be the same as the one in the Rails config file.
+ #
+ # You shouldn't authenticate broadcasts from subscribed clients using this method
+ # since the secret_key will be easily visible in the page (and not so secret any more)!
+ #
+ # :secret_key: your_secret_key_here
+
+ # == Subscription Logout ==
+
+ # If specified, logout_connection_url is called everytime a specific connection from a subscribed client disconnects.
+ # Parameters passed are session_id, client_id and an array of channels specific to that connection.
+ #
+ # :logout_connection_url: http://localhost:3000/sessions/juggernaut_connection_logout
+
+ # Logout url is called when all connections from a subscribed client are closed.
+ # Parameters passed are session_id and client_id.
+ #
+ # :logout_url: http://localhost:3000/sessions/juggernaut_logout
+
+ # === Miscellaneous ===
+
+ # timeout defaults to 10. A timeout is the time between when a client closes a connection
+ # and a logout_request or logout_connection_request is made. The reason for this is that a client
+ # may only temporarily be disconnected, and may attempt a reconnect very soon.
+ #
+ # :timeout: 10
+
+ # store_messages defaults to false. If this option is true, messages send to connections will be stored.
+ # This is useful since a client can then receive broadcasted message that it has missed (perhaps it was disconnected).
+ #
+ # :store_messages: false
+
+ # === Server ===
+
+ # Host defaults to "0.0.0.0". You shouldn't need to change this.
+ # :host: 0.0.0.0
+
+ # Port is mandatory
+ :port: 5001
+
+ EOF
+
+ class << self
+ def options
+ @@options
+ end
+
+ def options=(val)
+ @@options = val
+ end
+
+ def logger
+ return @@logger if defined?(@@loggger)
+ FileUtils.mkdir_p(File.dirname(log_path))
+ @@logger = Logger.new(log_path)
+ @@logger.level = Logger::INFO if options[:debug] == false
+ @@logger
+ rescue
+ @@logger = Logger.new(STDOUT)
+ end
+
+ def log_path
+ options[:log_path] || File.join(%w( / var run juggernaut.log ))
+ end
+
+ def pid_path
+ options[:pid_path] || File.join(%w( / var run ), "juggernaut.#{options[:port]}.pid" )
+ end
+
+ def config_path
+ options[:config_path] || File.join(%w( / var run juggernaut.yml ))
+ end
+ end
+end
+
+require 'juggernaut/utils'
+require 'juggernaut/miscel'
+require 'juggernaut/message'
+require 'juggernaut/client'
+require 'juggernaut/server'
+require 'juggernaut/runner'
156 lib/juggernaut/client.rb
@@ -0,0 +1,156 @@
+require 'uri'
+module Juggernaut
+ class Client
+ include Juggernaut::Miscel
+
+ attr_reader :id
+ attr_accessor :session_id
+ attr_reader :connections
+ @@clients = []
+
+ class << self
+ # Actually does a find_or_create_by_id
+ def find_or_create(subscriber, request)
+ if client = find_by_id(request[:client_id])
+ client.session_id = request[:session_id]
+ client.add_new_connection(subscriber)
+ client
+ else
+ self.new(subscriber, request)
+ end
+ end
+
+ def add_client(client)
+ @@clients << client unless @@clients.include?(client)
+ end
+
+ # Client find methods
+
+ def find_all
+ @@clients
+ end
+
+ def find(&block)
+ @@clients.select(&block).uniq
+ end
+
+ def find_by_id(id)
+ find {|client| client.id == id }.first
+ end
+
+ def find_by_signature(signature)
+ # signature should be unique
+ find {|client|
+ client.connections.select {|connection| connection.signature == signature }.any?
+ }.first
+ end
+
+ def find_by_channels(channels)
+ find {|client|
+ client.has_channels?(channels)
+ }
+ end
+
+ def find_by_id_and_channels(id, channels)
+ find {|client|
+ client.has_channels?(channels) && client.id == id
+ }.first
+ end
+
+ def send_logouts_after_timeout
+ @@clients.each do |client|
+ client.logout_request if !client.alive? and client.give_up? and !client.sent_logout?
+ end
+ end
+
+ # Called when the server is shutting down
+ def send_logouts_to_all_clients
+ @@clients.each do |client|
+ client.logout_request if !client.sent_logout?
+ end
+ end
+ end
+
+ def initialize(subscriber, request)
+ @connections = []
+ @id = request[:client_id]
+ @session_id = request[:session_id]
+ add_new_connection(subscriber)
+ end
+
+ def to_s
+ {
+ :id => @id.to_s,
+ :session_id => @session_id
+ }.to_json
+ end
+
+ def add_new_connection(subscriber)
+ @connections << subscriber
+ end
+
+ def subscription_request(channels)
+ return true unless options[:subscription_url]
+ post_request(options[:subscription_url], channels)
+ end
+
+ def logout_connection_request(channels)
+ return true unless options[:logout_connection_url]
+ post_request(options[:logout_connection_url], channels)
+ end
+
+ def logout_request
+ return true unless options[:logout_url]
+ @sent_logout = true
+ post_request(options[:logout_url])
+ end
+
+ def sent_logout?
+ !!@sent_logout
+ end
+
+ def send_message(msg, channels = nil)
+ @connections.each do |em|
+ em.broadcast(msg) if !channels or channels.empty? or em.has_channels?(channels)
+ end
+ end
+
+ def has_channels?(channels)
+ @connections.each do |em|
+ return true if em.has_channels?(channels)
+ end
+ false
+ end
+
+ def remove_channels!(channels)
+ @connections.each do |em|
+ em.remove_channels!(channels)
+ end
+ end
+
+ def alive?
+ @connections.select{|em| em.alive? }.any?
+ end
+
+ def give_up?
+ @connections.select {|em| em.logout_timeout and Time.now > em.logout_timeout }.any?
+ end
+
+ private
+
+ def post_request(url, channels = [])
+ url = URI.parse(url)
+ params = []
+ params << "client_id=#{id}" if id
+ params << "session_id=#{session_id}" if session_id
+ channels.each {|chan| params << "channels[]=#{chan}" }
+ url.query = params.join('&')
+ begin
+ open(url.to_s, "User-Agent" => "Ruby/#{RUBY_VERSION}")
+ rescue
+ return false
+ end
+ true
+ end
+ end
+ end
19 lib/juggernaut/message.rb
@@ -0,0 +1,19 @@
+module Juggernaut
+ class Message
+ attr_accessor :id
+ attr_accessor :signature
+ attr_accessor :body
+ attr_reader :created_at
+
+ def initialize(id, body, signature)
+ @id = id
+ @body = body
+ @signature = signature
+ @created_at = Time.now
+ end
+
+ def to_s
+ { :id => @id.to_s, :body => @body, :signature => @signature }.to_json
+ end
+ end
+end
27 lib/juggernaut/miscel.rb
@@ -0,0 +1,27 @@
+module Juggernaut
+ module Miscel
+ def options
+ Juggernaut.options
+ end
+
+ def options=(ob)
+ Juggernaut.options = ob
+ end
+
+ def log_path
+ Juggernaut.log_path
+ end
+
+ def pid_path
+ Juggernaut.pid_path
+ end
+
+ def config_path
+ Juggernaut.config_path
+ end
+
+ def logger
+ Juggernaut.logger
+ end
+ end
+end
208 lib/juggernaut/runner.rb
@@ -0,0 +1,208 @@
+require 'optparse'
+require 'yaml'
+require 'erb'
+
+module Juggernaut
+ class Runner
+ include Juggernaut::Miscel
+
+ class << self
+ def run
+ self.new
+ end
+ end
+
+ def initialize
+ self.options = {
+ :host => "0.0.0.0",
+ :port => 5001,
+ :debug => false,
+ :cleanup_timer => 2,
+ :timeout => 10,
+ :store_messages => false
+ }
+
+ self.options.merge!({
+ :pid_path => pid_path,
+ :log_path => log_path,
+ :config_path => config_path
+ })
+
+ parse_options
+
+ if !File.exists?(config_path)
+ puts "You must generate a config file (juggernaut -g)"
+ exit
+ end
+
+ options.merge!(YAML::load(ERB.new(IO.read(config_path)).result))
+
+ if options.include?(:kill)
+ kill_pid(options[:kill] || '*')
+ end
+
+ Process.euid = options[:user] if options[:user]
+ Process.egid = options[:group] if options[:group]
+
+ if !options[:daemonize]
+ start
+ else
+ daemonize
+ end
+ end
+
+ def start
+ puts "Starting Juggernaut server on port: #{options[:port]}..."
+
+ trap("INT") {
+ stop
+ exit
+ }
+ trap("TERM"){
+ stop
+ exit
+ }
+
+ EventMachine::run {
+ EventMachine::add_periodic_timer( options[:cleanup_timer].to_i ) { Juggernaut::Client.send_logouts_after_timeout }
+ EventMachine::start_server(options[:host], options[:port].to_i, Juggernaut::Server)
+ }
+ end
+
+ def stop
+ puts "Stopping Juggernaut server"
+ Juggernaut::Client.send_logouts_to_all_clients
+ EventMachine::stop
+ end
+
+ def parse_options
+ OptionParser.new do |opts|
+ opts.summary_width = 25
+ opts.banner = "Juggernaut (#{VERSION})\n\n",
+ "Usage: juggernaut [-h host] [-p port] [-P file]\n",
+ " [-d] [-k port] [-l file] [-e]\n",
+ " juggernaut --help\n",
+ " juggernaut --version\n"
+
+ opts.separator ""
+ opts.separator ""; opts.separator "Configuration:"
+
+ opts.on("-g", "--generate FILE", String, "Generate config file", "(default: #{options[:config_path]})") do |v|
+ options[:config_path] = File.expand_path(v) if v
+ generate_config_file
+ end
+
+ opts.on("-c", "--config FILE", String, "Path to configuration file.", "(default: #{options[:config_path]})") do |v|
+ options[:config_path] = File.expand_path(v)
+ end
+
+ opts.separator ""; opts.separator "Network:"
+
+ opts.on("-h", "--host HOST", String, "Specify host", "(default: #{options[:host]})") do |v|
+ options[:host] = v
+ end
+
+ opts.on("-p", "--port PORT", Integer, "Specify port", "(default: #{options[:port]})") do |v|
+ options[:port] = v
+ end
+
+ opts.separator ""; opts.separator "Daemonization:"
+
+ opts.on("-P", "--pid FILE", String, "save PID in FILE when using -d option.", "(default: #{options[:pid_path]})") do |v|
+ options[:pid_path] = File.expand_path(v)
+ end
+
+ opts.on("-d", "--daemon", "Daemonize mode") do |v|
+ options[:daemonize] = v
+ end
+
+ opts.on("-k", "--kill PORT", String, "Kill specified running daemons - leave blank to kill all.") do |v|
+ options[:kill] = v
+ end
+
+ opts.separator ""; opts.separator "Logging:"
+
+ opts.on("-l", "--log [FILE]", String, "Path to print debugging information.", "(default: #{options[:log_path]})") do |v|
+ options[:log_path] = File.expand_path(v)
+ end
+
+ opts.on("-e", "--debug", "Run in debug mode", "(default: #{options[:debug]})") do |v|
+ options[:debug] = v
+ end
+
+ opts.separator ""; opts.separator "Permissions:"
+
+ opts.on("-u", "--user USER", String, "User to run as") do |user|
+ options[:user] = user
+ end
+
+ opts.on("-G", "--group GROUP", String, "Group to run as") do |group|
+ options[:group] = group
+ end
+
+ opts.separator ""; opts.separator "Miscellaneous:"
+
+ opts.on_tail("-?", "--help", "Display this usage information.") do
+ puts "#{opts}\n"
+ exit
+ end
+
+ opts.on_tail("-v", "--version", "Display version") do |v|
+ puts "Juggernaut #{VERSION}"
+ exit
+ end
+ end.parse!
+ options
+ end
+
+ private
+
+ def generate_config_file
+ if File.exists?(config_path)
+ puts "Config file already exists. You must remove it before generating a new one."
+ exit
+ end
+ puts "Generating config file...."
+ File.open(config_path, 'w+') do |file|
+ file.write DEFAULT_CONFIG_FILE.gsub('your_secret_key_here', Digest::SHA1.hexdigest("--#{Time.now.to_s.split(//).sort_by {rand}.join}--"))
+ end
+ puts "Config file generated at #{config_path}"
+ exit
+ end
+
+ def store_pid(pid)
+ FileUtils.mkdir_p(File.dirname(pid_path))
+ File.open(pid_path, 'w'){|f| f.write("#{pid}\n")}
+ end
+
+ def kill_pid(k)
+ Dir[options[:pid_path]||File.join(File.dirname(pid_dir), "juggernaut.#{k}.pid")].each do |f|
+ begin
+ puts f
+ pid = IO.read(f).chomp.to_i
+ FileUtils.rm f
+ Process.kill(9, pid)
+ puts "killed PID: #{pid}"
+ rescue => e
+ puts "Failed to kill! #{k}: #{e}"
+ end
+ end
+ exit
+ end
+
+ def daemonize
+ fork do
+ Process.setsid
+ exit if fork
+ store_pid(Process.pid)
+ # Dir.chdir "/" # Mucks up logs
+ File.umask 0000
+ STDIN.reopen "/dev/null"
+ STDOUT.reopen "/dev/null", "a"
+ STDERR.reopen STDOUT
+ start
+ end
+ end
+
+ end
+end
367 lib/juggernaut/server.rb
@@ -0,0 +1,367 @@
+require 'eventmachine'
+require 'socket'
+require 'json'
+require 'open-uri'
+require 'fileutils'
+require 'digest/sha1'
+
+module Juggernaut
+ class Server < EventMachine::Connection
+ include Juggernaut::Miscel
+
+ class InvalidRequest < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class InvalidCommand < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class CorruptJSON < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class MalformedBroadcast < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class MalformedSubscribe < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class UnauthorisedSubscription < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class MalformedQuery < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class UnauthorisedBroadcast < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ class UnauthorisedQuery < Juggernaut::JuggernautError #:nodoc:
+ end
+
+ POLICY_FILE = <<-EOF
+ <cross-domain-policy>
+ <allow-access-from domain="*" to-ports="PORT" />
+ </cross-domain-policy>
+ EOF
+
+ POLICY_REQUEST = "<policy-file-request/>"
+
+ CR = "\0"
+
+ attr_reader :current_msg_id
+ attr_reader :messages
+ attr_reader :connected
+ attr_reader :logout_timeout
+ attr_reader :status
+ attr_reader :channels
+
+ # EM methods
+
+ def post_init
+ logger.debug "New client [#{client_ip}]"
+ @channels = []
+ @messages = []
+ @current_msg_id = 0
+ @connected = true
+ @logout_timeout = nil
+ @buffer = ''
+ end
+
+ # Juggernaut packets are terminated with "\0"
+ # so we need to buffer the data until we find the
+ # terminating "\0"
+ def receive_data(data)
+ logger.debug "Receiving data: #{data}"
+ @buffer << data
+ @buffer = process_whole_messages(@buffer)
+ end
+
+ # process any whole messages in the buffer,
+ # and return the new contents of the buffer
+ def process_whole_messages(data)
+ return data if data !~ /\0/ # only process if data contains a \0 char
+ messages = data.split("\0")
+ if data =~ /\0$/
+ data = ''
+ else
+ # remove the last message from the list (because it is incomplete) before processing
+ data = messages.pop
+ end
+ messages.each {|message| process_message(message.strip)}
+ return data
+ end
+
+ def process_message(ln)
+ logger.debug "Processing message: #{ln}"
+ @request = nil
+
+ if ln == POLICY_REQUEST
+ logger.debug "Sending crossdomain file"
+ send_data POLICY_FILE.gsub('PORT', options[:port].to_s)
+ close_connection_after_writing
+ return
+ end
+
+ begin
+ @request = JSON.parse(ln) unless ln.empty?
+ rescue
+ raise CorruptJSON, ln
+ end
+
+ raise InvalidRequest, ln if !@request
+
+ @request.symbolize_keys!
+
+ @request[:channels] = (@request[:channels] || []).compact.select {|c| !!c && c != '' }.uniq
+
+ if @request[:client_ids]
+ @request[:client_ids] = @request[:client_ids].to_a.compact.select {|c| !!c && c != '' }.uniq
+ end
+
+ case @request[:command].to_sym
+ when :broadcast: broadcast_command
+ when :subscribe: subscribe_command
+ when :query: query_command
+ else
+ raise InvalidCommand, @request[:command]
+ end
+
+ rescue JuggernautError => e
+ logger.error e
+ close_connection
+ # # So as to stop em quitting
+ # rescue => e
+ # logger ? logger.error(e) : puts(e)
+ end
+
+ def unbind
+ @client.logout_connection_request(@channels) if @client # todo - should be called after timeout?
+ logger.debug "Lost client: #{@client.id}" if @client
+ mark_dead('Unbind called')
+ end
+
+ # As far as I'm aware, send_data
+ # never throws an exception
+ def publish(msg)
+ logger.debug "Sending msg: #{msg.to_s}"
+ logger.debug "To client: #{@client.id}" if @client
+ send_data(msg.to_s + CR)
+ end
+
+ # Connection methods
+
+ def broadcast(bdy)
+ msg = Juggernaut::Message.new(@current_msg_id += 1, bdy, self.signature)
+ @messages << msg if options[:store_messages]
+ publish(msg)
+ end
+
+ def mark_dead(reason = "Unknown error")
+ # Once dead, a client never recovers since a reconnection
+ # attempt would hook onto a new em instance. A client
+ # usually dies through an unbind
+ @connected = false
+ @logout_timeout = Time::now + (options[:timeout] || 30)
+ @status = "DEAD: %s: Could potentially logout at %s" %
+ [ reason, @logout_timeout ]
+ end
+
+ def alive?
+ @connected == true
+ end
+
+ def has_channels?(channels)
+ channels.each {|channel|
+ return true if has_channel?(channel)
+ }
+ false
+ end
+
+ def has_channel?(channel)
+ @channels.include?(channel)
+ end
+
+ def add_channel(chan_name)
+ return if !chan_name or chan_name == ''
+ @channels << chan_name unless has_channel?(chan_name)
+ end
+
+ def add_channels(chan_names)
+ chan_names.to_a.each do |chan_name|
+ add_channel(chan_name)
+ end
+ end
+
+ def remove_channel!(chan_name)
+ @channels.delete(chan_name)
+ end
+
+ def remove_channels!(chan_names)
+ chan_names.to_a.each do |chan_name|
+ remove_channel!(chan_name)
+ end
+ end
+
+ def broadcast_all_messages_from(msg_id, signature_id)
+ return unless msg_id or signature_id
+ client = Juggernaut::Client.find_by_signature(signature)
+ return if !client
+ msg_id = Integer(msg_id)
+ return if msg_id >= client.current_msg_id
+ client.messages.select {|msg|
+ (msg_id..client.current_msg_id).include?(msg.id)
+ }.each {|msg| publish(msg) }
+ end
+
+ # todo - how should this be called - if at all?
+ def clean_up_old_messages(how_many_to_keep = 1000)
+ while @messages.length > how_many_to_keep
+ # We need to shift, as we want to remove the oldest first
+ @messages.shift
+ end
+ end
+
+ protected
+
+ # Commands
+
+ def broadcast_command
+ raise MalformedBroadcast, @request unless @request[:type]
+
+ raise UnauthorisedBroadcast, @request unless authenticate_broadcast_or_query
+
+ case @request[:type].to_sym
+ when :to_channels
+ # if channels is a blank array, sends to everybody!
+ broadcast_to_channels(@request[:body], @request[:channels])
+ when :to_clients
+ broadcast_needs :client_ids
+ @request[:client_ids].each do |client_id|
+ # if channels aren't empty, scopes broadcast to clients on those channels
+ broadcast_to_client(@request[:body], client_id, @request[:channels])
+ end
+ else
+ raise MalformedBroadcast, @request
+ end
+ end
+
+ def query_command
+ raise MalformedQuery, @request unless @request[:type]
+
+ raise UnauthorisedQuery, @request unless authenticate_broadcast_or_query
+
+ case @request[:type].to_sym
+ when :remove_channels_from_all_clients
+ query_needs :channels
+ clients = Juggernaut::Client.find_all
+ clients.each {|client| client.remove_channels!(@request[:channels]) }
+ when :remove_channels_from_client
+ query_needs :client_ids, :channels
+ @request[:client_ids].each do |client_id|
+ client = Juggernaut::Client.find_by_id(client_id)
+ client.remove_channels!(@request[:channels]) if client
+ end
+ when :show_users
+ if @request[:client_ids] and @request[:client_ids].any?
+ clients = @request[:client_ids].collect{ |client_id| Client.find_by_id(client_id) }.compact.uniq
+ else
+ clients = Juggernaut::Client.find_all
+ end
+ publish clients.to_json
+ when :show_user
+ query_needs :client_id
+ publish Juggernaut::Client.find_by_id(@request[:client_id]).to_json
+ when :show_users_for_channel
+ query_needs :channels
+ publish Juggernaut::Client.find_by_channels(@request[:channels]).to_json
+ else
+ raise MalformedQuery, @request[:type]
+ end
+ end
+
+ def subscribe_command
+ if channels = @request[:channels]
+ add_channels(channels)
+ end
+
+ @client = Juggernaut::Client.find_or_create(self, @request)
+
+ if !@client.subscription_request(@channels)
+ raise UnauthorisedSubscription, @client
+ end
+
+ Juggernaut::Client.add_client(@client)
+
+ if options[:store_messages]
+ broadcast_all_messages_from(@request[:last_msg_id], @request[:signature])
+ end
+ end
+
+ private
+
+ # Different broadcast types
+
+ def broadcast_to_channels(msg, channels = [])
+ Juggernaut::Client.find_all.each {|client| client.send_message(msg, channels) }
+ end
+
+ def broadcast_to_client(body, client_id, channels)
+ client = Juggernaut::Client.find_by_id(client_id)
+ client.send_message(body, channels) if client
+ end
+
+ # Helper methods
+
+ def broadcast_needs(*args)
+ args.each do |arg|
+ raise MalformedBroadcast unless @request.has_key?(arg)
+ end
+ end
+
+ def subscribe_needs(*args)
+ args.each do |arg|
+ raise MalformedSubscribe unless @request.has_key?(arg)
+ end
+ end
+
+ def query_needs(*args)
+ args.each do |arg|
+ raise MalformedQuery unless @request.has_key?(arg)
+ end
+ end
+
+ def authenticate_broadcast_or_query
+ if options[:allowed_ips] and peername = get_peername
+ return true if options[:allowed_ips].include?(client_ip)
+ elsif !request[:secret_key]
+ return true if broadcast_query_request
+ elsif options[:secret_key]
+ return true if request[:secret_key] == @options[:secret_key]
+ end
+ if !options[:allowed_ips] and !options[:secret_key] and !options[:broadcast_query_login_url]
+ return true
+ end
+ false
+ end
+
+ def broadcast_query_request
+ return false unless options[:broadcast_query_login_url]
+ url = URI.parse(options[:broadcast_query_login_url])
+ params = []
+ params << "client_id=#{@request[:client_id]}" if @request[:client_id]
+ params << "session_id=#{@request[:session_id]}" if @request[:session_id]
+ params << "type=#{@request[:type]}"
+ params << "command=#{@request[:command]}"
+ (@request[:channels] || []).each {|chan| params << "channels[]=#{chan}" }
+ url.query = params.join('&')
+ begin
+ open(url.to_s, "User-Agent" => "Ruby/#{RUBY_VERSION}")
+ rescue
+ return false
+ end
+ true
+ end
+
+ def client_ip
+ Socket.unpack_sockaddr_in(get_peername)[1]
+ end
+ end
+end
11 lib/juggernaut/utils.rb
@@ -0,0 +1,11 @@
+class Hash
+ def symbolize_keys!
+ keys.each do |key|
+ unless key.is_a?(Symbol) || (new_key = key.to_sym).nil?
+ self[new_key] = self[key]
+ delete(key)
+ end
+ end
+ self
+ end
+end

0 comments on commit 31fa858

Please sign in to comment.