diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index ffb4734..0000000 --- a/.travis.yml +++ /dev/null @@ -1,17 +0,0 @@ -language: php -php: 8.0 -services: - - docker - -notifications: - email: - - team@appwrite.io - -before_script: - - composer install --ignore-platform-reqs - -script: - - docker-compose up -d - - docker-compose exec tests vendor/bin/pint --test - - docker-compose exec tests vendor/bin/phpstan analyse - - docker-compose exec tests vendor/bin/phpunit \ No newline at end of file diff --git a/README.md b/README.md index f87c4b8..5bacd62 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ # Utopia Queue -[![Build Status](https://travis-ci.com/utopia-php/queue.svg?branch=main)](https://travis-ci.com/utopia-php/queue) ![Total Downloads](https://img.shields.io/packagist/dt/utopia-php/queue.svg) [![Discord](https://img.shields.io/discord/564160730845151244?label=discord)](https://appwrite.io/discord) diff --git a/composer.json b/composer.json index a50802d..751c7ab 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,7 @@ }, "scripts":{ "test": "phpunit", - "check": "vendor/bin/phpstan analyse", + "check": "vendor/bin/phpstan analyse --memory-limit=1G", "format": "vendor/bin/pint", "lint": "vendor/bin/pint --test" }, diff --git a/phpstan.neon b/phpstan.neon index 6852b4c..15ab6e0 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -6,4 +6,4 @@ parameters: - tests scanDirectories: - - vendor/swoole \ No newline at end of file + - vendor/swoole diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index 510fe08..d88f191 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -2,16 +2,20 @@ namespace Utopia\Queue; +use Utopia\DI\Container; + abstract class Adapter { - public int $workerNum; public Queue $queue; - public string $namespace; - public Consumer $consumer; + protected ?Container $context = null; - public function __construct(int $workerNum, string $queue, string $namespace = 'utopia-queue') - { - $this->workerNum = $workerNum; + public function __construct( + public Consumer $consumer, + public int $workerNum, + string $queue, + public string $namespace = 'utopia-queue', + protected Container $resources = new Container(), + ) { $this->queue = new Queue($queue, $namespace); } @@ -27,6 +31,36 @@ abstract public function start(): self; */ abstract public function stop(): self; + public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + $this->consumer->consume( + $this->queue, + function (Message $message) use ($messageCallback) { + $this->context = new Container($this->resources()); + + return $messageCallback($message); + }, + $successCallback, + function (?Message $message, \Throwable $error) use ($errorCallback) { + if ($message === null) { + $this->context = new Container($this->resources()); + } + + $errorCallback($message, $error); + }, + ); + } + + public function resources(): Container + { + return $this->resources; + } + + public function context(): Container + { + return $this->context ??= new Container($this->resources()); + } + /** * Is called when a Worker starts. * @param callable $callback diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index 52b53bc..aab5623 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -3,12 +3,18 @@ namespace Utopia\Queue\Adapter; use Swoole\Coroutine; +use Swoole\Coroutine\Channel; use Swoole\Process; +use Utopia\DI\Container; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; +use Utopia\Queue\Error\ConsumerFailures; +use Utopia\Queue\Message; class Swoole extends Adapter { + protected const string CONTEXT_KEY = '__utopia__'; + /** @var Process[] */ protected array $workers = []; @@ -23,9 +29,11 @@ public function __construct( int $workerNum, string $queue, string $namespace = 'utopia-queue', + protected int $maxCoroutines = 1, + Container $resources = new Container(), ) { - parent::__construct($workerNum, $queue, $namespace); - $this->consumer = $consumer; + parent::__construct($consumer, $workerNum, $queue, $namespace, $resources); + $this->maxCoroutines = \max(1, $maxCoroutines); } public function start(): self @@ -71,6 +79,65 @@ protected function spawnWorker(int $workerId): void $this->workers[$pid] = $process; } + public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + $messageCallback = function (Message $message) use ($messageCallback) { + Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); + + return $messageCallback($message); + }; + + $errorCallback = function (?Message $message, \Throwable $error) use ($errorCallback) { + if ($message === null) { + Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); + } + + $errorCallback($message, $error); + }; + + $channel = new Channel($this->maxCoroutines); + $errors = []; + + for ($i = 0; $i < $this->maxCoroutines; $i++) { + Coroutine::create(function () use ($messageCallback, $successCallback, $errorCallback, $channel, &$errors) { + try { + $this->consumer->consume( + $this->queue, + $messageCallback, + $successCallback, + $errorCallback, + ); + } catch (\Throwable $error) { + $errors[] = $error; + $this->consumer->close(); + $channel->push(true); + return; + } + + $channel->push(true); + }); + } + + for ($i = 0; $i < $this->maxCoroutines; $i++) { + $channel->pop(); + } + + $channel->close(); + + if ($errors !== []) { + throw new ConsumerFailures($errors); + } + } + + public function context(): Container + { + if (Coroutine::getCid() !== -1) { + return Coroutine::getContext()[self::CONTEXT_KEY] ?? $this->resources(); + } + + return $this->resources(); + } + protected function reap(): void { while (($ret = Process::wait(false)) !== false) { diff --git a/src/Queue/Adapter/Workerman.php b/src/Queue/Adapter/Workerman.php index 5e86b33..c0cc577 100644 --- a/src/Queue/Adapter/Workerman.php +++ b/src/Queue/Adapter/Workerman.php @@ -2,6 +2,7 @@ namespace Utopia\Queue\Adapter; +use Utopia\DI\Container; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; use Workerman\Worker; @@ -10,13 +11,17 @@ class Workerman extends Adapter { protected Worker $worker; - public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') - { - parent::__construct($workerNum, $queue, $namespace); + public function __construct( + Consumer $consumer, + int $workerNum, + string $queue, + string $namespace = 'utopia-queue', + Container $resources = new Container(), + ) { + parent::__construct($consumer, $workerNum, $queue, $namespace, $resources); $this->worker = new Worker(); $this->worker->count = $workerNum; - $this->consumer = $consumer; } public function start(): self diff --git a/src/Queue/Error/ConsumerFailures.php b/src/Queue/Error/ConsumerFailures.php new file mode 100644 index 0000000..9523e87 --- /dev/null +++ b/src/Queue/Error/ConsumerFailures.php @@ -0,0 +1,25 @@ +errors; + } +} diff --git a/src/Queue/Server.php b/src/Queue/Server.php index f822b6f..97f8100 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -14,13 +14,6 @@ class Server { - /** - * Queue Adapter - * - * @var Adapter - */ - protected Adapter $adapter; - /** * Job * @@ -63,9 +56,6 @@ class Server */ protected array $workerStopHooks = []; - protected Container $container; - protected ?Container $messageContainer = null; - private Histogram $jobWaitTime; private Histogram $processDuration; private ObservableGauge $queueDepth; @@ -73,12 +63,9 @@ class Server /** * Creates an instance of a Queue server. * @param Adapter $adapter - * @param Container|null $container */ - public function __construct(Adapter $adapter, ?Container $container = null) + public function __construct(protected Adapter $adapter) { - $this->adapter = $adapter; - $this->container = $container ?? new Container(); $this->setTelemetry(new NoTelemetry()); } @@ -89,25 +76,28 @@ public function job(): Job } /** - * Set a new resource on the container - * - * @param string $name - * @param callable $callback - * @param array $injections + * Static resources container. * - * @return void + * Shortcut for the underlying adapter's {@see Adapter::resources()}. Use + * `$server->resources()->set(...)` to register app-wide services that are + * shared across every message for the lifetime of the server. */ - public function setResource( - string $name, - callable $callback, - array $injections = [], - ): void { - $this->container->set($name, $callback, $injections); + public function resources(): Container + { + return $this->adapter->resources(); } - public function getContainer(): Container + /** + * Per-message context container. + * + * Shortcut for the underlying adapter's {@see Adapter::context()}. Use + * `$server->context()->set(...)` to register message-scoped resources and + * `$server->context()->get(...)` to read them. Lookups fall through to the + * static resources container, so app-wide services remain accessible. + */ + public function context(): Container { - return $this->messageContainer ?? $this->container; + return $this->adapter->context(); } public function setTelemetry(Telemetry $telemetry): void @@ -206,9 +196,9 @@ public function stop(): self try { $this->adapter->stop(); } catch (Throwable $error) { - $this->getContainer()->set('error', fn () => $error); + $this->resources()->set('error', fn () => $error); foreach ($this->errorHooks as $hook) { - $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); + $hook->getAction()(...$this->getArguments($this->resources(), $hook)); } } return $this; @@ -235,31 +225,28 @@ public function start(): self { try { $this->adapter->workerStart(function (string $workerId) { - $this->getContainer()->set('workerId', fn () => $workerId); + $this->resources()->set('workerId', fn () => $workerId); foreach ($this->workerStartHooks as $hook) { - $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); + $hook->getAction()(...$this->getArguments($this->resources(), $hook)); } - $this->adapter->consumer->consume( - $this->adapter->queue, + $this->adapter->consume( function (Message $message) { - $this->messageContainer = new Container($this->container); - $receivedAtTimestamp = microtime(true); try { $waitDuration = microtime(true) - $message->getTimestamp(); $this->jobWaitTime->record($waitDuration); - $this->getContainer()->set('message', fn () => $message); + $this->context()->set('message', fn () => $message); if ($this->job->getHook()) { foreach ($this->initHooks as $hook) { // Global init hooks if (\in_array('*', $hook->getGroups())) { $arguments = $this->getArguments( - $this->getContainer(), + $this->context(), $hook, $message->getPayload(), ); @@ -273,7 +260,7 @@ function (Message $message) { // Group init hooks if (\in_array($group, $hook->getGroups())) { $arguments = $this->getArguments( - $this->getContainer(), + $this->context(), $hook, $message->getPayload(), ); @@ -285,7 +272,7 @@ function (Message $message) { return \call_user_func_array( $this->job->getAction(), $this->getArguments( - $this->getContainer(), + $this->context(), $this->job, $message->getPayload(), ), @@ -297,14 +284,14 @@ function (Message $message) { } }, function (Message $message) { - $this->getContainer()->set('message', fn () => $message); + $this->context()->set('message', fn () => $message); if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { // Global shutdown hooks if (\in_array('*', $hook->getGroups())) { $arguments = $this->getArguments( - $this->getContainer(), + $this->context(), $hook, $message->getPayload(), ); @@ -318,7 +305,7 @@ function (Message $message) { // Group shutdown hooks if (\in_array($group, $hook->getGroups())) { $arguments = $this->getArguments( - $this->getContainer(), + $this->context(), $hook, $message->getPayload(), ); @@ -328,26 +315,26 @@ function (Message $message) { } }, function (?Message $message, Throwable $th) { - $this->getContainer()->set('error', fn () => $th); + $this->context()->set('error', fn () => $th); if ($message !== null) { - $this->getContainer()->set('message', fn () => $message); + $this->context()->set('message', fn () => $message); } foreach ($this->errorHooks as $hook) { - $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); + $hook->getAction()(...$this->getArguments($this->context(), $hook)); } }, ); }); $this->adapter->workerStop(function (string $workerId) { - $this->getContainer()->set('workerId', fn () => $workerId); + $this->resources()->set('workerId', fn () => $workerId); try { // Call user-defined workerStop hooks foreach ($this->workerStopHooks as $hook) { try { - $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); + $hook->getAction()(...$this->getArguments($this->resources(), $hook)); } catch (Throwable $e) { } } @@ -359,9 +346,9 @@ function (?Message $message, Throwable $th) { $this->adapter->start(); } catch (Throwable $error) { - $this->getContainer()->set('error', fn () => $error); + $this->resources()->set('error', fn () => $error); foreach ($this->errorHooks as $hook) { - $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); + $hook->getAction()(...$this->getArguments($this->resources(), $hook)); } } return $this; @@ -412,12 +399,12 @@ public function getWorkerStop(): array /** * Get Arguments * - * @param Container $container + * @param Container $context * @param Hook $hook * @param array $payload * @return array */ - protected function getArguments(Container $container, Hook $hook, array $payload = []): array + protected function getArguments(Container $context, Hook $hook, array $payload = []): array { $arguments = []; foreach ($hook->getParams() as $key => $param) { @@ -436,13 +423,13 @@ protected function getArguments(Container $container, Hook $hook, array $payload $value = $value === '' || $value === null ? $param['default'] : $value; - $this->validate($key, $param, $value, $container); + $this->validate($key, $param, $value, $context); $hook->setParamValue($key, $value); $arguments[$param['order']] = $value; } foreach ($hook->getInjections() as $key => $injection) { - $arguments[$injection['order']] = $container->get( + $arguments[$injection['order']] = $context->get( $injection['name'], ); } @@ -458,21 +445,21 @@ protected function getArguments(Container $container, Hook $hook, array $payload * @param string $key * @param array $param * @param mixed $value - * @param Container $container + * @param Container $context * * @throws Exception * * @return void */ - protected function validate(string $key, array $param, mixed $value, Container $container): void + protected function validate(string $key, array $param, mixed $value, Container $context): void { if ('' !== $value && $value !== null) { $validator = $param['validator']; // checking whether the class exists if (\is_callable($validator)) { $validatorKey = '_validator:' . $key; - $container->set($validatorKey, $validator, $param['injections']); - $validator = $container->get($validatorKey); + $context->set($validatorKey, $validator, $param['injections']); + $validator = $context->get($validatorKey); } if (!$validator instanceof Validator) { diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index 581c216..d176caf 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -2,6 +2,7 @@ namespace Tests\E2E\Adapter; +use PHPUnit\Framework\Attributes\Depends; use PHPUnit\Framework\TestCase; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; @@ -132,9 +133,7 @@ public function testParamAliases(): void sleep(1); } - /** - * @depends testEvents - */ + #[Depends('testEvents')] public function testRetry(): void { $publisher = $this->getPublisher(); diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index a4d1109..7607d0a 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -3,6 +3,7 @@ namespace Tests\E2E\Adapter; use PHPUnit\Framework\TestCase; +use Utopia\DI\Container; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; use Utopia\Queue\Message; @@ -72,6 +73,78 @@ public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void $this->assertArrayNotHasKey('messaging.queue.depth.errors', $telemetry->counters); } + public function testInjectsAdapterResourcesAndContext(): void + { + $consumer = new ServerTelemetryConsumer(); + $adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite'); + $server = new Server($adapter); + $injections = []; + + $server->resources()->set('resourceValue', fn () => 'resource'); + + $server + ->init() + ->inject('message') + ->action(function (Message $message) use ($server): void { + $server->context()->set('contextValue', fn () => $message->getPid()); + }); + + $server + ->job() + ->inject('message') + ->inject('resourceValue') + ->inject('contextValue') + ->action(function (Message $message, string $resourceValue, string $contextValue) use (&$injections): void { + $injections = [$message->getPid(), $resourceValue, $contextValue]; + }); + + $server->start(); + + $this->assertSame(['test-pid', 'resource', 'test-pid'], $injections); + } + + public function testContextDoesNotLeakBetweenMessages(): void + { + $consumer = new ServerTelemetryMultiMessageConsumer([ + new Message([ + 'pid' => 'first-pid', + 'queue' => 'emails', + 'timestamp' => time() - 1, + 'payload' => [], + ]), + new Message([ + 'pid' => 'second-pid', + 'queue' => 'emails', + 'timestamp' => time() - 1, + 'payload' => [], + ]), + ]); + $adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite'); + $server = new Server($adapter); + $contextValues = []; + + $server + ->init() + ->inject('message') + ->action(function (Message $message) use ($server): void { + if ($message->getPid() === 'first-pid') { + $server->context()->set('contextValue', fn () => $message->getPid()); + } + }); + + $server + ->job() + ->action(function () use ($server, &$contextValues): void { + $contextValues[] = $server->context()->has('contextValue') + ? $server->context()->get('contextValue') + : null; + }); + + $server->start(); + + $this->assertSame(['first-pid', null], $contextValues); + } + /** * @return array */ @@ -103,10 +176,14 @@ final class ServerTelemetryAdapter extends Adapter */ private array $onWorkerStop = []; - public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') - { - parent::__construct($workerNum, $queue, $namespace); - $this->consumer = $consumer; + public function __construct( + Consumer $consumer, + int $workerNum, + string $queue, + string $namespace = 'utopia-queue', + Container $resources = new Container(), + ) { + parent::__construct($consumer, $workerNum, $queue, $namespace, $resources); } public function start(): self @@ -164,6 +241,32 @@ public function close(): void } } +final class ServerTelemetryMultiMessageConsumer implements Consumer +{ + /** + * @param Message[] $messages + */ + public function __construct(private array $messages) + { + } + + public function consume( + Queue $queue, + callable $messageCallback, + callable $successCallback, + callable $errorCallback + ): void { + foreach ($this->messages as $message) { + $messageCallback($message); + $successCallback($message); + } + } + + public function close(): void + { + } +} + final class ServerTelemetryPublisherConsumer extends ServerTelemetryConsumer implements Publisher { /** diff --git a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php new file mode 100644 index 0000000..d82c514 --- /dev/null +++ b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php @@ -0,0 +1,94 @@ +consume(fn () => null, fn () => null, fn () => null); + }); + + $this->assertSame(3, $consumer->consumeCalls); + $this->assertSame(3, $consumer->maxActive); + } + + public function testPreservesAllCoroutineConsumerErrors(): void + { + $consumer = new FailingConcurrentConsumer(); + $failure = null; + + \Swoole\Coroutine\run(function () use ($consumer, &$failure) { + $adapter = new Swoole($consumer, 1, 'swoole-concurrency', maxCoroutines: 3); + + try { + $adapter->consume(fn () => null, fn () => null, fn () => null); + } catch (ConsumerFailures $error) { + $failure = $error; + } + }); + + $this->assertInstanceOf(ConsumerFailures::class, $failure); + $this->assertSame(3, $consumer->consumeCalls); + $this->assertSame(3, $consumer->closed); + $this->assertCount(3, $failure->getErrors()); + $messages = \array_map(fn (\Throwable $error) => $error->getMessage(), $failure->getErrors()); + \sort($messages); + + $this->assertSame(['consumer 1 failed', 'consumer 2 failed', 'consumer 3 failed'], $messages); + $this->assertSame($failure->getErrors()[0], $failure->getPrevious()); + } +} + +final class ConcurrentConsumer implements Consumer +{ + public int $active = 0; + public int $consumeCalls = 0; + public int $maxActive = 0; + + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + $this->consumeCalls++; + $this->active++; + $this->maxActive = \max($this->maxActive, $this->active); + + \Swoole\Coroutine::sleep(0.05); + + $this->active--; + } + + public function close(): void + { + } +} + +final class FailingConcurrentConsumer implements Consumer +{ + public int $closed = 0; + public int $consumeCalls = 0; + + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + $this->consumeCalls++; + $id = $this->consumeCalls; + + \Swoole\Coroutine::sleep(0.05); + + throw new \RuntimeException("consumer {$id} failed"); + } + + public function close(): void + { + $this->closed++; + } +}