Permalink
Browse files

Add streaming filter change tests.

  • Loading branch information...
1 parent d85efdc commit 26ffab29a7b227b7588864fd6ac36390024f0521 @rcaputo committed Aug 12, 2001
Showing with 130 additions and 22 deletions.
  1. +130 −22 tests/19_filterchange.t
@@ -15,7 +15,7 @@ sub DEBUG () { 0 }
sub POE::Kernel::ASSERT_DEFAULT () { 1 }
use POE qw( Wheel::ReadWrite Driver::SysRW
Filter::Block Filter::Line Filter::Reference Filter::Stream
- Pipe::TwoWay
+ Pipe::OneWay Pipe::TwoWay
);
# Showstopper here. Try to build a pair of file handles. This will
@@ -31,7 +31,7 @@ unless (defined $master_read) {
}
# Set up tests, and go.
-&test_setup(36);
+&test_setup(55);
### Skim down to PARTIAL BUFFER TESTS to find the partial buffer
### get_pending tests. Those tests can run stand-alone without the
@@ -50,10 +50,6 @@ unless (defined $master_read) {
# (ref -> lin) (ref -> str) (ref -> ref) (ref -> blo)
# (blo -> lin) (blo -> str) (blo -> ref) (blo -> blo)
-# Standard block size. Things will be truncated or space-padded out
-# to this size.
-sub BLOCK_SIZE () { 128 }
-
# Symbolic constants for mode names, so we don't make typos.
sub LINE () { 'line' }
sub STREAM () { 'stream' }
@@ -110,12 +106,7 @@ my @master_script =
sub wrap_payload {
my ($mode, $payload) = @_;
- # Pad/truncate blocks.
- if ($mode eq BLOCK) {
- $payload = pack 'A' . BLOCK_SIZE, $payload;
- }
- # Change the payload into a reference.
- elsif ($mode eq REFERENCE) {
+ if ($mode eq REFERENCE) {
my $copy = $payload;
$payload = \$copy;
}
@@ -126,12 +117,7 @@ sub wrap_payload {
sub unwrap_payload {
my ($mode, $payload) = @_;
- # Unpad/truncate blocks.
- if ($mode eq BLOCK) {
- $payload = unpack 'A' . BLOCK_SIZE, $payload;
- }
- # Dereference referenced payloads.
- elsif ($mode eq REFERENCE) {
+ if ($mode eq REFERENCE) {
$payload = $$payload;
}
@@ -279,7 +265,7 @@ sub master_stop {
sub master_input {
my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
-
+
my $mode = $heap->{current_mode};
$input = &unwrap_payload( $mode, $input );
DEBUG and warn "M: got $mode input: $input\n";
@@ -382,7 +368,118 @@ sub master_error {
delete $heap->{wheel};
}
-### Main loop.
+### Streamed session does just about everything together.
+
+# Streamed tests:
+# (lin -> lin) (lin -> ref) (lin -> blo)
+# (ref -> lin) (ref -> ref) (ref -> blo)
+# -blo -> lin) (blo -> ref) (blo -> blo)
+
+# Script that drives the streamed test session. It must be different
+# because "stream" eats everything all at once, ruining the data
+# beyond it. That's okay with handshaking (above), but not here.
+
+my @streamed_script =
+ ( DL, # line -> line
+ 'kyriel',
+ DR, # line -> reference
+ 'coral',
+ DR, # reference -> reference
+ 'drforr',
+ DB, # reference -> block
+ 'fimmtiu',
+ DB, # block -> block
+ 'sungo',
+ DR, # block -> reference
+ 'dynweb',
+ DL, # reference -> line
+ 'sky',
+ DB, # line -> block
+ 'braderuna',
+ DL, # o/` and that brings us back to line o/`
+ 'fletch',
+
+ 'done',
+ );
+
+sub streamed_start {
+ my ($kernel, $heap) = @_[KERNEL, HEAP];
+
+ my ($read, $write) = POE::Pipe::OneWay->new();
+ die $! unless defined $read;
+
+ $heap->{stream} = POE::Wheel::ReadWrite->new
+ ( InputHandle => $read,
+ OutputHandle => $write,
+ Filter => POE::Filter::Line->new(),
+ Driver => POE::Driver::SysRW->new(),
+ InputEvent => 'got_input',
+ ErrorEvent => 'got_error',
+ );
+
+ # Start in line mode.
+ my $current_mode = $heap->{current_mode} = LINE;
+ $heap->{errors} = $heap->{current_step} = 0;
+
+ # Stream it all at once. Whee!
+ foreach my $step (@streamed_script) {
+
+ # Send whatever it is in the current mode.
+ $heap->{stream}->put( &wrap_payload( $current_mode, $step ) );
+
+ # Switch to the next mode if we should.
+ if ($step =~ /^do (\S+)/) {
+ $current_mode = $1;
+
+ if ($current_mode eq LINE) {
+ $heap->{stream}->set_output_filter( POE::Filter::Line->new() ),
+ }
+ elsif ($current_mode eq REFERENCE) {
+ $heap->{stream}->set_output_filter( POE::Filter::Reference->new() ),
+ }
+ elsif ($current_mode eq BLOCK) {
+ $heap->{stream}->set_output_filter( POE::Filter::Block->new() ),
+ }
+ else {
+ die;
+ }
+ }
+ }
+}
+
+sub streamed_input {
+ my ($kernel, $heap, $wrapped_input) = @_[KERNEL, HEAP, ARG0];
+
+ my $input = &unwrap_payload( $heap->{current_mode}, $wrapped_input );
+
+ &ok_if( 37 + $heap->{current_step},
+ ($input eq $streamed_script[$heap->{current_step}++])
+ );
+
+ if ($input =~ /^do (\S+)/) {
+ my $current_mode = $heap->{current_mode} = $1;
+
+ if ($current_mode eq LINE) {
+ $heap->{stream}->set_input_filter( POE::Filter::Line->new() ),
+ }
+ elsif ($current_mode eq REFERENCE) {
+ $heap->{stream}->set_input_filter( POE::Filter::Reference->new() ),
+ }
+ elsif ($current_mode eq BLOCK) {
+ $heap->{stream}->set_input_filter( POE::Filter::Block->new() ),
+ }
+ else {
+ die;
+ }
+
+ return;
+ }
+
+ delete $heap->{stream} if $input eq 'done';
+}
+
+
+### Handshaking tests.
# Start the slave/server session first.
POE::Session->create
@@ -407,8 +504,19 @@ POE::Session->create
}
);
-# Begin a client and a server session on either side of a socket. I
-# think this is an improvement over forking.
+### Streamed filter transition tests. These are all run together.
+### The object is to figure out how to unglom things.
+
+POE::Session->create
+ ( inline_states =>
+ { _start => \&streamed_start,
+ _stop => sub { }, # placeholder for stricture test
+ got_input => \&streamed_input,
+ }
+ );
+
+# Begin the handshaking and streaming tests. I think this is an
+# improvement over forking.
$poe_kernel->run();

0 comments on commit 26ffab2

Please sign in to comment.