Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ matrix:

sudo: false

services:
- redis-server

before_install:
- travis_retry composer self-update

Expand Down
27 changes: 21 additions & 6 deletions src/Illuminate/Queue/Jobs/RedisJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,26 @@ class RedisJob extends Job implements JobContract
*/
protected $job;

/**
* The Redis job payload inside the reserved queue.
*
* @var string
*/
protected $reserved;

/**
* Create a new job instance.
*
* @param \Illuminate\Container\Container $container
* @param \Illuminate\Queue\RedisQueue $redis
* @param string $job
* @param string $reserved
* @param string $queue
* @return void
*/
public function __construct(Container $container, RedisQueue $redis, $job, $queue)
public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $queue)
{
$this->job = $job;
$this->reserved = $reserved;
$this->redis = $redis;
$this->queue = $queue;
$this->container = $container;
Expand Down Expand Up @@ -69,7 +77,7 @@ public function delete()
{
parent::delete();

$this->redis->deleteReserved($this->queue, $this->job);
$this->redis->deleteReserved($this->queue, $this->reserved);
}

/**
Expand All @@ -82,9 +90,7 @@ public function release($delay = 0)
{
parent::release($delay);

$this->delete();

$this->redis->release($this->queue, $this->job, $delay, $this->attempts() + 1);
$this->redis->deleteAndRelease($this->queue, $this->reserved, $delay);
}

/**
Expand Down Expand Up @@ -136,4 +142,13 @@ public function getRedisJob()
{
return $this->job;
}

/**
* Get the underlying reserved Redis job.
* @return string
*/
public function getReservedJob()
{
return $this->reserved;
}
}
131 changes: 42 additions & 89 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,6 @@ public function later($delay, $job, $data = '', $queue = null)
return Arr::get(json_decode($payload, true), 'id');
}

/**
* Release a reserved job back onto the queue.
*
* @param string $queue
* @param string $payload
* @param int $delay
* @param int $attempts
* @return void
*/
public function release($queue, $payload, $delay, $attempts)
{
$payload = $this->setMeta($payload, 'attempts', $attempts);

$this->getConnection()->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $payload);
}

/**
* Pop the next job off of the queue.
*
Expand All @@ -129,16 +113,27 @@ public function pop($queue = null)

$queue = $this->getQueue($queue);

$this->migrateExpiredJobs($queue.':delayed', $queue);

if (! is_null($this->expire)) {
$this->migrateAllExpiredJobs($queue);
$this->migrateExpiredJobs($queue.':reserved', $queue);
}

$job = $this->getConnection()->lpop($queue);

if (! is_null($job)) {
$this->getConnection()->zadd($queue.':reserved', $this->getTime() + $this->expire, $job);

return new RedisJob($this->container, $this, $job, $original);
$script = <<<'LUA'
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], KEYS[3], reserved)
end
return {job, reserved}
LUA;
list($job, $reserved) = $this->getConnection()->eval($script, 3, $queue, $queue.':reserved', $this->getTime() + $this->expire);

if ($reserved) {
return new RedisJob($this->container, $this, $job, $reserved, $original);
}
}

Expand All @@ -155,16 +150,22 @@ public function deleteReserved($queue, $job)
}

/**
* Migrate all of the waiting jobs in the queue.
* Delete a reserved job from the reserved queue and release it back onto the main queue.
*
* @param string $queue
* @return void
* @param string $queue
* @param string $job
* @param int $delay
*/
protected function migrateAllExpiredJobs($queue)
public function deleteAndRelease($queue, $job, $delay)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);
$queue = $this->getQueue($queue);

$this->migrateExpiredJobs($queue.':reserved', $queue);
$script = <<<'LUA'
redis.call('zrem', KEYS[2], KEYS[3])
redis.call('zadd', KEYS[1], KEYS[4], KEYS[3])
return true
LUA;
$this->getConnection()->eval($script, 4, $queue.':delayed', $queue.':reserved', $job, $this->getTime() + $delay);
}

/**
Expand All @@ -176,66 +177,18 @@ protected function migrateAllExpiredJobs($queue)
*/
public function migrateExpiredJobs($from, $to)
{
$options = ['cas' => true, 'watch' => $from, 'retry' => 10];

$this->getConnection()->transaction($options, function ($transaction) use ($from, $to) {
// First we need to get all of jobs that have expired based on the current time
// so that we can push them onto the main queue. After we get them we simply
// remove them from this "delay" queues. All of this within a transaction.
$jobs = $this->getExpiredJobs(
$transaction, $from, $time = $this->getTime()
);

// If we actually found any jobs, we will remove them from the old queue and we
// will insert them onto the new (ready) "queue". This means they will stand
// ready to be processed by the queue worker whenever their turn comes up.
if (count($jobs) > 0) {
$this->removeExpiredJobs($transaction, $from, $time);

$this->pushExpiredJobsOntoNewQueue($transaction, $to, $jobs);
}
});
}

/**
* Get the expired jobs from a given queue.
*
* @param \Predis\Transaction\MultiExec $transaction
* @param string $from
* @param int $time
* @return array
*/
protected function getExpiredJobs($transaction, $from, $time)
{
return $transaction->zrangebyscore($from, '-inf', $time);
}

/**
* Remove the expired jobs from a given queue.
*
* @param \Predis\Transaction\MultiExec $transaction
* @param string $from
* @param int $time
* @return void
*/
protected function removeExpiredJobs($transaction, $from, $time)
{
$transaction->multi();

$transaction->zremrangebyscore($from, '-inf', $time);
}

