From 55826147af150a51ab5ca19c1be689cc222ac387 Mon Sep 17 00:00:00 2001 From: AbiriAmir Date: Sat, 30 Jul 2022 01:17:33 +0430 Subject: [PATCH] [9.x] Prevent redis crash when large number of jobs are scheduled for a specific time (#43310) * Added support for limitting the number of jobs to migrate in each job in redis queue in order to prevent redis crash when large number of jobs are scheduled for a specific time * Added support for limitting the number of jobs to migrate in each job in redis queue in order to prevent redis crash when large number of jobs are scheduled for a specific time * formatting Co-authored-by: amir Co-authored-by: Taylor Otwell --- .../Queue/Connectors/RedisConnector.php | 3 ++- src/Illuminate/Queue/LuaScripts.php | 2 +- src/Illuminate/Queue/RedisQueue.php | 17 +++++++++++++++-- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/Illuminate/Queue/Connectors/RedisConnector.php b/src/Illuminate/Queue/Connectors/RedisConnector.php index 966fe49071a8..d442eea99f11 100644 --- a/src/Illuminate/Queue/Connectors/RedisConnector.php +++ b/src/Illuminate/Queue/Connectors/RedisConnector.php @@ -47,7 +47,8 @@ public function connect(array $config) $config['connection'] ?? $this->connection, $config['retry_after'] ?? 60, $config['block_for'] ?? null, - $config['after_commit'] ?? null + $config['after_commit'] ?? null, + $config['migration_batch_size'] ?? -1 ); } } diff --git a/src/Illuminate/Queue/LuaScripts.php b/src/Illuminate/Queue/LuaScripts.php index fa278426bdbb..5452c116c198 100644 --- a/src/Illuminate/Queue/LuaScripts.php +++ b/src/Illuminate/Queue/LuaScripts.php @@ -106,7 +106,7 @@ public static function migrateExpiredJobs() { return <<<'LUA' -- Get all of the jobs with an expired "score"... -local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) +local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, ARGV[2]) -- If we have values in the array, we will remove them from the first queue -- and add them onto the destination queue in chunks of 100, which moves diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index f623f04b6683..4c376a81a2c0 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -45,6 +45,15 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue */ protected $blockFor = null; + /** + * The batch size to use when migrating delayed / expired jobs onto the primary queue. + * + * Negative values are infinite. + * + * @var int + */ + protected $migrationBatchSize = -1; + /** * Create a new Redis queue instance. * @@ -54,6 +63,7 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * @param int $retryAfter * @param int|null $blockFor * @param bool $dispatchAfterCommit + * @param int $migrationBatchSize * @return void */ public function __construct(Redis $redis, @@ -61,7 +71,8 @@ public function __construct(Redis $redis, $connection = null, $retryAfter = 60, $blockFor = null, - $dispatchAfterCommit = false) + $dispatchAfterCommit = false, + $migrationBatchSize = -1) { $this->redis = $redis; $this->default = $default; @@ -69,6 +80,7 @@ public function __construct(Redis $redis, $this->connection = $connection; $this->retryAfter = $retryAfter; $this->dispatchAfterCommit = $dispatchAfterCommit; + $this->migrationBatchSize = $migrationBatchSize; } /** @@ -243,12 +255,13 @@ protected function migrate($queue) * * @param string $from * @param string $to + * @param int $limit * @return array */ public function migrateExpiredJobs($from, $to) { return $this->getConnection()->eval( - LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime() + LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime(), $this->migrationBatchSize ); }