Permalink
Browse files

followtail and block tests and fixes

  • Loading branch information...
1 parent c575666 commit 6c5e3e8f8d7c206ba69dd727f08fc89d2d8a8ef4 @rcaputo committed Jun 18, 2000
Showing with 228 additions and 22 deletions.
  1. +16 −6 Changes
  2. +1 −0 MANIFEST
  3. +1 −1 lib/POE.pm
  4. +2 −0 lib/POE/Driver.pm
  5. +1 −1 lib/POE/Driver/SysRW.pm
  6. +27 −11 lib/POE/Filter/Block.pm
  7. +3 −1 lib/POE/Wheel/FollowTail.pm
  8. +3 −2 tests/00_coverage.t
  9. +174 −0 tests/14_wheels_ft.t
View
22 Changes
@@ -16,14 +16,9 @@ subversions are available from <http://www.newts.org/~troc/poe.html>.
,----- To Do -----
|
-| Before 0.11
-|
-| Wheel::FollowTail test.
-| Filter::Block test.
-| Fix $kernel->fork() or take it out entirely.
-|
| After 0.11
|
+| Fix $kernel->fork() or take it out entirely.
| Filter::HTTPD test.
| Filter::Reference test.
| Filter::Stream test.
@@ -43,6 +38,17 @@ subversions are available from <http://www.newts.org/~troc/poe.html>.
(!!!) Documented that $kernel->fork() was broken in 0.1006.
+Added Filter::Block to 00_coverage.t.
+
+Added t/14_wheels_ft.t to test Wheel::FollowTail and Filter::Block.
+
+FollowTail would go into an infinite loop while discarding initial
+data. It wasn't checking drivers' get() methods' return values for
+emptiness.
+
+Driver::SysRW's get() method wasn't checking sysread's return value
+for definedness.
+
Dieter Pearcey sent in a patch to FollowTail that lets programs
specify how far to seek back into a file before tailing it. When
SeekBack is used, everything up to the current end of file is
@@ -52,6 +58,10 @@ samples/followtail.perl still works, which is encouraging.
Dieter Pearcey also submitted Filter::Block, a fixed-length block
filter. This filter is not yet tested.
+Translated Filter::Block's internal object into an anonymous list
+reference. This will be more efficient, and I understand Dieter's
+application may need it.
+
Windows compatibility seems to have gone all to heck in this version.
I'm not sure why, either, since I didn't do anything specific to
Windows but enhance its support.
View
@@ -72,3 +72,4 @@ t/10_wheels_tcp.t
t/11_signals_poe.t
t/12_signals_ev.t
t/13_wheels_udp.t
+t/14_wheels_ft.t
View
@@ -7,7 +7,7 @@ use strict;
use Carp;
use vars qw($VERSION);
-$VERSION = 0.1008;
+$VERSION = 0.1009;
sub import {
my $self = shift;
View
@@ -61,6 +61,8 @@ filehandle. It returns a reference to an array of received data
chunks. The array may be empty if nothing could be read. The array
reference it returns is a suitable parameter to POE::Filter::get().
+get() returns undef on an error.
+
Wheels usually call the get() method from their read select states.
=item *
View
@@ -71,7 +71,7 @@ sub get {
my ($self, $handle) = @_;
my $result = sysread($handle, my $buffer = '', $self->[BLOCK_SIZE]);
- if ($result || ($! == EAGAIN)) {
+ if (defined $result and ($result || ($! == EAGAIN))) {
$! = 0;
[ $buffer ];
}
View
@@ -3,17 +3,32 @@
package POE::Filter::Block;
use strict;
+use Carp qw(croak);
+
+sub BLOCK_SIZE () { 0 }
+sub FRAMING_BUFFER () { 1 }
#------------------------------------------------------------------------------
sub new {
my $type = shift;
+ croak "$type must be given an even number of parameters" if @_ & 1;
+ my %params = @_;
+
+ my $block_size =
+ ( (exists $params{BlockSize})
+ ? ( ($params{BlockSize} < 1)
+ ? 512
+ : $params{BlockSize}
+ )
+ : 512
+ );
+
+ my $self =
+ bless [ $block_size,
+ '',
+ ], $type;
- my $self = { blocksize => abs(shift) || 512,
- framing_buffer => ''
- };
-
- bless $self, $type;
$self;
}
@@ -22,11 +37,12 @@ sub new {
sub get {
my ($self, $stream) = @_;
- $self->{framing_buffer} .= join '', @{$stream};
+ $self->[FRAMING_BUFFER] .= join '', @{$stream};
my @blocks;
- while (length $self->{framing_buffer} >= $self->{blocksize}) {
- push @blocks, substr($self->{framing_buffer}, 0, $self->{blocksize}, "");
+ while (length($self->[FRAMING_BUFFER]) >= $self->[BLOCK_SIZE]) {
+ push @blocks, substr($self->[FRAMING_BUFFER], 0, $self->[BLOCK_SIZE]);
+ substr($self->[FRAMING_BUFFER], 0, $self->[BLOCK_SIZE]) = '';
}
\@blocks;
@@ -44,8 +60,8 @@ sub put {
sub get_pending {
my $self = shift;
- return unless $self->{framing_buffer};
- [ $self->{framing_buffer ];
+ return unless $self->[FRAMING_BUFFER];
+ [ $self->[FRAMING_BUFFER] ];
}
###############################################################################
@@ -59,7 +75,7 @@ POE::Filter::Block - POE Block Protocol Abstraction
=head1 SYNOPSIS
- $filter = new POE::Filter::Block(1024);
+ $filter = new POE::Filter::Block( BlockSize => 1024 );
$arrayref_of_blocks =
$filter->get($arrayref_of_raw_chunks_from_driver);
$arrayref_of_streamable_chunks_for_driver =
@@ -58,6 +58,8 @@ sub new {
# Discard partial input chunks unless a SeekBack was specified.
unless (exists $params{SeekBack}) {
while (defined(my $raw_input = $driver->get($handle))) {
+ # Skip out if there's no more input.
+ last unless @$raw_input;
$filter->get($raw_input);
}
}
@@ -97,7 +99,7 @@ sub _define_states {
# subroutine starts here
my ($k, $ses, $hdl) = @_[KERNEL, SESSION, ARG0];
- while (defined(my $raw_input = $driver->get($hdl))) {
+ if (defined(my $raw_input = $driver->get($hdl))) {
foreach my $cooked_input (@{$filter->get($raw_input)}) {
$k->call($ses, $$event_input, $cooked_input);
}
View
@@ -8,7 +8,7 @@
use strict;
use lib qw(./lib ../lib);
use TestSetup;
-&test_setup(11);
+&test_setup(12);
sub load_optional_module {
my ($test_number, $module) = @_;
@@ -51,9 +51,10 @@ sub load_required_module {
&load_optional_module( 8, 'POE::Filter::Reference');
&load_optional_module( 9, 'POE::Wheel::FollowTail');
&load_optional_module(10, 'POE::Wheel::ListenAccept');
+&load_optional_module(11, 'POE::Filter::Block');
# And one to grow on.
-print "ok 11\n";
+print "ok 12\n";
exit;
View
@@ -0,0 +1,174 @@
+#!/usr/bin/perl /w
+# $Id$
+
+# Exercises Wheel::FollowTail, Wheel::ReadWrite, and Filter::Block.
+
+use strict;
+use lib qw(./lib ../lib);
+use Socket;
+
+use TestSetup;
+&test_setup(8);
+
+# Turn on all asserts.
+# sub POE::Kernel::TRACE_DEFAULT () { 1 }
+sub POE::Kernel::ASSERT_DEFAULT () { 1 }
+use POE qw( Component::Server::TCP
+ Wheel::FollowTail
+ Wheel::ReadWrite
+ Wheel::SocketFactory
+ Filter::Line
+ Filter::Block
+ Driver::SysRW
+ );
+
+my $tcp_server_port = 31909;
+my $max_send_count = 10; # expected to be even
+
+# Congratulations! We made it this far!
+&ok(1);
+
+###############################################################################
+# A generic server session.
+
+sub sss_new {
+ my ($socket, $peer_addr, $peer_port) = @_;
+ POE::Session->create
+ ( inline_states =>
+ { _start => \&sss_start,
+ _stop => \&sss_stop,
+ got_block => \&sss_block,
+ got_flush => \&sss_flush,
+ ev_timeout => sub { delete $_[HEAP]->{wheel} },
+ },
+ args => [ $socket, $peer_addr, $peer_port ],
+ );
+}
+
+sub sss_start {
+ my ($heap, $socket, $peer_addr, $peer_port) = @_[HEAP, ARG0..ARG2];
+
+ delete $heap->{wheel};
+ $heap->{wheel} = POE::Wheel::FollowTail->new
+ ( Handle => $socket,
+ Driver => POE::Driver::SysRW->new( BlockSize => 24 ),
+ Filter => POE::Filter::Block->new( BlockSize => 16 ),
+ InputState => 'got_block',
+ ErrorState => 'got_error',
+ );
+
+ &ok_if(2, defined $heap->{wheel});
+ $heap->{read_count} = 0;
+}
+
+sub sss_block {
+ my ($kernel, $heap, $block) = @_[KERNEL, HEAP, ARG0];
+ $heap->{read_count}++;
+ $kernel->delay( ev_timeout => 2 );
+}
+
+sub sss_stop {
+ warn "session ", $_[SESSION]->ID, " read $_[HEAP]->{read_count} blocks";
+}
+
+###############################################################################
+# A TCP socket client.
+
+sub client_tcp_start {
+ my $heap = $_[HEAP];
+
+ $heap->{wheel} = POE::Wheel::SocketFactory->new
+ ( RemoteAddress => '127.0.0.1',
+ RemotePort => $tcp_server_port,
+ SuccessState => 'got_server',
+ FailureState => 'got_error',
+ );
+
+ &ok_if(3, defined $heap->{wheel});
+}
+
+sub client_tcp_stop {
+ &ok_if(4, $_[HEAP]->{put_count} == $max_send_count);
+ &ok_if(5, $_[HEAP]->{flush_count} == $_[HEAP]->{put_count} / 2);
+}
+
+sub client_tcp_connected {
+ my ($kernel, $heap, $server_socket) = @_[KERNEL, HEAP, ARG0];
+
+ delete $heap->{wheel};
+ $heap->{wheel} = POE::Wheel::ReadWrite->new
+ ( Handle => $server_socket,
+ Driver => POE::Driver::SysRW->new( BlockSize => 32 ),
+ Filter => POE::Filter::Block->new( BlockSize => 16 ),
+ ErrorState => 'got_error',
+ FlushedState => 'got_flush',
+ );
+
+ &ok_if(6, defined $heap->{wheel});
+
+ $heap->{flush_count} = 0;
+ $heap->{put_count} = 0;
+
+ $kernel->yield( 'got_alarm' );
+}
+
+sub client_tcp_got_alarm {
+ my ($kernel, $heap, $line) = @_[KERNEL, HEAP, ARG0];
+
+ $heap->{wheel}->put( '0123456789ABCDEF0123456789ABCDEF' );
+
+ $heap->{put_count} += 2;
+ if ($heap->{put_count} < $max_send_count) {
+ $kernel->delay( got_alarm => 1 );
+ }
+}
+
+sub client_tcp_got_error {
+ my ($operation, $errnum, $errstr) = @_[ARG0..ARG2];
+ warn "$operation error $errnum: $errstr";
+}
+
+sub client_tcp_got_flush {
+ $_[HEAP]->{flush_count}++;
+ # Delays destruction until all data is out.
+ delete $_[HEAP]->{wheel} if $_[HEAP]->{put_count} >= $max_send_count;
+}
+
+###############################################################################
+# Start the TCP server and client.
+
+POE::Component::Server::TCP->new
+ ( Port => $tcp_server_port,
+ Acceptor => sub { &sss_new(@_[ARG0..ARG2]);
+ # This next badness is just for testing.
+ my $sockname = $_[HEAP]->{listener}->getsockname();
+ delete $_[HEAP]->{listener};
+
+ my ($port, $addr) = sockaddr_in($sockname);
+ $addr = inet_ntoa($addr);
+ &ok_if( 7,
+ ($addr eq '0.0.0.0') &&
+ ($port == $tcp_server_port)
+ )
+ },
+ );
+
+POE::Session->create
+ ( inline_states =>
+ { _start => \&client_tcp_start,
+ _stop => \&client_tcp_stop,
+ got_server => \&client_tcp_connected,
+ got_error => \&client_tcp_got_error,
+ got_flush => \&client_tcp_got_flush,
+ got_alarm => \&client_tcp_got_alarm,
+ }
+ );
+
+### main loop
+
+$poe_kernel->run();
+
+&ok(8);
+&results;
+
+exit;

0 comments on commit 6c5e3e8

Please sign in to comment.