From 4deb4c039080ffc0d10748de8421d857c9d95f78 Mon Sep 17 00:00:00 2001 From: Kaspar Schiess Date: Mon, 4 Jan 2010 16:57:42 +0100 Subject: [PATCH] ! Fixes bug where filtered queues would not be respected --- lib/toamqp/service/base.rb | 1 - lib/toamqp/topology.rb | 16 +++++++++++++++- spec/integration/filtered_spec.rb | 29 +++++++++++++++++------------ 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/lib/toamqp/service/base.rb b/lib/toamqp/service/base.rb index 3fb1ddc..e4d64ba 100644 --- a/lib/toamqp/service/base.rb +++ b/lib/toamqp/service/base.rb @@ -20,7 +20,6 @@ def thrift_module_constant(constant) # Returns a thrift processor for this service. # def thrift_processor - thrift_module_constant('Processor').new(self) end diff --git a/lib/toamqp/topology.rb b/lib/toamqp/topology.rb index ac79339..1d13d14 100644 --- a/lib/toamqp/topology.rb +++ b/lib/toamqp/topology.rb @@ -44,7 +44,8 @@ def produce_exchange "Maybe exchange '#{exchange_name}' already exists and is of a different type than #{exchange_type.inspect} ?" end def produce_queue - queue = connection.queue(exchange_name) + queue = connection.queue( + produce_queue_name(exchange_name)) bind_options = {} if match_headers? @@ -56,6 +57,19 @@ def produce_queue return queue end + def produce_queue_name(exchange_name) + if match_headers? + # Queue names are derived from the filter attributes. We sort the keys + # and compose the name from the exchange and the filter. + match_header = options[:match] + [ + exchange_name, + match_header.keys.sort.map { |k| "#{k}_#{match_header[k]}" }]. + flatten.join('-') + else + exchange_name + end + end # Closes the connection and cleans up after the topology. # diff --git a/spec/integration/filtered_spec.rb b/spec/integration/filtered_spec.rb index 895c366..7d387c0 100644 --- a/spec/integration/filtered_spec.rb +++ b/spec/integration/filtered_spec.rb @@ -30,6 +30,19 @@ class FilterForBarService < FilterForBaseService class FilterForBazService < FilterForBaseService exchange :test_filtered, :match => { :foo => :baz } end + + after(:each) do + Bunny.run do |mq| + %w{ + test_filtered-foo_baz + test_filtered-foo_bar + }.each do |queue_name| + queue = mq.queue(queue_name) + queue.pop while queue.message_count > 0 + queue.delete + end + end + end context "using a single server" do attr_reader :server, :received @@ -37,13 +50,6 @@ class FilterForBazService < FilterForBaseService @received = [] @server = TOAMQP.server(FilterForBarService.new(received), TOAMQP::SpecServer) end - after(:each) do - Bunny.run do |mq| - queue = mq.queue('test_filtered') - queue.pop while queue.message_count > 0 - queue.delete - end - end context "when sent messages with :foo => :bar" do before(:each) do @@ -74,13 +80,12 @@ class FilterForBazService < FilterForBaseService @bar = [] @baz = [] - @bar_server = TOAMQP.server(FilterForBarService.new(bar)) - @baz_server = TOAMQP.server(FilterForBazService.new(baz)) - + @bar_server = TOAMQP.server(FilterForBarService.new(bar), TOAMQP::SpecServer) + @baz_server = TOAMQP.server(FilterForBazService.new(baz), TOAMQP::SpecServer) + client = TOAMQP.client(:test_filtered, Test, :header => { :foo => :bar }) client.announce('message') - end - def serve + @baz_server.serve @bar_server.serve end