Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
214 lines (148 sloc) 5.99 KB
=begin pod :kind("Type") :subkind("class") :category("domain-specific")
=TITLE class Channel
=SUBTITLE Thread-safe queue for sending values from producers to consumers
class Channel {}
A C<Channel> is a thread-safe queue that helps you to send a series of objects
from one or more producers to one or more consumers. Each object will arrive at
only one such consumer, selected by the scheduler. If there is only one
consumer and one producer, the order of objects is guaranteed to be preserved.
Sending on a C<Channel> is non-blocking.
my $c = Channel.new;
await (^10).map: {
start {
my $r = rand;
sleep $r;
$c.send($r);
}
}
$c.close;
say $c.list;
Further examples can be found in the L<concurrency page|/language/concurrency#Channels>
=head1 Methods
=head2 method send
Defined as:
method send(Channel:D: \item)
Enqueues an item into the C<Channel>. Throws an exception of type
L<X::Channel::SendOnClosed|/type/X::Channel::SendOnClosed> if the channel has been
closed already. This call will B<not> block waiting for a consumer to take the object.
There is no set limit on the number of items that may be queued, so
care should be taken to prevent runaway queueing.
my $c = Channel.new;
$c.send(1);
$c.send([2, 3, 4, 5]);
$c.close;
say $c.list; # OUTPUT: «(1 [2 3 4 5])␤»
=head2 method receive
Defined as:
method receive(Channel:D:)
Receives and removes an item from the channel. It blocks if no item is
present, waiting for a C<send> from another thread.
Throws an exception of
type L<X::Channel::ReceiveOnClosed|/type/X::Channel::ReceiveOnClosed> if the channel
has been closed, and the last item has been removed already, or if C<close> is called
while C<receive> is waiting for an item to arrive.
If the channel has been marked as erratic with method C<fail>, and the last
item has been removed, throws the argument that was given to C<fail> as an
exception.
See method C<poll> for a non-blocking version that won't throw exceptions.
my $c = Channel.new;
$c.send(1);
say $c.receive; # OUTPUT: «1␤»
=head2 method poll
Defined as:
method poll(Channel:D:)
Receives and removes an item from the channel. If no item is present, returns
C<Nil> instead of waiting.
my $c = Channel.new;
Promise.in(2).then: { $c.close; }
^10 .map({ $c.send($_); });
loop {
if $c.poll -> $item { $item.say };
if $c.closed { last };
sleep 0.1;
}
See method C<receive> for a blocking version that properly responds to channel
closing and failure.
=head2 method close
Defined as:
method close(Channel:D:)
Close the C<Channel>, normally. This makes subsequent C<send> calls die with
L<X::Channel::SendOnClosed|/type/X::Channel::SendOnClosed>. Subsequent calls of
C<.receive> may still drain any remaining items that were previously sent, but if
the queue is empty, will throw an L<X::Channel::ReceiveOnClosed|/type/X::Channel::ReceiveOnClosed>
exception. Since you can produce a C<Seq> from a Channel by contextualizing to array with C<@()>
or by calling the C<.list> method, these methods will not terminate until the channel has been
closed. A L<whenever|/language/concurrency#index-entry-whenever>-block will also
terminate properly on a closed channel.
=for code
my $c = Channel.new;
$c.close;
$c.send(1);
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::Channel::SendOnClosed: Cannot send a message on a closed channel␤»
Please note that any exception thrown may prevent C<.close> from being called,
this may hang the receiving thread. Use a L<LEAVE|/language/phasers#LEAVE>
phaser to enforce the C<.close> call in this case.
=head2 method list
Defined as:
method list(Channel:D: --> List:D)
Returns a list based on the C<Seq> which will iterate items in the queue and
remove each item from it as it iterates. This can only terminate once the
C<close> method has been called.
my $c = Channel.new; $c.send(1); $c.send(2);
$c.close;
say $c.list; # OUTPUT: «(1 2)␤»
=head2 method closed
Defined as:
method closed(Channel:D: --> Promise:D)
Returns a promise that will be kept once the channel is closed by a call to
method C<close>.
my $c = Channel.new;
$c.closed.then({ say "It's closed!" });
$c.close;
sleep 1;
=head2 method fail
Defined as:
method fail(Channel:D: $error)
Closes the C<Channel> (that is, makes subsequent C<send> calls die), and enqueues
the error to be thrown as the final element in the channel. Method C<receive>
will throw that error as an exception. Does nothing if the channel has already
been closed or C<.fail> has already been called on it.
my $c = Channel.new;
$c.fail("Bad error happens!");
$c.receive;
CATCH { default { put .^name, ': ', .Str } };
# OUTPUT: «X::AdHoc: Bad error happens!␤»
=head2 method Capture
Defined as:
method Capture(Channel:D --> Capture:D)
Equivalent to calling L«C<.List.Capture>|/type/List#method_Capture»
on the invocant.
=head2 method Supply
Defined as:
method Supply(Channel:D:)
This returns an C<on-demand> L<Supply|/type/Supply> that emits a value for every value
received on the Channel. C<done> will be called on the C<Supply> when the L<Channel|/type/Channel>
is closed.
my $c = Channel.new;
my Supply $s1 = $c.Supply;
my Supply $s2 = $c.Supply;
$s1.tap(-> $v { say "First $v" });
$s2.tap(-> $v { say "Second $v" });
^10 .map({ $c.send($_) });
sleep 1;
Multiple calls to this method produce multiple instances of Supply, which compete
over the values from the Channel.
=head2 sub await
Defined as:
multi sub await(Channel:D)
multi sub await(*@)
Waits until all of one or more channels has a value available, and returns
those values (it calls C<.receive> on the channel). Also works with
L<promises|/type/Promise>.
my $c = Channel.new;
Promise.in(1).then({$c.send(1)});
say await $c;
Since 6.d, it no longer blocks a thread while waiting.
=end pod
# vim: expandtab softtabstop=4 shiftwidth=4 ft=perl6
You can’t perform that action at this time.