Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ext/ConcurrentRubyExtService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public boolean basicLoad(final Ruby runtime) throws IOException {
new com.concurrent_ruby.ext.AtomicReferenceLibrary().load(runtime, false);
new com.concurrent_ruby.ext.JavaAtomicBooleanLibrary().load(runtime, false);
new com.concurrent_ruby.ext.JavaAtomicFixnumLibrary().load(runtime, false);
new com.concurrent_ruby.ext.JavaSemaphoreLibrary().load(runtime, false);
return true;
}
}
144 changes: 144 additions & 0 deletions ext/com/concurrent_ruby/ext/JavaSemaphoreLibrary.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.concurrent_ruby.ext;

import java.io.IOException;
import java.util.concurrent.Semaphore;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;

public class JavaSemaphoreLibrary {

public void load(Ruby runtime, boolean wrap) throws IOException {
RubyModule concurrentMod = runtime.defineModule("Concurrent");
RubyClass atomicCls = concurrentMod.defineClassUnder("JavaSemaphore", runtime.getObject(), JRUBYREFERENCE_ALLOCATOR);

atomicCls.defineAnnotatedMethods(JavaSemaphore.class);

}

private static final ObjectAllocator JRUBYREFERENCE_ALLOCATOR = new ObjectAllocator() {
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new JavaSemaphore(runtime, klazz);
}
};

@JRubyClass(name = "JavaSemaphore", parent = "Object")
public static class JavaSemaphore extends RubyObject {

private JRubySemaphore semaphore;

public JavaSemaphore(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod
public IRubyObject initialize(ThreadContext context, IRubyObject value) {
this.semaphore = new JRubySemaphore(rubyFixnumToInt(value, "count"));
return context.nil;
}

@JRubyMethod
public IRubyObject acquire(ThreadContext context, IRubyObject value) throws InterruptedException {
this.semaphore.acquire(rubyFixnumToInt(value, "permits"));
return context.nil;
}

@JRubyMethod(name = "available_permits")
public IRubyObject availablePermits(ThreadContext context) {
return new RubyFixnum(getRuntime(), this.semaphore.availablePermits());
}

@JRubyMethod(name = "drain_permits")
public IRubyObject drainPermits(ThreadContext context) {
return new RubyFixnum(getRuntime(), this.semaphore.drainPermits());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've missed one more thing. JRuby extensions should try to use interface provided by Ruby object (result of getRuntime()) it may do additional steps like caching etc. see https://github.com/jruby/jruby/blob/master/core/src/main/java/org/jruby/RubyFixnum.java#L207-L212. So it's better to use the newBoolean and newFixnum methods on Ruby object.

}

@JRubyMethod
public IRubyObject acquire(ThreadContext context) throws InterruptedException {
this.semaphore.acquire(1);
return context.nil;
}

@JRubyMethod(name = "try_acquire")
public IRubyObject tryAcquire(ThreadContext context) throws InterruptedException {
return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(1));
}

@JRubyMethod(name = "try_acquire")
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits) throws InterruptedException {
return RubyBoolean.newBoolean(getRuntime(), semaphore.tryAcquire(rubyFixnumToInt(permits, "permits")));
}

@JRubyMethod(name = "try_acquire")
public IRubyObject tryAcquire(ThreadContext context, IRubyObject permits, IRubyObject timeout) throws InterruptedException {
return RubyBoolean.newBoolean(getRuntime(),
semaphore.tryAcquire(
rubyFixnumToInt(permits, "permits"),
rubyNumericToLong(timeout, "timeout"),
java.util.concurrent.TimeUnit.SECONDS)
);
}

@JRubyMethod
public IRubyObject release(ThreadContext context) {
this.semaphore.release(1);
return RubyBoolean.newBoolean(getRuntime(), true);
}

@JRubyMethod
public IRubyObject release(ThreadContext context, IRubyObject value) {
this.semaphore.release(rubyFixnumToInt(value, "permits"));
return RubyBoolean.newBoolean(getRuntime(), true);
}

@JRubyMethod(name = "reduce_permits")
public IRubyObject reducePermits(ThreadContext context, IRubyObject reduction) throws InterruptedException {
this.semaphore.publicReducePermits(rubyFixnumToInt(reduction, "reduction"));
return context.nil;
}

private int rubyFixnumToInt(IRubyObject value, String paramName) {
if (value instanceof RubyFixnum && ((RubyFixnum) value).getLongValue() > 0) {
RubyFixnum fixNum = (RubyFixnum) value;
return (int) fixNum.getLongValue();
} else {
throw getRuntime().newArgumentError(paramName + " must be in integer greater than zero");
}
}

private long rubyNumericToLong(IRubyObject value, String paramName) {
if (value instanceof RubyNumeric && ((RubyNumeric) value).getDoubleValue() > 0) {
RubyNumeric fixNum = (RubyNumeric) value;
return fixNum.getLongValue();
} else {
throw getRuntime().newArgumentError(paramName + " must be in float greater than zero");
}
}

class JRubySemaphore extends Semaphore {

public JRubySemaphore(int permits) {
super(permits);
}

public JRubySemaphore(int permits, boolean value) {
super(permits, value);
}

public void publicReducePermits(int i) {
reducePermits(i);
}

}
}
}

67 changes: 3 additions & 64 deletions lib/concurrent/atomic/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def release(permits = 1)
end

# @!macro [attach] semaphore_method_reduce_permits
#
#
# @api private
#
# Shrinks the number of available permits by the indicated reduction.
Expand All @@ -126,7 +126,7 @@ def reduce_permits(reduction)
fail ArgumentError, 'reduction must be an non-negative integer'
end
@mutex.synchronize { @free -= reduction }
nil
nil
end

private
Expand All @@ -153,73 +153,12 @@ def try_acquire_timed(permits, timeout)
if RUBY_PLATFORM == 'java'

# @!macro semaphore
#
#
# A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {#acquire} blocks if necessary
# until a permit is available, and then takes it. Each {#release} adds a permit,
# potentially releasing a blocking acquirer.
# However, no actual permit objects are used; the Semaphore just keeps a count of the number available and
# acts accordingly.
class JavaSemaphore
# @!macro semaphore_method_initialize
def initialize(count)
unless count.is_a?(Fixnum) && count >= 0
fail(ArgumentError,
'count must be in integer greater than or equal zero')
end
@semaphore = java.util.concurrent.Semaphore.new(count)
end

# @!macro semaphore_method_acquire
def acquire(permits = 1)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
@semaphore.acquire(permits)
end

# @!macro semaphore_method_available_permits
def available_permits
@semaphore.availablePermits
end

# @!macro semaphore_method_drain_permits
def drain_permits
@semaphore.drainPermits
end

# @!macro semaphore_method_try_acquire
def try_acquire(permits = 1, timeout = nil)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
if timeout.nil?
@semaphore.tryAcquire(permits)
else
@semaphore.tryAcquire(permits,
timeout,
java.util.concurrent.TimeUnit::SECONDS)
end
end

# @!macro semaphore_method_release
def release(permits = 1)
unless permits.is_a?(Fixnum) && permits > 0
fail ArgumentError, 'permits must be an integer greater than zero'
end
@semaphore.release(permits)
true
end

# @!macro semaphore_method_reduce_permits
def reduce_permits(reduction)
unless reduction.is_a?(Fixnum) && reduction >= 0
fail ArgumentError, 'reduction must be an non-negative integer'
end
@semaphore.reducePermits(reduction)
end
end

# @!macro semaphore
class Semaphore < JavaSemaphore
end

Expand Down