Skip to content

Commit

Permalink
Make queue a job attribute
Browse files Browse the repository at this point in the history
This allows jobs to specify a queue preference to the backend, if the
backend supports queues (i.e., Beanstalk)
  • Loading branch information
Mike Foley committed May 13, 2015
1 parent 7f3cdc1 commit c602211
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 117 deletions.
74 changes: 33 additions & 41 deletions lib/quebert/backend/beanstalk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ class Beanstalk
extend Forwardable
include Logging

attr_reader :host, :default_queue_name
attr_writer :queue_names
# A buffer time in seconds added to the Beanstalk TTR for Quebert to do
# its own job cleanup The job will perform based on the Beanstalk TTR,
# but Beanstalk hangs on to the job just a little longer so that Quebert
# can bury the job or schedule a retry with the appropriate delay
TTR_BUFFER = 5

def initialize(host, default_queue_name)
attr_reader :host, :queue
attr_writer :queues

def initialize(host, queue)
@host = host
@default_queue_name = default_queue_name
@queue_names = []
@queue = queue
@queues = []
end

def self.configure(opts = {})
new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:default_queue))
end

def queue(queue_name)
Queue.new(beanstalkd_tubes[queue_name])
new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:queue))
end

def reserve_without_controller(timeout=nil)
watch_queues
watch_tubes
beanstalkd_tubes.reserve(timeout)
end

Expand All @@ -43,17 +45,26 @@ def drain!
reserve_without_controller.delete
end
while peek(:buried) do
default_queue.kick
kick
reserve_without_controller.delete
end
end

def_delegators :default_queue, :put, :peek
# TODO add a queue param?
def_delegators :default_tube, :peek, :kick

def put(job)
tube = beanstalkd_tubes[job.queue || queue]
tube.put(job.to_json,
:pri => job.priority,
:delay => job.delay,
:ttr => job.ttr + TTR_BUFFER)
end

private

def default_queue
@default_queue ||= queue(default_queue_name)
def default_tube
@default_tube ||= beanstalkd_tubes[queue]
end

def beanstalkd_connection
Expand All @@ -64,36 +75,17 @@ def beanstalkd_tubes
beanstalkd_connection.tubes
end

def watch_queues
if queue_names != @watched_queue_names
@watched_queue_names = queue_names
logger.info "Watching beanstalkd queues #{@watched_queue_names.inspect}"
beanstalkd_tubes.watch!(*@watched_queue_names)
def watch_tubes
if queues != @watched_tube_names
@watched_tube_names = queues
logger.info "Watching beanstalkd queues #{@watched_tube_names.inspect}"
beanstalkd_tubes.watch!(*@watched_tube_names)
end
end

def queue_names
@queue_names.empty? ? [default_queue_name] : @queue_names
end
end

class Beanstalk::Queue
extend Forwardable
attr_reader :beanstalkd_tube
def initialize(beanstalkd_tube)
@beanstalkd_tube = beanstalkd_tube
end

def put(job, *args)
priority, delay, ttr = args
opts = {}
opts[:pri] = priority if priority
opts[:delay] = delay if delay
opts[:ttr] = ttr if ttr
beanstalkd_tube.put(job.to_json, opts)
def queues
@queues.empty? ? [queue] : @queues
end

def_delegators :beanstalkd_tube, :peek, :kick
end
end
end
4 changes: 2 additions & 2 deletions lib/quebert/command_line_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def parser
"(default: #{@options[:pid]})") { |file| @options[:pid] = file }
opts.on("-C", "--config FILE", "Load options from config file") { |file| @options[:config] = file }
opts.on("-c", "--chdir DIR", "Change to dir before starting") { |dir| @options[:chdir] = File.expand_path(dir) }
opts.on("-q", "--queues LIST", "Specify queue name(s)") { |list| @options[:queue_names] = list.split(",") }
opts.on("-q", "--queues LIST", "Specify queue name(s)") { |list| @options[:queues] = list.split(",") }
end
end

Expand Down Expand Up @@ -60,7 +60,7 @@ def self.dispatch(args = ARGV)
end

worker = Worker.new
worker.queue_names = params[:queue_names] if params[:queue_names]
worker.queues = params[:queues] if params[:queues]
worker.start
end

