Skip to content

Conversation

@elitemaks
Copy link
Contributor

Continuing the work started by manoj-malviya - implementation of the queue based on Amazon SQS.
Currently, all commits by manoj-malviya are merged in this PR. I will continue to fix requests from old PR and glad to hear new feedbacks.

@zhuravljov
Copy link
Member

zhuravljov commented Dec 26, 2017

@elitemaks, signal helper is deprecated. Please, merge master into the patch and see beanstalk driver as example for updating.

/**
* Listens queue and runs each job.
*
* @param bool $repeat whether to continue listening when queue is empty.
* @param int $timeout number of seconds to wait for next message.
* @return null|int exit code.
* @internal for worker command only.
* @since 2.0.2
*/
public function run($repeat, $timeout = 0)
{
return $this->runWorker(function (LoopInterface $loop) use ($repeat, $timeout) {
while ($loop->canContinue()) {
if ($payload = $this->getPheanstalk()->reserveFromTube($this->tube, $timeout)) {
$info = $this->getPheanstalk()->statsJob($payload);
if ($this->handleMessage(
$payload->getId(),
$payload->getData(),
$info->ttr,
$info->reserves
)) {
$this->getPheanstalk()->delete($payload);
}
} elseif (!$repeat) {
break;
}
}
});
}

/**
* Runs all jobs from beanstalk-queue.
* It can be used as cron job.
*
* @return null|int exit code.
*/
public function actionRun()
{
return $this->queue->run(false);
}

/**
* Listens beanstalk-queue and runs new jobs.
* It can be used as demon process.
*
* @param int $timeout number of seconds to wait a job.
* @return null|int exit code.
*/
public function actionListen($timeout = 3)
{
return $this->queue->run(true, $timeout);
}

Also, it should be covered with tests.

@elitemaks
Copy link
Contributor Author

@zhuravljov Thanks, already implemented correct "run" and "listen" methods, please check.
Regarding tests - will work on them soon.

Copy link
Member

@zhuravljov zhuravljov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also consider ability to clear a queue and remove separate messages by ID in case this is possible.

public function actionClear()
{
if ($this->confirm('Are you sure?')) {
$this->queue->clear();
$this->stdout("Queue has been cleared.\n");
}
}

http://docs.aws.amazon.com/aws-sdk-php/v2/api/class-Aws.Sqs.SqsClient.html#_purgeQueue

} elseif (!$repeat) {
break;
} elseif ($delay) {
sleep($delay);
Copy link
Member

@zhuravljov zhuravljov Dec 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, please check. The limitation is that the max time long polling keeps the connection is 20 secs, so after that, the loop will be restarted. And, it works differently from "sleep" in a way, that if the message appears in a queue, it's handled without waiting for polling timeout.

if ($payload = $this->getPayload()) {
list($ttr, $message) = explode(';', $payload['Body'], 2);
//reserve it so it is not visible to another worker till ttr
$this->reserve($payload, $ttr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use reserve inside getPayload.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about that. I think it's better to separate responsibility of these methods, as we can, for example, get payload and then delete the message, without the need of reserving it.

'MessageBody' => "$ttr;$message",
]);

if ($model === null) {
Copy link
Member

@zhuravljov zhuravljov Dec 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which cases this->getClient()->sendMessage() can return null instead of array?
If this is possible, pushMessage must throw an exception, because it always returns message ID. Else the block should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, due to the documentation it returns model only. Exceptions are raised in aws sdk methods. I removed If block.

@elitemaks
Copy link
Contributor Author

@zhuravljov "clear" has been implemented. Unfortunately, as I got it, there is no way to delete a specific message by ID. We can only get the next message in a queue and delete it - I think it does not make sense.

@thiagotalma
Copy link
Contributor

Unfortunately, it is not possible to retrieve or remove a message from the queue based on an ID.
It is a limitation of SQS.

@zhuravljov This PR looks like it's good enough already.
Are you planning to release a new version when you accept it?

@elitemaks
Copy link
Contributor Author

@thiagotalma still unit tests and documentation should be done.

@zhuravljov I started with unit tests, please check the last commit. But I need some assistance - with SQS almost everything depends on AWS client, so I had to mock it. Not sure if this is the best way, and if I need to write tests for other functions, as there is no way to check a real queue.

@elitemaks
Copy link
Contributor Author

Created eng and rus docs for SQS queue driver.
@zhuravljov please review my edits for your review and the PR in general and let me know what should be done for its merging.

@elitemaks
Copy link
Contributor Author

@samdark Hello Alex, please advise how to proceed with this PR as I did not get any reply form reviewers for a long time already.

@thiagotalma
Copy link
Contributor

@zhuravljov That PR looks good enough already.
Do you have any predictions of when it will be accepted?

@zhuravljov
Copy link
Member

@elitemaks would be nice to create tests with real connection to AWS, if possible. If not, it will without tests.

@elitemaks
Copy link
Contributor Author

@zhuravljov Thanks for the reply! it's possible, but this way "key" and "secret" for account access will be stored in the extension's code. I can create a test acc and use those credentials. Please let me know if that's ok, then I can do it.

@elitemaks
Copy link
Contributor Author

@zhuravljov The only way to include real AWS SQS account into tests is to put credentials to the config file. I think it's not correct to include creds in public repo.
Even if I find a way not to provide creds (like setup anonymous access or so) - SQS is a paid service so I'm not going to use my account for that. Yii team can create it but I don't think it worth it. So I think the easiest way, for now, is to go without tests including real SQS account.

@makasim
Copy link
Contributor

makasim commented Apr 7, 2018

It is possible to create a secret env var on travis side, no one can see it once stored and only build run by owners can use them. enqueue/sqs is tested that way

@elitemaks
Copy link
Contributor Author

Ok but still, someone has to create that account on SQS.

@makasim
Copy link
Contributor

makasim commented Apr 7, 2018

That should be done by repo owners.

@mkubenka
Copy link
Contributor

I would suggest to use ElasticMQ to mock SQS communication.

zhuravljov pushed a commit that referenced this pull request Apr 27, 2018
zhuravljov added a commit that referenced this pull request Apr 27, 2018
zhuravljov added a commit that referenced this pull request Apr 27, 2018
Only one PurgeQueue operation is allowed every 60 seconds
@zhuravljov
Copy link
Member

Merged. Thank you!

@zhuravljov zhuravljov closed this Apr 27, 2018
@zhuravljov zhuravljov added this to the 2.0.3 milestone Apr 27, 2018
zhuravljov added a commit that referenced this pull request Apr 27, 2018
zhuravljov added a commit that referenced this pull request Apr 27, 2018
zhuravljov added a commit that referenced this pull request Apr 27, 2018
@zhuravljov zhuravljov mentioned this pull request Apr 28, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants