diff --git a/.travis.yml b/.travis.yml index 094a4a3c4083..20d73d042e04 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,9 @@ matrix: sudo: false +services: + - redis-server + before_install: - travis_retry composer self-update diff --git a/src/Illuminate/Queue/Jobs/RedisJob.php b/src/Illuminate/Queue/Jobs/RedisJob.php index af9ca9784827..88eda2a95deb 100644 --- a/src/Illuminate/Queue/Jobs/RedisJob.php +++ b/src/Illuminate/Queue/Jobs/RedisJob.php @@ -23,18 +23,26 @@ class RedisJob extends Job implements JobContract */ protected $job; + /** + * The Redis job payload inside the reserved queue. + * + * @var string + */ + protected $reserved; + /** * Create a new job instance. * * @param \Illuminate\Container\Container $container * @param \Illuminate\Queue\RedisQueue $redis * @param string $job + * @param string $reserved * @param string $queue - * @return void */ - public function __construct(Container $container, RedisQueue $redis, $job, $queue) + public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $queue) { $this->job = $job; + $this->reserved = $reserved; $this->redis = $redis; $this->queue = $queue; $this->container = $container; @@ -69,7 +77,7 @@ public function delete() { parent::delete(); - $this->redis->deleteReserved($this->queue, $this->job); + $this->redis->deleteReserved($this->queue, $this->reserved); } /** @@ -82,9 +90,7 @@ public function release($delay = 0) { parent::release($delay); - $this->delete(); - - $this->redis->release($this->queue, $this->job, $delay, $this->attempts() + 1); + $this->redis->deleteAndRelease($this->queue, $this->reserved, $delay); } /** @@ -136,4 +142,13 @@ public function getRedisJob() { return $this->job; } + + /** + * Get the underlying reserved Redis job. + * @return string + */ + public function getReservedJob() + { + return $this->reserved; + } } diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index acd36ab29039..f1197be6d46b 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -101,22 +101,6 @@ public function later($delay, $job, $data = '', $queue = null) return Arr::get(json_decode($payload, true), 'id'); } - /** - * Release a reserved job back onto the queue. - * - * @param string $queue - * @param string $payload - * @param int $delay - * @param int $attempts - * @return void - */ - public function release($queue, $payload, $delay, $attempts) - { - $payload = $this->setMeta($payload, 'attempts', $attempts); - - $this->getConnection()->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $payload); - } - /** * Pop the next job off of the queue. * @@ -129,16 +113,27 @@ public function pop($queue = null) $queue = $this->getQueue($queue); + $this->migrateExpiredJobs($queue.':delayed', $queue); + if (! is_null($this->expire)) { - $this->migrateAllExpiredJobs($queue); + $this->migrateExpiredJobs($queue.':reserved', $queue); } - $job = $this->getConnection()->lpop($queue); - - if (! is_null($job)) { - $this->getConnection()->zadd($queue.':reserved', $this->getTime() + $this->expire, $job); - - return new RedisJob($this->container, $this, $job, $original); + $script = <<<'LUA' +local job = redis.call('lpop', KEYS[1]) +local reserved = false +if(job ~= false) then + reserved = cjson.decode(job) + reserved['attempts'] = reserved['attempts'] + 1 + reserved = cjson.encode(reserved) + redis.call('zadd', KEYS[2], KEYS[3], reserved) +end +return {job, reserved} +LUA; + list($job, $reserved) = $this->getConnection()->eval($script, 3, $queue, $queue.':reserved', $this->getTime() + $this->expire); + + if ($reserved) { + return new RedisJob($this->container, $this, $job, $reserved, $original); } } @@ -155,16 +150,22 @@ public function deleteReserved($queue, $job) } /** - * Migrate all of the waiting jobs in the queue. + * Delete a reserved job from the reserved queue and release it back onto the main queue. * - * @param string $queue - * @return void + * @param string $queue + * @param string $job + * @param int $delay */ - protected function migrateAllExpiredJobs($queue) + public function deleteAndRelease($queue, $job, $delay) { - $this->migrateExpiredJobs($queue.':delayed', $queue); + $queue = $this->getQueue($queue); - $this->migrateExpiredJobs($queue.':reserved', $queue); + $script = <<<'LUA' +redis.call('zrem', KEYS[2], KEYS[3]) +redis.call('zadd', KEYS[1], KEYS[4], KEYS[3]) +return true +LUA; + $this->getConnection()->eval($script, 4, $queue.':delayed', $queue.':reserved', $job, $this->getTime() + $delay); } /** @@ -176,66 +177,18 @@ protected function migrateAllExpiredJobs($queue) */ public function migrateExpiredJobs($from, $to) { - $options = ['cas' => true, 'watch' => $from, 'retry' => 10]; - - $this->getConnection()->transaction($options, function ($transaction) use ($from, $to) { - // First we need to get all of jobs that have expired based on the current time - // so that we can push them onto the main queue. After we get them we simply - // remove them from this "delay" queues. All of this within a transaction. - $jobs = $this->getExpiredJobs( - $transaction, $from, $time = $this->getTime() - ); - - // If we actually found any jobs, we will remove them from the old queue and we - // will insert them onto the new (ready) "queue". This means they will stand - // ready to be processed by the queue worker whenever their turn comes up. - if (count($jobs) > 0) { - $this->removeExpiredJobs($transaction, $from, $time); - - $this->pushExpiredJobsOntoNewQueue($transaction, $to, $jobs); - } - }); - } - - /** - * Get the expired jobs from a given queue. - * - * @param \Predis\Transaction\MultiExec $transaction - * @param string $from - * @param int $time - * @return array - */ - protected function getExpiredJobs($transaction, $from, $time) - { - return $transaction->zrangebyscore($from, '-inf', $time); - } - - /** - * Remove the expired jobs from a given queue. - * - * @param \Predis\Transaction\MultiExec $transaction - * @param string $from - * @param int $time - * @return void - */ - protected function removeExpiredJobs($transaction, $from, $time) - { - $transaction->multi(); - - $transaction->zremrangebyscore($from, '-inf', $time); - } - - /** - * Push all of the given jobs onto another queue. - * - * @param \Predis\Transaction\MultiExec $transaction - * @param string $to - * @param array $jobs - * @return void - */ - protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs) - { - call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs)); + $redis = $this->getConnection(); + $script = <<<'LUA' +local val = redis.call('zrangebyscore', KEYS[1], '-inf', KEYS[3]) +if(next(val) ~= nil) then + redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) + for i = 1, #val, 100 do + redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) + end +end +return true +LUA; + $redis->eval($script, 3, $from, $to, $this->getTime()); } /** diff --git a/tests/Queue/QueueRedisJobTest.php b/tests/Queue/QueueRedisJobTest.php index 425325b69e01..51f977fea7d2 100644 --- a/tests/Queue/QueueRedisJobTest.php +++ b/tests/Queue/QueueRedisJobTest.php @@ -21,7 +21,8 @@ public function testFireProperlyCallsTheJobHandler() public function testDeleteRemovesTheJobFromRedis() { $job = $this->getJob(); - $job->getRedisQueue()->shouldReceive('deleteReserved')->once()->with('default', $job->getRedisJob()); + $job->getRedisQueue()->shouldReceive('deleteReserved')->once() + ->with('default', json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2])); $job->delete(); } @@ -29,8 +30,8 @@ public function testDeleteRemovesTheJobFromRedis() public function testReleaseProperlyReleasesJobOntoRedis() { $job = $this->getJob(); - $job->getRedisQueue()->shouldReceive('deleteReserved')->once()->with('default', $job->getRedisJob()); - $job->getRedisQueue()->shouldReceive('release')->once()->with('default', $job->getRedisJob(), 1, 2); + $job->getRedisQueue()->shouldReceive('deleteAndRelease')->once() + ->with('default', json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]), 1); $job->release(1); } @@ -38,9 +39,10 @@ public function testReleaseProperlyReleasesJobOntoRedis() protected function getJob() { return new Illuminate\Queue\Jobs\RedisJob( - m::mock('Illuminate\Container\Container'), - m::mock('Illuminate\Queue\RedisQueue'), + m::mock(Illuminate\Container\Container::class), + m::mock(Illuminate\Queue\RedisQueue::class), json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 1]), + json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]), 'default' ); } diff --git a/tests/Queue/QueueRedisQueueTest.php b/tests/Queue/QueueRedisQueueTest.php index 5ad4583d7ae3..0cc099ef0534 100644 --- a/tests/Queue/QueueRedisQueueTest.php +++ b/tests/Queue/QueueRedisQueueTest.php @@ -55,75 +55,4 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis() $queue->later($date, 'foo', ['data']); } - - public function testPopProperlyPopsJobOffOfRedis() - { - $queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime', 'migrateAllExpiredJobs'], [$redis = m::mock('Illuminate\Redis\Database'), 'default']); - $queue->setContainer(m::mock('Illuminate\Container\Container')); - $queue->expects($this->once())->method('getTime')->will($this->returnValue(1)); - $queue->expects($this->once())->method('migrateAllExpiredJobs')->with($this->equalTo('queues:default')); - - $redis->shouldReceive('connection')->andReturn($redis); - $redis->shouldReceive('lpop')->once()->with('queues:default')->andReturn('foo'); - $redis->shouldReceive('zadd')->once()->with('queues:default:reserved', 61, 'foo'); - - $result = $queue->pop(); - - $this->assertInstanceOf('Illuminate\Queue\Jobs\RedisJob', $result); - } - - public function testReleaseMethod() - { - $queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime'], [$redis = m::mock('Illuminate\Redis\Database'), 'default']); - $queue->expects($this->once())->method('getTime')->will($this->returnValue(1)); - $redis->shouldReceive('connection')->once()->andReturn($redis); - $redis->shouldReceive('zadd')->once()->with('queues:default:delayed', 2, json_encode(['attempts' => 2])); - - $queue->release('default', json_encode(['attempts' => 1]), 1, 2); - } - - public function testMigrateExpiredJobs() - { - $queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime'], [$redis = m::mock('Illuminate\Redis\Database'), 'default']); - $queue->expects($this->once())->method('getTime')->will($this->returnValue(1)); - $transaction = m::mock('StdClass'); - $redis->shouldReceive('connection')->once()->andReturn($redis); - $redis->shouldReceive('transaction')->with(m::any(), m::type('Closure'))->andReturnUsing(function ($options, $callback) use ($transaction) { - $callback($transaction); - }); - $transaction->shouldReceive('zrangebyscore')->once()->with('from', '-inf', 1)->andReturn(['foo', 'bar']); - $transaction->shouldReceive('multi')->once(); - $transaction->shouldReceive('zremrangebyscore')->once()->with('from', '-inf', 1); - $transaction->shouldReceive('rpush')->once()->with('to', 'foo', 'bar'); - - $queue->migrateExpiredJobs('from', 'to'); - } - - public function testNotExpireJobsWhenExpireNull() - { - $queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime', 'migrateAllExpiredJobs'], [$redis = m::mock('Illuminate\Redis\Database'), 'default', null]); - $redis->shouldReceive('connection')->andReturn($predis = m::mock('Predis\Client')); - $queue->setContainer(m::mock('Illuminate\Container\Container')); - $queue->setExpire(null); - $queue->expects($this->once())->method('getTime')->will($this->returnValue(1)); - $queue->expects($this->never())->method('migrateAllExpiredJobs'); - $predis->shouldReceive('lpop')->once()->with('queues:default')->andReturn('foo'); - $predis->shouldReceive('zadd')->once()->with('queues:default:reserved', 1, 'foo'); - - $result = $queue->pop(); - } - - public function testExpireJobsWhenExpireSet() - { - $queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime', 'migrateAllExpiredJobs'], [$redis = m::mock('Illuminate\Redis\Database'), 'default', null]); - $redis->shouldReceive('connection')->andReturn($predis = m::mock('Predis\Client')); - $queue->setContainer(m::mock('Illuminate\Container\Container')); - $queue->setExpire(30); - $queue->expects($this->once())->method('getTime')->will($this->returnValue(1)); - $queue->expects($this->once())->method('migrateAllExpiredJobs')->with($this->equalTo('queues:default')); - $predis->shouldReceive('lpop')->once()->with('queues:default')->andReturn('foo'); - $predis->shouldReceive('zadd')->once()->with('queues:default:reserved', 31, 'foo'); - - $result = $queue->pop(); - } } diff --git a/tests/Queue/RedisQueueIntegrationTest.php b/tests/Queue/RedisQueueIntegrationTest.php new file mode 100644 index 000000000000..bf9c7b78877d --- /dev/null +++ b/tests/Queue/RedisQueueIntegrationTest.php @@ -0,0 +1,277 @@ +redis = new Database([ + 'cluster' => false, + 'default' => [ + 'host' => '127.0.0.1', + 'port' => 6379, + 'database' => 5, + ], + ]); + $this->redis->connection()->flushdb(); + + $this->queue = new RedisQueue($this->redis); + $this->queue->setContainer(m::mock(Container::class)); + } + + public function tearDown() + { + parent::tearDown(); + m::close(); + $this->redis->connection()->flushdb(); + } + + public function testExpiredJobsArePopped() + { + $jobs = [ + new RedisQueueIntegrationTestJob(0), + new RedisQueueIntegrationTestJob(1), + new RedisQueueIntegrationTestJob(2), + new RedisQueueIntegrationTestJob(3), + ]; + + $this->queue->later(1000, $jobs[0]); + $this->queue->later(-200, $jobs[1]); + $this->queue->later(-300, $jobs[2]); + $this->queue->later(-100, $jobs[3]); + + $this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $this->assertNull($this->queue->pop()); + + $this->assertEquals(1, $this->redis->connection()->zcard('queues:default:delayed')); + $this->assertEquals(3, $this->redis->connection()->zcard('queues:default:reserved')); + } + + public function testPopProperlyPopsJobOffOfRedis() + { + // Push an item into queue + $job = new RedisQueueIntegrationTestJob(10); + $this->queue->push($job); + + // Pop and check it is popped correctly + $before = time(); + /** @var RedisJob $redisJob */ + $redisJob = $this->queue->pop(); + $after = time(); + + $this->assertEquals($job, unserialize(json_decode($redisJob->getRedisJob())->data->command)); + $this->assertEquals(1, $redisJob->attempts()); + $this->assertEquals($job, unserialize(json_decode($redisJob->getReservedJob())->data->command)); + $this->assertEquals(2, json_decode($redisJob->getReservedJob())->attempts); + $this->assertEquals($redisJob->getJobId(), json_decode($redisJob->getReservedJob())->id); + + // Check reserved queue + $this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved')); + $result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]); + $reservedJob = array_keys($result)[0]; + $score = $result[$reservedJob]; + $this->assertGreaterThanOrEqual($score, $before + 60); + $this->assertLessThanOrEqual($score, $after + 60); + $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command)); + } + + public function testPopProperlyPopsDelayedJobOffOfRedis() + { + // Push an item into queue + $job = new RedisQueueIntegrationTestJob(10); + $this->queue->later(-10, $job); + + // Pop and check it is popped correctly + $before = time(); + $this->assertEquals($job, unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $after = time(); + + // Check reserved queue + $this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved')); + $result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]); + $reservedJob = array_keys($result)[0]; + $score = $result[$reservedJob]; + $this->assertGreaterThanOrEqual($score, $before + 60); + $this->assertLessThanOrEqual($score, $after + 60); + $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command)); + } + + public function testPopPopsDelayedJobOffOfRedisWhenExpireNull() + { + $this->queue->setExpire(null); + + // Push an item into queue + $job = new RedisQueueIntegrationTestJob(10); + $this->queue->later(-10, $job); + + // Pop and check it is popped correctly + $before = time(); + $this->assertEquals($job, unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $after = time(); + + // Check reserved queue + $this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved')); + $result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]); + $reservedJob = array_keys($result)[0]; + $score = $result[$reservedJob]; + $this->assertGreaterThanOrEqual($score, $before); + $this->assertLessThanOrEqual($score, $after); + $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command)); + } + + public function testNotExpireJobsWhenExpireNull() + { + $this->queue->setExpire(null); + + // Make an expired reserved job + $failed = new RedisQueueIntegrationTestJob(-20); + $this->queue->push($failed); + $beforeFailPop = time(); + $this->queue->pop(); + $afterFailPop = time(); + + // Push an item into queue + $job = new RedisQueueIntegrationTestJob(10); + $this->queue->push($job); + + // Pop and check it is popped correctly + $before = time(); + $this->assertEquals($job, unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $after = time(); + + // Check reserved queue + $this->assertEquals(2, $this->redis->connection()->zcard('queues:default:reserved')); + $result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]); + + foreach ($result as $payload => $score) { + $command = unserialize(json_decode($payload)->data->command); + $this->assertInstanceOf(RedisQueueIntegrationTestJob::class, $command); + $this->assertContains($command->i, [10, -20]); + if ($command->i == 10) { + $this->assertGreaterThanOrEqual($score, $before); + $this->assertLessThanOrEqual($score, $after); + } else { + $this->assertGreaterThanOrEqual($score, $beforeFailPop); + $this->assertLessThanOrEqual($score, $afterFailPop); + } + } + } + + public function testExpireJobsWhenExpireSet() + { + $this->queue->setExpire(30); + + // Push an item into queue + $job = new RedisQueueIntegrationTestJob(10); + $this->queue->push($job); + + // Pop and check it is popped correctly + $before = time(); + $this->assertEquals($job, unserialize(json_decode($this->queue->pop()->getRawBody())->data->command)); + $after = time(); + + // Check reserved queue + $this->assertEquals(1, $this->redis->connection()->zcard('queues:default:reserved')); + $result = $this->redis->connection()->zrangebyscore('queues:default:reserved', -INF, INF, ['WITHSCORES' => true]); + $reservedJob = array_keys($result)[0]; + $score = $result[$reservedJob]; + $this->assertGreaterThanOrEqual($score, $before + 30); + $this->assertLessThanOrEqual($score, $after + 30); + $this->assertEquals($job, unserialize(json_decode($reservedJob)->data->command)); + } + + public function testRelease() + { + //push a job into queue + $job = new RedisQueueIntegrationTestJob(30); + $this->queue->push($job); + + //pop and release the job + /** @var \Illuminate\Queue\Jobs\RedisJob $redisJob */ + $redisJob = $this->queue->pop(); + $before = time(); + $redisJob->release(1000); + $after = time(); + + //check the content of delayed queue + $this->assertEquals(1, $this->redis->connection()->zcard('queues:default:delayed')); + + $results = $this->redis->connection()->zrangebyscore('queues:default:delayed', -INF, INF, 'withscores'); + + $payload = array_keys($results)[0]; + + $score = $results[$payload]; + + $this->assertGreaterThanOrEqual($before + 1000, $score); + $this->assertLessThanOrEqual($after + 1000, $score); + + $decoded = json_decode($payload); + + $this->assertEquals(2, $decoded->attempts); + $this->assertEquals($job, unserialize($decoded->data->command)); + + //check if the queue has no ready item yet + $this->assertNull($this->queue->pop()); + } + + public function testReleaseInThePast() + { + $job = new RedisQueueIntegrationTestJob(30); + $this->queue->push($job); + + /** @var RedisJob $redisJob */ + $redisJob = $this->queue->pop(); + $redisJob->release(-3); + + $this->assertInstanceOf(RedisJob::class, $this->queue->pop()); + } + + public function testDelete() + { + $job = new RedisQueueIntegrationTestJob(30); + $this->queue->push($job); + + /** @var \Illuminate\Queue\Jobs\RedisJob $redisJob */ + $redisJob = $this->queue->pop(); + + $redisJob->delete(); + + $this->assertEquals(0, $this->redis->connection()->zcard('queues:default:delayed')); + $this->assertEquals(0, $this->redis->connection()->zcard('queues:default:reserved')); + $this->assertEquals(0, $this->redis->connection()->llen('queues:default')); + + $this->assertNull($this->queue->pop()); + } +} + +class RedisQueueIntegrationTestJob +{ + public $i; + + public function __construct($i) + { + $this->i = $i; + } + + public function handle() + { + } +}