RE-DRAFT: Synopsis 17: Concurrency
Jonathan Worthington <jnthn@jnthn.net>
Elizabeth Mattijsen <liz@dijkmat.nl>
Created: 3 Nov 2013
Last Modified: 4 Nov 2013
Version: 3
This synopsis is based around the concurrency primitives and tools currently being implemented in Rakudo on the JVM. It covers both things that are already implemented today and things expected to be implemented in the near future (where "near" means O(months)).
Perl 6 generally prefers constructs that compose well, enabling large problems to be solved by putting together solutions for lots of smaller problems. This also helps make it easier to extend and refactor code.
Many common language features related to parallel and asynchronous programming lack composability. For example:
Locks do not compose, since two independently correct operations using locks may deadlock when performed together.
Callback-centric approaches tend to compose badly, with chains of asynchronous operations typically leading to deeply nested callbacks. This essentially leaves the programmer to perform, by hand, a CPS transform of their own logical view of the program.
Directly spawning threads on a per-component basis tends to compose badly, as when a dozen such components are used together the result is a high number of threads with no ability to centrally schedule or handle errors.
In Perl 6, concurrency features aimed at typical language users should have good composability properties, both with themselves and also with other language features.
Asynchrony happens when we initiate an operation, then continue running our own idea of "next thing" without waiting for the operation to complete. This differs from synchronous programming, where calling a sub or method causes the caller to wait for a result before continuing.
The vast majority of programmers are much more comfortable with synchrony, as in many senses it's the "normal thing". As soon as we have things taking place asynchronously, there is a need to coordinate the work, and doing so tends to be domain specific. Therefore, placing the programmer in an asynchronous situation when they didn't ask for it is likely to lead to confusion and bugs. We should try to make places where asynchrony happens clear.
It's also worthwhile trying to make it easy to keep asynchronous things flowing asynchronously. While synchronous code is pull-y (for example, eating its way through iterable things, blocking for results), asynchronous code is push-y (results get pushed to things that know what to do next).
Places where we go from synchronous to asynchronous, or from asynchronous to synchronous, are higher risk areas for bugs and potential bottlenecks. Thus, Perl 6 should try to provide features that help minimize the need to make such transitions.
Parallelism is primarily about taking something we could do serially and using multiple CPU cores in order to get to a result more quickly. This leads to a very nice property: a parallel solution to a problem should give the same answer as a serial solution.
While under the hood there is asynchrony and the inherent coordination it requires, on the outside a problem solved using parallel programming is still, when taken as a whole, a single, synchronous operation.
Elsewhere in the specification, Perl 6 provides several features that allow the programmer to indicate that parallelizing an operation will produce the same result as evaluating it serially:
Hyper operators ("Hyper operators" in S03) express that parallel operator application is safe
Junctions ("Junctions" in S09) may auto-thread in parallel
Feeds ("Feed operators" in S06) form pipelines and express that the stages may be executed in parallel in a producer-consumer style relationship (though each stage is in itself not parallelized)
hyper and race list operators ("The hyper operator" in S02) express that iteration may be done in parallel; this is a generalization of hyper operators
The easy things should be easy, and able to be built out of primitives that compose nicely. However, such things have to be built out of what VMs and operating systems provide: threads, atomic instructions (such as CAS), and concurrency control constructs such as mutexes and semaphores. Perl 6 is meant to last for decades, and the coming decades will doubtless bring new ways to do parallel and asynchronous programming that we do not have today. They will still, however, almost certainly need to be built out of what is available.
Thus, the primitive things should be provided for those who need to work on such hard things. Perl 6 should not hide the existence of OS-level threads, or fail to provide access to lower level concurrency control constructs. However, they should be clearly documented as not the way to solve the majority of problems.
Schedulers lie at the heart of all concurrency in Perl 6. While most users are unlikely to encounter schedulers immediately when starting to use Perl 6's concurrency features, many of those features are implemented in terms of schedulers. Thus, schedulers are described first here to avoid lots of forward references.
A scheduler is something that does the Scheduler role. Its responsibility is taking code objects representing tasks that need to be performed and making sure they get run, as well as handling any time-related operations (such as, "run this code every second").
The current default scheduler is available as $*SCHEDULER. If no such dynamic variable has been declared, then $PROCESS::SCHEDULER is used. This defaults to an instance of ThreadPoolScheduler, which maintains a pool of threads and distributes scheduled work amongst them. Since the scheduler is dynamically scoped, test scheduler modules can be developed that poke a $*SCHEDULER into EXPORT, and then provide the test writer with control over time.
The schedule method takes a Callable object and schedules it.
$*SCHEDULER.schedule: { say "Golly, I got scheduled!" }
Various options may be supplied as named arguments. (All references to time are taken to be in seconds, which may be fractional.) You may schedule an event to fire off after some number of seconds:
$*SCHEDULER.schedule: in=>10, { say "10s later" }
or at a given absolute time, specified as an Instant:
$*SCHEDULER.schedule: at=>$instant, { say "Right on time" }
Use :every to schedule an operation to run at a fixed interval, possibly with a delay (given by :in) before the first scheduling.
# Every second, from now
$*SCHEDULER.schedule: :every(1), { say "Oh wow, a kangaroo!" }
# Every 0.5s, but don't start for 2s.
$*SCHEDULER.schedule({ say "Kenya believe it?" }, :every(0.5), :in(2));
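To make the timing options concrete, here is a small Python model of :every combined with :in. It is a sketch under stated assumptions: schedule_every is a hypothetical helper, not part of any Perl 6 API, and it dedicates one thread per repeating task instead of sharing a pool as ThreadPoolScheduler would. Each deadline is computed from a fixed start time, so the interval does not drift as runs accumulate.

```python
import threading
import time


def schedule_every(code, every, in_=0.0):
    """Hypothetical model of schedule(code, :every(every), :in(in_)).

    Runs `code` every `every` seconds, the first run happening `in_`
    seconds from now. Returns a threading.Event; set it to stop.
    """
    stop = threading.Event()
    start = time.monotonic() + in_

    def worker():
        n = 0
        # Sleep until the n-th deadline; Event.wait returns True (ending
        # the loop) as soon as `stop` is set.
        while not stop.wait(max(0.0, start + n * every - time.monotonic())):
            code()
            n += 1

    threading.Thread(target=worker, daemon=True).start()
    return stop
```

For example, schedule_every(lambda: print("Kenya believe it?"), 0.5, in_=2) mirrors the second Perl 6 example; calling .set() on the returned event stops the repetition.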
If a scheduled item dies, the scheduler will catch this exception and pass it to a handle_uncaught method, a default implementation of which is provided by the Scheduler role. By default, this reports the exception and causes the entire application to terminate. However, it is possible to replace this behaviour:
$*SCHEDULER.uncaught_handler = sub ($exception) {
$logger.log_error($exception);
}
For more fine-grained handling, it is possible to schedule code along with a code object to be invoked with the thrown exception if it dies:
$*SCHEDULER.schedule:
{ upload_progress($stuff) },
catch => -> $ex { warn "Could not upload latest progress" }
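The two levels of error handling can be modelled in Python with a thread pool. In this sketch, ThreadPoolScheduler, schedule, catch and uncaught_handler are names borrowed from the text for illustration, and the default handler only prints the exception rather than terminating the application as the real default would.

```python
import traceback
from concurrent.futures import ThreadPoolExecutor


class ThreadPoolScheduler:
    """Hypothetical Python model of a pool-based scheduler."""

    def __init__(self, workers=4):
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self.uncaught_handler = self._report

    @staticmethod
    def _report(ex):
        # Stand-in for the default: report the exception. (The real
        # default would also terminate the application.)
        traceback.print_exception(type(ex), ex, ex.__traceback__)

    def schedule(self, code, catch=None):
        def run():
            try:
                code()
            except Exception as ex:
                # A per-task `catch` wins; otherwise use the global handler.
                (catch or self.uncaught_handler)(ex)

        return self._pool.submit(run)

    def shutdown(self):
        self._pool.shutdown(wait=True)
```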
Schedulers also provide counts of the number of operations in various states:
say $*SCHEDULER.loads;
This returns, in order, the number of scheduled things that are not yet runnable due to delays, the number of scheduled things that are runnable but not yet assigned to a thread, and the number of scheduled things that are now assigned to a thread (and presumably running). [Conjecture: perhaps these should be separate methods.]
Schedulers may optionally provide further introspection in order to support tools such as debuggers.
There is also a CurrentThreadScheduler, which always schedules things on the current thread. It provides the same methods, just without concurrency, and any exceptions are thrown immediately. This is mostly useful for forcing synchrony in places that default to asynchrony. (Note that .loads can never return anything but 0 for the currently running scheduled things, since they're waiting on the current thread to stop scheduling first!)
A Promise is a synchronization primitive for an asynchronous piece of work that will produce a single result (thus keeping the promise) or fail in some way (thus breaking the promise).
The simplest way to use a Promise is to create one:
my $promise = Promise.new;
And then later keep it:
$promise.keep(42);
Or break it:
$promise.break(X::Some::Problem.new); # With exception
$promise.break("I just couldn't do it"); # With message
The current status of a Promise is available through the status method, which returns an element from the PromiseStatus enumeration.
enum PromiseStatus (:Planned(0), :Kept(1), :Broken(2));
The result itself can be obtained by calling result. If the Promise was already kept, the result is immediately returned. If the Promise was broken, then the exception that it was broken with is thrown. If the Promise is not yet kept or broken, then the caller will block until this happens. There is a has_result method for checking whether a Promise is already kept or broken, and an excuse method for extracting the exception from a Broken Promise rather than having it thrown.
if $promise.has_result {
if $promise.status == Kept {
say "Kept, result = " ~ $promise.result;
}
else {
say "Broken because " ~ $promise.excuse;
}
}
else {
say "Still working!";
}
For convenience, a Promise also boolifies to the value of has_result.
[Conjectural: we perhaps don't need has_result and the boolification. If the boolification is a good idea, maybe we drop .has_result.]
You can also simply use a switch:
given $promise.status {
when Planned { say "Still working!" }
when Kept { say "Kept, result = ", $promise.result }
when Broken { say "Broken because ", $promise.excuse }
}
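Python's concurrent.futures.Future has a similar life cycle, which makes it a convenient place to experiment with these semantics. The mapping is an assumption of this sketch, not part of the spec: keep corresponds to set_result, break to set_exception, has_result to done(), excuse to exception(), and the status helper derives a PromiseStatus value.

```python
from concurrent.futures import Future
from enum import Enum


class PromiseStatus(Enum):
    Planned = 0
    Kept = 1
    Broken = 2


def status(promise: Future) -> PromiseStatus:
    """Derive the PromiseStatus of a Future used as a Promise."""
    if not promise.done():                # has_result is False
        return PromiseStatus.Planned
    if promise.exception() is not None:   # there is an "excuse"
        return PromiseStatus.Broken
    return PromiseStatus.Kept


def describe(promise: Future) -> str:
    # Same dispatch as the `given $promise.status` example.
    s = status(promise)
    if s is PromiseStatus.Planned:
        return "Still working!"
    if s is PromiseStatus.Kept:
        return f"Kept, result = {promise.result()}"
    return f"Broken because {promise.exception()}"
```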
There are various convenient "factory" methods on Promise. The most common is start.
my $p = Promise.start(&do_hard_calculation);
This creates a Promise that runs the supplied code and calls keep with its result. If the code throws an exception, then break is called with the Exception. Most of the time, however, the above is simply written as:
my $p = start {
# code here
}
This is implemented by calling Promise.start.
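The behaviour of start can be sketched in Python by running the code on a new thread and resolving a Future from it. This is a model under an assumption: one thread per call stands in for handing the code to $*SCHEDULER.

```python
import threading
from concurrent.futures import Future


def start(code, *args):
    """Hypothetical model of Promise.start: run `code` asynchronously,
    keeping the promise with its return value or breaking it with the
    exception it raised."""
    promise = Future()

    def run():
        try:
            promise.set_result(code(*args))   # keep
        except Exception as ex:
            promise.set_exception(ex)         # break

    threading.Thread(target=run, daemon=True).start()
    return promise
```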
There are also methods to create a Promise that is kept after a number of seconds, or at a specific time:
my $kept_in_10s = Promise.in(10);
my $kept_in_duration = Promise.in($duration);
my $kept_at_instant = Promise.at($instant);
The result is always True, and such a Promise can never be broken. It is mostly useful for combining with other promises.
There are also a couple of Promise combinators. The anyof combinator creates a Promise that is kept as soon as any of the specified Promises is kept. If the first promise to produce a result is instead broken, then the resulting Promise is also broken, and the excuse is passed along. When the Promise is kept, it has a True result.
my $calc = start { ... }
my $timeout = Promise.in(10);
my $timecalc = Promise.anyof($calc, $timeout);
There is also an allof combinator, which creates a Promise that will be kept when all of the specified Promises are kept, or broken if any of them are broken.
[Conjecture: there should be infix operators for these resembling the junctionals.]
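One possible shape for the two combinators, again modelled with Python Future objects. anyof follows the rules above; allof is given a True result when kept, which is an assumption since the text does not say what its result is. Both react to completion callbacks rather than blocking.

```python
import threading
from concurrent.futures import Future


def anyof(*promises):
    """Kept (with True) or broken by whichever promise resolves first."""
    result = Future()
    lock = threading.Lock()

    def on_done(p):
        with lock:
            if result.done():
                return                               # already decided
            if p.exception() is not None:
                result.set_exception(p.exception())  # pass the excuse on
            else:
                result.set_result(True)

    for p in promises:
        p.add_done_callback(on_done)
    return result


def allof(*promises):
    """Kept when every promise is kept; broken as soon as one is broken."""
    result = Future()
    lock = threading.Lock()
    remaining = [len(promises)]
    if not promises:
        result.set_result(True)

    def on_done(p):
        with lock:
            if result.done():
                return
            if p.exception() is not None:
                result.set_exception(p.exception())
            else:
                remaining[0] -= 1
                if remaining[0] == 0:
                    result.set_result(True)

    for p in promises:
        p.add_done_callback(on_done)
    return result
```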
The then method on a Promise is used to request that a certain piece of code should be run, receiving the Promise as an argument, when the Promise is kept or broken. If the Promise is already kept or broken, the code is scheduled immediately. It is possible to call then more than once, and each time it returns a Promise representing the completion of both the original Promise and the code specified in then.
my $feedback_promise = $download_promise.then(-> $res {
given $res.status {
when Kept { say "File $res.result().name() downloaded" }
when Broken { say "FAIL: $res.excuse()" }
}
});
[Conjecture: this needs better syntax to separate the "then" policies from the "else" policies (and from "catch" policies?), and to avoid a bunch of switch boilerplate. We already know the givens here...]
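The chaining behaviour of then can be modelled on a Python Future with add_done_callback. In this sketch the callback runs on whichever thread resolves the original promise (or immediately, if it is already resolved), whereas the text has it scheduled; the returned Future stands for the completion of both the original promise and the callback. then and report are illustrative names.

```python
from concurrent.futures import Future


def then(promise, code):
    """Hypothetical model of Promise.then: call code(promise) once the
    promise is kept or broken, returning a new promise for the outcome."""
    chained = Future()

    def on_done(p):
        try:
            chained.set_result(code(p))
        except Exception as ex:
            chained.set_exception(ex)

    promise.add_done_callback(on_done)
    return chained


def report(res):
    # Inspect the original promise, as the $download_promise example does.
    if res.exception() is None:
        return f"File {res.result()} downloaded"
    return f"FAIL: {res.exception()}"
```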
One risk when working with Promise is that another piece of code will sneak in and keep or break a Promise it should not. The notion of a promise is user-facing. To instead represent the promise from the viewpoint of the promiser, the various built-in Promise factory methods and combinators use Promise::Vow objects to represent that internal resolve to fulfill the promise. ("I have vowed to keep my promise to you.") The vow method on a Promise returns an object with keep and break methods. It can only be called once during a Promise object's lifetime. Since keep and break on the Promise itself just delegate to self.vow.keep(...) or self.vow.break(...), obtaining the vow before letting the Promise escape to the outside world is a way to take ownership of the right to keep or break it. For example, here is how the Promise.in factory is implemented:
method in(Promise:U: $seconds, :$scheduler = $*SCHEDULER) {
my $p = Promise.new(:$scheduler);
my $v = $p.vow;
$scheduler.schedule: { $v.keep(True) }, :in($seconds);
$p;
}
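The ownership rule can be illustrated with a self-contained Python class. This is a sketch, not the Rakudo implementation: Promise, Vow and promise_in are illustrative names, break_ carries a trailing underscore because break is a Python keyword, and threading.Timer stands in for $*SCHEDULER.

```python
import threading


class Vow:
    """The promiser's side: the right to keep or break one Promise."""

    def __init__(self, promise):
        self._promise = promise

    def keep(self, value):
        self._promise._resolve("Kept", value)

    def break_(self, exception):
        self._promise._resolve("Broken", exception)


class Promise:
    def __init__(self):
        self._cond = threading.Condition()
        self._status = "Planned"
        self._value = None
        self._vow_taken = False

    def vow(self):
        # May only be obtained once per Promise.
        with self._cond:
            if self._vow_taken:
                raise RuntimeError("vow already taken")
            self._vow_taken = True
        return Vow(self)

    # keep/break on the Promise itself just delegate to the vow.
    def keep(self, value):
        self.vow().keep(value)

    def break_(self, exception):
        self.vow().break_(exception)

    def result(self, timeout=None):
        with self._cond:
            if not self._cond.wait_for(lambda: self._status != "Planned", timeout):
                raise TimeoutError("promise not resolved in time")
            if self._status == "Broken":
                raise self._value
            return self._value

    def _resolve(self, status, value):
        with self._cond:
            if self._status != "Planned":
                raise RuntimeError("promise already resolved")
            self._status, self._value = status, value
            self._cond.notify_all()


def promise_in(seconds):
    """Mirror of the Promise.in factory: take the vow, then hand out
    the Promise, so nobody else can keep or break it."""
    p = Promise()
    v = p.vow()
    timer = threading.Timer(seconds, v.keep, args=(True,))
    timer.daemon = True
    timer.start()
    return p
```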
The await function is used to wait for one or more Promises to produce a result.
my ($a, $b) = await $p1, $p2;
This simply calls result on each of the Promises, so any exception will be thrown. There is also a winner statement [keywords still negotiable]:
winner {
when $p1 { say "First promise got a value" }
when $p2 { say "Second promise got a value" }
}
That will invoke the closure associated with the first promise that produces a result, either kept or broken. In order to avoid blocking, it is possible to provide a default that will be invoked if none of the promises currently have a result available.
winner {
when $p1 { say "First promise got a value" }
when $p2 { say "Second promise got a value" }
default { say "Not done yet" }
}
The construct as a whole returns the result of whichever block was selected.
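In Python terms, await and winner over promises can be approximated with concurrent.futures.wait. The sketch assumes cases are given as (promise, handler) pairs and that, when several promises already have results, the earliest listed case wins; the text does not specify a tie-break. await_all and winner are illustrative names.

```python
from concurrent.futures import FIRST_COMPLETED, Future, wait


def await_all(*promises):
    # Like `await`: call result on each, re-raising any excuse.
    return tuple(p.result() for p in promises)


def winner(cases, default=None):
    """cases: list of (promise, handler) pairs; handler receives the promise.
    With a default, never block: run it if nothing has a result yet."""
    if default is None:
        wait([p for p, _ in cases], return_when=FIRST_COMPLETED)
    for promise, handler in cases:
        if promise.done():
            return handler(promise)
    return default()
```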
A Channel is essentially a concurrent queue. One or more threads can put values into the Channel using send:
my $c = Channel.new;
$c.send($msg);
Meanwhile, others can receive them:
my $msg = $c.receive;
Channels are ideal for producer/consumer scenarios, and since there can be many senders and many receivers, they also adapt well to scaling certain pipeline stages out over multiple workers. [Conjectural: The two feed operators ==> and <== are implemented using Channel to connect each of the stages.]
A Channel may be "forever", but it is possible to close it to further sends:
$c.close();
Trying to send any further messages on a closed channel will throw the X::Channel::SendOnClosed exception. Closing a channel has no effect on the receiving end until all sent values have been received. At that point, any further calls to receive will throw X::Channel::ReceiveOnClosed. The closed property returns a Promise that is kept when the sender has called close and all sent messages have been received.
While receive blocks until it can read, poll takes a message from the channel if one is there, or immediately returns Nil if nothing is there. The peek method is similar, but will not remove the message. Note that use of peek is risky in any multi-receiver situation, since another worker may receive the message you peeked.
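These channel semantics can be modelled in Python, assuming a deque guarded by a Condition. The exception classes mirror X::Channel::SendOnClosed and X::Channel::ReceiveOnClosed, closed is a Future standing in for the Promise, and None plays the role of Nil; all names are illustrative.

```python
import threading
from collections import deque
from concurrent.futures import Future


class SendOnClosed(Exception):
    pass


class ReceiveOnClosed(Exception):
    pass


class Channel:
    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()
        self._is_closed = False
        self.closed = Future()   # kept once closed *and* drained

    def send(self, value):
        with self._cond:
            if self._is_closed:
                raise SendOnClosed()
            self._items.append(value)
            self._cond.notify()

    def receive(self):
        # Blocks until a value arrives, or the channel is closed and empty.
        with self._cond:
            while not self._items:
                if self._is_closed:
                    raise ReceiveOnClosed()
                self._cond.wait()
            value = self._items.popleft()
            self._maybe_finish()
            return value

    def poll(self):
        # Non-blocking: None when nothing is queued.
        with self._cond:
            if not self._items:
                return None
            value = self._items.popleft()
            self._maybe_finish()
            return value

    def peek(self):
        with self._cond:
            return self._items[0] if self._items else None

    def close(self):
        with self._cond:
            self._is_closed = True
            self._cond.notify_all()   # wake receivers waiting on an empty queue
            self._maybe_finish()

    def _maybe_finish(self):
        # Caller holds the lock.
        if self._is_closed and not self._items and not self.closed.done():
            self.closed.set_result(True)
```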
The winner construct also works on channels, and will try to receive a value from the first Channel that has one available. It can also be used to write a loop that receives from a channel until it is closed:
loop {
winner {
when $c { say "Received $_" }
when $c.closed { last }
}
}
[Conjectural: this is such a common pattern that we might want to provide a $channel.each(-> $val { ... }) to factor it out.]
Channels are good for producer/consumer scenarios, but because each worker blocks on receive, they are not an ideal construct for fine-grained processing of asynchronously produced streams of values. Additionally, each value can only go to one receiver. Supplies exist to address both of these issues. A Supply pushes or pumps values to one or more receivers who have registered their interest.
Anything that does the Supply role can be tapped (that is, subscribed to) by calling the tap method on it. This takes between one and three callables as arguments:
$supply.tap(
-> $value { say "Got a $value" },
{ say "Reached the end" },
-> $ex { say "Got exception: $ex" });
The first, known as more, is invoked whenever a value is produced by the thing that has been tapped. The second, known as done, is invoked when all values have been produced and no more will be. The final one, known as fail, is invoked if there is an error; this also means there will be no further values. Expressing the possible invocations as a grammar:
more* [ done | fail ]? # invocations
The simplest Supply is the Supply class, which is punned from the role. On the "pumping" end, this has corresponding methods more, done, and fail, which notify all current taps.
my $s = Supply.new;
my $t1 = $s.tap({ say $_ });
$s.more(1); # 1\n
$s.more(2); # 2\n
my $t2 = $s.tap({ say 2 * $_ },
{ say "End" });
$s.more(3); # 3\n6\n
The object returned by tap represents the subscription. To stop subscribing, call close on it.
$t1.close;
$s.more(4); # 8\n
$s.done; # End\n
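The tap/pump relationship can be reproduced in a few lines of Python. This is a synchronous sketch, as in the text: Supply and Tap are illustrative classes, and the list of taps is copied before each notification so that closing a tap during delivery does not disturb the iteration.

```python
import threading


class Tap:
    """A subscription; close() stops further notifications."""

    def __init__(self, supply, more, done, fail):
        self._supply = supply
        self.more, self.done, self.fail = more, done, fail

    def close(self):
        self._supply._remove(self)


class Supply:
    def __init__(self):
        self._taps = []
        self._lock = threading.Lock()

    def tap(self, more, done=None, fail=None):
        t = Tap(self, more, done, fail)
        with self._lock:
            self._taps.append(t)
        return t

    def _remove(self, t):
        with self._lock:
            if t in self._taps:
                self._taps.remove(t)

    def _snapshot(self):
        with self._lock:
            return list(self._taps)

    # The "pumping" end: notify every current tap.
    def more(self, value):
        for t in self._snapshot():
            t.more(value)

    def done(self):
        for t in self._snapshot():
            if t.done:
                t.done()

    def fail(self, ex):
        for t in self._snapshot():
            if t.fail:
                t.fail(ex)
```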
This doesn't introduce any asynchrony directly. However, it is possible for values to be pumped by a Supply from an asynchronous worker.
The Publish class (punned from the Supply role) has various methods that produce more interesting kinds of Supply. These default to working asynchronously. Furthermore, they start producing values at the point they are tapped.
Publish.for takes a (potentially lazy) list of values, and returns a Supply that, when tapped, will iterate over the values and invoke the more callable for each of them, and any done callable at the end. If the iteration at some point produces an exception, then the fail callable will be invoked.
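A Python sketch of Publish.for under stated assumptions: publish_for and ForSupply are hypothetical names, each tap starts its own iteration on a new thread, and tap returns that thread (rather than a tap object) so callers can join it. Each tap iterates the source afresh, so lists and ranges replay for every subscriber, while a one-shot generator does not.

```python
import threading


class ForSupply:
    """Hypothetical model of the Supply returned by Publish.for."""

    def __init__(self, values):
        self._values = values

    def tap(self, more, done=None, fail=None):
        def run():
            try:
                for value in self._values:
                    more(value)
            except Exception as ex:
                # Iteration failed (an exception from `more` is treated
                # the same way here): report it and skip `done`.
                if fail:
                    fail(ex)
                return
            if done:
                done()

        worker = threading.Thread(target=run, daemon=True)
        worker.start()   # values start flowing at tap time
        return worker


def publish_for(values):
    return ForSupply(values)
```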
Publish.interval produces a Supply that, when tapped, will produce an ascending value at a regular time interval.
Publish.interval(1).tap(&say); # Once a second, starting now
Publish.interval(5, 10).tap(&say); # Each 5 seconds, starting in 10 seconds
[TODO: many more of these, including ones for publishing a Channel and a Promise.]
Supplies are mathematically dual to iterators, and so it is possible to define the same set of operations on them as are available on lazy lists. The key difference is that, while grep on a lazy list pulls a value to process, working synchronously, grep on a Supply has values pushed through it, and pushes those that match the filter onwards to anything that taps it.
The following methods are available on a Supply:
- flat
- grep
- map
- uniq
- squish
There are some others that will only publish a result or results if done is reached:
- elems
- max
- min
- minmax
- reduce
- reverse
- sort
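The difference between the two groups can be seen in a small synchronous Python model: grep and map push each value straight through, while elems must wait for done before it can publish anything. This is a sketch; the class is illustrative, and it assumes each operator taps its source as soon as it is called, which the text does not specify.

```python
class Supply:
    """Minimal synchronous push-based supply (illustrative)."""

    def __init__(self):
        self._taps = []

    def tap(self, more, done=lambda: None):
        self._taps.append((more, done))

    def more(self, value):
        for on_more, _ in list(self._taps):
            on_more(value)

    def done(self):
        for _, on_done in list(self._taps):
            on_done()

    # Streaming operators: each value is pushed straight through.
    def grep(self, predicate):
        out = Supply()
        self.tap(lambda v: out.more(v) if predicate(v) else None, out.done)
        return out

    def map(self, fn):
        out = Supply()
        self.tap(lambda v: out.more(fn(v)), out.done)
        return out

    # Aggregating operator: publishes only when `done` is reached.
    def elems(self):
        out = Supply()
        count = [0]

        def on_more(_):
            count[0] += 1

        def on_done():
            out.more(count[0])
            out.done()

        self.tap(on_more, on_done)
        return out
```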
There are some combinators that deal with bringing multiple supplies together:
- merge: produces a supply containing the values produced by two other supplies, triggering done once both of the supplies have done so
- zip: produces a supply that pairs together items from two other supplies, using infix:<,> by default or any other user-supplied function
[TODO: plenty more of these: combine_latest, while, until...]
These combinators that involve multiple supplies need care in their implementation, since values may arrive at any point on each, and possibly at the same time. To help write such combinators, the on meta-combinator is useful. on taps many supplies, and ensures that only one callback will be running at a time, freeing the combinator writer from worrying about synchronization issues. Here is how zip is implemented:
method zip(Supply $a, Supply $b, &with = &infix:<,>) {
my @as;
my @bs;
on -> $res {
$a => sub ($val) {
@as.push($val);
if @as && @bs {
$res.more(with(@as.shift, @bs.shift));
}
},
$b => sub ($val) {
@bs.push($val);
if @as && @bs {
$res.more(with(@as.shift, @bs.shift));
}
}
}
}
Thus there are never any races or other thread-safety problems with mutating @as and @bs. The default behaviour, if a callable is specified along with the supply, is to use it for more and provide a default done and fail. The default done triggers done on the result supply, which is the correct semantics for zip. On the other hand, merge wants different semantics, and so must provide a done. This can be implemented as follows:
method merge(Supply $a, Supply $b) {
my $done = 0;
on -> $res {
$a => {
more => sub ($val) { $res.more($val) },
done => {
$res.done() if ++$done == 2;
}
},
$b => {
more => sub ($val) { $res.more($val) },
done => {
$res.done() if ++$done == 2;
}
}
}
}
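The serialization that on provides can be modelled in Python with a single lock shared by all callbacks. This sketch makes several assumptions: on, zip_supplies and merge_supplies are illustrative names, handlers are given either as a callable (used for more) or as a dict of more/done/fail entries, and the defaults pass done and fail straight on to the result supply, as the text describes.

```python
import threading


class Supply:
    """Minimal push-based supply: taps receive more/done/fail calls."""

    def __init__(self):
        self._taps = []

    def tap(self, more, done=None, fail=None):
        self._taps.append((more, done, fail))

    def more(self, value):
        for m, _, _ in list(self._taps):
            m(value)

    def done(self):
        for _, d, _ in list(self._taps):
            if d:
                d()

    def fail(self, ex):
        for _, _, f in list(self._taps):
            if f:
                f(ex)


def on(build):
    """Hypothetical model of `on`: build(res) returns (source, handlers)
    pairs; one lock serializes every callback across all sources."""
    res = Supply()
    lock = threading.Lock()

    def serialized(fn):
        def wrapper(*args):
            with lock:
                fn(*args)
        return wrapper

    for source, handlers in build(res):
        if callable(handlers):
            handlers = {"more": handlers}
        source.tap(
            serialized(handlers["more"]),
            serialized(handlers.get("done", res.done)),  # default: pass done on
            serialized(handlers.get("fail", res.fail)),  # default: pass fail on
        )
    return res


def zip_supplies(a, b, with_=lambda x, y: (x, y)):
    as_, bs = [], []

    def build(res):
        def emit_pair():
            if as_ and bs:
                res.more(with_(as_.pop(0), bs.pop(0)))

        def on_a(v):
            as_.append(v)
            emit_pair()

        def on_b(v):
            bs.append(v)
            emit_pair()

        return [(a, on_a), (b, on_b)]

    return on(build)


def merge_supplies(a, b):
    finished = [0]

    def build(res):
        def on_done():
            finished[0] += 1
            if finished[0] == 2:   # done only once both sources are done
                res.done()

        return [(a, {"more": res.more, "done": on_done}),
                (b, {"more": res.more, "done": on_done})]

    return on(build)
```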
A fail can be provided in a similar way, although the default (conveying the failure to the result supply) is normally what is wanted. The exception is when writing combinators related to error handling.
[TODO: specify catch and various other error-handling related timeouts.]
There is no event loop. Previous versions of this synopsis mentioned an event loop that would be underlying all concurrency. In this version, this is not the case.
Instead, most system-level events, be they POSIX signals or specific OS events such as file updates, will be exposed as a Supply. For instance, $*POSIX is a Supply in which POSIX signals received by the running process will appear as POSIX::Signal objects to taps. On non-POSIX systems, a similar Supply will exist to handle system events.
VM-level threads, which typically correspond to OS-level threads, are exposed through the Thread class. Whatever underlies it, a Thread should always be backed by something that is capable of being scheduled on a CPU core (that is, it may not be a "green thread" or similar). Most users will not need to work with Threads directly. However, those building their own schedulers may well need to do so, and there may be other exceptional circumstances that demand such low-level control.
The easiest way to start a thread is with the start method, which takes a Callable and runs it on a new thread:
my $thread = Thread.start({
say "Gosh, I'm in a thread!";
});
It is also possible to create a thread object, and set it running later:
my $thread = Thread.new(code => {
say "A thread, you say?";
});
# later...
$thread.run();
Both approaches result in $thread containing a Thread object. At some point, join should be called on the thread, from the thread that started it. This blocks until the thread has completed.
say "Certainly before the thread is started";
my $thread = Thread.start({ say "In the thread" });
say "This could come before or after the thread's output";
$thread.join();
say "Certainly after all the above output";
As an alternative to join, it is possible to create a thread whose lifetime is bounded by that of the overall application. Such threads are automatically terminated when the application exits. In a scenario where the initial thread creates an application lifetime thread and no others, the exit of the initial thread will cause termination of the overall program. Such a thread is created by either:
my $thread = Thread.new(:code({ ... }), :app_lifetime);
Or just:
my $thread = Thread.start({ ... }, :app_lifetime);
The property can be introspected:
say $thread.app_lifetime; # True/False
Each thread also has a unique ID, which can be obtained through the id property.
say $thread.id;
This should be treated as an opaque number. It can not be assumed to map to any particular operating system's idea of thread ID, for example. For that, use something that lets you get at OS-level identifiers (such as calling an OS API using NativeCall).
A thread may also be given a name. This can be useful for understanding its usage. Uniqueness is not enforced; indeed, the default is "<anon>".
A thread stringifies to something of the form:
Thread<id>(name)
For example:
Thread<1234>(<anon>)
The currently executing thread is available through $*THREAD. This is even available in the initial thread of the program, in this case by falling back to $PROCESS::THREAD, which is the initial thread of the process.
Finally, the yield method can be called on Thread (not on any particular thread) to hint to the OS that the thread has nothing useful to do for the moment, and so another thread should run instead.
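For comparison, Python's threading module offers rough counterparts of these Thread features. The mappings are loose and are assumptions of this sketch: a daemon thread approximates :app_lifetime, ident is similarly opaque, current_thread() plays the role of $*THREAD, and time.sleep(0) is a common way to hint a yield. start_thread and describe are hypothetical helpers.

```python
import threading
import time


def start_thread(code, name="<anon>", app_lifetime=False):
    """Rough analogue of Thread.start; daemon threads do not keep the
    process alive, which approximates :app_lifetime."""
    t = threading.Thread(target=code, name=name, daemon=app_lifetime)
    t.start()
    return t


def describe(t):
    # Mimic the Thread<id>(name) stringification from the text.
    return f"Thread<{t.ident}>({t.name})"


seen = []


def body():
    me = threading.current_thread()   # counterpart of $*THREAD
    seen.append(describe(me))
    time.sleep(0)                     # hint that another thread may run
```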
The Atomic Compare and Swap (CAS) primitive is directly supported by most modern hardware. It has been shown that it can be used to build a whole range of concurrency control mechanisms (such as mutexes and semaphores). It can also be used to implement lock-free data structures. It is decidedly a primitive, and not truly composable due to the risk of livelock. However, since so much can be built out of it, Perl 6 provides it directly.
A Perl 6 implementation of CAS would look something like this:
sub cas($ref is rw, $expected, $new) {
my $seen = $ref;
if $ref === $expected {
$ref = $new;
}
return $seen;
}
Except that it happens atomically. For example, a crappy non-reentrant mutex could be implemented as:
class CrappyMutex {
has $!locked = 0;
method lock() {
loop {
return if cas($!locked, 0, 1) == 0;
}
}
method unlock() {
$!locked = 0;
}
}
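Python offers no user-level hardware CAS, so the following sketch emulates the atomic step with a private lock purely to model the semantics; it is not lock-free. Ref and CrappyMutex are illustrative names. The spin loop calls time.sleep(0) so that, under CPython's global interpreter lock, a spinning thread gives the lock holder a chance to run.

```python
import threading
import time


class Ref:
    """A mutable cell with an emulated compare-and-swap."""

    def __init__(self, value):
        self.value = value
        self._guard = threading.Lock()   # stands in for the atomic instruction

    def cas(self, expected, new):
        with self._guard:
            seen = self.value
            if seen == expected:         # value comparison suits small ints
                self.value = new
            return seen                  # like the Perl 6 cas: return what was seen


class CrappyMutex:
    """Non-reentrant spin lock built on cas, as in the Perl 6 version."""

    def __init__(self):
        self._locked = Ref(0)

    def lock(self):
        while self._locked.cas(0, 1) != 0:
            time.sleep(0)                # spin, yielding to other threads

    def unlock(self):
        self._locked.value = 0
```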
Another common use of CAS is in providing lock-free data structures. Any data structure can be made lock-free as long as you're willing to never mutate it, but instead build a fresh one each time. To support this, there is another &cas candidate that takes a scalar and a block. It calls the block with the seen initial value. The block returns the new, updated value. If nothing else updated the value in the meantime, the reference will be updated. If the CAS fails because another update got in first, the block will be run again, passing in the latest value.
For example, we could implement a top-5 news headlines list, which can be accessed and updated without ever locking, as:
class TopHeadlines {
has $!headlines = []; # Scalar holding array, as CAS needs
method headlines() {
$!headlines
}
method add_headline($headline) {
cas($!headlines, -> @current {
my @new = $headline, @current;
@new.pop while @new.elems > 5;
@new
});
}
}
It's the programmer's duty to ensure that the original data structure is never mutated and that the block has no side-effects (since it may be run any number of times).
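The retrying form of CAS can be sketched in Python in the same emulated way (a private lock stands in for the atomic instruction). cas_update is a hypothetical name for the block-taking candidate. The headlines are held in tuples, which cannot be mutated, so a fresh structure is built on every update, and identity comparison detects whether another update got in first.

```python
import threading


class Ref:
    def __init__(self, value):
        self.value = value
        self._guard = threading.Lock()   # emulates the atomic step

    def cas(self, expected, new):
        with self._guard:
            seen = self.value
            if seen is expected:         # identity, like ===
                self.value = new
            return seen


def cas_update(ref, fn):
    """Retry fn(current) until the swap succeeds; fn must be free of
    side effects, since it may run any number of times."""
    while True:
        current = ref.value
        new = fn(current)
        if ref.cas(current, new) is current:
            return new


class TopHeadlines:
    def __init__(self):
        self._headlines = Ref(())        # immutable tuple, never mutated

    def headlines(self):
        return self._headlines.value

    def add_headline(self, headline):
        cas_update(self._headlines, lambda cur: ((headline,) + cur)[:5])
```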
Locks are unpleasant to work with, and users are pushed towards higher level synchronization primitives. However, those need to be implemented via lower level constructs for efficiency. As such, a simple lock mechanism, as close to what the execution environment offers as possible, is provided by the Lock class. Note that it is erroneous to rely on the exact representation of an instance of this type (for example, don't assume it can be mixed into). Put another way, treat Lock like a native type.
A Lock is instantiated with new:
$!lock = Lock.new;
The best way to use it is:
$!lock.protect: {
# code to run with the lock held
}
This acquires the lock, runs the code passed, and then releases the lock. It ensures the lock will be released even if an exception is thrown. It is also possible to do:
{
$!lock.lock();
# do stuff
LEAVE $!lock.unlock()
}
When using the lock and unlock methods directly, the programmer must ensure that the lock is unlocked. Lock is reentrant. Naturally, it's easy to introduce deadlocks. Again, this is a last resort, intended for those who are building first resorts.
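Python's threading.RLock is also reentrant, so it gives a close model of this class. The wrapper below is a sketch with an illustrative name: protect mirrors the recommended form, while lock and unlock expose the manual calls, which the caller must balance (for example with try/finally, the Python counterpart of LEAVE).

```python
import threading


class Lock:
    """Illustrative wrapper over threading.RLock (reentrant, like Lock)."""

    def __init__(self):
        self._rlock = threading.RLock()

    def protect(self, code):
        # Acquire, run, and release even if `code` raises.
        with self._rlock:
            return code()

    def lock(self):
        self._rlock.acquire()

    def unlock(self):
        self._rlock.release()
```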