Skip to content

Commit

Permalink
Merge pull request #38 from trandangtri/feature/issue-34
Browse files Browse the repository at this point in the history
Mark ContentBasedDeduplication as optional attribute of FIFO queue only
  • Loading branch information
trandangtri committed Sep 21, 2018
2 parents 9b3d5a9 + 55743a3 commit 06bf3ca
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 44 deletions.
49 changes: 28 additions & 21 deletions DependencyInjection/Compiler/SQSQueuePass.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;
use TriTran\SqsQueueBundle\Service\BaseQueue;
use TriTran\SqsQueueBundle\Service\QueueManager;

/**
* Class SQSQueuePass
Expand Down Expand Up @@ -51,6 +52,32 @@ public function process(ContainerBuilder $container)
);
}

$queueAttr = [
'DelaySeconds' =>
$queueOption['attributes']['delay_seconds'] ?? 0,
'MaximumMessageSize' =>
$queueOption['attributes']['maximum_message_size'] ?? 262144,
'MessageRetentionPeriod' =>
$queueOption['attributes']['message_retention_period'] ?? 345600,
'ReceiveMessageWaitTimeSeconds' =>
$queueOption['attributes']['receive_message_wait_time_seconds'] ?? 20,
'VisibilityTimeout' =>
$queueOption['attributes']['visibility_timeout'] ?? 30,
'RedrivePolicy' => !empty($queueOption['attributes']['redrive_policy']['dead_letter_queue'])
? json_encode([
'deadLetterTargetArn' =>
$queueOption['attributes']['redrive_policy']['dead_letter_queue'] ?? '',
'maxReceiveCount' =>
$queueOption['attributes']['redrive_policy']['max_receive_count'] ?? 5,
]) : ''
];
if (QueueManager::isFifoQueue($queueName)) {
$queueAttr = array_merge($queueAttr, [
'ContentBasedDeduplication' =>
$queueOption['attributes']['content_based_deduplication'] ?? true
]);
}

$queueDefinition = new Definition(BaseQueue::class);
$queueDefinition
->setFactory(
Expand All @@ -64,27 +91,7 @@ public function process(ContainerBuilder $container)
$queueName,
$queueOption['queue_url'],
$callable,
[
'DelaySeconds' =>
$queueOption['attributes']['delay_seconds'] ?? 0,
'MaximumMessageSize' =>
$queueOption['attributes']['maximum_message_size'] ?? 262144,
'MessageRetentionPeriod' =>
$queueOption['attributes']['message_retention_period'] ?? 345600,
'ReceiveMessageWaitTimeSeconds' =>
$queueOption['attributes']['receive_message_wait_time_seconds'] ?? 20,
'VisibilityTimeout' =>
$queueOption['attributes']['visibility_timeout'] ?? 30,
'RedrivePolicy' => !empty($queueOption['attributes']['redrive_policy']['dead_letter_queue'])
? json_encode([
'deadLetterTargetArn' =>
$queueOption['attributes']['redrive_policy']['dead_letter_queue'] ?? '',
'maxReceiveCount' =>
$queueOption['attributes']['redrive_policy']['max_receive_count'] ?? 5,
]) : '',
'ContentBasedDeduplication' =>
$queueOption['attributes']['content_based_deduplication'] ?? false
]
$queueAttr
]);

$queueId = sprintf('tritran.sqs_queue.%s', $queueName);
Expand Down
2 changes: 1 addition & 1 deletion Service/BaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,6 @@ public function getClient(): SqsClient
*/
final public function isFIFO(): bool
{
return '.fifo' === substr($this->getQueueName(), -5);
return QueueManager::isFifoQueue($this->getQueueName());
}
}
28 changes: 23 additions & 5 deletions Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ class QueueManager
'MessageRetentionPeriod' => 345600, // 4 days
'ReceiveMessageWaitTimeSeconds' => 0,
'VisibilityTimeout' => 30,
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
'RedrivePolicy' => ''
];

const QUEUE_FIFO_ATTR_DEFAULT = [
'ContentBasedDeduplication' => true
];

