diff --git a/src/Illuminate/Bus/BusServiceProvider.php b/src/Illuminate/Bus/BusServiceProvider.php index bd6192d0c48e..4031e15f441c 100644 --- a/src/Illuminate/Bus/BusServiceProvider.php +++ b/src/Illuminate/Bus/BusServiceProvider.php @@ -2,10 +2,12 @@ namespace Illuminate\Bus; +use Aws\DynamoDb\DynamoDbClient; use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract; use Illuminate\Contracts\Bus\QueueingDispatcher as QueueingDispatcherContract; use Illuminate\Contracts\Queue\Factory as QueueFactoryContract; use Illuminate\Contracts\Support\DeferrableProvider; +use Illuminate\Support\Arr; use Illuminate\Support\ServiceProvider; class BusServiceProvider extends ServiceProvider implements DeferrableProvider @@ -41,7 +43,13 @@ public function register() */ protected function registerBatchServices() { - $this->app->singleton(BatchRepository::class, DatabaseBatchRepository::class); + $this->app->singleton(BatchRepository::class, function ($app) { + $driver = $app->config->get('queue.batching.driver', 'database'); + + return $driver === 'dynamodb' + ? $app->make(DynamoBatchRepository::class) + : $app->make(DatabaseBatchRepository::class); + }); $this->app->singleton(DatabaseBatchRepository::class, function ($app) { return new DatabaseBatchRepository( @@ -50,6 +58,32 @@ protected function registerBatchServices() $app->config->get('queue.batching.table', 'job_batches') ); }); + + $this->app->singleton(DynamoBatchRepository::class, function ($app) { + $config = $app->config->get('queue.batching'); + + $dynamoConfig = [ + 'region' => $config['region'], + 'version' => 'latest', + 'endpoint' => $config['endpoint'] ?? null, + ]; + + if (! empty($config['key']) && ! empty($config['secret'])) { + $dynamoConfig['credentials'] = Arr::only( + $config, + ['key', 'secret', 'token'] + ); + } + + return new DynamoBatchRepository( + $app->make(BatchFactory::class), + new DynamoDbClient($dynamoConfig), + $app->config->get('app.name'), + $app->config->get('queue.batching.table', 'job_batches'), + ttl: $app->config->get('queue.batching.ttl', null), + ttlAttribute: $app->config->get('queue.batching.ttl_attribute', 'ttl'), + ); + }); } /** diff --git a/src/Illuminate/Bus/DynamoBatchRepository.php b/src/Illuminate/Bus/DynamoBatchRepository.php new file mode 100644 index 000000000000..25a4d4a90ebc --- /dev/null +++ b/src/Illuminate/Bus/DynamoBatchRepository.php @@ -0,0 +1,535 @@ +factory = $factory; + $this->dynamoDbClient = $dynamoDbClient; + $this->applicationName = $applicationName; + $this->table = $table; + $this->ttl = $ttl; + $this->ttlAttribute = $ttlAttribute; + $this->marshaler = new Marshaler; + } + + /** + * Retrieve a list of batches. + * + * @param int $limit + * @param mixed $before + * @return \Illuminate\Bus\Batch[] + */ + public function get($limit = 50, $before = null) + { + $condition = 'application = :application'; + + if ($before) { + $condition = 'application = :application AND id < :id'; + } + + $result = $this->dynamoDbClient->query([ + 'TableName' => $this->table, + 'KeyConditionExpression' => $condition, + 'ExpressionAttributeValues' => array_filter([ + ':application' => ['S' => $this->applicationName], + ':id' => array_filter(['S' => $before]), + ]), + 'Limit' => $limit, + ]); + + return array_map( + fn ($b) => $this->toBatch($this->marshaler->unmarshalItem($b, mapAsObject: true)), + $result['Items'] + ); + } + + /** + * Retrieve information about an existing batch. + * + * @param string $batchId + * @return \Illuminate\Bus\Batch|null + */ + public function find(string $batchId) + { + if ($batchId === '') { + return null; + } + + $b = $this->dynamoDbClient->getItem([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + ]); + + if (! isset($b['Item'])) { + // If we didn't find it via a standard read, attempt consistent read... + $b = $this->dynamoDbClient->getItem([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + 'ConsistentRead' => true, + ]); + + if (! isset($b['Item'])) { + return null; + } + } + + $batch = $this->marshaler->unmarshalItem($b['Item'], mapAsObject: true); + + if ($batch) { + return $this->toBatch($batch); + } + } + + /** + * Store a new pending batch. + * + * @param \Illuminate\Bus\PendingBatch $batch + * @return \Illuminate\Bus\Batch + */ + public function store(PendingBatch $batch) + { + $id = (string) Str::orderedUuid(); + + $batch = [ + 'id' => $id, + 'name' => $batch->name, + 'total_jobs' => 0, + 'pending_jobs' => 0, + 'failed_jobs' => 0, + 'failed_job_ids' => [], + 'options' => $this->serialize($batch->options ?? []), + 'created_at' => time(), + 'cancelled_at' => null, + 'finished_at' => null, + ]; + + if (! is_null($this->ttl)) { + $batch[$this->ttlAttribute] = time() + $this->ttl; + } + + $this->dynamoDbClient->putItem([ + 'TableName' => $this->table, + 'Item' => $this->marshaler->marshalItem( + array_merge(['application' => $this->applicationName], $batch) + ), + ]); + + return $this->find($id); + } + + /** + * Increment the total number of jobs within the batch. + * + * @param string $batchId + * @param int $amount + * @return void + */ + public function incrementTotalJobs(string $batchId, int $amount) + { + $update = 'SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val'; + + if ($this->ttl) { + $update = "SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val, #{$this->ttlAttribute} = :ttl"; + } + + $this->dynamoDbClient->updateItem(array_filter([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + 'UpdateExpression' => $update, + 'ExpressionAttributeValues' => array_filter([ + ':val' => ['N' => "$amount"], + ':ttl' => array_filter(['N' => $this->getExpiryTime()]), + ]), + 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(), + 'ReturnValues' => 'ALL_NEW', + ])); + } + + /** + * Decrement the total number of pending jobs for the batch. + * + * @param string $batchId + * @param string $jobId + * @return \Illuminate\Bus\UpdatedBatchJobCounts + */ + public function decrementPendingJobs(string $batchId, string $jobId) + { + $update = 'SET pending_jobs = pending_jobs - :inc'; + + if ($this->ttl !== null) { + $update = "SET pending_jobs = pending_jobs - :inc, #{$this->ttlAttribute} = :ttl"; + } + + $batch = $this->dynamoDbClient->updateItem(array_filter([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + 'UpdateExpression' => $update, + 'ExpressionAttributeValues' => array_filter([ + ':inc' => ['N' => '1'], + ':ttl' => array_filter(['N' => $this->getExpiryTime()]), + ]), + 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(), + 'ReturnValues' => 'ALL_NEW', + ])); + + $values = $this->marshaler->unmarshalItem($batch['Attributes']); + + return new UpdatedBatchJobCounts( + $values['pending_jobs'], + $values['failed_jobs'] + ); + } + + /** + * Increment the total number of failed jobs for the batch. + * + * @param string $batchId + * @param string $jobId + * @return \Illuminate\Bus\UpdatedBatchJobCounts + */ + public function incrementFailedJobs(string $batchId, string $jobId) + { + $update = 'SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId)'; + + if ($this->ttl !== null) { + $update = "SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId), #{$this->ttlAttribute} = :ttl"; + } + + $batch = $this->dynamoDbClient->updateItem(array_filter([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + 'UpdateExpression' => $update, + 'ExpressionAttributeValues' => array_filter([ + ':jobId' => $this->marshaler->marshalValue([$jobId]), + ':inc' => ['N' => '1'], + ':ttl' => array_filter(['N' => $this->getExpiryTime()]), + ]), + 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(), + 'ReturnValues' => 'ALL_NEW', + ])); + + $values = $this->marshaler->unmarshalItem($batch['Attributes']); + + return new UpdatedBatchJobCounts( + $values['pending_jobs'], + $values['failed_jobs'] + ); + } + + /** + * Mark the batch that has the given ID as finished. + * + * @param string $batchId + * @return void + */ + public function markAsFinished(string $batchId) + { + $update = 'SET finished_at = :timestamp'; + + if ($this->ttl !== null) { + $update = "SET finished_at = :timestamp, #{$this->ttlAttribute} = :ttl"; + } + + $this->dynamoDbClient->updateItem(array_filter([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + 'UpdateExpression' => $update, + 'ExpressionAttributeValues' => array_filter([ + ':timestamp' => ['N' => (string) time()], + ':ttl' => array_filter(['N' => $this->getExpiryTime()]), + ]), + 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(), + ])); + } + + /** + * Cancel the batch that has the given ID. + * + * @param string $batchId + * @return void + */ + public function cancel(string $batchId) + { + $update = 'SET cancelled_at = :timestamp, finished_at = :timestamp'; + + if ($this->ttl !== null) { + $update = "SET cancelled_at = :timestamp, finished_at = :timestamp, #{$this->ttlAttribute} = :ttl"; + } + + $this->dynamoDbClient->updateItem(array_filter([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + 'UpdateExpression' => $update, + 'ExpressionAttributeValues' => array_filter([ + ':timestamp' => ['N' => (string) time()], + ':ttl' => array_filter(['N' => $this->getExpiryTime()]), + ]), + 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(), + ])); + } + + /** + * Delete the batch that has the given ID. + * + * @param string $batchId + * @return void + */ + public function delete(string $batchId) + { + $this->dynamoDbClient->deleteItem([ + 'TableName' => $this->table, + 'Key' => [ + 'application' => ['S' => $this->applicationName], + 'id' => ['S' => $batchId], + ], + ]); + } + + /** + * Execute the given Closure within a storage specific transaction. + * + * @param \Closure $callback + * @return mixed + */ + public function transaction(Closure $callback) + { + return $callback(); + } + + /** + * Rollback the last database transaction for the connection. + * + * @return void + */ + public function rollBack() + { + } + + /** + * Convert the given raw batch to a Batch object. + * + * @param object $batch + * @return \Illuminate\Bus\Batch + */ + protected function toBatch($batch) + { + return $this->factory->make( + $this, + $batch->id, + $batch->name, + (int) $batch->total_jobs, + (int) $batch->pending_jobs, + (int) $batch->failed_jobs, + $batch->failed_job_ids, + $this->unserialize($batch->options) ?? [], + CarbonImmutable::createFromTimestamp($batch->created_at), + $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at) : $batch->cancelled_at, + $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at) : $batch->finished_at + ); + } + + /** + * Create the underlying DynamoDB table. + * + * @return void + */ + public function createAwsDynamoTable(): void + { + $definition = [ + 'TableName' => $this->table, + 'AttributeDefinitions' => [ + [ + 'AttributeName' => 'application', + 'AttributeType' => 'S', + ], + [ + 'AttributeName' => 'id', + 'AttributeType' => 'S', + ], + ], + 'KeySchema' => [ + [ + 'AttributeName' => 'application', + 'KeyType' => 'HASH', + ], + [ + 'AttributeName' => 'id', + 'KeyType' => 'RANGE', + ], + ], + 'BillingMode' => 'PAY_PER_REQUEST', + ]; + + $this->dynamoDbClient->createTable($definition); + + if (! is_null($this->ttl)) { + $this->dynamoDbClient->updateTimeToLive([ + 'TableName' => $this->table, + 'TimeToLiveSpecification' => [ + 'AttributeName' => $this->ttlAttribute, + 'Enabled' => true, + ], + ]); + } + } + + /** + * Delete the underlying DynamoDB table. + */ + public function deleteAwsDynamoTable(): void + { + $this->dynamoDbClient->deleteTable([ + 'TableName' => $this->table, + ]); + } + + /** + * Get the expiry time based on the configured time-to-live. + * + * @return string|null + */ + protected function getExpiryTime(): ?string + { + return is_null($this->ttl) ? null : (string) (time() + $this->ttl); + } + + /** + * Get the expression attribute name for the time-to-live attribute. + * + * @return array + */ + protected function ttlExpressionAttributeName(): array + { + return is_null($this->ttl) ? [] : ["#{$this->ttlAttribute}" => $this->ttlAttribute]; + } + + /** + * Serialize the given value. + * + * @param mixed $value + * @return string + */ + protected function serialize($value) + { + return serialize($value); + } + + /** + * Unserialize the given value. + * + * @param string $serialized + * @return mixed + */ + protected function unserialize($serialized) + { + return unserialize($serialized); + } + + /** + * Get the underlying DynamoDB client instance. + * + * @return \Aws\DynamoDb\DynamoDbClient + */ + public function getDynamoClient(): DynamoDbClient + { + return $this->dynamoDbClient; + } + + /** + * The the name of the table that contains the batch records. + * + * @return string + */ + public function getTable(): string + { + return $this->table; + } +} diff --git a/tests/Integration/Queue/DynamoBatchTest.php b/tests/Integration/Queue/DynamoBatchTest.php new file mode 100644 index 000000000000..43086a39d6aa --- /dev/null +++ b/tests/Integration/Queue/DynamoBatchTest.php @@ -0,0 +1,198 @@ +set('queue.batching', [ + 'driver' => 'dynamodb', + 'region' => 'us-west-2', + 'endpoint' => static::DYNAMODB_ENDPOINT, + 'key' => 'key', + 'secret' => 'secret', + ]); + } + + public function setUp(): void + { + parent::setUp(); + + BatchRunRecorder::reset(); + app(DynamoBatchRepository::class)->createAwsDynamoTable(); + } + + public function tearDown(): void + { + app(DynamoBatchRepository::class)->deleteAwsDynamoTable(); + + parent::tearDown(); + } + + public function test_running_a_batch() + { + Bus::batch([ + new BatchJob('1'), + new BatchJob('2'), + ])->dispatch(); + + $this->assertEquals(['1', '2'], BatchRunRecorder::$results); + } + + public function test_retrieve_batch_by_id() + { + $batch = Bus::batch([ + new BatchJob('1'), + new BatchJob('2'), + ])->dispatch(); + + /** @var DynamoBatchRepository */ + $repo = app(DynamoBatchRepository::class); + $retrieved = $repo->find($batch->id); + $this->assertEquals(2, $retrieved->totalJobs); + $this->assertEquals(0, $retrieved->failedJobs); + $this->assertTrue($retrieved->finishedAt->between(now()->subSecond(30), now())); + } + + public function test_retrieve_non_existent_batch() + { + /** @var DynamoBatchRepository */ + $repo = app(DynamoBatchRepository::class); + $retrieved = $repo->find(Str::orderedUuid()); + $this->assertNull($retrieved); + } + + public function test_delete_batch_by_id() + { + $batch = Bus::batch([ + new BatchJob('1'), + ])->dispatch(); + + /** @var DynamoBatchRepository */ + $repo = app(DynamoBatchRepository::class); + $retrieved = $repo->find($batch->id); + $this->assertNotNull($retrieved); + $repo->delete($retrieved->id); + $retrieved = $repo->find($batch->id); + $this->assertNull($retrieved); + } + + public function test_delete_non_existent_batch() + { + /** @var DynamoBatchRepository */ + $repo = app(DynamoBatchRepository::class); + $repo->delete(Str::orderedUuid()); + // Ensure we didn't throw an exception + $this->assertTrue(true); + } + + public function test_batch_with_failing_job() + { + $batch = Bus::batch([ + new BatchJob('1'), + new FailingJob('2'), + ])->dispatch(); + + /** @var DynamoBatchRepository */ + $repo = app(DynamoBatchRepository::class); + $retrieved = $repo->find($batch->id); + $this->assertEquals(2, $retrieved->totalJobs); + $this->assertEquals(1, $retrieved->failedJobs); + $this->assertTrue($retrieved->finishedAt->between(now()->subSecond(30), now())); + $this->assertTrue($retrieved->cancelledAt->between(now()->subSecond(30), now())); + } + + public function test_get_batches() + { + $batches = [ + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + Bus::batch([new BatchJob('1')])->dispatch(), + ]; + + /** @var DynamoBatchRepository */ + $repo = app(DynamoBatchRepository::class); + $this->assertCount(10, $repo->get()); + $this->assertCount(6, $repo->get(6)); + $this->assertCount(6, $repo->get(100, $batches[6]->id)); + $this->assertCount(0, $repo->get(100, $batches[0]->id)); + $this->assertCount(9, $repo->get(100, $batches[9]->id)); + $this->assertCount(10, $repo->get(100, Str::orderedUuid())); + } +} + +class BatchJob implements ShouldQueue +{ + use Batchable, Dispatchable, InteractsWithQueue, Queueable; + + public static $results = []; + + public string $id; + + public function __construct(string $id) + { + $this->id = $id; + } + + public function handle() + { + BatchRunRecorder::record($this->id); + } +} + +class FailingJob extends BatchJob +{ + public function handle() + { + BatchRunRecorder::recordFailure($this->id); + $this->fail(); + } +} + +class BatchRunRecorder +{ + public static $results = []; + + public static $failures = []; + + public static function record(string $id) + { + self::$results[] = $id; + } + + public static function recordFailure(string $message) + { + self::$failures[] = $message; + + return $message; + } + + public static function reset() + { + self::$results = []; + self::$failures = []; + } +} diff --git a/tests/Integration/Queue/DynamoBatchTestWithTTL.php b/tests/Integration/Queue/DynamoBatchTestWithTTL.php new file mode 100644 index 000000000000..4522b7f3b2fa --- /dev/null +++ b/tests/Integration/Queue/DynamoBatchTestWithTTL.php @@ -0,0 +1,19 @@ +set('queue.batching', [ + 'driver' => 'dynamodb', + 'region' => 'us-west-2', + 'endpoint' => static::DYNAMODB_ENDPOINT, + 'key' => 'key', + 'secret' => 'secret', + 'ttl' => 1, + 'ttlAttribute' => 'ttl_value', + ]); + } +}