Permalink
Browse files

Finish job before killing the processes

When the worker processes got a TERM signal the processes were
immediately killed. With this change they finish the current job before
terminating.
This only works when using Gearman::XS though.
  • Loading branch information...
1 parent 6ce2fb5 commit 8fad471743a1a2bce460d3e82488bf07267ab996 Othello Maurer committed Jan 13, 2012
@@ -25,11 +25,19 @@ sub add_function {
sub work {
my ($self) = @_;
+ # Make sure that the current job is processed until the end before shutting
+ # down.
+ my $must_stop;
+ local $SIG{TERM} = sub {
+ $must_stop = 1;
+ };
+
$self->worker->add_options(GEARMAN_WORKER_NON_BLOCKING);
- while (1) {
+ while (!$must_stop) {
my $ret = $self->worker->work;
- if ($ret == GEARMAN_IO_WAIT || $ret == GEARMAN_NO_JOBS ) {
+ if (!$must_stop && ($ret == GEARMAN_IO_WAIT || $ret == GEARMAN_NO_JOBS )) {
+ local $SIG{TERM} = 'DEFAULT';
$self->worker->wait;
}
elsif ($ret != GEARMAN_SUCCESS ) {
View
@@ -0,0 +1,29 @@
+use strict;
+use warnings;
+use Test::More tests => 1;
+use FindBin;
+use lib "$FindBin::Bin/lib";
+use Gearman::Driver::Test;
+use File::Slurp;
+use File::Temp qw(tempfile);
+
+
+eval { require Gearman::XS };
+my $no_gearman_xs = $@;
+
+SKIP: {
+ skip ('You need Gearman:XS for this test', 1)
+ if $no_gearman_xs;
+
+ $ENV{GEARMAN_DRIVER_ADAPTOR} = 'Gearman::Driver::Adaptor::XS';
+
+ my $test = Gearman::Driver::Test->new();
+ my $gc = $test->gearman_client;
+
+ $test->prepare('Gearman::Driver::Test::Live::Shutdown');
+
+ my ( $fh, $filename ) = tempfile( CLEANUP => 1 );
+ $gc->do_task( 'Gearman::Driver::Test::Live::Shutdown::job1' => $filename );
+ my $text = read_file($filename);
+ is( $text, "begin ...\nstarted job1 ...\ndone with job1 ...\nend ...\n", 'Worker completed task during shutdown' );
+};
@@ -12,7 +12,8 @@ use Danga::Socket;
use IO::Socket::INET;
BEGIN {
- $ENV{GEARMAN_DRIVER_ADAPTOR} = 'Gearman::Driver::Adaptor::PP';
+ $ENV{GEARMAN_DRIVER_ADAPTOR} = 'Gearman::Driver::Adaptor::PP'
+ unless $ENV{GEARMAN_DRIVER_ADAPTOR};
}
my ( $host, $port ) = ( '127.0.0.1', 4731 );
@@ -0,0 +1,39 @@
+package # hide from PAUSE
+ Gearman::Driver::Test::Live::Shutdown;
+
+use base qw(Gearman::Driver::Test::Base::All);
+use Moose;
+
+sub begin {
+ my ( $self, $job, $workload ) = @_;
+ open my $fh, ">>$workload" or die "cannot open file $workload: $!";
+ print $fh "begin ...\n";
+ close $fh;
+}
+
+sub job1 : Job : ProcessGroup(group1) {
+ my ( $self, $job, $workload ) = @_;
+
+ open my $fh, ">>$workload" or die "cannot open file $workload: $!";
+ print $fh "started job1 ...\n";
+
+ my $telnet_client = Net::Telnet->new(
+ Timeout => 30,
+ Host => '127.0.0.1',
+ Port => 47300,
+ );
+ $telnet_client->print('shutdown');
+ sleep 1;
+
+ print $fh "done with job1 ...\n";
+ close $fh;
+}
+
+sub end {
+ my ( $self, $job, $workload ) = @_;
+ open my $fh, ">>$workload" or die "cannot open file $workload: $!";
+ print $fh "end ...\n";
+ close $fh;
+}
+
+1;

0 comments on commit 8fad471

Please sign in to comment.