diff --git a/src/DelayedJob/JobManager.php b/src/DelayedJob/JobManager.php index 668cd54..7a11381 100644 --- a/src/DelayedJob/JobManager.php +++ b/src/DelayedJob/JobManager.php @@ -679,6 +679,14 @@ public function stopConsuming() ->stopConsuming(); } + /** + * @return bool + */ + public function isConsuming(): bool + { + return $this->consuming; + } + /** * @param \DelayedJobs\DelayedJob\Job $job * @return void diff --git a/src/DelayedJob/ManagerInterface.php b/src/DelayedJob/ManagerInterface.php index 72ba3d1..e13eace 100644 --- a/src/DelayedJob/ManagerInterface.php +++ b/src/DelayedJob/ManagerInterface.php @@ -89,6 +89,11 @@ public function startConsuming(); */ public function stopConsuming(); + /** + * @return bool + */ + public function isConsuming(): bool; + /** * @param \DelayedJobs\DelayedJob\Job $job * @return void diff --git a/src/Shell/WorkerShell.php b/src/Shell/WorkerShell.php index 15c75e9..dca2970 100644 --- a/src/Shell/WorkerShell.php +++ b/src/Shell/WorkerShell.php @@ -87,6 +87,8 @@ class WorkerShell extends AppShell implements EventListenerInterface * @var float */ protected $_timeOfLastJob; + protected $_signalReceived = null; + protected $_busy = false; /** * @inheritDoc @@ -130,11 +132,30 @@ public function startup() parent::startup(); } + /** + * @return void + */ + protected function _processKillSignal($signal) + { + if (!$this->_busy) { + $this->stopHammerTime($signal); + } + + $this->_signalReceived = $signal; + $this->_manager->stopConsuming(); + + $this->out( + '' . + __('Received {signal}, will shutdown once current job is completed.', ['signal' => $signal]) . + '' + ); + } + protected function _enableListeners() { $this->ProcessManager->getEventManager() ->on('CLI.signal', function (Event $event) { - $this->stopHammerTime(ProcessManagerTask::$signals[$event->getData('signo')] ?? 'Unknown'); + $this->_processKillSignal(ProcessManagerTask::$signals[$event->getData('signo')] ?? 'Unknown'); }); $this->ProcessManager->handleKillSignals(); } @@ -165,7 +186,9 @@ public function stopHammerTime($reason, $exitCode = 0) { $this->out('Shutting down...'); - $this->_manager->stopConsuming(); + if ($this->_manager->isConsuming()) { + $this->_manager->stopConsuming(); + } if ($this->_worker) { $this->_worker->status = WorkersTable::STATUS_DEAD; @@ -274,6 +297,10 @@ public function heartbeat() protected function _checkSuicideStatus() { + if ($this->_signalReceived) { + $this->stopHammerTime($this->_signalReceived); + } + if ($this->_suicideMode['enabled'] !== true) { return; } @@ -299,6 +326,8 @@ public function beforeExecute(Event $event, Job $job) return false; } + $this->_busy = true; + cli_set_process_title(sprintf('DJ Worker :: %s :: Working %s', $this->_workerId, $job->getId())); $this->out(__('Job: {0}', $job->getId())); @@ -386,6 +415,7 @@ public function afterCompleted(Event $event, ResultInterface $result) $this->stopHammerTime(Worker::SHUTDOWN_ERROR, self::WORKER_ERROR_EXIT_CODE); } + $this->_busy = false; $this->_timeOfLastJob = microtime(true); $this->_checkSuicideStatus(); diff --git a/src/TestSuite/TestManager.php b/src/TestSuite/TestManager.php index 604fe15..01ed0ae 100644 --- a/src/TestSuite/TestManager.php +++ b/src/TestSuite/TestManager.php @@ -179,4 +179,12 @@ public function stopConsuming() public function requeueJob(Job $job) { } + + /** + * @return bool + */ + public function isConsuming(): bool + { + return false; + } }