Permalink
Browse files

finally add Dieter Pearcey's stackable filters

  • Loading branch information...
1 parent 3919643 commit 296aa91b622a52a166454414ecce80dd699a48c3 @rcaputo committed Apr 8, 2001
Showing with 892 additions and 5 deletions.
  1. +10 −0 Changes
  2. +5 −0 MANIFEST
  3. +1 −1 README
  4. +39 −3 lib/POE.pm
  5. +117 −0 lib/POE/Filter/Grep.pm
  6. +1 −1 lib/POE/Filter/Line.pm
  7. +117 −0 lib/POE/Filter/Map.pm
  8. +178 −0 lib/POE/Filter/RecordBlock.pm
  9. +198 −0 lib/POE/Filter/Stackable.pm
  10. +162 −0 samples/tcp_watermarks.perl
  11. +64 −0 tests/24_filter_stack.t
View
10 Changes
@@ -95,6 +95,16 @@ update their programs.
0.1207 2001.??.??
-----------------
+Add Dieter Pearcey's Stackable filter, allowing ReadWrite (and other
+wheels) to pass data through two or more filters. Stackable comes
+with other helpful filters: Map and Grep to transform data while it's
+still in a filter stack, and RecordBlock to group several records into
+a block of them.
+
+Add t/24_filter_stack.t as a start for testing stackable filters.
+This so far is the only sample code for using these filters, and it's
+not very friendly in that regard.
+
Add ASSERT_USAGE. Setting this POE::Kernel debugging constant causes
it to check the values passed to its parameters and croak if they're
bad.
View
5 MANIFEST
@@ -10,9 +10,13 @@ POE/Driver.pm
POE/Driver/SysRW.pm
POE/Filter.pm
POE/Filter/Block.pm
+POE/Filter/Grep.pm
POE/Filter/HTTPD.pm
POE/Filter/Line.pm
+POE/Filter/Map.pm
+POE/Filter/RecordBlock.pm
POE/Filter/Reference.pm
+POE/Filter/Stackable.pm
POE/Filter/Stream.pm
POE/Kernel.pm
POE/Kernel/Event.pm
@@ -91,3 +95,4 @@ t/20_accept.t
t/21_gtk.t
t/22_wheel_run.t
t/23_nfa.t
+t/24_filter_stack.t
View
2 README
@@ -140,7 +140,7 @@ Event : 0.81
IO::Pty : 0.01
All tests successful.
-Files=24, Tests=455, 82 wallclock secs ( 8.34 cusr + 0.71 csys = 9.05 CPU)
+Files=24, Tests=459, 87 wallclock secs ( 8.88 cusr + 0.68 csys = 9.55 CPU)
** POE 0.1206 on Linux (titanic)
View
42 lib/POE.pm
@@ -618,6 +618,13 @@ The POE::Filter manpage covers filters in general and their common
interface. It discusses the pitfalls involved in switching filters
on a running wheel.
+=item POE::Filter::Grep
+
+Grep is part of the family of filters that includes Stackable, Map,
+and RecordBlock. It applies a regexp filter on data passing through
+it, before it reaches a Session. It's mainly used in filter stacks
+(see POE::Filter::Stackable).
+
=item POE::Filter::HTTPD
The HTTPD filter's manpage covers using POE as a web server.
@@ -628,12 +635,41 @@ The Line filter's manpage discusses how to read and write data by
lines; how to change the newline literal or regular expression; and
how to enable newline autodetection when working with strange peers.
+=item POE::Filter::Map
+
+Map is part of the family of filters that includes Stackable, Grep,
+and RecordBlock. It transforms data passing through it, before it
+reaches a Session.
+
+The Map filter is designed primarily to act as an interface between
+filters that deal with different data formats, but it can be used
+stand-alone to perform unique functions that no other filter does. In
+this case it's something of a wildcard filter.
+
+If you find yourself reusing the same custom Map filter, you may want
+to turn it into a full-fledged filter.
+
+=item POE::Filter::RecordBlock
+
+RecordBlock combines records into groups by count and flattens groups
+of records back into a record stream. For example, RecordBlock might
+combine log records into pairs.
+
=item POE::Filter::Reference
The Reference filter's manpage talks about marshalling data and
passing it between POE programs; and customizing the way data is
frozen, thawed and optionally compressed.
+=item POE::Filter::Stackable
+
+Stackable is a meta-filter designed to stack other filters. Stackable
+manages the filters it contains and passes data between them. In
+essence, the inner filters are combined into one super filter.
+
+The Map filter can also be used to perform quick and dirty functions
+that aren't implemented in any single existing filter.
+
=item POE::Filter::Stream
The Stream filter's manpage is pretty empty since it doesn't do much
@@ -779,8 +815,8 @@ often fixes them. The man's scarily good.
Dieter Pearcey is <dieter@bullfrog.perlhacker.org>. He goes by
several Japanese nicknames. Dieter patched Wheel::FollowTail to be
-more useful and has contributed the basic Filter::Block and its
-documentation.
+more useful. His Filter contributions include the basic Block filter,
+as well as Stackable, RecordBlock, Grep and Map.
=item Robert Seifer
@@ -817,7 +853,7 @@ Please contact the author if you've been forgotten.
Rocco Caputo is <troc+poe@netrus.net>. POE is his brainchild.
-Except where otherwise noted, POE is Copyright 1998-2000 Rocco Caputo.
+Except where otherwise noted, POE is Copyright 1998-2001 Rocco Caputo.
All rights reserved. POE is free software; you may redistribute it
and/or modify it under the same terms as Perl itself.
View
117 lib/POE/Filter/Grep.pm
@@ -0,0 +1,117 @@
+# 2001/01/25 shizukesa@pobox.com
+
+package POE::Filter::Grep;
+
+use strict;
+use Carp qw(croak);
+
+sub CODEBOTH () { 0 }
+sub CODEGET () { 1 }
+sub CODEPUT () { 2 }
+
+#------------------------------------------------------------------------------
+
+sub new {
+ my $type = shift;
+ croak "$type must be given an even number of parameters" if @_ & 1;
+ my %params = @_;
+
+ croak "$type requires a Code or both Get and Put parameters" unless
+ (defined($params{Code}) ||
+ (defined($params{Get}) && defined($params{Put})));
+
+ my $self = bless [ @params{qw(Code Get Put)} ], $type;
+}
+
+#------------------------------------------------------------------------------
+
+sub get {
+ my ($self, $data) = @_;
+ [ grep &{$self->[CODEGET] || $self->[CODEBOTH]}, @$data ];
+}
+
+#------------------------------------------------------------------------------
+
+sub put {
+ my ($self, $data) = @_;
+ [ grep &{$self->[CODEPUT] || $self->[CODEBOTH]}, @$data ];
+}
+
+#------------------------------------------------------------------------------
+
+sub get_pending {} # we don't track state
+
+#------------------------------------------------------------------------------
+
+sub modify {
+ my ($self, %params) = @_;
+ for (keys %params) {
+ next unless ($_ eq 'Put') || ($_ eq 'Get') || ($_ eq 'Code');
+ $self->[{Put => CODEPUT,
+ Get => CODEGET,
+ Code => CODEBOTH}->{$_}] = $params{$_};
+ }
+}
+
+###############################################################################
+
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Filter::Grep - POE Data Grepping Filter
+
+=head1 SYNOPSIS
+
+ $filter = POE::Filter::Grep->new(Code => sub {...});
+ $filter = POE::Filter::Grep->new(Put => sub {...}, Get => sub {...});
+ $arrayref_of_transformed_data = $filter->get($arrayref_of_raw_data);
+ $arrayref_of_streamable_data = $filter->put($arrayref_of_data);
+ $arrayref_of_streamable_data = $filter->put($single_datum);
+ $filter->modify(Code => sub {...});
+ $filter->modify(Put => sub {...}, Get => sub {...});
+
+=head1 DESCRIPTION
+
+The Grep filter takes the coderef or coderefs it is given using the
+Code, Get, or Put parameters and applies them to all data passing
+through get(), put(), or both, as appropriate. It it very similar to
+the C<grep> builtin function.
+
+=head1 PUBLIC FILTER METHODS
+
+=over 4
+
+=item *
+
+POE::Filter::Grep::modify
+
+Takes a list of parameters like the new() method, which should
+correspond to the new get(), put(), or general coderef that you wish
+to use.
+
+=item *
+
+See POE::Filter.
+
+=head1 SEE ALSO
+
+POE::Filter; POE::Filter::Grep; POE::Filter::Line;
+POE::Filter::Stackable; POE::Filter::Reference; POE::Filter::Stream;
+POE::Filter::RecordBlock; POE::Filter::HTTPD
+
+=head1 BUGS
+
+None known.
+
+=head1 AUTHORS & COPYRIGHTS
+
+The Grep filter was contributed by Dieter Pearcey. Rocco Caputo is
+sure to have had his hands in it.
+
+Please see the POE manpage for more information about authors and
+contributors.
+
+=cut
View
2 lib/POE/Filter/Line.pm
@@ -194,7 +194,7 @@ LINE:
# newlines are supposed to be sent. Second, add a trailing newline if
# one doesn't already exist. Since the referenced output list is
# supposed to contain one line per element, we also do a split and
-# join. Bleah.
+# join. Bleah. ... why isn't the code doing what the comment says?
sub put {
my ($self, $lines) = @_;
View
117 lib/POE/Filter/Map.pm
@@ -0,0 +1,117 @@
+# 2001/01/25 shizukesa@pobox.com
+
+package POE::Filter::Map;
+
+use strict;
+use Carp qw(croak);
+
+sub CODEBOTH () { 0 }
+sub CODEGET () { 1 }
+sub CODEPUT () { 2 }
+
+#------------------------------------------------------------------------------
+
+sub new {
+ my $type = shift;
+ croak "$type must be given an even number of parameters" if @_ & 1;
+ my %params = @_;
+
+ croak "$type requires a Code or both Get and Put parameters" unless
+ (defined($params{Code}) ||
+ (defined($params{Get}) && defined($params{Put})));
+
+ my $self = bless [ @params{qw(Code Get Put)} ], $type;
+}
+
+#------------------------------------------------------------------------------
+
+sub get {
+ my ($self, $data) = @_;
+ [ map &{$self->[CODEGET] || $self->[CODEBOTH]}, @$data ];
+}
+
+#------------------------------------------------------------------------------
+
+sub put {
+ my ($self, $data) = @_;
+ [ map &{$self->[CODEPUT] || $self->[CODEBOTH]}, @$data ];
+}
+
+#------------------------------------------------------------------------------
+
+sub get_pending {} # we don't track state
+
+#------------------------------------------------------------------------------
+
+sub modify {
+ my ($self, %params) = @_;
+ for (keys %params) {
+ next unless ($_ eq 'Put') || ($_ eq 'Get') || ($_ eq 'Code');
+ $self->[{Put => CODEPUT,
+ Get => CODEGET,
+ Code => CODEBOTH}->{$_}] = $params{$_};
+ }
+}
+
+###############################################################################
+
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Filter::Map - POE Data Mapping Filter
+
+=head1 SYNOPSIS
+
+ $filter = POE::Filter::Map->new(Code => sub {...});
+ $filter = POE::Filter::Map->new(Put => sub {...}, Get => sub {...});
+ $arrayref_of_transformed_data = $filter->get($arrayref_of_raw_data);
+ $arrayref_of_streamable_data = $filter->put($arrayref_of_data);
+ $arrayref_of_streamable_data = $filter->put($single_datum);
+ $filter->modify(Code => sub {...});
+ $filter->modify(Put => sub {...}, Get => sub {...});
+
+=head1 DESCRIPTION
+
+The Map filter takes the coderef or coderefs it is given using the
+Code, Get, or Put parameters and applies them to all data passing
+through get(), put(), or both, as appropriate. It it very similar to
+the C<map> builtin function.
+
+=head1 PUBLIC FILTER METHODS
+
+=over 4
+
+=item *
+
+POE::Filter::Map::modify
+
+Takes a list of parameters like the new() method, which should
+correspond to the new get(), put(), or general coderef that you wish
+to use.
+
+=item *
+
+See POE::Filter.
+
+=head1 SEE ALSO
+
+POE::Filter; POE::Filter::Grep; POE::Filter::Line;
+POE::Filter::Stackable; POE::Filter::Reference; POE::Filter::Stream;
+POE::Filter::RecordBlock; POE::Filter::HTTPD
+
+=head1 BUGS
+
+None known.
+
+=head1 AUTHORS & COPYRIGHTS
+
+The Map filter was contributed by Dieter Pearcey. Rocco Caputo is
+sure to have had his hands in it.
+
+Please see the POE manpage for more information about authors and
+contributors.
+
+=cut
View
178 lib/POE/Filter/RecordBlock.pm
@@ -0,0 +1,178 @@
+# 2001/01/25 shizukesa@pobox.com
+
+package POE::Filter::RecordBlock;
+
+use Carp qw(croak);
+use strict;
+
+sub BLOCKSIZE () { 0 };
+sub GETBUFFER () { 1 };
+sub PUTBUFFER () { 2 };
+sub CHECKPUT () { 3 };
+
+#------------------------------------------------------------------------------
+
+sub new {
+ my $type = shift;
+
+ croak "$type must be given an even number of parameters" if @_ & 1;
+ my %params = @_;
+
+ croak "BlockSize must be greater than 0" if
+ !defined($params{BlockSize}) || ($params{BlockSize} < 1);
+
+ my $self = bless [$params{BlockSize}, [], [], $params{CheckPut}], $type;
+}
+
+#------------------------------------------------------------------------------
+
+sub get {
+ my ($self, $data) = @_;
+ my @result;
+ push @{$self->[GETBUFFER]}, @$data;
+ while (@{$self->[GETBUFFER]} >= $self->[BLOCKSIZE]) {
+ push @result, [ splice @{$self->[GETBUFFER]}, 0, $self->[BLOCKSIZE] ];
+ }
+ \@result;
+}
+
+#------------------------------------------------------------------------------
+
+sub put {
+ my ($self, $data) = @_;
+ my @result;
+
+ if ($self->[CHECKPUT]) {
+ foreach (@$data) {
+ push @{$self->[PUTBUFFER]}, @$_;
+ }
+ while (@{$self->[PUTBUFFER]} >= $self->[BLOCKSIZE]) {
+ push @result, splice @{$self->[GETBUFFER]}, 0, $self->[BLOCKSIZE];
+ }
+ }
+ else {
+ push @result, splice(@{$self->[PUTBUFFER]}, 0);
+ foreach (@$data) {
+ push @result, @$_;
+ }
+ }
+ \@result;
+}
+
+#------------------------------------------------------------------------------
+
+sub get_pending {
+ my ($self) = @_;
+ [ splice @{$self->[GETBUFFER]}, 0, scalar @{$self->[GETBUFFER]} ];
+}
+
+#------------------------------------------------------------------------------
+
+sub put_pending {
+ my ($self) = @_;
+ return undef unless $self->[CHECKPUT];
+ [ splice @{$self->[PUTBUFFER]}, 0, scalar @{$self->[PUTBUFFER]} ];
+}
+
+#------------------------------------------------------------------------------
+
+sub blocksize {
+ my ($self, $size) = @_;
+ if (defined($size) && ($size > 0)) {
+ $self->[BLOCKSIZE] = $size;
+ }
+ $self->[BLOCKSIZE];
+}
+
+#------------------------------------------------------------------------------
+
+sub checkput {
+ my ($self, $val) = @_;
+ if (defined($val)) {
+ $self->[CHECKPUT] = $val;
+ }
+ $self->[CHECKPUT];
+}
+
+###############################################################################
+
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Filter::RecordBlock - POE Record Block Abstraction
+
+=head1 SYNOPSIS
+
+ $filter = new POE::Filter::RecordBlock( BlockSize => 4 );
+ $arrayref_of_arrayrefs = $filter->get($arrayref_of_raw_data);
+ $arrayref_of_raw_chunks = $filter->put($arrayref_of_arrayrefs);
+ $arrayref_of_raw_chunks = $filter->put($single_arrayref);
+ $arrayref_of_leftovers = $filter->get_pending;
+ $arrayref_of_leftovers = $filter->put_pending;
+
+=head1 DESCRIPTION
+
+RecordBlock translates between streams of B<records> and blocks of
+B<records>. In other words, it combines a number of received records
+into frames (array references), and it breaks frames back into streams
+of records in preparation for transmitting.
+
+A BlockSize parameter must be specified when the filter is
+constructed. It determines how many records are framed into a block,
+and it can be changed at runtime. Checking put() for proper block
+sizes is optional and can be either passed as a parameter to the new()
+method or changed at runtime.
+
+Extra records are held until enough records arrive to complete a
+block.
+
+=head1 PUBLIC FILTER METHODS
+
+=over 4
+
+=item *
+
+POE::Filter::RecordBlock::new
+
+The new() method takes at least one mandatory argument, the BlockSize
+parameter. It must be defined and greater than zero. The CheckPut
+parameter is optional, but if it contains a true value, "put"
+blocksize checking is turned on. Note that if this is the case,
+flushing pending records to be put is your responsibility (see
+put_pending()).
+
+=item *
+
+POE::Filter::RecordBlock::put_pending
+
+The put_pending() method returns an arrayref of any records that are
+waiting to be sent.
+
+=item *
+
+See POE::Filter.
+
+=back
+
+=head1 SEE ALSO
+
+POE::Filter; POE::Filter::Stackable; POE::Filter::HTTPD;
+POE::Filter::Reference; POE::Filter::Line; POE::Filter::Block;
+POE::Filter::Stream
+
+=head1 BUGS
+
+Undoubtedly.
+
+=head1 AUTHORS & COPYRIGHTS
+
+The RecordBlock filter was contributed by Dieter Pearcey. Rocco
+Caputo is sure to have had his hands in it.
+
+Please see the POE manpage for more information about authors and
+contributors.
+
+=cut
View
198 lib/POE/Filter/Stackable.pm
@@ -0,0 +1,198 @@
+# 2001/01/25 shizukesa@pobox.com
+
+package POE::Filter::Stackable;
+
+use strict;
+use Carp qw(croak);
+
+sub FILTERS () { 0 }
+
+#------------------------------------------------------------------------------
+
+sub new {
+ my $type = shift;
+ croak "$type must be given an even number of parameters" if @_ & 1;
+ my %params = @_;
+
+ my $self = bless [], $type;
+
+ $self->[FILTERS] = $params{Filters};
+
+ $self;
+}
+
+#------------------------------------------------------------------------------
+
+sub get {
+ my ($self, $data) = @_;
+ foreach my $filter (@{$self->[FILTERS]}) {
+ $data = $filter->get($data);
+ last unless @$data;
+ }
+ $data;
+}
+
+#------------------------------------------------------------------------------
+
+sub put {
+ my ($self, $data) = @_;
+ foreach my $filter (reverse @{$self->[FILTERS]}) {
+ $data = $filter->put($data);
+ last unless @$data;
+ }
+ $data;
+}
+
+#------------------------------------------------------------------------------
+
+sub get_pending {
+ my ($self) = @_;
+ my $data;
+ for (@{$self->[FILTERS]}) {
+ $_->put($data) if $data && @{$data};
+ $data = $_->get_pending;
+ }
+ $data || [];
+}
+
+#------------------------------------------------------------------------------
+
+sub filter_types {
+ map { ((ref $_) =~ /::(\w+)$/)[0] } @{$_[0]->[FILTERS]};
+}
+
+#------------------------------------------------------------------------------
+
+sub filters {
+ @{$_[0]->[FILTERS]};
+}
+
+#------------------------------------------------------------------------------
+
+sub shift {
+ my ($self) = @_;
+ my $filter = shift @{$self->[FILTERS]};
+ $self->[FILTERS]->[0]->put($filter->get_pending || []);
+ $filter;
+}
+
+#------------------------------------------------------------------------------
+
+sub unshift {
+ my ($self, @filters) = @_;
+ unshift(@{$self->[FILTERS]}, @filters);
+}
+
+#------------------------------------------------------------------------------
+
+sub push {
+ my ($self, @filters) = @_;
+ push(@{$self->[FILTERS]}, @filters);
+}
+
+#------------------------------------------------------------------------------
+
+sub pop {
+ my ($self) = @_;
+ my $filter = pop @{$self->[FILTERS]};
+ $self->[FILTERS]->[-1]->put($filter->get_pending || []);
+ $filter;
+}
+
+###############################################################################
+
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Filter::Stackable - POE Multiple Filter Abstraction
+
+=head1 SYNOPSIS
+
+ $filter = new POE::Filter::Stackable(Filters => [ $filter1, $filter2 ]);
+ $filter = new POE::Filter::Stackable;
+ $filter->push($filter1, $filter2);
+ $filter2 = $filter->pop;
+ $filter1 = $filter->shift;
+ $filter->unshift($filter1, $filter2);
+ $arrayref_for_driver = $filter->put($arrayref_of_data);
+ $arrayref_for_driver = $filter->put($single_data_element);
+ $arrayref_of_data = $filter->get($arrayref_of_raw_data);
+ $arrayref_of_leftovers = $filter->get_pending;
+ @filter_type_names = $filter->filter_types;
+ @filter_objects = $filter->filters;
+
+=head1 DESCRIPTION
+
+The Stackable filter allows the use of multiple filters within a
+single wheel. Internally, filters are stored in an array, with array
+index 0 being "near" to the wheel's handle and therefore being the
+first filter passed through using "get" and the last filter passed
+through in "put". All POE::Filter public methods are implemented as
+though data were being passed through a single filter; other program
+components do not need to know there are multiple filters.
+
+=head1 PUBLIC FILTER METHODS
+
+=over 4
+
+=item *
+
+POE::Filter::Stackable::new( ... )
+
+The new() method creates the Stackable filter. It accepts an optional
+parameter "Filters" that specifies an arrayref of initial filters. If
+no filters are given, Stackable will pass data through unchanged; this
+is true if there are no filters present at any time.
+
+=item *
+
+POE::Filter::Stackable::pop()
+POE::Filter::Stackable::shift()
+POE::Filter::Stackable::push($filter1, $filter2, ...)
+POE::Filter::Stackable::unshift($filter1, $filter2...)
+
+These methods all function identically to the perl builtin functions
+of the same name. push() and unshift() will return the new number of
+filters inside the Stackable filter.
+
+=item *
+
+POE::Filter::Stackable::filter_types
+
+The filter_types() method returns a list of types for the filters
+inside the Stackable filter, in order from near to far; for example,
+qw(Block HTTPD).
+
+=item *
+
+POE::Filter::Stackable::filters
+
+The filters() method returns a list of the objects inside the
+Stackable filter, in order from near to far.
+
+=item *
+
+See POE::Filter.
+
+=head1 SEE ALSO
+
+POE::Filter; POE::Filter::HTTPD; POE::Filter::Reference;
+POE::Filter::Line; POE::Filter::Block; POE::Filter::Stream
+
+=head1 BUGS
+
+Undoubtedly. None currently known.
+
+=head1 AUTHORS & COPYRIGHTS
+
+The Stackable filter was contributed by Dieter Pearcey. Rocco Caputo
+is sure to have had his hands in it.
+
+Please see the POE manpage for more information about authors and
+contributors.
+
+=cut
+
View
162 samples/tcp_watermarks.perl
@@ -0,0 +1,162 @@
+#!/usr/bin/perl -w
+# $Id$
+
+# This program tests the high and low watermarks. It merges the
+# wheels from wheels.perl and the chargen service from selects.perl to
+# create a wheel-based chargen service. It differs from
+# watermarks.perl in that it uses a TCP server component.
+
+use strict;
+use lib '..';
+use POE qw(Component::Server::TCP Wheel::ReadWrite Driver::SysRW Filter::Line);
+
+my $chargen_port = 32019;
+
+#==============================================================================
+# This is a simple chargen service.
+
+package Chargen::Connection;
+use POE::Session;
+
+# Create a new chargen session around a successfully accepted socket.
+sub new {
+ my ($package, $socket) = @_;
+ POE::Session->create
+ ( inline_states =>
+ { _start => \&poe_start,
+ wheel_got_flush => \&poe_got_flush,
+ wheel_got_input => \&poe_got_input,
+ wheel_got_error => \&poe_got_error,
+ wheel_throttle => \&poe_throttle,
+ wheel_resume => \&poe_resume,
+ write_chunk => \&poe_write_chunk,
+ },
+ args => [ $socket ],
+ );
+ undef;
+}
+
+# The session was set up within POE::Kernel, so it's safe to begin
+# working. Wrap a ReadWrite wheel around the socket, set up some
+# persistent variables, and begin writing chunks.
+sub poe_start {
+ $_[HEAP]->{wheel} = POE::Wheel::ReadWrite->new
+ ( Handle => $_[ARG0],
+ Driver => POE::Driver::SysRW->new(),
+ Filter => POE::Filter::Line->new(),
+
+ InputState => 'wheel_got_input',
+ ErrorState => 'wheel_got_error',
+
+ HighMark => 256,
+ LowMark => 128,
+ HighState => 'wheel_throttle',
+ LowState => 'wheel_resume',
+ );
+
+ $_[HEAP]->{okay_to_send} = 1;
+ $_[HEAP]->{start_character} = 32;
+
+ $_[KERNEL]->yield('write_chunk');
+}
+
+# The client sent us input. Rather than leaving it on the socket,
+# we've read it to ignore it.
+sub poe_got_input {
+ warn "Chargen session ", $_[SESSION]->ID, " is ignoring some input.\n";
+}
+
+# An error occurred. Log it and stop this session. If the parent
+# hasn't stopped, then it will continue running.
+sub poe_got_error {
+ warn( "Chargen session ", $_[SESSION]->ID, " encountered ", $_[ARG0],
+ " error $_[ARG1]: $_[ARG2]\n"
+ );
+ $_[HEAP]->{okay_to_send} = 0;
+ delete $_[HEAP]->{wheel};
+}
+
+# Write a chunk of data to the client socket.
+sub poe_write_chunk {
+
+ # Sometimes a write-chunk event comes in that ought not. This race
+ # occurs because water-mark events are called synchronously, while
+ # write-chunk events are posted asynchronously. So it may not be
+ # okay to write a chunk when we get a write-chunk event.
+
+ if ($_[HEAP]->{okay_to_send}) {
+
+ # Enqueue chunks until ReadWrite->put() signals that its driver's
+ # buffer has reached (or exceeded) its high-water mark.
+
+ while (1) {
+
+ # Create a chargen line. Build a 72-column line of consecutive
+ # characters, starting with whatever character code we have
+ # stored. Wrap characters beyond "~" around to " ".
+ my $chargen_line =
+ join( '',
+ map { chr }
+ ($_[HEAP]->{start_character} .. ($_[HEAP]->{start_character}+71))
+ );
+ $chargen_line =~ tr[\x7F-\xDD][\x20-\x7E];
+
+ # Increment the start character, wrapping \x7F to \x20.
+ $_[HEAP]->{start_character} = 32
+ if (++$_[HEAP]->{start_character} > 126);
+
+ # Enqueue the line for output. Stop enqueuing lines if the
+ # buffer's high water mark is reached.
+ last if $_[HEAP]->{wheel}->put($chargen_line);
+ }
+
+ # Go around again!
+ $_[KERNEL]->yield('write_chunk');
+ }
+
+}
+
+# Be impressive. Log that the session has throttled, and set a flag
+# so spurious write-chunk events are ignored.
+
+sub poe_throttle {
+ warn "Chargen session ", $_[SESSION]->ID, " is throttled.\n";
+ $_[HEAP]->{okay_to_send} = 0;
+}
+
+# Be impressive, part two. Log that the session has resumed sending,
+# and clear the stop-writing flag. Only bother doing this if there's
+# still a handle; that way it doesn't keep looping around after an
+# error or something.
+
+sub poe_resume {
+ if (exists $_[HEAP]->{wheel}) {
+ warn "Chargen session ", $_[SESSION]->ID, " is resuming.\n";
+ $_[HEAP]->{okay_to_send} = 1;
+ $_[KERNEL]->yield('write_chunk');
+ }
+}
+
+#==============================================================================
+# Main loop. Create the server, and run it until something stops it.
+
+package main;
+
+print( "*** If all goes well, a watermarked (self-throttling) chargen\n",
+ "*** service will be listening on localhost port 32019. You can\n",
+ "*** watch it perform flow control by connecting to it over a slow\n",
+ "*** connection or with a client you can pause. The server will\n",
+ "*** throttle itself when its output buffer becomes too large, and\n",
+ "*** it will resume output when the client receives enough data.\n",
+ );
+
+POE::Component::Server::TCP->new
+ ( Port => $chargen_port,
+ Acceptor => sub {
+ Chargen::Connection->new($_[ARG0]);
+ },
+ );
+
+$poe_kernel->run();
+
+exit;
View
64 tests/24_filter_stack.t
@@ -0,0 +1,64 @@
+#!/usr/bin/perl -w
+# $Id$
+
+# Exercises Filter::Stack (and friends) without the rest of POE.
+
+use strict;
+use lib qw(./lib ../lib);
+
+use POE::Filter::Stackable;
+use POE::Filter::Grep;
+use POE::Filter::Map;
+use POE::Filter::RecordBlock;
+use POE::Filter::Line;
+
+use TestSetup;
+&test_setup(8);
+
+# Create a filter stack to test.
+
+my $filter_stack = POE::Filter::Stackable->new
+ ( Filters =>
+ [ POE::Filter::Line->new( Literal => "!" ),
+
+ # The next Map filter translates Put data from RecordBlock
+ # (arrayrefs) into scalars for Line. On the Get side, it just
+ # wraps parens around whatever Line returns.
+
+ POE::Filter::Map->new ( Put => sub { @$_ }, # scalarify puts
+ Get => sub { "((($_)))" }, # transform gets
+ ),
+ POE::Filter::Grep->new( Put => sub { 1 }, # always put
+ Get => sub { /1/ }, # only get /1/
+ ),
+
+ # RecordBlock puts arrayrefs. They pass through Grep->Put
+ # without change. RecordBlock receives whatever-- lines in this
+ # case, but only ones that match /1/ from Grep->Get.
+
+ POE::Filter::RecordBlock->new( BlockSize => 2 ),
+ ]
+ );
+
+&ok_if( 1, defined $filter_stack );
+
+my $block = $filter_stack->get( [ "test one (1)!test two (2)!" ] );
+&ok_unless( 2, @$block );
+
+$block = $filter_stack->get( [ "test three (3)!test four (100)!" ] );
+&ok_if( 3, @$block == 1 );
+&ok_if( 4, $block->[0]->[0] eq '(((test one (1))))' );
+&ok_if( 5, $block->[0]->[1] eq '(((test four (100))))' );
+
+# Make a copy of the block. Bad things happen when both blocks have
+# the same reference because we're passing by reference a lot.
+
+my $stream = $filter_stack->put( [ $block, $block ] );
+&ok_if( 6, @$stream == 4 );
+
+&ok_if( 7, $stream->[0] eq $stream->[2] );
+&ok_if( 8, $stream->[1] eq $stream->[3] );
+
+&results;
+
+exit 0;

0 comments on commit 296aa91

Please sign in to comment.