Skip to content

Commit

Permalink
Don't wait for a job to finish before processing the next queue:
Browse files Browse the repository at this point in the history
- Adding tests uncovered a limitation that the most number of queues
  that could be provided in the array was two, and that in order for the
  second one to process the first had to complete AND also needed to
  have a listener for it defined through Minion. This limitation has
  been removed.
  • Loading branch information
durran committed Mar 23, 2011
1 parent ba87c82 commit dd74d29
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
23 changes: 5 additions & 18 deletions lib/minion.rb
Expand Up @@ -25,16 +25,12 @@ def enqueue(name, data = nil)
raise "cannot enqueue an empty or nil name" if name.nil? || name.empty?
data ||= {}

if name.respond_to?(:shift)
queue = name.shift
data["next_job"] = name unless name.empty?
else
queue = name
end

encoded = JSON.dump(data)
log("send: #{queue}:#{encoded}")
bunny.queue(queue, :durable => true, :auto_delete => false).publish(encoded)

[ name ].flatten.each do |queue|
log("send: #{queue}:#{encoded}")
bunny.queue(queue, :durable => true, :auto_delete => false).publish(encoded)
end
end

def log(msg)
Expand Down Expand Up @@ -63,12 +59,8 @@ def job(queue, options = {}, &blk)
return if AMQP.closing?
begin
log "recv: #{queue}:#{m}"

args = decode_json(m)

result = yield(args)

next_job(args, result)
rescue Object => e
raise unless error_handler
error_handler.call(e,queue,m,h)
Expand Down Expand Up @@ -139,11 +131,6 @@ def bunny
@@bunny ||= Bunny.new(amqp_config).tap { |b| b.start }
end

def next_job(args, response)
queue = args.delete("next_job")
enqueue(queue,args.merge(response)) if queue and not queue.empty?
end

def error_handler
@@error_handler ||= nil
end
Expand Down
29 changes: 23 additions & 6 deletions spec/minion_spec.rb
Expand Up @@ -89,9 +89,14 @@
bunny.queue("minion.second")
end

let(:third) do
bunny.queue("minion.third")
end

before do
first.purge
second.purge
third.purge
end

context "when the array is empty" do
Expand All @@ -108,19 +113,31 @@
end

before do
Minion.enqueue([ "minion.first", "minion.second" ], data)
Minion.enqueue([ "minion.first", "minion.second", "minion.third" ], data)
end

let(:message) do
let(:first_message) do
JSON.parse(first.pop[:payload])
end

it "adds the data to the queue" do
message.should == data
let(:second_message) do
JSON.parse(second.pop[:payload])
end

let(:third_message) do
JSON.parse(third.pop[:payload])
end

it "adds the data to the first queue" do
first_message.should == data
end

it "adds the data to the second queue" do
second_message.should == data
end

it "adds the next job data" do
message["next_job"].should == [ "minion.second" ]
it "adds the data to the third queue" do
third_message.should == data
end
end
end
Expand Down

0 comments on commit dd74d29

Please sign in to comment.