Skip to content

Commit

Permalink
Preliminary work on compressed Filter::Reference, checked in to trans…
Browse files Browse the repository at this point in the history
…fer to FreeBSD box for testing
  • Loading branch information
rcaputo committed Jan 24, 2000
1 parent 992740b commit 09b5df4
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 24 deletions.
9 changes: 9 additions & 0 deletions lib/POE/Filter/Line.pm
Expand Up @@ -47,6 +47,15 @@ sub get_pending
###############################################################################
1;

# <Abigail> All I did was change the put function to:
# <Abigail> # Turn newlines into "\x0D\x0A". Do *not* add a trailing newline.
# <Abigail> sub put {
# <Abigail> my ($self, $lines) = @_;
# <Abigail> # Make a copy.
# <Abigail> my @raw = map {my $s = $_; $s =~ s/\n/\x0D\x0A/g; $s} @$lines;
# <Abigail> \@raw;
# <Abigail> }

__END__
=head1 NAME
Expand Down
69 changes: 54 additions & 15 deletions lib/POE/Filter/Reference.pm
Expand Up @@ -23,18 +23,37 @@ sub _default_freezer {
die "Filter::Reference requires Storable or FreezeThaw";
}

#------------------------------------------------------------------------------
# Try to acquire Compress::Zlib.

my ($compress_possible, $zlib_error) = (0, '');
BEGIN {
eval {
use Compress::Zlib qw(compress uncompress);
};
if ($@) {
$zlib_error = $@;
eval 'sub compress { @_ }';
eval 'sub uncompress { @_ }';
}
else {
$compress_possible = 1;
}
}

#------------------------------------------------------------------------------

sub new {
my($type, $freezer) = @_;
$freezer||=_default_freezer();
my $type = shift;
my($type, $freezer, $compression) = @_;
$freezer ||= _default_freezer();
# not a reference... maybe a package?
unless(ref $freezer) {
unless(exists $::{$freezer.'::'}) {
eval {require "$freezer.pm"; import $freezer ();};
croak $@ if $@;
}
unless(ref $freezer) {
unless(exists $::{$freezer.'::'}) {
eval {require "$freezer.pm"; import $freezer ();};
croak $@ if $@;
}
}

# Now get the methodes we want
my $freeze=$freezer->can('nfreeze') || $freezer->can('freeze');
Expand All @@ -50,10 +69,18 @@ sub new {
$tf=sub {$freeze->($freezer, @_)};
$tt=sub {$thaw->($freezer, @_)};
}

# Compression
$compression ||= 0;
if ($compression and !$compression_possible) {
carp "Cannot compress; Compress::Zlib load failed with error: $zlib_err";
}

my $self = bless { buffer => '',
expecting => undef,
thaw => $tt,
freeze => $tf,
compress => $compression,
}, $type;
$self;
}
Expand All @@ -72,11 +99,13 @@ sub get {
)
) {
last if (length $self->{buffer} < $self->{expecting});
push( @return,
$self->{thaw}->(substr( $self->{buffer}, 0, $self->{expecting}))
);

my $chunk = substr($self->{buffer}, 0, $self->{expecting});
substr($self->{buffer}, 0, $self->{expecting}) = '';
undef $self->{expecting};

$chunk = uncompress($chunk) if $self->{compress};
push @return, $self->{thaw}->( $chunk );
}

