Skip to content

Commit

Permalink
fix jobs incorrectly terminated and poll skipt policiy improves
Browse files Browse the repository at this point in the history
  • Loading branch information
nikopol committed Aug 13, 2012
1 parent 6299161 commit 58c5b71
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 33 deletions.
6 changes: 4 additions & 2 deletions Changes
@@ -1,9 +1,11 @@
revision history for joq

0.0.22 2012-08-10
0.0.22 2012-08-13
- name becomes unique
- fix table dump
- timeout params added to client
- improve polling skipt
- fix table dump
- fix for 'error starting job'

0.0.21 2012-08-09 *rocco*
- big queue (~30k jobs) optimization
Expand Down
8 changes: 3 additions & 5 deletions lib/joq.pm
Expand Up @@ -185,16 +185,14 @@ sub stopevents {
sub setpoll {
my $sec = shift || $cfg{polling};
delete $watch{poll} if $watch{poll};
my $lastpollend = 0;
$watch{poll} = AnyEvent->timer(
after => 0,
interval => $sec,
cb => sub {
if( (time - $lastpollend) > 0 ){
if( (time - $joq::queue::pollend) > 0 ){
joq::poll();
$lastpollend = time;
} else {
log::warn('poll skipt you should increase the polling delay');
log::core('poll skipt');
}
},
);
Expand All @@ -203,7 +201,7 @@ sub setpoll {

sub poll {
my( $queued, $running, $event ) = joq::queue::poll( $softstop );
log::core("poll queued=$queued running=$running event=$event");
#log::core("queue polled return queued=$queued running=$running event=$event");
$w->send('oneshot') if !$queued && $cfg{oneshot};
$w->send('soft stop') if !$running && $softstop;
}
Expand Down
53 changes: 29 additions & 24 deletions lib/joq/job.pm
Expand Up @@ -204,7 +204,7 @@ sub setup {

sub start {
my $job = shift;
return 0 if running( $job );
return 0 if running($job);

log::debug($job->{fullname}.' starting');

Expand All @@ -217,9 +217,13 @@ sub start {
pipe my $readerr, my $writerr;
my $pid = fork;

return 0 unless defined $pid;
if( !defined $pid ){

log::error "fork error : $@";
return 0;

} elsif( $pid ) {

if( $pid ) {
close $writout;
close $writerr;
$job->{pid} = $pid;
Expand Down Expand Up @@ -420,19 +424,20 @@ sub finished {
}

sub running {
my $job = shift;
# return 0 unless $job->{pid};
# my $r = waitpid( $job->{pid}, WNOHANG );
# my $c = $?;
# if( $r > 0 ) {
# #terminated
# log::debug($job->{fullname}.' pid terminated ('.$c.')');
# finished( $job, $c );
# } elsif( $r == -1 ) {
# #not exists ?!
# log::debug($job->{fullname}.' pid don\'t exists ('.$c.')');
# finished( $job );
# }
my( $job, $syscheck ) = @_;
return 0 unless $job->{pid};
return $job->{pid} unless $syscheck;
my $r = waitpid( $job->{pid}, WNOHANG );
my $c = $?;
if( $r > 0 ) {
#terminated
log::debug($job->{fullname}.' pid terminated ('.$c.')');
finished( $job, $c );
} elsif( $r == -1 ) {
#not exists ?!
log::debug($job->{fullname}.' pid don\'t exists');
finished( $job );
}
$job->{pid}
}

Expand All @@ -443,23 +448,23 @@ sub timeout {

sub stop {
my $job = shift;
return undef unless my $jid = running( $job );
unless( kill( 15, $jid ) ) {
log::debug($job->{fullname}.' did not receive term signal');
return undef unless my $jid = running( $job, 1 );
unless( kill( TERM => $jid ) ) {
log::core($job->{fullname}.' did not receive term signal');
return 0;
}
log::debug($job->{fullname}.' receive term signal');
log::core($job->{fullname}.' receive term signal');
1
}

sub kill {
my $job = shift;
return undef unless my $jid = running( $job );
unless( kill( 9, $jid ) ) {
log::debug($job->{fullname}.' did not receive kill signal');
return undef unless my $jid = running( $job, 1 );
unless( kill( KILL => $jid ) ) {
log::core($job->{fullname}.' did not receive kill signal');
return 0;
}
log::debug($job->{fullname}.' receive kill signal');
log::core($job->{fullname}.' receive kill signal');
1
}

Expand Down
8 changes: 6 additions & 2 deletions lib/joq/queue.pm
Expand Up @@ -18,6 +18,8 @@ my $timeoutcount = 0;
my $paused = 0;
my $alone = 0;

our $pollend = 0;

our %cfg = (
maxfork => 4, #max simultaneous running
maxhistory => 16, #max done jobs kept
Expand Down Expand Up @@ -133,10 +135,11 @@ sub stopjob {
joq::job::stop( $job );
log::info('stop '.$job->{fullname});
my $count = $cfg{termtimeout} * 10;
sleep 0.1 while( $count-- && joq::job::running( $job ) );
if( joq::job::running( $job ) ) {
sleep 0.1 while( $count-- && joq::job::running( $job, 1 ) );
if( joq::job::running( $job, 1 ) ) {
log::info('kill '.$job->{fullname}.' (still running after stop)');
joq::job::kill( $job );
joq::job::finished( $job );
} else {
log::info($job->{fullname}.' softly stopped');
}
Expand Down Expand Up @@ -299,6 +302,7 @@ sub poll {
) if @jobids;
}
$polling = 0;
$pollend = time;
( scalar @jobids, scalar @runs, $nbevent );
}

Expand Down

0 comments on commit 58c5b71

Please sign in to comment.