/**
Expand Down Expand Up @@ -74,11 +77,16 @@ public function createQueue(string $queueName, array $queueAttribute = [])
{
$queryUrl = '';

$queueAttribute = array_filter($queueAttribute, function ($key) {
return array_key_exists($key, self::QUEUE_ATTR_DEFAULT);
$queueAttributeDefault = self::QUEUE_ATTR_DEFAULT;
if (static::isFifoQueue($queueName)) {
$queueAttributeDefault = array_merge($queueAttributeDefault, self::QUEUE_FIFO_ATTR_DEFAULT);
}

$queueAttribute = array_filter($queueAttribute, function ($key) use ($queueAttributeDefault) {
return array_key_exists($key, $queueAttributeDefault);
}, ARRAY_FILTER_USE_KEY);
$attr = [
'Attributes' => array_merge(self::QUEUE_ATTR_DEFAULT, $queueAttribute),
'Attributes' => array_merge($queueAttributeDefault, $queueAttribute),
'QueueName' => $queueName
];

Expand Down Expand Up @@ -166,4 +174,14 @@ public function getClient(): SqsClient
{
return $this->client;
}

/**
* @param $queueName
*
* @return bool
*/
public static function isFifoQueue($queueName): bool
{
return '.fifo' === substr($queueName, -5);
}
}
64 changes: 49 additions & 15 deletions Tests/Unit/DependencyInjection/Compiler/SQSQueuePassTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ public function configurationProvider(): array
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
'RedrivePolicy' => ''
]
]
]
Expand All @@ -131,8 +130,7 @@ public function configurationProvider(): array
'redrive_policy' => [
'dead_letter_queue' => 'basic_dead_letter_queue_1',
'max_receive_count' => 1
],
'content_based_deduplication' => true
]
]
]
],
Expand All @@ -149,8 +147,7 @@ public function configurationProvider(): array
'RedrivePolicy' => json_encode([
'deadLetterTargetArn' => 'basic_dead_letter_queue_1',
'maxReceiveCount' => 1
]),
'ContentBasedDeduplication' => true
])
]
]
]
Expand All @@ -171,8 +168,7 @@ public function configurationProvider(): array
'redrive_policy' => [
'dead_letter_queue' => 'basic_dead_letter_queue_2',
'max_receive_count' => 2
],
'content_based_deduplication' => true
]
]
]
],
Expand All @@ -189,18 +185,44 @@ public function configurationProvider(): array
'RedrivePolicy' => json_encode([
'deadLetterTargetArn' => 'basic_dead_letter_queue_2',
'maxReceiveCount' => 2
]),
])
]
]
]
],
// Case #3: Load a FIFO queue
[
$container,
[
'queue.fifo' => [
'queue_url' => 'fifo-queue-url',
'worker' => $basicWorker,
'attributes' => []
]
],
[
'queue.fifo' => [
'fifo-queue-url',
new Definition($basicWorker),
[
'DelaySeconds' => 0,
'MaximumMessageSize' => 262144,
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => '',
'ContentBasedDeduplication' => true
]
]
]
],
// Case #3: Load multi queues at the same time
// Case #4: Load multi queues at the same time
[
$container,
[
'basic-queue-1' => ['queue_url' => 'basic-url-1', 'worker' => $basicWorker],
'basic-queue-2' => ['queue_url' => 'basic-url-2', 'worker' => $basicWorkerAsService]
'basic-queue-2' => ['queue_url' => 'basic-url-2', 'worker' => $basicWorkerAsService],
'queue.fifo' => ['queue_url' => 'fifo-queue-url', 'worker' => $basicWorkerAsService]
],
[
'basic-queue-1' => [
Expand All @@ -212,25 +234,37 @@ public function configurationProvider(): array
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
'RedrivePolicy' => ''
]
],
'basic-queue-2' => [
'basic-url-2',
new Reference($basicWorkerAsService),
[
'DelaySeconds' => 0,
'MaximumMessageSize' => 262144,
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => ''
]
],
'queue.fifo' => [
'fifo-queue-url',
new Reference($basicWorkerAsService),
[
'DelaySeconds' => 0,
'MaximumMessageSize' => 262144,
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
'ContentBasedDeduplication' => true
]
]
]
]
],

];
}

