Skip to content
Browse files

Merge branch 'pr/10' (fixes #10)

  • Loading branch information...
2 parents 7e1fc01 + 8789195 commit 622144917b047cad2a8d0d0b28cc21309b3b5437 @kazuho committed
Showing with 74 additions and 5 deletions.
  1. +26 −5 lib/Parallel/Prefork.pm
  2. +48 −0 t/06-wait-all-children-with-timeout.t
View
31 lib/Parallel/Prefork.pm
@@ -189,15 +189,33 @@ sub _action_for {
}
sub wait_all_children {
- my $self = shift;
+ my ($self, $timeout) = @_;
$self->{_no_adjust_until} = undef;
- while (%{$self->{worker_pids}}) {
- if (my ($pid) = $self->_wait(1)) {
+
+ my $call_wait = sub {
+ my $blocking = shift;
+ if (my ($pid) = $self->_wait($blocking)) {
if (delete $self->{worker_pids}{$pid}) {
$self->_on_child_reap($pid, $?);
}
}
+ };
+
+ if ($timeout) {
+ # the strategy is to use waitpid + sleep that gets interrupted by SIGCHLD
+ # but since there is a race condition bet. waitpid and sleep, the argument
+ # to sleep should be set to a small number (and we use 1 second).
+ my $start_at = [Time::HiRes::gettimeofday];
+ while ($self->num_workers != 0 && Time::HiRes::tv_interval($start_at) < $timeout) {
+ $call_wait->(0);
+ sleep 1;
+ }
+ } else {
+ while ($self->num_workers != 0) {
+ $call_wait->(1);
+ }
}
+ return $self->num_workers;
}
sub _update_spawn_delay {
@@ -326,9 +344,12 @@ Child processes (when executed by a zero-argument call to C<start>) should call
Sends signal to all worker processes. Only usable from manager process.
-=head2 wait_all_children
+=head2 wait_all_children()
+
+=head2 wait_all_children($timeout)
-Blocks until all worker processes exit. Only usable from manager process.
+Waits until all worker processes exit or timeout (given as an optional argument in seconds) exceeds.
+The method returns the number of the worker processes still running.
=head1 AUTHOR
View
48 t/06-wait-all-children-with-timeout.t
@@ -0,0 +1,48 @@
+#! /usr/bin/perl
+
+use strict;
+use warnings;
+
+use Fcntl qw/:flock/;
+use Test::More tests => 4;
+
+use Parallel::Prefork;
+
+my $reaped = 0;
+my $pm = Parallel::Prefork->new({
+ max_workers => 3,
+ fork_delay => 0,
+ on_child_reap => sub {
+ $reaped++;
+ }
+});
+
+my $sig_retain_cnt = 1;
+$pm->after_fork(sub {
+ $sig_retain_cnt++;
+});
+
+my $manager_pid = $$;
+
+until ($pm->signal_received) {
+ $pm->start and next;
+
+ my $rcv = 0;
+ local $SIG{TERM} = sub { $rcv++ };
+
+ if ($sig_retain_cnt == $pm->max_workers) {
+ kill 'TERM', $manager_pid;
+ }
+
+ sleep(100) while $rcv < $sig_retain_cnt;
+
+ $pm->finish;
+}
+is $pm->wait_all_children(1), 2, 'should reap one worker.';
+$pm->signal_all_children('TERM');
+is $pm->wait_all_children(1), 1, 'should reap one worker.';
+$pm->signal_all_children('TERM');
+$pm->wait_all_children();
+is $pm->num_workers, 0, 'all workers reaped.';
+
+is($reaped, $pm->max_workers, "properly called on_child_reap callback");

0 comments on commit 6221449

Please sign in to comment.
Something went wrong with that request. Please try again.