Skip to content

Commit

Permalink
Merge pull request #24 from dakota/handle-signals-better
Browse files Browse the repository at this point in the history
Handle signals better
  • Loading branch information
Johan Meiring committed Oct 12, 2018
2 parents a5f637f + ee3699c commit 4a2e722
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/DelayedJob/JobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,14 @@ public function stopConsuming()
->stopConsuming();
}

/**
* @return bool
*/
public function isConsuming(): bool
{
return $this->consuming;
}

/**
* @param \DelayedJobs\DelayedJob\Job $job
* @return void
Expand Down
5 changes: 5 additions & 0 deletions src/DelayedJob/ManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ public function startConsuming();
*/
public function stopConsuming();

/**
* @return bool
*/
public function isConsuming(): bool;

/**
* @param \DelayedJobs\DelayedJob\Job $job
* @return void
Expand Down
34 changes: 32 additions & 2 deletions src/Shell/WorkerShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class WorkerShell extends AppShell implements EventListenerInterface
* @var float
*/
protected $_timeOfLastJob;
protected $_signalReceived = null;
protected $_busy = false;

/**
* @inheritDoc
Expand Down Expand Up @@ -130,11 +132,30 @@ public function startup()
parent::startup();
}

/**
* @return void
*/
protected function _processKillSignal($signal)
{
if (!$this->_busy) {
$this->stopHammerTime($signal);
}

$this->_signalReceived = $signal;
$this->_manager->stopConsuming();

$this->out(
'<success>' .
__('Received {signal}, will shutdown once current job is completed.', ['signal' => $signal]) .
'</success>'
);
}

protected function _enableListeners()
{
$this->ProcessManager->getEventManager()
->on('CLI.signal', function (Event $event) {
$this->stopHammerTime(ProcessManagerTask::$signals[$event->getData('signo')] ?? 'Unknown');
$this->_processKillSignal(ProcessManagerTask::$signals[$event->getData('signo')] ?? 'Unknown');
});
$this->ProcessManager->handleKillSignals();
}
Expand Down Expand Up @@ -165,7 +186,9 @@ public function stopHammerTime($reason, $exitCode = 0)
{
$this->out('Shutting down...');

$this->_manager->stopConsuming();
if ($this->_manager->isConsuming()) {
$this->_manager->stopConsuming();
}

if ($this->_worker) {
$this->_worker->status = WorkersTable::STATUS_DEAD;
Expand Down Expand Up @@ -274,6 +297,10 @@ public function heartbeat()

protected function _checkSuicideStatus()
{
if ($this->_signalReceived) {
$this->stopHammerTime($this->_signalReceived);
}

if ($this->_suicideMode['enabled'] !== true) {
return;
}
Expand All @@ -299,6 +326,8 @@ public function beforeExecute(Event $event, Job $job)
return false;
}

$this->_busy = true;

cli_set_process_title(sprintf('DJ Worker :: %s :: Working %s', $this->_workerId, $job->getId()));

$this->out(__('<info>Job: {0}</info>', $job->getId()));
Expand Down Expand Up @@ -386,6 +415,7 @@ public function afterCompleted(Event $event, ResultInterface $result)
$this->stopHammerTime(Worker::SHUTDOWN_ERROR, self::WORKER_ERROR_EXIT_CODE);
}

$this->_busy = false;
$this->_timeOfLastJob = microtime(true);
$this->_checkSuicideStatus();

Expand Down
8 changes: 8 additions & 0 deletions src/TestSuite/TestManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,12 @@ public function stopConsuming()
public function requeueJob(Job $job)
{
}

/**
* @return bool
*/
public function isConsuming(): bool
{
return false;
}
}

0 comments on commit 4a2e722

Please sign in to comment.