return \@return;
Expand All @@ -90,7 +119,7 @@ sub put {

my @raw = map {
my $frozen = $self->{freeze}->($_);
length($frozen) . "\0" . $frozen;
$frozen = compress($frozen) if $self->{compress};
} @$references;
\@raw;
}
Expand All @@ -117,7 +146,7 @@ POE::Filter::Reference - POE Freeze/Thaw Protocol Abstraction
=head1 SYNOPSIS
$filter = new POE::Filter::Something();
$filter = new POE::Filter::Reference();
$arrayref_of_perl_references =
$filter->get($arrayref_of_raw_chunks_from_driver);
$arrayref_of_serialized_perl_references =
Expand All @@ -142,8 +171,9 @@ ship data between systems with different byte orders.
POE::Filter::Reference::new( ... )
The new() method creates and initializes the reference filter. It
accepts an optional parameter to specify a serializer. The serializer
may be a package or an object.
accepts optional parameters to specify a serializer and the use of
compression. The serializer may be a package or an object; the
compression flag is a Perl "boolean" value.
A package serializer must have a thaw() function, and it must have
either a freeze() or nfreeze() function. If it has both freeze() and
Expand All @@ -158,17 +188,26 @@ reference to the reconstituted data. The freeze() and nfreeze()
methods receive $self and a reference; they should return a scalar
with the reference's serialized representation.
If the serializer parameter is undef, a default one will be used.
This lets programs specify compression without having to worry about
naming a serializer.
For example:
# Use the default filter (either Storable or FreezeThaw).
my $filter1 = new POE::Filter::Reference();
my $filter = new POE::Filter::Reference();
# Use Storable explicitly, specified by package name.
my $filter = new POE::Filter::Reference('Storable');
# Use an object.
my $filter = new POE::Filter::Reference($object);
# Use an object, with compression.
my $filter = new POE::Filter::Reference($object, 1);
# Use the default serializer, with compression.
my $filter = new POE::Filter::Reference(undef, 1);
The new() method will try to require any packages it needs.
Expand Down
8 changes: 4 additions & 4 deletions lib/POE/Session.pm
Expand Up @@ -366,10 +366,10 @@ sub register_state {

#------------------------------------------------------------------------------

sub ID {
my $self = shift;
$POE::Kernel::poe_kernel->ID_session_to_id($self);
}
#sub ID {
# my $self = shift;
# $POE::Kernel::poe_kernel->ID_session_to_id($self);
#}

#------------------------------------------------------------------------------

Expand Down
6 changes: 3 additions & 3 deletions lib/POE/Wheel/ReadWrite.pm
Expand Up @@ -80,7 +80,7 @@ sub _define_write_state {

my $writes_pending = $driver->flush($handle);
if ($!) {
$event_error && $k->call($me, $event_error, 'write', ($!+0), $!);
$event_error && $k->call( $me, $event_error, 'write', ($!+0), $! );
$k->select_write($handle);
}
elsif (defined $writes_pending) {
Expand Down Expand Up @@ -120,11 +120,11 @@ sub _define_read_state {
my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
if (defined(my $raw_input = $driver->get($handle))) {
foreach my $cooked_input (@{$filter->get($raw_input)}) {
$k->call($me, $event_input, $cooked_input)
$k->call($me, $event_input, $cooked_input);
}
}
else {
$event_error && $k->call($me, $event_error, 'read', ($!+0), $!);
$event_error && $k->call( $me, $event_error, 'read', ($!+0), $! );
$k->select_read($handle);
}
}
Expand Down
2 changes: 1 addition & 1 deletion samples/refsender.perl
Expand Up @@ -75,7 +75,7 @@ sub client_connected {
$heap->{'wheel'} = new POE::Wheel::ReadWrite
( Handle => $socket, # read/write on this handle
Driver => new POE::Driver::SysRW, # using sysread and syswrite
Filter => new POE::Filter::Reference, # parsing data as frozen refs
Filter => new POE::Filter::Reference(undef,1), # parsing refs
InputState => 'received', # generating this on input
ErrorState => 'error', # generating this on error
FlushedState => 'flushed', # generating this on flush
Expand Down
2 changes: 1 addition & 1 deletion samples/refserver.perl
Expand Up @@ -95,7 +95,7 @@ sub daemon_start {
$heap->{wheel_client} = new POE::Wheel::ReadWrite
( Handle => $handle, # on this handle
Driver => new POE::Driver::SysRW, # using sysread and syswrite
Filter => new POE::Filter::Reference, # and parsing I/O as references
Filter => new POE::Filter::Reference(undef,1), # parsing as refs
InputState => 'client', # generate this event on input
ErrorState => 'error', # generate this event on error
);
Expand Down

0 comments on commit 09b5df4

Please sign in to comment.