Skip to content

Commit

Permalink
implement Queue#shutdown!
Browse files Browse the repository at this point in the history
  • Loading branch information
MenTaLguY committed Oct 19, 2009
1 parent 9d963df commit c6c870c
Showing 1 changed file with 36 additions and 14 deletions.
50 changes: 36 additions & 14 deletions src/org/jruby/libraries/ThreadLibrary.java
Expand Up @@ -273,19 +273,34 @@ public IRubyObject allocate(Ruby runtime, RubyClass klass) {
cQueue.defineAnnotatedMethods(Queue.class);
}

@JRubyMethod(name = "shutdown!")
public synchronized IRubyObject shutdown(ThreadContext context) {
entries = null;
return context.getRuntime().getNil();
}

public synchronized void checkShutdown(ThreadContext context) {
if (entries == null) {
throw new RaiseException(context.getRuntime(), context.getRuntime().getThreadError(), "queue shut down", false);
}
}

@JRubyMethod
public synchronized IRubyObject clear(ThreadContext context) {
checkShutdown(context);
entries.clear();
return context.getRuntime().getNil();
}

@JRubyMethod(name = "empty?")
public synchronized RubyBoolean empty_p(ThreadContext context) {
checkShutdown(context);
return context.getRuntime().newBoolean(entries.size() == 0);
}

@JRubyMethod(name = {"length", "size"})
public synchronized RubyNumeric length(ThreadContext context) {
checkShutdown(context);
return RubyNumeric.int2fix(context.getRuntime(), entries.size());
}

Expand All @@ -298,6 +313,7 @@ protected synchronized long java_length() {

@JRubyMethod(name = {"pop", "deq", "shift"}, optional = 1)
public synchronized IRubyObject pop(ThreadContext context, IRubyObject[] args) {
checkShutdown(context);
boolean should_block = true;
if ( Arity.checkArgumentCount(context.getRuntime(), args, 0, 1) == 1 ) {
should_block = !args[0].isTrue();
Expand All @@ -306,22 +322,23 @@ public synchronized IRubyObject pop(ThreadContext context, IRubyObject[] args) {
throw new RaiseException(context.getRuntime(), context.getRuntime().getThreadError(), "queue empty", false);
}
numWaiting++;
while ( entries.size() == 0 ) {
try {
// TODO: No, this isn't atomic; we need to improve it
context.getThread().enterSleep();
wait();
} catch (InterruptedException e) {
} finally {
context.getThread().exitSleep();
try {
while ( java_length() == 0 ) {
try {
context.getThread().wait_timeout(this, null);
} catch (InterruptedException e) {
}
checkShutdown(context);
}
} finally {
numWaiting--;
}
numWaiting--;
return (IRubyObject)entries.removeFirst();
}

@JRubyMethod(name = {"push", "<<", "enq"})
public synchronized IRubyObject push(ThreadContext context, IRubyObject value) {
checkShutdown(context);
entries.addLast(value);
notify();
return context.getRuntime().getNil();
Expand Down Expand Up @@ -398,15 +415,20 @@ public synchronized IRubyObject pop(ThreadContext context, IRubyObject args[]) {
@JRubyMethod(name = {"push", "<<"})
@Override
public synchronized IRubyObject push(ThreadContext context, IRubyObject value) {
checkShutdown(context);
if ( java_length() >= capacity ) {
numWaiting++;
while ( java_length() >= capacity ) {
try {
context.getThread().wait_timeout(this, null);
} catch (InterruptedException e) {
try {
while ( java_length() >= capacity ) {
try {
context.getThread().wait_timeout(this, null);
} catch (InterruptedException e) {
}
checkShutdown(context);
}
} finally {
numWaiting--;
}
numWaiting--;
}
super.push(context, value);
notifyAll();
Expand Down

0 comments on commit c6c870c

Please sign in to comment.