/**
* Push all of the given jobs onto another queue.
*
* @param \Predis\Transaction\MultiExec $transaction
* @param string $to
* @param array $jobs
* @return void
*/
protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
{
call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
$redis = $this->getConnection();
$script = <<<'LUA'
local val = redis.call('zrangebyscore', KEYS[1], '-inf', KEYS[3])
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return true
LUA;
$redis->eval($script, 3, $from, $to, $this->getTime());
}

/**
Expand Down
12 changes: 7 additions & 5 deletions tests/Queue/QueueRedisJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,28 @@ public function testFireProperlyCallsTheJobHandler()
public function testDeleteRemovesTheJobFromRedis()
{
$job = $this->getJob();
$job->getRedisQueue()->shouldReceive('deleteReserved')->once()->with('default', $job->getRedisJob());
$job->getRedisQueue()->shouldReceive('deleteReserved')->once()
->with('default', json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]));

$job->delete();
}

public function testReleaseProperlyReleasesJobOntoRedis()
{
$job = $this->getJob();
$job->getRedisQueue()->shouldReceive('deleteReserved')->once()->with('default', $job->getRedisJob());
$job->getRedisQueue()->shouldReceive('release')->once()->with('default', $job->getRedisJob(), 1, 2);
$job->getRedisQueue()->shouldReceive('deleteAndRelease')->once()
->with('default', json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]), 1);

$job->release(1);
}

protected function getJob()
{
return new Illuminate\Queue\Jobs\RedisJob(
m::mock('Illuminate\Container\Container'),
m::mock('Illuminate\Queue\RedisQueue'),
m::mock(Illuminate\Container\Container::class),
m::mock(Illuminate\Queue\RedisQueue::class),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 1]),
json_encode(['job' => 'foo', 'data' => ['data'], 'attempts' => 2]),
'default'
);
}
Expand Down
71 changes: 0 additions & 71 deletions tests/Queue/QueueRedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,75 +55,4 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()

$queue->later($date, 'foo', ['data']);
}

public function testPopProperlyPopsJobOffOfRedis()
{
$queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime', 'migrateAllExpiredJobs'], [$redis = m::mock('Illuminate\Redis\Database'), 'default']);
$queue->setContainer(m::mock('Illuminate\Container\Container'));
$queue->expects($this->once())->method('getTime')->will($this->returnValue(1));
$queue->expects($this->once())->method('migrateAllExpiredJobs')->with($this->equalTo('queues:default'));

$redis->shouldReceive('connection')->andReturn($redis);
$redis->shouldReceive('lpop')->once()->with('queues:default')->andReturn('foo');
$redis->shouldReceive('zadd')->once()->with('queues:default:reserved', 61, 'foo');

$result = $queue->pop();

$this->assertInstanceOf('Illuminate\Queue\Jobs\RedisJob', $result);
}

public function testReleaseMethod()
{
$queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime'], [$redis = m::mock('Illuminate\Redis\Database'), 'default']);
$queue->expects($this->once())->method('getTime')->will($this->returnValue(1));
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('zadd')->once()->with('queues:default:delayed', 2, json_encode(['attempts' => 2]));

$queue->release('default', json_encode(['attempts' => 1]), 1, 2);
}

public function testMigrateExpiredJobs()
{
$queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime'], [$redis = m::mock('Illuminate\Redis\Database'), 'default']);
$queue->expects($this->once())->method('getTime')->will($this->returnValue(1));
$transaction = m::mock('StdClass');
$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('transaction')->with(m::any(), m::type('Closure'))->andReturnUsing(function ($options, $callback) use ($transaction) {
$callback($transaction);
});
$transaction->shouldReceive('zrangebyscore')->once()->with('from', '-inf', 1)->andReturn(['foo', 'bar']);
$transaction->shouldReceive('multi')->once();
$transaction->shouldReceive('zremrangebyscore')->once()->with('from', '-inf', 1);
$transaction->shouldReceive('rpush')->once()->with('to', 'foo', 'bar');

$queue->migrateExpiredJobs('from', 'to');
}

public function testNotExpireJobsWhenExpireNull()
{
$queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime', 'migrateAllExpiredJobs'], [$redis = m::mock('Illuminate\Redis\Database'), 'default', null]);
$redis->shouldReceive('connection')->andReturn($predis = m::mock('Predis\Client'));
$queue->setContainer(m::mock('Illuminate\Container\Container'));
$queue->setExpire(null);
$queue->expects($this->once())->method('getTime')->will($this->returnValue(1));
$queue->expects($this->never())->method('migrateAllExpiredJobs');
$predis->shouldReceive('lpop')->once()->with('queues:default')->andReturn('foo');
$predis->shouldReceive('zadd')->once()->with('queues:default:reserved', 1, 'foo');

$result = $queue->pop();
}

public function testExpireJobsWhenExpireSet()
{
$queue = $this->getMock('Illuminate\Queue\RedisQueue', ['getTime', 'migrateAllExpiredJobs'], [$redis = m::mock('Illuminate\Redis\Database'), 'default', null]);
$redis->shouldReceive('connection')->andReturn($predis = m::mock('Predis\Client'));
$queue->setContainer(m::mock('Illuminate\Container\Container'));
$queue->setExpire(30);
$queue->expects($this->once())->method('getTime')->will($this->returnValue(1));
$queue->expects($this->once())->method('migrateAllExpiredJobs')->with($this->equalTo('queues:default'));
$predis->shouldReceive('lpop')->once()->with('queues:default')->andReturn('foo');
$predis->shouldReceive('zadd')->once()->with('queues:default:reserved', 31, 'foo');

$result = $queue->pop();
}
}
Loading