diff --git a/ext/ConcurrentRubyExtService.java b/ext/ConcurrentRubyExtService.java index eed602147..8697c8b46 100644 --- a/ext/ConcurrentRubyExtService.java +++ b/ext/ConcurrentRubyExtService.java @@ -8,6 +8,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException { new com.concurrent_ruby.ext.AtomicReferenceLibrary().load(runtime, false); new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false); new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false); + new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false); return true; } } diff --git a/ext/com/concurrent_ruby/ext/JavaSemaphoreLibrary.java b/ext/com/concurrent_ruby/ext/JavaSemaphoreLibrary.java new file mode 100755 index 000000000..52f65d9a8 --- /dev/null +++ b/ext/com/concurrent_ruby/ext/JavaSemaphoreLibrary.java @@ -0,0 +1,144 @@ +package com.concurrent_ruby.ext; + +import java.io.IOException; +import java.util.concurrent.Semaphore; +import org.jruby.Ruby; +import org.jruby.RubyBoolean; +import org.jruby.RubyClass; +import org.jruby.RubyFixnum; +import org.jruby.RubyModule; +import org.jruby.RubyNumeric; +import org.jruby.RubyObject; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ObjectAllocator; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; + +public class JavaSemaphoreLibrary { + + public void load(Ruby runtime, boolean wrap) throws IOException { + RubyModule concurrentMod = runtime.defineModule("Concurrent"); + RubyClass atomicCls = concurrentMod.defineClassUnder("JavaSemaphore", runtime.getObject(), JRUBYREFERENCE_ALLOCATOR); + + atomicCls.defineAnnotatedMethods(JavaSemaphore.class); + + } + + private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() { + public IRubyObject allocate(Ruby runtime, RubyClass klazz) { + return new JavaSemaphore(runtime, klazz); + } + }; + + @JRubyClass(name = "JavaSemaphore", parent = "Object") + public static class JavaSemaphore extends RubyObject { + + private JRubySemaphore semaphore; + + public JavaSemaphore(Ruby runtime, RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod + public IRubyObject initialize(ThreadContext context, IRubyObject value) { + this.semaphore = new JRubySemaphore(rubyFixnumToInt(value, "count")); + return context.nil; + } + + @JRubyMethod + public IRubyObject acquire(ThreadContext context, IRubyObject value) throws InterruptedException { + this.semaphore.acquire(rubyFixnumToInt(value, "permits")); + return context.nil; + } + + @JRubyMethod(name = "available_permits") + public IRubyObject availablePermits(ThreadContext context) { + return new RubyFixnum(getRuntime(), this.semaphore.availablePermits()); + } + + @JRubyMethod(name = "drain_permits") + public IRubyObject drainPermits(ThreadContext context) { + return new RubyFixnum(getRuntime(), this.semaphore.drainPermits()); + } + + @JRubyMethod + public IRubyObject acquire(ThreadContext context) throws InterruptedException { + this.semaphore.acquire(1); + return context.nil; + } + + @JRubyMethod(name = "try_acquire") + public IRubyObject tryAcquire(ThreadContext context) throws InterruptedException { + return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(1)); + } + + @JRubyMethod(name = "try_acquire") + public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits) throws InterruptedException { + return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(rubyFixnumToInt(permits, "permits"))); + } + + @JRubyMethod(name = "try_acquire") + public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout) throws InterruptedException { + return RubyBoolean.newBoolean(getRuntime(), + semaphore.tryAcquire( + rubyFixnumToInt(permits, "permits"), + rubyNumericToLong(timeout, "timeout"), + java.util.concurrent.TimeUnit.SECONDS) + ); + } + + @JRubyMethod + public IRubyObject release(ThreadContext context) { + this.semaphore.release(1); + return RubyBoolean.newBoolean(getRuntime(), true); + } + + @JRubyMethod + public IRubyObject release(ThreadContext context, IRubyObject value) { + this.semaphore.release(rubyFixnumToInt(value, "permits")); + return RubyBoolean.newBoolean(getRuntime(), true); + } + + @JRubyMethod(name = "reduce_permits") + public IRubyObject reducePermits(ThreadContext context, IRubyObject reduction) throws InterruptedException { + this.semaphore.publicReducePermits(rubyFixnumToInt(reduction, "reduction")); + return context.nil; + } + + private int rubyFixnumToInt(IRubyObject value, String paramName) { + if (value instanceof RubyFixnum && ((RubyFixnum) value).getLongValue() > 0) { + RubyFixnum fixNum = (RubyFixnum) value; + return (int) fixNum.getLongValue(); + } else { + throw getRuntime().newArgumentError(paramName + " must be in integer greater than zero"); + } + } + + private long rubyNumericToLong(IRubyObject value, String paramName) { + if (value instanceof RubyNumeric && ((RubyNumeric) value).getDoubleValue() > 0) { + RubyNumeric fixNum = (RubyNumeric) value; + return fixNum.getLongValue(); + } else { + throw getRuntime().newArgumentError(paramName + " must be in float greater than zero"); + } + } + + class JRubySemaphore extends Semaphore { + + public JRubySemaphore(int permits) { + super(permits); + } + + public JRubySemaphore(int permits, boolean value) { + super(permits, value); + } + + public void publicReducePermits(int i) { + reducePermits(i); + } + + } + } +} + diff --git a/lib/concurrent/atomic/semaphore.rb b/lib/concurrent/atomic/semaphore.rb index 476c3c647..df9bc1555 100644 --- a/lib/concurrent/atomic/semaphore.rb +++ b/lib/concurrent/atomic/semaphore.rb @@ -109,7 +109,7 @@ def release(permits = 1) end # @!macro [attach] semaphore_method_reduce_permits - # + # # @api private # # Shrinks the number of available permits by the indicated reduction. @@ -126,7 +126,7 @@ def reduce_permits(reduction) fail ArgumentError, 'reduction must be an non-negative integer' end @mutex.synchronize { @free -= reduction } - nil + nil end private @@ -153,73 +153,12 @@ def try_acquire_timed(permits, timeout) if RUBY_PLATFORM == 'java' # @!macro semaphore - # + # # A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {#acquire} blocks if necessary # until a permit is available, and then takes it. Each {#release} adds a permit, # potentially releasing a blocking acquirer. # However, no actual permit objects are used; the Semaphore just keeps a count of the number available and # acts accordingly. - class JavaSemaphore - # @!macro semaphore_method_initialize - def initialize(count) - unless count.is_a?(Fixnum) && count >= 0 - fail(ArgumentError, - 'count must be in integer greater than or equal zero') - end - @semaphore = java.util.concurrent.Semaphore.new(count) - end - - # @!macro semaphore_method_acquire - def acquire(permits = 1) - unless permits.is_a?(Fixnum) && permits > 0 - fail ArgumentError, 'permits must be an integer greater than zero' - end - @semaphore.acquire(permits) - end - - # @!macro semaphore_method_available_permits - def available_permits - @semaphore.availablePermits - end - - # @!macro semaphore_method_drain_permits - def drain_permits - @semaphore.drainPermits - end - - # @!macro semaphore_method_try_acquire - def try_acquire(permits = 1, timeout = nil) - unless permits.is_a?(Fixnum) && permits > 0 - fail ArgumentError, 'permits must be an integer greater than zero' - end - if timeout.nil? - @semaphore.tryAcquire(permits) - else - @semaphore.tryAcquire(permits, - timeout, - java.util.concurrent.TimeUnit::SECONDS) - end - end - - # @!macro semaphore_method_release - def release(permits = 1) - unless permits.is_a?(Fixnum) && permits > 0 - fail ArgumentError, 'permits must be an integer greater than zero' - end - @semaphore.release(permits) - true - end - - # @!macro semaphore_method_reduce_permits - def reduce_permits(reduction) - unless reduction.is_a?(Fixnum) && reduction >= 0 - fail ArgumentError, 'reduction must be an non-negative integer' - end - @semaphore.reducePermits(reduction) - end - end - - # @!macro semaphore class Semaphore < JavaSemaphore end