diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index 64ddf2f..fef30da 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -2,7 +2,6 @@ namespace Utopia\Queue\Adapter; -use Swoole\Constant; use Swoole\Process\Pool; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; @@ -11,9 +10,6 @@ class Swoole extends Adapter { protected Pool $pool; - /** @var callable */ - private $onStop; - public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') { parent::__construct($workerNum, $queue, $namespace); @@ -31,16 +27,13 @@ public function start(): self public function stop(): self { - if ($this->onStop) { - call_user_func($this->onStop); - } $this->pool->shutdown(); return $this; } public function workerStart(callable $callback): self { - $this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) { + $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { call_user_func($callback, $workerId); }); @@ -49,8 +42,7 @@ public function workerStart(callable $callback): self public function workerStop(callable $callback): self { - $this->onStop = $callback; - $this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) { + $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { call_user_func($callback, $workerId); }); diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 1811302..65a1624 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -131,7 +131,6 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { - $this->channel?->stopConsume(); $this->channel?->getConnection()?->close(); } diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 8c74fa1..a465676 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -217,19 +217,45 @@ public function start(): self call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } - $this->adapter->consumer->consume( - $this->adapter->queue, - function (Message $message) { - $receivedAtTimestamp = microtime(true); - Console::info("[Job] Received Job ({$message->getPid()})."); - try { - $waitDuration = microtime(true) - $message->getTimestamp(); - $this->jobWaitTime->record($waitDuration); - - $this->resources = []; - self::setResource('message', fn () => $message); + while (true) { + $this->adapter->consumer->consume( + $this->adapter->queue, + function (Message $message) { + $receivedAtTimestamp = microtime(true); + Console::info("[Job] Received Job ({$message->getPid()})."); + try { + $waitDuration = microtime(true) - $message->getTimestamp(); + $this->jobWaitTime->record($waitDuration); + + $this->resources = []; + self::setResource('message', fn () => $message); + if ($this->job->getHook()) { + foreach ($this->initHooks as $hook) { // Global init hooks + if (in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } + } + } + + foreach ($this->job->getGroups() as $group) { + foreach ($this->initHooks as $hook) { // Group init hooks + if (in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments($hook, $message->getPayload()); + \call_user_func_array($hook->getAction(), $arguments); + } + } + } + + return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); + } finally { + $processDuration = microtime(true) - $receivedAtTimestamp; + $this->processDuration->record($processDuration); + } + }, + function (Message $message) { if ($this->job->getHook()) { - foreach ($this->initHooks as $hook) { // Global init hooks + foreach ($this->shutdownHooks as $hook) { // Global init hooks if (in_array('*', $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); @@ -238,55 +264,29 @@ function (Message $message) { } foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks + foreach ($this->shutdownHooks as $hook) { // Group init hooks if (in_array($group, $hook->getGroups())) { $arguments = $this->getArguments($hook, $message->getPayload()); \call_user_func_array($hook->getAction(), $arguments); } } } + Console::success("[Job] ({$message->getPid()}) successfully run."); + }, + function (?Message $message, Throwable $th) { + Console::error("[Job] ({$message?->getPid()}) failed to run."); + Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); - return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); - } finally { - $processDuration = microtime(true) - $receivedAtTimestamp; - $this->processDuration->record($processDuration); - } - }, - function (Message $message) { - if ($this->job->getHook()) { - foreach ($this->shutdownHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } - } - } + self::setResource('error', fn () => $th); - foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } + foreach ($this->errorHooks as $hook) { + ($hook->getAction())(...$this->getArguments($hook)); } - } - Console::success("[Job] ({$message->getPid()}) successfully run."); - }, - function (?Message $message, Throwable $th) { - Console::error("[Job] ({$message?->getPid()}) failed to run."); - Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); - - self::setResource('error', fn () => $th); - - foreach ($this->errorHooks as $hook) { - ($hook->getAction())(...$this->getArguments($hook)); - } - }, - ); + }, + ); + } }); - $this->adapter->workerStop(fn () => $this->adapter->consumer->close()); - $this->adapter->start(); } catch (Throwable $error) { self::setResource('error', fn () => $error); @@ -318,6 +318,31 @@ public function getWorkerStart(): Hook return $this->workerStartHook; } + /** + * Is called when a Worker stops. + * @param callable|null $callback + * @return self + * @throws Exception + */ + public function workerStop(?callable $callback = null): self + { + try { + $this->adapter->workerStop(function (string $workerId) use ($callback) { + Console::success("[Worker] Worker {$workerId} is ready!"); + if (!is_null($callback)) { + call_user_func($callback); + } + }); + } catch (Throwable $error) { + self::setResource('error', fn () => $error); + foreach ($this->errorHooks as $hook) { + call_user_func_array($hook->getAction(), $this->getArguments($hook)); + } + } + + return $this; + } + /** * Get Arguments *