Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

[refactor] implement timeout as an optional behavior of `$pm->wait_al…

…l_children`
  • Loading branch information...
commit 87891958d5384a16633b030bd2db5cf48273ab61 1 parent aef0ac9
@kazuho authored
Showing with 25 additions and 35 deletions.
  1. +23 −33 lib/Parallel/Prefork.pm
  2. +2 −2 t/06-wait-all-children-with-timeout.t
View
56 lib/Parallel/Prefork.pm
@@ -10,8 +10,6 @@ use List::Util qw/first max min/;
use Proc::Wait3 ();
use Time::HiRes ();
-use constant DEFAULT_WAIT_INTERVAL => 0.01;
-
use Class::Accessor::Lite (
rw => [ qw/max_workers spawn_interval err_respawn_interval trap_signals signal_received manager_pid on_child_reap before_fork after_fork/ ],
);
@@ -191,41 +189,35 @@ sub _action_for {
}
sub wait_all_children {
- my $self = shift;
- $self->{_no_adjust_until} = undef;
- while (%{$self->{worker_pids}}) {
- if (my ($pid) = $self->_wait(1)) {
- if (delete $self->{worker_pids}{$pid}) {
- $self->_on_child_reap($pid, $?);
- }
- }
- }
-}
-
-sub wait_all_children_with_timeout {
- my ($self, $timeout, $interval) = @_;
- return $self->wait_all_children() if !defined $timeout || $timeout <= 0;
- $interval ||= DEFAULT_WAIT_INTERVAL;
-
+ my ($self, $timeout) = @_;
$self->{_no_adjust_until} = undef;
- my $start_at = [Time::HiRes::gettimeofday];
- while (Time::HiRes::tv_interval($start_at) < $timeout) {
- if (my ($pid) = $self->_wait(0)) {
+ my $call_wait = sub {
+ my $blocking = shift;
+ if (my ($pid) = $self->_wait($blocking)) {
if (delete $self->{worker_pids}{$pid}) {
$self->_on_child_reap($pid, $?);
}
}
- last unless $self->num_workers;
- }
- continue {
- Time::HiRes::sleep $interval;
+ };
+
+ if ($timeout) {
+ # the strategy is to use waitpid + sleep that gets interrupted by SIGCHLD
+ # but since there is a race condition bet. waitpid and sleep, the argument
+ # to sleep should be set to a small number (and we use 1 second).
+ my $start_at = [Time::HiRes::gettimeofday];
+ while ($self->num_workers != 0 && Time::HiRes::tv_interval($start_at) < $timeout) {
+ $call_wait->(0);
+ sleep 1;
+ }
+ } else {
+ while ($self->num_workers != 0) {
+ $call_wait->(1);
+ }
}
-
return $self->num_workers;
}
-
sub _update_spawn_delay {
my ($self, $secs) = @_;
$self->{_no_adjust_until} = $secs ? Time::HiRes::time() + $secs : 0;
@@ -352,14 +344,12 @@ Child processes (when executed by a zero-argument call to C<start>) should call
Sends signal to all worker processes. Only usable from manager process.
-=head2 wait_all_children
-
-Blocks until all worker processes exit. Only usable from manager process.
+=head2 wait_all_children()
-=head2 wait_all_children_with_timeout($timeout, $interval = 0.01)
+=head2 wait_all_children($timeout)
-Blocks until all worker processes exit or after C<$timeout> seconds. Only usable from manager process.
-It repeated wait at predetermined intervals internally. To change the interval, specify interval by second argument.
+Waits until all worker processes exit or timeout (given as an optional argument in seconds) exceeds.
+The method returns the number of the worker processes still running.
=head1 AUTHOR
View
4 t/06-wait-all-children-with-timeout.t
@@ -38,9 +38,9 @@ until ($pm->signal_received) {
$pm->finish;
}
-is $pm->wait_all_children_with_timeout(1), 2, 'should reap one worker.';
+is $pm->wait_all_children(1), 2, 'should reap one worker.';
$pm->signal_all_children('TERM');
-is $pm->wait_all_children_with_timeout(1), 1, 'should reap one worker.';
+is $pm->wait_all_children(1), 1, 'should reap one worker.';
$pm->signal_all_children('TERM');
$pm->wait_all_children();
is $pm->num_workers, 0, 'all workers reaped.';
Please sign in to comment.
Something went wrong with that request. Please try again.