Skip to content

Commit

Permalink
Merge pull request #65 from m0rth1um/merge
Browse files Browse the repository at this point in the history
Gentle queue abort with pcntl_signal handler
  • Loading branch information
josegonzalez committed Jul 11, 2017
2 parents 9116a52 + 554030f commit 6fdde6f
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 8 deletions.
8 changes: 4 additions & 4 deletions src/josegonzalez/Queuesadilla/Worker/Base.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,25 @@ public function shutdownHandler($signo = null)
{
$this->logger->info("Shutting down");

$signals = array(
$signals = [
SIGQUIT => "SIGQUIT",
SIGTERM => "SIGTERM",
SIGINT => "SIGINT",
SIGINT => "SIGINT",
SIGUSR1 => "SIGUSR1",
);
];

if ($signo !== null) {
$signal = $signals[$signo];
$this->logger->info(sprintf("Received received %s... Shutting down", $signal));
}

$this->disconnect();

$this->logger->info(sprintf(
"Worker shutting down after running %d iterations in %ds",
$this->iterations,
$this->runtime
));

return true;
}

Expand Down
71 changes: 70 additions & 1 deletion src/josegonzalez/Queuesadilla/Worker/SequentialWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,28 @@

namespace josegonzalez\Queuesadilla\Worker;

declare(ticks=1);

use Exception;
use josegonzalez\Queuesadilla\Worker\Base;
use josegonzalez\Queuesadilla\Engine\EngineInterface;
use Psr\Log\LoggerInterface;

class SequentialWorker extends Base
{
protected $running;

public function __construct(EngineInterface $engine, LoggerInterface $logger = null, $params = [])
{
parent::__construct($engine, $logger, $params);
pcntl_signal(SIGQUIT, [&$this, 'signalHandler']);
pcntl_signal(SIGTERM, [&$this, 'signalHandler']);
pcntl_signal(SIGINT, [&$this, 'signalHandler']);
pcntl_signal(SIGUSR1, [&$this, 'signalHandler']);

$this->running = true;
}

/**
* {@inheritDoc}
* @SuppressWarnings(PHPMD.CyclomaticComplexity)
Expand All @@ -16,12 +33,13 @@ public function work()
if (!$this->connect()) {
$this->logger()->alert(sprintf('Worker unable to connect, exiting'));
$this->dispatchEvent('Worker.job.connectionFailed');

return false;
}

$jobClass = $this->engine->getJobClass();
$time = microtime(true);
while (true) {
while ($this->running) {
if (is_int($this->maxRuntime) && $this->runtime >= $this->maxRuntime) {
$this->logger()->debug('Max runtime reached, exiting');
$this->dispatchEvent('Worker.maxRuntime');
Expand Down Expand Up @@ -84,6 +102,7 @@ public function connect()
{
$maxIterations = $this->maxIterations ? sprintf(', max iterations %s', $this->maxIterations) : '';
$this->logger()->info(sprintf('Starting worker%s', $maxIterations));

return (bool)$this->engine->connection();
}

Expand Down Expand Up @@ -113,4 +132,54 @@ public function perform($item, $job)
protected function disconnect()
{
}

public function signalHandler($signo = null)
{
$signals = [
SIGQUIT => "SIGQUIT",
SIGTERM => "SIGTERM",
SIGINT => "SIGINT",
SIGUSR1 => "SIGUSR1",
];

if ($signo !== null) {
$signal = $signals[$signo];
$this->logger->info(sprintf("Received %s... Shutting down", $signal));
}

switch ($signo) {
case SIGQUIT:
$this->logger()->debug('SIG: Caught SIGQUIT');
$this->running = false;
break;
case SIGTERM:
$this->logger()->debug('SIG: Caught SIGTERM');
$this->running = false;
break;
case SIGINT:
$this->logger()->debug('SIG: Caught CTRL+C');
$this->running = false;
break;
case SIGUSR1:
$this->logger()->debug('SIG: Caught SIGUSR1');
$this->running = false;
break;
default:
$this->logger()->debug('SIG:received other signal');
break;
}
return true;
}

public function shutdownHandler($signo = null)
{
$this->disconnect();

$this->logger->info(sprintf(
"Worker shutting down after running %d iterations in %ds",
$this->iterations,
$this->runtime
));
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public function testCleanup()

$this->Engine->push($this->Fixtures->default['first'], [
'queue' => 'default',
'expires_in' => 1
'expires_in' => 2
]);
$pop1 = $this->Engine->pop();
$this->assertEquals($pop1['id'], 1);
Expand Down
3 changes: 1 addition & 2 deletions tests/josegonzalez/Queuesadilla/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

namespace josegonzalez\Queuesadilla;

use PHPUnit_Framework_TestCase;
use ReflectionClass;

class TestCase extends PHPUnit_Framework_TestCase
class TestCase extends \PHPUnit\Framework\TestCase
{
protected function protectedMethodCall(&$object, $methodName, array $parameters = [])
{
Expand Down
24 changes: 24 additions & 0 deletions tests/josegonzalez/Queuesadilla/Worker/SequentialWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,28 @@ public function testDisconnect()
{
$this->assertNull($this->protectedMethodCall($this->Worker, 'disconnect'));
}

/**
* @covers josegonzalez\Queuesadilla\Worker\SequentialWorker::signalHandler
*/
public function testSignalHandler()
{
$this->assertEquals(true, $this->Worker->signalHandler());
$this->assertEquals(true, $this->Worker->signalHandler(SIGQUIT));
$this->assertEquals(true, $this->Worker->signalHandler(SIGTERM));
$this->assertEquals(true, $this->Worker->signalHandler(SIGINT));
$this->assertEquals(true, $this->Worker->signalHandler(SIGUSR1));
}

/**
* @covers josegonzalez\Queuesadilla\Worker\SequentialWorker::shutdownHandler
*/
public function testShutdownHandler()
{
$this->assertEquals(true, $this->Worker->shutdownHandler());
$this->assertEquals(true, $this->Worker->shutdownHandler(SIGQUIT));
$this->assertEquals(true, $this->Worker->shutdownHandler(SIGTERM));
$this->assertEquals(true, $this->Worker->shutdownHandler(SIGINT));
$this->assertEquals(true, $this->Worker->shutdownHandler(SIGUSR1));
}
}

0 comments on commit 6fdde6f

Please sign in to comment.