From a54d7542d486741b16d7ee44f3794e9e83167cfb Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Fri, 8 Sep 2017 11:31:27 +0530 Subject: [PATCH 01/11] implemented sqs --- src/drivers/sqs/Command.php | 41 +++++++++++ src/drivers/sqs/Queue.php | 140 ++++++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+) create mode 100644 src/drivers/sqs/Command.php create mode 100644 src/drivers/sqs/Queue.php diff --git a/src/drivers/sqs/Command.php b/src/drivers/sqs/Command.php new file mode 100644 index 0000000000..47c40e6938 --- /dev/null +++ b/src/drivers/sqs/Command.php @@ -0,0 +1,41 @@ + + */ +class Command extends CliCommand +{ + /** + * @var Queue + */ + public $queue; + + /** + * Runs all jobs from gearman-queue. + * It can be used as cron job. + */ + public function actionRun() + { + $this->queue->run(); + } + + /** + * Listens gearman-queue and runs new jobs. + * It can be used as demon process. + */ + public function actionListen() + { + $this->queue->listen(); + } +} \ No newline at end of file diff --git a/src/drivers/sqs/Queue.php b/src/drivers/sqs/Queue.php new file mode 100644 index 0000000000..daa37afd54 --- /dev/null +++ b/src/drivers/sqs/Queue.php @@ -0,0 +1,140 @@ + + */ +class Queue extends CliQueue +{ + /** + * The SQS url. + * @var string + */ + public $url; + + public $key = ''; + public $secret = ''; + public $region = ''; + public $version = 'latest'; + + /** + * @var string command class name + */ + public $commandClass = Command::class; + + /** + * @inheritdoc + */ + public function init() + { + parent::init(); + } + + /** + * Runs all jobs from gearman-queue. + */ + public function run() + { + while (!Signal::isExit() && ($payload = $this->getPayload())) { + list($ttr, $message) = explode(';', $payload->workload(), 2); + if($this->handleMessage(null, $message, $ttr, 1)) + { + $this->release($payload); + } + } + } + + /** + * Listens to get new jobs. + */ + public function listen() + { + $this->run(); + } + + /** + * @inheritdoc + */ + protected function pushMessage($message, $ttr, $delay, $priority) + { + if ($priority) + { + throw new NotSupportedException('Priority is not supported in this driver'); + } + + $model = $this->getClient()->sendMessage([ + 'DelaySeconds' => $delay, + 'QueueUrl' => $this->url, + 'MessageBody' => "$ttr;message", + ]); + + if ($model !== null) { + return $model['MessageId']; + } else { + return false; + } + } + + /** + * @inheritdoc + */ + public function status($id) + { + $status = $this->getClient()->jobStatus($id); + if ($status[0] && !$status[1]) { + return self::STATUS_WAITING; + } elseif ($status[0] && $status[1]) { + return self::STATUS_RESERVED; + } else { + return self::STATUS_DONE; + } + } + + /** + * @return \Aws\Sqs\SqsClient + */ + protected function getClient() + { + if (!$this->_client) { + $this->_client = SqsClient::factory($this->config); + + } + return $this->_client; + } + + private $_client; + + private function getPayload() + { + $payload = $this->getClient()->receiveMessage([ + 'QueueUrl' => $this->url, + 'AttributeNames' => ['ApproximateReceiveCount'], + 'MaxNumberOfMessages' => 1, + ]); + + return $payload; + } + + private function release() + { + if (!empty($job->header['ReceiptHandle'])) { + $receiptHandle = $job->header['ReceiptHandle']; + $response = $this->getClient()->deleteMessage([ + 'QueueUrl' => $this->url, + 'ReceiptHandle' => $receiptHandle, + ]); + + return $response !== null; + } + + return false; + } +} \ No newline at end of file From c6d0999d139e7e11fb634a28dc33e4cd3f95e94e Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Fri, 8 Sep 2017 11:47:41 +0530 Subject: [PATCH 02/11] composer updated --- composer.json | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/composer.json b/composer.json index f7b8406da4..bcaae3cef7 100644 --- a/composer.json +++ b/composer.json @@ -1,13 +1,17 @@ { - "name": "yiisoft/yii2-queue", - "description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk and Gearman", + "name": "manoj-malviya/yii2-queue", + "description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk, SQS and Gearman", "type": "yii2-extension", - "keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman"], + "keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman", "sqs"], "license": "BSD-3-Clause", "authors": [ { "name": "Roman Zhuravlev", "email": "zhuravljov@gmail.com" + }, + { + "name": "Manoj Malviya", + "email": "manojm@girnarsoft.com" } ], "support": { From f7af7d669b4dfd171e0b007d7261ac21d6f08f39 Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Fri, 8 Sep 2017 11:51:15 +0530 Subject: [PATCH 03/11] composer updated --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index bcaae3cef7..34f7b7202b 100644 --- a/composer.json +++ b/composer.json @@ -1,5 +1,5 @@ { - "name": "manoj-malviya/yii2-queue", + "name": "manoj-malviya/yii-2-queue", "description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk, SQS and Gearman", "type": "yii2-extension", "keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman", "sqs"], From f577bc1f6c06d5fafed178d51e05774a80ec508d Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Fri, 8 Sep 2017 16:00:55 +0530 Subject: [PATCH 04/11] composer updated --- composer.json | 1 + 1 file changed, 1 insertion(+) diff --git a/composer.json b/composer.json index 34f7b7202b..e1f918a18f 100644 --- a/composer.json +++ b/composer.json @@ -22,6 +22,7 @@ "require": { "php": ">=5.5.0", "yiisoft/yii2": "~2.0.10", + "aws/aws-sdk-php": ">=2.4", "symfony/process": "*" }, "require-dev": { From 98129b452d696f390685856ea5447096fbcc03ec Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Sun, 10 Sep 2017 12:31:57 +0530 Subject: [PATCH 05/11] final commit --- composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index e1f918a18f..9d00ff7756 100644 --- a/composer.json +++ b/composer.json @@ -50,7 +50,8 @@ "yii\\queue\\file\\": "src/drivers/file", "yii\\queue\\gearman\\": "src/drivers/gearman", "yii\\queue\\redis\\": "src/drivers/redis", - "yii\\queue\\sync\\": "src/drivers/sync" + "yii\\queue\\sync\\": "src/drivers/sync", + "yii\\queue\\sqs\\": "src/drivers/sqs" } }, "extra": { From 55caefeaad33c21dc9abb7283e853b5d836867ce Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Sun, 10 Sep 2017 12:32:07 +0530 Subject: [PATCH 06/11] final commit --- composer.json | 7 ++-- src/drivers/sqs/Queue.php | 71 ++++++++++++++++++++++++++++++--------- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/composer.json b/composer.json index 9d00ff7756..53669102d5 100644 --- a/composer.json +++ b/composer.json @@ -22,7 +22,6 @@ "require": { "php": ">=5.5.0", "yiisoft/yii2": "~2.0.10", - "aws/aws-sdk-php": ">=2.4", "symfony/process": "*" }, "require-dev": { @@ -32,14 +31,16 @@ "jeremeamia/superclosure": "*", "yiisoft/yii2-debug": "*", "yiisoft/yii2-gii": "*", - "phpunit/phpunit": "~4.4" + "phpunit/phpunit": "~4.4", + "aws/aws-sdk-php": ">=2.4" }, "suggest": { "ext-pcntl": "Need for process signals.", "yiisoft/yii2-redis": "Need for Redis queue.", "pda/pheanstalk": "Need for Beanstalk queue.", "php-amqplib/php-amqplib": "Need for AMQP queue.", - "ext-gearman": "Need for Gearman queue." + "ext-gearman": "Need for Gearman queue.", + "aws/aws-sdk-php": "Need for aws SQS." }, "autoload": { "psr-4": { diff --git a/src/drivers/sqs/Queue.php b/src/drivers/sqs/Queue.php index daa37afd54..7a33964404 100644 --- a/src/drivers/sqs/Queue.php +++ b/src/drivers/sqs/Queue.php @@ -6,6 +6,7 @@ use yii\queue\cli\Queue as CliQueue; use yii\queue\cli\Signal; use \Aws\Sqs\SqsClient; +use Aws\Credentials\CredentialProvider; /** * SQS Queue @@ -39,14 +40,18 @@ public function init() } /** - * Runs all jobs from gearman-queue. + * Runs all jobs from queue. */ public function run() { while (!Signal::isExit() && ($payload = $this->getPayload())) { - list($ttr, $message) = explode(';', $payload->workload(), 2); + list($ttr, $message) = explode(';', $payload['Body'], 2); + + $this->reserve($payload, $ttr); //reserve it so it is not visible to another worker till ttr + if($this->handleMessage(null, $message, $ttr, 1)) { + //if handled then remove from queue $this->release($payload); } } @@ -73,7 +78,7 @@ protected function pushMessage($message, $ttr, $delay, $priority) $model = $this->getClient()->sendMessage([ 'DelaySeconds' => $delay, 'QueueUrl' => $this->url, - 'MessageBody' => "$ttr;message", + 'MessageBody' => "$ttr;$message", ]); if ($model !== null) { @@ -88,14 +93,7 @@ protected function pushMessage($message, $ttr, $delay, $priority) */ public function status($id) { - $status = $this->getClient()->jobStatus($id); - if ($status[0] && !$status[1]) { - return self::STATUS_WAITING; - } elseif ($status[0] && $status[1]) { - return self::STATUS_RESERVED; - } else { - return self::STATUS_DONE; - } + throw new NotSupportedException('Status is not supported in the driver.'); } /** @@ -103,8 +101,26 @@ public function status($id) */ protected function getClient() { + if ($this->key && $this->secret) + { + $provider = [ + 'key' => $this->key, + 'secret' => $this->secret + ]; + } else { + // use default provider if no key and secret key passed + //see - http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html#credential-profiles + $provider = CredentialProvider::defaultProvider(); + } + + $config = [ + 'credential' => $provider, + 'region' => $this->region, + 'version' => $this->version, + ]; + if (!$this->_client) { - $this->_client = SqsClient::factory($this->config); + $this->_client = SqsClient::factory($config); } return $this->_client; @@ -120,13 +136,36 @@ private function getPayload() 'MaxNumberOfMessages' => 1, ]); - return $payload; + $payload = $payload['Messages']; + if ($payload){ + return array_pop($payload); + } + + return null; + + } + + /** + * Set the visibilty to reserve message + * So that no other worker can see this message + */ + private function reserve($payload, $ttr) + { + $receiptHandle = $payload['ReceiptHandle']; + $this->getClient()->changeMessageVisibility(array( + 'QueueUrl' => $this->url, + 'ReceiptHandle' => $queue_handle, + 'VisibilityTimeout' => $ttr + )); } - private function release() + /** + * Mark the message as handled + */ + private function release($payload) { - if (!empty($job->header['ReceiptHandle'])) { - $receiptHandle = $job->header['ReceiptHandle']; + if (!empty($payload['ReceiptHandle'])) { + $receiptHandle = $payload['ReceiptHandle']; $response = $this->getClient()->deleteMessage([ 'QueueUrl' => $this->url, 'ReceiptHandle' => $receiptHandle, From cc97769faa478a1053b167b317cbb936832ca350 Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Sun, 10 Sep 2017 13:36:17 +0530 Subject: [PATCH 07/11] final commit --- composer.json | 4 ---- 1 file changed, 4 deletions(-) diff --git a/composer.json b/composer.json index 53669102d5..92c6858b87 100644 --- a/composer.json +++ b/composer.json @@ -8,10 +8,6 @@ { "name": "Roman Zhuravlev", "email": "zhuravljov@gmail.com" - }, - { - "name": "Manoj Malviya", - "email": "manojm@girnarsoft.com" } ], "support": { From d0cdc2701d8e79539c096f90ac1ace8ae865e015 Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Sun, 10 Sep 2017 13:37:31 +0530 Subject: [PATCH 08/11] final commit --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 92c6858b87..bc45b0a5ed 100644 --- a/composer.json +++ b/composer.json @@ -1,5 +1,5 @@ { - "name": "manoj-malviya/yii-2-queue", + "name": "yiisoft/yii2-queue", "description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk, SQS and Gearman", "type": "yii2-extension", "keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman", "sqs"], From d5810703f6061be7212679bc1ec1ec5ec7c17a72 Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Sun, 10 Sep 2017 17:48:16 +0530 Subject: [PATCH 09/11] fixed psr-2 related issues and added missing comments --- src/drivers/sqs/Queue.php | 49 ++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/src/drivers/sqs/Queue.php b/src/drivers/sqs/Queue.php index 7a33964404..36dcbe0b4c 100644 --- a/src/drivers/sqs/Queue.php +++ b/src/drivers/sqs/Queue.php @@ -15,15 +15,39 @@ */ class Queue extends CliQueue { + /** + * @var SqsClient + */ + private $_client; + /** * The SQS url. * @var string */ public $url; + /** + * aws access key + * @var string + */ public $key = ''; + + /** + * aws secret + * @var string + */ public $secret = ''; + + /** + * region where queue is hosted. + * @var string + */ public $region = ''; + + /** + * API version + * @var string + */ public $version = 'latest'; /** @@ -49,8 +73,7 @@ public function run() $this->reserve($payload, $ttr); //reserve it so it is not visible to another worker till ttr - if($this->handleMessage(null, $message, $ttr, 1)) - { + if ($this->handleMessage(null, $message, $ttr, 1)) { //if handled then remove from queue $this->release($payload); } @@ -70,8 +93,7 @@ public function listen() */ protected function pushMessage($message, $ttr, $delay, $priority) { - if ($priority) - { + if ($priority) { throw new NotSupportedException('Priority is not supported in this driver'); } @@ -101,14 +123,13 @@ public function status($id) */ protected function getClient() { - if ($this->key && $this->secret) - { + if ($this->key && $this->secret) { $provider = [ 'key' => $this->key, 'secret' => $this->secret ]; } else { - // use default provider if no key and secret key passed + // use default provider if no key and secret passed //see - http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html#credential-profiles $provider = CredentialProvider::defaultProvider(); } @@ -126,8 +147,9 @@ protected function getClient() return $this->_client; } - private $_client; - + /** + * + */ private function getPayload() { $payload = $this->getClient()->receiveMessage([ @@ -137,17 +159,19 @@ private function getPayload() ]); $payload = $payload['Messages']; - if ($payload){ + if ($payload) { return array_pop($payload); } return null; - } /** * Set the visibilty to reserve message * So that no other worker can see this message + * + * @param array $payload + * @param int $ttr */ private function reserve($payload, $ttr) { @@ -161,6 +185,9 @@ private function reserve($payload, $ttr) /** * Mark the message as handled + * + * @param array $payload + * @return boolean */ private function release($payload) { From ff2ddc500bcd5628ac0e0b2de6e1cf100d1cf918 Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Sun, 10 Sep 2017 21:32:31 +0530 Subject: [PATCH 10/11] Updated command to match driver --- src/drivers/sqs/Command.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/drivers/sqs/Command.php b/src/drivers/sqs/Command.php index 47c40e6938..7d1c5a0094 100644 --- a/src/drivers/sqs/Command.php +++ b/src/drivers/sqs/Command.php @@ -10,9 +10,9 @@ use yii\queue\cli\Command as CliCommand; /** - * Manages application gearman-queue. + * Manages application aws sqs-queue. * - * @author Roman Zhuravlev + * @author Manoj Malviya */ class Command extends CliCommand { @@ -22,7 +22,7 @@ class Command extends CliCommand public $queue; /** - * Runs all jobs from gearman-queue. + * Runs all jobs from sqs. * It can be used as cron job. */ public function actionRun() @@ -31,7 +31,7 @@ public function actionRun() } /** - * Listens gearman-queue and runs new jobs. + * Listens sqs and runs new jobs. * It can be used as demon process. */ public function actionListen() From 4d925212a4c094bc60a893a4ee442b7ba5b64e88 Mon Sep 17 00:00:00 2001 From: Manoj Malviya Date: Wed, 13 Sep 2017 11:42:29 +0530 Subject: [PATCH 11/11] Fixed worng variable name --- src/drivers/sqs/Queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drivers/sqs/Queue.php b/src/drivers/sqs/Queue.php index 36dcbe0b4c..a2dad72f6e 100644 --- a/src/drivers/sqs/Queue.php +++ b/src/drivers/sqs/Queue.php @@ -178,7 +178,7 @@ private function reserve($payload, $ttr) $receiptHandle = $payload['ReceiptHandle']; $this->getClient()->changeMessageVisibility(array( 'QueueUrl' => $this->url, - 'ReceiptHandle' => $queue_handle, + 'ReceiptHandle' => $receiptHandle, 'VisibilityTimeout' => $ttr )); }