Skip to content

Commit

Permalink
added feedback follow-up processing
Browse files Browse the repository at this point in the history
  • Loading branch information
tompesman committed Jul 11, 2012
1 parent df2b0f5 commit d83990e
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 8 deletions.
7 changes: 4 additions & 3 deletions lib/generators/push_generator.rb
Expand Up @@ -15,8 +15,9 @@ def copy_migration
end

def copy_config
copy_file "development.rb", "config/push/development.rb"
copy_file "staging.rb", "config/push/staging.rb"
copy_file "production.rb", "config/push/production.rb"
copy_file "development.rb", "config/push/development.rb"
copy_file "staging.rb", "config/push/staging.rb"
copy_file "production.rb", "config/push/production.rb"
copy_file "feedback_processor.rb", "lib/push/feedback_processor.rb"
end
end
4 changes: 3 additions & 1 deletion lib/generators/templates/create_push.rb
Expand Up @@ -21,11 +21,13 @@ def self.up
t.string :type, :null => false
t.string :follow_up, :null => false
t.timestamp :failed_at, :null => false
t.boolean :processed, :null => false, :default => false
t.timestamp :processed_at, :null => true
t.text :properties, :null => true
t.timestamps
end

add_index :push_feedback, :device
add_index :push_feedback, :processed
end

def self.down
Expand Down
13 changes: 12 additions & 1 deletion lib/generators/templates/development.rb
@@ -1,5 +1,16 @@
Push::Daemon::Builder.new do
daemon({ :poll => 2, :pid_file => "tmp/pids/push.pid", :airbrake_notify => false })
daemon
({
:poll => 2,
:pid_file => 'tmp/pids/push.pid',
:airbrake_notify => false
})

feedback
({
:poll => 60,
:processor => 'lib/push/feedback_processor'
})

