Skip to content

Commit

Permalink
A channel can be finished or failed.
Browse files Browse the repository at this point in the history
While some may be "forever", this provides for channels that come to
some kind of conclusion, either a happy ending or a sad ending.
  • Loading branch information
jnthn committed Aug 9, 2013
1 parent 27962b0 commit fe22c91
Showing 1 changed file with 60 additions and 5 deletions.
65 changes: 60 additions & 5 deletions src/vm/jvm/core/Threading.pm
Original file line number Diff line number Diff line change
Expand Up @@ -217,29 +217,84 @@ my class Promise {

# A channel provides a thread-safe way to send a series of values from some
# producer(s) to some consumer(s).
my class X::Channel::SendOnCompleted is Exception {
method message() { "Cannot send a message on a completed channel" }
}
my class X::Channel::ReceiveOnCompleted is Exception {
method message() { "Cannot receive a message on a completed channel" }
}
my class Channel {
# The queue of events moving through the channel.
has Mu $!queue;

# Promise that is triggered on channel completion.
has $!completion;

# Flag for if the channel is completed.
has $!completed;

# Magical objects for various ways a channel can end.
my class CHANNEL_FINISH { }
my class CHANNEL_FAIL { has $.error }

my Mu $interop;
submethod BUILD() {
$interop := nqp::jvmbootinterop() unless nqp::isconcrete($interop);
my \LinkedBlockingQueue := $interop.typeForName('java.util.concurrent.LinkedBlockingQueue');
$!queue := LinkedBlockingQueue.'constructor/new/()V'();
$!completion = Promise.new;
}

method send(Channel:D: \item) {
X::Channel::SendOnCompleted.new.throw if $!completed;
$!queue.add($interop.sixmodelToJavaObject(item))
}

method receive(Channel:D:) {
$interop.javaObjectToSixmodel($!queue.take())
my \msg := $interop.javaObjectToSixmodel($!queue.take());
if nqp::istype(msg, CHANNEL_FINISH) {
$!completion.keep(Nil);
X::Channel::ReceiveOnCompleted.new.throw
}
elsif nqp::istype(msg, CHANNEL_FAIL) {
$!completion.break(msg.error);
die msg.error;
}
msg
}

method poll(Channel:D:) {
my Mu $fetched := $!queue.'method/poll/()Ljava/lang/Object;'();
nqp::jvmisnull($fetched)
?? Nil
!! $interop.javaObjectToSixmodel($fetched)
my \fetched := $!queue.'method/poll/()Ljava/lang/Object;'();
if nqp::jvmisnull(fetched) {
Nil
} else {
my \msg := $interop.javaObjectToSixmodel(fetched);
if nqp::istype(msg, CHANNEL_FINISH) {
$!completion.keep(Nil);
Nil
}
elsif nqp::istype(msg, CHANNEL_FAIL) {
$!completion.break(msg.error);
Nil
}
else {
msg
}
}
}

method finish() {
$!completed = 1;
$!queue.add($interop.sixmodelToJavaObject(CHANNEL_FINISH))
}

method fail($error) {
$!completed = 1;
$!queue.add($interop.sixmodelToJavaObject(CHANNEL_FAIL.new(:$error)))
}

method completion() {
$!completion
}
}

Expand Down

0 comments on commit fe22c91

Please sign in to comment.