Skip to content

Commit

Permalink
[9.x] Prevent redis crash when large number of jobs are scheduled for…
Browse files Browse the repository at this point in the history
… a specific time (#43310)

* Added support for limitting the number of jobs to migrate in each job in redis queue in order to prevent redis crash when large number of jobs are scheduled for a specific time

* Added support for limitting the number of jobs to migrate in each job in redis queue in order to prevent redis crash when large number of jobs are scheduled for a specific time

* formatting

Co-authored-by: amir <amir@abiri.info>
Co-authored-by: Taylor Otwell <taylor@laravel.com>
  • Loading branch information
3 people committed Jul 29, 2022
1 parent 9a6bfc4 commit 5582614
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/RedisConnector.php
Expand Up @@ -47,7 +47,8 @@ public function connect(array $config)
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null,
$config['after_commit'] ?? null
$config['after_commit'] ?? null,
$config['migration_batch_size'] ?? -1
);
}
}
2 changes: 1 addition & 1 deletion src/Illuminate/Queue/LuaScripts.php
Expand Up @@ -106,7 +106,7 @@ public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, ARGV[2])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
Expand Down
17 changes: 15 additions & 2 deletions src/Illuminate/Queue/RedisQueue.php
Expand Up @@ -45,6 +45,15 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue
*/
protected $blockFor = null;

/**
* The batch size to use when migrating delayed / expired jobs onto the primary queue.
*
* Negative values are infinite.
*
* @var int
*/
protected $migrationBatchSize = -1;

/**
* Create a new Redis queue instance.
*
Expand All @@ -54,21 +63,24 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue
* @param int $retryAfter
* @param int|null $blockFor
* @param bool $dispatchAfterCommit
* @param int $migrationBatchSize
* @return void
*/
public function __construct(Redis $redis,
$default = 'default',
$connection = null,
$retryAfter = 60,
$blockFor = null,
$dispatchAfterCommit = false)
$dispatchAfterCommit = false,
$migrationBatchSize = -1)
{
$this->redis = $redis;
$this->default = $default;
$this->blockFor = $blockFor;
$this->connection = $connection;
$this->retryAfter = $retryAfter;
$this->dispatchAfterCommit = $dispatchAfterCommit;
$this->migrationBatchSize = $migrationBatchSize;
}

/**
Expand Down Expand Up @@ -243,12 +255,13 @@ protected function migrate($queue)
*
* @param string $from
* @param string $to
* @param int $limit
* @return array
*/
public function migrateExpiredJobs($from, $to)
{
return $this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime()
LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime(), $this->migrationBatchSize
);
}

Expand Down

0 comments on commit 5582614

Please sign in to comment.