Skip to content

Commit

Permalink
add some roles for easy moose implementation; add filtering ability
Browse files Browse the repository at this point in the history
  • Loading branch information
jrockway committed Apr 16, 2010
1 parent f28709f commit 715c59e
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 8 deletions.
22 changes: 14 additions & 8 deletions lib/AnyEvent/Pump.pm
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,27 @@ use Sub::Exporter -setup => {
exports => ['pump'],
};

sub pump($$){
my ($from, $to) = @_;
sub pump($$;&){
my ($from, $to, $filter) = @_;
my $from_is_ah = $from->isa('AnyEvent::Handle');
$filter ||= sub { $_[0] }; # identity function

my $pusher; $pusher = sub {
my $h = shift;
my $data = delete $h->{rbuf};
return 0 unless $data;
my $_from = shift;
my $data = $from_is_ah ? delete $_from->{rbuf} : $_from->consume;
return 0 unless defined $data;

my $filtered = $filter->($data);
return 0 unless defined $filtered;

$to->push_write($data);
$from->push_read($pusher);
$to->push_write($filtered);
$_from->push_read($pusher);
return 1;
};
$from->push_read($pusher);

return guard {
if($from->isa('AnyEvent::Handle')){
if($from_is_ah){
$from->{_queue} = [
grep { refaddr $_ != refaddr $pusher } @{$from->{_queue} || []}
];
Expand Down
7 changes: 7 additions & 0 deletions lib/AnyEvent/Pump/Role/From.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use MooseX::Declare;

role AnyEvent::Pump::Role::From {
requires 'push_read';
requires 'consume';
requires 'kill_reader';
}
5 changes: 5 additions & 0 deletions lib/AnyEvent/Pump/Role/To.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use MooseX::Declare;

role AnyEvent::Pump::Role::To {
requires 'push_write';
}
56 changes: 56 additions & 0 deletions t/filter.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use strict;
use warnings;
use Test::More;

use AnyEvent::Pump qw(pump);
use AnyEvent::Util qw(portable_pipe);
use AnyEvent::Handle;

my ($r1, $w1) = portable_pipe;
my ($r2, $w2) = portable_pipe;

my $from = AnyEvent::Handle->new(
fh => $r1,
);

my $to = AnyEvent::Handle->new(
fh => $w2,
);

my $in = AnyEvent::Handle->new(
fh => $w1,
);

my $out = AnyEvent::Handle->new(
fh => $r2,
);

my $result = AnyEvent->condvar;
my $done = AnyEvent->condvar;

pump $from, $to, sub {
my $char = shift;
if($char =~ /X/){
$done->send(1);
return;
}
return $char;
};

$out->push_read( line => sub { $result->send($_[1]) } );
$in->push_write('hello');
delay(); # force the event loop to run
$in->push_write('X');
delay();
$in->push_write(" world\n");

ok $done->recv, 'got X';
is $result->recv, 'hello world', 'no X made it through';

done_testing;

sub delay {
my $cv = AnyEvent->condvar;
my $t; $t = AnyEvent->timer( after => 0.1, cb => sub { undef $t; $cv->send } );
$cv->recv;
}
33 changes: 33 additions & 0 deletions t/roles.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use strict;
use warnings;

use Test::More;
use AnyEvent::Pump qw(pump);

my @in = qw(t h i s i s a t e s t);
my @out = ();

{ package From;
use Moose;
with 'AnyEvent::Pump::Role::From';
sub consume { shift @in }
sub push_read { $_[1]->( $_[0] ) }
sub kill_reader { @in = ('done') }
}

{ package To;
use Moose;
with 'AnyEvent::Pump::Role::To';
sub push_write { push @out, $_[1] }
}

my $pump = pump(From->new, To->new);

is_deeply \@in, [], 'in was consumed';
is_deeply \@out, [qw(t h i s i s a t e s t)], 'in copied to out';

undef $pump;

is $in[0], 'done', 'readers were killed';

done_testing;

0 comments on commit 715c59e

Please sign in to comment.