Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "yiisoft/yii2-queue",
"description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk and Gearman",
"description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk, SQS and Gearman",
"type": "yii2-extension",
"keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman"],
"keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman", "sqs"],
"license": "BSD-3-Clause",
"authors": [
{
Expand All @@ -27,14 +27,16 @@
"jeremeamia/superclosure": "*",
"yiisoft/yii2-debug": "*",
"yiisoft/yii2-gii": "*",
"phpunit/phpunit": "~4.4"
"phpunit/phpunit": "~4.4",
"aws/aws-sdk-php": ">=2.4"
},
"suggest": {
"ext-pcntl": "Need for process signals.",
"yiisoft/yii2-redis": "Need for Redis queue.",
"pda/pheanstalk": "Need for Beanstalk queue.",
"php-amqplib/php-amqplib": "Need for AMQP queue.",
"ext-gearman": "Need for Gearman queue."
"ext-gearman": "Need for Gearman queue.",
"aws/aws-sdk-php": "Need for aws SQS."
},
"autoload": {
"psr-4": {
Expand All @@ -45,7 +47,8 @@
"yii\\queue\\file\\": "src/drivers/file",
"yii\\queue\\gearman\\": "src/drivers/gearman",
"yii\\queue\\redis\\": "src/drivers/redis",
"yii\\queue\\sync\\": "src/drivers/sync"
"yii\\queue\\sync\\": "src/drivers/sync",
"yii\\queue\\sqs\\": "src/drivers/sqs"
}
},
"extra": {
Expand Down
41 changes: 41 additions & 0 deletions src/drivers/sqs/Command.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php
/**
* @link http://www.yiiframework.com/
* @copyright Copyright (c) 2008 Yii Software LLC
* @license http://www.yiiframework.com/license/
*/

namespace yii\queue\sqs;

use yii\queue\cli\Command as CliCommand;

/**
* Manages application aws sqs-queue.
*
* @author Manoj Malviya <manojm@girnarsoft.com>
*/
class Command extends CliCommand
{
/**
* @var Queue
*/
public $queue;

/**
* Runs all jobs from sqs.
* It can be used as cron job.
*/
public function actionRun()
{
$this->queue->run();
}

/**
* Listens sqs and runs new jobs.
* It can be used as demon process.
*/
public function actionListen()
{
$this->queue->listen();
}
}
206 changes: 206 additions & 0 deletions src/drivers/sqs/Queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
<?php

namespace yii\queue\sqs;

use yii\base\NotSupportedException;
use yii\queue\cli\Queue as CliQueue;
use yii\queue\cli\Signal;
use \Aws\Sqs\SqsClient;
use Aws\Credentials\CredentialProvider;

/**
* SQS Queue
*
* @author Manoj Malviya <manojm@girnarsoft.com>
*/
class Queue extends CliQueue
{
/**
* @var SqsClient
*/
private $_client;

/**
* The SQS url.
* @var string
*/
public $url;

/**
* aws access key
* @var string
*/
public $key = '';

/**
* aws secret
* @var string
*/
public $secret = '';

/**
* region where queue is hosted.
* @var string
*/
public $region = '';

/**
* API version
* @var string
*/
public $version = 'latest';

/**
* @var string command class name
*/
public $commandClass = Command::class;

/**
* @inheritdoc
*/
public function init()
{
parent::init();
}

/**
* Runs all jobs from queue.
*/
public function run()
{
while (!Signal::isExit() && ($payload = $this->getPayload())) {
list($ttr, $message) = explode(';', $payload['Body'], 2);

$this->reserve($payload, $ttr); //reserve it so it is not visible to another worker till ttr

if ($this->handleMessage(null, $message, $ttr, 1)) {
//if handled then remove from queue
$this->release($payload);
}
}
}

/**
* Listens to get new jobs.
*/
public function listen()
{
$this->run();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method should do the same as in other queue drivers. $queue->listen() should run an infinite loop with timeout (sleep) to use as a daemon, regardless of payload existence.

}

/**
* @inheritdoc
*/
protected function pushMessage($message, $ttr, $delay, $priority)
Copy link

@dmirogin dmirogin Sep 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is used anywhere ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I know it's used here

$event->id = $this->pushMessage($message, $event->ttr, $event->delay, $event->priority);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, i didn't notice.

{
if ($priority) {
throw new NotSupportedException('Priority is not supported in this driver');
}

$model = $this->getClient()->sendMessage([
'DelaySeconds' => $delay,
'QueueUrl' => $this->url,
'MessageBody' => "$ttr;$message",
]);

if ($model !== null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ($model === null) {
    return false;
}

return $model['MessageId'];

return $model['MessageId'];
} else {
return false;
}
}

/**
* @inheritdoc
*/
public function status($id)
{
throw new NotSupportedException('Status is not supported in the driver.');
}

/**
* @return \Aws\Sqs\SqsClient
*/
protected function getClient()
{
if ($this->key && $this->secret) {
$provider = [
'key' => $this->key,
'secret' => $this->secret
];
} else {
// use default provider if no key and secret passed
//see - http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html#credential-profiles
$provider = CredentialProvider::defaultProvider();
}

$config = [
'credential' => $provider,
'region' => $this->region,
'version' => $this->version,
];

if (!$this->_client) {
$this->_client = SqsClient::factory($config);

}
return $this->_client;
}

/**
*
*/
private function getPayload()
{
$payload = $this->getClient()->receiveMessage([
'QueueUrl' => $this->url,
'AttributeNames' => ['ApproximateReceiveCount'],
'MaxNumberOfMessages' => 1,
]);

$payload = $payload['Messages'];
if ($payload) {
return array_pop($payload);
}

return null;
}

/**
* Set the visibilty to reserve message
* So that no other worker can see this message
*
* @param array $payload
* @param int $ttr
*/
private function reserve($payload, $ttr)
{
$receiptHandle = $payload['ReceiptHandle'];
$this->getClient()->changeMessageVisibility(array(
'QueueUrl' => $this->url,
'ReceiptHandle' => $receiptHandle,
'VisibilityTimeout' => $ttr
));
}

/**
* Mark the message as handled
*
* @param array $payload
* @return boolean
*/
private function release($payload)
{
if (!empty($payload['ReceiptHandle'])) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (empty($payload['ReceiptHandle'])) {
    return false;
}

// rest of the code here

$receiptHandle = $payload['ReceiptHandle'];
$response = $this->getClient()->deleteMessage([
'QueueUrl' => $this->url,
'ReceiptHandle' => $receiptHandle,
]);

return $response !== null;
}

return false;
}
}