Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions .travis.yml

This file was deleted.

1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Utopia Queue

[![Build Status](https://travis-ci.com/utopia-php/queue.svg?branch=main)](https://travis-ci.com/utopia-php/queue)
![Total Downloads](https://img.shields.io/packagist/dt/utopia-php/queue.svg)
[![Discord](https://img.shields.io/discord/564160730845151244?label=discord)](https://appwrite.io/discord)

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
},
"scripts":{
"test": "phpunit",
"check": "vendor/bin/phpstan analyse",
"check": "vendor/bin/phpstan analyse --memory-limit=1G",
"format": "vendor/bin/pint",
"lint": "vendor/bin/pint --test"
},
Expand Down
2 changes: 1 addition & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ parameters:
- tests

scanDirectories:
- vendor/swoole
- vendor/swoole
46 changes: 40 additions & 6 deletions src/Queue/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

namespace Utopia\Queue;

use Utopia\DI\Container;

abstract class Adapter
{
public int $workerNum;
public Queue $queue;
public string $namespace;
public Consumer $consumer;
protected ?Container $context = null;

public function __construct(int $workerNum, string $queue, string $namespace = 'utopia-queue')
{
$this->workerNum = $workerNum;
public function __construct(
public Consumer $consumer,
public int $workerNum,
string $queue,
public string $namespace = 'utopia-queue',
protected Container $resources = new Container(),
) {
$this->queue = new Queue($queue, $namespace);
}

Expand All @@ -27,6 +31,36 @@ abstract public function start(): self;
*/
abstract public function stop(): self;

public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void
{
$this->consumer->consume(
$this->queue,
function (Message $message) use ($messageCallback) {
$this->context = new Container($this->resources());

return $messageCallback($message);
},
$successCallback,
function (?Message $message, \Throwable $error) use ($errorCallback) {
if ($message === null) {
$this->context = new Container($this->resources());
}

$errorCallback($message, $error);
},
);
}

public function resources(): Container
{
return $this->resources;
}

public function context(): Container
{
return $this->context ??= new Container($this->resources());
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

/**
* Is called when a Worker starts.
* @param callable $callback
Expand Down
71 changes: 69 additions & 2 deletions src/Queue/Adapter/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
namespace Utopia\Queue\Adapter;

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Process;
use Utopia\DI\Container;
use Utopia\Queue\Adapter;
use Utopia\Queue\Consumer;
use Utopia\Queue\Error\ConsumerFailures;
use Utopia\Queue\Message;

class Swoole extends Adapter
{
protected const string CONTEXT_KEY = '__utopia__';

/** @var Process[] */
protected array $workers = [];

Expand All @@ -23,9 +29,11 @@ public function __construct(
int $workerNum,
string $queue,
string $namespace = 'utopia-queue',
protected int $maxCoroutines = 1,
Container $resources = new Container(),
) {
parent::__construct($workerNum, $queue, $namespace);
$this->consumer = $consumer;
parent::__construct($consumer, $workerNum, $queue, $namespace, $resources);
$this->maxCoroutines = \max(1, $maxCoroutines);
}

public function start(): self
Expand Down Expand Up @@ -71,6 +79,65 @@ protected function spawnWorker(int $workerId): void
$this->workers[$pid] = $process;
}

public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void
{
$messageCallback = function (Message $message) use ($messageCallback) {
Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources());

return $messageCallback($message);
};

$errorCallback = function (?Message $message, \Throwable $error) use ($errorCallback) {
if ($message === null) {
Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources());
}

$errorCallback($message, $error);
};

$channel = new Channel($this->maxCoroutines);
$errors = [];

for ($i = 0; $i < $this->maxCoroutines; $i++) {
Coroutine::create(function () use ($messageCallback, $successCallback, $errorCallback, $channel, &$errors) {
try {
$this->consumer->consume(
$this->queue,
$messageCallback,
$successCallback,
$errorCallback,
);
} catch (\Throwable $error) {
$errors[] = $error;
$this->consumer->close();
$channel->push(true);
return;
}

$channel->push(true);
});
Comment thread
greptile-apps[bot] marked this conversation as resolved.
}

for ($i = 0; $i < $this->maxCoroutines; $i++) {
$channel->pop();
}

$channel->close();

if ($errors !== []) {
throw new ConsumerFailures($errors);
}
}

public function context(): Container
{
if (Coroutine::getCid() !== -1) {
return Coroutine::getContext()[self::CONTEXT_KEY] ?? $this->resources();
}

return $this->resources();
}

protected function reap(): void
{
while (($ret = Process::wait(false)) !== false) {
Expand Down
13 changes: 9 additions & 4 deletions src/Queue/Adapter/Workerman.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Utopia\Queue\Adapter;

use Utopia\DI\Container;
use Utopia\Queue\Adapter;
use Utopia\Queue\Consumer;
use Workerman\Worker;
Expand All @@ -10,13 +11,17 @@ class Workerman extends Adapter
{
protected Worker $worker;

public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
{
parent::__construct($workerNum, $queue, $namespace);
public function __construct(
Consumer $consumer,
int $workerNum,
string $queue,
string $namespace = 'utopia-queue',
Container $resources = new Container(),
) {
parent::__construct($consumer, $workerNum, $queue, $namespace, $resources);

$this->worker = new Worker();
$this->worker->count = $workerNum;
$this->consumer = $consumer;
}

public function start(): self
Expand Down
25 changes: 25 additions & 0 deletions src/Queue/Error/ConsumerFailures.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace Utopia\Queue\Error;

class ConsumerFailures extends \RuntimeException
{
/**
* @param \Throwable[] $errors
*/
public function __construct(private array $errors)
{
parent::__construct(
'Queue consumers failed with ' . \count($errors) . ' error(s).',
previous: $errors[0] ?? null,
);
}

/**
* @return \Throwable[]
*/
public function getErrors(): array
{
return $this->errors;
}
}
Loading
Loading