Skip to content

Commit

Permalink
Introduce HotBunnies::Queue::Subscription#active? and #shutdown!, add…
Browse files Browse the repository at this point in the history
… test example for cancelling

A missing piece discovered during the development of the next generation travis-ci.org worker.
  • Loading branch information
michaelklishin committed Nov 17, 2011
1 parent e9c4dd6 commit c80e2f1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 12 deletions.
42 changes: 30 additions & 12 deletions lib/hot_bunnies/queue.rb
Expand Up @@ -48,21 +48,23 @@ def status
[response.message_count, response.consumer_count]
end

private
private

def declare!
response = if @options[:passive]
then @channel.queue_declare_passive(@name)
else @channel.queue_declare(@name, @options[:durable], @options[:exclusive], @options[:auto_delete], @options[:arguments])
end
then @channel.queue_declare_passive(@name)
else @channel.queue_declare(@name, @options[:durable], @options[:exclusive], @options[:auto_delete], @options[:arguments])
end
@name = response.queue
end

class Subscription
def initialize(channel, queue_name, options={})
@channel = channel
@channel = channel
@queue_name = queue_name
@ack = options.fetch(:ack, false)
@ack = options.fetch(:ack, false)

@cancelled = java.util.concurrent.atomic.AtomicBoolean.new(false)
end

def each(options={}, &block)
Expand All @@ -82,19 +84,35 @@ def each(options={}, &block)
end

def cancel
raise 'Can\'t cancel: the subscriber haven\'t received an OK yet' if !@subscriber || !@subscriber.consumer_tag
raise 'Can\'t cancel: the subscriber haven\'t received an OK yet' if !self.active?
@channel.basic_cancel(@subscriber.consumer_tag)

# RabbitMQ Java client won't clear consumer_tag from cancelled consumers,
# so we have to do this. Sharing consumers
# between threads in general is a can of worms but someone somewhere
# will almost certainly do it, so. MK.
@cancelled.set(true)

maybe_shutdown_executor
end

private
def active?
!@cancelled.get && !@subscriber.nil? && !@subscriber.consumer_tag.nil?
end

def maybe_shutdown_executor
if @executor && @shut_down_executor
@executor.shutdown
def shutdown!
if @executor && @shut_down_executor
@executor.shutdown_now
end
end

private

def maybe_shutdown_executor
if @executor && @shut_down_executor
@executor.shutdown
end
end
end

def run(&block)
@subscriber = BlockingSubscriber.new(@channel, self)
Expand Down
29 changes: 29 additions & 0 deletions spec/integration/basic_consume_spec.rb
@@ -0,0 +1,29 @@
require "spec_helper"

describe "Queue consumer" do
let(:connection) { HotBunnies.connect }
let(:channel) { connection.create_channel }

after :each do
channel.close
connection.close
end

it "provides predicates" do
queue = channel.queue("", :auto_delete => true)

subscription = queue.subscribe(:blocking => false) { |_, _| nil }

# consumer tag will be sent by the broker, so this happens
# asynchronously and we can either add callbacks/use latches or
# just wait. MK.
sleep(1.0)
subscription.should be_active

subscription.cancel
sleep(1.0)
subscription.should_not be_active

subscription.shutdown!
end
end

0 comments on commit c80e2f1

Please sign in to comment.