Expand Down
60 changes: 32 additions & 28 deletions lib/quebert/controller/beanstalk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,56 @@ def initialize(beanstalk_job)
@job = Job.from_json(beanstalk_job.body)
rescue Job::Delete
beanstalk_job.delete
log "Deleted on initialization", :error
job_log "Deleted on initialization", :error
rescue Job::Release
beanstalk_job.release @job.priority, @job.delay
log "Released on initialization with priority: #{@job.priority} and delay: #{@job.delay}", :error
beanstalk_job.release job.priority, job.delay
job_log "Released on initialization with priority: #{job.priority} and delay: #{job.delay}", :error
rescue Job::Bury
beanstalk_job.bury
log "Buried on initialization", :error
job_log "Buried on initialization", :error
rescue => e
beanstalk_job.bury
log "Error caught on initialization. #{e.inspect}", :error
job_log "Error caught on initialization. #{e.inspect}", :error
raise
end

def perform
log "Performing with args #{job.args.inspect}"
log "Beanstalk Job Stats: #{beanstalk_job.stats.inspect}"
job_log "Performing with args #{job.args.inspect}"
job_log "Beanstalk Job Stats: #{beanstalk_job.stats.inspect}"

result = false
time = Benchmark.realtime do
result = job.perform!
beanstalk_job.delete
end

log "Completed in #{(time*1000*1000).to_i/1000.to_f} ms\n"
job_log "Completed in #{(time*1000*1000).to_i/1000.to_f} ms\n"
result
rescue Job::Delete
log "Deleting job", :error
job_log "Deleting job", :error
beanstalk_job.delete
log "Job deleted", :error
job_log "Job deleted", :error
rescue Job::Release
log "Releasing with priority: #{@job.priority} and delay: #{@job.delay}", :error
beanstalk_job.release :pri => @job.priority, :delay => @job.delay
log "Job released", :error
job_log "Releasing with priority: #{job.priority} and delay: #{job.delay}", :error
beanstalk_job.release :pri => job.priority, :delay => job.delay
job_log "Job released", :error
rescue Job::Bury
log "Burrying job", :error
job_log "Burrying job", :error
beanstalk_job.bury
log "Job burried", :error
job_log "Job burried", :error
rescue Job::Timeout => e
log "Job timed out. Retrying with delay. #{e.inspect} #{e.backtrace.join("\n")}", :error
job_log "Job timed out. Retrying with delay. #{e.inspect} #{e.backtrace.join("\n")}", :error
retry_with_delay
raise
rescue Job::Retry
# The difference between the Retry and Timeout class is that
# Retry does not log an exception where as Timeout does
log "Manually retrying with delay"
# Retry does not job_log an exception where as Timeout does
job_log "Manually retrying with delay"
retry_with_delay
rescue => e
log "Error caught on perform. Burying job. #{e.inspect} #{e.backtrace.join("\n")}", :error
job_log "Error caught on perform. Burying job. #{e.inspect} #{e.backtrace.join("\n")}", :error
beanstalk_job.bury
log "Job buried", :error
job_log "Job buried", :error
raise
end

Expand All @@ -75,23 +75,27 @@ def retry_with_delay
delay = TIMEOUT_RETRY_DELAY_SEED + TIMEOUT_RETRY_GROWTH_RATE**beanstalk_job.stats["releases"].to_i

if delay > MAX_TIMEOUT_RETRY_DELAY
log "Max retry delay exceeded. Burrying job"
job_log "Max retry delay exceeded. Burrying job"
beanstalk_job.bury
log "Job burried"
job_log "Job burried"
else
log "TTR exceeded. Releasing with priority: #{@job.priority} and delay: #{delay}"
beanstalk_job.release :pri => @job.priority, :delay => delay
log "Job released"
job_log "TTR exceeded. Releasing with priority: #{job.priority} and delay: #{delay}"
beanstalk_job.release :pri => job.priority, :delay => delay
job_log "Job released"
end
rescue ::Beaneater::NotFoundError
log "Job ran longer than allowed. Beanstalk already deleted it!!!!", :error
job_log "Job ran longer than allowed. Beanstalk already deleted it!!!!", :error
# Sometimes the timer doesn't behave correctly and this job actually runs longer than
# allowed. At that point the beanstalk job no longer exists anymore. Lets let it go and don't blow up.
end

