Permalink
Browse files

multi app support added

  • Loading branch information...
1 parent e3def2f commit db14176f18cb108fcd8a75e8ccdd183ee510a7f1 @tompesman committed Jul 21, 2012
View
22 bin/push
@@ -3,12 +3,24 @@
require 'optparse'
require 'push'
-foreground = false
environment = ARGV[0]
+
+config = Struct.new(:foreground, :pid_file, :push_poll, :airbrake_notify, :feedback_poll, :feedback_processor).new
+config.foreground = false
+config.push_poll = 2
+config.airbrake_notify = false
+config.feedback_poll = 60
+config.feedback_processor = 'lib/push/feedback_processor'
+
banner = 'Usage: push <Rails environment> [options]'
ARGV.options do |opts|
opts.banner = banner
- opts.on('-f', '--foreground', 'Run in the foreground.') { foreground = true }
+ opts.on('-f', '--foreground', 'Run in the foreground.') { config.foreground = true }
+ opts.on('-p PATH', '--pid-file PATH', String, 'Path to write PID file. Relative to Rails root unless absolute.') { |path| config.pid_file = path }
+ opts.on('-P N', '--push-poll N', Integer, "Frequency in seconds to check for new notifications. Default: #{config.push_poll}.") { |n| config.push_poll = n }
+ opts.on('-n', '--airbrake-notify', 'Enables error notifications via Airbrake.') { config.check_for_errors = true }
+ opts.on('-F N', '--feedback-poll N', Integer, "Frequency in seconds to check for feedback for the feedback processor. Default: #{config.feedback_poll}. Use 0 to disable.") { |n| config.feedback_poll = n }
+ opts.on('-b PATH', '--feedback-processor PATH', String, "Path to the feedback processor. Default: #{config.feedback_processor}.") { |n| config.feedback_processor = n }
opts.on('-v', '--version', 'Print this version of push.') { puts "push #{Push::VERSION}"; exit }
opts.on('-h', '--help', 'You\'re looking at it.') { puts opts; exit }
opts.parse!
@@ -24,4 +36,8 @@ load 'config/environment.rb'
require 'push/daemon'
-Push::Daemon.start(environment, foreground)
+if config.pid_file && !Pathname.new(config.pid_file).absolute?
+ config.pid_file = File.join(Rails.root, config.pid_file)
+end
+
+Push::Daemon.start(environment, config)
View
3 lib/generators/push_generator.rb
@@ -15,9 +15,6 @@ def copy_migration
end
def copy_config
- copy_file "development.rb", "config/push/development.rb"
- copy_file "staging.rb", "config/push/staging.rb"
- copy_file "production.rb", "config/push/production.rb"
copy_file "feedback_processor.rb", "lib/push/feedback_processor.rb"
end
end
View
12 lib/generators/templates/create_push.rb
@@ -1,6 +1,16 @@
class CreatePush < ActiveRecord::Migration
def self.up
+ create_table :push_configurations do |t|
+ t.string :type, :null => false
+ t.string :app, :null => false
+ t.text :properties, :null => true
+ t.boolean :enabled, :null => false, :default => false
+ t.integer :connections, :null => false, :default => 1
+ t.timestamps
+ end
+
create_table :push_messages do |t|
+ t.string :app, :null => false
t.string :device, :null => false
t.string :type, :null => false
t.text :properties, :null => true
@@ -17,6 +27,7 @@ def self.up
add_index :push_messages, [:delivered, :failed, :deliver_after]
create_table :push_feedback do |t|
+ t.string :app, :null => false
t.string :device, :null => false
t.string :type, :null => false
t.string :follow_up, :null => false
@@ -33,5 +44,6 @@ def self.up
def self.down
drop_table :push_feedback
drop_table :push_messages
+ drop_table :push_configurations
end
end
View
36 lib/generators/templates/development.rb
@@ -1,36 +0,0 @@
-Push::Daemon::Builder.new do
- daemon
- ({
- :poll => 2,
- :pid_file => 'tmp/pids/push.pid',
- :airbrake_notify => false
- })
-
- feedback
- ({
- :poll => 60,
- :processor => 'lib/push/feedback_processor'
- })
-
- provider :apns,
- {
- :certificate => "development.pem",
- :certificate_password => "",
- :sandbox => true,
- :connections => 3,
- :feedback_poll => 60
- }
-
- provider :c2dm,
- {
- :connections => 2,
- :email => "",
- :password => ""
- }
-
- provider :gcm,
- {
- :connections => 2,
- :key => 'api key'
- }
-end
View
36 lib/generators/templates/production.rb
@@ -1,36 +0,0 @@
-Push::Daemon::Builder.new do
- daemon
- ({
- :poll => 2,
- :pid_file => 'tmp/pids/push.pid',
- :airbrake_notify => false
- })
-
- feedback
- ({
- :poll => 60,
- :processor => 'lib/push/feedback_processor'
- })
-
- provider :apns,
- {
- :certificate => "production.pem",
- :certificate_password => "",
- :sandbox => false,
- :connections => 3,
- :feedback_poll => 60
- }
-
- provider :c2dm,
- {
- :connections => 2,
- :email => "",
- :password => ""
- }
-
- provider :gcm,
- {
- :connections => 2,
- :key => 'api key'
- }
-end
View
36 lib/generators/templates/staging.rb
@@ -1,36 +0,0 @@
-Push::Daemon::Builder.new do
- daemon
- ({
- :poll => 2,
- :pid_file => 'tmp/pids/push.pid',
- :airbrake_notify => false
- })
-
- feedback
- ({
- :poll => 60,
- :processor => 'lib/push/feedback_processor'
- })
-
- provider :apns,
- {
- :certificate => "staging.pem",
- :certificate_password => "",
- :sandbox => true,
- :connections => 3,
- :feedback_poll => 60
- }
-
- provider :c2dm,
- {
- :connections => 2,
- :email => "",
- :password => ""
- }
-
- provider :gcm,
- {
- :connections => 2,
- :key => 'api key'
- }
-end
View
1 lib/push.rb
@@ -1,4 +1,5 @@
require 'active_record'
require 'push/version'
+require 'push/configuration'
require 'push/message'
require 'push/feedback'
View
10 lib/push/configuration.rb
@@ -0,0 +1,10 @@
+module Push
+ class Configuration < ActiveRecord::Base
+ self.table_name = 'push_configurations'
+ scope :enabled, where(:enabled => true)
+ validates :app, :presence => true
+ validates :connections, :presence => true
+ validates :connections, :numericality => { :greater_than => 0, :only_integer => true }
+ validates :type, :uniqueness => { :scope => :app, :message => "Only one push provider type per configuration name" }
+ end
+end
View
74 lib/push/daemon.rb
@@ -1,65 +1,39 @@
require 'thread'
-require 'push/daemon/builder'
require 'push/daemon/interruptible_sleep'
require 'push/daemon/delivery_error'
require 'push/daemon/disconnection_error'
-require 'push/daemon/pool'
require 'push/daemon/connection_pool'
require 'push/daemon/database_reconnectable'
require 'push/daemon/delivery_queue'
require 'push/daemon/delivery_handler'
-require 'push/daemon/delivery_handler_pool'
+require 'push/daemon/feedback'
require 'push/daemon/feedback/feedback_feeder'
require 'push/daemon/feedback/feedback_handler'
require 'push/daemon/feeder'
require 'push/daemon/logger'
+require 'push/daemon/app'
module Push
module Daemon
class << self
- attr_accessor :logger, :configuration, :delivery_queue,
- :connection_pool, :delivery_handler_pool, :foreground, :providers,
- :feedback_configuration, :feedback_queue, :feedback_handler, :feedback_feeder
+ attr_accessor :logger, :config
end
- def self.start(environment, foreground)
- self.providers = []
- @foreground = foreground
+ def self.start(environment, config)
+ self.config = config
+ self.logger = Logger.new(:foreground => config.foreground, :airbrake_notify => config.airbrake_notify)
setup_signal_hooks
-
- require File.join(Rails.root, 'config', 'push', environment + '.rb')
-
- self.logger = Logger.new(:foreground => foreground, :airbrake_notify => configuration[:airbrake_notify])
-
- self.delivery_queue = DeliveryQueue.new
-
- daemonize unless foreground
-
+ daemonize unless config.foreground
write_pid_file
- dbconnections = 0
- self.connection_pool = ConnectionPool.new
- self.providers.each do |provider|
- self.connection_pool.populate(provider)
- dbconnections += provider.totalconnections
- end
-
- rescale_poolsize(dbconnections)
-
- self.delivery_handler_pool = DeliveryHandlerPool.new(connection_pool.size)
- delivery_handler_pool.populate
-
- if feedback_configuration
- self.feedback_queue = DeliveryQueue.new
- self.feedback_handler = Feedback::FeedbackHandler.new(Rails.root + Push::Daemon.feedback_configuration[:processor])
- self.feedback_handler.start
- self.feedback_feeder = Feedback::FeedbackFeeder.new(Push::Daemon.feedback_configuration[:poll])
- self.feedback_feeder.start
- end
+ App.load
+ App.start
+ Feedback.load(config)
+ Feedback.start
+ rescale_poolsize(App.database_connections + Feedback.database_connections)
logger.info('[Daemon] Ready')
-
- Push::Daemon::Feeder.start(foreground)
+ Feeder.start(config)
end
protected
@@ -91,14 +65,16 @@ def self.handle_shutdown_signal
end
def self.shutdown
- puts "\nShutting down..."
- Push::Daemon::Feeder.stop
- Push::Daemon.delivery_handler_pool.drain if Push::Daemon.delivery_handler_pool
-
- self.providers.each do |provider|
- provider.stop
+ print "\nShutting down..."
+ Feeder.stop
+ Feedback.stop
+ App.stop
+
+ while Thread.list.count > 1
+ sleep 0.1
+ print "."
end
-
+ print "\n"
delete_pid_file
end
@@ -116,19 +92,19 @@ def self.daemonize
end
def self.write_pid_file
- if !configuration[:pid_file].blank?
+ if !config[:pid_file].blank?
begin
File.open(configuration[:pid_file], 'w') do |f|
f.puts $$
end
rescue SystemCallError => e
- logger.error("Failed to write PID to '#{configuration[:pid_file]}': #{e.inspect}")
+ logger.error("Failed to write PID to '#{config[:pid_file]}': #{e.inspect}")
end
end
end
def self.delete_pid_file
- pid_file = configuration[:pid_file]
+ pid_file = config[:pid_file]
File.delete(pid_file) if !pid_file.blank? && File.exists?(pid_file)
end
end
View
103 lib/push/daemon/app.rb
@@ -0,0 +1,103 @@
+module Push
+ module Daemon
+ class App
+ class << self
+ attr_reader :apps
+ end
+
+ @apps = {}
+
+ def self.load
+ configurations = Push::Configuration.enabled
+ configurations.each do |config|
+ if @apps[config.app] == nil
+ @apps[config.app] = App.new(config.app)
+ end
+ @apps[config.app].configs << config
+ end
+ end
+
+ def self.ready
+ ready = []
+ @apps.each { |app, runner| ready << app if runner.ready? }
+ ready
+ end
+
+ def self.deliver(notification)
+ if app = @apps[notification.app]
+ app.deliver(notification)
+ else
+ Rapns::Daemon.logger.error("No such app '#{notification.app}' for notification #{notification.id}.")
+ end
+ end
+
+ def self.start
+ @apps.values.map(&:start)
+ end
+
+ def self.stop
+ @apps.values.map(&:stop)
+ end
+
+ def self.database_connections
+ @apps.empty? ? 0 : @apps.values.collect{|x| x.database_connections }.inject(:+)
+ end
+
+ def initialize(name)
+ @name = name
+ @configs = []
+ @handlers = []
+ @providers = []
+ @queue = DeliveryQueue.new
+ @database_connections = 0
+ end
+
+ attr_accessor :configs
+ attr_reader :database_connections
+
+ def deliver(notification)
+ @queue.push(notification)
+ end
+
+ def start
+ @connection_pool = ConnectionPool.new
+ @configs.each do |config|
+ provider = load_provider(config.name, config.properties.merge({:connections => config.connections, :name => config.app}))
+ @providers << provider
+ @database_connections += provider.totalconnections
+ @connection_pool.populate(provider)
+ end
+ @connection_pool.size.times do |i|
+ @handlers << start_handler(i)
+ end
+ end
+
+ def stop
+ @handlers.map(&:stop)
+ @providers.map(&:stop)
+ end
+
+ def ready?
+ @queue.notifications_processed?
+ end
+
+ protected
+
+ def start_handler(i)
+ handler = DeliveryHandler.new(@queue, @connection_pool, "#{@name} #{i}")
+ handler.start
+ handler
+ end
+
+ def load_provider(klass, options)
+ begin
+ middleware = Push::Daemon.const_get("#{klass}".camelize)
+ rescue NameError
+ raise LoadError, "Could not find matching push provider for #{klass.inspect}. You may need to install an additional gem (such as push-#{klass})."
+ end
+
+ middleware.new(options)
+ end
+ end
+ end
+end
View
27 lib/push/daemon/builder.rb
@@ -1,27 +0,0 @@
-module Push
- module Daemon
- class Builder
- def initialize(&block)
- instance_eval(&block) if block_given?
- end
-
- def daemon(options)
- Push::Daemon.configuration = options
- end
-
- def feedback(options)
- Push::Daemon.feedback_configuration = options
- end
-
- def provider(klass, options)
- begin
- middleware = Push::Daemon.const_get("#{klass}".camelize)
- rescue NameError
- raise LoadError, "Could not find matching push provider for #{klass.inspect}. You may need to install an additional gem (such as push-#{klass})."
- end
-
- Push::Daemon.providers << middleware.new(options)
- end
- end
- end
-end
View
2 lib/push/daemon/connection_pool.rb
@@ -23,7 +23,7 @@ def checkout(notification_type)
end
def size
- @connections.size
+ @connections.values.collect{|x| x.length }.inject(:+)
end
end
end
View
23 lib/push/daemon/delivery_handler.rb
@@ -2,14 +2,15 @@ module Push
module Daemon
class DeliveryHandler
attr_reader :name
- STOP = 0x666
- def initialize(i)
- @name = "DeliveryHandler #{i}"
+ def initialize(queue, connection_pool, name)
+ @queue = queue
+ @connection_pool = connection_pool
+ @name = "DeliveryHandler #{name}"
end
def start
- Thread.new do
+ @thread = Thread.new do
loop do
break if @stop
handle_next_notification
@@ -19,26 +20,26 @@ def start
def stop
@stop = true
- Push::Daemon.delivery_queue.push(STOP)
+ @queue.wakeup(@thread)
end
protected
def handle_next_notification
- notification = Push::Daemon.delivery_queue.pop
-
- if notification == STOP
+ begin
+ notification = @queue.pop
+ rescue DeliveryQueue::WakeupError
return
end
begin
- connection = Push::Daemon.connection_pool.checkout(notification.use_connection)
+ connection = @connection_pool.checkout(notification.use_connection)
notification.deliver(connection)
rescue StandardError => e
Push::Daemon.logger.error(e)
ensure
- Push::Daemon.connection_pool.checkin(connection)
- Push::Daemon.delivery_queue.notification_processed
+ @connection_pool.checkin(connection)
+ @queue.notification_processed
end
end
end
View
20 lib/push/daemon/delivery_handler_pool.rb
@@ -1,20 +0,0 @@
-module Push
- module Daemon
- class DeliveryHandlerPool < Pool
-
- protected
-
- def new_object_for_pool(i)
- DeliveryHandler.new(i)
- end
-
- def object_added_to_pool(object)
- object.start
- end
-
- def object_removed_from_pool(object)
- object.stop
- end
- end
- end
-end
View
45 lib/push/daemon/delivery_queue.rb
@@ -1,19 +1,23 @@
module Push
module Daemon
class DeliveryQueue
+ class WakeupError < StandardError; end
def initialize
- @mutex = Mutex.new
@num_notifications = 0
- @queue = Queue.new
+ @queue = []
+ @waiting = []
+ @mutex = Mutex.new
end
- def push(notification)
- @mutex.synchronize { @num_notifications += 1 }
- @queue.push(notification)
+ def wakeup(thread)
+ @mutex.synchronize do
+ t = @waiting.delete(thread)
+ t.raise WakeupError if t
+ end
end
- def pop
- @queue.pop
+ def size
+ @mutex.synchronize { @queue.size }
end
def notification_processed
@@ -23,6 +27,33 @@ def notification_processed
def notifications_processed?
@mutex.synchronize { @num_notifications == 0 }
end
+
+ def push(notification)
+ @mutex.synchronize do
+ @num_notifications += 1
+ @queue.push(notification)
+
+ begin
+ t = @waiting.shift
+ t.wakeup if t
+ rescue ThreadError
+ retry
+ end
+ end
+ end
+
+ def pop
+ @mutex.synchronize do
+ while true
+ if @queue.empty?
+ @waiting.push Thread.current
+ @mutex.sleep
+ else
+ return @queue.shift
+ end
+ end
+ end
+ end
end
end
end
View
33 lib/push/daemon/feedback.rb
@@ -0,0 +1,33 @@
+module Push
+ module Daemon
+ module Feedback
+ class << self
+ attr_accessor :queue, :handler, :feeder
+ end
+
+ def self.load(config)
+ return if config.feedback_poll == 0
+ self.queue = DeliveryQueue.new
+ self.handler = Feedback::FeedbackHandler.new(Rails.root + config.feedback_processor)
+ self.feeder = Feedback::FeedbackFeeder.new(config.feedback_poll)
+ end
+
+ def self.start
+ return if self.handler.nil? or self.feeder.nil?
+ self.handler.start
+ self.feeder.start
+ @started = true
+ end
+
+ def self.stop
+ return unless @started
+ self.feeder.stop
+ self.handler.stop
+ end
+
+ def self.database_connections
+ @started ? 2 : 0
+ end
+ end
+ end
+end
View
4 lib/push/daemon/feedback/feedback_feeder.rb
@@ -33,9 +33,9 @@ def stop
def enqueue_feedback
begin
with_database_reconnect_and_retry(name) do
- if Push::Daemon.feedback_queue.notifications_processed?
+ if Push::Daemon::Feedback.queue.notifications_processed?
Push::Feedback.ready_for_followup.find_each do |feedback|
- Push::Daemon.feedback_queue.push(feedback)
+ Push::Daemon::Feedback.queue.push(feedback)
end
end
end
View
16 lib/push/daemon/feedback/feedback_handler.rb
@@ -3,15 +3,15 @@ module Daemon
module Feedback
class FeedbackHandler
attr_reader :name
- STOP = 0x666
def initialize(processor)
@name = "FeedbackHandler"
+ @queue = Push::Daemon::Feedback.queue
require processor
end
def start
- Thread.new do
+ @thread = Thread.new do
loop do
break if @stop
handle_next_feedback
@@ -21,25 +21,25 @@ def start
def stop
@stop = true
- Push::Daemon.feedback_queue.push(STOP)
+ @queue.wakeup(@thread)
end
protected
def handle_next_feedback
- feedback = Push::Daemon.feedback_queue.pop
-
- if feedback == STOP
+ begin
+ feedback = @queue.pop
+ rescue DeliveryQueue::WakeupError
return
end
begin
Push::FeedbackProcessor.process(feedback)
- feedback.is_processed(@name)
rescue StandardError => e
Push::Daemon.logger.error(e)
ensure
- Push::Daemon.feedback_queue.notification_processed
+ feedback.is_processed(@name)
+ @queue.notification_processed
end
end
end
View
13 lib/push/daemon/feeder.rb
@@ -8,13 +8,13 @@ def self.name
"Feeder"
end
- def self.start(foreground)
- reconnect_database unless foreground
+ def self.start(config)
+ reconnect_database unless config.foreground
loop do
break if @stop
enqueue_notifications
- interruptible_sleep Push::Daemon.configuration[:poll]
+ interruptible_sleep config.push_poll
end
end
@@ -28,10 +28,9 @@ def self.stop
def self.enqueue_notifications
begin
with_database_reconnect_and_retry(name) do
- if Push::Daemon.delivery_queue.notifications_processed?
- Push::Message.ready_for_delivery.find_each do |notification|
- Push::Daemon.delivery_queue.push(notification)
- end
+ Push::Message.ready_for_delivery.find_each do |notification|
+ ready_apps = Push::Daemon::App.ready
+ Push::Daemon::App.deliver(notification) if ready_apps.include?(notification.app)
end
end
rescue StandardError => e
View
36 lib/push/daemon/pool.rb
@@ -1,36 +0,0 @@
-module Push
- module Daemon
- class Pool
- def initialize(num_objects)
- @num_objects = num_objects
- @queue = Queue.new
- end
-
- def populate
- @num_objects.times do |i|
- object = new_object_for_pool(i)
- @queue.push(object)
- object_added_to_pool(object)
- end
- end
-
- def drain
- while !@queue.empty?
- object = @queue.pop
- object_removed_from_pool(object)
- end
- end
-
- protected
-
- def new_object_for_pool(i)
- end
-
- def object_added_to_pool(object)
- end
-
- def object_removed_from_pool(object)
- end
- end
- end
-end
View
2 lib/push/feedback.rb
@@ -4,7 +4,7 @@ class Feedback < ActiveRecord::Base
self.table_name = 'push_feedback'
scope :ready_for_followup, where(:processed => false)
-
+ validates :app, :presence => true
validates :device, :presence => true
validates :follow_up, :presence => true
validates :failed_at, :presence => true
View
3 lib/push/message.rb
@@ -6,6 +6,7 @@ class Message < ActiveRecord::Base
include Push::Daemon::DatabaseReconnectable
self.table_name = "push_messages"
+ validates :app, :presence => true
validates :device, :presence => true
scope :ready_for_delivery, lambda { where('delivered = ? AND failed = ? AND (deliver_after IS NULL OR deliver_after < ?)', false, false, Time.now) }
@@ -22,7 +23,7 @@ def deliver(connection)
self.save!(:validate => false)
end
- Push::Daemon.logger.info("Message #{id} delivered to #{device}")
+ Push::Daemon.logger.info("[#{connection.name}] Message #{id} delivered to #{device}")
rescue Push::DeliveryError, Push::DisconnectionError => error
handle_delivery_error(error, connection)
raise

0 comments on commit db14176

Please sign in to comment.