From a3d9e7438c191f39dcd7ca3f2aae99157857449d Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 11 Dec 2014 13:45:34 +0100 Subject: [PATCH 1/9] First attempt on implementation of counting semaphores --- lib/concurrent/atomic/semaphore.rb | 266 +++++++++++++++++++++++ lib/concurrent/atomics.rb | 1 + spec/concurrent/atomic/semaphore_spec.rb | 174 +++++++++++++++ 3 files changed, 441 insertions(+) create mode 100644 lib/concurrent/atomic/semaphore.rb create mode 100755 spec/concurrent/atomic/semaphore_spec.rb diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb new file mode 100644 index 000000000..6cfdd9dd6 --- /dev/null +++ b/lib/concurrent/atomic/semaphore.rb @@ -0,0 +1,266 @@ +require 'concurrent/atomic/condition' + +module Concurrent + + class MutexSemaphore + + # @!macro [attach] semaphore_method_initialize + # + # Create a new `Semaphore` with the initial `count`. + # + # @param [Fixnum] count the initial count + # + # @raise [ArgumentError] if `count` is not an integer or is less than zero + def initialize(count) + unless count.is_a?(Fixnum) && count >= 0 + raise ArgumentError.new('count must be an non-negative integer') + end + @mutex = Mutex.new + @condition = Condition.new + @free = count + end + + # @!macro [attach] semaphore_method_acquire + # + # Acquires the given number of permits from this semaphore, blocking until all are available. + # + # @param [Fixnum] permits Number of permits to acquire + # + # @raise [ArgumentError] if `permits` is not an integer or is less than one + # + # @return [True] + def acquire(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + raise ArgumentError.new('permits must be an integer greater than zero') + end + @mutex.synchronize do + try_acquire_timed(permits, nil) + end + end + + # @!macro [attach] semaphore_method_available_permits + # + # Returns the current number of permits available in this semaphore. + # + # @return [Integer] + def available_permits + @mutex.synchronize { @free } + end + + # @!macro [attach] semaphore_method_drain_permits + # + # Acquires and returns all permits that are immediately available. + # + # @return [Integer] + def drain_permits + @mutex.synchronize do + @free.tap { |_| @free = 0 } + end + end + + # @!macro [attach] semaphore_method_try_acquire + # + # Acquires the given number of permits from this semaphore, only if all are available at the time of invocation. + # + # @param [Fixnum] permits the number of permits to acquire + # + # @param [Fixnum] timeout the number of seconds to wait for the counter or `nil` + # to return immediately + # + # @raise [ArgumentError] if `permits` is not an integer or is less than one + # + # @return [Boolean] `false` if no permits are available, `true` when acquired a permit + def try_acquire(permits = 1, timeout = nil) + unless permits.is_a?(Fixnum) && permits > 0 + raise ArgumentError.new('permits must be an integer greater than zero') + end + @mutex.synchronize do + if timeout.nil? + try_acquire_now(permits) + else + try_acquire_timed(permits, timeout) + end + end + end + + # @!macro [attach] semaphore_method_release + # + # Releases the given number of permits, returning them to the semaphore. + # + # @param [Fixnum] permits Number of permits to return to the semaphore. + # + # @raise [ArgumentError] if `permits` is not a number or is less than one + # + # @return [True] + def release(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + raise ArgumentError.new('permits must be an integer greater than zero') + end + @mutex.synchronize do + @free += permits + permits.times { @condition.signal } + end + true + end + + # @!macro [attach] semaphore_method_reduce_permits + # + # Shrinks the number of available permits by the indicated reduction. + # + # @param [Fixnum] reduction Number of permits to remove. + # + # @raise [ArgumentError] if `reduction` is not an integer or is negative + # + # @raise [ArgumentError] if `@free` - `@reduction` is less than zero + # + # @return [True] + def reduce_permits(reduction) + unless reduction.is_a?(Fixnum) && reduction >= 0 + raise ArgumentError.new('reduction must be an non-negative integer') + end + unless @free - reduction >= 0 + raise ArgumentError.new('cannot reduce number of available_permits below zero') + end + @mutex.synchronize do + @free -= reduction + end + true + end + + private + + def try_acquire_now(permits) + if @free >= permits + @free -= permits + true + else + false + end + end + + def try_acquire_timed(permits, timeout) + remaining = Condition::Result.new(timeout) + while !try_acquire_now(permits) && remaining.can_wait? + @condition.signal + remaining = @condition.wait(@mutex, remaining.remaining_time) + end + remaining.can_wait? ? true : false + end + end + + if RUBY_PLATFORM == 'java' + + # @!macro count_down_latch + class JavaSemaphore + + # @!macro count_down_latch_method_initialize + def initialize(count) + unless count.is_a?(Fixnum) && count >= 0 + raise ArgumentError.new('count must be in integer greater than or equal zero') + end + @semaphore = java.util.concurrent.Semaphore.new(count) + end + + def acquire(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + raise ArgumentError.new('permits must be an integer greater than zero') + end + @semaphore.acquire(permits); + end + + + # @!macro [attach] semaphore_method_available_permits + # + # Returns the current number of permits available in this semaphore. + # + # @return [Integer] + def available_permits + @semaphore.availablePermits + end + + # @!macro [attach] semaphore_method_drain_permits + # + # Acquires and returns all permits that are immediately available. + # + # @return [Integer] + def drain_permits + @semaphore.drainPermits + end + + # @!macro [attach] semaphore_method_try_acquire + # + # Acquires the given number of permits from this semaphore, only if all are available at the time of invocation. + # + # @param [Fixnum] permits the number of permits to acquire + # + # @param [Fixnum] timeout the number of seconds to wait for the counter or `nil` + # to return immediately + # + # @raise [ArgumentError] if `permits` is not an integer or is less than one + # + # @return [Boolean] `false` if no permits are available, `true` when acquired a permit + def try_acquire(permits = 1, timeout = nil) + unless permits.is_a?(Fixnum) && permits > 0 + raise ArgumentError.new('permits must be an integer greater than zero') + end + if timeout.nil? + @semaphore.try_acquire(permits) + else + @semaphore.try_acquire(permits, timeout, java.util.concurrent.TimeUnit::SECONDS) + end + end + + # @!macro [attach] semaphore_method_release + # + # Releases the given number of permits, returning them to the semaphore. + # + # @param [Fixnum] permits Number of permits to return to the semaphore. + # + # @raise [ArgumentError] if `permits` is not a number or is less than one + # + # @raise [ArgumentError] if `permits` + `@free` is larger than `@count` + # + # @return [True] + def release(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + raise ArgumentError.new('permits must be an integer greater than zero') + end + @semaphore.release(permits) + true + end + + # @!macro [attach] semaphore_method_reduce_permits + # + # Shrinks the number of available permits by the indicated reduction. + # + # @param [Fixnum] reduction Number of permits to remove. + # + # @raise [ArgumentError] if `reduction` is not an integer or is negative + # + # @raise [ArgumentError] if the operation would bring `@free` below zero + # + # @return [True] + def reduce_permits(reduction) + unless reduction.is_a?(Fixnum) && reduction >= 0 + raise ArgumentError.new('reduction must be an non-negative integer') + end + unless @free - reduction >= 0 + raise ArgumentError.new('cannot reduce number of available_permits below zero') + end + @semaphore.reducePermits(void) + end + + + end + + # @!macro count_down_latch + class Semaphore < JavaSemaphore + end + + else + + # @!macro count_down_latch + class Semaphore < MutexSemaphore + end + end +end diff --git a/lib/concurrent/atomics.rb b/lib/concurrent/atomics.rb index 159d2fbea..a54684644 100644 --- a/lib/concurrent/atomics.rb +++ b/lib/concurrent/atomics.rb @@ -8,3 +8,4 @@ require 'concurrent/atomic/count_down_latch' require 'concurrent/atomic/event' require 'concurrent/atomic/synchronization' +require 'concurrent/atomic/semaphore' diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb new file mode 100755 index 000000000..f64d6bdc6 --- /dev/null +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -0,0 +1,174 @@ +require 'spec_helper' + +shared_examples :semaphore do + + let(:semaphore) { described_class.new(3) } + + context '#initialize' do + + it 'raises an exception if the initial count is not an integer' do + expect { + described_class.new('foo') + }.to raise_error(ArgumentError) + end + end + + describe '#acquire' do + + context 'permits available' do + it 'should return true immediately' do + result = semaphore.acquire + expect(result).to be_truthy + end + end + + context 'not enough permits available' do + + it 'should block thread until permits are available' do + semaphore.drain_permits + Thread.new { sleep(0.2); semaphore.release } + + result = semaphore.acquire + expect(result).to be_truthy + expect(semaphore.available_permits).to eq 0 + end + end + end + + describe '#drain_permits' do + it 'drains all available permits' do + drained = semaphore.drain_permits + expect(drained).to eq 3 + expect(semaphore.available_permits).to eq 0 + end + + it 'drains nothing in no permits are available' do + semaphore.reduce_permits 3 + drained = semaphore.drain_permits + expect(drained).to eq 0 + end + end + + describe '#try_acquire' do + context 'without timeout' do + it 'acquires immediately if permits are available' do + result = semaphore.try_acquire(1) + expect(result).to be_truthy + end + + it 'returns false immediately in no permits are available' do + result = semaphore.try_acquire(20) + expect(result).to be_falsey + end + end + + context 'with timeout' do + it 'acquires immediately if permits are available' do + result = semaphore.try_acquire(1, 5) + expect(result).to be_truthy + end + + it 'acquires after if permits are available within timeout' do + x = semaphore.drain_permits + Thread.new { sleep 0.1; semaphore.release } + result = semaphore.try_acquire(1, 0.2) + expect(result).to be_truthy + end + + it 'returns false on timeout' do + semaphore.drain_permits + result = semaphore.try_acquire(1, 0.1) + expect(result).to be_falsey + end + end + end + + describe '#reduce_permits' do + it 'raises ArgumentError if reducing by negative number' do + expect { + semaphore.reduce_permits -1 + }.to raise_error(ArgumentError) + end + + it 'raises ArgumentError when reducing below zero' do + expect { + semaphore.reduce_permits 1000 + }.to raise_error(ArgumentError) + end + + it 'reduces permits' do + semaphore.reduce_permits 1 + expect(semaphore.available_permits).to eq 2 + semaphore.reduce_permits 2 + expect(semaphore.available_permits).to eq 0 + end + end +end + +module Concurrent + + describe MutexSemaphore do + + it_should_behave_like :semaphore + + context 'spurious wake ups' do + + subject { described_class.new(1) } + + before(:each) do + def subject.simulate_spurious_wake_up + @mutex.synchronize do + @condition.signal + @condition.broadcast + end + end + subject.drain_permits + end + + it 'should resist to spurious wake ups without timeout' do + @expected = true + Thread.new { @expected = subject.acquire } # would set @expected to false + + sleep(0.1) + subject.simulate_spurious_wake_up + + sleep(0.1) + expect(@expected).to be_truthy + end + + it 'should resist to spurious wake ups with timeout' do + @expected = true + t = Thread.new { @expected = subject.try_acquire(1, 0.3) } # sets @expected to false + + sleep(0.1) + subject.simulate_spurious_wake_up + + sleep(0.1) + expect(@expected).to be_truthy + + t.join() + expect(@expected).to be_falsey + end + end + end + + if TestHelpers.jruby? + + describe JavaSemaphore do + + it_should_behave_like :semaphore + end + end + + describe Semaphore do + if jruby? + it 'inherits from JavaCountDownLatch' do + expect(CountDownLatch.ancestors).to include(JavaCountDownLatch) + end + else + it 'inherits from MutexSemaphore' do + expect(Semaphore.ancestors).to include(MutexSemaphore) + end + end + end +end From 8fa3f05f5dfdbd0e4c6cdd28a5af5322cff87801 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 11 Dec 2014 21:45:50 +0100 Subject: [PATCH 2/9] Making rubocop happy (sort of) --- lib/concurrent/atomic/semaphore.rb | 169 +++++++++-------------- spec/concurrent/atomic/semaphore_spec.rb | 25 ++-- 2 files changed, 76 insertions(+), 118 deletions(-) diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index 6cfdd9dd6..0302a4935 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -1,9 +1,7 @@ require 'concurrent/atomic/condition' module Concurrent - class MutexSemaphore - # @!macro [attach] semaphore_method_initialize # # Create a new `Semaphore` with the initial `count`. @@ -13,7 +11,7 @@ class MutexSemaphore # @raise [ArgumentError] if `count` is not an integer or is less than zero def initialize(count) unless count.is_a?(Fixnum) && count >= 0 - raise ArgumentError.new('count must be an non-negative integer') + fail ArgumentError, 'count must be an non-negative integer' end @mutex = Mutex.new @condition = Condition.new @@ -22,16 +20,18 @@ def initialize(count) # @!macro [attach] semaphore_method_acquire # - # Acquires the given number of permits from this semaphore, blocking until all are available. + # Acquires the given number of permits from this semaphore, + # blocking until all are available. # # @param [Fixnum] permits Number of permits to acquire # - # @raise [ArgumentError] if `permits` is not an integer or is less than one + # @raise [ArgumentError] if `permits` is not an integer or is less than + # one # # @return [True] def acquire(permits = 1) unless permits.is_a?(Fixnum) && permits > 0 - raise ArgumentError.new('permits must be an integer greater than zero') + fail ArgumentError, 'permits must be an integer greater than zero' end @mutex.synchronize do try_acquire_timed(permits, nil) @@ -60,19 +60,23 @@ def drain_permits # @!macro [attach] semaphore_method_try_acquire # - # Acquires the given number of permits from this semaphore, only if all are available at the time of invocation. + # Acquires the given number of permits from this semaphore, + # only if all are available at the time of invocation or within + # `timeout` interval # # @param [Fixnum] permits the number of permits to acquire # - # @param [Fixnum] timeout the number of seconds to wait for the counter or `nil` - # to return immediately + # @param [Fixnum] timeout the number of seconds to wait for the counter + # or `nil` to return immediately # - # @raise [ArgumentError] if `permits` is not an integer or is less than one + # @raise [ArgumentError] if `permits` is not an integer or is less than + # one # - # @return [Boolean] `false` if no permits are available, `true` when acquired a permit + # @return [Boolean] `false` if no permits are available, `true` when + # acquired a permit def try_acquire(permits = 1, timeout = nil) unless permits.is_a?(Fixnum) && permits > 0 - raise ArgumentError.new('permits must be an integer greater than zero') + fail ArgumentError, 'permits must be an integer greater than zero' end @mutex.synchronize do if timeout.nil? @@ -94,7 +98,7 @@ def try_acquire(permits = 1, timeout = nil) # @return [True] def release(permits = 1) unless permits.is_a?(Fixnum) && permits > 0 - raise ArgumentError.new('permits must be an integer greater than zero') + fail ArgumentError, 'permits must be an integer greater than zero' end @mutex.synchronize do @free += permits @@ -116,14 +120,13 @@ def release(permits = 1) # @return [True] def reduce_permits(reduction) unless reduction.is_a?(Fixnum) && reduction >= 0 - raise ArgumentError.new('reduction must be an non-negative integer') + fail ArgumentError, 'reduction must be an non-negative integer' end unless @free - reduction >= 0 - raise ArgumentError.new('cannot reduce number of available_permits below zero') - end - @mutex.synchronize do - @free -= reduction + fail(ArgumentError, + 'cannot reduce number of available_permits below zero') end + @mutex.synchronize { @free -= reduction } true end @@ -150,116 +153,78 @@ def try_acquire_timed(permits, timeout) if RUBY_PLATFORM == 'java' - # @!macro count_down_latch + # @!macro semaphore class JavaSemaphore - - # @!macro count_down_latch_method_initialize + # @!macro semaphore_method_initialize def initialize(count) unless count.is_a?(Fixnum) && count >= 0 - raise ArgumentError.new('count must be in integer greater than or equal zero') + fail(ArgumentError, + 'count must be in integer greater than or equal zero') end @semaphore = java.util.concurrent.Semaphore.new(count) end + # @!macro semaphore_method_acquire def acquire(permits = 1) unless permits.is_a?(Fixnum) && permits > 0 - raise ArgumentError.new('permits must be an integer greater than zero') + fail ArgumentError, 'permits must be an integer greater than zero' end - @semaphore.acquire(permits); + @semaphore.acquire(permits) end - - # @!macro [attach] semaphore_method_available_permits - # - # Returns the current number of permits available in this semaphore. - # - # @return [Integer] - def available_permits - @semaphore.availablePermits - end - - # @!macro [attach] semaphore_method_drain_permits - # - # Acquires and returns all permits that are immediately available. - # - # @return [Integer] - def drain_permits - @semaphore.drainPermits - end - - # @!macro [attach] semaphore_method_try_acquire - # - # Acquires the given number of permits from this semaphore, only if all are available at the time of invocation. - # - # @param [Fixnum] permits the number of permits to acquire - # - # @param [Fixnum] timeout the number of seconds to wait for the counter or `nil` - # to return immediately - # - # @raise [ArgumentError] if `permits` is not an integer or is less than one - # - # @return [Boolean] `false` if no permits are available, `true` when acquired a permit - def try_acquire(permits = 1, timeout = nil) - unless permits.is_a?(Fixnum) && permits > 0 - raise ArgumentError.new('permits must be an integer greater than zero') + # @!macro semaphore_method_available_permits + def available_permits + @semaphore.availablePermits end - if timeout.nil? - @semaphore.try_acquire(permits) - else - @semaphore.try_acquire(permits, timeout, java.util.concurrent.TimeUnit::SECONDS) - end - end - # @!macro [attach] semaphore_method_release - # - # Releases the given number of permits, returning them to the semaphore. - # - # @param [Fixnum] permits Number of permits to return to the semaphore. - # - # @raise [ArgumentError] if `permits` is not a number or is less than one - # - # @raise [ArgumentError] if `permits` + `@free` is larger than `@count` - # - # @return [True] - def release(permits = 1) - unless permits.is_a?(Fixnum) && permits > 0 - raise ArgumentError.new('permits must be an integer greater than zero') + # @!macro semaphore_method_drain_permits + def drain_permits + @semaphore.drainPermits end - @semaphore.release(permits) - true - end - # @!macro [attach] semaphore_method_reduce_permits - # - # Shrinks the number of available permits by the indicated reduction. - # - # @param [Fixnum] reduction Number of permits to remove. - # - # @raise [ArgumentError] if `reduction` is not an integer or is negative - # - # @raise [ArgumentError] if the operation would bring `@free` below zero - # - # @return [True] - def reduce_permits(reduction) - unless reduction.is_a?(Fixnum) && reduction >= 0 - raise ArgumentError.new('reduction must be an non-negative integer') - end - unless @free - reduction >= 0 - raise ArgumentError.new('cannot reduce number of available_permits below zero') + # @!macro semaphore_method_try_acquire + def try_acquire(permits = 1, timeout = nil) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + if timeout.nil? + @semaphore.try_acquire(permits) + else + @semaphore.try_acquire(permits, + timeout, + java.util.concurrent.TimeUnit::SECONDS) + end end - @semaphore.reducePermits(void) - end + # @!macro semaphore_method_release + def release(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + @semaphore.release(permits) + true + end + # @!macro semaphore_method_reduce_permits + def reduce_permits(reduction) + unless reduction.is_a?(Fixnum) && reduction >= 0 + fail ArgumentError, 'reduction must be an non-negative integer' + end + unless @free - reduction >= 0 + fail(ArgumentError, + 'cannot reduce number of available_permits below zero') + end + @semaphore.reducePermits(void) + end end - # @!macro count_down_latch + # @!macro semaphore class Semaphore < JavaSemaphore end else - # @!macro count_down_latch + # @!macro semaphore class Semaphore < MutexSemaphore end end diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb index f64d6bdc6..8e9c0bdef 100755 --- a/spec/concurrent/atomic/semaphore_spec.rb +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -1,11 +1,9 @@ require 'spec_helper' shared_examples :semaphore do - let(:semaphore) { described_class.new(3) } context '#initialize' do - it 'raises an exception if the initial count is not an integer' do expect { described_class.new('foo') @@ -14,7 +12,6 @@ end describe '#acquire' do - context 'permits available' do it 'should return true immediately' do result = semaphore.acquire @@ -23,10 +20,9 @@ end context 'not enough permits available' do - it 'should block thread until permits are available' do semaphore.drain_permits - Thread.new { sleep(0.2); semaphore.release } + Thread.new { sleep(0.2) && semaphore.release } result = semaphore.acquire expect(result).to be_truthy @@ -69,8 +65,8 @@ end it 'acquires after if permits are available within timeout' do - x = semaphore.drain_permits - Thread.new { sleep 0.1; semaphore.release } + semaphore.drain_permits + Thread.new { sleep 0.1 && semaphore.release } result = semaphore.try_acquire(1, 0.2) expect(result).to be_truthy end @@ -86,7 +82,7 @@ describe '#reduce_permits' do it 'raises ArgumentError if reducing by negative number' do expect { - semaphore.reduce_permits -1 + semaphore.reduce_permits(-1) }.to raise_error(ArgumentError) end @@ -106,13 +102,10 @@ end module Concurrent - describe MutexSemaphore do - it_should_behave_like :semaphore context 'spurious wake ups' do - subject { described_class.new(1) } before(:each) do @@ -127,7 +120,8 @@ def subject.simulate_spurious_wake_up it 'should resist to spurious wake ups without timeout' do @expected = true - Thread.new { @expected = subject.acquire } # would set @expected to false + # would set @expected to false + Thread.new { @expected = subject.acquire } sleep(0.1) subject.simulate_spurious_wake_up @@ -138,7 +132,8 @@ def subject.simulate_spurious_wake_up it 'should resist to spurious wake ups with timeout' do @expected = true - t = Thread.new { @expected = subject.try_acquire(1, 0.3) } # sets @expected to false + # sets @expected to false in another thread + t = Thread.new { @expected = subject.try_acquire(1, 0.3) } sleep(0.1) subject.simulate_spurious_wake_up @@ -146,16 +141,14 @@ def subject.simulate_spurious_wake_up sleep(0.1) expect(@expected).to be_truthy - t.join() + t.join expect(@expected).to be_falsey end end end if TestHelpers.jruby? - describe JavaSemaphore do - it_should_behave_like :semaphore end end From 3e9116bf833f2da2dc34ebff8ec186f4b1ab7699 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 11 Dec 2014 22:10:54 +0100 Subject: [PATCH 3/9] JRuby - fix --- lib/concurrent/atomic/semaphore.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index 0302a4935..58a4145ad 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -210,7 +210,7 @@ def reduce_permits(reduction) unless reduction.is_a?(Fixnum) && reduction >= 0 fail ArgumentError, 'reduction must be an non-negative integer' end - unless @free - reduction >= 0 + unless @semaphore.available_permits - reduction >= 0 fail(ArgumentError, 'cannot reduce number of available_permits below zero') end From 578fc3581700fceab83b73681ad12632e11782b3 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 11 Dec 2014 22:30:05 +0100 Subject: [PATCH 4/9] JRuby fix #2 --- lib/concurrent/atomic/semaphore.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index 58a4145ad..3084ff3c5 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -214,7 +214,7 @@ def reduce_permits(reduction) fail(ArgumentError, 'cannot reduce number of available_permits below zero') end - @semaphore.reducePermits(void) + @semaphore.reducePermits(reduction) end end From ed4222f0cb04289c3d41142c1ebc901d83b4ddc3 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 11 Dec 2014 22:41:49 +0100 Subject: [PATCH 5/9] JRuby fix #3 --- spec/concurrent/atomic/semaphore_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb index 8e9c0bdef..11569e533 100755 --- a/spec/concurrent/atomic/semaphore_spec.rb +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -156,7 +156,7 @@ def subject.simulate_spurious_wake_up describe Semaphore do if jruby? it 'inherits from JavaCountDownLatch' do - expect(CountDownLatch.ancestors).to include(JavaCountDownLatch) + expect(Semaphore.ancestors).to include(Semaphore) end else it 'inherits from MutexSemaphore' do From b4d58a3206108770c3f9ff9d8df6602c4b7072ec Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 11 Dec 2014 22:45:30 +0100 Subject: [PATCH 6/9] JRuby fix #4 --- spec/concurrent/atomic/semaphore_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb index 11569e533..5b2dded3a 100755 --- a/spec/concurrent/atomic/semaphore_spec.rb +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -156,7 +156,7 @@ def subject.simulate_spurious_wake_up describe Semaphore do if jruby? it 'inherits from JavaCountDownLatch' do - expect(Semaphore.ancestors).to include(Semaphore) + expect(Semaphore.ancestors).to include(JavaSemaphore) end else it 'inherits from MutexSemaphore' do From 0f59e8fe423714068a7fc4d47b102522349104a6 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 11 Dec 2014 23:23:23 +0100 Subject: [PATCH 7/9] JRuby fix #5 --- lib/concurrent/atomic/semaphore.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index 3084ff3c5..1eaedf25a 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -188,9 +188,9 @@ def try_acquire(permits = 1, timeout = nil) fail ArgumentError, 'permits must be an integer greater than zero' end if timeout.nil? - @semaphore.try_acquire(permits) + @semaphore.tryAcquire(permits) else - @semaphore.try_acquire(permits, + @semaphore.tryAcquire(permits, timeout, java.util.concurrent.TimeUnit::SECONDS) end @@ -210,7 +210,7 @@ def reduce_permits(reduction) unless reduction.is_a?(Fixnum) && reduction >= 0 fail ArgumentError, 'reduction must be an non-negative integer' end - unless @semaphore.available_permits - reduction >= 0 + unless @semaphore.availablePermits - reduction >= 0 fail(ArgumentError, 'cannot reduce number of available_permits below zero') end From 509c60e1b400ea700736f735a940cdd0d91fe7ff Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Fri, 12 Dec 2014 11:16:24 +0100 Subject: [PATCH 8/9] JRuby fix #6 - makes .acquire return nil --- lib/concurrent/atomic/semaphore.rb | 11 ++++++----- spec/concurrent/atomic/semaphore_spec.rb | 10 +++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index 1eaedf25a..d65c35cec 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -28,13 +28,14 @@ def initialize(count) # @raise [ArgumentError] if `permits` is not an integer or is less than # one # - # @return [True] + # @return [Nil] def acquire(permits = 1) unless permits.is_a?(Fixnum) && permits > 0 fail ArgumentError, 'permits must be an integer greater than zero' end @mutex.synchronize do try_acquire_timed(permits, nil) + nil end end @@ -95,7 +96,7 @@ def try_acquire(permits = 1, timeout = nil) # # @raise [ArgumentError] if `permits` is not a number or is less than one # - # @return [True] + # @return [Nil] def release(permits = 1) unless permits.is_a?(Fixnum) && permits > 0 fail ArgumentError, 'permits must be an integer greater than zero' @@ -104,7 +105,7 @@ def release(permits = 1) @free += permits permits.times { @condition.signal } end - true + nil end # @!macro [attach] semaphore_method_reduce_permits @@ -117,7 +118,7 @@ def release(permits = 1) # # @raise [ArgumentError] if `@free` - `@reduction` is less than zero # - # @return [True] + # @return [Nil] def reduce_permits(reduction) unless reduction.is_a?(Fixnum) && reduction >= 0 fail ArgumentError, 'reduction must be an non-negative integer' @@ -127,7 +128,7 @@ def reduce_permits(reduction) 'cannot reduce number of available_permits below zero') end @mutex.synchronize { @free -= reduction } - true + nil end private diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb index 5b2dded3a..44413c137 100755 --- a/spec/concurrent/atomic/semaphore_spec.rb +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -15,7 +15,7 @@ context 'permits available' do it 'should return true immediately' do result = semaphore.acquire - expect(result).to be_truthy + expect(result).to be_nil end end @@ -25,7 +25,7 @@ Thread.new { sleep(0.2) && semaphore.release } result = semaphore.acquire - expect(result).to be_truthy + expect(result).to be_nil expect(semaphore.available_permits).to eq 0 end end @@ -64,10 +64,10 @@ expect(result).to be_truthy end - it 'acquires after if permits are available within timeout' do + it 'acquires when permits are available within timeout' do semaphore.drain_permits Thread.new { sleep 0.1 && semaphore.release } - result = semaphore.try_acquire(1, 0.2) + result = semaphore.try_acquire(1, 1) expect(result).to be_truthy end @@ -155,7 +155,7 @@ def subject.simulate_spurious_wake_up describe Semaphore do if jruby? - it 'inherits from JavaCountDownLatch' do + it 'inherits from JavaSemaphore' do expect(Semaphore.ancestors).to include(JavaSemaphore) end else From 8a1f8c6170364f760638e469003f421dc69e4154 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Mon, 15 Dec 2014 09:45:39 +0100 Subject: [PATCH 9/9] Removed one check from reduce_permits --- lib/concurrent/atomic/semaphore.rb | 10 ++-------- spec/concurrent/atomic/semaphore_spec.rb | 7 +++---- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index d65c35cec..db907d3f1 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -109,6 +109,8 @@ def release(permits = 1) end # @!macro [attach] semaphore_method_reduce_permits + # + # @api private # # Shrinks the number of available permits by the indicated reduction. # @@ -123,10 +125,6 @@ def reduce_permits(reduction) unless reduction.is_a?(Fixnum) && reduction >= 0 fail ArgumentError, 'reduction must be an non-negative integer' end - unless @free - reduction >= 0 - fail(ArgumentError, - 'cannot reduce number of available_permits below zero') - end @mutex.synchronize { @free -= reduction } nil end @@ -211,10 +209,6 @@ def reduce_permits(reduction) unless reduction.is_a?(Fixnum) && reduction >= 0 fail ArgumentError, 'reduction must be an non-negative integer' end - unless @semaphore.availablePermits - reduction >= 0 - fail(ArgumentError, - 'cannot reduce number of available_permits below zero') - end @semaphore.reducePermits(reduction) end end diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb index 44413c137..e2eba8743 100755 --- a/spec/concurrent/atomic/semaphore_spec.rb +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -86,10 +86,9 @@ }.to raise_error(ArgumentError) end - it 'raises ArgumentError when reducing below zero' do - expect { - semaphore.reduce_permits 1000 - }.to raise_error(ArgumentError) + it 'reduces permits below zero' do + semaphore.reduce_permits 1003 + expect(semaphore.available_permits).to eq -1000 end it 'reduces permits' do