Skip to content

Commit

Permalink
Add a basic Channel implementation.
Browse files Browse the repository at this point in the history
Same idea as channels from Go. Also add a select primitive that can
take a list of pairs of channels and code objects, and invokes the
code object for whichever channel has a value first. The select
implementation is an utter hack, but it will let us play with the
idea and API.

Both await and select can be used with promises, so you could use
a Promise.sleep(2) along with a channel in a select to time out if
the channel does not have a value within two seconds.
  • Loading branch information
jnthn committed Aug 7, 2013
1 parent 93ca160 commit 42758c1
Showing 1 changed file with 73 additions and 6 deletions.
79 changes: 73 additions & 6 deletions src/vm/jvm/core/Threading.pm
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,34 @@ my class Promise {
}
}

# A channel provides a thread-safe way to send a series of values from some
# producer(s) to some consumer(s).
my class Channel {
has Mu $!queue;

my Mu $interop;
submethod BUILD() {
$interop := nqp::jvmbootinterop() unless nqp::isconcrete($interop);
my \LinkedBlockingQueue := $interop.typeForName('java.util.concurrent.LinkedBlockingQueue');
$!queue := LinkedBlockingQueue.'constructor/new/()V'();
}

method send(Channel:D: \item) {
$!queue.add($interop.sixmodelToJavaObject(item))
}

method receive(Channel:D:) {
$interop.javaObjectToSixmodel($!queue.take())
}

method poll(Channel:D:) {
my Mu $fetched := $!queue.'method/poll/()Ljava/lang/Object;'();
nqp::jvmisnull($fetched)
?? Nil
!! $interop.javaObjectToSixmodel($fetched)
}
}

# The ThreadPoolScheduler is a straightforward scheduler that maintains a
# pool of threads and schedules work items in the order they are added
# using them.
Expand Down Expand Up @@ -273,14 +301,53 @@ sub async(&code) {
Promise.new(:scheduler($*SCHEDULER), :&code);
}

# Waits for a promise to be delivered and, once it is, unwraps the result.
# This should be made more efficient by using continuations to suspend any
# task running in the thread pool that awaits; for now, this cheat gets the
# basic idea in place.
# Waits for a promise to be kept or a channel to be able to receive a value
# and, once it can, unwraps or returns the result. This should be made more
# efficient by using continuations to suspend any task running in the thread
# pool that blocks; for now, this cheat gets the basic idea in place.
proto sub await(|) { * }
multi sub await(Promise $p) {
$p.result
}
multi sub await(*@promises) {
@promises.eager.map(&await)
multi sub await(*@awaitables) {
@awaitables.eager.map(&await)
}
multi sub await(Channel $c) {
$c.receive
}

# Takes a list of pairs, mapping a Channel or Promise to code. Invokes the
# code block of whichever Channel receives first whichever Promise is kept
# or broken first. Evaluates to the result of that code block. If none of
# the channels have a value or none of the promises have a result, blocks
# until one does.
proto sub select(|) { * }
multi sub select(*@selectors) {
# XXX Crappy spinning implementation; do something better soon.
my $found;
my $arg;
until $found {
for @selectors -> $s {
die "select expects to be passed a list of pairs" unless $s ~~ Pair;
given $s.key {
when Promise {
if .has_result {
$found := $s.value();
$arg := $_;
}
}
when Channel {
my \selected := .poll;
unless selected === Nil {
$found := $s.value();
$arg := selected;
}
}
default {
die "Cannot use select on a " ~ .^name;
}
}
}
}
$found($arg)
}

0 comments on commit 42758c1

Please sign in to comment.