Skip to content

Commit

Permalink
Allow setting custom attempts limit and attempt period for failed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
markiz committed Jul 1, 2012
1 parent 720d6a8 commit d64d9c7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 28 deletions.
65 changes: 37 additions & 28 deletions lib/sunspot/amqp_index_queue/client.rb
Expand Up @@ -6,13 +6,6 @@ module AmqpIndexQueue
# Wrapper around AMQP queue. Provides several useful public API methods,
# such as {#count} and {#process}.
class Client
# Number of failures allowed before being dropped from an index
# queue altogether
MAX_ATTEMPTS_COUNT = 5

# Interval in seconds before reindex is attempted after a failure.
REINDEX_PERIOD = 300

# Wrapper around entry in an indexer queue
# @private
class Entry
Expand Down Expand Up @@ -66,6 +59,9 @@ def marshal_load(attributes)
# @option client_opts [String] "vhost" ("/") AMQP vhost
# @option client_opts [String] "sunspot_index_queue_name" ("sunspot_index_queue")
# AMQP index queue name
# @option client_opts [Integer] "retry_interval" (300) time before next
# indexing attempt in case of failure / exception
# @option client_opts [Integer] "max_attempts_count" (5) attempts count
# @api public
def initialize(session, client_opts = {})
@session = session
Expand Down Expand Up @@ -106,6 +102,23 @@ def process(limit = 10)
i
end

# Push an entry into the queue
# @api semipublic
def push(entry)
exchange.publish(Marshal.dump(entry), :key => queue_name)
end

# Pops an entry from the queue
# @api semipublic
def pop
entry = queue.pop[:payload]
if (entry != :queue_empty)
Marshal.load(entry)
else
nil
end
end

protected

# List of default options ofr a client
Expand All @@ -117,9 +130,23 @@ def default_options
:pass => "guest",
:host => "localhost",
:port => "5672",
:vhost => "/"
:vhost => "/",
:max_attempts_count => 5,
:retry_interval => 300
})
end
# Number of failures allowed before being dropped from an index
# queue altogether
# @api semipublic
def max_attempts_count
@options[:max_attempts_count]
end

# Interval in seconds before reindex is attempted after a failure.
# @api semipublic
def retry_interval
@options[:retry_interval]
end

# Current bunny session.
# @api semipublic
Expand All @@ -145,24 +172,6 @@ def exchange
Thread.current["#{object_id}_exchange"] ||= bunny.exchange('')
end


# Push an entry into the queue
# @api semipublic
def push(entry)
exchange.publish(Marshal.dump(entry), :key => queue_name)
end

# Pops an entry from the queue
# @api semipublic
def pop
entry = queue.pop[:payload]
if (entry != :queue_empty)
Marshal.load(entry)
else
nil
end
end

# Gets a next available (with run_at < Time.now) entry out of the
# queue. All the skipped entries are then pushed back into the queue.
# @api semipublic
Expand All @@ -185,7 +194,7 @@ def pop_next_available
# Index or remove an entry
# @api semipublic
def process_entry(entry)
if entry.attempts_count < MAX_ATTEMPTS_COUNT
if entry.attempts_count < max_attempts_count
if entry.to_remove
session.remove_by_id(entry.object_class_name, entry.object_id)
else
Expand All @@ -196,7 +205,7 @@ def process_entry(entry)
if defined?(::Rails)
::Rails.logger.error "Exception raised while indexing: #{e.class}: #{e}"
end
entry.run_at = REINDEX_PERIOD.since
entry.run_at = Time.now + retry_interval
entry.attempts_count += 1
push(entry)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/sunspot/amqp_index_queue/session_proxy.rb
Expand Up @@ -31,6 +31,9 @@ class SessionProxy < Sunspot::SessionProxy::AbstractSessionProxy
# @option client_opts [String] "pass" ("guest") AMQP password
# @option client_opts [String] "vhost" ("/") AMQP vhost
# @option client_opts [String] "sunspot_index_queue_name" ("sunspot_index_queue")
# @option client_opts [Integer] "retry_interval" (300) time before next
# indexing attempt in case of failure / exception
# @option client_opts [Integer] "max_attempts_count" (5) attempts count
# AMQP index queue name
def initialize(session, client_opts = {})
@session = session
Expand Down

0 comments on commit d64d9c7

Please sign in to comment.