diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb new file mode 100644 index 000000000..db907d3f1 --- /dev/null +++ b/lib/concurrent/atomic/semaphore.rb @@ -0,0 +1,226 @@ +require 'concurrent/atomic/condition' + +module Concurrent + class MutexSemaphore + # @!macro [attach] semaphore_method_initialize + # + # Create a new `Semaphore` with the initial `count`. + # + # @param [Fixnum] count the initial count + # + # @raise [ArgumentError] if `count` is not an integer or is less than zero + def initialize(count) + unless count.is_a?(Fixnum) && count >= 0 + fail ArgumentError, 'count must be an non-negative integer' + end + @mutex = Mutex.new + @condition = Condition.new + @free = count + end + + # @!macro [attach] semaphore_method_acquire + # + # Acquires the given number of permits from this semaphore, + # blocking until all are available. + # + # @param [Fixnum] permits Number of permits to acquire + # + # @raise [ArgumentError] if `permits` is not an integer or is less than + # one + # + # @return [Nil] + def acquire(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + @mutex.synchronize do + try_acquire_timed(permits, nil) + nil + end + end + + # @!macro [attach] semaphore_method_available_permits + # + # Returns the current number of permits available in this semaphore. + # + # @return [Integer] + def available_permits + @mutex.synchronize { @free } + end + + # @!macro [attach] semaphore_method_drain_permits + # + # Acquires and returns all permits that are immediately available. + # + # @return [Integer] + def drain_permits + @mutex.synchronize do + @free.tap { |_| @free = 0 } + end + end + + # @!macro [attach] semaphore_method_try_acquire + # + # Acquires the given number of permits from this semaphore, + # only if all are available at the time of invocation or within + # `timeout` interval + # + # @param [Fixnum] permits the number of permits to acquire + # + # @param [Fixnum] timeout the number of seconds to wait for the counter + # or `nil` to return immediately + # + # @raise [ArgumentError] if `permits` is not an integer or is less than + # one + # + # @return [Boolean] `false` if no permits are available, `true` when + # acquired a permit + def try_acquire(permits = 1, timeout = nil) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + @mutex.synchronize do + if timeout.nil? + try_acquire_now(permits) + else + try_acquire_timed(permits, timeout) + end + end + end + + # @!macro [attach] semaphore_method_release + # + # Releases the given number of permits, returning them to the semaphore. + # + # @param [Fixnum] permits Number of permits to return to the semaphore. + # + # @raise [ArgumentError] if `permits` is not a number or is less than one + # + # @return [Nil] + def release(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + @mutex.synchronize do + @free += permits + permits.times { @condition.signal } + end + nil + end + + # @!macro [attach] semaphore_method_reduce_permits + # + # @api private + # + # Shrinks the number of available permits by the indicated reduction. + # + # @param [Fixnum] reduction Number of permits to remove. + # + # @raise [ArgumentError] if `reduction` is not an integer or is negative + # + # @raise [ArgumentError] if `@free` - `@reduction` is less than zero + # + # @return [Nil] + def reduce_permits(reduction) + unless reduction.is_a?(Fixnum) && reduction >= 0 + fail ArgumentError, 'reduction must be an non-negative integer' + end + @mutex.synchronize { @free -= reduction } + nil + end + + private + + def try_acquire_now(permits) + if @free >= permits + @free -= permits + true + else + false + end + end + + def try_acquire_timed(permits, timeout) + remaining = Condition::Result.new(timeout) + while !try_acquire_now(permits) && remaining.can_wait? + @condition.signal + remaining = @condition.wait(@mutex, remaining.remaining_time) + end + remaining.can_wait? ? true : false + end + end + + if RUBY_PLATFORM == 'java' + + # @!macro semaphore + class JavaSemaphore + # @!macro semaphore_method_initialize + def initialize(count) + unless count.is_a?(Fixnum) && count >= 0 + fail(ArgumentError, + 'count must be in integer greater than or equal zero') + end + @semaphore = java.util.concurrent.Semaphore.new(count) + end + + # @!macro semaphore_method_acquire + def acquire(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + @semaphore.acquire(permits) + end + + # @!macro semaphore_method_available_permits + def available_permits + @semaphore.availablePermits + end + + # @!macro semaphore_method_drain_permits + def drain_permits + @semaphore.drainPermits + end + + # @!macro semaphore_method_try_acquire + def try_acquire(permits = 1, timeout = nil) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + if timeout.nil? + @semaphore.tryAcquire(permits) + else + @semaphore.tryAcquire(permits, + timeout, + java.util.concurrent.TimeUnit::SECONDS) + end + end + + # @!macro semaphore_method_release + def release(permits = 1) + unless permits.is_a?(Fixnum) && permits > 0 + fail ArgumentError, 'permits must be an integer greater than zero' + end + @semaphore.release(permits) + true + end + + # @!macro semaphore_method_reduce_permits + def reduce_permits(reduction) + unless reduction.is_a?(Fixnum) && reduction >= 0 + fail ArgumentError, 'reduction must be an non-negative integer' + end + @semaphore.reducePermits(reduction) + end + end + + # @!macro semaphore + class Semaphore < JavaSemaphore + end + + else + + # @!macro semaphore + class Semaphore < MutexSemaphore + end + end +end diff --git a/lib/concurrent/atomics.rb b/lib/concurrent/atomics.rb index 159d2fbea..a54684644 100644 --- a/lib/concurrent/atomics.rb +++ b/lib/concurrent/atomics.rb @@ -8,3 +8,4 @@ require 'concurrent/atomic/count_down_latch' require 'concurrent/atomic/event' require 'concurrent/atomic/synchronization' +require 'concurrent/atomic/semaphore' diff --git a/spec/concurrent/atomic/semaphore_spec.rb b/spec/concurrent/atomic/semaphore_spec.rb new file mode 100755 index 000000000..e2eba8743 --- /dev/null +++ b/spec/concurrent/atomic/semaphore_spec.rb @@ -0,0 +1,166 @@ +require 'spec_helper' + +shared_examples :semaphore do + let(:semaphore) { described_class.new(3) } + + context '#initialize' do + it 'raises an exception if the initial count is not an integer' do + expect { + described_class.new('foo') + }.to raise_error(ArgumentError) + end + end + + describe '#acquire' do + context 'permits available' do + it 'should return true immediately' do + result = semaphore.acquire + expect(result).to be_nil + end + end + + context 'not enough permits available' do + it 'should block thread until permits are available' do + semaphore.drain_permits + Thread.new { sleep(0.2) && semaphore.release } + + result = semaphore.acquire + expect(result).to be_nil + expect(semaphore.available_permits).to eq 0 + end + end + end + + describe '#drain_permits' do + it 'drains all available permits' do + drained = semaphore.drain_permits + expect(drained).to eq 3 + expect(semaphore.available_permits).to eq 0 + end + + it 'drains nothing in no permits are available' do + semaphore.reduce_permits 3 + drained = semaphore.drain_permits + expect(drained).to eq 0 + end + end + + describe '#try_acquire' do + context 'without timeout' do + it 'acquires immediately if permits are available' do + result = semaphore.try_acquire(1) + expect(result).to be_truthy + end + + it 'returns false immediately in no permits are available' do + result = semaphore.try_acquire(20) + expect(result).to be_falsey + end + end + + context 'with timeout' do + it 'acquires immediately if permits are available' do + result = semaphore.try_acquire(1, 5) + expect(result).to be_truthy + end + + it 'acquires when permits are available within timeout' do + semaphore.drain_permits + Thread.new { sleep 0.1 && semaphore.release } + result = semaphore.try_acquire(1, 1) + expect(result).to be_truthy + end + + it 'returns false on timeout' do + semaphore.drain_permits + result = semaphore.try_acquire(1, 0.1) + expect(result).to be_falsey + end + end + end + + describe '#reduce_permits' do + it 'raises ArgumentError if reducing by negative number' do + expect { + semaphore.reduce_permits(-1) + }.to raise_error(ArgumentError) + end + + it 'reduces permits below zero' do + semaphore.reduce_permits 1003 + expect(semaphore.available_permits).to eq -1000 + end + + it 'reduces permits' do + semaphore.reduce_permits 1 + expect(semaphore.available_permits).to eq 2 + semaphore.reduce_permits 2 + expect(semaphore.available_permits).to eq 0 + end + end +end + +module Concurrent + describe MutexSemaphore do + it_should_behave_like :semaphore + + context 'spurious wake ups' do + subject { described_class.new(1) } + + before(:each) do + def subject.simulate_spurious_wake_up + @mutex.synchronize do + @condition.signal + @condition.broadcast + end + end + subject.drain_permits + end + + it 'should resist to spurious wake ups without timeout' do + @expected = true + # would set @expected to false + Thread.new { @expected = subject.acquire } + + sleep(0.1) + subject.simulate_spurious_wake_up + + sleep(0.1) + expect(@expected).to be_truthy + end + + it 'should resist to spurious wake ups with timeout' do + @expected = true + # sets @expected to false in another thread + t = Thread.new { @expected = subject.try_acquire(1, 0.3) } + + sleep(0.1) + subject.simulate_spurious_wake_up + + sleep(0.1) + expect(@expected).to be_truthy + + t.join + expect(@expected).to be_falsey + end + end + end + + if TestHelpers.jruby? + describe JavaSemaphore do + it_should_behave_like :semaphore + end + end + + describe Semaphore do + if jruby? + it 'inherits from JavaSemaphore' do + expect(Semaphore.ancestors).to include(JavaSemaphore) + end + else + it 'inherits from MutexSemaphore' do + expect(Semaphore.ancestors).to include(MutexSemaphore) + end + end + end +end