Expand Down
99 changes: 97 additions & 2 deletions Tests/Unit/Service/BaseQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use Aws\Exception\AwsException;
use Aws\Result;
use Aws\Sqs\SqsClient;
use PHPUnit\Framework\Error as PHPUnitError;
use PHPUnit\Framework\TestCase;
use TriTran\SqsQueueBundle\Service\BaseQueue;
use TriTran\SqsQueueBundle\Service\Message;
Expand All @@ -19,6 +18,52 @@
*/
class BaseQueueTest extends TestCase
{
/**
* @var array
*/
private $errors = [];

/**
*
*/
protected function setUp()
{
$this->errors = [];
set_error_handler([$this, "errorHandler"]);
}

/**
* @param $errno
* @param $errstr
* @param $errfile
* @param $errline
* @param $errcontext
*/
public function errorHandler($errno, $errstr, $errfile, $errline, $errcontext)
{
$this->errors[] = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');
}

/**
* @param $errstr
* @param $errno
*/
public function assertError($errstr, $errno)
{
foreach ($this->errors as $error) {
if ($error["errstr"] === $errstr
&& $error["errno"] === $errno) {
$this->assertTrue(true);

return;
}
}
$this->fail(
"Error with level " . $errno . " and message '" . $errstr . "' not found in ",
var_export($this->errors, true)
);
}

/**
* @param array $entries
*
Expand Down Expand Up @@ -131,6 +176,27 @@ public function testSendMessage()
);
}

/**
* Test: send message to a queue
*/
public function testSendPingMessage()
{
$queueUrl = 'queue-url';

$client = $this->getAwsClient();
$client->expects($this->any())
->method('sendMessage')
->with([
'MessageBody' => 'ping',
'MessageAttributes' => [],
'QueueUrl' => $queueUrl
])
->willReturn($this->getAwsResult(['MessageId' => 'new-message-id']));

$queue = new BaseQueue($client, 'queue-name', $queueUrl, new BasicWorker(), []);
$this->assertEquals('new-message-id', $queue->ping());
}

/**
* Test: send message to a FIFO queue
*/
Expand Down Expand Up @@ -161,6 +227,35 @@ public function testSendMessageToFifoQueue()
);
}

/**
* Test: send message to a FIFO queue
*/
public function testSendMessageToFifoQueueWithDelay()
{
$delay = random_int(0, 10);
$messageBody = 'my-message';
$messageAttr = ['x', 'y', 'z'];
$queueUrl = 'queue-url';
$groupId = 'group-name';
$deduplicationId = 'deduplication-id';

$client = $this->getAwsClient();
$client->expects($this->any())
->method('sendMessage')
->with([
'MessageAttributes' => $messageAttr,
'MessageBody' => $messageBody,
'QueueUrl' => $queueUrl,
'MessageGroupId' => $groupId,
'MessageDeduplicationId' => $deduplicationId,
])
->willReturn($this->getAwsResult(['MessageId' => 'new-message-id']));

$queue = new BaseQueue($client, 'queue-name.fifo', $queueUrl, new BasicWorker(), []);
$queue->sendMessage(new Message($messageBody, $messageAttr, $groupId, $deduplicationId), $delay);
$this->assertError('FIFO queues don\'t support per-message delays, only per-queue delays.', E_USER_WARNING);
}

/**
* Test: send message to a queue in failure
*/
Expand Down Expand Up @@ -218,7 +313,7 @@ public function testSendMessageToFifoQueueWarning()

$queue = new BaseQueue($client, 'queue-name.fifo', 'queue-url', new BasicWorker(), []);

$this->expectException(PHPUnitError\Warning::class);
$this->expectException(\InvalidArgumentException::class);
$queue->sendMessage(new Message('my-message', [], ''), random_int(0, 10));
}

Expand Down
Loading

0 comments on commit 06bf3ca

Please sign in to comment.