-
-
Notifications
You must be signed in to change notification settings - Fork 295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Queue Interop Support #104
Conversation
bae3cd6
to
6b89628
Compare
Tests fail because of php5.5 requirement and dependency on enqueue/amqp-lib 0.7 which has not been released yet. |
This approach is based purely on queue-interop interfaces and does require enqueue. We can propose another solution based on enqueue client. It'll introduce a dependency on enqueue but it allows to remove a lot of low-level MQ stuff from here. |
src/drivers/queue_interop/Queue.php
Outdated
while (true) { | ||
if ($message = $consumer->receive()) { | ||
list($ttr, $body) = explode(';', $message->getBody(), 2); | ||
if ($this->handleMessage(null, $body, $ttr, 1)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we reject it on false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Queue reserves message for handling before method calling.
If method returns false
, queue should keep message in reserve. If false
, message must be deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ASKozienko could you please do message requeue on false and ack on true?
src/drivers/queue_interop/Queue.php
Outdated
$this->getContext()->createQueue($this->queueName) | ||
); | ||
|
||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be good to add usleep. 10-100ms would be ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think ability to configure delay would be good with 10ms default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wrong. There is no need for sleep since receive
is blocking (it waits on a socket, or does sleep internally). No need to do it here.
src/drivers/queue_interop/Queue.php
Outdated
|
||
$this->producer->send( | ||
$this->getContext()->createQueue($this->queueName), | ||
$this->getContext()->createMessage("$ttr;$message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@samdark what does ttr
mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@makasim "time to read"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Time to reserve. See guide about this:
https://github.com/yiisoft/yii2-queue/blob/master/docs/guide/retryable.md
https://github.com/yiisoft/yii2-queue/blob/master/docs/guide-ru/retryable.md
@samdark could you please have a look? |
composer.json
Outdated
}, | ||
"suggest": { | ||
"ext-pcntl": "Need for process signals.", | ||
"yiisoft/yii2-redis": "Need for Redis queue.", | ||
"pda/pheanstalk": "Need for Beanstalk queue.", | ||
"php-amqplib/php-amqplib": "Need for AMQP queue.", | ||
"ext-gearman": "Need for Gearman queue." | ||
"ext-gearman": "Need for Gearman queue.", | ||
"php-enqueue/amqp-ext": "Needs for support of AMQP with queue interop", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs for support of AMQP with queue interop → Required for AMQP with queue interop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for below descriptions.
docs/guide/driver-queue-interop.md
Outdated
|
||
The driver works with many queue brokers. | ||
|
||
Full list of supported brokers you can find on the [Queue Interop](https://github.com/queue-interop/queue-interop) page |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can find full list of...
docs/guide/driver-queue-interop.md
Outdated
|
||
Full list of supported brokers you can find on the [Queue Interop](https://github.com/queue-interop/queue-interop) page | ||
|
||
To get it works you have to install and setup one of the supported implementation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order for it to work you need to install and configure one the implementations supported.
docs/guide/driver-queue-interop.md
Outdated
|
||
To get it works you have to install and setup one of the supported implementation. | ||
|
||
Configuration example for the RabbitMQ AMQP: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it's a good example since there's RabbitMQ driver that works w/o Queue Interop already. It all depends on the goal.
@zhuravljov Do we want to remove our own implementations and use Queue Interop ones instead? That would be less maintenance and more profit for PHP overall. Or do we want to have additional drivers through Queue Interop?
docs/guide/driver-queue-interop.md
Outdated
|
||
`listen` command has options: | ||
|
||
- `--verbose`, `-v`: print executing statuses into console. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
print executing statuses into console → print execution status into console
docs/guide/driver-queue-interop.md
Outdated
`listen` command has options: | ||
|
||
- `--verbose`, `-v`: print executing statuses into console. | ||
- `--isolate`: verbose mode of a job execute. If enabled, execute result of each job will be printed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
verbose mode of a job execute → verbose mode of a job execution
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
execute result → execution result
src/drivers/queue_interop/Queue.php
Outdated
$this->getContext()->createQueue($this->queueName) | ||
); | ||
|
||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think ability to configure delay would be good with 10ms default.
src/drivers/queue_interop/Queue.php
Outdated
} | ||
|
||
$rc = new \ReflectionClass($this->factoryClass); | ||
if (false == $rc->implementsInterface(PsrConnectionFactory::class)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
http://php.net/is_a could be used instead of reflection.
src/drivers/queue_interop/Queue.php
Outdated
@@ -108,8 +110,7 @@ private function getContext() | |||
throw new \LogicException(sprintf('The "factoryClass" option "%s" is not a class', $this->factoryClass)); | |||
} | |||
|
|||
$rc = new \ReflectionClass($this->factoryClass); | |||
if (false == $rc->implementsInterface(PsrConnectionFactory::class)) { | |||
if (false == is_a($this->factoryClass, PsrConnectionFactory::class)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
===
could be used.
Overall the code looks fine but I'd like to hear @zhuravljov opinion on future development and path we should take. |
src/drivers/queue_interop/Queue.php
Outdated
protected function pushMessage($message, $ttr, $delay, $priority) | ||
{ | ||
if ($delay) { | ||
throw new NotSupportedException('Delayed work is not supported in the driver.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ASKozienko Could you please implement this (since their support has been merged php-enqueue/enqueue-dev#149)
src/drivers/queue_interop/Queue.php
Outdated
@@ -117,6 +123,21 @@ private function getContext() | |||
/** @var PsrConnectionFactory $factory */ | |||
$factory = new $this->factoryClass(isset($this->factoryConfig['dsn']) ? $this->factoryConfig['dsn'] : $this->factoryConfig); | |||
|
|||
if ($factory instanceof DelayStrategyAware) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ASKozienko I am not sure factory implements delay strategy aware interface.
Consider marking classes final so it easier for us to change in future (without BC breaks) |
@samdark @zhuravljov tests fail only on php5.5 because enqueue requires php5.6 as the minimum version. How should we proceed? |
There are only two ways:
|
@samdark I am thinking of dropping 5.x at all. Why are you so stick to it? |
Because we have Yii 2.0 and it works with 5.4. Even when we'll release 2.1 that requires PHP 7.1, we'll have to support 2.0 for some time cause there are countless projects in production using it. |
@samdark we decided to skip the tests for php5.5 for now. If someone wants to use yiiqueue with enqueue they have to make sure they are on php5.6 at least. Could you please review it? |
src/drivers/queue_interop/Queue.php
Outdated
|
||
class Queue extends CliQueue | ||
{ | ||
const RABBITMQ_DELAY_DLX = 'dlx'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why there are rabbit-specific things in a general purpose class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We tried to keep it as simple as possible, so everything in one class.
we can move amqp interop specific stuff to separate class. Is it ok?
@@ -45,7 +61,8 @@ | |||
"yii\\queue\\file\\": "src/drivers/file", | |||
"yii\\queue\\gearman\\": "src/drivers/gearman", | |||
"yii\\queue\\redis\\": "src/drivers/redis", | |||
"yii\\queue\\sync\\": "src/drivers/sync" | |||
"yii\\queue\\sync\\": "src/drivers/sync", | |||
"yii\\queue\\queue_interop\\": "src/drivers/queue_interop" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use just yii\queue\interop
as a namespace?
Merged from #158. |
No description provided.