diff --git a/composer.json b/composer.json index f7b8406da4..bc45b0a5ed 100644 --- a/composer.json +++ b/composer.json @@ -1,8 +1,8 @@ { "name": "yiisoft/yii2-queue", - "description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk and Gearman", + "description": "Yii2 Queue Extension which supported DB, Redis, RabbitMQ, Beanstalk, SQS and Gearman", "type": "yii2-extension", - "keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman"], + "keywords": ["yii", "queue", "async", "gii", "db", "redis", "rabbitmq", "beanstalk", "gearman", "sqs"], "license": "BSD-3-Clause", "authors": [ { @@ -27,14 +27,16 @@ "jeremeamia/superclosure": "*", "yiisoft/yii2-debug": "*", "yiisoft/yii2-gii": "*", - "phpunit/phpunit": "~4.4" + "phpunit/phpunit": "~4.4", + "aws/aws-sdk-php": ">=2.4" }, "suggest": { "ext-pcntl": "Need for process signals.", "yiisoft/yii2-redis": "Need for Redis queue.", "pda/pheanstalk": "Need for Beanstalk queue.", "php-amqplib/php-amqplib": "Need for AMQP queue.", - "ext-gearman": "Need for Gearman queue." + "ext-gearman": "Need for Gearman queue.", + "aws/aws-sdk-php": "Need for aws SQS." }, "autoload": { "psr-4": { @@ -45,7 +47,8 @@ "yii\\queue\\file\\": "src/drivers/file", "yii\\queue\\gearman\\": "src/drivers/gearman", "yii\\queue\\redis\\": "src/drivers/redis", - "yii\\queue\\sync\\": "src/drivers/sync" + "yii\\queue\\sync\\": "src/drivers/sync", + "yii\\queue\\sqs\\": "src/drivers/sqs" } }, "extra": { diff --git a/src/drivers/sqs/Command.php b/src/drivers/sqs/Command.php new file mode 100644 index 0000000000..7d1c5a0094 --- /dev/null +++ b/src/drivers/sqs/Command.php @@ -0,0 +1,41 @@ + + */ +class Command extends CliCommand +{ + /** + * @var Queue + */ + public $queue; + + /** + * Runs all jobs from sqs. + * It can be used as cron job. + */ + public function actionRun() + { + $this->queue->run(); + } + + /** + * Listens sqs and runs new jobs. + * It can be used as demon process. + */ + public function actionListen() + { + $this->queue->listen(); + } +} \ No newline at end of file diff --git a/src/drivers/sqs/Queue.php b/src/drivers/sqs/Queue.php new file mode 100644 index 0000000000..a2dad72f6e --- /dev/null +++ b/src/drivers/sqs/Queue.php @@ -0,0 +1,206 @@ + + */ +class Queue extends CliQueue +{ + /** + * @var SqsClient + */ + private $_client; + + /** + * The SQS url. + * @var string + */ + public $url; + + /** + * aws access key + * @var string + */ + public $key = ''; + + /** + * aws secret + * @var string + */ + public $secret = ''; + + /** + * region where queue is hosted. + * @var string + */ + public $region = ''; + + /** + * API version + * @var string + */ + public $version = 'latest'; + + /** + * @var string command class name + */ + public $commandClass = Command::class; + + /** + * @inheritdoc + */ + public function init() + { + parent::init(); + } + + /** + * Runs all jobs from queue. + */ + public function run() + { + while (!Signal::isExit() && ($payload = $this->getPayload())) { + list($ttr, $message) = explode(';', $payload['Body'], 2); + + $this->reserve($payload, $ttr); //reserve it so it is not visible to another worker till ttr + + if ($this->handleMessage(null, $message, $ttr, 1)) { + //if handled then remove from queue + $this->release($payload); + } + } + } + + /** + * Listens to get new jobs. + */ + public function listen() + { + $this->run(); + } + + /** + * @inheritdoc + */ + protected function pushMessage($message, $ttr, $delay, $priority) + { + if ($priority) { + throw new NotSupportedException('Priority is not supported in this driver'); + } + + $model = $this->getClient()->sendMessage([ + 'DelaySeconds' => $delay, + 'QueueUrl' => $this->url, + 'MessageBody' => "$ttr;$message", + ]); + + if ($model !== null) { + return $model['MessageId']; + } else { + return false; + } + } + + /** + * @inheritdoc + */ + public function status($id) + { + throw new NotSupportedException('Status is not supported in the driver.'); + } + + /** + * @return \Aws\Sqs\SqsClient + */ + protected function getClient() + { + if ($this->key && $this->secret) { + $provider = [ + 'key' => $this->key, + 'secret' => $this->secret + ]; + } else { + // use default provider if no key and secret passed + //see - http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/credentials.html#credential-profiles + $provider = CredentialProvider::defaultProvider(); + } + + $config = [ + 'credential' => $provider, + 'region' => $this->region, + 'version' => $this->version, + ]; + + if (!$this->_client) { + $this->_client = SqsClient::factory($config); + + } + return $this->_client; + } + + /** + * + */ + private function getPayload() + { + $payload = $this->getClient()->receiveMessage([ + 'QueueUrl' => $this->url, + 'AttributeNames' => ['ApproximateReceiveCount'], + 'MaxNumberOfMessages' => 1, + ]); + + $payload = $payload['Messages']; + if ($payload) { + return array_pop($payload); + } + + return null; + } + + /** + * Set the visibilty to reserve message + * So that no other worker can see this message + * + * @param array $payload + * @param int $ttr + */ + private function reserve($payload, $ttr) + { + $receiptHandle = $payload['ReceiptHandle']; + $this->getClient()->changeMessageVisibility(array( + 'QueueUrl' => $this->url, + 'ReceiptHandle' => $receiptHandle, + 'VisibilityTimeout' => $ttr + )); + } + + /** + * Mark the message as handled + * + * @param array $payload + * @return boolean + */ + private function release($payload) + { + if (!empty($payload['ReceiptHandle'])) { + $receiptHandle = $payload['ReceiptHandle']; + $response = $this->getClient()->deleteMessage([ + 'QueueUrl' => $this->url, + 'ReceiptHandle' => $receiptHandle, + ]); + + return $response !== null; + } + + return false; + } +} \ No newline at end of file