Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]
### Changed
- `Phlib\JobQueue\AwsSqs\JobQueue` optionally accepts a `$groupKey` parameter to set `MessageGroupId` on the message

## [2.1.0] - 2025-08-19
### Added
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ CREATE TABLE `scheduled_queue` (
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
```

## SQS Fair Queues
JobQueue can automatically set `MessageGroupId` from a parameter on the job's body see [the AWS documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fair-queues.html) for more information.
``` php
$client = new \Aws\Sqs\SqsClient(['region' => 'eu-west-1', 'credentials' => ['key' => 'foo', 'secret' => 'bar']]);
$db = new \Phlib\Db\Adapter(['host' => '127.0.0.1', 'dbname' => 'example']);
$scheduler = new \Phlib\JobQueue\Scheduler\DbScheduler($db, 300, 600, true);

$jobQueue = new \Phlib\JobQueue\AwsSqs\JobQueue($client, $scheduler, '', 'tenantId'); //tenantId from the job body will be used as the MessageGroupId
$job = new \Phlib\JobQueue\Job('my-queue', ['foo' => 'bar', 'tenantId' => 'tenant-26']); //MessageGroupId will get set to 'tenant-26'
$jobQueue->put($job);
```

## License

This package is free software: you can redistribute it and/or modify
Expand Down
42 changes: 37 additions & 5 deletions src/AwsSqs/JobQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,18 @@ class JobQueue implements BatchableJobQueueInterface
*/
private $queuePrefix;

public function __construct(SqsClient $client, SchedulerInterface $scheduler, $queuePrefix = '')
{
private ?string $groupKey;

public function __construct(
SqsClient $client,
SchedulerInterface $scheduler,
$queuePrefix = '',
?string $groupKey = null
) {
$this->client = $client;
$this->scheduler = $scheduler;
$this->queuePrefix = $queuePrefix;
$this->groupKey = $groupKey;
}

/**
Expand All @@ -58,11 +65,13 @@ public function put(JobInterface $job): self
return $this;
}

$this->client->sendMessage([
$message = $this->getMessageWithGroupId($job, [
'QueueUrl' => $this->getQueueUrlWithPrefix($job->getQueue()),
'DelaySeconds' => $job->getDelay(),
'MessageBody' => JobFactory::serializeBody($job),
]);

$this->client->sendMessage($message);
return $this;
} catch (SqsException $exception) {
throw new RuntimeException($exception->getMessage(), $exception->getCode(), $exception);
Expand All @@ -80,11 +89,11 @@ public function putBatch(array $jobs): self
continue;
}

$queues[$job->getQueue()][] = [
$queues[$job->getQueue()][] = $this->getMessageWithGroupId($job, [
'Id' => (string) $key,
'DelaySeconds' => $job->getDelay(),
'MessageBody' => JobFactory::serializeBody($job),
];
]);
}

foreach ($queues as $queue => $jobs) {
Expand Down Expand Up @@ -209,4 +218,27 @@ private function determineDeadletterQueue($queue)
throw new RuntimeException("Specified queue '{$name}' does not have a Redrive Policy");
}
}

private function getMessageWithGroupId(JobInterface $job, array $message): array
{
if (!$this->groupKey) {
return $message;
}

$body = $job->getBody();

$groupId = null;

if (is_array($body) && isset($body[$this->groupKey])) {
$groupId = $body[$this->groupKey];
} elseif (is_object($body)) {
$groupId = $body->{$this->groupKey} ?? null;
}

if ($groupId !== null) {
$message['MessageGroupId'] = (string) $groupId;
}

return $message;
}
}
Loading