Permalink
Browse files

Chris Williams pointed out that arbitrary data wasn't being passed

back to users.  Events were being split apart from arbitrary data, and
then just the event was yielded back to the component for retries and
internal ping queueing.  This patch adds a modified test case, fixes
the bug, and bumps the version to 1.12 for release.
  • Loading branch information...
1 parent ae66fe8 commit f0db3c58b2c5caaaac26654429f412f4f7a213fe @rcaputo committed Jun 12, 2006
Showing with 164 additions and 116 deletions.
  1. +1 −0 MANIFEST
  2. +128 −116 Ping.pm
  3. +35 −0 t/02_arbitrary_data.t
View
1 MANIFEST
@@ -5,3 +5,4 @@ Makefile.PL
Ping.pm
README
t/01_ping.t
+t/02_arbitrary_data.t
View
244 Ping.pm
@@ -24,7 +24,7 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS);
);
use vars qw($VERSION $PKTSIZE);
-$VERSION = '1.11';
+$VERSION = '1.12';
$PKTSIZE = $^O eq 'linux' ? 3_000 : 100;
use Carp qw(croak);
@@ -49,10 +49,10 @@ sub spawn {
my %params = @_;
croak "$type requires root privilege" if (
- $> and ($^O ne "VMS") and
- ($^O ne "cygwin") and
- not defined $params{Socket}
- );
+ $> and ($^O ne "VMS") and
+ ($^O ne "cygwin") and
+ not defined $params{Socket}
+ );
my $alias = delete $params{Alias};
$alias = "pinger" unless defined $alias and length $alias;
@@ -68,22 +68,22 @@ sub spawn {
my $retry = delete $params{Retry};
croak(
- "$type doesn't know these parameters: ", join(', ', sort keys %params)
- ) if scalar keys %params;
+ "$type doesn't know these parameters: ", join(', ', sort keys %params)
+ ) if scalar keys %params;
POE::Session->create(
- inline_states => {
- _start => \&poco_ping_start,
- ping => \&poco_ping_ping,
- clear => \&poco_ping_clear,
- got_pong => \&poco_ping_pong,
- _default => \&poco_ping_default,
- },
- args => [
- $alias, $timeout, $retry, $socket, $onereply, $parallelism,
- $rcvbuf, $always_decode
- ],
- );
+ inline_states => {
+ _start => \&poco_ping_start,
+ ping => \&poco_ping_ping,
+ clear => \&poco_ping_clear,
+ got_pong => \&poco_ping_pong,
+ _default => \&poco_ping_default,
+ },
+ args => [
+ $alias, $timeout, $retry, $socket, $onereply, $parallelism,
+ $rcvbuf, $always_decode
+ ],
+ );
undef;
}
@@ -116,9 +116,10 @@ my $master_seq = 0;
sub poco_ping_start {
my (
- $kernel, $heap, $alias, $timeout, $retry, $socket, $onereply,
- $parallelism, $rcvbuf, $always_decode
- ) = @_[KERNEL, HEAP, ARG0..ARG5];
+ $kernel, $heap,
+ $alias, $timeout, $retry, $socket, $onereply, $parallelism,
+ $rcvbuf, $always_decode
+ ) = @_[KERNEL, HEAP, ARG0..ARG7];
$heap->{data} = 'Use POE!' x 7; # 56 data bytes :)
$heap->{data_size} = length($heap->{data});
@@ -141,7 +142,7 @@ sub poco_ping_start {
$kernel->select_read($heap->{socket_handle} = $socket, 'got_pong');
$heap->{keep_socket} = 1;
}
- else {
+ else {
$heap->{keep_socket} = 0;
}
@@ -162,42 +163,46 @@ sub ICMP_PORT () { 0 }
# Create a raw socket to send ICMP packets down.
# (optionally) mess with the size of the buffers on the socket.
sub create_handle {
- my ($kernel, $heap) = @_;
- DEBUG_SOCKET and warn "opening a raw socket for icmp";
-
- my $protocol = (getprotobyname('icmp'))[2]
- or die "can't get icmp protocol by name: $!";
-
- my $socket = gensym();
- socket($socket, PF_INET, SOCK_RAW, $protocol)
- or die "can't create icmp socket: $!";
-
- $heap->{socket_handle} = $socket;
-
- if ($heap->{rcvbuf}) {
- unless (
- setsockopt($socket, SOL_SOCKET, SO_RCVBUF, pack("I", $heap->{rcvbuf}))
- ) {
- warn("setsockopt rcvbuf size ($heap->{rcvbuf}) failed: $!");
- }
- }
- if ($heap->{parallelism} && $heap->{parallelism} == -1) {
- my $rcvbuf = getsockopt($socket, SOL_SOCKET, SO_RCVBUF);
- if ($rcvbuf) {
- my $size = unpack("I", $rcvbuf);
- $heap->{parallelism} = int($size / $PKTSIZE);
- }
- }
-
- $kernel->select_read($heap->{socket_handle} = $socket, 'got_pong');
+ my ($kernel, $heap) = @_;
+ DEBUG_SOCKET and warn "opening a raw socket for icmp";
+
+ my $protocol = (getprotobyname('icmp'))[2]
+ or die "can't get icmp protocol by name: $!";
+
+ my $socket = gensym();
+ socket($socket, PF_INET, SOCK_RAW, $protocol)
+ or die "can't create icmp socket: $!";
+
+ $heap->{socket_handle} = $socket;
+
+ if ($heap->{rcvbuf}) {
+ unless (
+ setsockopt($socket, SOL_SOCKET, SO_RCVBUF, pack("I", $heap->{rcvbuf}))
+ ) {
+ warn("setsockopt rcvbuf size ($heap->{rcvbuf}) failed: $!");
+ }
+ }
+ if ($heap->{parallelism} && $heap->{parallelism} == -1) {
+ my $rcvbuf = getsockopt($socket, SOL_SOCKET, SO_RCVBUF);
+ if ($rcvbuf) {
+ my $size = unpack("I", $rcvbuf);
+ $heap->{parallelism} = int($size / $PKTSIZE);
+ }
+ }
+
+ $kernel->select_read($heap->{socket_handle} = $socket, 'got_pong');
}
# Request a ping. This code borrows heavily from Net::Ping.
sub poco_ping_ping {
my (
- $kernel, $heap, $sender, $event, $address, $timeout, $retry, $optpostback
- ) = @_[KERNEL, HEAP, SENDER, ARG0, ARG1, ARG2, ARG3, ARG4];
+ $kernel, $heap, $sender,
+ $event, $address, $timeout, $retry, $optpostback
+ ) = @_[
+ KERNEL, HEAP, SENDER,
+ ARG0, ARG1, ARG2, ARG3, ARG4
+ ];
# When doing retries, the pinger session will request the ping and
# therefore the sender info is bogus. So, for retries we stash all the
@@ -253,10 +258,13 @@ sub poco_ping_ping {
);
# Record information about the ping request.
- my @user_args = ();
+
+ my ($event_name, @user_args);
if (ref($event) eq "ARRAY") {
- @user_args = @{ $event };
- $event = shift @user_args;
+ ($event_name, @user_args) = @$event;
+ }
+ else {
+ $event_name = $event;
}
# Build an address to send the ping at.
@@ -269,7 +277,7 @@ sub poco_ping_ping {
# the postback behavior.
unless (defined $usable_address) {
$kernel->post(
- $sender, $event,
+ $sender, $event_name,
[ $address, # REQ_ADDRESS
$timeout, # REQ_TIMEOUT
time(), # REQ_TIME
@@ -288,22 +296,23 @@ sub poco_ping_ping {
push(@{$heap->{queue}}, $master_seq);
$heap->{pending}->{$master_seq} = [
- $msg,
- $socket_address,
- $sender,
- $event,
- $address,
- $timeout,
- $optpostback
- ];
+ $msg, # PEND_MSG
+ $socket_address, # PEND_ADDR
+ $sender, # PEND_SENDER
+ $event, # PEND_EVENT
+ $address, # PEND_ADDR ???
+ $timeout, # PEND_TIMEOUT
+ $optpostback, # PEND_OPTPOSTBACK
+ ];
+
if ($retry && $retry > 1) {
$heap->{retrydata}->{$master_seq} = [
- $sender,
- $event,
- $address,
- $timeout,
- $retry
- ];
+ $sender, # RD_SENDER
+ $event, # RD_EVENT
+ $address, # RD_ADDRESS
+ $timeout, # RD_TIMEOUT
+ $retry, # RD_RETRY
+ ];
}
_send_packet($kernel, $heap);
@@ -317,28 +326,31 @@ sub _send_packet {
# We want to throttle back since we're still waiting for pings
# so, let's just leave this till later
DEBUG and warn(
- "throttled since there are $heap->{outstanding} pings outstanding. " .
- "queue size=" . (scalar @{$heap->{queue}}) . "\n"
- );
+ "throttled since there are $heap->{outstanding} pings outstanding. " .
+ "queue size=" . (scalar @{$heap->{queue}}) . "\n"
+ );
return;
}
my $seq = shift(@{$heap->{queue}});
- # May have been cleared by caller
+ # May have been cleared by caller
return unless (exists $heap->{pending}->{$seq});
my $ping_info = delete $heap->{pending}->{$seq};
my (
- $msg, $socket_address, $sender, $event, $address, $timeout, $optpostback
- ) = @$ping_info;
+ $msg, # PEND_MSG
+ $socket_address, # PEND_ADDR
+ $sender, # PEND_SENDER
+ $event, # PEND_EVENT
+ $address, # PEND_ADDR ???
+ $timeout, # PEND_TIMEOUT
+ $optpostback, # PEND_OPTPOSTBACK
+ ) = @$ping_info;
# Send the packet. If send() fails, then we bail with an error.
my @user_args = ();
- if (ref($event) eq "ARRAY") {
- @user_args = @{ $event };
- $event = shift @user_args;
- }
+ ($event, @user_args) = @$event if ref($event) eq "ARRAY";
DEBUG and warn "sending packet sequence number $seq\n";
unless (send($heap->{socket_handle}, $msg, ICMP_FLAGS, $socket_address)) {
@@ -370,20 +382,20 @@ sub _send_packet {
if ($optpostback) {
$heap->{ping_by_seq}->{$seq} = $optpostback;
}
- else {
+ else {
$heap->{ping_by_seq}->{$seq} = [
- # PBS_POSTBACK
- $sender->postback(
- $event,
- $address, # REQ_ADDRESS
- $timeout, # REQ_TIMEOUT
- time(), # REQ_TIME
- @user_args, # REQ_USER_ARGS
- ),
- "$sender", # PBS_SESSION (stringified to weaken reference)
- $address, # PBS_ADDRESS
- time() # PBS_REQUEST_TIME
- ];
+ # PBS_POSTBACK
+ $sender->postback(
+ $event,
+ $address, # REQ_ADDRESS
+ $timeout, # REQ_TIMEOUT
+ time(), # REQ_TIME
+ @user_args, # REQ_USER_ARGS
+ ),
+ "$sender", # PBS_SESSION (stringified to weaken reference)
+ $address, # PBS_ADDRESS
+ time() # PBS_REQUEST_TIME
+ ];
}
# Duplicate pings? Forcibly time out the previous one.
@@ -410,18 +422,18 @@ sub poco_ping_clear {
if (defined $address) {
# Don't bother if we don't have it.
- if (!exists $heap->{addr_to_seq}->{$sender}->{$address}) {
- delete $heap->{pending}->{$sender}->{$address};
- return;
- }
+ if (!exists $heap->{addr_to_seq}->{$sender}->{$address}) {
+ delete $heap->{pending}->{$sender}->{$address};
+ return;
+ }
# Stop mapping the sender+address pair to that sequence number.
my $seq = delete $heap->{addr_to_seq}->{$sender}->{$address};
# Stop tracking the sender if that was the last address.
delete $heap->{addr_to_seq}->{$sender} unless (
- scalar(keys %{$heap->{addr_to_seq}->{$sender}})
- );
+ scalar(keys %{$heap->{addr_to_seq}->{$sender}})
+ );
# Discard the postback for the discarded sequence number.
DEBUG_PBS and warn "removing ping_by_seq($seq)";
@@ -470,15 +482,15 @@ sub _end_ping {
# Stop mapping the session+address to this sequence number.
delete(
- $heap->{addr_to_seq}->{
- $ping_info->[PBS_SESSION]
- }->{$ping_info->[PBS_ADDRESS]}
- );
+ $heap->{addr_to_seq}->{
+ $ping_info->[PBS_SESSION]
+ }->{$ping_info->[PBS_ADDRESS]}
+ );
# Stop tracking the session if that was the last address.
delete $heap->{addr_to_seq}->{$ping_info->[PBS_SESSION]} unless (
- scalar(keys %{$heap->{addr_to_seq}->{$ping_info->[PBS_SESSION]}})
- );
+ scalar(keys %{$heap->{addr_to_seq}->{$ping_info->[PBS_SESSION]}})
+ );
$heap->{outstanding}--;
@@ -548,9 +560,9 @@ sub poco_ping_pong {
# It's a single-reply ping. Clean up after it.
if ($heap->{onereply}) {
- _end_ping($kernel, $heap, $from_seq);
- _send_packet($kernel, $heap);
- _check_for_close($kernel, $heap);
+ _end_ping($kernel, $heap, $from_seq);
+ _send_packet($kernel, $heap);
+ _check_for_close($kernel, $heap);
}
}
@@ -572,8 +584,8 @@ sub poco_ping_default {
DEBUG and warn("retrying ping for $address\n");
my $pinginfo = _end_ping($kernel, $heap, $seq);
$kernel->yield(
- "ping", $event, $address, $timeout, $remaining-1, $pinginfo
- );
+ "ping", $event, $address, $timeout, $remaining-1, $pinginfo
+ );
return 1;
}
@@ -587,12 +599,12 @@ sub poco_ping_default {
return 1;
}
- warn "this shouldn't technically be displayed ($seq)" if (
- DEBUG and $seq =~ /^\d+$/
- );
+ warn "this shouldn't technically be displayed ($seq)" if (
+ DEBUG and $seq =~ /^\d+$/
+ );
- # Let unhandled signals pass through so we do not block SIGINT, etc.
- return 0;
+ # Let unhandled signals pass through so we do not block SIGINT, etc.
+ return 0;
}
1;
View
35 t/02_arbitrary_data.t
@@ -0,0 +1,35 @@
+#!/usr/bin/perl
+# $Id$
+# vim: filetype=perl
+
+use strict;
+use warnings;
+
+use POE qw(Component::Client::Ping);
+use Test::More tests => 1;
+
+$|=1;
+
+POE::Component::Client::Ping->spawn( Alias => 'pinger', OneReply => 1 );
+
+POE::Session->create(
+ package_states => [
+ 'main' => [ qw(_start pong) ],
+ ],
+ options => { trace => 0 },
+);
+
+POE::Kernel->run();
+exit 0;
+
+sub _start {
+ my ($kernel,$heap) = @_[KERNEL,HEAP];
+ $kernel->post( 'pinger', 'ping', [ 'pong', 'foo' ], "poe.perl.org" );
+}
+
+sub pong {
+ my ($heap, $request, $response) = @_[HEAP, ARG0, ARG1];
+ $request->[3] = "(undef)" unless defined $request->[3];
+ ok($request->[3] eq "foo", "got arbitrary data: $request->[3]");
+}
+

0 comments on commit f0db3c5

Please sign in to comment.