Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Assorted Channel API tweaks and improvements.
Change "finish" to "close", since it is more clear. Also "completed"
becomese "closed" to match, removing that naming oddity. There is now
also a peek method, and a bug fix to closed.
  • Loading branch information
jnthn committed Oct 31, 2013
1 parent 15a4103 commit 9c52be0
Showing 1 changed file with 47 additions and 25 deletions.
72 changes: 47 additions & 25 deletions src/vm/jvm/core/Threading.pm
Expand Up @@ -465,47 +465,48 @@ sub async(&code) {

# A channel provides a thread-safe way to send a series of values from some
# producer(s) to some consumer(s).
my class X::Channel::SendOnCompleted is Exception {
method message() { "Cannot send a message on a completed channel" }
my class X::Channel::SendOnClosed is Exception {
method message() { "Cannot send a message on a closed channel" }
}
my class X::Channel::ReceiveOnCompleted is Exception {
method message() { "Cannot receive a message on a completed channel" }
my class X::Channel::ReceiveOnClosed is Exception {
method message() { "Cannot receive a message on a closed channel" }
}
my class Channel {
# The queue of events moving through the channel.
has Mu $!queue;

# Promise that is triggered on channel completion.
has $!completion;
# Promise that is triggered when all values are received, or an error is
# received and the channel is thus closed.
has $!closed_promise;

# Flag for if the channel is completed.
has $!completed;
# Flag for if the channel is closed to senders.
has $!closed;

# Magical objects for various ways a channel can end.
my class CHANNEL_FINISH { }
my class CHANNEL_FAIL { has $.error }
my class CHANNEL_CLOSE { }
my class CHANNEL_FAIL { has $.error }

my Mu $interop;
submethod BUILD() {
$interop := nqp::jvmbootinterop() unless nqp::isconcrete($interop);
my \LinkedBlockingQueue := $interop.typeForName('java.util.concurrent.LinkedBlockingQueue');
$!queue := LinkedBlockingQueue.'constructor/new/()V'();
$!completion = Promise.new;
$!closed_promise = Promise.new;
}

method send(Channel:D: \item) {
X::Channel::SendOnCompleted.new.throw if $!completed;
X::Channel::SendOnClosed.new.throw if $!closed;
$!queue.add($interop.sixmodelToJavaObject(nqp::decont(item)))
}

method receive(Channel:D:) {
my \msg := $interop.javaObjectToSixmodel($!queue.take());
if nqp::istype(msg, CHANNEL_FINISH) {
$!completion.keep(Nil);
X::Channel::ReceiveOnCompleted.new.throw
if nqp::istype(msg, CHANNEL_CLOSE) {
$!closed_promise.keep(Nil);
X::Channel::ReceiveOnClosed.new.throw
}
elsif nqp::istype(msg, CHANNEL_FAIL) {
$!completion.break(msg.error);
$!closed_promise.break(msg.error);
die msg.error;
}
msg
Expand All @@ -517,12 +518,12 @@ my class Channel {
Nil
} else {
my \msg := $interop.javaObjectToSixmodel(fetched);
if nqp::istype(msg, CHANNEL_FINISH) {
$!completion.keep(Nil);
if nqp::istype(msg, CHANNEL_CLOSE) {
$!closed_promise.keep(Nil);
Nil
}
elsif nqp::istype(msg, CHANNEL_FAIL) {
$!completion.break(msg.error);
$!closed_promise.break(msg.error);
Nil
}
else {
Expand All @@ -531,18 +532,39 @@ my class Channel {
}
}

method finish() {
$!completed = 1;
$!queue.add($interop.sixmodelToJavaObject(CHANNEL_FINISH))
method peek(Channel:D:) {
my \fetched := $!queue.'method/peek/()Ljava/lang/Object;'();
if nqp::jvmisnull(fetched) {
Nil
} else {
my \msg := $interop.javaObjectToSixmodel(fetched);
if nqp::istype(msg, CHANNEL_CLOSE) {
$!closed_promise.keep(Nil);
Nil
}
elsif nqp::istype(msg, CHANNEL_FAIL) {
$!closed_promise.break(msg.error);
Nil
}
else {
msg
}
}
}

method close() {
$!closed = 1;
$!queue.add($interop.sixmodelToJavaObject(CHANNEL_CLOSE))
}

method fail($error) {
$!completed = 1;
$!closed = 1;
$!queue.add($interop.sixmodelToJavaObject(CHANNEL_FAIL.new(:$error)))
}

method completion() {
$!completion
method closed() {
self.peek();
$!closed_promise
}
}

Expand Down

0 comments on commit 9c52be0

Please sign in to comment.