Fetching contributors…
Cannot retrieve contributors at this time
826 lines (643 sloc) 29.1 KB
=begin pod :tag<tutorial>
=TITLE Concurrency
=SUBTITLE Concurrency and asynchronous programming
In common with most modern programming languages, Perl 6 is designed to
support parallelism, asynchronicity and L<concurrency|>. Parallelism is about doing multiple things at once. I<Asynchronous programming>, which is sometimes called event driven or reactive
programming, is about supporting changes in the program flow caused by events triggered elsewhere in the program. Finally, concurrency is about the coordination of access and modification of some shared resources.
The aim of the Perl 6 concurrency design is to provide a high-level,
composable and consistent interface, regardless of how a virtual machine
may implement it for a particular operating system, through layers of
facilities as described below.
=begin comment
I'm not quite clear which specific features should be included below
hyper-operators, autothreading junctions?
=end comment
Additionally, certain Perl features may implicitly operate in an asynchronous
fashion, so in order to ensure predictable interoperation with these features,
user code should, where possible, avoid the lower level concurrency APIs
(e.g., L<Thread> and L<Scheduler>) and use the higher-level interfaces.
=head1 High-level APIs
=head2 Promises
A L<Promise|/type/Promise> (also called I<future> in other programming
environments) encapsulates the result of a computation that may not
have completed or even started at the time the promise is obtained.
A C<Promise> starts from a C<Planned> status and can result in either a
C<Kept> status, meaning the promise has been successfully completed, or
a C<Broken> status meaning that the promise has failed.
Usually this is much of the functionality that user code needs to operate
in a concurrent or asynchronous manner.
=begin code
my $p1 =;
say $p1.status; # OUTPUT: «Planned␤»
say $p1.status; # OUTPUT: «Kept␤»
say $p1.result; # OUTPUT: «Result␤»
# (since it has been kept, a result is available!)
my $p2 =;
$p2.break('oh no');
say $p2.status; # OUTPUT: «Broken␤»
say $p2.result; # dies, because the promise has been broken
CATCH { default { say .^name, ': ', .Str } };
# OUTPUT: «X::AdHoc+{X::Promise::Broken}: oh no␤»
=end code
Promises gain much of their power by being composable, for example by
chaining, usually by the L<then|/type/Promise#method_then> method:
my $promise1 =;
my $promise2 = $promise1.then(
-> $v { say $v.result; "Second Result" }
$promise1.keep("First Result");
say $promise2.result; # OUTPUT: «First Result␤Second Result␤»
Here the L<then|/type/Promise#method_then> method schedules code to be
executed when the first L<Promise> is kept or broken, itself returning
a new L<Promise> which will be kept with the result of the code when
it is executed (or broken if the code fails). C<keep> changes the
status of the promise to C<Kept> setting the result to the positional
argument. C<result> blocks the current thread of execution until the
promise is kept or broken, if it was kept then it will return the
result (that is the value passed to C<keep>), otherwise it will throw an
exception based on the value passed to C<break>. The latter behaviour
is illustrated with:
my $promise1 =;
my $promise2 = $promise1.then(-> $v { say "Handled but : "; say $v.result});
$promise1.break("First Result");
try $promise2.result;
say $promise2.cause; # OUTPUT: «Handled but : ␤First Result␤»
Here the C<break> will cause the code block of the C<then> to throw an
exception when it calls the C<result> method on the original promise
that was passed as an argument, which will subsequently cause the second
promise to be broken, raising an exception in turn when its result
is taken. The actual L<Exception> object will then be available from
C<cause>. If the promise had not been broken C<cause> would raise a
L<X::Promise::CauseOnlyValidOnBroken> exception.
A L<Promise> can also be scheduled to be automatically kept at a future time:
my $promise1 =;
my $promise2 = $promise1.then(-> $v { say $v.status; 'Second Result' });
say $promise2.result;
The L<method in|/type/Promise#method_in> creates a new promise and
schedules a new task to call C<keep> on it no earlier than the supplied
number of seconds, returning the new L<Promise> object.
A very frequent use of promises is to run a piece of code, and keep the
promise once it returns successfully, or break it when the code dies.
The L<start method|/type/Promise#method_start> provides a shortcut
for that:
my $promise = Promise.start(
{ my $i = 0; for 1 .. 10 { $i += $_ }; $i}
say $promise.result; # OUTPUT: «55␤»
Here the C<result> of the promise returned is the value returned from
the code. Similarly if the code fails (and the promise is thus broken),
then C<cause> will be the L<Exception> object that was thrown:
my $promise = Promise.start({ die "Broken Promise" });
try $promise.result;
say $promise.cause;
This is considered to be such a commonly required pattern that it is
also provided as a keyword:
my $promise = start {
my $i = 0;
for 1 .. 10 {
$i += $_
my $result = await $promise;
say $result;
The subroutine L<await|/type/Promise#sub_await> is almost equivalent to
calling C<result> on the promise object returned by C<start> but it will
also take a list of promises and return the result of each:
my $p1 = start {
my $i = 0;
for 1 .. 10 {
$i += $_
my $p2 = start {
my $i = 0;
for 1 .. 10 {
$i -= $_
my @result = await $p1, $p2;
say @result; # OUTPUT: «[55 -55]␤»
In addition to C<await>, two class methods combine several
L<Promise> objects into a new promise: C<allof> returns a promise that
is kept when all the original promises are kept or broken:
my $promise = Promise.allof(,
await $promise;
say "All done"; # Should be not much more than three seconds later
And C<anyof> returns a new promise that will be kept when any of the
original promises is kept or broken:
my $promise = Promise.anyof(,
await $promise;
say "All done"; # Should be about 3 seconds later
Unlike C<await> however the results of the original kept promises are not
available without referring to the original, so these are more useful
when the completion or otherwise of the tasks is more important to the
consumer than the actual results, or when the results have been collected
by other means. You may, for example, want to create a dependent Promise
that will examine each of the original promises:
my @promises;
for 1..5 -> $t {
push @promises, start {
sleep $t;
say await Promise.allof(@promises).then({ so all(@promises>>.result) });
Which will give True if all of the promises were kept with True, False
If you are creating a promise that you intend to keep or break yourself
then you probably don't want any code that might receive the promise to
inadvertently (or otherwise) keep or break the promise before you do.
For this purpose there is the L<method vow|/type/Promise#method_vow>, which
returns a Vow object which becomes the only mechanism by which the promise
can be kept or broken. If an attempt to keep or break the Promise is made
directly then the exception L<X::Promise::Vowed> will be thrown, as long
as the vow object is kept private, the status of the promise is safe:
sub get_promise {
my $promise =;
my $vow = $promise.vow;{$vow.keep});
my $promise = get_promise();
# Will throw an exception
# "Access denied to keep/break this Promise; already vowed"
CATCH { default { say .^name, ': ', .Str } };
# OUTPUT: «X::Promise::Vowed: Access denied to keep/break this Promise; already vowed␤»
The methods that return a promise that will be kept or broken
automatically such as C<in> or C<start> will do this, so it is not
necessary to do it for these.
=head2 Supplies
A L<Supply> is an asynchronous data streaming mechanism that can be
consumed by one or more consumers simultaneously in a manner similar to
"events" in other programming languages and can be seen as enabling
"Event Driven" or reactive designs.
At its simplest, a L<Supply> is a message stream that can have multiple
subscribers created with the method C<tap> on to which data items can be
placed with C<emit>.
The L<Supply> can either be C<live> or C<on-demand>. A C<live> supply is like
a TV broadcast: those who tune in don't get previously emitted values. An
C<on-demand> broadcast is like Netflix: everyone who
starts streaming a movie (taps a supply), always starts it from the beginning
(gets all the values), regardless of how many people are watching it right now.
Note that no history is kept for C<on-demand> supplies, instead, the C<supply>
block is run for each tap of the supply.
A C<live> L<Supply>
is created by the L<Supplier> factory, each emitted value is passed to
all the active tappers as they are added:
my $supplier =;
my $supply = $supplier.Supply;
$supply.tap( -> $v { say $v });
for 1 .. 10 {
Note that the C<tap> is called on a L<Supply> object created by the
L<Supplier> and new values are emitted on the L<Supplier>.
X«|supply (on-demand)»
An C<on-demand> L<Supply> is created by the C<supply> keyword:
my $supply = supply {
for 1 .. 10 {
$supply.tap( -> $v { say $v });
In this case the code in the supply block is executed every time the
L<Supply> returned by C<supply> is tapped, as demonstrated by:
my $supply = supply {
for 1 .. 10 {
$supply.tap( -> $v { say "First : $v" });
$supply.tap( -> $v { say "Second : $v" });
The C<tap> method returns a L<Tap> object which can be used to obtain
information about the tap and also to turn it off when we are no longer
interested in the events:
my $supplier =;
my $supply = $supplier.Supply;
my $tap = $supply.tap( -> $v { say $v });
$supplier.emit("Won't trigger the tap");
Calling C<done> on the supply object calls the C<done> callback that
may be specified for any taps, but does not prevent any further events
being emitted to the stream, or taps receiving them.
The method C<interval> returns a new C<on-demand> supply which periodically
emits a new event at the specified interval. The data that is emitted
is an integer starting at 0 that is incremented for each event. The
following code outputs 0 .. 5 :
my $supply = Supply.interval(2);
$supply.tap(-> $v { say $v });
sleep 10;
A second argument can be supplied to C<interval> which specifies a delay
in seconds before the first event is fired. Each tap of a supply created
by C<interval> has its own sequence starting from 0, as illustrated by
the following:
my $supply = Supply.interval(2);
$supply.tap(-> $v { say "First $v" });
sleep 6;
$supply.tap(-> $v { say "Second $v"});
sleep 10;
A live C<Supply> that keeps values until first tapped can be created with
=head3 X<C<whenever>|whenever>
The X<C<whenever>|whenever> keyword can be used in supply blocks or in react blocks. It introduces a block of code that will be run when prompted by an asynchronous event that it specifies - that could be a L<Supply>, a L<Channel>, a L<Promise> or an L<Iterable>.
In this example we are watching two supplies.
my $bread-supplier =;
my $vegetable-supplier =;
my $supply = supply {
whenever $bread-supplier.Supply {
emit("We've got bread: " ~ $_);
whenever $vegetable-supplier.Supply {
emit("We've got a vegetable: " ~ $_);
$supply.tap( -> $v { say "$v" });
$vegetable-supplier.emit("Radish"); # OUTPUT: «We've got a vegetable: Radish␤»
$bread-supplier.emit("Thick sliced"); # OUTPUT: «We've got bread: Thick sliced␤»
$vegetable-supplier.emit("Lettuce"); # OUTPUT: «We've got a vegetable: Lettuce␤»
Please note that one should keep the code inside the C<whenever> as small as possible, as only one C<whenever> block will be executed at any time. One can use a C<start> block inside the C<whenever> block to run longer running code.
=head3 X<C<react>|react>
The C<react> keyword introduces a block of code containing one or more 'whenever' keywords to watch asynchronous events. The main difference between a supply block and a react block is that the code in a react block runs where it appears in the code flow, whereas a supply block has to be tapped before it does anything.
Another difference is that a supply block can be used without the 'whenever' keyword, but a react block requires at least one 'whenever' to be of any real use.
react {
whenever Supply.interval(2) -> $v {
say $v;
done() if $v == 4;
Here the C<whenever> keyword uses L«C<.act>|/type/Supply#method_act»
to create a tap on the L<Supply> from the provided block. The C<react>
block is exited when C<done()> is called in one of the taps. Using C<last> to exit the block would produce an error indicating that it's not really a loop construct.
An C<on-demand> L<Supply> can also be created from a list of values that
will be emitted in turn, thus the first C<on-demand> example could be
written as:
react {
whenever Supply.from-list(1..10) -> $v {
say $v;
=head3 Transforming supplies
An existing supply object can be filtered or transformed, using the methods
C<grep> and C<map> respectively, to create a new supply in a manner like
the similarly named list methods: C<grep> returns a supply such that only
those events emitted on the source stream for which the C<grep> condition
is true is emitted on the second supply:
my $supplier =;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "Original : $v" });
my $odd_supply = $supply.grep({ $_ % 2 });
$odd_supply.tap(-> $v { say "Odd : $v" });
my $even_supply = $supply.grep({ not $_ % 2 });
$even_supply.tap(-> $v { say "Even : $v" });
for 0 .. 10 {
C<map> returns a new supply such that for each item emitted to the
original supply a new item which is the result of the expression passed
to the C<map> is emitted:
my $supplier =;
my $supply = $supplier.Supply;
$supply.tap(-> $v { say "Original : $v" });
my $half_supply = ${ $_ / 2 });
$half_supply.tap(-> $v { say "Half : $v" });
for 0 .. 10 {
=head3 Ending a supply
If you need to have an action that runs when the supply finishes, you can do so
by setting the C<done> and C<quit> options in the call to C<tap>:
=begin code :preamble<my $supply; class X::MyApp::Error is Exception {}>
$supply.tap: { ... },
done => { say 'Job is done.' },
quit => {
when X::MyApp::Error { say "App Error: ", $_.message }
=end code
The C<quit> block works very similar to a C<CATCH>. If the exception is marked
as seen by a C<when> or C<default> block, the exception is caught and handled.
Otherwise, the exception continues to up the call tree (i.e., the same behavior
as when C<quit> is not set).
=head3 Phasers in a supply or react block
If you are using the C<react> or C<supply> block syntax with C<whenever>, you
can add phasers within your C<whenever> blocks to handle the C<done> and C<quit>
messages from the tapped supply:
=begin code :preamble<my $supply; class X::MyApp::Error is Exception {}>
react {
whenever $supply {
...; # your usual supply tap code here
LAST { say 'Job is done.' }
QUIT { when X::MyApp::Error { say "App Error: ", $_.message } }
=end code
The behavior here is the same as setting C<done> and C<quit> on C<tap>.
=head2 Channels
A L<Channel> is a thread-safe queue that can have multiple readers and
writers that could be considered to be similar in operation to a "fifo" or
named pipe except it does not enable inter-process communication. It should
be noted that, being a true queue, each value sent to the L<Channel> will only
be available to a single reader on a first read, first served basis: if you
want multiple readers to be able to receive every item sent you probably
want to consider a L<Supply>.
An item is queued onto the L<Channel> with the
L<method send|/type/Channel#method_send>, and the L<method
receive|/type/Channel#method_receive> removes an item from the queue
and returns it, blocking until a new item is sent if the queue is empty:
my $channel =;
$channel.send('Channel One');
say $channel.receive; # OUTPUT: «Channel One␤»
If the channel has been closed with the L<method
close|/type/Channel#method_close> then any C<send> will cause the
exception L<X::Channel::SendOnClosed> to be thrown, and a C<receive>
will throw a L<X::Channel::ReceiveOnClosed> if there are no more items
on the queue.
The L<method list|/type/Channel#method_list> returns all the items on
the L<Channel> and will block until further items are queued unless the
channel is closed:
my $channel =;
await (^10).map: -> $r {
start {
sleep $r;
for $channel.list -> $r {
say $r;
There is also the non-blocking L<method poll|/type/Channel#method_poll>
that returns an available item from the channel or L<Nil> if there
is no item or the channel is closed, this does of course mean that the
channel must be checked to determine whether it is closed:
my $c =;
# Start three Promises that sleep for 1..3 seconds, and then
# send a value to our Channel
^3 .map: -> $v {
start {
sleep 3 - $v;
$c.send: "$v from thread {$*}";
# Wait 3 seconds before closing the channel { $c.close }
# Continuously loop and poll the channel, until it's closed
my $is-closed = $c.closed;
loop {
if $c.poll -> $item {
say "$item received after {now - INIT now} seconds";
elsif $is-closed {
say 'Doing some unrelated things...';
sleep .6;
# Doing some unrelated things...
# Doing some unrelated things...
# 2 from thread 5 received after 1.2063182 seconds
# Doing some unrelated things...
# Doing some unrelated things...
# 1 from thread 4 received after 2.41117376 seconds
# Doing some unrelated things...
# 0 from thread 3 received after 3.01364461 seconds
# Doing some unrelated things...
The L<method closed|/type/Channel#method_closed> returns a L<Promise> that
will be kept (and consequently will evaluate to True in a boolean context)
when the channel is closed.
The C<.poll> method can be used in combination with C<.receive> method, as a
caching mechanism where lack of value returned by C<.poll> is a signal that
more values need to be fetched and loaded into the channel:
=begin code :preamble<my $c; sub slowly-fetch-a-thing {};>
sub get-value {
return $c.poll // do { start replenish-cache; $c.receive };
sub replenish-cache {
for ^20 {
$c.send: $_ for slowly-fetch-a-thing();
=end code
Channels can be used in place of the L<Supply> in the C<whenever> of a
C<react> block described earlier:
=begin code
my $channel =;
my $p = start {
react {
whenever $channel {
say $_;
await (^10).map: -> $r {
start {
sleep $r;
await $p;
=end code
It is also possible to obtain a L<Channel> from a L<Supply> using the
L<Channel method|/type/Supply#method_Channel> which returns a L<Channel>
which is fed by a C<tap> on the L<Supply>:
my $supplier =;
my $supply = $supplier.Supply;
my $channel = $supply.Channel;
my $p = start {
react {
whenever $channel -> $item {
say "via Channel: $item";
await (^10).map: -> $r {
start {
sleep $r;
await $p;
C<Channel> will return a different L<Channel> fed with the same data
each time it is called. This could be used, for instance, to fan-out a
L<Supply> to one or more L<Channel>s to provide for different interfaces
in a program.
=head2 Proc::Async
L<Proc::Async> builds on the facilities described to run and interact with
an external program asynchronously:
my $proc ='echo', 'foo', 'bar');
$proc.stdout.tap(-> $v { print "Output: $v" });
$proc.stderr.tap(-> $v { print "Error: $v" });
say "Starting...";
my $promise = $proc.start;
await $promise;
say "Done.";
# Output:
# Starting...
# Output: foo bar
# Done.
The path to the command as well as any arguments to the command are
supplied to the constructor. The command will not be executed until
L<start|/type/Proc::Async#method_start> is called, which will return
a L<Promise> that will be kept when the program exits. The standard
output and standard error of the program are available as L<Supply>
objects from the methods L<stdout|/type/Proc::Async#method_stdout>
and L<stderr|/type/Proc::Async#method_stderr> respectively which can be
tapped as required.
If you want to write to the standard input of the program
you can supply the C<:w> adverb to the constructor and use
the methods L<write|/type/Proc::Async#method_write>,
L<print|/type/Proc::Async#method_print> or
L<say|/type/Proc::Async#method_say> to write to the opened pipe once
the program has been started:
my $proc =, 'grep', 'foo');
$proc.stdout.tap(-> $v { print "Output: $v" });
say "Starting...";
my $promise = $proc.start;
$proc.say("this line has foo");
$proc.say("this one doesn't");
await $promise;
say "Done.";
# Output:
# Starting...
# Output: this line has foo
# Done.
Some programs (such as C<grep> without a file argument in this
example, ) won't exit until their standard input is closed so
L<close-stdin|/type/Proc::Async#method_close-stdin> can be called when
you are finished writing to allow the L<Promise> returned by C<start>
to be kept.
=head1 Low-level APIs
=head2 Threads
The lowest level interface for concurrency is provided by L<Thread>. A
thread can be thought of as a piece of code that may eventually be run
on a processor, the arrangement for which is made almost entirely by the
virtual machine and/or operating system. Threads should be considered,
for all intents, largely un-managed and their direct use should be
avoided in user code.
A thread can either be created and then actually run later:
my $thread = => { for 1 .. 10 -> $v { say $v }});
# ...
Or can be created and run at a single invocation:
my $thread = Thread.start({ for 1 .. 10 -> $v { say $v }});
In both cases the completion of the code encapsulated by the L<Thread>
object can be waited on with the C<finish> method which will block until
the thread completes:
=for code :preamble<my $thread;>
Beyond that there are no further facilities for synchronization or resource
sharing which is largely why it should be emphasized that threads are unlikely
to be useful directly in user code.
=head2 Schedulers
The next level of the concurrency API is supplied by classes that
implement the interface defined by the role L<Scheduler>. The intent
of the scheduler interface is to provide a mechanism to determine which
resources to use to run a particular task and when to run it. The majority
of the higher level concurrency APIs are built upon a scheduler and it
may not be necessary for user code to use them at all, although some
methods such as those found in L<Proc::Async>, L<Promise> and L<Supply>
allow you to explicitly supply a scheduler.
The current default global scheduler is available in the variable
The primary interface of a scheduler (indeed the only method required
by the L<Scheduler> interface) is the C<cue> method:
method cue(:&code, Instant :$at, :$in, :$every, :$times = 1; :&catch)
This will schedule the L<Callable> in C<&code> to be executed in the
manner determined by the adverbs (as documented in L<Scheduler>) using
the execution scheme as implemented by the scheduler. For example:
my $i = 0;
my $cancellation = $*SCHEDULER.cue({ say $i++}, every => 2 );
sleep 20;
Assuming that the C<$*SCHEDULER> hasn't been changed from the default,
will print the numbers 0 to 10 approximately (i.e with operating system
scheduling tolerances) every two seconds. In this case the code will
be scheduled to run until the program ends normally, however the method
returns a L<Cancellation> object which can be used to cancel the scheduled
execution before normal completion:
my $i = 0;
my $cancellation = $*SCHEDULER.cue({ say $i++}, every => 2 );
sleep 10;
sleep 10;
should only output 0 to 5,
Despite the apparent advantage the L<Scheduler> interface provides over
that of L<Thread> all of functionality is available through higher level
interfaces and it shouldn't be necessary to use a scheduler directly,
except perhaps in the cases mentioned above where a scheduler can be
supplied explicitly to certain methods.
A library may wish to provide an alternative scheduler implementation if
it has special requirements, for instance a UI library may want all code
to be run within a single UI thread, or some custom priority mechanism
may be required, however the implementations provided as standard and
described below should suffice for most user code.
=head3 ThreadPoolScheduler
The L<ThreadPoolScheduler> is the default scheduler, it maintains a pool
of threads that are allocated on demand, creating new ones as necessary up
to maximum number given as a parameter when the scheduler object was created
(the default is 16.) If the maximum is exceeded then C<cue> may queue the
code until such time as a thread becomes available.
Rakudo allows the maximum number of threads allowed in the default scheduler
to be set by the environment variable C<RAKUDO_MAX_THREADS> at the time
the program is started.
=head3 CurrentThreadScheduler
The L<CurrentThreadScheduler> is a very simple scheduler that will always
schedule code to be run straight away on the current thread. The implication
is that C<cue> on this scheduler will block until the code finishes
execution, limiting its utility to certain special cases such as testing.
=head2 Locks
The class L<Lock> provides the low level mechanism that protects
shared data in a concurrent environment and is thus key to supporting
thread-safety in the high level API, this is sometimes known as a
"Mutex" in other programming languages. Because the higher level classes
(L<Promise>, L<Supply> and L<Channel>) use a L<Lock> where required it
is unlikely that user code will need to use a L<Lock> directly.
The primary interface to L<Lock> is the method
L<protect|/type/Lock#method_protect> which ensures that a block of code
(commonly called a "critical section") is only executed in one thread
at a time:
my $lock =;
my $a = 0;
await (^10).map: {
start {
my $r = rand;
sleep $r;
say $a; # OUTPUT: «10␤»
C<protect> returns whatever the code block returns.
Because C<protect> will block any threads that are waiting to execute
the critical section the code should be as quick as possible.
=head1 Safety concerns
Some shared data concurrency issues are less obvious than others.
For a good general write-up on this subject see this L<blog post|>.
One particular issue of note is when container autovivification or extension
takes place. When an L<Array> or a L<Hash> entry is initially assigned the
underlying structure is altered and that operation is not async safe. For
example, in this code:
my @array;
my $slot := @array[20];
$slot = 'foo';
The third line is the critical section as that is when the array is extended.
The simplest fix is to use a L<Lock> to protect the critical section. A
possibly better fix would be to refactor the code so that sharing a container
is not necessary.
=end pod
# vim: expandtab softtabstop=4 shiftwidth=4 ft=perl6