From 5b3143b63ee54255a690efe3579879b06a0a58b6 Mon Sep 17 00:00:00 2001 From: Chris Duncan Date: Mon, 14 Sep 2009 12:49:06 +0100 Subject: [PATCH] Allow Queue#pop to take an optional block --- lib/bunny/queue08.rb | 36 +++++++++++++++++++----------- lib/bunny/queue09.rb | 42 ++++++++++++++++++++++------------- spec/spec_08/exchange_spec.rb | 2 +- spec/spec_08/queue_spec.rb | 9 +++++++- spec/spec_09/exchange_spec.rb | 2 +- spec/spec_09/queue_spec.rb | 9 +++++++- 6 files changed, 67 insertions(+), 33 deletions(-) diff --git a/lib/bunny/queue08.rb b/lib/bunny/queue08.rb index f54c2c3db..e6b292f88 100644 --- a/lib/bunny/queue08.rb +++ b/lib/bunny/queue08.rb @@ -182,9 +182,12 @@ def delete(opts = {}) :payload => :queue_empty :delivery_details => nil +N.B. If a block is provided then the hash will be passed into the block and the return value +will be nil. + =end - def pop(opts = {}) + def pop(opts = {}, &blk) # do we want to have to provide an acknowledgement? ack = opts.delete(:ack) @@ -199,25 +202,32 @@ def pop(opts = {}) method = client.next_method if method.is_a?(Qrack::Protocol::Basic::GetEmpty) then - return {:header => nil, :payload => :queue_empty, :delivery_details => nil} + queue_empty = true elsif !method.is_a?(Qrack::Protocol::Basic::GetOk) raise Bunny::ProtocolError, "Error getting message from queue #{name}" end - # get delivery tag to use for acknowledge - self.delivery_tag = method.delivery_tag if ack + if !queue_empty + # get delivery tag to use for acknowledge + self.delivery_tag = method.delivery_tag if ack - header = client.next_payload + header = client.next_payload - # If maximum frame size is smaller than message payload body then message - # will have a message header and several message bodies - msg = '' - while msg.length < header.size - msg += client.next_payload + # If maximum frame size is smaller than message payload body then message + # will have a message header and several message bodies + msg = '' + while msg.length < header.size + msg += client.next_payload + end + + msg_hash = {:header => header, :payload => msg, :delivery_details => method.arguments} + + else + msg_hash = {:header => nil, :payload => :queue_empty, :delivery_details => nil} end - - # Return message with additional info if requested - {:header => header, :payload => msg, :delivery_details => method.arguments} + + # Pass message hash to block or return message hash + blk ? blk.call(msg_hash) : msg_hash end diff --git a/lib/bunny/queue09.rb b/lib/bunny/queue09.rb index c01e34a2b..a59232e31 100644 --- a/lib/bunny/queue09.rb +++ b/lib/bunny/queue09.rb @@ -183,9 +183,12 @@ def delete(opts = {}) :payload => :queue_empty :delivery_details => nil +N.B. If a block is provided then the hash will be passed into the block and the return value +will be nil. + =end - def pop(opts = {}) + def pop(opts = {}, &blk) # do we want to have to provide an acknowledgement? ack = opts.delete(:ack) @@ -201,26 +204,33 @@ def pop(opts = {}) method = client.next_method if method.is_a?(Qrack::Protocol09::Basic::GetEmpty) then - return {:header => nil, :payload => :queue_empty, :delivery_details => nil} + queue_empty = true elsif !method.is_a?(Qrack::Protocol09::Basic::GetOk) raise Bunny::ProtocolError, "Error getting message from queue #{name}" end - - # get delivery tag to use for acknowledge - self.delivery_tag = method.delivery_tag if ack - - header = client.next_payload - - # If maximum frame size is smaller than message payload body then message - # will have a message header and several message bodies - msg = '' - while msg.length < header.size - msg += client.next_payload + + if !queue_empty + # get delivery tag to use for acknowledge + self.delivery_tag = method.delivery_tag if ack + + header = client.next_payload + + # If maximum frame size is smaller than message payload body then message + # will have a message header and several message bodies + msg = '' + while msg.length < header.size + msg += client.next_payload + end + + msg_hash = {:header => header, :payload => msg, :delivery_details => method.arguments} + + else + msg_hash = {:header => nil, :payload => :queue_empty, :delivery_details => nil} end - # Return message with additional info if requested - {:header => header, :payload => msg, :delivery_details => method.arguments} - + # Pass message hash to block or return message hash + blk ? blk.call(msg_hash) : msg_hash + end =begin rdoc diff --git a/spec/spec_08/exchange_spec.rb b/spec/spec_08/exchange_spec.rb index 7253ebe2e..a1450741e 100644 --- a/spec/spec_08/exchange_spec.rb +++ b/spec/spec_08/exchange_spec.rb @@ -8,7 +8,7 @@ require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny])) -describe Bunny do +describe 'Exchange' do before(:each) do @b = Bunny.new diff --git a/spec/spec_08/queue_spec.rb b/spec/spec_08/queue_spec.rb index e7f1c84bd..cc41eb33d 100644 --- a/spec/spec_08/queue_spec.rb +++ b/spec/spec_08/queue_spec.rb @@ -8,7 +8,7 @@ require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny])) -describe Bunny do +describe 'Queue' do before(:each) do @b = Bunny.new @@ -87,6 +87,13 @@ msg.should == lg_msg end + it "should be able call a block when popping a message" do + q = @b.queue('test1') + q.publish('This is another test message') + q.pop { |msg| msg[:payload].should == 'This is another test message' } + q.pop { |msg| msg[:payload].should == :queue_empty } + end + it "should raise an error if purge fails" do q = @b.queue('test1') 5.times {q.publish('This is another test message')} diff --git a/spec/spec_09/exchange_spec.rb b/spec/spec_09/exchange_spec.rb index 5cc62f08a..2ec260c7b 100644 --- a/spec/spec_09/exchange_spec.rb +++ b/spec/spec_09/exchange_spec.rb @@ -8,7 +8,7 @@ require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny])) -describe Bunny do +describe 'Exchange' do before(:each) do @b = Bunny.new(:spec => '09') diff --git a/spec/spec_09/queue_spec.rb b/spec/spec_09/queue_spec.rb index b4afc5a5d..0d1c49ee0 100644 --- a/spec/spec_09/queue_spec.rb +++ b/spec/spec_09/queue_spec.rb @@ -8,7 +8,7 @@ require File.expand_path(File.join(File.dirname(__FILE__), %w[.. .. lib bunny])) -describe Bunny do +describe 'Queue' do before(:each) do @b = Bunny.new(:spec => '09') @@ -87,6 +87,13 @@ msg.should == lg_msg end + it "should be able call a block when popping a message" do + q = @b.queue('test1') + q.publish('This is another test message') + q.pop { |msg| msg[:payload].should == 'This is another test message' } + q.pop { |msg| msg[:payload].should == :queue_empty } + end + it "should raise an error if purge fails" do q = @b.queue('test1') 5.times {q.publish('This is another test message')}