provider :apns,
{
Expand Down
28 changes: 28 additions & 0 deletions lib/generators/templates/feedback_processor.rb
@@ -0,0 +1,28 @@
module Push
class FeedbackProcessor
def self.process(feedback)
if feedback.instance_of? Push::FeedbackGcm
if feedback.follow_up == 'delete'
# TODO: delete gcm device

elsif feedback.follow_up == 'update'
# TODO: update gcm device
# device = feedback.update_to

end
elsif feedback.instance_of? Push::FeedbackC2dm
if feedback.follow_up == 'delete'
# TODO: delete c2dm device

end
elsif feedback.instance_of? Push::FeedbackApns
if feedback.follow_up == 'delete'
# TODO: delete apns device

end
else
Push::Daemon.logger.info("[FeedbackProcessor] Unknown feedback type")
end
end
end
end
13 changes: 12 additions & 1 deletion lib/generators/templates/production.rb
@@ -1,5 +1,16 @@
Push::Daemon::Builder.new do
daemon({ :poll => 2, :pid_file => "tmp/pids/push.pid", :airbrake_notify => false })
daemon
({
:poll => 2,
:pid_file => 'tmp/pids/push.pid',
:airbrake_notify => false
})

feedback
({
:poll => 60,
:processor => 'lib/push/feedback_processor'
})

provider :apns,
{
Expand Down
13 changes: 12 additions & 1 deletion lib/generators/templates/staging.rb
@@ -1,5 +1,16 @@
Push::Daemon::Builder.new do
daemon({ :poll => 2, :pid_file => "tmp/pids/push.pid", :airbrake_notify => false })
daemon
({
:poll => 2,
:pid_file => 'tmp/pids/push.pid',
:airbrake_notify => false
})

feedback
({
:poll => 60,
:processor => 'lib/push/feedback_processor'
})

provider :apns,
{
Expand Down
13 changes: 12 additions & 1 deletion lib/push/daemon.rb
Expand Up @@ -9,14 +9,17 @@
require 'push/daemon/delivery_queue'
require 'push/daemon/delivery_handler'
require 'push/daemon/delivery_handler_pool'
require 'push/daemon/feedback/feedback_feeder'
require 'push/daemon/feedback/feedback_handler'
require 'push/daemon/feeder'
require 'push/daemon/logger'

module Push
module Daemon
class << self
attr_accessor :logger, :configuration, :delivery_queue,
:connection_pool, :delivery_handler_pool, :foreground, :providers
:connection_pool, :delivery_handler_pool, :foreground, :providers,
:feedback_configuration, :feedback_queue, :feedback_handler, :feedback_feeder
end

def self.start(environment, foreground)
Expand Down Expand Up @@ -46,6 +49,14 @@ def self.start(environment, foreground)
self.delivery_handler_pool = DeliveryHandlerPool.new(connection_pool.size)
delivery_handler_pool.populate

if feedback_configuration
self.feedback_queue = DeliveryQueue.new
self.feedback_handler = Feedback::FeedbackHandler.new(Rails.root + Push::Daemon.feedback_configuration[:processor])
self.feedback_handler.start
self.feedback_feeder = Feedback::FeedbackFeeder.new(Push::Daemon.feedback_configuration[:poll])
self.feedback_feeder.start
end

logger.info('[Daemon] Ready')

Push::Daemon::Feeder.start(foreground)
Expand Down
4 changes: 4 additions & 0 deletions lib/push/daemon/builder.rb
Expand Up @@ -9,6 +9,10 @@ def daemon(options)
Push::Daemon.configuration = options
end

def feedback(options)
Push::Daemon.feedback_configuration = options
end

def provider(klass, options)
begin
middleware = Push::Daemon.const_get("#{klass}".camelize)
Expand Down
49 changes: 49 additions & 0 deletions lib/push/daemon/feedback/feedback_feeder.rb
@@ -0,0 +1,49 @@
module Push
module Daemon
module Feedback
class FeedbackFeeder
include ::Push::Daemon::DatabaseReconnectable
include ::Push::Daemon::InterruptibleSleep

def initialize(poll)
@poll = poll
end

def name
"FeedbackFeeder"
end

def start
Thread.new do
loop do
break if @stop
enqueue_feedback
interruptible_sleep @poll
end
end
end

def stop
@stop = true
interrupt_sleep
end

protected

def enqueue_feedback
begin
with_database_reconnect_and_retry(name) do
if Push::Daemon.feedback_queue.notifications_processed?
Push::Feedback.ready_for_followup.find_each do |feedback|
Push::Daemon.feedback_queue.push(feedback)
end
end
end
rescue StandardError => e
Push::Daemon.logger.error(e)
end
end
end
end
end
end
48 changes: 48 additions & 0 deletions lib/push/daemon/feedback/feedback_handler.rb
@@ -0,0 +1,48 @@
module Push
module Daemon
module Feedback
class FeedbackHandler
attr_reader :name
STOP = 0x666

def initialize(processor)
@name = "FeedbackHandler"
require processor
end

def start
Thread.new do
loop do
break if @stop
handle_next_feedback
end
end
end

def stop
@stop = true
Push::Daemon.feedback_queue.push(STOP)
end

protected

def handle_next_feedback
feedback = Push::Daemon.feedback_queue.pop

if feedback == STOP
return
end

begin
Push::FeedbackProcessor.process(feedback)
feedback.is_processed(@name)
rescue StandardError => e
Push::Daemon.logger.error(e)
ensure
Push::Daemon.feedback_queue.notification_processed
end
end
end
end
end
end
11 changes: 11 additions & 0 deletions lib/push/feedback.rb
@@ -1,9 +1,20 @@
module Push
class Feedback < ActiveRecord::Base
include Push::Daemon::DatabaseReconnectable
self.table_name = 'push_feedback'

scope :ready_for_followup, where(:processed => false)

validates :device, :presence => true
validates :follow_up, :presence => true
validates :failed_at, :presence => true

def is_processed(name)
with_database_reconnect_and_retry(name) do
self.processed = true
self.processed_at = Time.now
self.save
end
end
end
end

0 comments on commit d83990e

Please sign in to comment.