Skip to content

Commit

Permalink
Fully support SQS with auto queue creation
Browse files Browse the repository at this point in the history
  • Loading branch information
siosphere committed Oct 23, 2018
1 parent e2a6a14 commit 25d7318
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 18 deletions.
64 changes: 46 additions & 18 deletions Virge/Graphite/Service/Provider/SqsProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
*/
class SqsProvider extends AbstractProvider
{

const ERROR_NON_EXISTENT_QUEUE = 'AWS.SimpleQueueService.NonExistentQueue';

protected $client;

/**
Expand All @@ -22,7 +23,7 @@ class SqsProvider extends AbstractProvider
*/
public function push($queueName, Task $task) : bool
{
$serializedTask = serialize($task);
$serializedTask = base64_encode(serialize($task));

$params = [
'MessageBody' => $serializedTask,
Expand All @@ -33,6 +34,11 @@ public function push($queueName, Task $task) : bool
$result = $this->getClient()->sendMessage($params);
return true;
} catch (AwsException $e) {
if($e->getAwsErrorCode() === self::ERROR_NON_EXISTENT_QUEUE) {
if($this->createQueue($queueName)) {
return $this->push($queueName, $task);
}
}
error_log($e->getMessage());
return false;
}
Expand All @@ -45,20 +51,21 @@ public function push($queueName, Task $task) : bool
public function listen($queueName, $callback)
{
try {
$result = $client->setQueueAttributes(array(
'Attributes' => [
'ReceiveMessageWaitTimeSeconds' => 20
],
$result = $this->getClient()->receiveMessage([
'MaxNumberOfMessages' => 1,
'WaitTimeSeconds' => 20,
'QueueUrl' => $this->getQueueUrl($queueName),
));

foreach($result->get('Messages') as $message) {
try {
$task = unserialize($message['Body']);
call_user_func($callback, $task);
$this->ack($message['ReceiptHandle']);
} catch(\Throwable $err) {
Cli::output($err->getMessage());
]);

if($result->get('Messages')) {
foreach($result->get('Messages') as $message) {
try {
$task = unserialize(base64_decode($message['Body']));
call_user_func($callback, $task);
$this->ack($queueName, $message['ReceiptHandle']);
} catch(\Throwable $err) {
Cli::output($err->getMessage());
}
}
}

Expand All @@ -72,18 +79,34 @@ protected function ack($queueName, $receiptHandle)
{
return $this->getClient()->deleteMessage([
'QueueUrl' => $this->getQueueUrl($queueName),
'ReceiptHandle' => $result->get('Messages')[0]['ReceiptHandle']
'ReceiptHandle' => $receiptHandle,
]);
}

protected function createQueue($queueName) : bool
{
try {
$result = $this->getClient()->createQueue([
'Attributes' => [
'ReceiveMessageWaitTimeSeconds' => 20
],
'QueueName' => $this->normalizeQueueName($queueName),
]);
return true;
} catch(AwsException $ex) {
error_log($ex->getMessage());
return false;
}
}

protected function getClient() : SqsClient
{
if($this->client) {
return $this->client;
}

return $this->client = new SqsClient([
'profile' => Config::get('queue', 'aws.profile') ?? 'default',
'profile' => Config::get('queue', 'aws.profile'),
'region' => Config::get('queue', 'aws.region') ?? 'us-east-1',
'version' => Config::get('queue', 'aws.version') ?? '2012-11-05',
'credentials' => [
Expand All @@ -95,6 +118,11 @@ protected function getClient() : SqsClient

protected function getQueueUrl(string $queueName) : string
{
return 'https://sqs.'.Config::get('queue', 'aws.region').'.amazonaws.com/'.Config::get('queue', 'aws.account_id').'/' . $queueName;
return 'https://sqs.'.Config::get('queue', 'aws.region').'.amazonaws.com/'.Config::get('queue', 'aws.account_id').'/' . $this->normalizeQueueName($queueName);
}

protected function normalizeQueueName(string $queueName) : string
{
return str_replace(':', '_', $queueName);
}
}
1 change: 1 addition & 0 deletions Virge/Graphite/config/services.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php

use Virge\Graphite\Service\QueueService;
use Virge\Core\Config;
use Virge\Virge;

/**
Expand Down

0 comments on commit 25d7318

Please sign in to comment.