Skip to content

Commit

Permalink
Resolve Viliam Pucik's rt.cpan.org ticket 72055. Correct edge-case
Browse files Browse the repository at this point in the history
behavior when making duplicate requests from a single session.
  • Loading branch information
rcaputo committed Jan 5, 2012
1 parent 951d3a5 commit ae5ac39
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
33 changes: 19 additions & 14 deletions lib/POE/Component/Client/Ping.pm
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ sub ICMP_FLAGS () { 0 }
sub ICMP_PORT () { 0 }

# "Static" variables which will be shared across multiple instances.
# Also multiple processes.

my $master_seq = 0;

Expand Down Expand Up @@ -130,7 +129,7 @@ sub spawn {
keep_socket => (defined $socket) || 0,
onereply => $onereply,
rcvbuf => $rcvbuf,
retry => $retry,
retry => $retry // 0,
socket_handle => $socket,
timeout => $timeout,

Expand Down Expand Up @@ -343,6 +342,7 @@ sub _do_ping {
$event, # PEND_EVENT
$address, # PEND_ADDR ???
$timeout, # PEND_TIMEOUT
$is_a_retry, # PEND_IS_RETRY
];

if ($tries_left and $tries_left > 1) {
Expand All @@ -355,12 +355,12 @@ sub _do_ping {
];
}

_send_packet($kernel, $heap, $is_a_retry);
_send_next_packet($kernel, $heap);
}


sub _send_packet {
my ($kernel, $heap, $is_a_retry) = @_;
sub _send_next_packet {
my ($kernel, $heap) = @_;
return unless (scalar @{$heap->{queue}});

if ($heap->{parallelism} && $heap->{outstanding} >= $heap->{parallelism}) {
Expand All @@ -386,6 +386,7 @@ sub _send_packet {
$event, # PEND_EVENT
$address, # PEND_ADDR ???
$timeout, # PEND_TIMEOUT
$is_a_retry, # PEND_IS_RETRY
) = @$ping_info;

# Send the packet. If send() fails, then we bail with an error.
Expand Down Expand Up @@ -443,10 +444,14 @@ sub _send_packet {

# Duplicate pings? Forcibly time out the previous one.
if (exists $heap->{addr_to_seq}->{$sender}->{$address}) {
DEBUG and warn "Duplicate ping. Canceling $address";

my $now = time();
my $old_seq = delete $heap->{addr_to_seq}->{$sender}->{$address};
my $old_info = delete $heap->{ping_by_seq}->{$old_seq};
$old_info->[PBS_POSTBACK]->( undef, undef, $now, undef );
my $ping_info = _end_ping_by_requester_and_address(
$kernel, $heap, $sender, $address
);

$ping_info->[PBS_POSTBACK]->( undef, undef, $now, undef );
}

$heap->{addr_to_seq}->{$sender}->{$address} = $seq;
Expand Down Expand Up @@ -544,12 +549,12 @@ sub _end_ping_by_requester_and_address {

# Discard the postback for the discarded sequence number.
DEBUG_PBS and warn "removing ping_by_seq($seq)";
delete $heap->{ping_by_seq}->{$seq};
my $ping_info = delete $heap->{ping_by_seq}->{$seq};
$kernel->delay($seq);

$heap->{outstanding}--;

return;
return $ping_info;
}


Expand Down Expand Up @@ -640,7 +645,7 @@ sub poco_ping_pong {
# It's a single-reply ping. Clean up after it.
if ($heap->{onereply}) {
_end_ping_by_sequence($kernel, $heap, $from_seq);
_send_packet($kernel, $heap, 0);
_send_next_packet($kernel, $heap);
_check_for_close($kernel, $heap);
}
}
Expand Down Expand Up @@ -676,7 +681,7 @@ sub poco_ping_default {
# Post a timer tick back to the session. This marks the end of
# the request/response transaction.
$ping_info->[PBS_POSTBACK]->( undef, undef, $now, undef );
_send_packet($kernel, $heap, 0);
_send_next_packet($kernel, $heap);
_check_for_close($kernel, $heap);

return;
Expand Down Expand Up @@ -1002,8 +1007,8 @@ This is the ttl for the echo response packet we received.
If the ":const" tagset is imported the following constants will be
exported:
REQ_ADDRESS, REQ_TIMEOUT, REQ_TIME
REQ_USER_ARGS, RES_ADDRESS, RES_ROUNDTRIP, RES_TIME, RES_TTL
REQ_ADDRESS, REQ_TIMEOUT, REQ_TIME REQ_USER_ARGS,
RES_ADDRESS, RES_ROUNDTRIP, RES_TIME, RES_TTL
=head1 SEE ALSO
Expand Down
26 changes: 19 additions & 7 deletions t/zz_rt_72055.t
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@
# * Al least 2 ping events are created in a row (initial pings).
#
# The number of lost pings: min( Parallelism, initial pings ) - 1.
#
# Correct behavior is for 1 response to be received, and the remaining
# N-1 requests to be forcibly timed out by subsequent duplicate
# requests.

use strict;
use warnings;

use Test::More tests => 1;
use Test::More tests => 2;

use POE qw( Component::Client::Ping );

POE::Component::Client::Ping->spawn(Parallelism => 3);
POE::Component::Client::Ping->spawn(Parallelism => 10, OneReply => 1);

POE::Session->create(
inline_states => {
_start => sub {
$_[HEAP]{got} = $_[HEAP]{expected} = 0;
$_[HEAP]{got_answer} = $_[HEAP]{got_timeout} = $_[HEAP]{expected} = 0;

# It's bad technique to send all the requets at once, but we're
# doing this to expose a bug in the module's queuing logic.
Expand All @@ -39,14 +43,22 @@ POE::Session->create(

_stop => sub {
is(
$_[HEAP]{got}, $_[HEAP]{expected},
"got the right number of responses"
$_[HEAP]{got_timeout}, $_[HEAP]{expected} - 1,
"got the right number of timeouts"
);
is(
$_[HEAP]{got_answer}, 1,
"got the right number of answers"
);
},

pong => sub {
++$_[HEAP]->{got} if defined $_[ARG1]->[0];
$_[KERNEL]->yield('ping');
if (defined $_[ARG1]->[0]) {
++$_[HEAP]->{got_answer};
}
else {
++$_[HEAP]->{got_timeout};
}
},
},
);
Expand Down

0 comments on commit ae5ac39

Please sign in to comment.