Skip to content

Commit

Permalink
! Fixes bug where filtered queues would not be respected
Browse files Browse the repository at this point in the history
  • Loading branch information
kschiess committed Jan 4, 2010
1 parent 3568d2f commit 4deb4c0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 14 deletions.
1 change: 0 additions & 1 deletion lib/toamqp/service/base.rb
Expand Up @@ -20,7 +20,6 @@ def thrift_module_constant(constant)
# Returns a thrift processor for this service.
#
def thrift_processor

thrift_module_constant('Processor').new(self)
end

Expand Down
16 changes: 15 additions & 1 deletion lib/toamqp/topology.rb
Expand Up @@ -44,7 +44,8 @@ def produce_exchange
"Maybe exchange '#{exchange_name}' already exists and is of a different type than #{exchange_type.inspect} ?"
end
def produce_queue
queue = connection.queue(exchange_name)
queue = connection.queue(
produce_queue_name(exchange_name))

bind_options = {}
if match_headers?
Expand All @@ -56,6 +57,19 @@ def produce_queue

return queue
end
def produce_queue_name(exchange_name)
if match_headers?
# Queue names are derived from the filter attributes. We sort the keys
# and compose the name from the exchange and the filter.
match_header = options[:match]
[
exchange_name,
match_header.keys.sort.map { |k| "#{k}_#{match_header[k]}" }].
flatten.join('-')
else
exchange_name
end
end

# Closes the connection and cleans up after the topology.
#
Expand Down
29 changes: 17 additions & 12 deletions spec/integration/filtered_spec.rb
Expand Up @@ -30,20 +30,26 @@ class FilterForBarService < FilterForBaseService
class FilterForBazService < FilterForBaseService
exchange :test_filtered, :match => { :foo => :baz }
end

after(:each) do
Bunny.run do |mq|
%w{
test_filtered-foo_baz
test_filtered-foo_bar
}.each do |queue_name|
queue = mq.queue(queue_name)
queue.pop while queue.message_count > 0
queue.delete
end
end
end

context "using a single server" do
attr_reader :server, :received
before(:each) do
@received = []
@server = TOAMQP.server(FilterForBarService.new(received), TOAMQP::SpecServer)
end
after(:each) do
Bunny.run do |mq|
queue = mq.queue('test_filtered')
queue.pop while queue.message_count > 0
queue.delete
end
end

context "when sent messages with :foo => :bar" do
before(:each) do
Expand Down Expand Up @@ -74,13 +80,12 @@ class FilterForBazService < FilterForBaseService
@bar = []
@baz = []

@bar_server = TOAMQP.server(FilterForBarService.new(bar))
@baz_server = TOAMQP.server(FilterForBazService.new(baz))
@bar_server = TOAMQP.server(FilterForBarService.new(bar), TOAMQP::SpecServer)
@baz_server = TOAMQP.server(FilterForBazService.new(baz), TOAMQP::SpecServer)

client = TOAMQP.client(:test_filtered, Test, :header => { :foo => :bar })
client.announce('message')
end
def serve

@baz_server.serve
@bar_server.serve
end
Expand Down

0 comments on commit 4deb4c0

Please sign in to comment.