Skip to content

Commit

Permalink
Add internal condvar
Browse files Browse the repository at this point in the history
  • Loading branch information
mar-kolya committed Apr 17, 2013
1 parent 749cb8c commit 3e677e8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 15 deletions.
40 changes: 35 additions & 5 deletions lib/AnyEvent/FIFO.pm
Expand Up @@ -7,12 +7,14 @@ our $VERSION = '0.00002';

sub new {
my $class = shift;
bless {
my $self = {
max_active => 1,
@_,
active => {},
events => {},
}, $class;
};
$self->{cv} ||= AE::cv;
return bless($self, $class);
}

sub push {
Expand All @@ -25,6 +27,7 @@ sub push {
}

push @{$self->{events}->{$slot}}, [$cb, @args];
$self->{cv}->begin();

AE::postpone sub {
$self->drain();
Expand All @@ -43,6 +46,12 @@ sub waiting {
return $self->{events}->{$slot} ? (0 + @{$self->{events}->{$slot}}) : 0;
}

sub cv {
my $self = shift;
$self->{cv} = $_[0] if(@_);
return $self->{cv};
}

sub drain {
my $self = shift;

Expand All @@ -62,6 +71,7 @@ sub drain {
if ($self->{active}->{$slot} <= 0) {
delete $self->{active}->{$slot};
}
$self->{cv}->end();
AE::postpone sub {
$self->drain();
};
Expand Down Expand Up @@ -92,6 +102,8 @@ AnyEvent::FIFO - Simple FIFO Callback Dispatch
$fifo->push( "slot", \&callback, @args );
# dispatch is done automatically
# wait for all tasks to complete
$fifo->cv->recv();
sub callback {
my ($guard, @args) = @_;
Expand All @@ -102,10 +114,10 @@ AnyEvent::FIFO - Simple FIFO Callback Dispatch
=head1 DESCRIPTION
AnyEvent::FIFO is a simple FIFO queue to dispatch events in order.
AnyEvent::FIFO is a simple FIFO queue to dispatch events in order.
If you use regular watchers and register callbacks from various places in
your program, you're not necessarily guaranteed that the callbacks will be
your program, you're not necessarily guaranteed that the callbacks will be
executed in the order that you expect. By using this module, you can
register callbacks and they will be executed in that particular order.
Expand All @@ -119,6 +131,12 @@ register callbacks and they will be executed in that particular order.
Number of concurrent callbacks to be executed B<per slot>.
=item cv => $cv
Instance of L<AnyEvent condvar|AnyEvent/"CONDITION VARIABLES">. AnyEvent::FIFO will create one for you if this is not provided.
AnyEvent::FIFO calls $cv->begin() when new task is pushed and $cv->end() when task is completed.
=back
=head2 push ([$slot,] $cb [,@args])
Expand Down Expand Up @@ -175,14 +193,26 @@ The name of the slot, "__default__" is used if not specified.
=back
=head2 cv ([$cv])
Gets or sets L<AnyEvent condvar|AnyEvent/"CONDITION VARIABLES">.
=over 4
=item $cv
A new condvar to assign to this FIFO
=back
=head2 drain
Attemps to drain the queue, if possible. You DO NOT need to call this method
by yourself. It's handled automatically
=head1 AUTHOR
This module is basically a generalisation of the FIFO queue used in AnyEvent::HTTP by Marc Lehman.
This module is basically a generalisation of the FIFO queue used in AnyEvent::HTTP by Marc Lehman.
(c) Daisuke Maki C< <<daisuke@endeworks.jp>> > 2010
Expand Down
6 changes: 1 addition & 5 deletions t/001_basic.t
Expand Up @@ -4,21 +4,17 @@ use AnyEvent;

use_ok "AnyEvent::FIFO";

my $cv = AE::cv;

my $q = AnyEvent::FIFO->new();

my $expected = 1;
foreach my $i (1..10) {
$cv->begin;
$q->push( sub {
my ($guard, @args) = @_;
is( $args[0], $i, "arg is $i" );
is( $i, $expected++, "$i-th execution" );
is( $q->active, 1, "1 task is running" );
is( $q->waiting, 10 - $i, "$i tasks is waiting" );
$cv->end
}, $i );
}

$cv->recv;
$q->cv->recv;
6 changes: 1 addition & 5 deletions t/002_slots.t
Expand Up @@ -4,23 +4,19 @@ use AnyEvent;

use_ok "AnyEvent::FIFO";

my $cv = AE::cv;

my $q = AnyEvent::FIFO->new();

foreach my $group ('a'..'c') {
my $expected = 1;
foreach my $i (1..10) {
$cv->begin;
$q->push( $group, sub {
my ($guard, @args) = @_;
is( $args[0], $i, "arg is $i" );
is( $i, $expected++, "slot $group, $i-th execution" );
is( $q->active($group), 1, "1 task is running" );
is( $q->waiting($group), 10 - $i, "$i tasks is waiting" );
$cv->end
}, $i );
}
}

$cv->recv;
$q->cv->recv;

0 comments on commit 3e677e8

Please sign in to comment.