From e7ab3710f04f33d47248c47d2403b5e1f7dc5b43 Mon Sep 17 00:00:00 2001 From: Walther Lalk Date: Fri, 12 Oct 2018 11:22:38 +0200 Subject: [PATCH 1/3] Add isConsuming method to job manager --- src/DelayedJob/JobManager.php | 8 ++++++++ src/DelayedJob/ManagerInterface.php | 5 +++++ src/TestSuite/TestManager.php | 8 ++++++++ 3 files changed, 21 insertions(+) diff --git a/src/DelayedJob/JobManager.php b/src/DelayedJob/JobManager.php index 668cd54..7a11381 100644 --- a/src/DelayedJob/JobManager.php +++ b/src/DelayedJob/JobManager.php @@ -679,6 +679,14 @@ public function stopConsuming() ->stopConsuming(); } + /** + * @return bool + */ + public function isConsuming(): bool + { + return $this->consuming; + } + /** * @param \DelayedJobs\DelayedJob\Job $job * @return void diff --git a/src/DelayedJob/ManagerInterface.php b/src/DelayedJob/ManagerInterface.php index 72ba3d1..e13eace 100644 --- a/src/DelayedJob/ManagerInterface.php +++ b/src/DelayedJob/ManagerInterface.php @@ -89,6 +89,11 @@ public function startConsuming(); */ public function stopConsuming(); + /** + * @return bool + */ + public function isConsuming(): bool; + /** * @param \DelayedJobs\DelayedJob\Job $job * @return void diff --git a/src/TestSuite/TestManager.php b/src/TestSuite/TestManager.php index 604fe15..01ed0ae 100644 --- a/src/TestSuite/TestManager.php +++ b/src/TestSuite/TestManager.php @@ -179,4 +179,12 @@ public function stopConsuming() public function requeueJob(Job $job) { } + + /** + * @return bool + */ + public function isConsuming(): bool + { + return false; + } } From 53b11817aa50c4fab700c5939e1e491406f5f9b2 Mon Sep 17 00:00:00 2001 From: Walther Lalk Date: Fri, 12 Oct 2018 11:23:34 +0200 Subject: [PATCH 2/3] Wait until current job is finished before stopping the worker --- src/Shell/WorkerShell.php | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/Shell/WorkerShell.php b/src/Shell/WorkerShell.php index 15c75e9..b7e6d87 100644 --- a/src/Shell/WorkerShell.php +++ b/src/Shell/WorkerShell.php @@ -87,6 +87,8 @@ class WorkerShell extends AppShell implements EventListenerInterface * @var float */ protected $_timeOfLastJob; + protected $_signalReceived = null; + protected $_busy = false; /** * @inheritDoc @@ -130,11 +132,26 @@ public function startup() parent::startup(); } + /** + * @return void + */ + protected function _processKillSignal($signal) + { + if (!$this->_busy) { + $this->stopHammerTime($signal); + } + + $this->_signalReceived = $signal; + $this->_manager->stopConsuming(); + + $this->out('' . __('Received {signal}, will shutdown once current job is completed.', ['signal' => $signal]) . ''); + } + protected function _enableListeners() { $this->ProcessManager->getEventManager() ->on('CLI.signal', function (Event $event) { - $this->stopHammerTime(ProcessManagerTask::$signals[$event->getData('signo')] ?? 'Unknown'); + $this->_processKillSignal(ProcessManagerTask::$signals[$event->getData('signo')] ?? 'Unknown'); }); $this->ProcessManager->handleKillSignals(); } @@ -165,7 +182,9 @@ public function stopHammerTime($reason, $exitCode = 0) { $this->out('Shutting down...'); - $this->_manager->stopConsuming(); + if ($this->_manager->isConsuming()) { + $this->_manager->stopConsuming(); + } if ($this->_worker) { $this->_worker->status = WorkersTable::STATUS_DEAD; @@ -274,6 +293,10 @@ public function heartbeat() protected function _checkSuicideStatus() { + if ($this->_signalReceived) { + $this->stopHammerTime($this->_signalReceived); + } + if ($this->_suicideMode['enabled'] !== true) { return; } @@ -299,6 +322,8 @@ public function beforeExecute(Event $event, Job $job) return false; } + $this->_busy = true; + cli_set_process_title(sprintf('DJ Worker :: %s :: Working %s', $this->_workerId, $job->getId())); $this->out(__('Job: {0}', $job->getId())); @@ -386,6 +411,7 @@ public function afterCompleted(Event $event, ResultInterface $result) $this->stopHammerTime(Worker::SHUTDOWN_ERROR, self::WORKER_ERROR_EXIT_CODE); } + $this->_busy = false; $this->_timeOfLastJob = microtime(true); $this->_checkSuicideStatus(); From ee3699c1751c1c6477a0a1f3c3119a3d33edec35 Mon Sep 17 00:00:00 2001 From: Walther Lalk Date: Fri, 12 Oct 2018 11:32:28 +0200 Subject: [PATCH 3/3] Shorter line --- src/Shell/WorkerShell.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Shell/WorkerShell.php b/src/Shell/WorkerShell.php index b7e6d87..dca2970 100644 --- a/src/Shell/WorkerShell.php +++ b/src/Shell/WorkerShell.php @@ -144,7 +144,11 @@ protected function _processKillSignal($signal) $this->_signalReceived = $signal; $this->_manager->stopConsuming(); - $this->out('' . __('Received {signal}, will shutdown once current job is completed.', ['signal' => $signal]) . ''); + $this->out( + '' . + __('Received {signal}, will shutdown once current job is completed.', ['signal' => $signal]) . + '' + ); } protected function _enableListeners()