Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add support to ensure consistency in threaded environment

Add a new API call which takes a block to ensure that commands occur on
the tube you want in a threaded environment. For example

    beanstalk.on_tube('foo') do |connection|
      connection.put 'bar'
    end

This will send a USE and then a PUT surrounded by lock so that another
thread can't put on the wrong tube.
  • Loading branch information...
commit 4db12f0766bc02433b76bba83f2c26933eb29e46 1 parent eb0d7f5
@johnf johnf authored
View
6 README.md
@@ -10,12 +10,6 @@ For more information, see:
- <http://kr.github.com/beanstalkd/>
- <http://github.com/kr/beanstalkd/raw/master/doc/protocol.txt>
-## Notes
-
-If you want to use this library from multiple concurrent threads, you should
-synchronize access to the connection. This library does no internal
-synchronization.
-
## Contributors
- Isaac Feliu
View
21 lib/beanstalk-client/connection.rb
@@ -27,6 +27,7 @@ class Connection
def initialize(addr, default_tube=nil)
@mutex = Mutex.new
+ @tube_mutex = Mutex.new
@waiting = false
@addr = addr
connect
@@ -78,6 +79,14 @@ def peek_buried()
interact("peek-buried\r\n", :job)
end
+ def on_tube(tube, &block)
+ @tube_mutex.lock
+ use tube
+ yield self
+ ensure
+ @tube_mutex.unlock
+ end
+
def reserve(timeout=nil)
raise WaitingForJobError if @waiting
@mutex.lock
@@ -289,6 +298,10 @@ def yput(obj, pri=65536, delay=0, ttr=120)
send_to_rand_conn(:yput, obj, pri, delay, ttr)
end
+ def on_tube(tube, &block)
+ send_to_rand_conn(:on_tube, tube, &block)
+ end
+
# Reserve a job from the queue.
#
# == Parameters
@@ -376,9 +389,9 @@ def peek_job(id)
private
- def call_wrap(c, *args)
+ def call_wrap(c, *args, &block)
self.last_conn = c
- c.send(*args)
+ c.send(*args, &block)
rescue UnexpectedResponse => ex
raise ex
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => ex
@@ -401,9 +414,9 @@ def send_to_each_conn_first_res(*args)
retry_wrap{open_connections.inject(nil) {|r,c| r or call_wrap(c, *args)}}
end
- def send_to_rand_conn(*args)
+ def send_to_rand_conn(*args, &block)
connect()
- retry_wrap{call_wrap(pick_connection, *args)}
+ retry_wrap{call_wrap(pick_connection, *args, &block)}
end
def send_to_all_conns(*args)
View
80 test/test_beanstalk-client.rb
@@ -4,4 +4,84 @@ class TestBeanstalkClient < Test::Unit::TestCase
def test_truth
assert true
end
+
+ def setup
+ @beanstalk = Beanstalk::Pool.new(['127.0.0.1:11300'])
+ @tubes = ['one', 'two']
+
+ # Put something on each tube so they exist
+ @beanstalk.use('one')
+ @beanstalk.put('one')
+
+ @beanstalk.use('two')
+ @beanstalk.put('two')
+ end
+
+ def test_not_thread_safe
+ # Create threads that will execute
+ # A: use one
+ # B: use one
+ # B: put two
+ # A: put one
+ a = Thread.new do
+ @beanstalk.use('one')
+ sleep 4
+ @beanstalk.put('one')
+ end
+
+ b = Thread.new do
+ sleep 1
+ @beanstalk.use('two')
+ @beanstalk.put('two')
+ end
+
+ a.join
+ b.join
+
+ one = @beanstalk.stats_tube 'one'
+ two = @beanstalk.stats_tube 'two'
+
+ assert_equal one['current-jobs-ready'], 1
+ assert_equal two['current-jobs-ready'], 3
+ end
+
+ def test_thread_safe
+ a = Thread.new do
+ @beanstalk.on_tube('one') do |conn|
+ sleep 4
+ conn.put('one')
+ end
+ end
+
+ b = Thread.new do
+ @beanstalk.on_tube('two') do |conn|
+ sleep 1
+ conn.put('two')
+ end
+ end
+
+ a.join
+ b.join
+
+ one = @beanstalk.stats_tube 'one'
+ two = @beanstalk.stats_tube 'two'
+
+ assert_equal one['current-jobs-ready'], 2
+ assert_equal two['current-jobs-ready'], 2
+ end
+
+ def teardown
+ # Clear the tubes
+ @tubes.each do |tube|
+ stats = @beanstalk.stats_tube tube
+ num_jobs = stats['current-jobs-ready']
+ @beanstalk.watch tube
+ num_jobs.times do
+ job = @beanstalk.reserve
+ job.delete
+ end
+ @beanstalk.ignore tube
+ end
+ end
+
end
Please sign in to comment.
Something went wrong with that request. Please try again.