def log(message, level=:info)
def job_log(message, level=:info)
# Have the job write to the log file so that we catch the details of the job
job.send(:log, message, level)
if job
job.send(:log, message, level)
else
Quebert.logger.send(level, message)
end
end
end
end
Expand Down
9 changes: 2 additions & 7 deletions lib/quebert/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Job
include Logging

attr_reader :args
attr_accessor :priority, :delay, :ttr
attr_accessor :priority, :delay, :ttr, :queue

# Prioritize Quebert jobs as specified in https://github.com/kr/beanstalkd/blob/master/doc/protocol.txt.
class Priority
Expand All @@ -21,11 +21,6 @@ class Priority
# By default, the job should live for 10 seconds tops.
DEFAULT_JOB_TTR = 10

# A buffer time in seconds added to the Beanstalk TTR for Quebert to do its own job cleanup
# The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a
# little longer so that Quebert can bury the job or schedule a retry with the appropriate delay
QUEBERT_TTR_BUFFER = 5

# Exceptions are used for signaling job status... ewww. Yank this out and
# replace with a more well thought out controller.
NotImplemented = Class.new(StandardError)
Expand Down Expand Up @@ -68,7 +63,7 @@ def perform!
# Accepts arguments that override the job options and enqueu this stuff.
def enqueue(override_opts={})
override_opts.each { |opt, val| self.send("#{opt}=", val) }
backend.put(self, priority, delay, ttr + QUEBERT_TTR_BUFFER)
backend.put(self)
end

# Serialize the job into a JSON string that we can put on the beandstalkd queue.
Expand Down
19 changes: 10 additions & 9 deletions lib/quebert/serializer.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Quebert
module Serializer

# Does this mean you could queue a job that could queue a job? Whoa!
class Job
def self.serialize(job)
Expand All @@ -9,21 +9,23 @@ def self.serialize(job)
'args' => serialize_args(job.args),
'priority' => job.priority,
'delay' => job.delay,
'ttr' => job.ttr
'ttr' => job.ttr,
'queue' => job.queue
}
end

def self.deserialize(hash)
hash = Support.stringify_keys(hash)
job = Support.constantize(hash['job']).new(*deserialize_args(hash['args']))
job.priority = hash['priority']
job.delay = hash['delay']
job.ttr = hash['ttr']
job.queue = hash['queue']
job
end

private

# Reflect on each arg and see if it has a seralizer
def self.serialize_args(args)
args.map do |arg|
Expand All @@ -37,7 +39,7 @@ def self.serialize_args(args)
hash
end
end

# Find a serializer and/or push out a value
def self.deserialize_args(args)
args.map do |arg|
Expand All @@ -50,7 +52,7 @@ def self.deserialize_args(args)
end
end
end

# Deal with converting an AR to/from a hash that we can send over the wire.
class ActiveRecord
def self.serialize(record)
Expand All @@ -60,7 +62,7 @@ def self.serialize(record)
end
{ 'model' => record.class.model_name.to_s, 'attributes' => attrs }
end

def self.deserialize(hash)
hash = Support.stringify_keys(hash)
model = Support.constantize(hash.delete('model'))
Expand All @@ -81,6 +83,5 @@ def self.deserialize(hash)
end
end
end

end
end
8 changes: 3 additions & 5 deletions lib/quebert/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ module Quebert
class Worker
include Logging

attr_accessor :exception_handler, :backend, :queue_names
attr_accessor :exception_handler, :backend, :queues

def initialize
@queue_names = []
@queues = []
yield self if block_given?
end

Expand All @@ -16,9 +16,7 @@ def start

logger.info "Worker started with #{backend.class.name} backend\n"

if backend.respond_to?(:queue_names=)
backend.queue_names = queue_names
end
backend.queues = queues if backend.respond_to?(:queues=)

while @controller = backend.reserve do
begin
Expand Down
10 changes: 7 additions & 3 deletions spec/backend_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@
end

it "should consume from multiple queues" do
@q.queue_names = ['a', 'b']
@q.queue('a').put Adder.new(1)
@q.queue('b').put Adder.new(2)
@q.queues = ["a", "b"]
job1 = Adder.new(1)
job1.queue = "a"
@q.put(job1)
job2 = Adder.new(2)
job2.queue = "b"
@q.put(job2)
@q.reserve.perform.should eql(1)
@q.reserve.perform.should eql(2)
end
Expand Down
Loading

0 comments on commit c602211

Please sign in to comment.