Skip to content

Commit

Permalink
Allow Queue#pop to take an optional block
Browse files Browse the repository at this point in the history
  • Loading branch information
celldee committed Sep 14, 2009
1 parent 6190f07 commit 5b3143b
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 33 deletions.
36 changes: 23 additions & 13 deletions lib/bunny/queue08.rb
Expand Up @@ -182,9 +182,12 @@ def delete(opts = {})
:payload => :queue_empty
:delivery_details => nil
N.B. If a block is provided then the hash will be passed into the block and the return value
will be nil.
=end

def pop(opts = {})
def pop(opts = {}, &blk)

# do we want to have to provide an acknowledgement?
ack = opts.delete(:ack)
Expand All @@ -199,25 +202,32 @@ def pop(opts = {})
method = client.next_method

if method.is_a?(Qrack::Protocol::Basic::GetEmpty) then
return {:header => nil, :payload => :queue_empty, :delivery_details => nil}
queue_empty = true
elsif !method.is_a?(Qrack::Protocol::Basic::GetOk)
raise Bunny::ProtocolError, "Error getting message from queue #{name}"
end

# get delivery tag to use for acknowledge
self.delivery_tag = method.delivery_tag if ack
if !queue_empty
# get delivery tag to use for acknowledge
self.delivery_tag = method.delivery_tag if ack

header = client.next_payload
header = client.next_payload

# If maximum frame size is smaller than message payload body then message
# will have a message header and several message bodies
msg = ''
while msg.length < header.size
msg += client.next_payload
# If maximum frame size is smaller than message payload body then message
# will have a message header and several message bodies
msg = ''
while msg.length < header.size
msg += client.next_payload
end

msg_hash = {:header => header, :payload => msg, :delivery_details => method.arguments}

else
msg_hash = {:header => nil, :payload => :queue_empty, :delivery_details => nil}
end

# Return message with additional info if requested
{:header => header, :payload => msg, :delivery_details => method.arguments}
# Pass message hash to block or return message hash
blk ? blk.call(msg_hash) : msg_hash

end

Expand Down
42 changes: 26 additions & 16 deletions lib/bunny/queue09.rb
Expand Up @@ -183,9 +183,12 @@ def delete(opts = {})
:payload => :queue_empty
:delivery_details => nil
N.B. If a block is provided then the hash will be passed into the block and the return value
will be nil.
=end

def pop(opts = {})
def pop(opts = {}, &blk)

# do we want to have to provide an acknowledgement?
ack = opts.delete(:ack)
Expand All @@ -201,26 +204,33 @@ def pop(opts = {})
method = client.next_method

if method.is_a?(Qrack::Protocol09::Basic::GetEmpty) then
return {:header => nil, :payload => :queue_empty, :delivery_details => nil}
queue_empty = true
elsif !method.is_a?(Qrack::Protocol09::Basic::GetOk)
raise Bunny::ProtocolError, "Error getting message from queue #{name}"
end

# get delivery tag to use for acknowledge
self.delivery_tag = method.delivery_tag if ack

header = client.next_payload

# If maximum frame size is smaller than message payload body then message
# will have a message header and several message bodies
msg = ''
while msg.length < header.size
msg += client.next_payload

if !queue_empty
# get delivery tag to use for acknowledge
self.delivery_tag = method.delivery_tag if ack

header = client.next_payload

# If maximum frame size is smaller than message payload body then message
# will have a message header and several message bodies
msg = ''
while msg.length < header.size
msg += client.next_payload
end

msg_hash = {:header => header, :payload => msg, :delivery_details => method.arguments}

else
msg_hash = {:header => nil, :payload => :queue_empty, :delivery_details => nil}
end

# Return message with additional info if requested
{:header => header, :payload => msg, :delivery_details => method.arguments}
# Pass message hash to block or return message hash
blk ? blk.call(msg_hash) : msg_hash

end

=begin rdoc
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_08/exchange_spec.rb
Expand Up @@ -8,7 +8,7 @@

require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny]))

describe Bunny do
describe 'Exchange' do

before(:each) do
@b = Bunny.new
Expand Down
9 changes: 8 additions & 1 deletion spec/spec_08/queue_spec.rb
Expand Up @@ -8,7 +8,7 @@

require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny]))

describe Bunny do
describe 'Queue' do

before(:each) do
@b = Bunny.new
Expand Down Expand Up @@ -87,6 +87,13 @@
msg.should == lg_msg
end

it "should be able call a block when popping a message" do
q = @b.queue('test1')
q.publish('This is another test message')
q.pop { |msg| msg[:payload].should == 'This is another test message' }
q.pop { |msg| msg[:payload].should == :queue_empty }
end

it "should raise an error if purge fails" do
q = @b.queue('test1')
5.times {q.publish('This is another test message')}
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_09/exchange_spec.rb
Expand Up @@ -8,7 +8,7 @@

require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny]))

describe Bunny do
describe 'Exchange' do

before(:each) do
@b = Bunny.new(:spec => '09')
Expand Down
9 changes: 8 additions & 1 deletion spec/spec_09/queue_spec.rb
Expand Up @@ -8,7 +8,7 @@

require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny]))

describe Bunny do
describe 'Queue' do

before(:each) do
@b = Bunny.new(:spec => '09')
Expand Down Expand Up @@ -87,6 +87,13 @@
msg.should == lg_msg
end

it "should be able call a block when popping a message" do
q = @b.queue('test1')
q.publish('This is another test message')
q.pop { |msg| msg[:payload].should == 'This is another test message' }
q.pop { |msg| msg[:payload].should == :queue_empty }
end

it "should raise an error if purge fails" do
q = @b.queue('test1')
5.times {q.publish('This is another test message')}
Expand Down

0 comments on commit 5b3143b

Please sign in to comment.